]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
OVS agent reacts to events instead of polling
authorrossella <rsblendido@suse.com>
Thu, 5 Mar 2015 09:24:10 +0000 (09:24 +0000)
committerrossella <rsblendido@suse.com>
Thu, 5 Nov 2015 17:40:24 +0000 (18:40 +0100)
OVSDB monitor generates the events that the OVS agent
needs to process (device added or updated). Instead of
polling the agent processes the queue of events.

Change-Id: I168a3cc3aa96a809153a30635ad7bda29e8ee47c
Partially-Implements: blueprint restructure-l2-agent

neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/functional/agent/l2/base.py
neutron/tests/functional/agent/test_l2_ovs_agent.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py

index 230fbb20dfa3543fd11ed275c0684c6c188f3409..8078b4dafb7f34f23acacc685b383d38f1f4a873 100644 (file)
@@ -34,6 +34,7 @@ from neutron.agent.common import polling
 from neutron.agent.common import utils
 from neutron.agent.l2.extensions import manager as ext_manager
 from neutron.agent.linux import ip_lib
+from neutron.agent.linux import polling as linux_polling
 from neutron.agent import rpc as agent_rpc
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.api.rpc.handlers import dvr_rpc
@@ -1212,6 +1213,88 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         port_info['removed'] = registered_ports - cur_ports
         return port_info
 
+    def process_ports_events(self, events, registered_ports, ancillary_ports,
+                             updated_ports=None):
+        port_info = {}
+        port_info['added'] = set()
+        port_info['removed'] = set()
+        port_info['current'] = registered_ports
+
+        ancillary_port_info = {}
+        ancillary_port_info['added'] = set()
+        ancillary_port_info['removed'] = set()
+        ancillary_port_info['current'] = (
+            ancillary_ports if ancillary_ports else set())
+
+        # if a port was added and then removed or viceversa since the agent
+        # can't know the order of the operations, check the status of the port
+        # to determine if the port was added or deleted
+        device_removed_or_added = [
+            dev for dev in events['added'] if dev in events['removed']]
+        for device in device_removed_or_added:
+            if ovs_lib.BaseOVS().port_exists(device['name']):
+                events['removed'].remove(device)
+            else:
+                events['added'].remove(device)
+
+        #TODO(rossella_s): scanning the ancillary bridge won't be needed
+        # anymore when https://review.openstack.org/#/c/203381 since the bridge
+        # id stored in external_ids will be used to identify the bridge the
+        # port belongs to
+        cur_ancillary_ports = set()
+        for bridge in self.ancillary_brs:
+            cur_ancillary_ports |= bridge.get_vif_port_set()
+        cur_ancillary_ports |= ancillary_port_info['current']
+
+        def _process_device(device, devices, ancillary_devices):
+            # check 'iface-id' is set otherwise is not a port
+            # the agent should care about
+            if 'attached-mac' in device.get('external_ids', []):
+                iface_id = self.int_br.portid_from_external_ids(
+                    device['external_ids'])
+                if iface_id:
+                    if device['ofport'] == ovs_lib.UNASSIGNED_OFPORT:
+                        #TODO(rossella_s) it's extreme to trigger a full resync
+                        # if a port is not ready, resync only the device that
+                        # is not ready
+                        raise Exception(
+                            _("Port %s is not ready, resync needed") % device[
+                                  'name'])
+                    # check if device belong to ancillary bridge
+                    if iface_id in cur_ancillary_ports:
+                        ancillary_devices.add(iface_id)
+                    else:
+                        devices.add(iface_id)
+
+        for device in events['added']:
+            _process_device(device, port_info['added'],
+                            ancillary_port_info['added'])
+        for device in events['removed']:
+            _process_device(device, port_info['removed'],
+                            ancillary_port_info['removed'])
+
+        if updated_ports is None:
+            updated_ports = set()
+        updated_ports.update(self.check_changed_vlans())
+
+        # Disregard devices that were never noticed by the agent
+        port_info['removed'] &= port_info['current']
+        port_info['current'] |= port_info['added']
+        port_info['current'] -= port_info['removed']
+
+        ancillary_port_info['removed'] &= ancillary_port_info['current']
+        ancillary_port_info['current'] |= ancillary_port_info['added']
+        ancillary_port_info['current'] -= ancillary_port_info['removed']
+
+        if updated_ports:
+            # Some updated ports might have been removed in the
+            # meanwhile, and therefore should not be processed.
+            # In this case the updated port won't be found among
+            # current ports.
+            updated_ports &= port_info['current']
+            port_info['updated'] = updated_ports
+        return port_info, ancillary_port_info
+
     def scan_ports(self, registered_ports, sync, updated_ports=None):
         cur_ports = self.int_br.get_vif_port_set()
         self.int_br_device_count = len(cur_ports)
