self.eswitch.remove_network(network_id)
def port_update(self, context, **kwargs):
- LOG.debug(_("port_update received"))
port = kwargs.get('port')
- net_type = kwargs.get('network_type')
- segmentation_id = kwargs.get('segmentation_id')
- if not segmentation_id:
- # compatibility with pre-Havana RPC vlan_id encoding
- segmentation_id = kwargs.get('vlan_id')
- physical_network = kwargs.get('physical_network')
- net_id = port['network_id']
- if self.eswitch.vnic_port_exists(port['mac_address']):
- if 'security_groups' in port:
- self.sg_agent.refresh_firewall()
- try:
- if port['admin_state_up']:
- self.eswitch.port_up(net_id,
- net_type,
- physical_network,
- segmentation_id,
- port['id'],
- port['mac_address'])
- # update plugin about port status
- self.agent.plugin_rpc.update_device_up(self.context,
- port['mac_address'],
- self.agent.agent_id,
- cfg.CONF.host)
- else:
- self.eswitch.port_down(net_id,
- physical_network,
- port['mac_address'])
- # update plugin about port status
- self.agent.plugin_rpc.update_device_down(
- self.context,
- port['mac_address'],
- self.agent.agent_id,
- cfg.CONF.host)
- except n_rpc.MessagingTimeout:
- LOG.error(_("RPC timeout while updating port %s"), port['id'])
- else:
- LOG.debug(_("No port %s defined on agent."), port['id'])
+ self.agent.add_port_update(port['mac_address'])
+ LOG.debug("port_update message processed for port with mac %s",
+ port['mac_address'])
class MlnxEswitchPluginApi(agent_rpc.PluginApi,
'configurations': configurations,
'agent_type': q_constants.AGENT_TYPE_MLNX,
'start_flag': True}
+ # Stores port update notifications for processing in main rpc loop
+ self.updated_ports = set()
self._setup_rpc()
self.init_firewall()
self._report_state)
heartbeat.start(interval=report_interval)
- def update_ports(self, registered_ports):
- ports = self.eswitch.get_vnics_mac()
- if ports == registered_ports:
- return
- added = ports - registered_ports
- removed = registered_ports - ports
- return {'current': ports,
- 'added': added,
- 'removed': removed}
+ def add_port_update(self, port):
+ self.updated_ports.add(port)
+
+ def scan_ports(self, registered_ports, updated_ports_copy=None):
+ cur_ports = self.eswitch.get_vnics_mac()
+ port_info = {'current': cur_ports}
+ # Shouldn't process updates for not existing ports
+ port_info['updated'] = updated_ports_copy & cur_ports
+ port_info['added'] = cur_ports - registered_ports
+ port_info['removed'] = registered_ports - cur_ports
+ return port_info
def process_network_ports(self, port_info):
resync_a = False
resync_b = False
- if port_info.get('added'):
- LOG.debug(_("Ports added!"))
- resync_a = self.treat_devices_added(port_info['added'])
- if port_info.get('removed'):
- LOG.debug(_("Ports removed!"))
+ device_added_updated = port_info['added'] | port_info['updated']
+
+ if device_added_updated:
+ resync_a = self.treat_devices_added_or_updated(
+ device_added_updated)
+ if port_info['removed']:
resync_b = self.treat_devices_removed(port_info['removed'])
# If one of the above opertaions fails => resync with plugin
return (resync_a | resync_b)
else:
LOG.debug(_("No port %s defined on agent."), port_id)
- def treat_devices_added(self, devices):
+ def treat_devices_added_or_updated(self, devices):
try:
devs_details_list = self.plugin_rpc.get_devices_details_list(
self.context,
for dev_details in devs_details_list:
device = dev_details['device']
- LOG.info(_("Adding port with mac %s"), device)
+ LOG.info(_("Adding or updating port with mac %s"), device)
if 'port_id' in dev_details:
LOG.info(_("Port %s updated"), device)
- LOG.debug(_("Device details %s"), str(dev_details))
+ LOG.debug("Device details %s", str(dev_details))
self.treat_vif_port(dev_details['port_id'],
dev_details['device'],
dev_details['network_id'],
dev_details['segmentation_id'],
dev_details['admin_state_up'])
if dev_details.get('admin_state_up'):
- self.plugin_rpc.update_device_up(self.context,
- device,
- self.agent_id)
+ LOG.debug("Setting status for %s to UP", device)
+ self.plugin_rpc.update_device_up(
+ self.context, device, self.agent_id)
+ else:
+ LOG.debug("Setting status for %s to DOWN", device)
+ self.plugin_rpc.update_device_down(
+ self.context, device, self.agent_id)
else:
- LOG.debug(_("Device with mac_address %s not defined "
- "on Neutron Plugin"), device)
+ LOG.debug("Device with mac_address %s not defined "
+ "on Neutron Plugin", device)
return False
def treat_devices_removed(self, devices):
self.eswitch.port_release(device)
return resync
+ def _port_info_has_changes(self, port_info):
+ return (port_info['added'] or
+ port_info['removed'] or
+ port_info['updated'])
+
def daemon_loop(self):
sync = True
ports = set()
+ updated_ports_copy = set()
LOG.info(_("eSwitch Agent Started!"))
while True:
+ start = time.time()
+ if sync:
+ LOG.info(_("Agent out of sync with plugin!"))
+ ports.clear()
+ sync = False
+
try:
- start = time.time()
- if sync:
- LOG.info(_("Agent out of sync with plugin!"))
- ports.clear()
- sync = False
-
- port_info = self.update_ports(ports)
- # notify plugin about port deltas
- if port_info:
- LOG.debug(_("Agent loop process devices!"))
- # If treat devices fails - must resync with plugin
+ updated_ports_copy = self.updated_ports
+ self.updated_ports = set()
+ port_info = self.scan_ports(ports, updated_ports_copy)
+ LOG.debug("Agent loop process devices!")
+ # If treat devices fails - must resync with plugin
+ ports = port_info['current']
+ if self._port_info_has_changes(port_info):
+ LOG.debug("Starting to process devices in:%s", port_info)
+ # sync with upper/lower layers about port deltas
sync = self.process_network_ports(port_info)
- ports = port_info['current']
+
except exceptions.RequestTimeout:
LOG.exception(_("Request timeout in agent event loop "
"eSwitchD is not responding - exiting..."))
except Exception:
LOG.exception(_("Error in agent event loop"))
sync = True
+ self.updated_ports |= updated_ports_copy
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self._polling_interval):
self.manager.get_port_id_by_mac('no-such-mac')
+class TestMlnxEswitchRpcCallbacks(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestMlnxEswitchRpcCallbacks, self).setUp()
+ agent = mock.Mock()
+ self.rpc_callbacks = eswitch_neutron_agent.MlnxEswitchRpcCallbacks(
+ 'context',
+ agent
+ )
+
+ def test_port_update(self):
+ port = {'mac_address': '10:20:30:40:50:60'}
+ add_port_update = self.rpc_callbacks.agent.add_port_update
+ self.rpc_callbacks.port_update('context', port=port)
+ add_port_update.assert_called_once_with(port['mac_address'])
+
+
class TestEswitchAgent(base.BaseTestCase):
def setUp(self):
mock.patch('neutron.plugins.mlnx.agent.eswitch_neutron_agent.'
'EswitchManager.get_vnics_mac',
return_value=[])):
- self.assertTrue(self.agent.treat_devices_added([{}]))
+ self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
- def _mock_treat_devices_added(self, details, func_name):
+ def _mock_treat_devices_added_updated(self, details, func_name):
"""Mock treat devices added.
:param details: the details to return for the device
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent, func_name)
) as (vnics_fn, get_dev_fn, upd_dev_up, func):
- self.assertFalse(self.agent.treat_devices_added([{}]))
+ self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
return (func.called, upd_dev_up.called)
def test_treat_devices_added_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
- func, dev_up = self._mock_treat_devices_added(details,
- 'treat_vif_port')
+ func, dev_up = self._mock_treat_devices_added_updated(details,
+ 'treat_vif_port')
self.assertTrue(func)
self.assertTrue(dev_up)
'physical_network': 'default',
'segmentation_id': 2,
'admin_state_up': False}
- func, dev_up = self._mock_treat_devices_added(details,
- 'treat_vif_port')
+ func, dev_up = self._mock_treat_devices_added_updated(details,
+ 'treat_vif_port')
self.assertTrue(func)
self.assertFalse(dev_up)
self.assertFalse(self.agent.treat_devices_removed([{}]))
self.assertTrue(port_release.called)
+ def _test_process_network_ports(self, port_info):
+ with contextlib.nested(
+ mock.patch.object(self.agent, 'treat_devices_added_or_updated',
+ return_value=False),
+ mock.patch.object(self.agent, 'treat_devices_removed',
+ return_value=False)
+ ) as (device_added_updated, device_removed):
+ self.assertFalse(self.agent.process_network_ports(port_info))
+ device_added_updated.assert_called_once_with(
+ port_info['added'] | port_info['updated'])
+ device_removed.assert_called_once_with(port_info['removed'])
+
def test_process_network_ports(self):
- current_ports = set(['01:02:03:04:05:06'])
- added_ports = set(['10:20:30:40:50:60'])
- removed_ports = set(['11:22:33:44:55:66'])
- reply = {'current': current_ports,
- 'removed': removed_ports,
- 'added': added_ports}
- with mock.patch.object(self.agent, 'treat_devices_added',
- return_value=False) as device_added:
- with mock.patch.object(self.agent, 'treat_devices_removed',
- return_value=False) as device_removed:
- self.assertFalse(self.agent.process_network_ports(reply))
- device_added.assert_called_once_with(added_ports)
- device_removed.assert_called_once_with(removed_ports)
+ self._test_process_network_ports(
+ {'current': set(['10:20:30:40:50:60']),
+ 'updated': set(),
+ 'added': set(['11:21:31:41:51:61']),
+ 'removed': set(['13:23:33:43:53:63'])})
+
+ def test_process_network_ports_with_updated_ports(self):
+ self._test_process_network_ports(
+ {'current': set(['10:20:30:40:50:60']),
+ 'updated': set(['12:22:32:42:52:62']),
+ 'added': set(['11:21:31:41:51:61']),
+ 'removed': set(['13:23:33:43:53:63'])})
+
+ def test_add_port_update(self):
+ mac_addr = '10:20:30:40:50:60'
+ self.agent.add_port_update(mac_addr)
+ self.assertEqual(set([mac_addr]), self.agent.updated_ports)
+
+ def _mock_scan_ports(self, vif_port_set, registered_ports, updated_ports):
+ with mock.patch.object(self.agent.eswitch, 'get_vnics_mac',
+ return_value=vif_port_set):
+ return self.agent.scan_ports(registered_ports, updated_ports)
+
+ def test_scan_ports_return_current_for_unchanged_ports(self):
+ vif_port_set = set([1, 2])
+ registered_ports = set([1, 2])
+ actual = self._mock_scan_ports(vif_port_set,
+ registered_ports, set())
+ expected = dict(current=vif_port_set, added=set(),
+ removed=set(), updated=set())
+ self.assertEqual(expected, actual)
+
+ def test_scan_ports_return_port_changes(self):
+ vif_port_set = set([1, 3])
+ registered_ports = set([1, 2])
+ actual = self._mock_scan_ports(vif_port_set,
+ registered_ports, set())
+ expected = dict(current=vif_port_set, added=set([3]),
+ removed=set([2]), updated=set())
+ self.assertEqual(expected, actual)
+
+ def test_scan_ports_with_updated_ports(self):
+ vif_port_set = set([1, 3, 4])
+ registered_ports = set([1, 2, 4])
+ actual = self._mock_scan_ports(vif_port_set,
+ registered_ports, set([4]))
+ expected = dict(current=vif_port_set, added=set([3]),
+ removed=set([2]), updated=set([4]))
+ self.assertEqual(expected, actual)
+
+ def test_scan_ports_with_unknown_updated_ports(self):
+ vif_port_set = set([1, 3, 4])
+ registered_ports = set([1, 2, 4])
+ actual = self._mock_scan_ports(vif_port_set,
+ registered_ports,
+ updated_ports=set([4, 5]))
+ expected = dict(current=vif_port_set, added=set([3]),
+ removed=set([2]), updated=set([4]))
+ self.assertEqual(expected, actual)