]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
MLNX Agent: Process port_update notifications in the main agent loop
authorRoey Chen <roeyc@mellanox.com>
Sun, 18 May 2014 08:44:00 +0000 (11:44 +0300)
committerroeyc <roeyc@mellanox.com>
Sat, 9 Aug 2014 08:43:57 +0000 (11:43 +0300)
This patch changes the way mlnx agent process port_update notifications.
It does the same for the mlnx agent as was done for the ovs-agent in
I219c6bdf63b0b5e945b655677f9e28fa591f03cd.
Processing a port_update notification directly in the RPC
call may cause competition with the main RPC loop.
To prevent this problem, the actual process of ports updates is done
in the main RPC loop, whereas the RPC call merely adds the updated
port MAC address to a set of updated ports.
port_update notifications received within a single main loop iteration
will be coalesced and processed only once.

Closes-Bug: 1279655
Change-Id: I63dda60cb3cf171e5e9111a1ecf95e45e1d86362
Signed-off-by: Roey Chen <roeyc@mellanox.com>
neutron/plugins/mlnx/agent/eswitch_neutron_agent.py
neutron/tests/unit/mlnx/test_mlnx_neutron_agent.py

index e3e0e4feea6c82d4a027d991e6e23c91dcb84de0..f0e79915ef80a10b62f37664dae9381ede489007 100644 (file)
@@ -168,45 +168,10 @@ class MlnxEswitchRpcCallbacks(n_rpc.RpcCallback,
             self.eswitch.remove_network(network_id)
 
     def port_update(self, context, **kwargs):
-        LOG.debug(_("port_update received"))
         port = kwargs.get('port')
-        net_type = kwargs.get('network_type')
-        segmentation_id = kwargs.get('segmentation_id')
-        if not segmentation_id:
-            # compatibility with pre-Havana RPC vlan_id encoding
-            segmentation_id = kwargs.get('vlan_id')
-        physical_network = kwargs.get('physical_network')
-        net_id = port['network_id']
-        if self.eswitch.vnic_port_exists(port['mac_address']):
-            if 'security_groups' in port:
-                self.sg_agent.refresh_firewall()
-            try:
-                if port['admin_state_up']:
-                    self.eswitch.port_up(net_id,
-                                         net_type,
-                                         physical_network,
-                                         segmentation_id,
-                                         port['id'],
-                                         port['mac_address'])
-                    # update plugin about port status
-                    self.agent.plugin_rpc.update_device_up(self.context,
-                                                           port['mac_address'],
-                                                           self.agent.agent_id,
-                                                           cfg.CONF.host)
-                else:
-                    self.eswitch.port_down(net_id,
-                                           physical_network,
-                                           port['mac_address'])
-                    # update plugin about port status
-                    self.agent.plugin_rpc.update_device_down(
-                        self.context,
-                        port['mac_address'],
-                        self.agent.agent_id,
-                        cfg.CONF.host)
-            except n_rpc.MessagingTimeout:
-                LOG.error(_("RPC timeout while updating port %s"), port['id'])
-        else:
-            LOG.debug(_("No port %s defined on agent."), port['id'])
+        self.agent.add_port_update(port['mac_address'])
+        LOG.debug("port_update message processed for port with mac %s",
+                  port['mac_address'])
 
 
 class MlnxEswitchPluginApi(agent_rpc.PluginApi,
@@ -229,6 +194,8 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
             'configurations': configurations,
             'agent_type': q_constants.AGENT_TYPE_MLNX,
             'start_flag': True}
+        # Stores port update notifications for processing in main rpc loop
+        self.updated_ports = set()
         self._setup_rpc()
         self.init_firewall()
 
@@ -272,24 +239,27 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
                 self._report_state)
             heartbeat.start(interval=report_interval)
 
