From fa5eb301dc83f15a30bc40344caca26183147f59 Mon Sep 17 00:00:00 2001 From: Roey Chen Date: Sun, 18 May 2014 11:44:00 +0300 Subject: [PATCH] MLNX Agent: Process port_update notifications in the main agent loop This patch changes the way mlnx agent process port_update notifications. It does the same for the mlnx agent as was done for the ovs-agent in I219c6bdf63b0b5e945b655677f9e28fa591f03cd. Processing a port_update notification directly in the RPC call may cause competition with the main RPC loop. To prevent this problem, the actual process of ports updates is done in the main RPC loop, whereas the RPC call merely adds the updated port MAC address to a set of updated ports. port_update notifications received within a single main loop iteration will be coalesced and processed only once. Closes-Bug: 1279655 Change-Id: I63dda60cb3cf171e5e9111a1ecf95e45e1d86362 Signed-off-by: Roey Chen --- .../mlnx/agent/eswitch_neutron_agent.py | 129 ++++++++---------- .../unit/mlnx/test_mlnx_neutron_agent.py | 115 +++++++++++++--- 2 files changed, 152 insertions(+), 92 deletions(-) diff --git a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py index e3e0e4fee..f0e79915e 100644 --- a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py +++ b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py @@ -168,45 +168,10 @@ class MlnxEswitchRpcCallbacks(n_rpc.RpcCallback, self.eswitch.remove_network(network_id) def port_update(self, context, **kwargs): - LOG.debug(_("port_update received")) port = kwargs.get('port') - net_type = kwargs.get('network_type') - segmentation_id = kwargs.get('segmentation_id') - if not segmentation_id: - # compatibility with pre-Havana RPC vlan_id encoding - segmentation_id = kwargs.get('vlan_id') - physical_network = kwargs.get('physical_network') - net_id = port['network_id'] - if self.eswitch.vnic_port_exists(port['mac_address']): - if 'security_groups' in port: - self.sg_agent.refresh_firewall() - try: - if port['admin_state_up']: - self.eswitch.port_up(net_id, - net_type, - physical_network, - segmentation_id, - port['id'], - port['mac_address']) - # update plugin about port status - self.agent.plugin_rpc.update_device_up(self.context, - port['mac_address'], - self.agent.agent_id, - cfg.CONF.host) - else: - self.eswitch.port_down(net_id, - physical_network, - port['mac_address']) - # update plugin about port status - self.agent.plugin_rpc.update_device_down( - self.context, - port['mac_address'], - self.agent.agent_id, - cfg.CONF.host) - except n_rpc.MessagingTimeout: - LOG.error(_("RPC timeout while updating port %s"), port['id']) - else: - LOG.debug(_("No port %s defined on agent."), port['id']) + self.agent.add_port_update(port['mac_address']) + LOG.debug("port_update message processed for port with mac %s", + port['mac_address']) class MlnxEswitchPluginApi(agent_rpc.PluginApi, @@ -229,6 +194,8 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): 'configurations': configurations, 'agent_type': q_constants.AGENT_TYPE_MLNX, 'start_flag': True} + # Stores port update notifications for processing in main rpc loop + self.updated_ports = set() self._setup_rpc() self.init_firewall() @@ -272,24 +239,27 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): self._report_state) heartbeat.start(interval=report_interval) - def update_ports(self, registered_ports): - ports = self.eswitch.get_vnics_mac() - if ports == registered_ports: - return - added = ports - registered_ports - removed = registered_ports - ports - return {'current': ports, - 'added': added, - 'removed': removed} + def add_port_update(self, port): + self.updated_ports.add(port) + + def scan_ports(self, registered_ports, updated_ports_copy=None): + cur_ports = self.eswitch.get_vnics_mac() + port_info = {'current': cur_ports} + # Shouldn't process updates for not existing ports + port_info['updated'] = updated_ports_copy & cur_ports + port_info['added'] = cur_ports - registered_ports + port_info['removed'] = registered_ports - cur_ports + return port_info def process_network_ports(self, port_info): resync_a = False resync_b = False - if port_info.get('added'): - LOG.debug(_("Ports added!")) - resync_a = self.treat_devices_added(port_info['added']) - if port_info.get('removed'): - LOG.debug(_("Ports removed!")) + device_added_updated = port_info['added'] | port_info['updated'] + + if device_added_updated: + resync_a = self.treat_devices_added_or_updated( + device_added_updated) + if port_info['removed']: resync_b = self.treat_devices_removed(port_info['removed']) # If one of the above opertaions fails => resync with plugin return (resync_a | resync_b) @@ -311,7 +281,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): else: LOG.debug(_("No port %s defined on agent."), port_id) - def treat_devices_added(self, devices): + def treat_devices_added_or_updated(self, devices): try: devs_details_list = self.plugin_rpc.get_devices_details_list( self.context, @@ -326,11 +296,11 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): for dev_details in devs_details_list: device = dev_details['device'] - LOG.info(_("Adding port with mac %s"), device) + LOG.info(_("Adding or updating port with mac %s"), device) if 'port_id' in dev_details: LOG.info(_("Port %s updated"), device) - LOG.debug(_("Device details %s"), str(dev_details)) + LOG.debug("Device details %s", str(dev_details)) self.treat_vif_port(dev_details['port_id'], dev_details['device'], dev_details['network_id'], @@ -339,12 +309,16 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): dev_details['segmentation_id'], dev_details['admin_state_up']) if dev_details.get('admin_state_up'): - self.plugin_rpc.update_device_up(self.context, - device, - self.agent_id) + LOG.debug("Setting status for %s to UP", device) + self.plugin_rpc.update_device_up( + self.context, device, self.agent_id) + else: + LOG.debug("Setting status for %s to DOWN", device) + self.plugin_rpc.update_device_down( + self.context, device, self.agent_id) else: - LOG.debug(_("Device with mac_address %s not defined " - "on Neutron Plugin"), device) + LOG.debug("Device with mac_address %s not defined " + "on Neutron Plugin", device) return False def treat_devices_removed(self, devices): @@ -369,27 +343,37 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): self.eswitch.port_release(device) return resync + def _port_info_has_changes(self, port_info): + return (port_info['added'] or + port_info['removed'] or + port_info['updated']) + def daemon_loop(self): sync = True ports = set() + updated_ports_copy = set() LOG.info(_("eSwitch Agent Started!")) while True: + start = time.time() + if sync: + LOG.info(_("Agent out of sync with plugin!")) + ports.clear() + sync = False + try: - start = time.time() - if sync: - LOG.info(_("Agent out of sync with plugin!")) - ports.clear() - sync = False - - port_info = self.update_ports(ports) - # notify plugin about port deltas - if port_info: - LOG.debug(_("Agent loop process devices!")) - # If treat devices fails - must resync with plugin + updated_ports_copy = self.updated_ports + self.updated_ports = set() + port_info = self.scan_ports(ports, updated_ports_copy) + LOG.debug("Agent loop process devices!") + # If treat devices fails - must resync with plugin + ports = port_info['current'] + if self._port_info_has_changes(port_info): + LOG.debug("Starting to process devices in:%s", port_info) + # sync with upper/lower layers about port deltas sync = self.process_network_ports(port_info) - ports = port_info['current'] + except exceptions.RequestTimeout: LOG.exception(_("Request timeout in agent event loop " "eSwitchD is not responding - exiting...")) @@ -397,6 +381,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): except Exception: LOG.exception(_("Error in agent event loop")) sync = True + self.updated_ports |= updated_ports_copy # sleep till end of polling interval elapsed = (time.time() - start) if (elapsed < self._polling_interval): diff --git a/neutron/tests/unit/mlnx/test_mlnx_neutron_agent.py b/neutron/tests/unit/mlnx/test_mlnx_neutron_agent.py index f45bece67..8d67e0436 100644 --- a/neutron/tests/unit/mlnx/test_mlnx_neutron_agent.py +++ b/neutron/tests/unit/mlnx/test_mlnx_neutron_agent.py @@ -45,6 +45,23 @@ class TestEswichManager(base.BaseTestCase): self.manager.get_port_id_by_mac('no-such-mac') +class TestMlnxEswitchRpcCallbacks(base.BaseTestCase): + + def setUp(self): + super(TestMlnxEswitchRpcCallbacks, self).setUp() + agent = mock.Mock() + self.rpc_callbacks = eswitch_neutron_agent.MlnxEswitchRpcCallbacks( + 'context', + agent + ) + + def test_port_update(self): + port = {'mac_address': '10:20:30:40:50:60'} + add_port_update = self.rpc_callbacks.agent.add_port_update + self.rpc_callbacks.port_update('context', port=port) + add_port_update.assert_called_once_with(port['mac_address']) + + class TestEswitchAgent(base.BaseTestCase): def setUp(self): @@ -82,9 +99,9 @@ class TestEswitchAgent(base.BaseTestCase): mock.patch('neutron.plugins.mlnx.agent.eswitch_neutron_agent.' 'EswitchManager.get_vnics_mac', return_value=[])): - self.assertTrue(self.agent.treat_devices_added([{}])) + self.assertTrue(self.agent.treat_devices_added_or_updated([{}])) - def _mock_treat_devices_added(self, details, func_name): + def _mock_treat_devices_added_updated(self, details, func_name): """Mock treat devices added. :param details: the details to return for the device @@ -101,14 +118,14 @@ class TestEswitchAgent(base.BaseTestCase): mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), mock.patch.object(self.agent, func_name) ) as (vnics_fn, get_dev_fn, upd_dev_up, func): - self.assertFalse(self.agent.treat_devices_added([{}])) + self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) return (func.called, upd_dev_up.called) def test_treat_devices_added_updates_known_port(self): details = mock.MagicMock() details.__contains__.side_effect = lambda x: True - func, dev_up = self._mock_treat_devices_added(details, - 'treat_vif_port') + func, dev_up = self._mock_treat_devices_added_updated(details, + 'treat_vif_port') self.assertTrue(func) self.assertTrue(dev_up) @@ -120,8 +137,8 @@ class TestEswitchAgent(base.BaseTestCase): 'physical_network': 'default', 'segmentation_id': 2, 'admin_state_up': False} - func, dev_up = self._mock_treat_devices_added(details, - 'treat_vif_port') + func, dev_up = self._mock_treat_devices_added_updated(details, + 'treat_vif_port') self.assertTrue(func) self.assertFalse(dev_up) @@ -139,17 +156,75 @@ class TestEswitchAgent(base.BaseTestCase): self.assertFalse(self.agent.treat_devices_removed([{}])) self.assertTrue(port_release.called) + def _test_process_network_ports(self, port_info): + with contextlib.nested( + mock.patch.object(self.agent, 'treat_devices_added_or_updated', + return_value=False), + mock.patch.object(self.agent, 'treat_devices_removed', + return_value=False) + ) as (device_added_updated, device_removed): + self.assertFalse(self.agent.process_network_ports(port_info)) + device_added_updated.assert_called_once_with( + port_info['added'] | port_info['updated']) + device_removed.assert_called_once_with(port_info['removed']) + def test_process_network_ports(self): - current_ports = set(['01:02:03:04:05:06']) - added_ports = set(['10:20:30:40:50:60']) - removed_ports = set(['11:22:33:44:55:66']) - reply = {'current': current_ports, - 'removed': removed_ports, - 'added': added_ports} - with mock.patch.object(self.agent, 'treat_devices_added', - return_value=False) as device_added: - with mock.patch.object(self.agent, 'treat_devices_removed', - return_value=False) as device_removed: - self.assertFalse(self.agent.process_network_ports(reply)) - device_added.assert_called_once_with(added_ports) - device_removed.assert_called_once_with(removed_ports) + self._test_process_network_ports( + {'current': set(['10:20:30:40:50:60']), + 'updated': set(), + 'added': set(['11:21:31:41:51:61']), + 'removed': set(['13:23:33:43:53:63'])}) + + def test_process_network_ports_with_updated_ports(self): + self._test_process_network_ports( + {'current': set(['10:20:30:40:50:60']), + 'updated': set(['12:22:32:42:52:62']), + 'added': set(['11:21:31:41:51:61']), + 'removed': set(['13:23:33:43:53:63'])}) + + def test_add_port_update(self): + mac_addr = '10:20:30:40:50:60' + self.agent.add_port_update(mac_addr) + self.assertEqual(set([mac_addr]), self.agent.updated_ports) + + def _mock_scan_ports(self, vif_port_set, registered_ports, updated_ports): + with mock.patch.object(self.agent.eswitch, 'get_vnics_mac', + return_value=vif_port_set): + return self.agent.scan_ports(registered_ports, updated_ports) + + def test_scan_ports_return_current_for_unchanged_ports(self): + vif_port_set = set([1, 2]) + registered_ports = set([1, 2]) + actual = self._mock_scan_ports(vif_port_set, + registered_ports, set()) + expected = dict(current=vif_port_set, added=set(), + removed=set(), updated=set()) + self.assertEqual(expected, actual) + + def test_scan_ports_return_port_changes(self): + vif_port_set = set([1, 3]) + registered_ports = set([1, 2]) + actual = self._mock_scan_ports(vif_port_set, + registered_ports, set()) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set()) + self.assertEqual(expected, actual) + + def test_scan_ports_with_updated_ports(self): + vif_port_set = set([1, 3, 4]) + registered_ports = set([1, 2, 4]) + actual = self._mock_scan_ports(vif_port_set, + registered_ports, set([4])) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set([4])) + self.assertEqual(expected, actual) + + def test_scan_ports_with_unknown_updated_ports(self): + vif_port_set = set([1, 3, 4]) + registered_ports = set([1, 2, 4]) + actual = self._mock_scan_ports(vif_port_set, + registered_ports, + updated_ports=set([4, 5])) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set([4])) + self.assertEqual(expected, actual) -- 2.45.2