]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
OFAgent: Process port_update notifications in the main agent loop
authorfumihiko kakuma <kakuma@valinux.co.jp>
Sun, 16 Mar 2014 04:13:03 +0000 (13:13 +0900)
committerfumihiko kakuma <kakuma@valinux.co.jp>
Thu, 17 Apr 2014 23:28:43 +0000 (08:28 +0900)
Port the following patch to OFAgent.
commit: 5e6e592132aa9a98936ce3bfdb66efc7832caafb
https://review.openstack.org/#/c/61964/

Partial-Bug: 1293265

Change-Id: I53813d12c66dc746cd373fd91ff9bd9bdbf222db

neutron/plugins/ofagent/agent/ofa_neutron_agent.py
neutron/tests/unit/ofagent/test_ofa_neutron_agent.py

index 62bbd86693ddb43f297bdf8df3a8112df1502214..7e351272140c795ad392ddbb945d1b89132c5b82 100644 (file)
@@ -36,10 +36,8 @@ from neutron.common import constants as n_const
 from neutron.common import topics
 from neutron.common import utils as n_utils
 from neutron import context
-from neutron.extensions import securitygroup as ext_sg
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import common as rpc_common
 from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.ofagent.common import config  # noqa
@@ -277,6 +275,8 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
         self.sg_agent = OFASecurityGroupAgent(self.context,
                                               self.plugin_rpc,
                                               self.root_helper)
+        # Stores port update notifications for processing in main loop
+        self.updated_ports = set()
         # Initialize iteration counter
         self.iter_num = 0
 
@@ -353,33 +353,12 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
 
     def port_update(self, context, **kwargs):
         port = kwargs.get('port')
+        # Put the port identifier in the updated_ports set.
+        # Even if full port details might be provided to this call,
+        # they are not used since there is no guarantee the notifications
+        # are processed in the same order as the relevant API requests
+        self.updated_ports.add(port['id'])
         LOG.debug(_("port_update received port %s"), port['id'])
-        # Validate that port is on OVS
-        vif_port = self.int_br.get_vif_port_by_id(port['id'])
-        if not vif_port:
-            return
-
-        if ext_sg.SECURITYGROUPS in port:
-            self.sg_agent.refresh_firewall()
-        network_type = kwargs.get('network_type')
-        segmentation_id = kwargs.get('segmentation_id')
-        physical_network = kwargs.get('physical_network')
-        self.treat_vif_port(vif_port, port['id'], port['network_id'],
-                            network_type, physical_network,
-                            segmentation_id, port['admin_state_up'])
-        try:
-            if port['admin_state_up']:
-                # update plugin about port status
-                self.plugin_rpc.update_device_up(self.context, port['id'],
-                                                 self.agent_id,
-                                                 cfg.CONF.host)
-            else:
-                # update plugin about port status
-                self.plugin_rpc.update_device_down(self.context, port['id'],
-                                                   self.agent_id,
-                                                   cfg.CONF.host)
-        except rpc_common.Timeout:
-            LOG.error(_("RPC timeout while updating port %s"), port['id'])
 
     def tunnel_update(self, context, **kwargs):
         LOG.debug(_("tunnel_update received"))
@@ -973,16 +952,27 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
             self._phys_br_patch_physical_bridge_with_integration_bridge(
                 br, physical_network, bridge, ip_wrapper)
 
-    def update_ports(self, registered_ports):
-        ports = self.int_br.get_vif_port_set()
-        if ports == registered_ports:
-            return
-        self.int_br_device_count = len(ports)
-        added = ports - registered_ports
-        removed = registered_ports - ports
-        return {'current': ports,
-                'added': added,
-                'removed': removed}
+    def scan_ports(self, registered_ports, updated_ports=None):
+        cur_ports = self.int_br.get_vif_port_set()
+        self.int_br_device_count = len(cur_ports)
+        port_info = {'current': cur_ports}
+        if updated_ports:
+            # Some updated ports might have been removed in the
+            # meanwhile, and therefore should not be processed.
+            # In this case the updated port won't be found among
+            # current ports.
+            updated_ports &= cur_ports
+            if updated_ports:
+                port_info['updated'] = updated_ports
+
+        if cur_ports == registered_ports:
+            # No added or removed ports to set, just return here
+            return port_info
+
+        port_info['added'] = cur_ports - registered_ports
+        # Remove all the known ports not found on the integration bridge
+        port_info['removed'] = registered_ports - cur_ports
+        return port_info
 
     def update_ancillary_ports(self, registered_ports):
         ports = set()
