]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add devices to update in RPC call security_groups_provider_updated
authorrossella <rsblendido@suse.com>
Thu, 15 Jan 2015 15:15:23 +0000 (16:15 +0100)
committerrossella <rsblendido@suse.com>
Mon, 27 Apr 2015 20:47:39 +0000 (20:47 +0000)
When a security_groups_provider_updated is received then a global
refresh of the firewall is performed. This can be avoided if the
plugins pass as parameter of the call the devices that belongs to
the network updated.

Partially-Implements: blueprint restructure-l2-agent
Change-Id: I1e78f3a5ec7e5c5bcba338a0097566422411ef7e

neutron/agent/securitygroups_rpc.py
neutron/api/rpc/handlers/securitygroups_rpc.py
neutron/db/securitygroups_rpc_base.py
neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/unit/agent/test_securitygroups_rpc.py
neutron/tests/unit/api/rpc/handlers/test_securitygroups_rpc.py
neutron/tests/unit/plugins/ml2/test_plugin.py

index 5b24dbe3af29bee1acdf21b73233590e07866d77..a7dc73123b972ab92a02e75c03d946fce5cff7d8 100644 (file)
@@ -202,15 +202,15 @@ class SecurityGroupAgentRpc(object):
             else:
                 self.refresh_firewall(devices)
 
-    def security_groups_provider_updated(self):
+    def security_groups_provider_updated(self, devices_to_update):
         LOG.info(_LI("Provider rule updated"))
         if self.defer_refresh_firewall:
-            # NOTE(salv-orlando): A 'global refresh' might not be
-            # necessary if the subnet for which the provider rules
-            # were updated is known
-            self.global_refresh_firewall = True
+            if devices_to_update is None:
+                self.global_refresh_firewall = True
+            else:
+                self.devices_to_refilter |= set(devices_to_update)
         else:
-            self.refresh_firewall()
+            self.refresh_firewall(devices_to_update)
 
     def remove_devices_filter(self, device_ids):
         if not device_ids:
index 58d9c7d3dcd2d88c62c340fdc3096c65047c2e24..24e268065cef6e8d46cec9523c75a8dc4cc69532 100644 (file)
@@ -153,12 +153,14 @@ class SecurityGroupAgentRpcApiMixin(object):
         cctxt.cast(context, 'security_groups_member_updated',
                    security_groups=security_groups)
 
-    def security_groups_provider_updated(self, context):
+    def security_groups_provider_updated(self, context,
+                                         devices_to_update=None):
         """Notify provider updated security groups."""
