need_notify = True
return need_notify
- def notify_security_groups_member_updated(self, context, port):
- """Notify update event of security group members.
+ def notify_security_groups_member_updated_bulk(self, context, ports):
+ """Notify update event of security group members for ports.
The agent setups the iptables rule to allow
ingress packet from the dhcp server (as a part of provider rules),
occurs and the plugin agent fetches the update provider
rule in the other RPC call (security_group_rules_for_devices).
"""
- if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
+ security_groups_provider_updated = False
+ sec_groups = set()
+ for port in ports:
+ if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
+ security_groups_provider_updated = True
+ # For IPv6, provider rule need to be updated in case router
+ # interface is created or updated after VM port is created.
+ elif port['device_owner'] == q_const.DEVICE_OWNER_ROUTER_INTF:
+ if any(netaddr.IPAddress(fixed_ip['ip_address']).version == 6
+ for fixed_ip in port['fixed_ips']):
+ security_groups_provider_updated = True
+ else:
+ sec_groups |= set(port.get(ext_sg.SECURITYGROUPS))
+
+ if security_groups_provider_updated:
self.notifier.security_groups_provider_updated(context)
- # For IPv6, provider rule need to be updated in case router
- # interface is created or updated after VM port is created.
- elif port['device_owner'] == q_const.DEVICE_OWNER_ROUTER_INTF:
- if any(netaddr.IPAddress(fixed_ip['ip_address']).version == 6
- for fixed_ip in port['fixed_ips']):
- self.notifier.security_groups_provider_updated(context)
- else:
+ if sec_groups:
self.notifier.security_groups_member_updated(
- context, port.get(ext_sg.SECURITYGROUPS))
+ context, list(sec_groups))
+
+ def notify_security_groups_member_updated(self, context, port):
+ self.notify_security_groups_member_updated_bulk(context, [port])
def security_group_info_for_ports(self, context, ports):
sg_info = {'devices': ports,
def create_port_bulk(self, context, ports):
objects = self._create_bulk_ml2(attributes.PORT, context, ports)
- for obj in objects:
- # REVISIT(rkukura): Is there any point in calling this before
- # a binding has been successfully established?
- # TODO(banix): Use a single notification for all objects
- self.notify_security_groups_member_updated(context,
- obj['result'])
+ # REVISIT(rkukura): Is there any point in calling this before
+ # a binding has been successfully established?
+ results = [obj['result'] for obj in objects]
+ self.notify_security_groups_member_updated_bulk(context, results)
+ for obj in objects:
attrs = obj['attributes']
if attrs and attrs.get(portbindings.HOST_ID):
new_host_port = self._get_host_port_if_changed(
self._validate_behavior_on_bulk_failure(
res, 'ports', webob.exc.HTTPServerError.code)
+ def test_create_ports_bulk_with_sec_grp(self):
+ ctx = context.get_admin_context()
+ plugin = manager.NeutronManager.get_plugin()
+ with contextlib.nested(
+ self.network(),
+ mock.patch.object(plugin.notifier,
+ 'security_groups_member_updated'),
+ mock.patch.object(plugin.notifier,
+ 'security_groups_provider_updated')
+ ) as (net, m_upd, p_upd):
+
+ res = self._create_port_bulk(self.fmt, 3, net['network']['id'],
+ 'test', True, context=ctx)
+ ports = self.deserialize(self.fmt, res)
+ used_sg = ports['ports'][0]['security_groups']
+ m_upd.assert_called_once_with(ctx, used_sg)
+ self.assertFalse(p_upd.called)
+
+ def test_create_ports_bulk_with_sec_grp_member_provider_update(self):
+ ctx = context.get_admin_context()
+ plugin = manager.NeutronManager.get_plugin()
+ with contextlib.nested(
+ self.network(),
+ mock.patch.object(plugin.notifier,
+ 'security_groups_member_updated'),
+ mock.patch.object(plugin.notifier,
+ 'security_groups_provider_updated')
+ ) as (net, m_upd, p_upd):
+
+ net_id = net['network']['id']
+ data = [{
+ 'network_id': net_id,
+ 'tenant_id': self._tenant_id
+ },
+ {
+ 'network_id': net_id,
+ 'tenant_id': self._tenant_id,
+ 'device_owner': constants.DEVICE_OWNER_DHCP
+ }
+ ]
+
+ res = self._create_bulk_from_list(self.fmt, 'port',
+ data, context=ctx)
+ ports = self.deserialize(self.fmt, res)
+ used_sg = ports['ports'][0]['security_groups']
+ m_upd.assert_called_once_with(ctx, used_sg)
+ p_upd.assert_called_once_with(ctx)
+
+ m_upd.reset_mock()
+ p_upd.reset_mock()
+ data[0]['device_owner'] = constants.DEVICE_OWNER_DHCP
+ self._create_bulk_from_list(self.fmt, 'port',
+ data, context=ctx)
+ self.assertFalse(m_upd.called)
+ p_upd.assert_called_once_with(ctx)
+
+ def test_create_ports_bulk_with_sec_grp_provider_update_ipv6(self):
+ ctx = context.get_admin_context()
+ plugin = manager.NeutronManager.get_plugin()
+ fake_prefix = '2001:db8::/64'
+ fake_gateway = 'fe80::1'
+ with self.network() as net:
+ with contextlib.nested(
+ self.subnet(net, gateway_ip=fake_gateway,
+ cidr=fake_prefix, ip_version=6),
+ mock.patch.object(
+ plugin.notifier, 'security_groups_member_updated'),
+ mock.patch.object(
+ plugin.notifier, 'security_groups_provider_updated')
+ ) as (snet_v6, m_upd, p_upd):
+
+ net_id = net['network']['id']
+ data = [{
+ 'network_id': net_id,
+ 'tenant_id': self._tenant_id,
+ 'fixed_ips': [{'subnet_id': snet_v6['subnet']['id']}],
+ 'device_owner': constants.DEVICE_OWNER_ROUTER_INTF
+ }
+ ]
+ self._create_bulk_from_list(self.fmt, 'port',
+ data, context=ctx)
+ self.assertFalse(m_upd.called)
+ p_upd.assert_called_once_with(ctx)
+
def test_delete_port_no_notify_in_disassociate_floatingips(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()