]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
change port status only if port is bound to the good host
authormathieu-rohon <mathieu.rohon@gmail.com>
Wed, 2 Oct 2013 12:13:36 +0000 (14:13 +0200)
committerMark McClain <mark.mcclain@dreamhost.com>
Tue, 8 Oct 2013 15:02:55 +0000 (11:02 -0400)
if host is set in the rpc message update_device_up/down sent by the agent,
the port status will be changed only if the port is bound to the host.

Change-Id: I0e607c734fbebf0b69f83c3bbd3e25a9783672dc
Closes-Bug: #1224967

16 files changed:
neutron/agent/rpc.py
neutron/plugins/hyperv/agent/hyperv_neutron_agent.py
neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py
neutron/plugins/linuxbridge/lb_neutron_plugin.py
neutron/plugins/ml2/db.py
neutron/plugins/ml2/plugin.py
neutron/plugins/ml2/rpc.py
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/plugins/openvswitch/ovs_neutron_plugin.py
neutron/tests/unit/hyperv/test_hyperv_rpcapi.py
neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py
neutron/tests/unit/linuxbridge/test_rpcapi.py
neutron/tests/unit/ml2/test_rpcapi.py
neutron/tests/unit/mlnx/test_rpcapi.py
neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
neutron/tests/unit/openvswitch/test_ovs_rpcapi.py

index 16f1cfb0293f0658a9404a579289e40b9fd4ca50..3dbed74de728cc07b646d6e895f9792e678560f5 100644 (file)
@@ -94,16 +94,16 @@ class PluginApi(proxy.RpcProxy):
                                        agent_id=agent_id),
                          topic=self.topic)
 
-    def update_device_down(self, context, device, agent_id):
+    def update_device_down(self, context, device, agent_id, host=None):
         return self.call(context,
                          self.make_msg('update_device_down', device=device,
-                                       agent_id=agent_id),
+                                       agent_id=agent_id, host=host),
                          topic=self.topic)
 
-    def update_device_up(self, context, device, agent_id):
+    def update_device_up(self, context, device, agent_id, host=None):
         return self.call(context,
                          self.make_msg('update_device_up', device=device,
-                                       agent_id=agent_id),
+                                       agent_id=agent_id, host=host),
                          topic=self.topic)
 
     def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
index c4421a3b3b7a222d7ab124c49e10623d30152ee9..209b9122c432fd315928e0c357a642c664fd9671 100644 (file)
@@ -319,7 +319,8 @@ class HyperVNeutronAgent(object):
             try:
                 self.plugin_rpc.update_device_down(self.context,
                                                    device,
-                                                   self.agent_id)
+                                                   self.agent_id,
+                                                   cfg.CONF.host)
             except Exception as e:
                 LOG.debug(
                     _("Removing port failed for device %(device)s: %(e)s"),
index 549a08c9e64f2b427ea4511015030a3ff2c0f72d..b25f56fb5fd728c4da2562fa893cfe9b325137bd 100755 (executable)
@@ -653,11 +653,13 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                     # update plugin about port status
                     self.agent.plugin_rpc.update_device_up(self.context,
                                                            tap_device_name,
-                                                           self.agent.agent_id)
+                                                           self.agent.agent_id,
+                                                           cfg.CONF.host)
                 else:
                     self.plugin_rpc.update_device_down(self.context,
                                                        tap_device_name,
-                                                       self.agent.agent_id)
+                                                       self.agent.agent_id,
+                                                       cfg.CONF.host)
             else:
                 bridge_name = self.agent.br_mgr.get_bridge_name(
                     port['network_id'])
@@ -666,7 +668,8 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 # update plugin about port status
                 self.agent.plugin_rpc.update_device_down(self.context,
                                                          tap_device_name,
-                                                         self.agent.agent_id)
+                                                         self.agent.agent_id,
+                                                         cfg.CONF.host)
         except rpc_common.Timeout:
             LOG.error(_("RPC timeout while updating port %s"), port['id'])
 
