]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Revert "OVS agent reacts to events instead of polling"
authorArmando Migliaccio <armamig@gmail.com>
Tue, 10 Nov 2015 20:03:10 +0000 (20:03 +0000)
committerCarl Baldwin <carl@ecbaldwin.net>
Wed, 11 Nov 2015 05:16:53 +0000 (05:16 +0000)
This might be associated to manifestation of bug #1514935

This reverts commit 1992d52d63dc32c63faa5a3f482d5b8ebe925a77.

Closes-Bug: #1514935
Change-Id: If01cc87b6735e1bc039f99c4c6121e7c5ce547d0

neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/functional/agent/l2/base.py
neutron/tests/functional/agent/test_l2_ovs_agent.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py

index 8078b4dafb7f34f23acacc685b383d38f1f4a873..230fbb20dfa3543fd11ed275c0684c6c188f3409 100644 (file)
@@ -34,7 +34,6 @@ from neutron.agent.common import polling
 from neutron.agent.common import utils
 from neutron.agent.l2.extensions import manager as ext_manager
 from neutron.agent.linux import ip_lib
-from neutron.agent.linux import polling as linux_polling
 from neutron.agent import rpc as agent_rpc
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.api.rpc.handlers import dvr_rpc
@@ -1213,88 +1212,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         port_info['removed'] = registered_ports - cur_ports
         return port_info
 
-    def process_ports_events(self, events, registered_ports, ancillary_ports,
-                             updated_ports=None):
-        port_info = {}
-        port_info['added'] = set()
-        port_info['removed'] = set()
-        port_info['current'] = registered_ports
-
-        ancillary_port_info = {}
-        ancillary_port_info['added'] = set()
-        ancillary_port_info['removed'] = set()
-        ancillary_port_info['current'] = (
-            ancillary_ports if ancillary_ports else set())
-
-        # if a port was added and then removed or viceversa since the agent
-        # can't know the order of the operations, check the status of the port
-        # to determine if the port was added or deleted
-        device_removed_or_added = [
-            dev for dev in events['added'] if dev in events['removed']]
-        for device in device_removed_or_added:
-            if ovs_lib.BaseOVS().port_exists(device['name']):
-                events['removed'].remove(device)
-            else:
-                events['added'].remove(device)
-
-        #TODO(rossella_s): scanning the ancillary bridge won't be needed
-        # anymore when https://review.openstack.org/#/c/203381 since the bridge
-        # id stored in external_ids will be used to identify the bridge the
-        # port belongs to
-        cur_ancillary_ports = set()
-        for bridge in self.ancillary_brs:
-            cur_ancillary_ports |= bridge.get_vif_port_set()
-        cur_ancillary_ports |= ancillary_port_info['current']
-
-        def _process_device(device, devices, ancillary_devices):
-            # check 'iface-id' is set otherwise is not a port
-            # the agent should care about
-            if 'attached-mac' in device.get('external_ids', []):
-                iface_id = self.int_br.portid_from_external_ids(
-                    device['external_ids'])
-                if iface_id:
-                    if device['ofport'] == ovs_lib.UNASSIGNED_OFPORT:
-                        #TODO(rossella_s) it's extreme to trigger a full resync
-                        # if a port is not ready, resync only the device that
-                        # is not ready
-                        raise Exception(
-                            _("Port %s is not ready, resync needed") % device[
-                                  'name'])
-                    # check if device belong to ancillary bridge
-                    if iface_id in cur_ancillary_ports:
-                        ancillary_devices.add(iface_id)
-                    else:
-                        devices.add(iface_id)
-
-        for device in events['added']:
-            _process_device(device, port_info['added'],
-                            ancillary_port_info['added'])
-        for device in events['removed']:
-            _process_device(device, port_info['removed'],
-                            ancillary_port_info['removed'])
-
-        if updated_ports is None:
-            updated_ports = set()
-        updated_ports.update(self.check_changed_vlans())
-
-        # Disregard devices that were never noticed by the agent
-        port_info['removed'] &= port_info['current']
-        port_info['current'] |= port_info['added']
-        port_info['current'] -= port_info['removed']
-
-        ancillary_port_info['removed'] &= ancillary_port_info['current']
-        ancillary_port_info['current'] |= ancillary_port_info['added']
-        ancillary_port_info['current'] -= ancillary_port_info['removed']
-
-        if updated_ports:
-            # Some updated ports might have been removed in the
-            # meanwhile, and therefore should not be processed.
-            # In this case the updated port won't be found among
-            # current ports.
-            updated_ports &= port_info['current']
-            port_info['updated'] = updated_ports
-        return port_info, ancillary_port_info
-
     def scan_ports(self, registered_ports, sync, updated_ports=None):
         cur_ports = self.int_br.get_vif_port_set()
         self.int_br_device_count = len(cur_ports)