@@ -1656,12 +1739,64 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
                 bridge.cleanup_flows()
 
+    def process_port_info(self, start, polling_manager, sync, ovs_restarted,
+                       ports, ancillary_ports, updated_ports_copy,
+                       consecutive_resyncs):
+        # There are polling managers that don't have get_events, e.g.
+        # AlwaysPoll used by windows implementations
+        # REVISIT (rossella_s) This needs to be reworked to hide implementation
+        # details regarding polling in BasePollingManager subclasses
+        if sync or not (hasattr(polling_manager, 'get_events')):
+            if sync:
+                LOG.info(_LI("Agent out of sync with plugin!"))
+                consecutive_resyncs = consecutive_resyncs + 1
+                if (consecutive_resyncs >=
+                        constants.MAX_DEVICE_RETRIES):
+                    LOG.warn(_LW(
+                        "Clearing cache of registered ports,"
+                        " retries to resync were > %s"),
+                             constants.MAX_DEVICE_RETRIES)
+                    ports.clear()
+                    ancillary_ports.clear()
+                    consecutive_resyncs = 0
+            else:
+                consecutive_resyncs = 0
+
+            # NOTE(rossella_s) don't empty the queue of events
+            # calling polling_manager.get_events() since
+            # the agent might miss some event (for example a port
+            # deletion)
+            reg_ports = (set() if ovs_restarted else ports)
+            port_info = self.scan_ports(reg_ports, sync,
+                                        updated_ports_copy)
+            # Treat ancillary devices if they exist
+            if self.ancillary_brs:
+                ancillary_port_info = self.scan_ancillary_ports(
+                    ancillary_ports, sync)
+                LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
+                          " - ancillary port info retrieved. "
+                          "Elapsed:%(elapsed).3f",
+                          {'iter_num': self.iter_num,
+                           'elapsed': time.time() - start})
+            else:
+                ancillary_port_info = {}
+
+        else:
+            consecutive_resyncs = 0
+            events = polling_manager.get_events()
+            ancillary_ports = (
+                ancillary_ports if self.ancillary_brs else None)
+            port_info, ancillary_port_info = (
+                self.process_ports_events(events, ports,
+                                          ancillary_ports, updated_ports_copy))
+        return port_info, ancillary_port_info, consecutive_resyncs
+
     def rpc_loop(self, polling_manager=None):
         if not polling_manager:
             polling_manager = polling.get_polling_manager(
                 minimize_polling=False)
 
-        sync = True
+        sync = False
         ports = set()
         updated_ports_copy = set()
         ancillary_ports = set()
@@ -1674,20 +1809,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             start = time.time()
             LOG.debug("Agent rpc_loop - iteration:%d started",
                       self.iter_num)
-            if sync:
-                LOG.info(_LI("Agent out of sync with plugin!"))
-                polling_manager.force_polling()
-                consecutive_resyncs = consecutive_resyncs + 1
-                if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
-                    LOG.warn(_LW("Clearing cache of registered ports, retrials"
-                                 " to resync were > %s"),
-                             constants.MAX_DEVICE_RETRIES)
-                    ports.clear()
-                    ancillary_ports.clear()
-                    sync = False
-                    consecutive_resyncs = 0
-            else:
-                consecutive_resyncs = 0
             ovs_status = self.check_ovs_status()
             if ovs_status == constants.OVS_RESTARTED:
                 self.setup_integration_br()
@@ -1703,6 +1824,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                                                  self.patch_tun_ofport)
                     self.dvr_agent.reset_dvr_parameters()
                     self.dvr_agent.setup_dvr_flows()