-    def update_ports(self, registered_ports):
-        ports = self.eswitch.get_vnics_mac()
-        if ports == registered_ports:
-            return
-        added = ports - registered_ports
-        removed = registered_ports - ports
-        return {'current': ports,
-                'added': added,
-                'removed': removed}
+    def add_port_update(self, port):
+        self.updated_ports.add(port)
+
+    def scan_ports(self, registered_ports, updated_ports_copy=None):
+        cur_ports = self.eswitch.get_vnics_mac()
+        port_info = {'current': cur_ports}
+        # Shouldn't process updates for not existing ports
+        port_info['updated'] = updated_ports_copy & cur_ports
+        port_info['added'] = cur_ports - registered_ports
+        port_info['removed'] = registered_ports - cur_ports
+        return port_info
 
     def process_network_ports(self, port_info):
         resync_a = False
         resync_b = False
-        if port_info.get('added'):
-            LOG.debug(_("Ports added!"))
-            resync_a = self.treat_devices_added(port_info['added'])
-        if port_info.get('removed'):
-            LOG.debug(_("Ports removed!"))
+        device_added_updated = port_info['added'] | port_info['updated']
+
+        if device_added_updated:
+            resync_a = self.treat_devices_added_or_updated(
+                device_added_updated)
+        if port_info['removed']:
             resync_b = self.treat_devices_removed(port_info['removed'])
         # If one of the above opertaions fails => resync with plugin
         return (resync_a | resync_b)
@@ -311,7 +281,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
         else:
             LOG.debug(_("No port %s defined on agent."), port_id)
 
-    def treat_devices_added(self, devices):
+    def treat_devices_added_or_updated(self, devices):
         try:
             devs_details_list = self.plugin_rpc.get_devices_details_list(
                 self.context,
@@ -326,11 +296,11 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
 
         for dev_details in devs_details_list:
             device = dev_details['device']
-            LOG.info(_("Adding port with mac %s"), device)
+            LOG.info(_("Adding or updating port with mac %s"), device)
 
             if 'port_id' in dev_details:
                 LOG.info(_("Port %s updated"), device)
-                LOG.debug(_("Device details %s"), str(dev_details))
+                LOG.debug("Device details %s", str(dev_details))
                 self.treat_vif_port(dev_details['port_id'],
                                     dev_details['device'],
                                     dev_details['network_id'],
@@ -339,12 +309,16 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
                                     dev_details['segmentation_id'],
                                     dev_details['admin_state_up'])
                 if dev_details.get('admin_state_up'):
-                    self.plugin_rpc.update_device_up(self.context,
-                                                     device,
-                                                     self.agent_id)
+                    LOG.debug("Setting status for %s to UP", device)
+                    self.plugin_rpc.update_device_up(
+                        self.context, device, self.agent_id)
+                else:
+                    LOG.debug("Setting status for %s to DOWN", device)
+                    self.plugin_rpc.update_device_down(
+                        self.context, device, self.agent_id)
             else:
-                LOG.debug(_("Device with mac_address %s not defined "
-                          "on Neutron Plugin"), device)
+                LOG.debug("Device with mac_address %s not defined "
+                          "on Neutron Plugin", device)
         return False
 
     def treat_devices_removed(self, devices):
@@ -369,27 +343,37 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
             self.eswitch.port_release(device)
         return resync
 
+    def _port_info_has_changes(self, port_info):
+        return (port_info['added'] or
+                port_info['removed'] or
+                port_info['updated'])
+
     def daemon_loop(self):
         sync = True
         ports = set()
+        updated_ports_copy = set()
 
         LOG.info(_("eSwitch Agent Started!"))
 
         while True:
+            start = time.time()
+            if sync:
+                LOG.info(_("Agent out of sync with plugin!"))
+                ports.clear()
+                sync = False
+
             try:
-                start = time.time()
-                if sync:
-                    LOG.info(_("Agent out of sync with plugin!"))
-                    ports.clear()
-                    sync = False
-
-                port_info = self.update_ports(ports)
-                # notify plugin about port deltas
-                if port_info:
-                    LOG.debug(_("Agent loop process devices!"))
-                    # If treat devices fails - must resync with plugin
+                updated_ports_copy = self.updated_ports
+                self.updated_ports = set()
+                port_info = self.scan_ports(ports, updated_ports_copy)
+                LOG.debug("Agent loop process devices!")
+                # If treat devices fails - must resync with plugin
+                ports = port_info['current']
+                if self._port_info_has_changes(port_info):
+                    LOG.debug("Starting to process devices in:%s", port_info)
+                    # sync with upper/lower layers about port deltas
                     sync = self.process_network_ports(port_info)
-                    ports = port_info['current']
+
             except exceptions.RequestTimeout:
                 LOG.exception(_("Request timeout in agent event loop "
                                 "eSwitchD is not responding - exiting..."))
@@ -397,6 +381,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
             except Exception:
                 LOG.exception(_("Error in agent event loop"))
                 sync = True
+                self.updated_ports |= updated_ports_copy
             # sleep till end of polling interval
             elapsed = (time.time() - start)
             if (elapsed < self._polling_interval):
index f45bece67e5a91abc1cae5c42dc0cd1090a6efd9..8d67e043654eab420d9c39ca894a78ad24893f2a 100644 (file)
@@ -45,6 +45,23 @@ class TestEswichManager(base.BaseTestCase):
             self.manager.get_port_id_by_mac('no-such-mac')
 
 
+class TestMlnxEswitchRpcCallbacks(base.BaseTestCase):
+
+    def setUp(self):
+        super(TestMlnxEswitchRpcCallbacks, self).setUp()
+        agent = mock.Mock()
+        self.rpc_callbacks = eswitch_neutron_agent.MlnxEswitchRpcCallbacks(
+            'context',
+            agent
+        )
+
+    def test_port_update(self):
+        port = {'mac_address': '10:20:30:40:50:60'}
+        add_port_update = self.rpc_callbacks.agent.add_port_update
+        self.rpc_callbacks.port_update('context', port=port)
+        add_port_update.assert_called_once_with(port['mac_address'])
+
+
 class TestEswitchAgent(base.BaseTestCase):
 
     def setUp(self):
@@ -82,9 +99,9 @@ class TestEswitchAgent(base.BaseTestCase):
             mock.patch('neutron.plugins.mlnx.agent.eswitch_neutron_agent.'
                        'EswitchManager.get_vnics_mac',
                        return_value=[])):
