From: Darragh O'Reilly Date: Mon, 16 Dec 2013 14:03:37 +0000 (+0000) Subject: linuxbridge-agent: process port updates in the main loop X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=ece14aab5836949faa5be6fb872840367423a57c;p=openstack-build%2Fneutron-build.git linuxbridge-agent: process port updates in the main loop This patch changes the way the linuxbridge agent processes port update notifications. It does the same for the lb-agent as was done for the ovs-agent in I219c6bdf63b0b5e945b655677f9e28fa591f03cd. Now the RPC call just adds the updated tap device name to a set of updated devices, and the actual processing is done in the main RPC loop. This should solve the problems where port_update RPCs were competing with the main loop/greenthread and with each other to do the same task - like adding a tap or interface to a bridge - which led to races and was inefficient. Now repeated port_update notifications received within a single main loop iteration will be coalesced and processed only once. 
Closes-Bug: 1256950 Change-Id: I7fd48542f12b39ffc1346d1a6c9a387ecda6d812 --- diff --git a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py index 06357a493..c905ec43a 100755 --- a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -510,16 +510,6 @@ class LinuxBridgeManager: int_vxlan.link.delete() LOG.debug(_("Done deleting vxlan interface %s"), interface) - def update_devices(self, registered_devices): - devices = self.get_tap_devices() - if devices == registered_devices: - return - added = devices - registered_devices - removed = registered_devices - devices - return {'current': devices, - 'added': added, - 'removed': removed} - def get_tap_devices(self): devices = set() for device in os.listdir(BRIDGE_FS): @@ -680,57 +670,14 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback, self.agent.br_mgr.delete_vlan_bridge(bridge_name) def port_update(self, context, **kwargs): - LOG.debug(_("port_update received")) - # Check port exists on node - port = kwargs.get('port') - tap_device_name = self.agent.br_mgr.get_tap_device_name(port['id']) - devices = self.agent.br_mgr.get_tap_devices() - if tap_device_name not in devices: - return - - if 'security_groups' in port: - self.sg_agent.refresh_firewall() - try: - if port['admin_state_up']: - network_type = kwargs.get('network_type') - if network_type: - segmentation_id = kwargs.get('segmentation_id') - else: - # compatibility with pre-Havana RPC vlan_id encoding - vlan_id = kwargs.get('vlan_id') - (network_type, - segmentation_id) = lconst.interpret_vlan_id(vlan_id) - physical_network = kwargs.get('physical_network') - # create the networking for the port - if self.agent.br_mgr.add_interface(port['network_id'], - network_type, - physical_network, - segmentation_id, - port['id']): - # update plugin about port status - 
self.agent.plugin_rpc.update_device_up(self.context, - tap_device_name, - self.agent.agent_id, - cfg.CONF.host) - else: - self.agent.plugin_rpc.update_device_down( - self.context, - tap_device_name, - self.agent.agent_id, - cfg.CONF.host - ) - else: - bridge_name = self.agent.br_mgr.get_bridge_name( - port['network_id']) - self.agent.br_mgr.remove_interface(bridge_name, - tap_device_name) - # update plugin about port status - self.agent.plugin_rpc.update_device_down(self.context, - tap_device_name, - self.agent.agent_id, - cfg.CONF.host) - except rpc_compat.MessagingTimeout: - LOG.error(_("RPC timeout while updating port %s"), port['id']) + port_id = kwargs['port']['id'] + tap_name = self.agent.br_mgr.get_tap_device_name(port_id) + # Put the tap name in the updated_devices set. + # Do not store port details, as if they're used for processing + # notifications there is no guarantee the notifications are + # processed in the same order as the relevant API requests. + self.agent.updated_devices.add(tap_name) + LOG.debug(_("port_update RPC received for port: %s"), port_id) def fdb_add(self, context, fdb_entries): LOG.debug(_("fdb_add received")) @@ -844,6 +791,8 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): 'agent_type': constants.AGENT_TYPE_LINUXBRIDGE, 'start_flag': True} + # stores received port_updates for processing by the main loop + self.updated_devices = set() self.setup_rpc(interface_mappings.values()) self.init_firewall() @@ -907,18 +856,30 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): def process_network_devices(self, device_info): resync_a = False resync_b = False - if 'added' in device_info: - resync_a = self.treat_devices_added(device_info['added']) - if 'removed' in device_info: + + self.prepare_devices_filter(device_info.get('added')) + + if device_info.get('updated'): + self.refresh_firewall() + + # Updated devices are processed the same as new ones, as their + # admin_state_up may have changed. 
The set union prevents duplicating + # work when a device is new and updated in the same polling iteration. + devices_added_updated = (set(device_info.get('added')) + | set(device_info.get('updated'))) + if devices_added_updated: + resync_a = self.treat_devices_added_updated(devices_added_updated) + + if device_info.get('removed'): resync_b = self.treat_devices_removed(device_info['removed']) # If one of the above operations fails => resync with plugin return (resync_a | resync_b) - def treat_devices_added(self, devices): + def treat_devices_added_updated(self, devices): resync = False - self.prepare_devices_filter(devices) + for device in devices: - LOG.debug(_("Port %s added"), device) + LOG.debug(_("Treating added or updated device: %s"), device) try: details = self.plugin_rpc.get_device_details(self.context, device, @@ -987,6 +948,22 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): self.br_mgr.remove_empty_bridges() return resync + def scan_devices(self, registered_devices, updated_devices): + curr_devices = self.br_mgr.get_tap_devices() + device_info = {} + device_info['current'] = curr_devices + device_info['added'] = curr_devices - registered_devices + # we don't want to process updates for devices that don't exist + device_info['updated'] = updated_devices & curr_devices + # we need to clean up after devices are removed + device_info['removed'] = registered_devices - curr_devices + return device_info + + def _device_info_has_changes(self, device_info): + return (device_info.get('added') + or device_info.get('updated') + or device_info.get('removed')) + def daemon_loop(self): sync = True devices = set() @@ -1000,15 +977,16 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): devices.clear() sync = False device_info = {} + # Save updated devices set to perform rollback in case + # resync would be needed, and then clear self.updated_devices. 
+ # As the greenthread should not yield between these + # two statements, this should be thread-safe. + updated_devices_copy = self.updated_devices + self.updated_devices = set() try: - device_info = self.br_mgr.update_devices(devices) - except Exception: - LOG.exception(_("Update devices failed")) - sync = True - try: - # notify plugin about device deltas - if device_info: - LOG.debug(_("Agent loop has new devices!")) + device_info = self.scan_devices(devices, updated_devices_copy) + if self._device_info_has_changes(device_info): + LOG.debug(_("Agent loop found changes! %s"), device_info) # If treat devices fails - indicates must resync with # plugin sync = self.process_network_devices(device_info) @@ -1017,6 +995,10 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): LOG.exception(_("Error in agent loop. Devices info: %s"), device_info) sync = True + # Restore devices that were removed from this set earlier + # without overwriting ones that may have arrived since. 
+ self.updated_devices |= updated_devices_copy + # sleep till end of polling interval elapsed = (time.time() - start) if (elapsed < self.polling_interval): diff --git a/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py b/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py index 34671424a..d72b5615f 100644 --- a/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py +++ b/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py @@ -25,7 +25,6 @@ from neutron.agent.linux import ip_lib from neutron.agent.linux import utils from neutron.common import constants from neutron.common import exceptions -from neutron.common import rpc_compat from neutron.plugins.common import constants as p_const from neutron.plugins.linuxbridge.agent import linuxbridge_neutron_agent from neutron.plugins.linuxbridge.common import constants as lconst @@ -111,6 +110,9 @@ class TestLinuxBridgeAgent(base.BaseTestCase): 'get_interface_mac') self.get_mac = self.get_mac_p.start() self.get_mac.return_value = '00:00:00:00:00:01' + self.agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({}, + 0, + None) def test_treat_devices_removed_with_existed_device(self): agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({}, @@ -168,53 +170,147 @@ class TestLinuxBridgeAgent(base.BaseTestCase): self.assertTrue(fn_udd.called) self.assertTrue(fn_rdf.called) - def test_update_devices_failed(self): - agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({}, - 0, - None) - raise_exception = [0] + def test_loop_restores_updated_devices_on_exception(self): + agent = self.agent + agent.updated_devices = set(['tap1', 'tap2']) - def info_mock(msg): - if raise_exception[0] < 2: - raise_exception[0] += 1 - else: - raise RuntimeError() - with mock.patch.object(agent.br_mgr, - "update_devices") as update_devices: - update_devices.side_effect = RuntimeError - with mock.patch.object(linuxbridge_neutron_agent.LOG, - 'info') as log: - log.side_effect = info_mock - with 
testtools.ExpectedException(RuntimeError): + with contextlib.nested( + mock.patch.object(agent, 'scan_devices'), + mock.patch.object(linuxbridge_neutron_agent.LOG, 'info'), + mock.patch.object(agent, 'process_network_devices') + ) as (scan_devices, log, process_network_devices): + # Simulate effect of 2 port_update()s when loop is running. + # And break out of loop at start of 2nd iteration. + log.side_effect = [agent.updated_devices.add('tap3'), + agent.updated_devices.add('tap4'), + ValueError] + scan_devices.side_effect = RuntimeError + + with testtools.ExpectedException(ValueError): agent.daemon_loop() - self.assertEqual(3, log.call_count) - - def test_process_network_devices_failed(self): - device_info = {'current': [1, 2, 3]} - agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({}, - 0, - None) - raise_exception = [0] - - def info_mock(msg): - if raise_exception[0] < 2: - raise_exception[0] += 1 - else: - raise RuntimeError() - with mock.patch.object(agent.br_mgr, - "update_devices") as update_devices: - update_devices.side_effect = device_info - with contextlib.nested( - mock.patch.object(linuxbridge_neutron_agent.LOG, 'info'), - mock.patch.object(agent, 'process_network_devices') - ) as (log, process_network_devices): - log.side_effect = info_mock - process_network_devices.side_effect = RuntimeError - with testtools.ExpectedException(RuntimeError): - agent.daemon_loop() + # Check that the originals {tap1,tap2} have been restored + # and the new updates {tap3, tap4} have not been overwritten. 
+ self.assertEqual(set(['tap1', 'tap2', 'tap3', 'tap4']), + agent.updated_devices) self.assertEqual(3, log.call_count) + def mock_scan_devices(self, expected, mock_current, + registered_devices, updated_devices): + self.agent.br_mgr = mock.Mock() + self.agent.br_mgr.get_tap_devices.return_value = mock_current + + results = self.agent.scan_devices(registered_devices, updated_devices) + self.assertEqual(expected, results) + + def test_scan_devices_returns_empty_sets(self): + registered = set() + updated = set() + mock_current = set() + expected = {'current': set(), + 'updated': set(), + 'added': set(), + 'removed': set()} + self.mock_scan_devices(expected, mock_current, registered, updated) + + def test_scan_devices_no_changes(self): + registered = set(['tap1', 'tap2']) + updated = set() + mock_current = set(['tap1', 'tap2']) + expected = {'current': set(['tap1', 'tap2']), + 'updated': set(), + 'added': set(), + 'removed': set()} + self.mock_scan_devices(expected, mock_current, registered, updated) + + def test_scan_devices_new_and_removed(self): + registered = set(['tap1', 'tap2']) + updated = set() + mock_current = set(['tap2', 'tap3']) + expected = {'current': set(['tap2', 'tap3']), + 'updated': set(), + 'added': set(['tap3']), + 'removed': set(['tap1'])} + self.mock_scan_devices(expected, mock_current, registered, updated) + + def test_scan_devices_new_updates(self): + registered = set(['tap1']) + updated = set(['tap2']) + mock_current = set(['tap1', 'tap2']) + expected = {'current': set(['tap1', 'tap2']), + 'updated': set(['tap2']), + 'added': set(['tap2']), + 'removed': set()} + self.mock_scan_devices(expected, mock_current, registered, updated) + + def test_scan_devices_updated_missing(self): + registered = set(['tap1']) + updated = set(['tap2']) + mock_current = set(['tap1']) + expected = {'current': set(['tap1']), + 'updated': set(), + 'added': set(), + 'removed': set()} + self.mock_scan_devices(expected, mock_current, registered, updated) + + def 
test_process_network_devices(self): + agent = self.agent + device_info = {'current': set(), + 'added': set(['tap3', 'tap4']), + 'updated': set(['tap2', 'tap3']), + 'removed': set(['tap1'])} + agent.prepare_devices_filter = mock.Mock() + agent.refresh_firewall = mock.Mock() + agent.treat_devices_added_updated = mock.Mock(return_value=False) + agent.treat_devices_removed = mock.Mock(return_value=False) + + agent.process_network_devices(device_info) + + agent.prepare_devices_filter.assert_called_with(set(['tap3', 'tap4'])) + self.assertTrue(agent.refresh_firewall.called) + agent.treat_devices_added_updated.assert_called_with(set(['tap2', + 'tap3', + 'tap4'])) + agent.treat_devices_removed.assert_called_with(set(['tap1'])) + + def test_treat_devices_added_updated_admin_state_up_true(self): + agent = self.agent + mock_details = {'port_id': 'port123', + 'network_id': 'net123', + 'admin_state_up': True, + 'network_type': 'vlan', + 'segmentation_id': 100, + 'physical_network': 'physnet1'} + agent.plugin_rpc = mock.Mock() + agent.plugin_rpc.get_device_details.return_value = mock_details + agent.br_mgr = mock.Mock() + agent.br_mgr.add_interface.return_value = True + + resync_needed = agent.treat_devices_added_updated(set(['tap1'])) + + self.assertFalse(resync_needed) + agent.br_mgr.add_interface.assert_called_with('net123', 'vlan', + 'physnet1', 100, + 'port123') + self.assertTrue(agent.plugin_rpc.update_device_up.called) + + def test_treat_devices_added_updated_admin_state_up_false(self): + mock_details = {'port_id': 'port123', + 'network_id': 'net123', + 'admin_state_up': False, + 'network_type': 'vlan', + 'segmentation_id': 100, + 'physical_network': 'physnet1'} + self.agent.plugin_rpc = mock.Mock() + self.agent.plugin_rpc.get_device_details.return_value = mock_details + self.agent.remove_port_binding = mock.Mock() + + resync_needed = self.agent.treat_devices_added_updated(set(['tap1'])) + + self.assertFalse(resync_needed) + 
self.agent.remove_port_binding.assert_called_with('net123', 'port123') + self.assertFalse(self.agent.plugin_rpc.update_device_up.called) + class TestLinuxBridgeManager(base.BaseTestCase): def setUp(self): @@ -704,18 +800,6 @@ class TestLinuxBridgeManager(base.BaseTestCase): self.lbm.delete_vlan("eth1.1") self.assertTrue(exec_fn.called) - def test_update_devices(self): - with mock.patch.object(self.lbm, "get_tap_devices") as gt_fn: - gt_fn.return_value = set(["dev1"]) - self.assertIsNone(self.lbm.update_devices(set(["dev1"]))) - - gt_fn.return_value = set(["dev1", "dev2"]) - self.assertEqual(self.lbm.update_devices(set(["dev2", "dev3"])), - {"current": set(["dev1", "dev2"]), - "added": set(["dev1"]), - "removed": set(["dev3"]) - }) - def _check_vxlan_support(self, expected, vxlan_module_supported, vxlan_ucast_supported, vxlan_mcast_supported): with contextlib.nested( @@ -857,145 +941,6 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase): get_br_fn.assert_called_with("123") del_fn.assert_called_with("br0") - def test_port_update(self): - with contextlib.nested( - mock.patch.object(self.lb_rpc.agent.br_mgr, - "get_tap_device_name"), - mock.patch.object(self.lb_rpc.agent.br_mgr, - "get_tap_devices"), - mock.patch.object(self.lb_rpc.agent.br_mgr, - "get_bridge_name"), - mock.patch.object(self.lb_rpc.agent.br_mgr, - "remove_interface"), - mock.patch.object(self.lb_rpc.agent.br_mgr, "add_interface"), - mock.patch.object(self.lb_rpc.agent, - "plugin_rpc", create=True), - mock.patch.object(self.lb_rpc.sg_agent, - "refresh_firewall", create=True) - ) as (get_tap_fn, get_tap_devs_fn, getbr_fn, remif_fn, - addif_fn, rpc_obj, reffw_fn): - get_tap_fn.return_value = "tap123" - get_tap_devs_fn.return_value = set(["tap123", "tap124"]) - port = {"admin_state_up": True, - "id": "1234-5678", - "network_id": "123-123"} - self.lb_rpc.port_update("unused_context", port=port, - vlan_id="1", physical_network="physnet1") - self.assertFalse(reffw_fn.called) - 
addif_fn.assert_called_with(port["network_id"], p_const.TYPE_VLAN, - "physnet1", "1", port["id"]) - - self.lb_rpc.port_update("unused_context", port=port, - network_type=p_const.TYPE_VLAN, - segmentation_id="2", - physical_network="physnet1") - self.assertFalse(reffw_fn.called) - addif_fn.assert_called_with(port["network_id"], p_const.TYPE_VLAN, - "physnet1", "2", port["id"]) - - self.lb_rpc.port_update("unused_context", port=port, - vlan_id=lconst.FLAT_VLAN_ID, - physical_network="physnet1") - self.assertFalse(reffw_fn.called) - addif_fn.assert_called_with(port["network_id"], p_const.TYPE_FLAT, - "physnet1", None, port["id"]) - - self.lb_rpc.port_update("unused_context", port=port, - network_type=p_const.TYPE_FLAT, - segmentation_id=None, - physical_network="physnet1") - self.assertFalse(reffw_fn.called) - addif_fn.assert_called_with(port["network_id"], p_const.TYPE_FLAT, - "physnet1", None, port["id"]) - - self.lb_rpc.port_update("unused_context", port=port, - vlan_id=lconst.LOCAL_VLAN_ID, - physical_network=None) - self.assertFalse(reffw_fn.called) - addif_fn.assert_called_with(port["network_id"], p_const.TYPE_LOCAL, - None, None, port["id"]) - - self.lb_rpc.port_update("unused_context", port=port, - network_type=p_const.TYPE_LOCAL, - segmentation_id=None, - physical_network=None) - self.assertFalse(reffw_fn.called) - addif_fn.assert_called_with(port["network_id"], p_const.TYPE_LOCAL, - None, None, port["id"]) - - addif_fn.return_value = True - self.lb_rpc.port_update("unused_context", port=port, - network_type=p_const.TYPE_LOCAL, - segmentation_id=None, - physical_network=None) - rpc_obj.update_device_up.assert_called_with( - self.lb_rpc.context, - "tap123", - self.lb_rpc.agent.agent_id, - cfg.CONF.host - ) - - addif_fn.return_value = False - self.lb_rpc.port_update("unused_context", port=port, - network_type=p_const.TYPE_LOCAL, - segmentation_id=None, - physical_network=None) - rpc_obj.update_device_down.assert_called_with( - self.lb_rpc.context, - "tap123", - 
self.lb_rpc.agent.agent_id, - cfg.CONF.host - ) - - port["admin_state_up"] = False - port["security_groups"] = True - getbr_fn.return_value = "br0" - self.lb_rpc.port_update("unused_context", port=port, - vlan_id="1", physical_network="physnet1") - self.assertTrue(reffw_fn.called) - remif_fn.assert_called_with("br0", "tap123") - rpc_obj.update_device_down.assert_called_with( - self.lb_rpc.context, - "tap123", - self.lb_rpc.agent.agent_id, - cfg.CONF.host - ) - - def test_port_update_plugin_rpc_failed(self): - with contextlib.nested( - mock.patch.object(self.lb_rpc.agent.br_mgr, - "get_tap_device_name"), - mock.patch.object(self.lb_rpc.agent.br_mgr, - "get_tap_devices"), - mock.patch.object(self.lb_rpc.agent.br_mgr, - "get_bridge_name"), - mock.patch.object(self.lb_rpc.agent.br_mgr, - "remove_interface"), - mock.patch.object(self.lb_rpc.agent.br_mgr, "add_interface"), - mock.patch.object(self.lb_rpc.sg_agent, - "refresh_firewall", create=True), - mock.patch.object(self.lb_rpc.agent, - "plugin_rpc", create=True), - mock.patch.object(linuxbridge_neutron_agent.LOG, 'error'), - ) as (get_tap_fn, get_tap_devs_fn, _, _, _, _, plugin_rpc, log): - get_tap_fn.return_value = "tap123" - get_tap_devs_fn.return_value = set(["tap123", "tap124"]) - port = {"admin_state_up": True, - "id": "1234-5678", - "network_id": "123-123"} - timeout_class = rpc_compat.MessagingTimeout - plugin_rpc.update_device_up.side_effect = timeout_class - self.lb_rpc.port_update(mock.Mock(), port=port) - self.assertTrue(plugin_rpc.update_device_up.called) - self.assertEqual(log.call_count, 1) - - log.reset_mock() - port["admin_state_up"] = False - plugin_rpc.update_device_down.side_effect = timeout_class - self.lb_rpc.port_update(mock.Mock(), port=port) - self.assertTrue(plugin_rpc.update_device_down.called) - self.assertEqual(log.call_count, 1) - def test_fdb_add(self): fdb_entries = {'net_id': {'ports':