@@ -1066,11 +1056,10 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
                     self.ryu_send_msg(msg)
         return ofport
 
-    def treat_devices_added(self, devices):
+    def treat_devices_added_or_updated(self, devices):
         resync = False
-        self.sg_agent.prepare_devices_filter(devices)
         for device in devices:
-            LOG.info(_("Port %s added"), device)
+            LOG.debug(_("Processing port %s"), device)
             try:
                 details = self.plugin_rpc.get_device_details(self.context,
                                                              device,
@@ -1093,12 +1082,17 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
                                     details['admin_state_up'])
 
                 # update plugin about port status
-                self.plugin_rpc.update_device_up(self.context,
-                                                 device,
-                                                 self.agent_id,
-                                                 cfg.CONF.host)
+                if details.get('admin_state_up'):
+                    LOG.debug(_("Setting status for %s to UP"), device)
+                    self.plugin_rpc.update_device_up(
+                        self.context, device, self.agent_id, cfg.CONF.host)
+                else:
+                    LOG.debug(_("Setting status for %s to DOWN"), device)
+                    self.plugin_rpc.update_device_down(
+                        self.context, device, self.agent_id, cfg.CONF.host)
+                LOG.info(_("Configuration for device %s completed."), device)
             else:
-                LOG.debug(_("Device %s not defined on plugin"), device)
+                LOG.warn(_("Device %s not defined on plugin"), device)
                 if (port and int(port.ofport) != -1):
                     self.port_dead(port)
         return resync
@@ -1166,11 +1160,25 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
     def process_network_ports(self, port_info):
         resync_add = False
         resync_removed = False
-        if 'added' in port_info:
+        # If there is an exception while processing security groups ports
+        # will not be wired anyway, and a resync will be triggered
+        self.sg_agent.prepare_devices_filter(port_info.get('added', set()))
+        if port_info.get('updated'):
+            self.sg_agent.refresh_firewall()
+        # VIF wiring needs to be performed always for 'new' devices.
+        # For updated ports, re-wiring is not needed in most cases, but needs
+        # to be performed anyway when the admin state of a device is changed.
+        # A device might be both in the 'added' and 'updated'
+        # list at the same time; avoid processing it twice.
+        devices_added_updated = (port_info.get('added', set()) |
+                                 port_info.get('updated', set()))
+        if devices_added_updated:
             start = time.time()
-            resync_add = self.treat_devices_added(port_info['added'])
+            resync_add = self.treat_devices_added_or_updated(
+                devices_added_updated)
             LOG.debug(_("process_network_ports - iteration:%(iter_num)d - "
-                        "treat_devices_added completed in %(elapsed).3f"),
+                        "treat_devices_added_or_updated completed "
+                        "in %(elapsed).3f"),
                       {'iter_num': self.iter_num,
                        'elapsed': time.time() - start})
         if 'removed' in port_info:
@@ -1230,40 +1238,60 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
             resync = True
         return resync
 
+    def _agent_has_updates(self, polling_manager):
+        return (polling_manager.is_polling_required or
+                self.updated_ports)
+
+    def _port_info_has_changes(self, port_info):
+        return (port_info.get('added') or
+                port_info.get('removed') or
+                port_info.get('updated'))
+
     def ovsdb_monitor_loop(self, polling_manager=None):
         if not polling_manager:
             polling_manager = polling.AlwaysPoll()
 
         sync = True
         ports = set()