+                # restart the polling manager so that it will signal as added
+                # all the current ports
+                # REVISIT (rossella_s) Define a method "reset" in
+                # BasePollingManager that will be implemented by AlwaysPoll as
+                # no action and by InterfacePollingMinimizer as start/stop
+                if isinstance(
+                    polling_manager, linux_polling.InterfacePollingMinimizer):
+                    polling_manager.stop()
+                    polling_manager.start()
             elif ovs_status == constants.OVS_DEAD:
                 # Agent doesn't apply any operations when ovs is dead, to
                 # prevent unexpected failure or crash. Sleep and continue
@@ -1719,7 +1849,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                     LOG.exception(_LE("Error while synchronizing tunnels"))
                     tunnel_sync = True
             ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
-            if self._agent_has_updates(polling_manager) or ovs_restarted:
+            if self._agent_has_updates(polling_manager) or sync:
                 try:
                     LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
                               "starting polling. Elapsed:%(elapsed).3f",
@@ -1731,9 +1861,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                     # between these two statements, this will be thread-safe
                     updated_ports_copy = self.updated_ports
                     self.updated_ports = set()
-                    reg_ports = (set() if ovs_restarted else ports)
-                    port_info = self.scan_ports(reg_ports, sync,
-                                                updated_ports_copy)
+                    port_info, ancillary_port_info, consecutive_resyncs = (
+                        self.process_port_info(
+                            start, polling_manager, sync, ovs_restarted,
+                            ports, ancillary_ports, updated_ports_copy,
+                            consecutive_resyncs)
+                    )
+
                     self.process_deleted_ports(port_info)
                     ofport_changed_ports = self.update_stale_ofport_rules()
                     if ofport_changed_ports:
@@ -1744,16 +1878,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                               "Elapsed:%(elapsed).3f",
                               {'iter_num': self.iter_num,
                                'elapsed': time.time() - start})
-                    # Treat ancillary devices if they exist
-                    if self.ancillary_brs:
-                        ancillary_port_info = self.scan_ancillary_ports(
-                            ancillary_ports, sync)
-                        LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
-                                  "ancillary port info retrieved. "
-                                  "Elapsed:%(elapsed).3f",
-                                  {'iter_num': self.iter_num,
-                                   'elapsed': time.time() - start})
-                    sync = False
                     # Secure and wire/unwire VIFs and update their status
                     # on Neutron server
                     if (self._port_info_has_changes(port_info) or
index 39e511b11dffd737ca1c54ab772978f0db60de9c..9fa7540d4675dfdc0fadf7cf25ab125222809ba8 100644 (file)
@@ -118,13 +118,38 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
         if tunnel_types:
             self.addCleanup(self.ovs.delete_bridge, self.br_tun)
         agent.sg_agent = mock.Mock()
+        agent.ancillary_brs = None
         return agent
 
-    def start_agent(self, agent, unplug_ports=None):
+    def _mock_get_events(self, agent, polling_manager, ports):
+        get_events = polling_manager.get_events
+        p_ids = [p['id'] for p in ports]
+
+        def filter_events():
+            events = get_events()
+            filtered_ports = []
+            for dev in events['added']:
+                iface_id = agent.int_br.portid_from_external_ids(
+                    dev.get('external_ids', []))
+                if iface_id in p_ids:
+                    # if the event is not about a port that was created by
+                    # this test, we filter the event out. Since these tests are
+                    # not run in isolation processing all the events might make
+                    # some test fail ( e.g. the agent might keep resycing
+                    # because it keeps finding not ready ports that are created
+                    # by other tests)
+                    filtered_ports.append(dev)
+            return {'added': filtered_ports, 'removed': events['removed']}
+        polling_manager.get_events = mock.Mock(side_effect=filter_events)
+
+    def start_agent(self, agent, ports=None, unplug_ports=None):
         if unplug_ports is None:
             unplug_ports = []
+        if ports is None:
+            ports = []
         self.setup_agent_rpc_mocks(agent, unplug_ports)
         polling_manager = polling.InterfacePollingMinimizer()
+        self._mock_get_events(agent, polling_manager, ports)
         self.addCleanup(polling_manager.stop)
         polling_manager.start()
         agent_utils.wait_until_true(
@@ -138,6 +163,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
             rpc_loop_thread.wait()
 
         self.addCleanup(stop_agent, agent, t)
+        return polling_manager
 
     def _create_test_port_dict(self):
         return {'id': uuidutils.generate_uuid(),
@@ -280,10 +306,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
 
     def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
                               trigger_resync=False):
+        self.ports = port_dicts
         self.agent = self.create_agent(create_tunnels=create_tunnels)
-        self.start_agent(self.agent)
+        self.polling_manager = self.start_agent(self.agent, ports=self.ports)
         self.network = self._create_test_network_dict()
-        self.ports = port_dicts
         if trigger_resync:
             self._prepare_resync_trigger(self.agent)
         self._plug_ports(self.network, self.ports, self.agent)
index b57a8f93158249eec8a0f922ef5cb3c18255f60b..ab59680ac213c6894dec57c9ce013be827046d21 100644 (file)
@@ -73,7 +73,6 @@ class TestOVSAgent(base.OVSAgentTestFramework):
             port_dicts=self.create_test_ports())
         self.wait_until_ports_state(self.ports, up=True)
         self.agent.check_ovs_status.return_value = constants.OVS_RESTARTED
-        # OVS restarted, the agent should reprocess all the ports
         self.agent.plugin_rpc.update_device_list.reset_mock()
         self.wait_until_ports_state(self.ports, up=True)
 
index 322e4b921af4f31f74c47a1abda1f4476885d7fe..d4cf3877bf56c71368930c3377627e9b6a2d711b 100644 (file)
@@ -401,6 +401,130 @@ class TestOvsNeutronAgent(object):
                                       updated_ports)
         self.assertEqual(expected, actual)
 
