--- /dev/null
+# Copyright (c) 2015 Rackspace
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.agent.l3 import dvr_snat_ns
+from neutron.agent.l3 import namespace_manager
+from neutron.agent.l3 import namespaces
+from neutron.agent.linux import ip_lib
+from neutron.openstack.common import uuidutils
+from neutron.tests.functional import base
+
+_uuid = uuidutils.generate_uuid
+
+
+class NamespaceManagerTestFramework(base.BaseSudoTestCase):
+
+ def setUp(self):
+ super(NamespaceManagerTestFramework, self).setUp()
+ self.agent_conf = mock.MagicMock()
+ self.agent_conf.router_delete_namespaces = True
+ self.namespace_manager = namespace_manager.NamespaceManager(
+ self.agent_conf, driver=None, clean_stale=True)
+
+ def _create_namespace(self, router_id, ns_class):
+ namespace = ns_class(router_id, self.agent_conf, driver=None,
+ use_ipv6=False)
+ namespace.create()
+ self.addCleanup(self._delete_namespace, namespace)
+ return namespace.name
+
+ def _delete_namespace(self, namespace):
+ try:
+ namespace.delete()
+ except RuntimeError as e:
+ # If the namespace didn't exist when delete was attempted, mission
+ # acomplished. Otherwise, re-raise the exception
+ if 'No such file or directory' not in e.message:
+ raise e
+
+ def _namespace_exists(self, namespace):
+ ip = ip_lib.IPWrapper(namespace=namespace)
+ return ip.netns.exists(namespace)
+
+
+class NamespaceManagerTestCase(NamespaceManagerTestFramework):
+
+ def test_namespace_manager(self):
+ router_id = _uuid()
+ to_keep = set()
+ to_delete = set()
+ to_retrieve = set()
+ to_keep.add(self._create_namespace(router_id,
+ namespaces.RouterNamespace))
+ to_keep.add(self._create_namespace(router_id,
+ dvr_snat_ns.SnatNamespace))
+ to_delete.add(self._create_namespace(_uuid(),
+ dvr_snat_ns.SnatNamespace))
+ to_retrieve = to_keep | to_delete
+
+ with mock.patch.object(namespace_manager.NamespaceManager, 'list_all',
+ return_value=to_retrieve):
+ with self.namespace_manager as ns_manager:
+ for ns_name in to_keep:
+ id_to_keep = ns_manager.get_prefix_and_id(ns_name)[1]
+ ns_manager.keep_router(id_to_keep)
+
+ for ns_name in to_keep:
+ self.assertTrue(self._namespace_exists(ns_name))
+ for ns_name in to_delete:
+ self.assertFalse(self._namespace_exists(ns_name))
from neutron.agent.common import ovs_lib
from neutron.agent.l3 import agent as neutron_l3_agent
from neutron.agent.l3 import dvr_snat_ns
+from neutron.agent.l3 import namespace_manager
from neutron.agent.l3 import namespaces
from neutron.agent import l3_agent as l3_agent_main
from neutron.agent.linux import dhcp
(new_external_device_ip, external_device_name),
new_config)
+ def test_periodic_sync_routers_task(self):
+ routers_to_keep = []
+ routers_to_delete = []
+ ns_names_to_retrieve = set()
+ for i in range(2):
+ routers_to_keep.append(self.generate_router_info(False))
+ self.manage_router(self.agent, routers_to_keep[i])
+ ns_names_to_retrieve.add(namespaces.NS_PREFIX +
+ routers_to_keep[i]['id'])
+ for i in range(2):
+ routers_to_delete.append(self.generate_router_info(False))
+ self.manage_router(self.agent, routers_to_delete[i])
+ ns_names_to_retrieve.add(namespaces.NS_PREFIX +
+ routers_to_delete[i]['id'])
+
+ # Mock the plugin RPC API to Simulate a situation where the agent
+ # was handling the 4 routers created above, it went down and after
+ # starting up again, two of the routers were deleted via the API
+ mocked_get_routers = (
+ neutron_l3_agent.L3PluginApi.return_value.get_routers)
+ mocked_get_routers.return_value = routers_to_keep
+
+ # Synchonize the agent with the plug-in
+ with mock.patch.object(namespace_manager.NamespaceManager, 'list_all',
+ return_value=ns_names_to_retrieve):
+ self.agent.periodic_sync_routers_task(self.agent.context)
+
+ # Mock the plugin RPC API so a known external network id is returned
+ # when the router updates are processed by the agent
+ external_network_id = _uuid()
+ mocked_get_external_network_id = (
+ neutron_l3_agent.L3PluginApi.return_value.get_external_network_id)
+ mocked_get_external_network_id.return_value = external_network_id
+
+ # Plug external_gateway_info in the routers that are not going to be
+ # deleted by the agent when it processes the updates. Otherwise,
+ # _process_router_if_compatible in the agent fails
+ for i in range(2):
+ routers_to_keep[i]['external_gateway_info'] = {'network_id':
+ external_network_id}
+
+ # Have the agent process the update from the plug-in and verify
+ # expected behavior
+ for _ in routers_to_keep + routers_to_delete:
+ self.agent._process_router_update()
+
+ for i in range(2):
+ self.assertIn(routers_to_keep[i]['id'], self.agent.router_info)
+ self.assertTrue(self._namespace_exists(namespaces.NS_PREFIX +
+ routers_to_keep[i]['id']))
+ for i in range(2):
+ self.assertNotIn(routers_to_delete[i]['id'],
+ self.agent.router_info)
+ self.assertFalse(self._namespace_exists(
+ namespaces.NS_PREFIX + routers_to_delete[i]['id']))
+
def _router_lifecycle(self, enable_ha, ip_version=4, dual_stack=False):
router_info = self.generate_router_info(enable_ha, ip_version,
dual_stack=dual_stack)
--- /dev/null
+# Copyright (c) 2015 Rackspace
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.agent.l3 import dvr_snat_ns
+from neutron.agent.l3 import namespace_manager
+from neutron.agent.l3 import namespaces
+from neutron.agent.linux import ip_lib
+from neutron.openstack.common import uuidutils
+from neutron.tests import base
+
+_uuid = uuidutils.generate_uuid
+
+
+class NamespaceManagerTestCaseFramework(base.BaseTestCase):
+
+ def _create_namespace_manager(self):
+ self.agent_conf = mock.Mock()
+ self.driver = mock.Mock()
+ return namespace_manager.NamespaceManager(self.agent_conf,
+ self.driver, True)
+
+
+class TestNamespaceManager(NamespaceManagerTestCaseFramework):
+
+ def test_get_prefix_and_id(self):
+ ns_manager = self._create_namespace_manager()
+ router_id = _uuid()
+
+ ns_prefix, ns_id = ns_manager.get_prefix_and_id(
+ namespaces.NS_PREFIX + router_id)
+ self.assertEqual(ns_prefix, namespaces.NS_PREFIX)
+ self.assertEqual(ns_id, router_id)
+
+ ns_prefix, ns_id = ns_manager.get_prefix_and_id(
+ dvr_snat_ns.SNAT_NS_PREFIX + router_id)
+ self.assertEqual(ns_prefix, dvr_snat_ns.SNAT_NS_PREFIX)
+ self.assertEqual(ns_id, router_id)
+
+ ns_name = 'dhcp-' + router_id
+ self.assertIsNone(ns_manager.get_prefix_and_id(ns_name))
+
+ def test_is_managed(self):
+ ns_manager = self._create_namespace_manager()
+ router_id = _uuid()
+
+ router_ns_name = namespaces.NS_PREFIX + router_id
+ self.assertTrue(ns_manager.is_managed(router_ns_name))
+ router_ns_name = dvr_snat_ns.SNAT_NS_PREFIX + router_id
+ self.assertTrue(ns_manager.is_managed(router_ns_name))
+ self.assertFalse(ns_manager.is_managed('dhcp-' + router_id))
+
+ def test_list_all(self):
+ ns_manager = self._create_namespace_manager()
+ ns_names = [namespaces.NS_PREFIX + _uuid(),
+ dvr_snat_ns.SNAT_NS_PREFIX + _uuid(),
+ 'dhcp-' + _uuid(), ]
+
+ # Test the normal path
+ with mock.patch.object(ip_lib.IPWrapper, 'get_namespaces',
+ return_value=ns_names):
+ retrieved_ns_names = ns_manager.list_all()
+ self.assertEqual(len(ns_names) - 1, len(retrieved_ns_names))
+ for i in range(len(retrieved_ns_names)):
+ self.assertIn(ns_names[i], retrieved_ns_names)
+ self.assertNotIn(ns_names[-1], retrieved_ns_names)
+
+ # Test path where IPWrapper raises exception
+ with mock.patch.object(ip_lib.IPWrapper, 'get_namespaces',
+ side_effect=RuntimeError):
+ retrieved_ns_names = ns_manager.list_all()
+ self.assertFalse(retrieved_ns_names)