]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Stale VXLAN and GRE tunnel port/flow deletion
authorRomil Gupta <romilg@hp.com>
Thu, 20 Nov 2014 19:32:07 +0000 (11:32 -0800)
committerRomil Gupta <romilg@hp.com>
Wed, 11 Feb 2015 08:24:37 +0000 (00:24 -0800)
Description:
Stale GRE and VXLAN tunnel endpoints persists in neutron db this should be
deleted from the database. Also, if local_ip of L2 agent changes the
stale tunnel ports and flows persists on br-tun on other Compute Nodes and
Network Nodes for that remote ip this should also be removed.

Implementation

Plugin changes:
The plugin side changes are covered in following patch-set
https://review.openstack.org/#/c/121000/.

Agent changes:
Added tunnel_delete rpc for removing stale ports and flows from br-tun.
tunnel_sync rpc signature upgrade to obtain 'host'.
Added testcases for TunnelRpcCallbackMixin().

This patch-set agent deals with agent side changes.

Closes-Bug: #1179223
Closes-Bug: #1381071
Closes-Bug: #1276629

Co-Authored-By: Aman Kumar <amank@hp.com>
Co-Authored-By: phanipawan <ppawan@hp.com>
Change-Id: I291992ffde5c3ab7152f0d7462deca2e4ac1ba3f

12 files changed:
neutron/agent/rpc.py
neutron/plugins/ml2/drivers/type_tunnel.py
neutron/plugins/ml2/rpc.py
neutron/plugins/ofagent/agent/ofa_neutron_agent.py
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/unit/hyperv/test_hyperv_rpcapi.py
neutron/tests/unit/ml2/test_rpcapi.py
neutron/tests/unit/ml2/test_type_gre.py
neutron/tests/unit/ml2/test_type_tunnel.py
neutron/tests/unit/ml2/test_type_vxlan.py
neutron/tests/unit/ofagent/test_ofa_neutron_agent.py
neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py

index 9909856e0f3ae959e6c86ff72f8d01de4de4e948..f8adbc2b936bf756d778d0a0c256ed50bf203ae3 100644 (file)
@@ -79,6 +79,7 @@ class PluginApi(object):
         1.3 - get_device_details rpc signature upgrade to obtain 'host' and
               return value to include fixed_ips and device_owner for
               the device port