+    def test_process_ports_events_returns_current_for_unchanged_ports(self):
+        with mock.patch.object(self.agent, 'check_changed_vlans',
+                               return_value=set()):
+            events = {'added': [], 'removed': []}
+            registered_ports = {1, 3}
+            ancillary_ports = {2, 5}
+            expected_ports = {'current': registered_ports, 'added': set(),
+                              'removed': set()}
+            expected_ancillary = {'current': ancillary_ports, 'added': set(),
+                                  'removed': set()}
+            actual = self.agent.process_ports_events(events, registered_ports,
+                                                     ancillary_ports)
+            self.assertEqual((expected_ports, expected_ancillary), actual)
+
+    def test_process_port_events_returns_port_changes(self):
+        events = {'added': [{'name': 'port3', 'ofport': 3,
+                             'external_ids': {'attached-mac': 'test-mac'}},
+                            {'name': 'qg-port2', 'ofport': 5,
+                             'external_ids': {'attached-mac': 'test-mac'}}],
+                  'removed': [{'name': 'port2', 'ofport': 2,
+                              'external_ids': {'attached-mac': 'test-mac'}},
+                              {'name': 'qg-port1', 'ofport': 4,
+                               'external_ids': {'attached-mac': 'test-mac'}}]}
+        registered_ports = {1, 2}
+        ancillary_ports = {4}
+        expected_ports = dict(
+            current={1, 3}, added={3}, removed={2})
+        expected_ancillary_ports = dict(
+            current={5}, added={5}, removed={4})
+        ancillary_bridge = mock.Mock()
+        ancillary_bridge.get_vif_port_set.return_value = {4, 5}
+        self.agent.ancillary_brs = [ancillary_bridge]
+        with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+                              side_effect=[3, 5, 2, 4]), \
+            mock.patch.object(self.agent, 'check_changed_vlans',
+                              return_value=set()):
+            actual = self.agent.process_ports_events(
+                 events, registered_ports, ancillary_ports)
+            self.assertEqual(
+                (expected_ports, expected_ancillary_ports), actual)
+
+    def _test_process_port_events_with_updated_ports(self, updated_ports):
+        events = {'added': [{'name': 'port3', 'ofport': 3,
+                            'external_ids': {'attached-mac': 'test-mac'}},
+                            {'name': 'qg-port2', 'ofport': 6,
+                             'external_ids': {'attached-mac': 'test-mac'}}],
+                  'removed': [{'name': 'port2', 'ofport': 2,
+                               'external_ids': {'attached-mac': 'test-mac'}},
+                              {'name': 'qg-port1', 'ofport': 5,
+                               'external_ids': {'attached-mac': 'test-mac'}}]}
+        registered_ports = {1, 2, 4}
+        ancillary_ports = {5, 8}
+        expected_ports = dict(current={1, 3, 4}, added={3},
+                              removed={2}, updated={4})
+        expected_ancillary = dict(current={6, 8}, added={6},
+                                  removed={5})
+        ancillary_bridge = mock.Mock()
+        ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
+        self.agent.ancillary_brs = [ancillary_bridge]
+        with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+                              side_effect=[3, 6, 2, 5]), \
+            mock.patch.object(self.agent, 'check_changed_vlans',
+                              return_value=set()):
+
+            actual = self.agent.process_ports_events(
+                events, registered_ports, ancillary_ports, updated_ports)
+            self.assertEqual((expected_ports, expected_ancillary), actual)
+
+    def test_process_port_events_finds_known_updated_ports(self):
+        self._test_process_port_events_with_updated_ports({4})
+
+    def test_process_port_events_ignores_unknown_updated_ports(self):
+        # the port '5' was not seen on current ports. Hence it has either
+        # never been wired or already removed and should be ignored
+        self._test_process_port_events_with_updated_ports({4, 5})
+
+    def test_process_port_events_ignores_updated_port_if_removed(self):
+        events = {'added': [{'name': 'port3', 'ofport': 3,
+                             'external_ids': {'attached-mac': 'test-mac'}}],
+                  'removed': [{'name': 'port2', 'ofport': 2,
+                               'external_ids': {'attached-mac': 'test-mac'}}]}
+        registered_ports = {1, 2}
+        updated_ports = {1, 2}
+        expected_ports = dict(current={1, 3}, added={3},
+                              removed={2}, updated={1})
+        expected_ancillary = dict(current=set(), added=set(), removed=set())
+        with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+                              side_effect=[3, 2]), \
+            mock.patch.object(self.agent, 'check_changed_vlans',
+                              return_value=set()):
+
+            actual = self.agent.process_ports_events(
+                events, registered_ports, None, updated_ports)
+            self.assertEqual((expected_ports, expected_ancillary), actual)
+
+    def test_process_port_events_no_vif_changes_return_updated_port_only(self):
+        events = {'added': [], 'removed': []}
+        registered_ports = {1, 2, 3}
+        updated_ports = {2}
+        expected_ports = dict(current=registered_ports, updated={2},
+                              added=set(), removed=set())
+        expected_ancillary = dict(current=set(), added=set(), removed=set())
+        with mock.patch.object(self.agent, 'check_changed_vlans',
+                               return_value=set()):
+            actual = self.agent.process_ports_events(
+                events, registered_ports, None, updated_ports)
+            self.assertEqual((expected_ports, expected_ancillary), actual)
+
+    def test_process_port_events_ignores_removed_port_if_never_added(self):
+        events = {'added': [],
+                  'removed': [{'name': 'port2', 'ofport': 2,
+                               'external_ids': {'attached-mac': 'test-mac'}}]}
+        registered_ports = {1}
+        expected_ports = dict(current=registered_ports, added=set(),
+                              removed=set())
+        expected_ancillary = dict(current=set(), added=set(), removed=set())
+        with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+                              side_effect=[2]), \
+            mock.patch.object(self.agent, 'check_changed_vlans',
+                              return_value=set()):
+            actual = self.agent.process_ports_events(events, registered_ports,
+                                                     None)
+            self.assertEqual((expected_ports, expected_ancillary), actual)
+
     def test_update_ports_returns_changed_vlan(self):
         br = self.br_int_cls('br-int')
         mac = "ca:fe:de:ad:be:ef"