@@ -855,11 +858,13 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
                         # update plugin about port status
                         self.plugin_rpc.update_device_up(self.context,
                                                          device,
-                                                         self.agent_id)
+                                                         self.agent_id,
+                                                         cfg.CONF.host)
                     else:
                         self.plugin_rpc.update_device_down(self.context,
                                                            device,
-                                                           self.agent_id)
+                                                           self.agent_id,
+                                                           cfg.CONF.host)
                 else:
                     self.remove_port_binding(details['network_id'],
                                              details['port_id'])
@@ -875,7 +880,8 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
             try:
                 details = self.plugin_rpc.update_device_down(self.context,
                                                              device,
-                                                             self.agent_id)
+                                                             self.agent_id,
+                                                             cfg.CONF.host)
             except Exception as e:
                 LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
                           {'device': device, 'e': e})
index 588dd4ece948ed37c269956d017c1d2eaa2f1daf..1920d0e8d96243fa77a9e6ecd12d27735a346794 100644 (file)
@@ -41,6 +41,7 @@ from neutron.db import quota_db  # noqa
 from neutron.db import securitygroups_rpc_base as sg_db_rpc
 from neutron.extensions import portbindings
 from neutron.extensions import providernet as provider
+from neutron import manager
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import rpc
@@ -116,13 +117,20 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
         # TODO(garyk) - live migration and port status
         agent_id = kwargs.get('agent_id')
         device = kwargs.get('device')
+        host = kwargs.get('host')
+        port = self.get_port_from_device(device)
         LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
                   {'device': device, 'agent_id': agent_id})
-        port = self.get_port_from_device(device)
+        plugin = manager.NeutronManager.get_plugin()
         if port:
             entry = {'device': device,
                      'exists': True}
-            if port['status'] != q_const.PORT_STATUS_DOWN:
+            if (host and not
+                plugin.get_port_host(rpc_context, port['id']) == host):
+                LOG.debug(_("Device %(device)s not bound to the"
+                            " agent host %(host)s"),
+                          {'device': device, 'host': host})
+            elif port['status'] != q_const.PORT_STATUS_DOWN:
                 # Set port status to DOWN
                 db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
         else:
@@ -135,13 +143,21 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
         """Device is up on agent."""
         agent_id = kwargs.get('agent_id')
         device = kwargs.get('device')