+        updated_ports_copy = set()
         ancillary_ports = set()
         tunnel_sync = True
         while True:
-            try:
-                start = time.time()
-                port_stats = {'regular': {'added': 0, 'removed': 0},
-                              'ancillary': {'added': 0, 'removed': 0}}
-                LOG.debug(_("Agent ovsdb_monitor_loop - "
-                          "iteration:%d started"),
-                          self.iter_num)
-                if sync:
-                    LOG.info(_("Agent out of sync with plugin!"))
-                    ports.clear()
-                    ancillary_ports.clear()
-                    sync = False
-                    polling_manager.force_polling()
-
-                # Notify the plugin of tunnel IP
-                if self.enable_tunneling and tunnel_sync:
-                    LOG.info(_("Agent tunnel out of sync with plugin!"))
+            start = time.time()
+            port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0},
+                          'ancillary': {'added': 0, 'removed': 0}}
+            LOG.debug(_("Agent ovsdb_monitor_loop - "
+                      "iteration:%d started"),
+                      self.iter_num)
+            if sync:
+                LOG.info(_("Agent out of sync with plugin!"))
+                ports.clear()
+                ancillary_ports.clear()
+                sync = False
+                polling_manager.force_polling()
+            # Notify the plugin of tunnel IP
+            if self.enable_tunneling and tunnel_sync:
+                LOG.info(_("Agent tunnel out of sync with plugin!"))
+                try:
                     tunnel_sync = self.tunnel_sync()
-                if polling_manager.is_polling_required:
+                except Exception:
+                    LOG.exception(_("Error while synchronizing tunnels"))
+                    tunnel_sync = True
+            if self._agent_has_updates(polling_manager):
+                try:
                     LOG.debug(_("Agent ovsdb_monitor_loop - "
                                 "iteration:%(iter_num)d - "
                                 "starting polling. Elapsed:%(elapsed).3f"),
                               {'iter_num': self.iter_num,
                                'elapsed': time.time() - start})
-                    port_info = self.update_ports(ports)
+                    # Save updated ports dict to perform rollback in
+                    # case resync would be needed, and then clear
+                    # self.updated_ports. As the greenthread should not yield
+                    # between these two statements, this will be thread-safe
+                    updated_ports_copy = self.updated_ports
+                    self.updated_ports = set()
+                    port_info = self.scan_ports(ports, updated_ports_copy)
+                    ports = port_info['current']
                     LOG.debug(_("Agent ovsdb_monitor_loop - "
                                 "iteration:%(iter_num)d - "
                                 "port information retrieved. "
@@ -1271,8 +1299,9 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
                               {'iter_num': self.iter_num,
                                'elapsed': time.time() - start})
                     # notify plugin about port deltas
-                    if port_info:
-                        LOG.debug(_("Agent loop has new devices!"))
+                    if self._port_info_has_changes(port_info):
+                        LOG.debug(_("Starting to process devices in:%s"),
+                                  port_info)
                         # If treat devices fails - must resync with plugin
                         sync = self.process_network_ports(port_info)
                         LOG.debug(_("Agent ovsdb_monitor_loop - "
@@ -1280,9 +1309,10 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
                                     "ports processed. Elapsed:%(elapsed).3f"),
                                   {'iter_num': self.iter_num,
                                    'elapsed': time.time() - start})
-                        ports = port_info['current']
                         port_stats['regular']['added'] = (
                             len(port_info.get('added', [])))
+                        port_stats['regular']['updated'] = (
+                            len(port_info.get('updated', [])))
                         port_stats['regular']['removed'] = (
                             len(port_info.get('removed', [])))
                     # Treat ancillary devices if they exist
@@ -1313,11 +1343,11 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
                             sync = sync | rc
 
                     polling_manager.polling_completed()
-
-            except Exception:
-                LOG.exception(_("Error in agent event loop"))
-                sync = True
-                tunnel_sync = True
+                except Exception:
+                    LOG.exception(_("Error while processing VIF ports"))
+                    # Put the ports back in self.updated_port
+                    self.updated_ports |= updated_ports_copy
+                    sync = True
 
             # sleep till end of polling interval
             elapsed = (time.time() - start)
index bbda0ee31775eecb86e94cc1fec2de37c429d420..b95d15edc8678638a05139a86a41909a334de735 100644 (file)
@@ -26,7 +26,6 @@ import testtools
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import utils
 from neutron.openstack.common import importutils
-from neutron.openstack.common.rpc import common as rpc_common
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.openvswitch.common import constants
 from neutron.tests import base
@@ -310,28 +309,69 @@ class TestOFANeutronAgent(OFAAgentTestCase):
     def test_port_dead_with_port_already_dead(self):
         self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG)
 
