int_vxlan.link.delete()
LOG.debug(_("Done deleting vxlan interface %s"), interface)
- def update_devices(self, registered_devices):
- devices = self.get_tap_devices()
- if devices == registered_devices:
- return
- added = devices - registered_devices
- removed = registered_devices - devices
- return {'current': devices,
- 'added': added,
- 'removed': removed}
-
def get_tap_devices(self):
devices = set()
for device in os.listdir(BRIDGE_FS):
self.agent.br_mgr.delete_vlan_bridge(bridge_name)
def port_update(self, context, **kwargs):
- LOG.debug(_("port_update received"))
- # Check port exists on node
- port = kwargs.get('port')
- tap_device_name = self.agent.br_mgr.get_tap_device_name(port['id'])
- devices = self.agent.br_mgr.get_tap_devices()
- if tap_device_name not in devices:
- return
-
- if 'security_groups' in port:
- self.sg_agent.refresh_firewall()
- try:
- if port['admin_state_up']:
- network_type = kwargs.get('network_type')
- if network_type:
- segmentation_id = kwargs.get('segmentation_id')
- else:
- # compatibility with pre-Havana RPC vlan_id encoding
- vlan_id = kwargs.get('vlan_id')
- (network_type,
- segmentation_id) = lconst.interpret_vlan_id(vlan_id)
- physical_network = kwargs.get('physical_network')
- # create the networking for the port
- if self.agent.br_mgr.add_interface(port['network_id'],
- network_type,
- physical_network,
- segmentation_id,
- port['id']):
- # update plugin about port status
- self.agent.plugin_rpc.update_device_up(self.context,
- tap_device_name,
- self.agent.agent_id,
- cfg.CONF.host)
- else:
- self.agent.plugin_rpc.update_device_down(
- self.context,
- tap_device_name,
- self.agent.agent_id,
- cfg.CONF.host
- )
- else:
- bridge_name = self.agent.br_mgr.get_bridge_name(
- port['network_id'])
- self.agent.br_mgr.remove_interface(bridge_name,
- tap_device_name)
- # update plugin about port status
- self.agent.plugin_rpc.update_device_down(self.context,
- tap_device_name,
- self.agent.agent_id,
- cfg.CONF.host)
- except rpc_compat.MessagingTimeout:
- LOG.error(_("RPC timeout while updating port %s"), port['id'])
+ port_id = kwargs['port']['id']
+ tap_name = self.agent.br_mgr.get_tap_device_name(port_id)
+ # Put the tap name in the updated_devices set.
+ # Do not store port details, as if they're used for processing
+ # notifications there is no guarantee the notifications are
+ # processed in the same order as the relevant API requests.
+ self.agent.updated_devices.add(tap_name)
+ LOG.debug(_("port_update RPC received for port: %s"), port_id)
def fdb_add(self, context, fdb_entries):
LOG.debug(_("fdb_add received"))
'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
'start_flag': True}
+ # stores received port_updates for processing by the main loop
+ self.updated_devices = set()
self.setup_rpc(interface_mappings.values())
self.init_firewall()
def process_network_devices(self, device_info):
resync_a = False
resync_b = False
- if 'added' in device_info:
- resync_a = self.treat_devices_added(device_info['added'])
- if 'removed' in device_info:
+
+ self.prepare_devices_filter(device_info.get('added'))
+
+ if device_info.get('updated'):
+ self.refresh_firewall()
+
+ # Updated devices are processed the same as new ones, as their
+ # admin_state_up may have changed. The set union prevents duplicating
+ # work when a device is new and updated in the same polling iteration.
+ devices_added_updated = (set(device_info.get('added'))
+ | set(device_info.get('updated')))
+ if devices_added_updated:
+ resync_a = self.treat_devices_added_updated(devices_added_updated)
+
+ if device_info.get('removed'):
resync_b = self.treat_devices_removed(device_info['removed'])
# If one of the above operations fails => resync with plugin
return (resync_a | resync_b)
- def treat_devices_added(self, devices):
+ def treat_devices_added_updated(self, devices):
resync = False
- self.prepare_devices_filter(devices)
+
for device in devices:
- LOG.debug(_("Port %s added"), device)
+ LOG.debug(_("Treating added or updated device: %s"), device)
try:
details = self.plugin_rpc.get_device_details(self.context,
device,
self.br_mgr.remove_empty_bridges()
return resync
+ def scan_devices(self, registered_devices, updated_devices):
+ curr_devices = self.br_mgr.get_tap_devices()
+ device_info = {}
+ device_info['current'] = curr_devices
+ device_info['added'] = curr_devices - registered_devices
+ # we don't want to process updates for devices that don't exist
+ device_info['updated'] = updated_devices & curr_devices
+ # we need to clean up after devices are removed
+ device_info['removed'] = registered_devices - curr_devices
+ return device_info
+
+ def _device_info_has_changes(self, device_info):
+ return (device_info.get('added')
+ or device_info.get('updated')
+ or device_info.get('removed'))
+
def daemon_loop(self):
sync = True
devices = set()
devices.clear()
sync = False
device_info = {}
+ # Save updated devices dict to perform rollback in case
+ # resync would be needed, and then clear self.updated_devices.
+ # As the greenthread should not yield between these
+ # two statements, this will should be thread-safe.
+ updated_devices_copy = self.updated_devices
+ self.updated_devices = set()
try:
- device_info = self.br_mgr.update_devices(devices)
- except Exception:
- LOG.exception(_("Update devices failed"))
- sync = True
- try:
- # notify plugin about device deltas
- if device_info:
- LOG.debug(_("Agent loop has new devices!"))
+ device_info = self.scan_devices(devices, updated_devices_copy)
+ if self._device_info_has_changes(device_info):
+ LOG.debug(_("Agent loop found changes! %s"), device_info)
# If treat devices fails - indicates must resync with
# plugin
sync = self.process_network_devices(device_info)
LOG.exception(_("Error in agent loop. Devices info: %s"),
device_info)
sync = True
+ # Restore devices that were removed from this set earlier
+ # without overwriting ones that may have arrived since.
+ self.updated_devices |= updated_devices_copy
+
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self.polling_interval):
from neutron.agent.linux import utils
from neutron.common import constants
from neutron.common import exceptions
-from neutron.common import rpc_compat
from neutron.plugins.common import constants as p_const
from neutron.plugins.linuxbridge.agent import linuxbridge_neutron_agent
from neutron.plugins.linuxbridge.common import constants as lconst
'get_interface_mac')
self.get_mac = self.get_mac_p.start()
self.get_mac.return_value = '00:00:00:00:00:01'
+ self.agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
+ 0,
+ None)
def test_treat_devices_removed_with_existed_device(self):
agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
- def test_update_devices_failed(self):
- agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
- 0,
- None)
- raise_exception = [0]
+ def test_loop_restores_updated_devices_on_exception(self):
+ agent = self.agent
+ agent.updated_devices = set(['tap1', 'tap2'])
- def info_mock(msg):
- if raise_exception[0] < 2:
- raise_exception[0] += 1
- else:
- raise RuntimeError()
- with mock.patch.object(agent.br_mgr,
- "update_devices") as update_devices:
- update_devices.side_effect = RuntimeError
- with mock.patch.object(linuxbridge_neutron_agent.LOG,
- 'info') as log:
- log.side_effect = info_mock
- with testtools.ExpectedException(RuntimeError):
+ with contextlib.nested(
+ mock.patch.object(agent, 'scan_devices'),
+ mock.patch.object(linuxbridge_neutron_agent.LOG, 'info'),
+ mock.patch.object(agent, 'process_network_devices')
+ ) as (scan_devices, log, process_network_devices):
+ # Simulate effect of 2 port_update()s when loop is running.
+ # And break out of loop at start of 2nd iteration.
+ log.side_effect = [agent.updated_devices.add('tap3'),
+ agent.updated_devices.add('tap4'),
+ ValueError]
+ scan_devices.side_effect = RuntimeError
+
+ with testtools.ExpectedException(ValueError):
agent.daemon_loop()
- self.assertEqual(3, log.call_count)
-
- def test_process_network_devices_failed(self):
- device_info = {'current': [1, 2, 3]}
- agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
- 0,
- None)
- raise_exception = [0]
-
- def info_mock(msg):
- if raise_exception[0] < 2:
- raise_exception[0] += 1
- else:
- raise RuntimeError()
- with mock.patch.object(agent.br_mgr,
- "update_devices") as update_devices:
- update_devices.side_effect = device_info
- with contextlib.nested(
- mock.patch.object(linuxbridge_neutron_agent.LOG, 'info'),
- mock.patch.object(agent, 'process_network_devices')
- ) as (log, process_network_devices):
- log.side_effect = info_mock
- process_network_devices.side_effect = RuntimeError
- with testtools.ExpectedException(RuntimeError):
- agent.daemon_loop()
+ # Check that the originals {tap1,tap2} have been restored
+ # and the new updates {tap3, tap4} have not been overwritten.
+ self.assertEqual(set(['tap1', 'tap2', 'tap3', 'tap4']),
+ agent.updated_devices)
self.assertEqual(3, log.call_count)
+ def mock_scan_devices(self, expected, mock_current,
+ registered_devices, updated_devices):
+ self.agent.br_mgr = mock.Mock()
+ self.agent.br_mgr.get_tap_devices.return_value = mock_current
+
+ results = self.agent.scan_devices(registered_devices, updated_devices)
+ self.assertEqual(expected, results)
+
+ def test_scan_devices_returns_empty_sets(self):
+ registered = set()
+ updated = set()
+ mock_current = set()
+ expected = {'current': set(),
+ 'updated': set(),
+ 'added': set(),
+ 'removed': set()}
+ self.mock_scan_devices(expected, mock_current, registered, updated)
+
+ def test_scan_devices_no_changes(self):
+ registered = set(['tap1', 'tap2'])
+ updated = set()
+ mock_current = set(['tap1', 'tap2'])
+ expected = {'current': set(['tap1', 'tap2']),
+ 'updated': set(),
+ 'added': set(),
+ 'removed': set()}
+ self.mock_scan_devices(expected, mock_current, registered, updated)
+
+ def test_scan_devices_new_and_removed(self):
+ registered = set(['tap1', 'tap2'])
+ updated = set()
+ mock_current = set(['tap2', 'tap3'])
+ expected = {'current': set(['tap2', 'tap3']),
+ 'updated': set(),
+ 'added': set(['tap3']),
+ 'removed': set(['tap1'])}
+ self.mock_scan_devices(expected, mock_current, registered, updated)
+
+ def test_scan_devices_new_updates(self):
+ registered = set(['tap1'])
+ updated = set(['tap2'])
+ mock_current = set(['tap1', 'tap2'])
+ expected = {'current': set(['tap1', 'tap2']),
+ 'updated': set(['tap2']),
+ 'added': set(['tap2']),
+ 'removed': set()}
+ self.mock_scan_devices(expected, mock_current, registered, updated)
+
+ def test_scan_devices_updated_missing(self):
+ registered = set(['tap1'])
+ updated = set(['tap2'])
+ mock_current = set(['tap1'])
+ expected = {'current': set(['tap1']),
+ 'updated': set(),
+ 'added': set(),
+ 'removed': set()}
+ self.mock_scan_devices(expected, mock_current, registered, updated)
+
+ def test_process_network_devices(self):
+ agent = self.agent
+ device_info = {'current': set(),
+ 'added': set(['tap3', 'tap4']),
+ 'updated': set(['tap2', 'tap3']),
+ 'removed': set(['tap1'])}
+ agent.prepare_devices_filter = mock.Mock()
+ agent.refresh_firewall = mock.Mock()
+ agent.treat_devices_added_updated = mock.Mock(return_value=False)
+ agent.treat_devices_removed = mock.Mock(return_value=False)
+
+ agent.process_network_devices(device_info)
+
+ agent.prepare_devices_filter.assert_called_with(set(['tap3', 'tap4']))
+ self.assertTrue(agent.refresh_firewall.called)
+ agent.treat_devices_added_updated.assert_called_with(set(['tap2',
+ 'tap3',
+ 'tap4']))
+ agent.treat_devices_removed.assert_called_with(set(['tap1']))
+
+ def test_treat_devices_added_updated_admin_state_up_true(self):
+ agent = self.agent
+ mock_details = {'port_id': 'port123',
+ 'network_id': 'net123',
+ 'admin_state_up': True,
+ 'network_type': 'vlan',
+ 'segmentation_id': 100,
+ 'physical_network': 'physnet1'}
+ agent.plugin_rpc = mock.Mock()
+ agent.plugin_rpc.get_device_details.return_value = mock_details
+ agent.br_mgr = mock.Mock()
+ agent.br_mgr.add_interface.return_value = True
+
+ resync_needed = agent.treat_devices_added_updated(set(['tap1']))
+
+ self.assertFalse(resync_needed)
+ agent.br_mgr.add_interface.assert_called_with('net123', 'vlan',
+ 'physnet1', 100,
+ 'port123')
+ self.assertTrue(agent.plugin_rpc.update_device_up.called)
+
+ def test_treat_devices_added_updated_admin_state_up_false(self):
+ mock_details = {'port_id': 'port123',
+ 'network_id': 'net123',
+ 'admin_state_up': False,
+ 'network_type': 'vlan',
+ 'segmentation_id': 100,
+ 'physical_network': 'physnet1'}
+ self.agent.plugin_rpc = mock.Mock()
+ self.agent.plugin_rpc.get_device_details.return_value = mock_details
+ self.agent.remove_port_binding = mock.Mock()
+
+ resync_needed = self.agent.treat_devices_added_updated(set(['tap1']))
+
+ self.assertFalse(resync_needed)
+ self.agent.remove_port_binding.assert_called_with('net123', 'port123')
+ self.assertFalse(self.agent.plugin_rpc.update_device_up.called)
+
class TestLinuxBridgeManager(base.BaseTestCase):
def setUp(self):
self.lbm.delete_vlan("eth1.1")
self.assertTrue(exec_fn.called)
- def test_update_devices(self):
- with mock.patch.object(self.lbm, "get_tap_devices") as gt_fn:
- gt_fn.return_value = set(["dev1"])
- self.assertIsNone(self.lbm.update_devices(set(["dev1"])))
-
- gt_fn.return_value = set(["dev1", "dev2"])
- self.assertEqual(self.lbm.update_devices(set(["dev2", "dev3"])),
- {"current": set(["dev1", "dev2"]),
- "added": set(["dev1"]),
- "removed": set(["dev3"])
- })
-
def _check_vxlan_support(self, expected, vxlan_module_supported,
vxlan_ucast_supported, vxlan_mcast_supported):
with contextlib.nested(
get_br_fn.assert_called_with("123")
del_fn.assert_called_with("br0")
- def test_port_update(self):
- with contextlib.nested(
- mock.patch.object(self.lb_rpc.agent.br_mgr,
- "get_tap_device_name"),
- mock.patch.object(self.lb_rpc.agent.br_mgr,
- "get_tap_devices"),
- mock.patch.object(self.lb_rpc.agent.br_mgr,
- "get_bridge_name"),
- mock.patch.object(self.lb_rpc.agent.br_mgr,
- "remove_interface"),
- mock.patch.object(self.lb_rpc.agent.br_mgr, "add_interface"),
- mock.patch.object(self.lb_rpc.agent,
- "plugin_rpc", create=True),
- mock.patch.object(self.lb_rpc.sg_agent,
- "refresh_firewall", create=True)
- ) as (get_tap_fn, get_tap_devs_fn, getbr_fn, remif_fn,
- addif_fn, rpc_obj, reffw_fn):
- get_tap_fn.return_value = "tap123"
- get_tap_devs_fn.return_value = set(["tap123", "tap124"])
- port = {"admin_state_up": True,
- "id": "1234-5678",
- "network_id": "123-123"}
- self.lb_rpc.port_update("unused_context", port=port,
- vlan_id="1", physical_network="physnet1")
- self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], p_const.TYPE_VLAN,
- "physnet1", "1", port["id"])
-
- self.lb_rpc.port_update("unused_context", port=port,
- network_type=p_const.TYPE_VLAN,
- segmentation_id="2",
- physical_network="physnet1")
- self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], p_const.TYPE_VLAN,
- "physnet1", "2", port["id"])
-
- self.lb_rpc.port_update("unused_context", port=port,
- vlan_id=lconst.FLAT_VLAN_ID,
- physical_network="physnet1")
- self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], p_const.TYPE_FLAT,
- "physnet1", None, port["id"])
-
- self.lb_rpc.port_update("unused_context", port=port,
- network_type=p_const.TYPE_FLAT,
- segmentation_id=None,
- physical_network="physnet1")
- self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], p_const.TYPE_FLAT,
- "physnet1", None, port["id"])
-
- self.lb_rpc.port_update("unused_context", port=port,
- vlan_id=lconst.LOCAL_VLAN_ID,
- physical_network=None)
- self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], p_const.TYPE_LOCAL,
- None, None, port["id"])
-
- self.lb_rpc.port_update("unused_context", port=port,
- network_type=p_const.TYPE_LOCAL,
- segmentation_id=None,
- physical_network=None)
- self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], p_const.TYPE_LOCAL,
- None, None, port["id"])
-
- addif_fn.return_value = True
- self.lb_rpc.port_update("unused_context", port=port,
- network_type=p_const.TYPE_LOCAL,
- segmentation_id=None,
- physical_network=None)
- rpc_obj.update_device_up.assert_called_with(
- self.lb_rpc.context,
- "tap123",
- self.lb_rpc.agent.agent_id,
- cfg.CONF.host
- )
-
- addif_fn.return_value = False
- self.lb_rpc.port_update("unused_context", port=port,
- network_type=p_const.TYPE_LOCAL,
- segmentation_id=None,
- physical_network=None)
- rpc_obj.update_device_down.assert_called_with(
- self.lb_rpc.context,
- "tap123",
- self.lb_rpc.agent.agent_id,
- cfg.CONF.host
- )
-
- port["admin_state_up"] = False
- port["security_groups"] = True
- getbr_fn.return_value = "br0"
- self.lb_rpc.port_update("unused_context", port=port,
- vlan_id="1", physical_network="physnet1")
- self.assertTrue(reffw_fn.called)
- remif_fn.assert_called_with("br0", "tap123")
- rpc_obj.update_device_down.assert_called_with(
- self.lb_rpc.context,
- "tap123",
- self.lb_rpc.agent.agent_id,
- cfg.CONF.host
- )
-
- def test_port_update_plugin_rpc_failed(self):
- with contextlib.nested(
- mock.patch.object(self.lb_rpc.agent.br_mgr,
- "get_tap_device_name"),
- mock.patch.object(self.lb_rpc.agent.br_mgr,
- "get_tap_devices"),
- mock.patch.object(self.lb_rpc.agent.br_mgr,
- "get_bridge_name"),
- mock.patch.object(self.lb_rpc.agent.br_mgr,
- "remove_interface"),
- mock.patch.object(self.lb_rpc.agent.br_mgr, "add_interface"),
- mock.patch.object(self.lb_rpc.sg_agent,
- "refresh_firewall", create=True),
- mock.patch.object(self.lb_rpc.agent,
- "plugin_rpc", create=True),
- mock.patch.object(linuxbridge_neutron_agent.LOG, 'error'),
- ) as (get_tap_fn, get_tap_devs_fn, _, _, _, _, plugin_rpc, log):
- get_tap_fn.return_value = "tap123"
- get_tap_devs_fn.return_value = set(["tap123", "tap124"])
- port = {"admin_state_up": True,
- "id": "1234-5678",
- "network_id": "123-123"}
- timeout_class = rpc_compat.MessagingTimeout
- plugin_rpc.update_device_up.side_effect = timeout_class
- self.lb_rpc.port_update(mock.Mock(), port=port)
- self.assertTrue(plugin_rpc.update_device_up.called)
- self.assertEqual(log.call_count, 1)
-
- log.reset_mock()
- port["admin_state_up"] = False
- plugin_rpc.update_device_down.side_effect = timeout_class
- self.lb_rpc.port_update(mock.Mock(), port=port)
- self.assertTrue(plugin_rpc.update_device_down.called)
- self.assertEqual(log.call_count, 1)
-
def test_fdb_add(self):
fdb_entries = {'net_id':
{'ports':