From: fumihiko kakuma Date: Sun, 16 Mar 2014 04:13:03 +0000 (+0900) Subject: OFAgent: Process port_update notifications in the main agent loop X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=34e3be16a196c3da32cd6fa6147f6e00636a9aaf;p=openstack-build%2Fneutron-build.git OFAgent: Process port_update notifications in the main agent loop Port the following patch to OFAgent. commit: 5e6e592132aa9a98936ce3bfdb66efc7832caafb https://review.openstack.org/#/c/61964/ Partial-Bug: 1293265 Change-Id: I53813d12c66dc746cd373fd91ff9bd9bdbf222db --- diff --git a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py index 62bbd8669..7e3512721 100644 --- a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py +++ b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py @@ -36,10 +36,8 @@ from neutron.common import constants as n_const from neutron.common import topics from neutron.common import utils as n_utils from neutron import context -from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import common as rpc_common from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.ofagent.common import config # noqa @@ -277,6 +275,8 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): self.sg_agent = OFASecurityGroupAgent(self.context, self.plugin_rpc, self.root_helper) + # Stores port update notifications for processing in main loop + self.updated_ports = set() # Initialize iteration counter self.iter_num = 0 @@ -353,33 +353,12 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): def port_update(self, context, **kwargs): port = kwargs.get('port') + # Put the port identifier in the updated_ports set. + # Even if full port details might be provided to this call, + # they are not used since there is no guarantee the notifications + # are processed in the same order as the relevant API requests + self.updated_ports.add(port['id']) LOG.debug(_("port_update received port %s"), port['id']) - # Validate that port is on OVS - vif_port = self.int_br.get_vif_port_by_id(port['id']) - if not vif_port: - return - - if ext_sg.SECURITYGROUPS in port: - self.sg_agent.refresh_firewall() - network_type = kwargs.get('network_type') - segmentation_id = kwargs.get('segmentation_id') - physical_network = kwargs.get('physical_network') - self.treat_vif_port(vif_port, port['id'], port['network_id'], - network_type, physical_network, - segmentation_id, port['admin_state_up']) - try: - if port['admin_state_up']: - # update plugin about port status - self.plugin_rpc.update_device_up(self.context, port['id'], - self.agent_id, - cfg.CONF.host) - else: - # update plugin about port status - self.plugin_rpc.update_device_down(self.context, port['id'], - self.agent_id, - cfg.CONF.host) - except rpc_common.Timeout: - LOG.error(_("RPC timeout while updating port %s"), port['id']) def tunnel_update(self, context, **kwargs): LOG.debug(_("tunnel_update received")) @@ -973,16 +952,27 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): self._phys_br_patch_physical_bridge_with_integration_bridge( br, physical_network, bridge, ip_wrapper) - def update_ports(self, registered_ports): - ports = self.int_br.get_vif_port_set() - if ports == registered_ports: - return - self.int_br_device_count = len(ports) - added = ports - registered_ports - removed = registered_ports - ports - return {'current': ports, - 'added': added, - 'removed': removed} + def scan_ports(self, registered_ports, updated_ports=None): + cur_ports = self.int_br.get_vif_port_set() + self.int_br_device_count = len(cur_ports) + port_info = {'current': cur_ports} + if updated_ports: + # Some updated ports might have been removed in the + # meanwhile, and therefore should not be processed. + # In this case the updated port won't be found among + # current ports. + updated_ports &= cur_ports + if updated_ports: + port_info['updated'] = updated_ports + + if cur_ports == registered_ports: + # No added or removed ports to set, just return here + return port_info + + port_info['added'] = cur_ports - registered_ports + # Remove all the known ports not found on the integration bridge + port_info['removed'] = registered_ports - cur_ports + return port_info def update_ancillary_ports(self, registered_ports): ports = set() @@ -1066,11 +1056,10 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): self.ryu_send_msg(msg) return ofport - def treat_devices_added(self, devices): + def treat_devices_added_or_updated(self, devices): resync = False - self.sg_agent.prepare_devices_filter(devices) for device in devices: - LOG.info(_("Port %s added"), device) + LOG.debug(_("Processing port %s"), device) try: details = self.plugin_rpc.get_device_details(self.context, device, @@ -1093,12 +1082,17 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): details['admin_state_up']) # update plugin about port status - self.plugin_rpc.update_device_up(self.context, - device, - self.agent_id, - cfg.CONF.host) + if details.get('admin_state_up'): + LOG.debug(_("Setting status for %s to UP"), device) + self.plugin_rpc.update_device_up( + self.context, device, self.agent_id, cfg.CONF.host) + else: + LOG.debug(_("Setting status for %s to DOWN"), device) + self.plugin_rpc.update_device_down( + self.context, device, self.agent_id, cfg.CONF.host) + LOG.info(_("Configuration for device %s completed."), device) else: - LOG.debug(_("Device %s not defined on plugin"), device) + LOG.warn(_("Device %s not defined on plugin"), device) if (port and int(port.ofport) != -1): self.port_dead(port) return resync @@ -1166,11 +1160,25 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): def process_network_ports(self, port_info): resync_add = False resync_removed = False - if 'added' in port_info: + # If there is an exception while processing security groups ports + # will not be wired anyway, and a resync will be triggered + self.sg_agent.prepare_devices_filter(port_info.get('added', set())) + if port_info.get('updated'): + self.sg_agent.refresh_firewall() + # VIF wiring needs to be performed always for 'new' devices. + # For updated ports, re-wiring is not needed in most cases, but needs + # to be performed anyway when the admin state of a device is changed. + # A device might be both in the 'added' and 'updated' + # list at the same time; avoid processing it twice. + devices_added_updated = (port_info.get('added', set()) | + port_info.get('updated', set())) + if devices_added_updated: start = time.time() - resync_add = self.treat_devices_added(port_info['added']) + resync_add = self.treat_devices_added_or_updated( + devices_added_updated) LOG.debug(_("process_network_ports - iteration:%(iter_num)d - " - "treat_devices_added completed in %(elapsed).3f"), + "treat_devices_added_or_updated completed " + "in %(elapsed).3f"), {'iter_num': self.iter_num, 'elapsed': time.time() - start}) if 'removed' in port_info: @@ -1230,40 +1238,60 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): resync = True return resync + def _agent_has_updates(self, polling_manager): + return (polling_manager.is_polling_required or + self.updated_ports) + + def _port_info_has_changes(self, port_info): + return (port_info.get('added') or + port_info.get('removed') or + port_info.get('updated')) + def ovsdb_monitor_loop(self, polling_manager=None): if not polling_manager: polling_manager = polling.AlwaysPoll() sync = True ports = set() + updated_ports_copy = set() ancillary_ports = set() tunnel_sync = True while True: - try: - start = time.time() - port_stats = {'regular': {'added': 0, 'removed': 0}, - 'ancillary': {'added': 0, 'removed': 0}} - LOG.debug(_("Agent ovsdb_monitor_loop - " - "iteration:%d started"), - self.iter_num) - if sync: - LOG.info(_("Agent out of sync with plugin!")) - ports.clear() - ancillary_ports.clear() - sync = False - polling_manager.force_polling() - - # Notify the plugin of tunnel IP - if self.enable_tunneling and tunnel_sync: - LOG.info(_("Agent tunnel out of sync with plugin!")) + start = time.time() + port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0}, + 'ancillary': {'added': 0, 'removed': 0}} + LOG.debug(_("Agent ovsdb_monitor_loop - " + "iteration:%d started"), + self.iter_num) + if sync: + LOG.info(_("Agent out of sync with plugin!")) + ports.clear() + ancillary_ports.clear() + sync = False + polling_manager.force_polling() + # Notify the plugin of tunnel IP + if self.enable_tunneling and tunnel_sync: + LOG.info(_("Agent tunnel out of sync with plugin!")) + try: tunnel_sync = self.tunnel_sync() - if polling_manager.is_polling_required: + except Exception: + LOG.exception(_("Error while synchronizing tunnels")) + tunnel_sync = True + if self._agent_has_updates(polling_manager): + try: LOG.debug(_("Agent ovsdb_monitor_loop - " "iteration:%(iter_num)d - " "starting polling. Elapsed:%(elapsed).3f"), {'iter_num': self.iter_num, 'elapsed': time.time() - start}) - port_info = self.update_ports(ports) + # Save updated ports dict to perform rollback in + # case resync would be needed, and then clear + # self.updated_ports. As the greenthread should not yield + # between these two statements, this will be thread-safe + updated_ports_copy = self.updated_ports + self.updated_ports = set() + port_info = self.scan_ports(ports, updated_ports_copy) + ports = port_info['current'] LOG.debug(_("Agent ovsdb_monitor_loop - " "iteration:%(iter_num)d - " "port information retrieved. " @@ -1271,8 +1299,9 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): {'iter_num': self.iter_num, 'elapsed': time.time() - start}) # notify plugin about port deltas - if port_info: - LOG.debug(_("Agent loop has new devices!")) + if self._port_info_has_changes(port_info): + LOG.debug(_("Starting to process devices in:%s"), + port_info) # If treat devices fails - must resync with plugin sync = self.process_network_ports(port_info) LOG.debug(_("Agent ovsdb_monitor_loop - " @@ -1280,9 +1309,10 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): "ports processed. Elapsed:%(elapsed).3f"), {'iter_num': self.iter_num, 'elapsed': time.time() - start}) - ports = port_info['current'] port_stats['regular']['added'] = ( len(port_info.get('added', []))) + port_stats['regular']['updated'] = ( + len(port_info.get('updated', []))) port_stats['regular']['removed'] = ( len(port_info.get('removed', []))) # Treat ancillary devices if they exist @@ -1313,11 +1343,11 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): sync = sync | rc polling_manager.polling_completed() - - except Exception: - LOG.exception(_("Error in agent event loop")) - sync = True - tunnel_sync = True + except Exception: + LOG.exception(_("Error while processing VIF ports")) + # Put the ports back in self.updated_port + self.updated_ports |= updated_ports_copy + sync = True # sleep till end of polling interval elapsed = (time.time() - start) diff --git a/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py index bbda0ee31..b95d15edc 100644 --- a/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py +++ b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py @@ -26,7 +26,6 @@ import testtools from neutron.agent.linux import ip_lib from neutron.agent.linux import utils from neutron.openstack.common import importutils -from neutron.openstack.common.rpc import common as rpc_common from neutron.plugins.common import constants as p_const from neutron.plugins.openvswitch.common import constants from neutron.tests import base @@ -310,28 +309,69 @@ class TestOFANeutronAgent(OFAAgentTestCase): def test_port_dead_with_port_already_dead(self): self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG) - def mock_update_ports(self, vif_port_set=None, registered_ports=None): + def mock_scan_ports(self, vif_port_set=None, registered_ports=None, + updated_ports=None): with mock.patch.object(self.agent.int_br, 'get_vif_port_set', return_value=vif_port_set): - return self.agent.update_ports(registered_ports) + return self.agent.scan_ports(registered_ports, updated_ports) - def test_update_ports_returns_none_for_unchanged_ports(self): - self.assertIsNone(self.mock_update_ports()) + def test_scan_ports_returns_current_only_for_unchanged_ports(self): + vif_port_set = set([1, 3]) + registered_ports = set([1, 3]) + expected = {'current': vif_port_set} + actual = self.mock_scan_ports(vif_port_set, registered_ports) + self.assertEqual(expected, actual) - def test_update_ports_returns_port_changes(self): + def test_scan_ports_returns_port_changes(self): vif_port_set = set([1, 3]) registered_ports = set([1, 2]) expected = dict(current=vif_port_set, added=set([3]), removed=set([2])) - actual = self.mock_update_ports(vif_port_set, registered_ports) + actual = self.mock_scan_ports(vif_port_set, registered_ports) + self.assertEqual(expected, actual) + + def _test_scan_ports_with_updated_ports(self, updated_ports): + vif_port_set = set([1, 3, 4]) + registered_ports = set([1, 2, 4]) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set([4])) + actual = self.mock_scan_ports(vif_port_set, registered_ports, + updated_ports) + self.assertEqual(expected, actual) + + def test_scan_ports_finds_known_updated_ports(self): + self._test_scan_ports_with_updated_ports(set([4])) + + def test_scan_ports_ignores_unknown_updated_ports(self): + # the port '5' was not seen on current ports. Hence it has either + # never been wired or already removed and should be ignored + self._test_scan_ports_with_updated_ports(set([4, 5])) + + def test_scan_ports_ignores_updated_port_if_removed(self): + vif_port_set = set([1, 3]) + registered_ports = set([1, 2]) + updated_ports = set([1, 2]) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set([1])) + actual = self.mock_scan_ports(vif_port_set, registered_ports, + updated_ports) + self.assertEqual(expected, actual) + + def test_scan_ports_no_vif_changes_returns_updated_port_only(self): + vif_port_set = set([1, 2, 3]) + registered_ports = set([1, 2, 3]) + updated_ports = set([2]) + expected = dict(current=vif_port_set, updated=set([2])) + actual = self.mock_scan_ports(vif_port_set, registered_ports, + updated_ports) self.assertEqual(expected, actual) def test_treat_devices_added_returns_true_for_missing_device(self): with mock.patch.object(self.agent.plugin_rpc, 'get_device_details', side_effect=Exception()): - self.assertTrue(self.agent.treat_devices_added([{}])) + self.assertTrue(self.agent.treat_devices_added_or_updated([{}])) - def _mock_treat_devices_added(self, details, port, func_name): - """Mock treat devices added. + def _mock_treat_devices_added_updated(self, details, port, func_name): + """Mock treat devices added or updated. :param details: the details to return for the device :param port: the port that get_vif_port_by_id should return @@ -344,29 +384,51 @@ class TestOFANeutronAgent(OFAAgentTestCase): mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=port), mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), + mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), mock.patch.object(self.agent, func_name) - ) as (get_dev_fn, get_vif_func, upd_dev_up, func): - self.assertFalse(self.agent.treat_devices_added([{}])) + ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func): + self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) return func.called - def test_treat_devices_added_ignores_invalid_ofport(self): + def test_treat_devices_added_updated_ignores_invalid_ofport(self): port = mock.Mock() port.ofport = -1 - self.assertFalse(self._mock_treat_devices_added(mock.MagicMock(), port, - 'port_dead')) + self.assertFalse(self._mock_treat_devices_added_updated( + mock.MagicMock(), port, 'port_dead')) - def test_treat_devices_added_marks_unknown_port_as_dead(self): + def test_treat_devices_added_updated_marks_unknown_port_as_dead(self): port = mock.Mock() port.ofport = 1 - self.assertTrue(self._mock_treat_devices_added(mock.MagicMock(), port, - 'port_dead')) + self.assertTrue(self._mock_treat_devices_added_updated( + mock.MagicMock(), port, 'port_dead')) - def test_treat_devices_added_updates_known_port(self): + def test_treat_devices_added_updated_updates_known_port(self): details = mock.MagicMock() details.__contains__.side_effect = lambda x: True - self.assertTrue(self._mock_treat_devices_added(details, - mock.Mock(), - 'treat_vif_port')) + self.assertTrue(self._mock_treat_devices_added_updated( + details, mock.Mock(), 'treat_vif_port')) + + def test_treat_devices_added_updated_put_port_down(self): + fake_details_dict = {'admin_state_up': False, + 'port_id': 'xxx', + 'device': 'xxx', + 'network_id': 'yyy', + 'physical_network': 'foo', + 'segmentation_id': 'bar', + 'network_type': 'baz'} + with contextlib.nested( + mock.patch.object(self.agent.plugin_rpc, 'get_device_details', + return_value=fake_details_dict), + mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', + return_value=mock.MagicMock()), + mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), + mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), + mock.patch.object(self.agent, 'treat_vif_port') + ) as (get_dev_fn, get_vif_func, upd_dev_up, + upd_dev_down, treat_vif_port): + self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) + self.assertTrue(treat_vif_port.called) + self.assertTrue(upd_dev_down.called) def test_treat_devices_removed_returns_true_for_missing_device(self): with mock.patch.object(self.agent.plugin_rpc, 'update_device_down', @@ -387,17 +449,36 @@ class TestOFANeutronAgent(OFAAgentTestCase): def test_treat_devices_removed_ignores_missing_port(self): self._mock_treat_devices_removed(False) + def _test_process_network_ports(self, port_info): + with contextlib.nested( + mock.patch.object(self.agent.sg_agent, "prepare_devices_filter"), + mock.patch.object(self.agent.sg_agent, "refresh_firewall"), + mock.patch.object(self.agent, "treat_devices_added_or_updated", + return_value=False), + mock.patch.object(self.agent, "treat_devices_removed", + return_value=False) + ) as (prep_dev_filter, refresh_fw, + device_added_updated, device_removed): + self.assertFalse(self.agent.process_network_ports(port_info)) + prep_dev_filter.assert_called_once_with(port_info['added']) + if port_info.get('updated'): + self.assertEqual(1, refresh_fw.call_count) + device_added_updated.assert_called_once_with( + port_info['added'] | port_info.get('updated', set())) + device_removed.assert_called_once_with(port_info['removed']) + def test_process_network_ports(self): - reply = {'current': set(['tap0']), - 'removed': set(['eth0']), - 'added': set(['eth1'])} - with mock.patch.object(self.agent, 'treat_devices_added', - return_value=False) as device_added: - with mock.patch.object(self.agent, 'treat_devices_removed', - return_value=False) as device_removed: - self.assertFalse(self.agent.process_network_ports(reply)) - device_added.assert_called_once_with(set(['eth1'])) - device_removed.assert_called_once_with(set(['eth0'])) + self._test_process_network_ports( + {'current': set(['tap0']), + 'removed': set(['eth0']), + 'added': set(['eth1'])}) + + def test_process_network_port_with_updated_ports(self): + self._test_process_network_ports( + {'current': set(['tap0', 'tap1']), + 'updated': set(['tap1', 'eth1']), + 'removed': set(['eth0']), + 'added': set(['eth1'])}) def test_report_state(self): with mock.patch.object(self.agent.state_rpc, @@ -424,61 +505,15 @@ class TestOFANeutronAgent(OFAAgentTestCase): recl_fn.assert_called_with("123") def test_port_update(self): - with contextlib.nested( - mock.patch.object(self.agent.int_br, "get_vif_port_by_id"), - mock.patch.object(self.agent, "treat_vif_port"), - mock.patch.object(self.agent.plugin_rpc, "update_device_up"), - mock.patch.object(self.agent.plugin_rpc, "update_device_down") - ) as (getvif_fn, treatvif_fn, updup_fn, upddown_fn): - port = {"id": "123", - "network_id": "124", - "admin_state_up": False} - getvif_fn.return_value = "vif_port_obj" - self.agent.port_update("unused_context", - port=port, - network_type="vlan", - segmentation_id="1", - physical_network="physnet") - treatvif_fn.assert_called_with("vif_port_obj", "123", - "124", "vlan", "physnet", - "1", False) - upddown_fn.assert_called_with(self.agent.context, - "123", self.agent.agent_id, - cfg.CONF.host) - - port["admin_state_up"] = True - self.agent.port_update("unused_context", - port=port, - network_type="vlan", - segmentation_id="1", - physical_network="physnet") - updup_fn.assert_called_with(self.agent.context, - "123", self.agent.agent_id, - cfg.CONF.host) - - def test_port_update_plugin_rpc_failed(self): - port = {'id': 1, - 'network_id': 1, - 'admin_state_up': True} - with contextlib.nested( - mock.patch.object(self.mod_agent.LOG, 'error'), - mock.patch.object(self.agent.int_br, "get_vif_port_by_id"), - mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), - mock.patch.object(self.agent, 'port_bound'), - mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), - mock.patch.object(self.agent, 'port_dead') - ) as (log, _, device_up, _, device_down, _): - device_up.side_effect = rpc_common.Timeout - self.agent.port_update(mock.Mock(), port=port) - self.assertTrue(device_up.called) - self.assertEqual(log.call_count, 1) - - log.reset_mock() - port['admin_state_up'] = False - device_down.side_effect = rpc_common.Timeout - self.agent.port_update(mock.Mock(), port=port) - self.assertTrue(device_down.called) - self.assertEqual(log.call_count, 1) + port = {"id": "123", + "network_id": "124", + "admin_state_up": False} + self.agent.port_update("unused_context", + port=port, + network_type="vlan", + segmentation_id="1", + physical_network="physnet") + self.assertEqual(set(['123']), self.agent.updated_ports) def test_setup_physical_bridges(self): with contextlib.nested(