@@ -1371,11 +1495,17 @@ class TestOvsNeutronAgent(object):
                   'added': set([]),
                   'removed': set(['tap0'])}
 
+        reply_ancillary = {'current': set([]),
+                           'added': set([]),
+                           'removed': set([])}
+
         with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
+                mock.patch.object(async_process.AsyncProcess, "start"),\
+                mock.patch.object(async_process.AsyncProcess, "stop"),\
                 mock.patch.object(log.KeywordArgumentAdapter,
                                   'exception') as log_exception,\
                 mock.patch.object(self.mod_agent.OVSNeutronAgent,
-                                  'scan_ports') as scan_ports,\
+                                  'process_ports_events') as process_p_events,\
                 mock.patch.object(
                     self.mod_agent.OVSNeutronAgent,
                     'process_network_ports') as process_network_ports,\
@@ -1393,7 +1523,8 @@ class TestOvsNeutronAgent(object):
                                   'cleanup_stale_flows') as cleanup:
             log_exception.side_effect = Exception(
                 'Fake exception to get out of the loop')
-            scan_ports.side_effect = [reply2, reply3]
+            process_p_events.side_effect = [(reply2, reply_ancillary),
+                                            (reply3, reply_ancillary)]
             process_network_ports.side_effect = [
                 False, Exception('Fake exception to get out of the loop')]
             check_ovs_status.side_effect = args