-    def mock_update_ports(self, vif_port_set=None, registered_ports=None):
+    def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
+                        updated_ports=None):
         with mock.patch.object(self.agent.int_br, 'get_vif_port_set',
                                return_value=vif_port_set):
-            return self.agent.update_ports(registered_ports)
+            return self.agent.scan_ports(registered_ports, updated_ports)
 
-    def test_update_ports_returns_none_for_unchanged_ports(self):
-        self.assertIsNone(self.mock_update_ports())
+    def test_scan_ports_returns_current_only_for_unchanged_ports(self):
+        vif_port_set = set([1, 3])
+        registered_ports = set([1, 3])
+        expected = {'current': vif_port_set}
+        actual = self.mock_scan_ports(vif_port_set, registered_ports)
+        self.assertEqual(expected, actual)
 
-    def test_update_ports_returns_port_changes(self):
+    def test_scan_ports_returns_port_changes(self):
         vif_port_set = set([1, 3])
         registered_ports = set([1, 2])
         expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
-        actual = self.mock_update_ports(vif_port_set, registered_ports)
+        actual = self.mock_scan_ports(vif_port_set, registered_ports)
+        self.assertEqual(expected, actual)
+
+    def _test_scan_ports_with_updated_ports(self, updated_ports):
+        vif_port_set = set([1, 3, 4])
+        registered_ports = set([1, 2, 4])
+        expected = dict(current=vif_port_set, added=set([3]),
+                        removed=set([2]), updated=set([4]))
+        actual = self.mock_scan_ports(vif_port_set, registered_ports,
+                                      updated_ports)
+        self.assertEqual(expected, actual)
+
+    def test_scan_ports_finds_known_updated_ports(self):
+        self._test_scan_ports_with_updated_ports(set([4]))
+
+    def test_scan_ports_ignores_unknown_updated_ports(self):
+        # the port '5' was not seen on current ports. Hence it has either
+        # never been wired or already removed and should be ignored
+        self._test_scan_ports_with_updated_ports(set([4, 5]))
+
+    def test_scan_ports_ignores_updated_port_if_removed(self):
+        vif_port_set = set([1, 3])
+        registered_ports = set([1, 2])
+        updated_ports = set([1, 2])
+        expected = dict(current=vif_port_set, added=set([3]),
+                        removed=set([2]), updated=set([1]))
+        actual = self.mock_scan_ports(vif_port_set, registered_ports,
+                                      updated_ports)
+        self.assertEqual(expected, actual)
+
+    def test_scan_ports_no_vif_changes_returns_updated_port_only(self):
+        vif_port_set = set([1, 2, 3])
+        registered_ports = set([1, 2, 3])
+        updated_ports = set([2])
+        expected = dict(current=vif_port_set, updated=set([2]))
+        actual = self.mock_scan_ports(vif_port_set, registered_ports,
+                                      updated_ports)
         self.assertEqual(expected, actual)
 
     def test_treat_devices_added_returns_true_for_missing_device(self):
         with mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
                                side_effect=Exception()):
-            self.assertTrue(self.agent.treat_devices_added([{}]))
+            self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
 