@@ -1739,64 +1656,12 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
                 bridge.cleanup_flows()
 
-    def process_port_info(self, start, polling_manager, sync, ovs_restarted,
-                       ports, ancillary_ports, updated_ports_copy,
-                       consecutive_resyncs):
-        # There are polling managers that don't have get_events, e.g.
-        # AlwaysPoll used by windows implementations
-        # REVISIT (rossella_s) This needs to be reworked to hide implementation
-        # details regarding polling in BasePollingManager subclasses
-        if sync or not (hasattr(polling_manager, 'get_events')):
-            if sync:
-                LOG.info(_LI("Agent out of sync with plugin!"))
-                consecutive_resyncs = consecutive_resyncs + 1
-                if (consecutive_resyncs >=
-                        constants.MAX_DEVICE_RETRIES):
-                    LOG.warn(_LW(
-                        "Clearing cache of registered ports,"
-                        " retries to resync were > %s"),
-                             constants.MAX_DEVICE_RETRIES)
-                    ports.clear()
-                    ancillary_ports.clear()
-                    consecutive_resyncs = 0
-            else:
-                consecutive_resyncs = 0
-
-            # NOTE(rossella_s) don't empty the queue of events
-            # calling polling_manager.get_events() since
-            # the agent might miss some event (for example a port
-            # deletion)
-            reg_ports = (set() if ovs_restarted else ports)
-            port_info = self.scan_ports(reg_ports, sync,
-                                        updated_ports_copy)
-            # Treat ancillary devices if they exist
-            if self.ancillary_brs:
-                ancillary_port_info = self.scan_ancillary_ports(
-                    ancillary_ports, sync)
-                LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
-                          " - ancillary port info retrieved. "
-                          "Elapsed:%(elapsed).3f",
-                          {'iter_num': self.iter_num,
-                           'elapsed': time.time() - start})
-            else:
-                ancillary_port_info = {}
-
-        else:
-            consecutive_resyncs = 0
-            events = polling_manager.get_events()
-            ancillary_ports = (
-                ancillary_ports if self.ancillary_brs else None)
-            port_info, ancillary_port_info = (
-                self.process_ports_events(events, ports,
-                                          ancillary_ports, updated_ports_copy))
-        return port_info, ancillary_port_info, consecutive_resyncs
-
     def rpc_loop(self, polling_manager=None):
         if not polling_manager:
             polling_manager = polling.get_polling_manager(
                 minimize_polling=False)
 
-        sync = False
+        sync = True
         ports = set()
         updated_ports_copy = set()
         ancillary_ports = set()
@@ -1809,6 +1674,20 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             start = time.time()
             LOG.debug("Agent rpc_loop - iteration:%d started",
                       self.iter_num)
+            if sync:
+                LOG.info(_LI("Agent out of sync with plugin!"))
+                polling_manager.force_polling()
+                consecutive_resyncs = consecutive_resyncs + 1
+                if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
+                    LOG.warn(_LW("Clearing cache of registered ports, retrials"
+                                 " to resync were > %s"),
+                             constants.MAX_DEVICE_RETRIES)
+                    ports.clear()
+                    ancillary_ports.clear()
+                    sync = False
+                    consecutive_resyncs = 0
+            else:
+                consecutive_resyncs = 0
             ovs_status = self.check_ovs_status()
             if ovs_status == constants.OVS_RESTARTED:
                 self.setup_integration_br()
@@ -1824,15 +1703,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                                                  self.patch_tun_ofport)
                     self.dvr_agent.reset_dvr_parameters()
                     self.dvr_agent.setup_dvr_flows()