-            self.assertTrue(self.agent.treat_devices_added([{}]))
+            self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
 
-    def _mock_treat_devices_added(self, details, func_name):
+    def _mock_treat_devices_added_updated(self, details, func_name):
         """Mock treat devices added.
 
         :param details: the details to return for the device
@@ -101,14 +118,14 @@ class TestEswitchAgent(base.BaseTestCase):
             mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
             mock.patch.object(self.agent, func_name)
         ) as (vnics_fn, get_dev_fn, upd_dev_up, func):
-            self.assertFalse(self.agent.treat_devices_added([{}]))
+            self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
         return (func.called, upd_dev_up.called)
 
     def test_treat_devices_added_updates_known_port(self):
         details = mock.MagicMock()
         details.__contains__.side_effect = lambda x: True
-        func, dev_up = self._mock_treat_devices_added(details,
-                                                      'treat_vif_port')
+        func, dev_up = self._mock_treat_devices_added_updated(details,
+                                                              'treat_vif_port')
         self.assertTrue(func)
         self.assertTrue(dev_up)
 
@@ -120,8 +137,8 @@ class TestEswitchAgent(base.BaseTestCase):
                    'physical_network': 'default',
                    'segmentation_id': 2,
                    'admin_state_up': False}
-        func, dev_up = self._mock_treat_devices_added(details,
-                                                      'treat_vif_port')
+        func, dev_up = self._mock_treat_devices_added_updated(details,
+                                                              'treat_vif_port')
         self.assertTrue(func)
         self.assertFalse(dev_up)
 
@@ -139,17 +156,75 @@ class TestEswitchAgent(base.BaseTestCase):
                 self.assertFalse(self.agent.treat_devices_removed([{}]))
                 self.assertTrue(port_release.called)
 
+    def _test_process_network_ports(self, port_info):
+        with contextlib.nested(
+            mock.patch.object(self.agent, 'treat_devices_added_or_updated',
+                              return_value=False),
+            mock.patch.object(self.agent, 'treat_devices_removed',
+                              return_value=False)
+        ) as (device_added_updated, device_removed):
+            self.assertFalse(self.agent.process_network_ports(port_info))
+            device_added_updated.assert_called_once_with(
+                port_info['added'] | port_info['updated'])
+            device_removed.assert_called_once_with(port_info['removed'])
+
     def test_process_network_ports(self):
-        current_ports = set(['01:02:03:04:05:06'])
-        added_ports = set(['10:20:30:40:50:60'])
-        removed_ports = set(['11:22:33:44:55:66'])
-        reply = {'current': current_ports,
-                 'removed': removed_ports,
-                 'added': added_ports}
-        with mock.patch.object(self.agent, 'treat_devices_added',
-                               return_value=False) as device_added:
-            with mock.patch.object(self.agent, 'treat_devices_removed',
-                                   return_value=False) as device_removed:
-                self.assertFalse(self.agent.process_network_ports(reply))
-                device_added.assert_called_once_with(added_ports)
-                device_removed.assert_called_once_with(removed_ports)
+        self._test_process_network_ports(
+            {'current': set(['10:20:30:40:50:60']),
+             'updated': set(),
+             'added': set(['11:21:31:41:51:61']),
+             'removed': set(['13:23:33:43:53:63'])})
+
+    def test_process_network_ports_with_updated_ports(self):
+        self._test_process_network_ports(
+            {'current': set(['10:20:30:40:50:60']),
+             'updated': set(['12:22:32:42:52:62']),
+             'added': set(['11:21:31:41:51:61']),
+             'removed': set(['13:23:33:43:53:63'])})
+
+    def test_add_port_update(self):
+        mac_addr = '10:20:30:40:50:60'
+        self.agent.add_port_update(mac_addr)
+        self.assertEqual(set([mac_addr]), self.agent.updated_ports)
+
+    def _mock_scan_ports(self, vif_port_set, registered_ports, updated_ports):
+        with mock.patch.object(self.agent.eswitch, 'get_vnics_mac',
+                               return_value=vif_port_set):
+            return self.agent.scan_ports(registered_ports, updated_ports)
+
+    def test_scan_ports_return_current_for_unchanged_ports(self):
+        vif_port_set = set([1, 2])
+        registered_ports = set([1, 2])
+        actual = self._mock_scan_ports(vif_port_set,
+                                       registered_ports, set())
+        expected = dict(current=vif_port_set, added=set(),
+                        removed=set(), updated=set())
+        self.assertEqual(expected, actual)
+
+    def test_scan_ports_return_port_changes(self):
+        vif_port_set = set([1, 3])
+        registered_ports = set([1, 2])
+        actual = self._mock_scan_ports(vif_port_set,
+                                       registered_ports, set())
+        expected = dict(current=vif_port_set, added=set([3]),
+                        removed=set([2]), updated=set())
+        self.assertEqual(expected, actual)
+
+    def test_scan_ports_with_updated_ports(self):
+        vif_port_set = set([1, 3, 4])
+        registered_ports = set([1, 2, 4])
+        actual = self._mock_scan_ports(vif_port_set,
+                                       registered_ports, set([4]))
+        expected = dict(current=vif_port_set, added=set([3]),
+                        removed=set([2]), updated=set([4]))
+        self.assertEqual(expected, actual)
+
+    def test_scan_ports_with_unknown_updated_ports(self):
+        vif_port_set = set([1, 3, 4])
+        registered_ports = set([1, 2, 4])
+        actual = self._mock_scan_ports(vif_port_set,
+                                       registered_ports,
+                                       updated_ports=set([4, 5]))
+        expected = dict(current=vif_port_set, added=set([3]),
+                        removed=set([2]), updated=set([4]))
+        self.assertEqual(expected, actual)