From: rossella Date: Thu, 5 Mar 2015 09:24:10 +0000 (+0000) Subject: OVS agent reacts to events instead of polling X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=1992d52d63dc32c63faa5a3f482d5b8ebe925a77;p=openstack-build%2Fneutron-build.git OVS agent reacts to events instead of polling OVSDB monitor generates the events that the OVS agent needs to process (device added or updated). Instead of polling the agent processes the queue of events. Change-Id: I168a3cc3aa96a809153a30635ad7bda29e8ee47c Partially-Implements: blueprint restructure-l2-agent --- diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 230fbb20d..8078b4daf 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -34,6 +34,7 @@ from neutron.agent.common import polling from neutron.agent.common import utils from neutron.agent.l2.extensions import manager as ext_manager from neutron.agent.linux import ip_lib +from neutron.agent.linux import polling as linux_polling from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.handlers import dvr_rpc @@ -1212,6 +1213,88 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, port_info['removed'] = registered_ports - cur_ports return port_info + def process_ports_events(self, events, registered_ports, ancillary_ports, + updated_ports=None): + port_info = {} + port_info['added'] = set() + port_info['removed'] = set() + port_info['current'] = registered_ports + + ancillary_port_info = {} + ancillary_port_info['added'] = set() + ancillary_port_info['removed'] = set() + ancillary_port_info['current'] = ( + ancillary_ports if ancillary_ports else set()) + + # if a port was added and then removed or viceversa since the agent + # can't know the order of the operations, check the status of the port + # to determine if the port was added or deleted + device_removed_or_added = [ + dev for dev in events['added'] if dev in events['removed']] + for device in device_removed_or_added: + if ovs_lib.BaseOVS().port_exists(device['name']): + events['removed'].remove(device) + else: + events['added'].remove(device) + + #TODO(rossella_s): scanning the ancillary bridge won't be needed + # anymore when https://review.openstack.org/#/c/203381 since the bridge + # id stored in external_ids will be used to identify the bridge the + # port belongs to + cur_ancillary_ports = set() + for bridge in self.ancillary_brs: + cur_ancillary_ports |= bridge.get_vif_port_set() + cur_ancillary_ports |= ancillary_port_info['current'] + + def _process_device(device, devices, ancillary_devices): + # check 'iface-id' is set otherwise is not a port + # the agent should care about + if 'attached-mac' in device.get('external_ids', []): + iface_id = self.int_br.portid_from_external_ids( + device['external_ids']) + if iface_id: + if device['ofport'] == ovs_lib.UNASSIGNED_OFPORT: + #TODO(rossella_s) it's extreme to trigger a full resync + # if a port is not ready, resync only the device that + # is not ready + raise Exception( + _("Port %s is not ready, resync needed") % device[ + 'name']) + # check if device belong to ancillary bridge + if iface_id in cur_ancillary_ports: + ancillary_devices.add(iface_id) + else: + devices.add(iface_id) + + for device in events['added']: + _process_device(device, port_info['added'], + ancillary_port_info['added']) + for device in events['removed']: + _process_device(device, port_info['removed'], + ancillary_port_info['removed']) + + if updated_ports is None: + updated_ports = set() + updated_ports.update(self.check_changed_vlans()) + + # Disregard devices that were never noticed by the agent + port_info['removed'] &= port_info['current'] + port_info['current'] |= port_info['added'] + port_info['current'] -= port_info['removed'] + + ancillary_port_info['removed'] &= ancillary_port_info['current'] + ancillary_port_info['current'] |= ancillary_port_info['added'] + ancillary_port_info['current'] -= ancillary_port_info['removed'] + + if updated_ports: + # Some updated ports might have been removed in the + # meanwhile, and therefore should not be processed. + # In this case the updated port won't be found among + # current ports. + updated_ports &= port_info['current'] + port_info['updated'] = updated_ports + return port_info, ancillary_port_info + def scan_ports(self, registered_ports, sync, updated_ports=None): cur_ports = self.int_br.get_vif_port_set() self.int_br_device_count = len(cur_ports) @@ -1656,12 +1739,64 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name) bridge.cleanup_flows() + def process_port_info(self, start, polling_manager, sync, ovs_restarted, + ports, ancillary_ports, updated_ports_copy, + consecutive_resyncs): + # There are polling managers that don't have get_events, e.g. + # AlwaysPoll used by windows implementations + # REVISIT (rossella_s) This needs to be reworked to hide implementation + # details regarding polling in BasePollingManager subclasses + if sync or not (hasattr(polling_manager, 'get_events')): + if sync: + LOG.info(_LI("Agent out of sync with plugin!")) + consecutive_resyncs = consecutive_resyncs + 1 + if (consecutive_resyncs >= + constants.MAX_DEVICE_RETRIES): + LOG.warn(_LW( + "Clearing cache of registered ports," + " retries to resync were > %s"), + constants.MAX_DEVICE_RETRIES) + ports.clear() + ancillary_ports.clear() + consecutive_resyncs = 0 + else: + consecutive_resyncs = 0 + + # NOTE(rossella_s) don't empty the queue of events + # calling polling_manager.get_events() since + # the agent might miss some event (for example a port + # deletion) + reg_ports = (set() if ovs_restarted else ports) + port_info = self.scan_ports(reg_ports, sync, + updated_ports_copy) + # Treat ancillary devices if they exist + if self.ancillary_brs: + ancillary_port_info = self.scan_ancillary_ports( + ancillary_ports, sync) + LOG.debug("Agent rpc_loop - iteration:%(iter_num)d" + " - ancillary port info retrieved. " + "Elapsed:%(elapsed).3f", + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + else: + ancillary_port_info = {} + + else: + consecutive_resyncs = 0 + events = polling_manager.get_events() + ancillary_ports = ( + ancillary_ports if self.ancillary_brs else None) + port_info, ancillary_port_info = ( + self.process_ports_events(events, ports, + ancillary_ports, updated_ports_copy)) + return port_info, ancillary_port_info, consecutive_resyncs + def rpc_loop(self, polling_manager=None): if not polling_manager: polling_manager = polling.get_polling_manager( minimize_polling=False) - sync = True + sync = False ports = set() updated_ports_copy = set() ancillary_ports = set() @@ -1674,20 +1809,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, start = time.time() LOG.debug("Agent rpc_loop - iteration:%d started", self.iter_num) - if sync: - LOG.info(_LI("Agent out of sync with plugin!")) - polling_manager.force_polling() - consecutive_resyncs = consecutive_resyncs + 1 - if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES: - LOG.warn(_LW("Clearing cache of registered ports, retrials" - " to resync were > %s"), - constants.MAX_DEVICE_RETRIES) - ports.clear() - ancillary_ports.clear() - sync = False - consecutive_resyncs = 0 - else: - consecutive_resyncs = 0 ovs_status = self.check_ovs_status() if ovs_status == constants.OVS_RESTARTED: self.setup_integration_br() @@ -1703,6 +1824,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.patch_tun_ofport) self.dvr_agent.reset_dvr_parameters() self.dvr_agent.setup_dvr_flows() + # restart the polling manager so that it will signal as added + # all the current ports + # REVISIT (rossella_s) Define a method "reset" in + # BasePollingManager that will be implemented by AlwaysPoll as + # no action and by InterfacePollingMinimizer as start/stop + if isinstance( + polling_manager, linux_polling.InterfacePollingMinimizer): + polling_manager.stop() + polling_manager.start() elif ovs_status == constants.OVS_DEAD: # Agent doesn't apply any operations when ovs is dead, to # prevent unexpected failure or crash. Sleep and continue @@ -1719,7 +1849,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, LOG.exception(_LE("Error while synchronizing tunnels")) tunnel_sync = True ovs_restarted |= (ovs_status == constants.OVS_RESTARTED) - if self._agent_has_updates(polling_manager) or ovs_restarted: + if self._agent_has_updates(polling_manager) or sync: try: LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - " "starting polling. Elapsed:%(elapsed).3f", @@ -1731,9 +1861,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # between these two statements, this will be thread-safe updated_ports_copy = self.updated_ports self.updated_ports = set() - reg_ports = (set() if ovs_restarted else ports) - port_info = self.scan_ports(reg_ports, sync, - updated_ports_copy) + port_info, ancillary_port_info, consecutive_resyncs = ( + self.process_port_info( + start, polling_manager, sync, ovs_restarted, + ports, ancillary_ports, updated_ports_copy, + consecutive_resyncs) + ) + self.process_deleted_ports(port_info) ofport_changed_ports = self.update_stale_ofport_rules() if ofport_changed_ports: @@ -1744,16 +1878,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, "Elapsed:%(elapsed).3f", {'iter_num': self.iter_num, 'elapsed': time.time() - start}) - # Treat ancillary devices if they exist - if self.ancillary_brs: - ancillary_port_info = self.scan_ancillary_ports( - ancillary_ports, sync) - LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - " - "ancillary port info retrieved. " - "Elapsed:%(elapsed).3f", - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) - sync = False # Secure and wire/unwire VIFs and update their status # on Neutron server if (self._port_info_has_changes(port_info) or diff --git a/neutron/tests/functional/agent/l2/base.py b/neutron/tests/functional/agent/l2/base.py index 39e511b11..9fa7540d4 100644 --- a/neutron/tests/functional/agent/l2/base.py +++ b/neutron/tests/functional/agent/l2/base.py @@ -118,13 +118,38 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): if tunnel_types: self.addCleanup(self.ovs.delete_bridge, self.br_tun) agent.sg_agent = mock.Mock() + agent.ancillary_brs = None return agent - def start_agent(self, agent, unplug_ports=None): + def _mock_get_events(self, agent, polling_manager, ports): + get_events = polling_manager.get_events + p_ids = [p['id'] for p in ports] + + def filter_events(): + events = get_events() + filtered_ports = [] + for dev in events['added']: + iface_id = agent.int_br.portid_from_external_ids( + dev.get('external_ids', [])) + if iface_id in p_ids: + # if the event is not about a port that was created by + # this test, we filter the event out. Since these tests are + # not run in isolation processing all the events might make + # some test fail ( e.g. the agent might keep resycing + # because it keeps finding not ready ports that are created + # by other tests) + filtered_ports.append(dev) + return {'added': filtered_ports, 'removed': events['removed']} + polling_manager.get_events = mock.Mock(side_effect=filter_events) + + def start_agent(self, agent, ports=None, unplug_ports=None): if unplug_ports is None: unplug_ports = [] + if ports is None: + ports = [] self.setup_agent_rpc_mocks(agent, unplug_ports) polling_manager = polling.InterfacePollingMinimizer() + self._mock_get_events(agent, polling_manager, ports) self.addCleanup(polling_manager.stop) polling_manager.start() agent_utils.wait_until_true( @@ -138,6 +163,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): rpc_loop_thread.wait() self.addCleanup(stop_agent, agent, t) + return polling_manager def _create_test_port_dict(self): return {'id': uuidutils.generate_uuid(), @@ -280,10 +306,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): def setup_agent_and_ports(self, port_dicts, create_tunnels=True, trigger_resync=False): + self.ports = port_dicts self.agent = self.create_agent(create_tunnels=create_tunnels) - self.start_agent(self.agent) + self.polling_manager = self.start_agent(self.agent, ports=self.ports) self.network = self._create_test_network_dict() - self.ports = port_dicts if trigger_resync: self._prepare_resync_trigger(self.agent) self._plug_ports(self.network, self.ports, self.agent) diff --git a/neutron/tests/functional/agent/test_l2_ovs_agent.py b/neutron/tests/functional/agent/test_l2_ovs_agent.py index b57a8f931..ab59680ac 100644 --- a/neutron/tests/functional/agent/test_l2_ovs_agent.py +++ b/neutron/tests/functional/agent/test_l2_ovs_agent.py @@ -73,7 +73,6 @@ class TestOVSAgent(base.OVSAgentTestFramework): port_dicts=self.create_test_ports()) self.wait_until_ports_state(self.ports, up=True) self.agent.check_ovs_status.return_value = constants.OVS_RESTARTED - # OVS restarted, the agent should reprocess all the ports self.agent.plugin_rpc.update_device_list.reset_mock() self.wait_until_ports_state(self.ports, up=True) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 322e4b921..d4cf3877b 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -401,6 +401,130 @@ class TestOvsNeutronAgent(object): updated_ports) self.assertEqual(expected, actual) + def test_process_ports_events_returns_current_for_unchanged_ports(self): + with mock.patch.object(self.agent, 'check_changed_vlans', + return_value=set()): + events = {'added': [], 'removed': []} + registered_ports = {1, 3} + ancillary_ports = {2, 5} + expected_ports = {'current': registered_ports, 'added': set(), + 'removed': set()} + expected_ancillary = {'current': ancillary_ports, 'added': set(), + 'removed': set()} + actual = self.agent.process_ports_events(events, registered_ports, + ancillary_ports) + self.assertEqual((expected_ports, expected_ancillary), actual) + + def test_process_port_events_returns_port_changes(self): + events = {'added': [{'name': 'port3', 'ofport': 3, + 'external_ids': {'attached-mac': 'test-mac'}}, + {'name': 'qg-port2', 'ofport': 5, + 'external_ids': {'attached-mac': 'test-mac'}}], + 'removed': [{'name': 'port2', 'ofport': 2, + 'external_ids': {'attached-mac': 'test-mac'}}, + {'name': 'qg-port1', 'ofport': 4, + 'external_ids': {'attached-mac': 'test-mac'}}]} + registered_ports = {1, 2} + ancillary_ports = {4} + expected_ports = dict( + current={1, 3}, added={3}, removed={2}) + expected_ancillary_ports = dict( + current={5}, added={5}, removed={4}) + ancillary_bridge = mock.Mock() + ancillary_bridge.get_vif_port_set.return_value = {4, 5} + self.agent.ancillary_brs = [ancillary_bridge] + with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', + side_effect=[3, 5, 2, 4]), \ + mock.patch.object(self.agent, 'check_changed_vlans', + return_value=set()): + actual = self.agent.process_ports_events( + events, registered_ports, ancillary_ports) + self.assertEqual( + (expected_ports, expected_ancillary_ports), actual) + + def _test_process_port_events_with_updated_ports(self, updated_ports): + events = {'added': [{'name': 'port3', 'ofport': 3, + 'external_ids': {'attached-mac': 'test-mac'}}, + {'name': 'qg-port2', 'ofport': 6, + 'external_ids': {'attached-mac': 'test-mac'}}], + 'removed': [{'name': 'port2', 'ofport': 2, + 'external_ids': {'attached-mac': 'test-mac'}}, + {'name': 'qg-port1', 'ofport': 5, + 'external_ids': {'attached-mac': 'test-mac'}}]} + registered_ports = {1, 2, 4} + ancillary_ports = {5, 8} + expected_ports = dict(current={1, 3, 4}, added={3}, + removed={2}, updated={4}) + expected_ancillary = dict(current={6, 8}, added={6}, + removed={5}) + ancillary_bridge = mock.Mock() + ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8} + self.agent.ancillary_brs = [ancillary_bridge] + with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', + side_effect=[3, 6, 2, 5]), \ + mock.patch.object(self.agent, 'check_changed_vlans', + return_value=set()): + + actual = self.agent.process_ports_events( + events, registered_ports, ancillary_ports, updated_ports) + self.assertEqual((expected_ports, expected_ancillary), actual) + + def test_process_port_events_finds_known_updated_ports(self): + self._test_process_port_events_with_updated_ports({4}) + + def test_process_port_events_ignores_unknown_updated_ports(self): + # the port '5' was not seen on current ports. Hence it has either + # never been wired or already removed and should be ignored + self._test_process_port_events_with_updated_ports({4, 5}) + + def test_process_port_events_ignores_updated_port_if_removed(self): + events = {'added': [{'name': 'port3', 'ofport': 3, + 'external_ids': {'attached-mac': 'test-mac'}}], + 'removed': [{'name': 'port2', 'ofport': 2, + 'external_ids': {'attached-mac': 'test-mac'}}]} + registered_ports = {1, 2} + updated_ports = {1, 2} + expected_ports = dict(current={1, 3}, added={3}, + removed={2}, updated={1}) + expected_ancillary = dict(current=set(), added=set(), removed=set()) + with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', + side_effect=[3, 2]), \ + mock.patch.object(self.agent, 'check_changed_vlans', + return_value=set()): + + actual = self.agent.process_ports_events( + events, registered_ports, None, updated_ports) + self.assertEqual((expected_ports, expected_ancillary), actual) + + def test_process_port_events_no_vif_changes_return_updated_port_only(self): + events = {'added': [], 'removed': []} + registered_ports = {1, 2, 3} + updated_ports = {2} + expected_ports = dict(current=registered_ports, updated={2}, + added=set(), removed=set()) + expected_ancillary = dict(current=set(), added=set(), removed=set()) + with mock.patch.object(self.agent, 'check_changed_vlans', + return_value=set()): + actual = self.agent.process_ports_events( + events, registered_ports, None, updated_ports) + self.assertEqual((expected_ports, expected_ancillary), actual) + + def test_process_port_events_ignores_removed_port_if_never_added(self): + events = {'added': [], + 'removed': [{'name': 'port2', 'ofport': 2, + 'external_ids': {'attached-mac': 'test-mac'}}]} + registered_ports = {1} + expected_ports = dict(current=registered_ports, added=set(), + removed=set()) + expected_ancillary = dict(current=set(), added=set(), removed=set()) + with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', + side_effect=[2]), \ + mock.patch.object(self.agent, 'check_changed_vlans', + return_value=set()): + actual = self.agent.process_ports_events(events, registered_ports, + None) + self.assertEqual((expected_ports, expected_ancillary), actual) + def test_update_ports_returns_changed_vlan(self): br = self.br_int_cls('br-int') mac = "ca:fe:de:ad:be:ef" @@ -1371,11 +1495,17 @@ class TestOvsNeutronAgent(object): 'added': set([]), 'removed': set(['tap0'])} + reply_ancillary = {'current': set([]), + 'added': set([]), + 'removed': set([])} + with mock.patch.object(async_process.AsyncProcess, "_spawn"),\ + mock.patch.object(async_process.AsyncProcess, "start"),\ + mock.patch.object(async_process.AsyncProcess, "stop"),\ mock.patch.object(log.KeywordArgumentAdapter, 'exception') as log_exception,\ mock.patch.object(self.mod_agent.OVSNeutronAgent, - 'scan_ports') as scan_ports,\ + 'process_ports_events') as process_p_events,\ mock.patch.object( self.mod_agent.OVSNeutronAgent, 'process_network_ports') as process_network_ports,\ @@ -1393,7 +1523,8 @@ class TestOvsNeutronAgent(object): 'cleanup_stale_flows') as cleanup: log_exception.side_effect = Exception( 'Fake exception to get out of the loop') - scan_ports.side_effect = [reply2, reply3] + process_p_events.side_effect = [(reply2, reply_ancillary), + (reply3, reply_ancillary)] process_network_ports.side_effect = [ False, Exception('Fake exception to get out of the loop')] check_ovs_status.side_effect = args @@ -1402,10 +1533,12 @@ class TestOvsNeutronAgent(object): except Exception: pass - scan_ports.assert_has_calls([ - mock.call(set(), True, set()), - mock.call(set(), False, set()) + process_p_events.assert_has_calls([ + mock.call({'removed': [], 'added': []}, set(), None, set()), + mock.call({'removed': [], 'added': []}, set(['tap0']), None, + set()) ]) + process_network_ports.assert_has_calls([ mock.call(reply2, False), mock.call(reply3, True) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py index 079d2dea0..7bae6df2c 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py @@ -495,14 +495,24 @@ class TunnelTest(object): self._verify_mock_calls() def test_daemon_loop(self): - reply2 = {'current': set(['tap0']), - 'added': set(['tap2']), - 'removed': set([])} + reply_ge_1 = {'added': set(['tap0']), + 'removed': set([])} - reply3 = {'current': set(['tap2']), - 'added': set([]), + reply_ge_2 = {'added': set([]), 'removed': set(['tap0'])} + reply_pe_1 = {'current': set(['tap0']), + 'added': set(['tap0']), + 'removed': set([])} + + reply_pe_2 = {'current': set([]), + 'added': set([]), + 'removed': set(['tap0'])} + + reply_ancillary = {'current': set([]), + 'added': set([]), + 'removed': set([])} + self.mock_int_bridge_expected += [ mock.call.check_canary_table(), mock.call.check_canary_table() @@ -513,7 +523,7 @@ class TunnelTest(object): with mock.patch.object(log.KeywordArgumentAdapter, 'exception') as log_exception,\ mock.patch.object(self.mod_agent.OVSNeutronAgent, - 'scan_ports') as scan_ports,\ + 'process_ports_events') as process_p_events,\ mock.patch.object( self.mod_agent.OVSNeutronAgent, 'process_network_ports') as process_network_ports,\ @@ -528,8 +538,11 @@ class TunnelTest(object): 'cleanup_stale_flows') as cleanup: log_exception.side_effect = Exception( 'Fake exception to get out of the loop') - scan_ports.side_effect = [reply2, reply3] update_stale.return_value = [] + process_p_events.side_effect = [ + (reply_pe_1, reply_ancillary), (reply_pe_2, reply_ancillary)] + interface_polling = mock.Mock() + interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2] process_network_ports.side_effect = [ False, Exception('Fake exception to get out of the loop')] @@ -539,7 +552,7 @@ class TunnelTest(object): # We start method and expect it will raise after 2nd loop # If something goes wrong, assert_has_calls below will catch it try: - n_agent.daemon_loop() + n_agent.rpc_loop(interface_polling) except Exception: pass @@ -547,17 +560,14 @@ class TunnelTest(object): # messages log_exception.assert_called_once_with( "Error while processing VIF ports") - scan_ports.assert_has_calls([ - mock.call(set(), True, set()), - mock.call(set(['tap0']), False, set()) + process_p_events.assert_has_calls([ + mock.call(reply_ge_1, set(), None, set()), + mock.call(reply_ge_2, set(['tap0']), None, set()) ]) process_network_ports.assert_has_calls([ mock.call({'current': set(['tap0']), 'removed': set([]), - 'added': set(['tap2'])}, False), - mock.call({'current': set(['tap2']), - 'removed': set(['tap0']), - 'added': set([])}, False) + 'added': set(['tap0'])}, False), ]) cleanup.assert_called_once_with()