else:
self.refresh_firewall(devices)
- def security_groups_provider_updated(self):
+ def security_groups_provider_updated(self, devices_to_update):
LOG.info(_LI("Provider rule updated"))
if self.defer_refresh_firewall:
- # NOTE(salv-orlando): A 'global refresh' might not be
- # necessary if the subnet for which the provider rules
- # were updated is known
- self.global_refresh_firewall = True
+ if devices_to_update is None:
+ self.global_refresh_firewall = True
+ else:
+ self.devices_to_refilter |= set(devices_to_update)
else:
- self.refresh_firewall()
+ self.refresh_firewall(devices_to_update)
def remove_devices_filter(self, device_ids):
if not device_ids:
cctxt.cast(context, 'security_groups_member_updated',
security_groups=security_groups)
- def security_groups_provider_updated(self, context):
+ def security_groups_provider_updated(self, context,
+ devices_to_update=None):
"""Notify provider updated security groups."""
- cctxt = self.client.prepare(version=self.SG_RPC_VERSION,
+ cctxt = self.client.prepare(version=1.3,
topic=self._get_security_group_topic(),
fanout=True)
- cctxt.cast(context, 'security_groups_provider_updated')
+ cctxt.cast(context, 'security_groups_provider_updated',
+ devices_to_update=devices_to_update)
class SecurityGroupAgentRpcCallbackMixin(object):
def security_groups_provider_updated(self, context, **kwargs):
"""Callback for security group provider update."""
LOG.debug("Provider rule updated")
+ devices_to_update = kwargs.get('devices_to_update')
if not self.sg_agent:
return self._security_groups_agent_not_set()
- self.sg_agent.security_groups_provider_updated()
+ self.sg_agent.security_groups_provider_updated(devices_to_update)
occurs and the plugin agent fetches the update provider
rule in the other RPC call (security_group_rules_for_devices).
"""
- security_groups_provider_updated = False
+ sg_provider_updated_networks = set()
sec_groups = set()
for port in ports:
if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
- security_groups_provider_updated = True
+ sg_provider_updated_networks.add(
+ port['network_id'])
# For IPv6, provider rule need to be updated in case router
# interface is created or updated after VM port is created.
elif port['device_owner'] == q_const.DEVICE_OWNER_ROUTER_INTF:
if any(netaddr.IPAddress(fixed_ip['ip_address']).version == 6
for fixed_ip in port['fixed_ips']):
- security_groups_provider_updated = True
+ sg_provider_updated_networks.add(
+ port['network_id'])
else:
sec_groups |= set(port.get(ext_sg.SECURITYGROUPS))
- if security_groups_provider_updated:
- self.notifier.security_groups_provider_updated(context)
+ if sg_provider_updated_networks:
+ ports_query = context.session.query(models_v2.Port.id).filter(
+ models_v2.Port.network_id.in_(
+ sg_provider_updated_networks)).all()
+ ports_to_update = [p.id for p in ports_query]
+ self.notifier.security_groups_provider_updated(
+ context, ports_to_update)
if sec_groups:
self.notifier.security_groups_member_updated(
context, list(sec_groups))
# Set RPC API version to 1.0 by default.
# history
# 1.1 Support Security Group RPC
- target = oslo_messaging.Target(version='1.1')
+ # 1.3 Added param devices_to_update to security_groups_provider_updated
+ target = oslo_messaging.Target(version='1.3')
def __init__(self, context, agent, sg_agent):
super(LinuxBridgeRpcCallbacks, self).__init__()
# 1.0 Initial version
# 1.1 Support Security Group RPC
# 1.2 Support DVR (Distributed Virtual Router) RPC
- target = oslo_messaging.Target(version='1.2')
+ # 1.3 Added param devices_to_update to security_groups_provider_updated
+ target = oslo_messaging.Target(version='1.3')
def __init__(self, integ_br, tun_br, local_ip,
bridge_mappings, polling_interval, tunnel_types=None,
def test_security_groups_provider_updated(self):
self.agent.refresh_firewall = mock.Mock()
- self.agent.security_groups_provider_updated()
+ self.agent.security_groups_provider_updated(None)
self.agent.refresh_firewall.assert_has_calls(
- [mock.call.refresh_firewall()])
+ [mock.call.refresh_firewall(None)])
def test_refresh_firewall(self):
self.agent.prepare_devices_filter(['fake_port_id'])
def test_security_groups_provider_updated_enhanced_rpc(self):
self.agent.refresh_firewall = mock.Mock()
- self.agent.security_groups_provider_updated()
+ self.agent.security_groups_provider_updated(None)
self.agent.refresh_firewall.assert_has_calls(
- [mock.call.refresh_firewall()])
+ [mock.call.refresh_firewall(None)])
def test_refresh_firewall_enhanced_rpc(self):
self.agent.prepare_devices_filter(['fake_port_id'])
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
def test_security_groups_provider_updated(self):
- self.agent.security_groups_provider_updated()
+ self.agent.security_groups_provider_updated(None)
self.assertTrue(self.agent.global_refresh_firewall)
+ def test_security_groups_provider_updated_devices_specified(self):
+ self.agent.security_groups_provider_updated(
+ ['fake_device_1', 'fake_device_2'])
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.assertIn('fake_device_1', self.agent.devices_to_refilter)
+ self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+
def test_setup_port_filters_new_ports_only(self):
self.agent.prepare_devices_filter = mock.Mock()
self.agent.refresh_firewall = mock.Mock()
def test_security_groups_provider_updated(self):
self.notifier.security_groups_provider_updated(None)
self.mock_cast.assert_has_calls(
- [mock.call(None, 'security_groups_provider_updated')])
+ [mock.call(None, 'security_groups_provider_updated',
+ devices_to_update=None)])
def test_security_groups_rule_updated(self):
self.notifier.security_groups_rule_updated(
def test_security_groups_provider_updated(self):
self.rpc.security_groups_provider_updated(None)
self.rpc.sg_agent.assert_has_calls(
- [mock.call.security_groups_provider_updated()])
+ [mock.call.security_groups_provider_updated(None)])
m_upd.assert_called_once_with(ctx, used_sg)
self.assertFalse(p_upd.called)
+ def _check_security_groups_provider_updated_args(self, p_upd_mock, net_id):
+ query_params = "network_id=%s" % net_id
+ network_ports = self._list('ports', query_params=query_params)
+ network_ports_ids = [port['id'] for port in network_ports['ports']]
+ self.assertTrue(p_upd_mock.called)
+ p_upd_args = p_upd_mock.call_args
+ ports_ids = p_upd_args[0][1]
+ self.assertEqual(sorted(network_ports_ids), sorted(ports_ids))
+
def test_create_ports_bulk_with_sec_grp_member_provider_update(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
ports = self.deserialize(self.fmt, res)
used_sg = ports['ports'][0]['security_groups']
m_upd.assert_called_once_with(ctx, used_sg)
- p_upd.assert_called_once_with(ctx)
-
+ self._check_security_groups_provider_updated_args(p_upd, net_id)
m_upd.reset_mock()
p_upd.reset_mock()
data[0]['device_owner'] = constants.DEVICE_OWNER_DHCP
self._create_bulk_from_list(self.fmt, 'port',
data, context=ctx)
self.assertFalse(m_upd.called)
- p_upd.assert_called_once_with(ctx)
+ self._check_security_groups_provider_updated_args(p_upd, net_id)
def test_create_ports_bulk_with_sec_grp_provider_update_ipv6(self):
ctx = context.get_admin_context()
self._create_bulk_from_list(self.fmt, 'port',
data, context=ctx)
self.assertFalse(m_upd.called)
- p_upd.assert_called_once_with(ctx)
+ self._check_security_groups_provider_updated_args(
+ p_upd, net_id)
def test_delete_port_no_notify_in_disassociate_floatingips(self):
ctx = context.get_admin_context()