-    def _mock_treat_devices_added(self, details, port, func_name):
-        """Mock treat devices added.
+    def _mock_treat_devices_added_updated(self, details, port, func_name):
+        """Mock treat devices added or updated.
 
         :param details: the details to return for the device
         :param port: the port that get_vif_port_by_id should return
@@ -344,29 +384,51 @@ class TestOFANeutronAgent(OFAAgentTestCase):
             mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
                               return_value=port),
             mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+            mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
             mock.patch.object(self.agent, func_name)
-        ) as (get_dev_fn, get_vif_func, upd_dev_up, func):
-            self.assertFalse(self.agent.treat_devices_added([{}]))
+        ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
+            self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
         return func.called
 
-    def test_treat_devices_added_ignores_invalid_ofport(self):
+    def test_treat_devices_added_updated_ignores_invalid_ofport(self):
         port = mock.Mock()
         port.ofport = -1
-        self.assertFalse(self._mock_treat_devices_added(mock.MagicMock(), port,
-                                                        'port_dead'))
+        self.assertFalse(self._mock_treat_devices_added_updated(
+            mock.MagicMock(), port, 'port_dead'))
 
-    def test_treat_devices_added_marks_unknown_port_as_dead(self):
+    def test_treat_devices_added_updated_marks_unknown_port_as_dead(self):
         port = mock.Mock()
         port.ofport = 1
-        self.assertTrue(self._mock_treat_devices_added(mock.MagicMock(), port,
-                                                       'port_dead'))
+        self.assertTrue(self._mock_treat_devices_added_updated(
+            mock.MagicMock(), port, 'port_dead'))
 
-    def test_treat_devices_added_updates_known_port(self):
+    def test_treat_devices_added_updated_updates_known_port(self):
         details = mock.MagicMock()
         details.__contains__.side_effect = lambda x: True
-        self.assertTrue(self._mock_treat_devices_added(details,
-                                                       mock.Mock(),
-                                                       'treat_vif_port'))
+        self.assertTrue(self._mock_treat_devices_added_updated(
+            details, mock.Mock(), 'treat_vif_port'))
+
+    def test_treat_devices_added_updated_put_port_down(self):
+        fake_details_dict = {'admin_state_up': False,
+                             'port_id': 'xxx',
+                             'device': 'xxx',
+                             'network_id': 'yyy',
+                             'physical_network': 'foo',
+                             'segmentation_id': 'bar',
+                             'network_type': 'baz'}
+        with contextlib.nested(
+            mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
+                              return_value=fake_details_dict),
+            mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
+                              return_value=mock.MagicMock()),
+            mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+            mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
+            mock.patch.object(self.agent, 'treat_vif_port')
+        ) as (get_dev_fn, get_vif_func, upd_dev_up,
+              upd_dev_down, treat_vif_port):
+            self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+            self.assertTrue(treat_vif_port.called)
+            self.assertTrue(upd_dev_down.called)
 
     def test_treat_devices_removed_returns_true_for_missing_device(self):
         with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
@@ -387,17 +449,36 @@ class TestOFANeutronAgent(OFAAgentTestCase):
     def test_treat_devices_removed_ignores_missing_port(self):
         self._mock_treat_devices_removed(False)
 
+    def _test_process_network_ports(self, port_info):
+        with contextlib.nested(
+            mock.patch.object(self.agent.sg_agent, "prepare_devices_filter"),
+            mock.patch.object(self.agent.sg_agent, "refresh_firewall"),
+            mock.patch.object(self.agent, "treat_devices_added_or_updated",
+                              return_value=False),
+            mock.patch.object(self.agent, "treat_devices_removed",
+                              return_value=False)
+        ) as (prep_dev_filter, refresh_fw,
+              device_added_updated, device_removed):
+            self.assertFalse(self.agent.process_network_ports(port_info))
+            prep_dev_filter.assert_called_once_with(port_info['added'])
+            if port_info.get('updated'):
+                self.assertEqual(1, refresh_fw.call_count)
+            device_added_updated.assert_called_once_with(
+                port_info['added'] | port_info.get('updated', set()))
+            device_removed.assert_called_once_with(port_info['removed'])
+
     def test_process_network_ports(self):