-                # restart the polling manager so that it will signal as added
-                # all the current ports
-                # REVISIT (rossella_s) Define a method "reset" in
-                # BasePollingManager that will be implemented by AlwaysPoll as
-                # no action and by InterfacePollingMinimizer as start/stop
-                if isinstance(
-                    polling_manager, linux_polling.InterfacePollingMinimizer):
-                    polling_manager.stop()
-                    polling_manager.start()
             elif ovs_status == constants.OVS_DEAD:
                 # Agent doesn't apply any operations when ovs is dead, to
                 # prevent unexpected failure or crash. Sleep and continue
@@ -1849,7 +1719,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                     LOG.exception(_LE("Error while synchronizing tunnels"))
                     tunnel_sync = True
             ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
-            if self._agent_has_updates(polling_manager) or sync:
+            if self._agent_has_updates(polling_manager) or ovs_restarted:
                 try:
                     LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
                               "starting polling. Elapsed:%(elapsed).3f",
@@ -1861,13 +1731,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                     # between these two statements, this will be thread-safe
                     updated_ports_copy = self.updated_ports
                     self.updated_ports = set()
-                    port_info, ancillary_port_info, consecutive_resyncs = (
-                        self.process_port_info(
-                            start, polling_manager, sync, ovs_restarted,
-                            ports, ancillary_ports, updated_ports_copy,
-                            consecutive_resyncs)
-                    )
-
+                    reg_ports = (set() if ovs_restarted else ports)
+                    port_info = self.scan_ports(reg_ports, sync,
+                                                updated_ports_copy)
                     self.process_deleted_ports(port_info)
                     ofport_changed_ports = self.update_stale_ofport_rules()
                     if ofport_changed_ports:
@@ -1878,6 +1744,16 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                               "Elapsed:%(elapsed).3f",
                               {'iter_num': self.iter_num,
                                'elapsed': time.time() - start})
+                    # Treat ancillary devices if they exist
+                    if self.ancillary_brs:
+                        ancillary_port_info = self.scan_ancillary_ports(
+                            ancillary_ports, sync)
+                        LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
+                                  "ancillary port info retrieved. "
+                                  "Elapsed:%(elapsed).3f",
+                                  {'iter_num': self.iter_num,
+                                   'elapsed': time.time() - start})
+                    sync = False
                     # Secure and wire/unwire VIFs and update their status
                     # on Neutron server
                     if (self._port_info_has_changes(port_info) or
index 9fa7540d4675dfdc0fadf7cf25ab125222809ba8..39e511b11dffd737ca1c54ab772978f0db60de9c 100644 (file)
@@ -118,38 +118,13 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
         if tunnel_types:
             self.addCleanup(self.ovs.delete_bridge, self.br_tun)
         agent.sg_agent = mock.Mock()
-        agent.ancillary_brs = None
         return agent
 
-    def _mock_get_events(self, agent, polling_manager, ports):
-        get_events = polling_manager.get_events
-        p_ids = [p['id'] for p in ports]
-
-        def filter_events():
-            events = get_events()
-            filtered_ports = []
-            for dev in events['added']:
-                iface_id = agent.int_br.portid_from_external_ids(
-                    dev.get('external_ids', []))
-                if iface_id in p_ids:
-                    # if the event is not about a port that was created by
-                    # this test, we filter the event out. Since these tests are
-                    # not run in isolation processing all the events might make
-                    # some test fail ( e.g. the agent might keep resycing
-                    # because it keeps finding not ready ports that are created
-                    # by other tests)
-                    filtered_ports.append(dev)
-            return {'added': filtered_ports, 'removed': events['removed']}
-        polling_manager.get_events = mock.Mock(side_effect=filter_events)
-
-    def start_agent(self, agent, ports=None, unplug_ports=None):
+    def start_agent(self, agent, unplug_ports=None):
         if unplug_ports is None:
             unplug_ports = []
-        if ports is None:
-            ports = []
         self.setup_agent_rpc_mocks(agent, unplug_ports)
         polling_manager = polling.InterfacePollingMinimizer()
-        self._mock_get_events(agent, polling_manager, ports)
         self.addCleanup(polling_manager.stop)
         polling_manager.start()
         agent_utils.wait_until_true(
@@ -163,7 +138,6 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
             rpc_loop_thread.wait()
 
         self.addCleanup(stop_agent, agent, t)
-        return polling_manager
 
     def _create_test_port_dict(self):
         return {'id': uuidutils.generate_uuid(),
@@ -306,10 +280,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
 
     def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
                               trigger_resync=False):
-        self.ports = port_dicts
         self.agent = self.create_agent(create_tunnels=create_tunnels)
-        self.polling_manager = self.start_agent(self.agent, ports=self.ports)
+        self.start_agent(self.agent)
         self.network = self._create_test_network_dict()
+        self.ports = port_dicts
         if trigger_resync:
             self._prepare_resync_trigger(self.agent)
         self._plug_ports(self.network, self.ports, self.agent)
index ab59680ac213c6894dec57c9ce013be827046d21..b57a8f93158249eec8a0f922ef5cb3c18255f60b 100644 (file)
@@ -73,6 +73,7 @@ class TestOVSAgent(base.OVSAgentTestFramework):
             port_dicts=self.create_test_ports())
         self.wait_until_ports_state(self.ports, up=True)
         self.agent.check_ovs_status.return_value = constants.OVS_RESTARTED
+        # OVS restarted, the agent should reprocess all the ports
         self.agent.plugin_rpc.update_device_list.reset_mock()
         self.wait_until_ports_state(self.ports, up=True)
 
index d4cf3877bf56c71368930c3377627e9b6a2d711b..322e4b921af4f31f74c47a1abda1f4476885d7fe 100644 (file)
@@ -401,130 +401,6 @@ class TestOvsNeutronAgent(object):
                                       updated_ports)
         self.assertEqual(expected, actual)
 