-        LOG.debug(_("Device %(device)s up %(agent_id)s"),
+        host = kwargs.get('host')
+        port = self.get_port_from_device.get_port(device)
+        LOG.debug(_("Device %(device)s up on %(agent_id)s"),
                   {'device': device, 'agent_id': agent_id})
-        port = self.get_port_from_device(device)
+        plugin = manager.NeutronManager.get_plugin()
         if port:
-            if port['status'] != q_const.PORT_STATUS_ACTIVE:
-                # Set port status to ACTIVE
-                db.set_port_status(port['id'], q_const.PORT_STATUS_ACTIVE)
+            if (host and
+                not plugin.get_port_host(rpc_context, port['id']) == host):
+                LOG.debug(_("Device %(device)s not bound to the"
+                            " agent host %(host)s"),
+                          {'device': device, 'host': host})
+                return
+            elif port['status'] != q_const.PORT_STATUS_ACTIVE:
+                db.set_port_status(port['id'],
+                                   q_const.PORT_STATUS_ACTIVE)
         else:
             LOG.debug(_("%s can not be found in database"), device)
 
index c987d78602a5da1b0ca8b0af745e4d8349e283b2..bf911d3d1a2cce497e77925bad045ac32fb98586 100644 (file)
@@ -119,3 +119,17 @@ def get_port_and_sgs(port_id):
         port_dict['fixed_ips'] = [ip['ip_address']
                                   for ip in port['fixed_ips']]
         return port_dict
+
+
+def get_port_binding_host(port_id):
+    session = db_api.get_session()
+    with session.begin(subtransactions=True):
+        try:
+            query = (session.query(models.PortBinding).
+                     filter(models.PortBinding.port_id.startswith(port_id)).
+                     one())
+        except exc.NoResultFound:
+            LOG.debug(_("No binding found for port %(port_id)s"),
+                      {'port_id': port_id})
+            return
+    return query.host
index 3c8a7d5f28e453e7173ed211175cc2e7a094c4b4..ce4c8d635f155bda4b8c4752b73a5d4494c51746 100644 (file)
@@ -591,7 +591,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 LOG.warning(_("Port %(port)s updated up by agent not found"),
                             {'port': port_id})
                 return False
-
             if port.status != status:
                 original_port = self._make_port_dict(port)
                 port.status = status
@@ -608,3 +607,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             self.mechanism_manager.update_port_postcommit(mech_context)
 
         return True
+
+    def port_bound_to_host(self, port_id, host):
+        port_host = db.get_port_binding_host(port_id)
+        return (port_host == host)
index 4ead0e339422a7e9d9f5565765288b17a2441362..f44a3eb1e329adb93d4daeedc03ffcd5bc0fd718 100644 (file)
@@ -153,12 +153,20 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
         # TODO(garyk) - live migration and port status
         agent_id = kwargs.get('agent_id')
         device = kwargs.get('device')
+        host = kwargs.get('host')
         LOG.debug(_("Device %(device)s no longer exists at agent "
                     "%(agent_id)s"),
                   {'device': device, 'agent_id': agent_id})
+        plugin = manager.NeutronManager.get_plugin()
         port_id = self._device_to_port_id(device)
+        port_exists = True
+        if (host and not plugin.port_bound_to_host(port_id, host)):
+            LOG.debug(_("Device %(device)s not bound to the"
+                        " agent host %(host)s"),
+                      {'device': device, 'host': host})
+            return {'device': device,
+                    'exists': port_exists}
 
-        plugin = manager.NeutronManager.get_plugin()
         port_exists = plugin.update_port_status(rpc_context, port_id,
                                                 q_const.PORT_STATUS_DOWN)
 
@@ -169,11 +177,17 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
         """Device is up on agent."""
         agent_id = kwargs.get('agent_id')
         device = kwargs.get('device')
+        host = kwargs.get('host')
         LOG.debug(_("Device %(device)s up at agent %(agent_id)s"),
                   {'device': device, 'agent_id': agent_id})
+        plugin = manager.NeutronManager.get_plugin()
         port_id = self._device_to_port_id(device)
+        if (host and not plugin.port_bound_to_host(port_id, host)):
+            LOG.debug(_("Device %(device)s not bound to the"
+                        " agent host %(host)s"),
+                      {'device': device, 'host': host})
+            return
 
-        plugin = manager.NeutronManager.get_plugin()
         plugin.update_port_status(rpc_context, port_id,
                                   q_const.PORT_STATUS_ACTIVE)
 
index eefe384367fda0000ddc115b872292b30b452a64..d702b0b8ec2c1e1ffd78e8a8c42c7ab5ddd4f4ed 100644 (file)
@@ -297,11 +297,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             if port['admin_state_up']:
                 # update plugin about port status
                 self.plugin_rpc.update_device_up(self.context, port['id'],
-                                                 self.agent_id)
+                                                 self.agent_id,
+                                                 cfg.CONF.host)
             else:
                 # update plugin about port status
                 self.plugin_rpc.update_device_down(self.context, port['id'],
-                                                   self.agent_id)
+                                                   self.agent_id,
+                                                   cfg.CONF.host)
         except rpc_common.Timeout:
             LOG.error(_("RPC timeout while updating port %s"), port['id'])
 
@@ -910,7 +912,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 # update plugin about port status
                 self.plugin_rpc.update_device_up(self.context,
                                                  device,
-                                                 self.agent_id)
+                                                 self.agent_id,
+                                                 cfg.CONF.host)
             else:
                 LOG.debug(_("Device %s not defined on plugin"), device)
                 if (port and int(port.ofport) != -1):
@@ -934,7 +937,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             # update plugin about port status
             self.plugin_rpc.update_device_up(self.context,
                                              device,
-                                             self.agent_id)
+                                             self.agent_id,
+                                             cfg.CONF.host)
         return resync
 
     def treat_devices_removed(self, devices):
@@ -945,7 +949,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             try:
                 details = self.plugin_rpc.update_device_down(self.context,
                                                              device,
-                                                             self.agent_id)
+                                                             self.agent_id,
+                                                             cfg.CONF.host)
             except Exception as e:
                 LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
                           {'device': device, 'e': e})