-        reply = {'current': set(['tap0']),
-                 'removed': set(['eth0']),
-                 'added': set(['eth1'])}
-        with mock.patch.object(self.agent, 'treat_devices_added',
-                               return_value=False) as device_added:
-            with mock.patch.object(self.agent, 'treat_devices_removed',
-                                   return_value=False) as device_removed:
-                self.assertFalse(self.agent.process_network_ports(reply))
-                device_added.assert_called_once_with(set(['eth1']))
-                device_removed.assert_called_once_with(set(['eth0']))
+        self._test_process_network_ports(
+            {'current': set(['tap0']),
+             'removed': set(['eth0']),
+             'added': set(['eth1'])})
+
+    def test_process_network_port_with_updated_ports(self):
+        self._test_process_network_ports(
+            {'current': set(['tap0', 'tap1']),
+             'updated': set(['tap1', 'eth1']),
+             'removed': set(['eth0']),
+             'added': set(['eth1'])})
 
     def test_report_state(self):
         with mock.patch.object(self.agent.state_rpc,
@@ -424,61 +505,15 @@ class TestOFANeutronAgent(OFAAgentTestCase):
             recl_fn.assert_called_with("123")
 
     def test_port_update(self):
-        with contextlib.nested(
-            mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
-            mock.patch.object(self.agent, "treat_vif_port"),
-            mock.patch.object(self.agent.plugin_rpc, "update_device_up"),
-            mock.patch.object(self.agent.plugin_rpc, "update_device_down")
-        ) as (getvif_fn, treatvif_fn, updup_fn, upddown_fn):
-            port = {"id": "123",
-                    "network_id": "124",
-                    "admin_state_up": False}
-            getvif_fn.return_value = "vif_port_obj"
-            self.agent.port_update("unused_context",
-                                   port=port,
-                                   network_type="vlan",
-                                   segmentation_id="1",
-                                   physical_network="physnet")
-            treatvif_fn.assert_called_with("vif_port_obj", "123",
-                                           "124", "vlan", "physnet",
-                                           "1", False)
-            upddown_fn.assert_called_with(self.agent.context,
-                                          "123", self.agent.agent_id,
-                                          cfg.CONF.host)
-
-            port["admin_state_up"] = True
-            self.agent.port_update("unused_context",
-                                   port=port,
-                                   network_type="vlan",
-                                   segmentation_id="1",
-                                   physical_network="physnet")
-            updup_fn.assert_called_with(self.agent.context,
-                                        "123", self.agent.agent_id,
-                                        cfg.CONF.host)
-
-    def test_port_update_plugin_rpc_failed(self):
-        port = {'id': 1,
-                'network_id': 1,
-                'admin_state_up': True}
-        with contextlib.nested(
-            mock.patch.object(self.mod_agent.LOG, 'error'),
-            mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
-            mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
-            mock.patch.object(self.agent, 'port_bound'),
-            mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
-            mock.patch.object(self.agent, 'port_dead')
-        ) as (log, _, device_up, _, device_down, _):
-            device_up.side_effect = rpc_common.Timeout
-            self.agent.port_update(mock.Mock(), port=port)
-            self.assertTrue(device_up.called)
-            self.assertEqual(log.call_count, 1)
-
-            log.reset_mock()
-            port['admin_state_up'] = False
-            device_down.side_effect = rpc_common.Timeout
-            self.agent.port_update(mock.Mock(), port=port)
-            self.assertTrue(device_down.called)
-            self.assertEqual(log.call_count, 1)
+        port = {"id": "123",
+                "network_id": "124",
+                "admin_state_up": False}
+        self.agent.port_update("unused_context",
+                               port=port,
+                               network_type="vlan",
+                               segmentation_id="1",
+                               physical_network="physnet")
+        self.assertEqual(set(['123']), self.agent.updated_ports)
 
     def test_setup_physical_bridges(self):
         with contextlib.nested(