-    def test_process_ports_events_returns_current_for_unchanged_ports(self):
-        with mock.patch.object(self.agent, 'check_changed_vlans',
-                               return_value=set()):
-            events = {'added': [], 'removed': []}
-            registered_ports = {1, 3}
-            ancillary_ports = {2, 5}
-            expected_ports = {'current': registered_ports, 'added': set(),
-                              'removed': set()}
-            expected_ancillary = {'current': ancillary_ports, 'added': set(),
-                                  'removed': set()}
-            actual = self.agent.process_ports_events(events, registered_ports,
-                                                     ancillary_ports)
-            self.assertEqual((expected_ports, expected_ancillary), actual)
-
-    def test_process_port_events_returns_port_changes(self):
-        events = {'added': [{'name': 'port3', 'ofport': 3,
-                             'external_ids': {'attached-mac': 'test-mac'}},
-                            {'name': 'qg-port2', 'ofport': 5,
-                             'external_ids': {'attached-mac': 'test-mac'}}],
-                  'removed': [{'name': 'port2', 'ofport': 2,
-                              'external_ids': {'attached-mac': 'test-mac'}},
-                              {'name': 'qg-port1', 'ofport': 4,
-                               'external_ids': {'attached-mac': 'test-mac'}}]}
-        registered_ports = {1, 2}
-        ancillary_ports = {4}
-        expected_ports = dict(
-            current={1, 3}, added={3}, removed={2})
-        expected_ancillary_ports = dict(
-            current={5}, added={5}, removed={4})
-        ancillary_bridge = mock.Mock()
-        ancillary_bridge.get_vif_port_set.return_value = {4, 5}
-        self.agent.ancillary_brs = [ancillary_bridge]
-        with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
-                              side_effect=[3, 5, 2, 4]), \
-            mock.patch.object(self.agent, 'check_changed_vlans',
-                              return_value=set()):
-            actual = self.agent.process_ports_events(
-                 events, registered_ports, ancillary_ports)
-            self.assertEqual(
-                (expected_ports, expected_ancillary_ports), actual)
-
-    def _test_process_port_events_with_updated_ports(self, updated_ports):
-        events = {'added': [{'name': 'port3', 'ofport': 3,
-                            'external_ids': {'attached-mac': 'test-mac'}},
-                            {'name': 'qg-port2', 'ofport': 6,
-                             'external_ids': {'attached-mac': 'test-mac'}}],
-                  'removed': [{'name': 'port2', 'ofport': 2,
-                               'external_ids': {'attached-mac': 'test-mac'}},
-                              {'name': 'qg-port1', 'ofport': 5,
-                               'external_ids': {'attached-mac': 'test-mac'}}]}
-        registered_ports = {1, 2, 4}
-        ancillary_ports = {5, 8}
-        expected_ports = dict(current={1, 3, 4}, added={3},
-                              removed={2}, updated={4})
-        expected_ancillary = dict(current={6, 8}, added={6},
-                                  removed={5})
-        ancillary_bridge = mock.Mock()
-        ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
-        self.agent.ancillary_brs = [ancillary_bridge]
-        with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
-                              side_effect=[3, 6, 2, 5]), \
-            mock.patch.object(self.agent, 'check_changed_vlans',
-                              return_value=set()):
-
-            actual = self.agent.process_ports_events(
-                events, registered_ports, ancillary_ports, updated_ports)
-            self.assertEqual((expected_ports, expected_ancillary), actual)
-
-    def test_process_port_events_finds_known_updated_ports(self):
-        self._test_process_port_events_with_updated_ports({4})
-
-    def test_process_port_events_ignores_unknown_updated_ports(self):
-        # the port '5' was not seen on current ports. Hence it has either
-        # never been wired or already removed and should be ignored
-        self._test_process_port_events_with_updated_ports({4, 5})
-
-    def test_process_port_events_ignores_updated_port_if_removed(self):
-        events = {'added': [{'name': 'port3', 'ofport': 3,
-                             'external_ids': {'attached-mac': 'test-mac'}}],
-                  'removed': [{'name': 'port2', 'ofport': 2,
-                               'external_ids': {'attached-mac': 'test-mac'}}]}
-        registered_ports = {1, 2}
-        updated_ports = {1, 2}
-        expected_ports = dict(current={1, 3}, added={3},
-                              removed={2}, updated={1})
-        expected_ancillary = dict(current=set(), added=set(), removed=set())
-        with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
-                              side_effect=[3, 2]), \
-            mock.patch.object(self.agent, 'check_changed_vlans',
-                              return_value=set()):
-
-            actual = self.agent.process_ports_events(
-                events, registered_ports, None, updated_ports)
-            self.assertEqual((expected_ports, expected_ancillary), actual)
-
-    def test_process_port_events_no_vif_changes_return_updated_port_only(self):
-        events = {'added': [], 'removed': []}
-        registered_ports = {1, 2, 3}
-        updated_ports = {2}
-        expected_ports = dict(current=registered_ports, updated={2},
-                              added=set(), removed=set())
-        expected_ancillary = dict(current=set(), added=set(), removed=set())
-        with mock.patch.object(self.agent, 'check_changed_vlans',
-                               return_value=set()):
-            actual = self.agent.process_ports_events(
-                events, registered_ports, None, updated_ports)
-            self.assertEqual((expected_ports, expected_ancillary), actual)
-
-    def test_process_port_events_ignores_removed_port_if_never_added(self):
-        events = {'added': [],
-                  'removed': [{'name': 'port2', 'ofport': 2,
-                               'external_ids': {'attached-mac': 'test-mac'}}]}
-        registered_ports = {1}
-        expected_ports = dict(current=registered_ports, added=set(),
-                              removed=set())
-        expected_ancillary = dict(current=set(), added=set(), removed=set())
-        with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
-                              side_effect=[2]), \
-            mock.patch.object(self.agent, 'check_changed_vlans',
-                              return_value=set()):
-            actual = self.agent.process_ports_events(events, registered_ports,
-                                                     None)
-            self.assertEqual((expected_ports, expected_ancillary), actual)
-
     def test_update_ports_returns_changed_vlan(self):
         br = self.br_int_cls('br-int')
         mac = "ca:fe:de:ad:be:ef"