+        1.4 - tunnel_sync rpc signature upgrade to obtain 'host'
     '''
 
     def __init__(self, topic):
@@ -117,7 +118,14 @@ class PluginApi(object):
         return cctxt.call(context, 'update_device_up', device=device,
                           agent_id=agent_id, host=host)
 
-    def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
-        cctxt = self.client.prepare()
-        return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
-                          tunnel_type=tunnel_type)
+    def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
+        try:
+            cctxt = self.client.prepare(version='1.4')
+            res = cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
+                             tunnel_type=tunnel_type, host=host)
+        except oslo_messaging.UnsupportedVersion:
+            LOG.warn(_LW('Tunnel synchronization requires a server upgrade.'))
+            cctxt = self.client.prepare()
+            res = cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
+                             tunnel_type=tunnel_type)
+        return res
index c64e24e8e185957dd04beee7ed2571055aec20d5..25472438ac48f24358e1b0554efaddecdcb05353 100644 (file)
@@ -265,4 +265,13 @@ class TunnelAgentRpcApiMixin(object):
         cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip,
                    tunnel_type=tunnel_type)
 
-    # TODO(romilg): Add tunnel_delete rpc in dependent patch-set
+    def _get_tunnel_delete_topic(self):
+        return topics.get_topic_name(self.topic,
+                                     TUNNEL,
+                                     topics.DELETE)
+
+    def tunnel_delete(self, context, tunnel_ip, tunnel_type):
+        cctxt = self.client.prepare(topic=self._get_tunnel_delete_topic(),
+                                    fanout=True)
+        cctxt.cast(context, 'tunnel_delete', tunnel_ip=tunnel_ip,
+                   tunnel_type=tunnel_type)
index b08d06be1697c92fa7e946aca4a5385e8032c8b9..c92e5474046924e811d1a660544d222b0d3fb6bb 100644 (file)
@@ -45,7 +45,8 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
     #   1.3 get_device_details rpc signature upgrade to obtain 'host' and
     #       return value to include fixed_ips and device_owner for
     #       the device port
-    target = oslo_messaging.Target(version='1.3')
+    #   1.4 tunnel_sync rpc signature upgrade to obtain 'host'
+    target = oslo_messaging.Target(version='1.4')
 
     def __init__(self, notifier, type_manager):
         self.setup_tunnel_callback_mixin(notifier, type_manager)
index 6525df0af18d65ef4384b7cc6a71a63eef495afa..98856c5a5304866686722a5611d3d2249643cccf 100644 (file)
@@ -787,7 +787,8 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             for tunnel_type in self.tunnel_types:
                 self.plugin_rpc.tunnel_sync(self.context,
                                             self.local_ip,
-                                            tunnel_type)
+                                            tunnel_type,
+                                            cfg.CONF.host)
         except Exception as e:
             LOG.debug("Unable to sync tunnel IP %(local_ip)s: %(e)s",
                       {'local_ip': self.local_ip, 'e': e})
index b1a3565bb7c7a55846d8e19d6699e5f144b1cf50..311b4a2af47381fab1d4a10b04ca27ab754f3a45 100644 (file)
@@ -296,6 +296,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.NETWORK, topics.DELETE],
                      [constants.TUNNEL, topics.UPDATE],
+                     [constants.TUNNEL, topics.DELETE],
                      [topics.SECURITY_GROUP, topics.UPDATE],
                      [topics.DVR, topics.UPDATE]]
         if self.l2_pop:
@@ -354,6 +355,25 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             self._setup_tunnel_port(self.tun_br, tun_name, tunnel_ip,
                                     tunnel_type)
 
+    def tunnel_delete(self, context, **kwargs):
+        LOG.debug("tunnel_delete received")
+        if not self.enable_tunneling:
+            return
+        tunnel_ip = kwargs.get('tunnel_ip')
+        if not tunnel_ip:
+            LOG.error(_LE("No tunnel_ip specified, cannot delete tunnels"))
+            return
+        tunnel_type = kwargs.get('tunnel_type')
+        if not tunnel_type:
+            LOG.error(_LE("No tunnel_type specified, cannot delete tunnels"))
+            return
+        if tunnel_type not in self.tunnel_types:
+            LOG.error(_LE("tunnel_type %s not supported by agent"),
+                      tunnel_type)
+            return
+        ofport = self.tun_br_ofports[tunnel_type].get(tunnel_ip)
+        self.cleanup_tunnel_port(self.tun_br, ofport, tunnel_type)
+
     def fdb_add(self, context, fdb_entries):
         LOG.debug("fdb_add received")
         for lvm, agent_ports in self.get_agent_ports(fdb_entries,
@@ -1309,8 +1329,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         try:
             return '%08x' % netaddr.IPAddress(ip_address, version=4)
         except Exception:
-            LOG.warn(_LW("Unable to create tunnel port. "
-                         "Invalid remote IP: %s"), ip_address)
+            LOG.warn(_LW("Invalid remote IP: %s"), ip_address)
             return
 
     def tunnel_sync(self):
@@ -1318,7 +1337,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             for tunnel_type in self.tunnel_types:
                 details = self.plugin_rpc.tunnel_sync(self.context,
                                                       self.local_ip,
-                                                      tunnel_type)
+                                                      tunnel_type,
+                                                      cfg.CONF.host)
                 if not self.l2_pop:
                     tunnels = details['tunnels']
                     for tunnel in tunnels:
index 7f2c8672ff3393e6693c45655a823a95ed94b016..6c144ad92f9fc88b59afac6a76eea22e5d49394d 100644 (file)
@@ -142,4 +142,6 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
             rpcapi, None,
             'tunnel_sync', rpc_method='call',
             tunnel_ip='fake_tunnel_ip',
-            tunnel_type=None)
+            tunnel_type=None,
+            host='fake_host',
+            version='1.4')
index a3b4955f62a6b8f7e43119e080ec6fa630e8446b..efb1dbd57847f65747feb108b2df5f239ae0e51d 100644 (file)
@@ -29,6 +29,7 @@ from neutron.common import constants
 from neutron.common import exceptions
 from neutron.common import topics
 from neutron.plugins.ml2.drivers import type_tunnel
+from neutron.plugins.ml2 import managers
 from neutron.plugins.ml2 import rpc as plugin_rpc
 from neutron.tests import base
 
@@ -37,7 +38,10 @@ class RpcCallbacksTestCase(base.BaseTestCase):
 
     def setUp(self):
         super(RpcCallbacksTestCase, self).setUp()
-        self.callbacks = plugin_rpc.RpcCallbacks(mock.Mock(), mock.Mock())
+        self.type_manager = managers.TypeManager()
+        self.notifier = plugin_rpc.AgentNotifierApi(topics.AGENT)
+        self.callbacks = plugin_rpc.RpcCallbacks(self.notifier,
+                                                 self.type_manager)
         self.manager = mock.patch.object(
             plugin_rpc.manager, 'NeutronManager').start()
         self.l3plugin = mock.Mock()
@@ -234,6 +238,17 @@ class RpcApiTestCase(base.BaseTestCase):
                 fanout=True,
                 tunnel_ip='fake_ip', tunnel_type='gre')
 
+    def test_tunnel_delete(self):
+        rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
+        self._test_rpc_api(
+                rpcapi,
+                topics.get_topic_name(topics.AGENT,
+                                      type_tunnel.TUNNEL,
+                                      topics.DELETE),
+                'tunnel_delete', rpc_method='cast',
+                fanout=True,
+                tunnel_ip='fake_ip', tunnel_type='gre')
+
     def test_device_details(self):
         rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
         self._test_rpc_api(rpcapi, None,
@@ -263,7 +278,9 @@ class RpcApiTestCase(base.BaseTestCase):
         self._test_rpc_api(rpcapi, None,
                            'tunnel_sync', rpc_method='call',
                            tunnel_ip='fake_tunnel_ip',
-                           tunnel_type=None)
+                           tunnel_type=None,
+                           host='fake_host',
+                           version='1.4')
 
     def test_update_device_up(self):
         rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
index 3bdb4e103af5de96c28b1bc82370685a31f78ba6..6c4051792ceeb6324d96a564f36a3bb160bdf73c 100644 (file)
@@ -17,6 +17,7 @@ import mock
 
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.ml2.drivers import type_gre
+from neutron.tests.unit.ml2 import test_rpcapi
 from neutron.tests.unit.ml2 import test_type_tunnel
 from neutron.tests.unit import testlib_api
 
@@ -85,5 +86,12 @@ class GreTypeTest(test_type_tunnel.TunnelTypeTestMixin,
 
 
 class GreTypeMultiRangeTest(test_type_tunnel.TunnelTypeMultiRangeTestMixin,
-                           testlib_api.SqlTestCase):
+                            testlib_api.SqlTestCase):
     DRIVER_CLASS = type_gre.GreTypeDriver
+
+
+class GreTypeRpcCallbackTest(test_type_tunnel.TunnelRpcCallbackTestMixin,
+                             test_rpcapi.RpcCallbacksTestCase,
+                             testlib_api.SqlTestCase):
+        DRIVER_CLASS = type_gre.GreTypeDriver
+        TYPE = p_const.TYPE_GRE
index 2095f90eb31652decbf34b5821266bd7cd882bf2..6637c6035b209161cb3c6b9259ea01830f3e7f47 100644 (file)
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import contextlib
+import mock
 from six import moves
 import testtools
 from testtools import matchers
@@ -21,6 +23,10 @@ from neutron.common import exceptions as exc
 from neutron.db import api as db
 from neutron.plugins.ml2 import driver_api as api
 
+TUNNEL_IP_ONE = "10.10.10.10"
+TUNNEL_IP_TWO = "10.10.10.20"
+HOST_ONE = 'fake_host_one'
+HOST_TWO = 'fake_host_two'
 TUN_MIN = 100
 TUN_MAX = 109
 TUNNEL_RANGES = [(TUN_MIN, TUN_MAX)]
@@ -219,3 +225,94 @@ class TunnelTypeMultiRangeTestMixin(object):
                     self.TUN_MIN1, self.TUN_MAX1):
             alloc = self.driver.get_allocation(self.session, key)
             self.assertFalse(alloc.allocated)
+
+
+class TunnelRpcCallbackTestMixin(object):
+
+    DRIVER_CLASS = None
+    TYPE = None
+
+    def setUp(self):
+        super(TunnelRpcCallbackTestMixin, self).setUp()
+        self.driver = self.DRIVER_CLASS()
+
+    def _test_tunnel_sync(self, kwargs, delete_tunnel=False):
+        with contextlib.nested(
+            mock.patch.object(self.notifier, 'tunnel_update'),
+            mock.patch.object(self.notifier, 'tunnel_delete')
+        ) as (tunnel_update, tunnel_delete):
+            details = self.callbacks.tunnel_sync('fake_context', **kwargs)
+            tunnels = details['tunnels']
+            for tunnel in tunnels:
+                self.assertEqual(kwargs['tunnel_ip'], tunnel['ip_address'])
+                self.assertEqual(kwargs['host'], tunnel['host'])
+            self.assertTrue(tunnel_update.called)
+            if delete_tunnel:
+                self.assertTrue(tunnel_delete.called)
+            else:
+                self.assertFalse(tunnel_delete.called)
+
+    def _test_tunnel_sync_raises(self, kwargs):
+        with contextlib.nested(
+            mock.patch.object(self.notifier, 'tunnel_update'),
+            mock.patch.object(self.notifier, 'tunnel_delete')
+        ) as (tunnel_update, tunnel_delete):
+            self.assertRaises(exc.InvalidInput,
+                              self.callbacks.tunnel_sync,
+                              'fake_context', **kwargs)
+            self.assertFalse(tunnel_update.called)
+            self.assertFalse(tunnel_delete.called)
+
+    def test_tunnel_sync_called_without_host_passed(self):
+        kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+                  'host': None}
+        self._test_tunnel_sync(kwargs)
+
+    def test_tunnel_sync_called_with_host_passed_for_existing_tunnel_ip(self):
+        self.driver.add_endpoint(TUNNEL_IP_ONE, None)
+
+        kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+                  'host': HOST_ONE}
+        self._test_tunnel_sync(kwargs)
+
+    def test_tunnel_sync_called_with_host_passed(self):
+        kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+                  'host': HOST_ONE}
+        self._test_tunnel_sync(kwargs)
+
+    def test_tunnel_sync_called_for_existing_endpoint(self):
+        self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
+
+        kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+                  'host': HOST_ONE}
+        self._test_tunnel_sync(kwargs)
+
+    def test_tunnel_sync_called_for_existing_host_with_tunnel_ip_changed(self):
+        self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
+
+        kwargs = {'tunnel_ip': TUNNEL_IP_TWO, 'tunnel_type': self.TYPE,
+                  'host': HOST_ONE}
+        self._test_tunnel_sync(kwargs, True)
+
+    def test_tunnel_sync_called_with_used_tunnel_ip_case_one(self):
+        self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
+
+        kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+                  'host': HOST_TWO}
+        self._test_tunnel_sync_raises(kwargs)
+
+    def test_tunnel_sync_called_with_used_tunnel_ip_case_two(self):
+        self.driver.add_endpoint(TUNNEL_IP_ONE, None)
+        self.driver.add_endpoint(TUNNEL_IP_TWO, HOST_TWO)
+
+        kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+                  'host': HOST_TWO}
+        self._test_tunnel_sync_raises(kwargs)
+
+    def test_tunnel_sync_called_without_tunnel_ip(self):
+        kwargs = {'tunnel_type': self.TYPE, 'host': None}
+        self._test_tunnel_sync_raises(kwargs)
+
+    def test_tunnel_sync_called_without_tunnel_type(self):
+        kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'host': None}
+        self._test_tunnel_sync_raises(kwargs)
index bf57b1d4edfcee6b5e819bda3760e8a0bbdfb096..b794261b2ac6b47f19af6d14fee49bbcbe6bfea8 100644 (file)
@@ -17,6 +17,7 @@ import mock
 
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.ml2.drivers import type_vxlan
+from neutron.tests.unit.ml2 import test_rpcapi
 from neutron.tests.unit.ml2 import test_type_tunnel
 from neutron.tests.unit import testlib_api
 
@@ -96,3 +97,10 @@ class VxlanTypeTest(test_type_tunnel.TunnelTypeTestMixin,
 class VxlanTypeMultiRangeTest(test_type_tunnel.TunnelTypeMultiRangeTestMixin,
                               testlib_api.SqlTestCase):
     DRIVER_CLASS = type_vxlan.VxlanTypeDriver
+
+
+class VxlanTypeRpcCallbackTest(test_type_tunnel.TunnelRpcCallbackTestMixin,
+                               test_rpcapi.RpcCallbacksTestCase,
+                               testlib_api.SqlTestCase):
+        DRIVER_CLASS = type_vxlan.VxlanTypeDriver
+        TYPE = p_const.TYPE_VXLAN
index 8398a2394b1781e821f2dfbb3dceddee7412d3e9..ee728a85e8a5882a2a30fdfacb0763c69a1ce752 100644 (file)
@@ -761,6 +761,7 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
         self.agent.local_ip = 'agent_ip'
         self.agent.context = 'fake_context'
         self.agent.tunnel_types = ['vxlan']
+        self.agent.host = cfg.CONF.host
         with mock.patch.object(
             self.agent.plugin_rpc, 'tunnel_sync'
         ) as tunnel_sync_rpc_fn:
@@ -768,7 +769,8 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
             tunnel_sync_rpc_fn.assert_called_once_with(
                 self.agent.context,
                 self.agent.local_ip,
-                self.agent.tunnel_types[0])
+                self.agent.tunnel_types[0],
+                self.agent.host)
 
     def test__get_ports(self):
         ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
index 3e124fbf69a75d250e708bdba954e4186733d845..03d5a724b8563cd169cf42c2594f14b2958c71ac 100644 (file)
@@ -966,6 +966,18 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             mock.call(self.agent.tun_br, 'gre-0a0a0a0a', '10.10.10.10', 'gre')]
         self.agent._setup_tunnel_port.assert_has_calls(expected_calls)
 
+    def test_tunnel_delete(self):
+        kwargs = {'tunnel_ip': '10.10.10.10',
+                  'tunnel_type': 'gre'}
+        self.agent.enable_tunneling = True
+        self.agent.tunnel_types = ['gre']
+        self.agent.tun_br_ofports = {'gre': {'10.10.10.10': '1'}}
+        with mock.patch.object(
+            self.agent, 'cleanup_tunnel_port'
+        ) as clean_tun_fn:
+            self.agent.tunnel_delete(context=None, **kwargs)
+            self.assertTrue(clean_tun_fn.called)
+
     def test_ovs_status(self):
         reply2 = {'current': set(['tap0']),
                   'added': set(['tap2']),