@@ -966,7 +971,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             try:
                 details = self.plugin_rpc.update_device_down(self.context,
                                                              device,
-                                                             self.agent_id)
+                                                             self.agent_id,
+                                                             cfg.CONF.host)
             except Exception as e:
                 LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
                           {'device': device, 'e': e})
index f7222660525d88a831eb01f371001c0033a38be7..6377c073ba5fd8b180bc3020c38e078f6c3f9b75 100644 (file)
@@ -52,6 +52,7 @@ from neutron.extensions import allowedaddresspairs as addr_pair
 from neutron.extensions import extra_dhcp_opt as edo_ext
 from neutron.extensions import portbindings
 from neutron.extensions import providernet as provider
+from neutron import manager
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import rpc
@@ -123,18 +124,25 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
 
     def update_device_down(self, rpc_context, **kwargs):
         """Device no longer exists on agent."""
-        # TODO(garyk) - live migration and port status
         agent_id = kwargs.get('agent_id')
         device = kwargs.get('device')
+        host = kwargs.get('host')
+        port = ovs_db_v2.get_port(device)
         LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
                   {'device': device, 'agent_id': agent_id})
-        port = ovs_db_v2.get_port(device)
         if port:
             entry = {'device': device,
                      'exists': True}
-            if port['status'] != q_const.PORT_STATUS_DOWN:
+            plugin = manager.NeutronManager.get_plugin()
+            if (host and
+                not plugin.get_port_host(rpc_context, port['id']) == host):
+                LOG.debug(_("Device %(device)s not bound to the"
+                            " agent host %(host)s"),
+                          {'device': device, 'host': host})
+            elif port['status'] != q_const.PORT_STATUS_DOWN:
                 # Set port status to DOWN
-                ovs_db_v2.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
+                ovs_db_v2.set_port_status(port['id'],
+                                          q_const.PORT_STATUS_DOWN)
         else:
             entry = {'device': device,
                      'exists': False}
@@ -145,11 +153,19 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
         """Device is up on agent."""
         agent_id = kwargs.get('agent_id')
         device = kwargs.get('device')
+        host = kwargs.get('host')
+        port = ovs_db_v2.get_port(device)
         LOG.debug(_("Device %(device)s up on %(agent_id)s"),
                   {'device': device, 'agent_id': agent_id})
-        port = ovs_db_v2.get_port(device)
+        plugin = manager.NeutronManager.get_plugin()
         if port:
-            if port['status'] != q_const.PORT_STATUS_ACTIVE:
+            if (host and
+                not plugin.get_port_host(rpc_context, port['id']) == host):
+                LOG.debug(_("Device %(device)s not bound to the"
+                            " agent host %(host)s"),
+                          {'device': device, 'host': host})
+                return
+            elif port['status'] != q_const.PORT_STATUS_ACTIVE:
                 ovs_db_v2.set_port_status(port['id'],
                                           q_const.PORT_STATUS_ACTIVE)
         else:
index 70fe33da1cc39747856709d265755957ea2d06ec..2765acf0437efd2c7286a07513882907c3d25ef5 100644 (file)
@@ -116,7 +116,8 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
             rpcapi, topics.PLUGIN,
             'update_device_down', rpc_method='call',
             device='fake_device',
-            agent_id='fake_agent_id')
+            agent_id='fake_agent_id',
+            host='fake_host')
 
     def test_tunnel_sync(self):
         rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
index fd40f74d2112e180be11d1d4f7e517704df43d93..b0846439cb1138ffd93fac343696d559b91fdd0f 100644 (file)
@@ -753,7 +753,8 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
             rpc_obj.update_device_down.assert_called_with(
                 self.lb_rpc.context,
                 "tap123",
-                self.lb_rpc.agent.agent_id
+                self.lb_rpc.agent.agent_id,
+                cfg.CONF.host
             )
 
     def test_port_update_plugin_rpc_failed(self):