@@ -1495,17 +1371,11 @@ class TestOvsNeutronAgent(object):
                   'added': set([]),
                   'removed': set(['tap0'])}
 
-        reply_ancillary = {'current': set([]),
-                           'added': set([]),
-                           'removed': set([])}
-
         with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
-                mock.patch.object(async_process.AsyncProcess, "start"),\
-                mock.patch.object(async_process.AsyncProcess, "stop"),\
                 mock.patch.object(log.KeywordArgumentAdapter,
                                   'exception') as log_exception,\
                 mock.patch.object(self.mod_agent.OVSNeutronAgent,
-                                  'process_ports_events') as process_p_events,\
+                                  'scan_ports') as scan_ports,\
                 mock.patch.object(
                     self.mod_agent.OVSNeutronAgent,
                     'process_network_ports') as process_network_ports,\
@@ -1523,8 +1393,7 @@ class TestOvsNeutronAgent(object):
                                   'cleanup_stale_flows') as cleanup:
             log_exception.side_effect = Exception(
                 'Fake exception to get out of the loop')
-            process_p_events.side_effect = [(reply2, reply_ancillary),
-                                            (reply3, reply_ancillary)]
+            scan_ports.side_effect = [reply2, reply3]
             process_network_ports.side_effect = [
                 False, Exception('Fake exception to get out of the loop')]
             check_ovs_status.side_effect = args