-        cctxt = self.client.prepare(version=self.SG_RPC_VERSION,
+        cctxt = self.client.prepare(version=1.3,
                                     topic=self._get_security_group_topic(),
                                     fanout=True)
-        cctxt.cast(context, 'security_groups_provider_updated')
+        cctxt.cast(context, 'security_groups_provider_updated',
+                   devices_to_update=devices_to_update)
 
 
 class SecurityGroupAgentRpcCallbackMixin(object):
@@ -205,6 +207,7 @@ class SecurityGroupAgentRpcCallbackMixin(object):
     def security_groups_provider_updated(self, context, **kwargs):
         """Callback for security group provider update."""
         LOG.debug("Provider rule updated")
+        devices_to_update = kwargs.get('devices_to_update')
         if not self.sg_agent:
             return self._security_groups_agent_not_set()
-        self.sg_agent.security_groups_provider_updated()
+        self.sg_agent.security_groups_provider_updated(devices_to_update)
index c47493599e165f494bd7742d40a3c8763af6e2fc..9d448a9b96da4fef1ff4ac6d5f0e6a102d3825f6 100644 (file)
@@ -147,22 +147,29 @@ class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
         occurs and the plugin agent fetches the update provider
         rule in the other RPC call (security_group_rules_for_devices).
         """
-        security_groups_provider_updated = False
+        sg_provider_updated_networks = set()
         sec_groups = set()
         for port in ports:
             if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
-                security_groups_provider_updated = True
+                sg_provider_updated_networks.add(
+                    port['network_id'])
             # For IPv6, provider rule need to be updated in case router
             # interface is created or updated after VM port is created.
             elif port['device_owner'] == q_const.DEVICE_OWNER_ROUTER_INTF:
                 if any(netaddr.IPAddress(fixed_ip['ip_address']).version == 6
                        for fixed_ip in port['fixed_ips']):
-                    security_groups_provider_updated = True
+                    sg_provider_updated_networks.add(
+                        port['network_id'])
             else:
                 sec_groups |= set(port.get(ext_sg.SECURITYGROUPS))
 
-        if security_groups_provider_updated:
-            self.notifier.security_groups_provider_updated(context)
+        if sg_provider_updated_networks:
+            ports_query = context.session.query(models_v2.Port.id).filter(
+                models_v2.Port.network_id.in_(
+                    sg_provider_updated_networks)).all()
+            ports_to_update = [p.id for p in ports_query]
+            self.notifier.security_groups_provider_updated(
+                context, ports_to_update)
         if sec_groups:
             self.notifier.security_groups_member_updated(
                 context, list(sec_groups))
index 7b1f2861500062369a0d3f6c6d279ab4a34c3d9f..24f94d035e180950d0537c4be37aacbc9823c114 100644 (file)
@@ -637,7 +637,8 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
     # Set RPC API version to 1.0 by default.
     # history
     #   1.1 Support Security Group RPC
-    target = oslo_messaging.Target(version='1.1')
+    #   1.3 Added param devices_to_update to security_groups_provider_updated
+    target = oslo_messaging.Target(version='1.3')
 
     def __init__(self, context, agent, sg_agent):
         super(LinuxBridgeRpcCallbacks, self).__init__()
index 9e510c70b369d44001153346448af66f1d0945db..8185d4d88de4c24ed405e5e7914b29ba43b2996f 100644 (file)
@@ -117,7 +117,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
     #   1.0 Initial version
     #   1.1 Support Security Group RPC
     #   1.2 Support DVR (Distributed Virtual Router) RPC
-    target = oslo_messaging.Target(version='1.2')
+    #   1.3 Added param devices_to_update to security_groups_provider_updated
+    target = oslo_messaging.Target(version='1.3')
 
     def __init__(self, integ_br, tun_br, local_ip,
                  bridge_mappings, polling_interval, tunnel_types=None,
index 8dd6c90b0a651df325cdfe099ebc760addf99715..0493540789f7bba4cdb0b6baf9f080cf9275858a 100644 (file)
@@ -1186,9 +1186,9 @@ class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
 
     def test_security_groups_provider_updated(self):
         self.agent.refresh_firewall = mock.Mock()
-        self.agent.security_groups_provider_updated()
+        self.agent.security_groups_provider_updated(None)
         self.agent.refresh_firewall.assert_has_calls(
-            [mock.call.refresh_firewall()])
+            [mock.call.refresh_firewall(None)])
 
     def test_refresh_firewall(self):
         self.agent.prepare_devices_filter(['fake_port_id'])
@@ -1304,9 +1304,9 @@ class SecurityGroupAgentEnhancedRpcTestCase(
 
     def test_security_groups_provider_updated_enhanced_rpc(self):
         self.agent.refresh_firewall = mock.Mock()
-        self.agent.security_groups_provider_updated()
+        self.agent.security_groups_provider_updated(None)
         self.agent.refresh_firewall.assert_has_calls(
-            [mock.call.refresh_firewall()])
+            [mock.call.refresh_firewall(None)])
 
     def test_refresh_firewall_enhanced_rpc(self):
         self.agent.prepare_devices_filter(['fake_port_id'])
@@ -1438,9 +1438,16 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
             self.assertIn('fake_device_2', self.agent.devices_to_refilter)
 
     def test_security_groups_provider_updated(self):
-        self.agent.security_groups_provider_updated()
+        self.agent.security_groups_provider_updated(None)
         self.assertTrue(self.agent.global_refresh_firewall)
 
+    def test_security_groups_provider_updated_devices_specified(self):
+        self.agent.security_groups_provider_updated(
+            ['fake_device_1', 'fake_device_2'])
+        self.assertFalse(self.agent.global_refresh_firewall)
+        self.assertIn('fake_device_1', self.agent.devices_to_refilter)
+        self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+
     def test_setup_port_filters_new_ports_only(self):
         self.agent.prepare_devices_filter = mock.Mock()
         self.agent.refresh_firewall = mock.Mock()
@@ -1593,7 +1600,8 @@ class SecurityGroupAgentRpcApiTestCase(base.BaseTestCase):
     def test_security_groups_provider_updated(self):
         self.notifier.security_groups_provider_updated(None)
         self.mock_cast.assert_has_calls(
-            [mock.call(None, 'security_groups_provider_updated')])
+            [mock.call(None, 'security_groups_provider_updated',
+                       devices_to_update=None)])
 
     def test_security_groups_rule_updated(self):
         self.notifier.security_groups_rule_updated(
index 7c8b79f67d1c8b5bdb10ccc58db41fe0b605be5c..a3bc79cf1d24f2e03d7e9dd288c9c608838361bc 100644 (file)
@@ -61,4 +61,4 @@ class SGAgentRpcCallBackMixinTestCase(base.BaseTestCase):
     def test_security_groups_provider_updated(self):
         self.rpc.security_groups_provider_updated(None)
         self.rpc.sg_agent.assert_has_calls(
-            [mock.call.security_groups_provider_updated()])
+            [mock.call.security_groups_provider_updated(None)])
index cc51029fdfa2ab558094e60f9d4f6797870515c8..f20a2202462e5c117b922ec5d86f877532c50161 100644 (file)
@@ -468,6 +468,15 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
             m_upd.assert_called_once_with(ctx, used_sg)
             self.assertFalse(p_upd.called)
 
+    def _check_security_groups_provider_updated_args(self, p_upd_mock, net_id):
+        query_params = "network_id=%s" % net_id
+        network_ports = self._list('ports', query_params=query_params)
+        network_ports_ids = [port['id'] for port in network_ports['ports']]
+        self.assertTrue(p_upd_mock.called)
+        p_upd_args = p_upd_mock.call_args
+        ports_ids = p_upd_args[0][1]
+        self.assertEqual(sorted(network_ports_ids), sorted(ports_ids))
+
     def test_create_ports_bulk_with_sec_grp_member_provider_update(self):
         ctx = context.get_admin_context()
         plugin = manager.NeutronManager.get_plugin()
@@ -496,15 +505,14 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
             ports = self.deserialize(self.fmt, res)
             used_sg = ports['ports'][0]['security_groups']
             m_upd.assert_called_once_with(ctx, used_sg)
-            p_upd.assert_called_once_with(ctx)
-
+            self._check_security_groups_provider_updated_args(p_upd, net_id)
             m_upd.reset_mock()
             p_upd.reset_mock()
             data[0]['device_owner'] = constants.DEVICE_OWNER_DHCP
             self._create_bulk_from_list(self.fmt, 'port',
                                         data, context=ctx)
             self.assertFalse(m_upd.called)
-            p_upd.assert_called_once_with(ctx)
+            self._check_security_groups_provider_updated_args(p_upd, net_id)
 
     def test_create_ports_bulk_with_sec_grp_provider_update_ipv6(self):
         ctx = context.get_admin_context()
@@ -532,7 +540,8 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
                 self._create_bulk_from_list(self.fmt, 'port',
                                             data, context=ctx)
                 self.assertFalse(m_upd.called)
-                p_upd.assert_called_once_with(ctx)
+                self._check_security_groups_provider_updated_args(
+                    p_upd, net_id)
 
     def test_delete_port_no_notify_in_disassociate_floatingips(self):
         ctx = context.get_admin_context()