from neutron.common import topics
from neutron.db import agents_db
from neutron.db import agentschedulers_db
+from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
from neutron.db import external_net_db
from neutron.db import portbindings_db
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
+from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import portbindings
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
agentschedulers_db.DhcpAgentSchedulerDbMixin,
nec_router.L3AgentSchedulerDbMixin,
packet_filter.PacketFilterMixin,
- portbindings_db.PortBindingMixin):
+ portbindings_db.PortBindingMixin,
+ addr_pair_db.AllowedAddressPairsMixin):
"""NECPluginV2 controls an OpenFlow Controller.
The Neutron NECPluginV2 maps L2 logical networks to L2 virtualized networks
information to and from the plugin.
"""
_supported_extension_aliases = ["agent",
+ "allowed-address-pairs",
"binding",
"dhcp_agent_scheduler",
"external-net",
self._process_portbindings_create(context, port_data, port)
self._process_port_create_security_group(
context, port, sgids)
+ port[addr_pair.ADDRESS_PAIRS] = (
+ self._process_create_allowed_address_pairs(
+ context, port,
+ port_data.get(addr_pair.ADDRESS_PAIRS)))
self.notify_security_groups_member_updated(context, port)
handler = self._get_port_handler('create', port['device_owner'])
"id=%(id)s port=%(port)s ."),
{'id': id, 'port': port})
need_port_update_notify = False
+ changed_fixed_ips = 'fixed_ips' in port['port']
with context.session.begin(subtransactions=True):
old_port = super(NECPluginV2, self).get_port(context, id)
new_port = super(NECPluginV2, self).update_port(context, id, port)
portinfo_changed = self._process_portbindings_update(
context, port['port'], new_port)
- need_port_update_notify = self.update_security_group_on_port(
+ if addr_pair.ADDRESS_PAIRS in port['port']:
+ self._delete_allowed_address_pairs(context, id)
+ self._process_create_allowed_address_pairs(
+ context, new_port,
+ port['port'][addr_pair.ADDRESS_PAIRS])
+ need_port_update_notify = True
+ elif changed_fixed_ips:
+ self._check_fixed_ips_and_address_pairs_no_overlap(
+ context, new_port)
+ need_port_update_notify |= self.update_security_group_on_port(
context, id, port, old_port, new_port)
need_port_update_notify |= self.is_security_group_member_updated(
RESOURCE_ATTRIBUTE_MAP[item].
copy())
super(NecSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
- self.notifier = manager.NeutronManager.get_plugin().notifier
+ plugin = manager.NeutronManager.get_plugin()
+ self.notifier = plugin.notifier
+ self.rpc = plugin.callback_sg
def tearDown(self):
super(NecSecurityGroupsTestCase, self).tearDown()
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
+class TestNecSGServerRpcCallBack(
+ test_sg_rpc.SGServerRpcCallBackMixinTestCase,
+ NecSecurityGroupsTestCase):
+ pass
+
+
+# TODO(amotoki): The test is blocked by 1229954
+# class TestNecSGServerRpcCallBackXML(
+# test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML,
+# NecSecurityGroupsTestCase):
+# pass
+
+
class TestNecSecurityGroups(NecSecurityGroupsTestCase,
test_sg.TestSecurityGroups,
test_sg_rpc.SGNotificationTestMixin):