index 1db75a12bb6180f6021c3620e30c9921233a8b8e..e6e8587e0ecbe782709a4ee4734625207f150834 100644 (file)
@@ -118,11 +118,13 @@ class rpcApiTestCase(base.BaseTestCase):
         self._test_lb_api(rpcapi, topics.PLUGIN,
                           'update_device_down', rpc_method='call',
                           device='fake_device',
-                          agent_id='fake_agent_id')
+                          agent_id='fake_agent_id',
+                          host='fake_host')
 
     def test_update_device_up(self):
         rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
         self._test_lb_api(rpcapi, topics.PLUGIN,
                           'update_device_up', rpc_method='call',
                           device='fake_device',
-                          agent_id='fake_agent_id')
+                          agent_id='fake_agent_id',
+                          host='fake_host')
index a51c6a3c5007c7030670325cd8032dbd7a4efb0f..0f4a858b72bbdb77f6bc1de28f118420420f4982 100644 (file)
@@ -93,7 +93,8 @@ class RpcApiTestCase(base.BaseTestCase):
         self._test_rpc_api(rpcapi, topics.PLUGIN,
                            'update_device_down', rpc_method='call',
                            device='fake_device',
-                           agent_id='fake_agent_id')
+                           agent_id='fake_agent_id',
+                           host='fake_host')
 
     def test_tunnel_sync(self):
         rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
@@ -107,4 +108,5 @@ class RpcApiTestCase(base.BaseTestCase):
         self._test_rpc_api(rpcapi, topics.PLUGIN,
                            'update_device_up', rpc_method='call',
                            device='fake_device',
-                           agent_id='fake_agent_id')
+                           agent_id='fake_agent_id',
+                           host='fake_host')
index 820c33a6073902a01dae61c97746f75f7baf603f..900a50f0fd88f46b5044b8546f2fc8ce59ca7552 100644 (file)
@@ -141,11 +141,13 @@ class rpcApiTestCase(base.BaseTestCase):
         self._test_mlnx_api(rpcapi, topics.PLUGIN,
                             'update_device_down', rpc_method='call',
                             device='fake_device',
-                            agent_id='fake_agent_id')
+                            agent_id='fake_agent_id',
+                            host='fake_host')
 
     def test_update_device_up(self):
         rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
         self._test_mlnx_api(rpcapi, topics.PLUGIN,
                             'update_device_up', rpc_method='call',
                             device='fake_device',
-                            agent_id='fake_agent_id')
+                            agent_id='fake_agent_id',
+                            host='fake_host')
index e74fc3149589ee86f34d3ed4d865640aa7e448e3..c292eaf0c41dc0a453ebf079f3115771110e7545 100644 (file)
@@ -288,7 +288,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                                            "124", "vlan", "physnet",
                                            "1", False)
             upddown_fn.assert_called_with(self.agent.context,
-                                          "123", self.agent.agent_id)
+                                          "123", self.agent.agent_id,
+                                          cfg.CONF.host)
 
             port["admin_state_up"] = True
             self.agent.port_update("unused_context",
@@ -297,7 +298,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                                    segmentation_id="1",
                                    physical_network="physnet")
             updup_fn.assert_called_with(self.agent.context,
-                                        "123", self.agent.agent_id)
+                                        "123", self.agent.agent_id,
+                                        cfg.CONF.host)
 
     def test_port_update_plugin_rpc_failed(self):
         port = {'id': 1,
index 868d21dff5d025fb2db915a6e1656bd8882572a5..1a480a44a42b8fe21f6c87dec7d0e460a24d7a0c 100644 (file)
@@ -102,7 +102,8 @@ class rpcApiTestCase(base.BaseTestCase):
         self._test_ovs_api(rpcapi, topics.PLUGIN,
                            'update_device_down', rpc_method='call',
                            device='fake_device',
-                           agent_id='fake_agent_id')
+                           agent_id='fake_agent_id',
+                           host='fake_host')
 
     def test_tunnel_sync(self):
         rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
@@ -116,4 +117,5 @@ class rpcApiTestCase(base.BaseTestCase):
         self._test_ovs_api(rpcapi, topics.PLUGIN,
                            'update_device_up', rpc_method='call',
                            device='fake_device',
-                           agent_id='fake_agent_id')
+                           agent_id='fake_agent_id',
+                           host='fake_host')