@@ -1533,12 +1402,10 @@ class TestOvsNeutronAgent(object):
             except Exception:
                 pass
 
-            process_p_events.assert_has_calls([
-                mock.call({'removed': [], 'added': []}, set(), None, set()),
-                mock.call({'removed': [], 'added': []}, set(['tap0']), None,
-                          set())
+            scan_ports.assert_has_calls([
+                mock.call(set(), True, set()),
+                mock.call(set(), False, set())
             ])
-
             process_network_ports.assert_has_calls([
                 mock.call(reply2, False),
                 mock.call(reply3, True)
index 7bae6df2ca66cb37844c5e97d79d7d9406ef6139..079d2dea083e74df087daa34e887c574c5d10749 100644 (file)
@@ -495,24 +495,14 @@ class TunnelTest(object):
         self._verify_mock_calls()
 
     def test_daemon_loop(self):
-        reply_ge_1 = {'added': set(['tap0']),
-                      'removed': set([])}
+        reply2 = {'current': set(['tap0']),
+                  'added': set(['tap2']),
+                  'removed': set([])}
 
-        reply_ge_2 = {'added': set([]),
+        reply3 = {'current': set(['tap2']),
+                  'added': set([]),
                   'removed': set(['tap0'])}
 
-        reply_pe_1 = {'current': set(['tap0']),
-                      'added': set(['tap0']),
-                      'removed': set([])}
-
-        reply_pe_2 = {'current': set([]),
-                      'added': set([]),
-                      'removed': set(['tap0'])}
-
-        reply_ancillary = {'current': set([]),
-                           'added': set([]),
-                           'removed': set([])}
-
         self.mock_int_bridge_expected += [
             mock.call.check_canary_table(),
             mock.call.check_canary_table()
@@ -523,7 +513,7 @@ class TunnelTest(object):
         with mock.patch.object(log.KeywordArgumentAdapter,
                                'exception') as log_exception,\
                 mock.patch.object(self.mod_agent.OVSNeutronAgent,
-                                  'process_ports_events') as process_p_events,\
+                                  'scan_ports') as scan_ports,\
                 mock.patch.object(
                     self.mod_agent.OVSNeutronAgent,
                     'process_network_ports') as process_network_ports,\
@@ -538,11 +528,8 @@ class TunnelTest(object):
                     'cleanup_stale_flows') as cleanup:
             log_exception.side_effect = Exception(
                 'Fake exception to get out of the loop')
+            scan_ports.side_effect = [reply2, reply3]
             update_stale.return_value = []
-            process_p_events.side_effect = [
-                (reply_pe_1, reply_ancillary), (reply_pe_2, reply_ancillary)]
-            interface_polling = mock.Mock()
-            interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2]
             process_network_ports.side_effect = [
                 False, Exception('Fake exception to get out of the loop')]
 
@@ -552,7 +539,7 @@ class TunnelTest(object):
             # We start method and expect it will raise after 2nd loop
             # If something goes wrong, assert_has_calls below will catch it
             try:
-                n_agent.rpc_loop(interface_polling)
+                n_agent.daemon_loop()
             except Exception:
                 pass
 
@@ -560,14 +547,17 @@ class TunnelTest(object):
             # messages
             log_exception.assert_called_once_with(
                 "Error while processing VIF ports")
-            process_p_events.assert_has_calls([
-                mock.call(reply_ge_1, set(), None, set()),
-                mock.call(reply_ge_2, set(['tap0']), None, set())
+            scan_ports.assert_has_calls([
+                mock.call(set(), True, set()),
+                mock.call(set(['tap0']), False, set())
             ])
             process_network_ports.assert_has_calls([
                 mock.call({'current': set(['tap0']),
                            'removed': set([]),
-                           'added': set(['tap0'])}, False),
+                           'added': set(['tap2'])}, False),
+                mock.call({'current': set(['tap2']),
+                           'removed': set(['tap0']),
+                           'added': set([])}, False)
             ])
 
             cleanup.assert_called_once_with()