@@ -1402,10 +1533,12 @@ class TestOvsNeutronAgent(object):
             except Exception:
                 pass
 
-            scan_ports.assert_has_calls([
-                mock.call(set(), True, set()),
-                mock.call(set(), False, set())
+            process_p_events.assert_has_calls([
+                mock.call({'removed': [], 'added': []}, set(), None, set()),
+                mock.call({'removed': [], 'added': []}, set(['tap0']), None,
+                          set())
             ])
+
             process_network_ports.assert_has_calls([
                 mock.call(reply2, False),
                 mock.call(reply3, True)
index 079d2dea083e74df087daa34e887c574c5d10749..7bae6df2ca66cb37844c5e97d79d7d9406ef6139 100644 (file)
@@ -495,14 +495,24 @@ class TunnelTest(object):
         self._verify_mock_calls()
 
     def test_daemon_loop(self):
-        reply2 = {'current': set(['tap0']),
-                  'added': set(['tap2']),
-                  'removed': set([])}
+        reply_ge_1 = {'added': set(['tap0']),
+                      'removed': set([])}
 
-        reply3 = {'current': set(['tap2']),
-                  'added': set([]),
+        reply_ge_2 = {'added': set([]),
                   'removed': set(['tap0'])}
 
+        reply_pe_1 = {'current': set(['tap0']),
+                      'added': set(['tap0']),
+                      'removed': set([])}
+
+        reply_pe_2 = {'current': set([]),
+                      'added': set([]),
+                      'removed': set(['tap0'])}
+
+        reply_ancillary = {'current': set([]),
+                           'added': set([]),
+                           'removed': set([])}
+
         self.mock_int_bridge_expected += [
             mock.call.check_canary_table(),
             mock.call.check_canary_table()
@@ -513,7 +523,7 @@ class TunnelTest(object):
         with mock.patch.object(log.KeywordArgumentAdapter,
                                'exception') as log_exception,\
                 mock.patch.object(self.mod_agent.OVSNeutronAgent,
-                                  'scan_ports') as scan_ports,\
+                                  'process_ports_events') as process_p_events,\
                 mock.patch.object(
                     self.mod_agent.OVSNeutronAgent,
                     'process_network_ports') as process_network_ports,\
@@ -528,8 +538,11 @@ class TunnelTest(object):
                     'cleanup_stale_flows') as cleanup:
             log_exception.side_effect = Exception(
                 'Fake exception to get out of the loop')
-            scan_ports.side_effect = [reply2, reply3]
             update_stale.return_value = []
+            process_p_events.side_effect = [
+                (reply_pe_1, reply_ancillary), (reply_pe_2, reply_ancillary)]
+            interface_polling = mock.Mock()
+            interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2]
             process_network_ports.side_effect = [
                 False, Exception('Fake exception to get out of the loop')]
 
@@ -539,7 +552,7 @@ class TunnelTest(object):
             # We start method and expect it will raise after 2nd loop
             # If something goes wrong, assert_has_calls below will catch it
             try:
-                n_agent.daemon_loop()
+                n_agent.rpc_loop(interface_polling)
             except Exception:
                 pass
 
@@ -547,17 +560,14 @@ class TunnelTest(object):
             # messages
             log_exception.assert_called_once_with(
                 "Error while processing VIF ports")
-            scan_ports.assert_has_calls([
-                mock.call(set(), True, set()),
-                mock.call(set(['tap0']), False, set())
+            process_p_events.assert_has_calls([
+                mock.call(reply_ge_1, set(), None, set()),
+                mock.call(reply_ge_2, set(['tap0']), None, set())
             ])
             process_network_ports.assert_has_calls([
                 mock.call({'current': set(['tap0']),
                            'removed': set([]),
-                           'added': set(['tap2'])}, False),
-                mock.call({'current': set(['tap2']),
-                           'removed': set(['tap0']),
-                           'added': set([])}, False)
+                           'added': set(['tap0'])}, False),
             ])
 
             cleanup.assert_called_once_with()