]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Prepare for unit test reorg
authorMaru Newby <marun@redhat.com>
Fri, 3 Apr 2015 23:42:31 +0000 (23:42 +0000)
committerMaru Newby <marun@redhat.com>
Sat, 4 Apr 2015 00:45:05 +0000 (00:45 +0000)
The unit test reorg is about moving files around so a test module is
clearly associated with the code module it targets, but the test
modules in this change needed to be manually merged because they both
targeted the same module.

test_api_v2 is also updated to use the path of neutron/tests/base.py
as the root of path to test implementations of extensions.

Change-Id: I432b84339e51c26ef0aa26d44e29b5a3311626ad
Implements: bp/reorganize-unit-test-tree

neutron/tests/unit/agent/l3/test_l3_router.py [deleted file]
neutron/tests/unit/agent/l3/test_router_info.py
neutron/tests/unit/agent/linux/test_process_monitor.py [deleted file]
neutron/tests/unit/test_api_v2.py
neutron/tests/unit/test_db_plugin.py
neutron/tests/unit/test_db_plugin_level.py [deleted file]
neutron/tests/unit/test_extension_extended_attribute.py [deleted file]
neutron/tests/unit/test_extensions.py
neutron/tests/unit/test_linux_external_process.py

diff --git a/neutron/tests/unit/agent/l3/test_l3_router.py b/neutron/tests/unit/agent/l3/test_l3_router.py
deleted file mode 100644 (file)
index 27d5b80..0000000
+++ /dev/null
@@ -1,226 +0,0 @@
-# Copyright (c) 2015 Openstack Foundation
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import mock
-
-from neutron.agent.l3 import router_info
-from neutron.agent.linux import ip_lib
-from neutron.common import constants as l3_constants
-from neutron.common import exceptions as n_exc
-from neutron.openstack.common import uuidutils
-from neutron.tests import base
-
-_uuid = uuidutils.generate_uuid
-
-
-class BasicRouterTestCaseFramework(base.BaseTestCase):
-    def _create_router(self, router=None, **kwargs):
-        if not router:
-            router = mock.MagicMock()
-        self.agent_conf = mock.Mock()
-        # NOTE The use_namespaces config will soon be deprecated
-        self.agent_conf.use_namespaces = True
-        self.router_id = _uuid()
-        return router_info.RouterInfo(self.router_id,
-                                      router,
-                                      self.agent_conf,
-                                      mock.sentinel.interface_driver,
-                                      **kwargs)
-
-
-class TestBasicRouterOperations(BasicRouterTestCaseFramework):
-
-    def test_get_floating_ips(self):
-        router = mock.MagicMock()
-        router.get.return_value = [mock.sentinel.floating_ip]
-        ri = self._create_router(router)
-
-        fips = ri.get_floating_ips()
-
-        self.assertEqual([mock.sentinel.floating_ip], fips)
-
-    def test_process_floating_ip_nat_rules(self):
-        ri = self._create_router()
-        fips = [{'fixed_ip_address': mock.sentinel.ip,
-                 'floating_ip_address': mock.sentinel.fip}]
-        ri.get_floating_ips = mock.Mock(return_value=fips)
-        ri.iptables_manager = mock.MagicMock()
-        ipv4_nat = ri.iptables_manager.ipv4['nat']
-        ri.floating_forward_rules = mock.Mock(
-            return_value=[(mock.sentinel.chain, mock.sentinel.rule)])
-
-        ri.process_floating_ip_nat_rules()
-
-        # Be sure that the rules are cleared first and apply is called last
-        self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
-                         ipv4_nat.mock_calls[0])
-        self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
-
-        # Be sure that add_rule is called somewhere in the middle
-        ipv4_nat.add_rule.assert_called_once_with(mock.sentinel.chain,
-                                                  mock.sentinel.rule,
-                                                  tag='floating_ip')
-
-    def test_process_floating_ip_nat_rules_removed(self):
-        ri = self._create_router()
-        ri.get_floating_ips = mock.Mock(return_value=[])
-        ri.iptables_manager = mock.MagicMock()
-        ipv4_nat = ri.iptables_manager.ipv4['nat']
-
-        ri.process_floating_ip_nat_rules()
-
-        # Be sure that the rules are cleared first and apply is called last
-        self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
-                         ipv4_nat.mock_calls[0])
-        self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
-
-        # Be sure that add_rule is called somewhere in the middle
-        self.assertFalse(ipv4_nat.add_rule.called)
-
-    def _test_add_fip_addr_to_device_error(self, device):
-        ri = self._create_router()
-        ip = '15.1.2.3'
-
-        result = ri._add_fip_addr_to_device(
-            {'id': mock.sentinel.id, 'floating_ip_address': ip}, device)
-
-        device.addr.add.assert_called_with(ip + '/32')
-        return result
-
-    def test__add_fip_addr_to_device(self):
-        result = self._test_add_fip_addr_to_device_error(mock.Mock())
-        self.assertTrue(result)
-
-    def test__add_fip_addr_to_device_error(self):
-        device = mock.Mock()
-        device.addr.add.side_effect = RuntimeError
-        result = self._test_add_fip_addr_to_device_error(device)
-        self.assertFalse(result)
-
-    def test_process_snat_dnat_for_fip(self):
-        ri = self._create_router()
-        ri.process_floating_ip_nat_rules = mock.Mock(side_effect=Exception)
-
-        self.assertRaises(n_exc.FloatingIpSetupException,
-                          ri.process_snat_dnat_for_fip)
-
-        ri.process_floating_ip_nat_rules.assert_called_once_with()
-
-    def test_put_fips_in_error_state(self):
-        ri = self._create_router()
-        ri.router = mock.Mock()
-        ri.router.get.return_value = [{'id': mock.sentinel.id1},
-                                      {'id': mock.sentinel.id2}]
-
-        statuses = ri.put_fips_in_error_state()
-
-        expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR,
-                     mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}]
-        self.assertNotEqual(expected, statuses)
-
-    def test_configure_fip_addresses(self):
-        ri = self._create_router()
-        ri.process_floating_ip_addresses = mock.Mock(
-            side_effect=Exception)
-
-        self.assertRaises(n_exc.FloatingIpSetupException,
-                          ri.configure_fip_addresses,
-                          mock.sentinel.interface_name)
-
-        ri.process_floating_ip_addresses.assert_called_once_with(
-            mock.sentinel.interface_name)
-
-    def test_get_router_cidrs_returns_cidrs(self):
-        ri = self._create_router()
-        addresses = ['15.1.2.2/24', '15.1.2.3/32']
-        device = mock.MagicMock()
-        device.addr.list.return_value = [{'cidr': addresses[0]},
-                                         {'cidr': addresses[1]}]
-        self.assertEqual(set(addresses), ri.get_router_cidrs(device))
-
-
-@mock.patch.object(ip_lib, 'IPDevice')
-class TestFloatingIpWithMockDevice(BasicRouterTestCaseFramework):
-
-    def test_process_floating_ip_addresses_remap(self, IPDevice):
-        fip_id = _uuid()
-        fip = {
-            'id': fip_id, 'port_id': _uuid(),
-            'floating_ip_address': '15.1.2.3',
-            'fixed_ip_address': '192.168.0.2'
-        }
-
-        IPDevice.return_value = device = mock.Mock()
-        device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
-        ri = self._create_router()
-        ri.get_floating_ips = mock.Mock(return_value=[fip])
-
-        fip_statuses = ri.process_floating_ip_addresses(
-            mock.sentinel.interface_name)
-        self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE},
-                         fip_statuses)
-
-        self.assertFalse(device.addr.add.called)
-        self.assertFalse(device.addr.delete.called)
-
-    def test_process_router_with_disabled_floating_ip(self, IPDevice):
-        fip_id = _uuid()
-        fip = {
-            'id': fip_id, 'port_id': _uuid(),
-            'floating_ip_address': '15.1.2.3',
-            'fixed_ip_address': '192.168.0.2'
-        }
-
-        ri = self._create_router()
-        ri.floating_ips = [fip]
-        ri.get_floating_ips = mock.Mock(return_value=[])
-
-        fip_statuses = ri.process_floating_ip_addresses(
-            mock.sentinel.interface_name)
-
-        self.assertIsNone(fip_statuses.get(fip_id))
-
-    def test_process_router_floating_ip_with_device_add_error(self, IPDevice):
-        IPDevice.return_value = device = mock.Mock(side_effect=RuntimeError)
-        device.addr.list.return_value = []
-        fip_id = _uuid()
-        fip = {
-            'id': fip_id, 'port_id': _uuid(),
-            'floating_ip_address': '15.1.2.3',
-            'fixed_ip_address': '192.168.0.2'
-        }
-        ri = self._create_router()
-        ri.add_floating_ip = mock.Mock(
-            return_value=l3_constants.FLOATINGIP_STATUS_ERROR)
-        ri.get_floating_ips = mock.Mock(return_value=[fip])
-
-        fip_statuses = ri.process_floating_ip_addresses(
-            mock.sentinel.interface_name)
-
-        self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ERROR},
-                         fip_statuses)
-
-    # TODO(mrsmith): refactor for DVR cases
-    def test_process_floating_ip_addresses_remove(self, IPDevice):
-        IPDevice.return_value = device = mock.Mock()
-        device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
-
-        ri = self._create_router()
-        ri.remove_floating_ip = mock.Mock()
-        ri.router.get = mock.Mock(return_value=[])
-
-        fip_statuses = ri.process_floating_ip_addresses(
-            mock.sentinel.interface_name)
-        self.assertEqual({}, fip_statuses)
-        ri.remove_floating_ip.assert_called_once_with(device, '15.1.2.3/32')
index acb764d373cace7133a56d96819f57c24f5c35d6..04aa55748c3e494b3d3973055a39d8f829fa1dfd 100644 (file)
@@ -14,6 +14,9 @@ import mock
 
 from neutron.agent.common import config as agent_config
 from neutron.agent.l3 import router_info
+from neutron.agent.linux import ip_lib
+from neutron.common import constants as l3_constants
+from neutron.common import exceptions as n_exc
 from neutron.openstack.common import uuidutils
 from neutron.tests import base
 
@@ -104,3 +107,205 @@ class TestRouterInfo(base.BaseTestCase):
         expected = [['ip', 'route', 'delete', 'to', '110.100.30.0/24',
                     'via', '10.100.10.30']]
         self._check_agent_method_called(expected)
+
+
+class BasicRouterTestCaseFramework(base.BaseTestCase):
+    def _create_router(self, router=None, **kwargs):
+        if not router:
+            router = mock.MagicMock()
+        self.agent_conf = mock.Mock()
+        # NOTE The use_namespaces config will soon be deprecated
+        self.agent_conf.use_namespaces = True
+        self.router_id = _uuid()
+        return router_info.RouterInfo(self.router_id,
+                                      router,
+                                      self.agent_conf,
+                                      mock.sentinel.interface_driver,
+                                      **kwargs)
+
+
+class TestBasicRouterOperations(BasicRouterTestCaseFramework):
+
+    def test_get_floating_ips(self):
+        router = mock.MagicMock()
+        router.get.return_value = [mock.sentinel.floating_ip]
+        ri = self._create_router(router)
+
+        fips = ri.get_floating_ips()
+
+        self.assertEqual([mock.sentinel.floating_ip], fips)
+
+    def test_process_floating_ip_nat_rules(self):
+        ri = self._create_router()
+        fips = [{'fixed_ip_address': mock.sentinel.ip,
+                 'floating_ip_address': mock.sentinel.fip}]
+        ri.get_floating_ips = mock.Mock(return_value=fips)
+        ri.iptables_manager = mock.MagicMock()
+        ipv4_nat = ri.iptables_manager.ipv4['nat']
+        ri.floating_forward_rules = mock.Mock(
+            return_value=[(mock.sentinel.chain, mock.sentinel.rule)])
+
+        ri.process_floating_ip_nat_rules()
+
+        # Be sure that the rules are cleared first and apply is called last
+        self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
+                         ipv4_nat.mock_calls[0])
+        self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
+
+        # Be sure that add_rule is called somewhere in the middle
+        ipv4_nat.add_rule.assert_called_once_with(mock.sentinel.chain,
+                                                  mock.sentinel.rule,
+                                                  tag='floating_ip')
+
+    def test_process_floating_ip_nat_rules_removed(self):
+        ri = self._create_router()
+        ri.get_floating_ips = mock.Mock(return_value=[])
+        ri.iptables_manager = mock.MagicMock()
+        ipv4_nat = ri.iptables_manager.ipv4['nat']
+
+        ri.process_floating_ip_nat_rules()
+
+        # Be sure that the rules are cleared first and apply is called last
+        self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
+                         ipv4_nat.mock_calls[0])
+        self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
+
+        # Be sure that add_rule is called somewhere in the middle
+        self.assertFalse(ipv4_nat.add_rule.called)
+
+    def _test_add_fip_addr_to_device_error(self, device):
+        ri = self._create_router()
+        ip = '15.1.2.3'
+
+        result = ri._add_fip_addr_to_device(
+            {'id': mock.sentinel.id, 'floating_ip_address': ip}, device)
+
+        device.addr.add.assert_called_with(ip + '/32')
+        return result
+
+    def test__add_fip_addr_to_device(self):
+        result = self._test_add_fip_addr_to_device_error(mock.Mock())
+        self.assertTrue(result)
+
+    def test__add_fip_addr_to_device_error(self):
+        device = mock.Mock()
+        device.addr.add.side_effect = RuntimeError
+        result = self._test_add_fip_addr_to_device_error(device)
+        self.assertFalse(result)
+
+    def test_process_snat_dnat_for_fip(self):
+        ri = self._create_router()
+        ri.process_floating_ip_nat_rules = mock.Mock(side_effect=Exception)
+
+        self.assertRaises(n_exc.FloatingIpSetupException,
+                          ri.process_snat_dnat_for_fip)
+
+        ri.process_floating_ip_nat_rules.assert_called_once_with()
+
+    def test_put_fips_in_error_state(self):
+        ri = self._create_router()
+        ri.router = mock.Mock()
+        ri.router.get.return_value = [{'id': mock.sentinel.id1},
+                                      {'id': mock.sentinel.id2}]
+
+        statuses = ri.put_fips_in_error_state()
+
+        expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR,
+                     mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}]
+        self.assertNotEqual(expected, statuses)
+
+    def test_configure_fip_addresses(self):
+        ri = self._create_router()
+        ri.process_floating_ip_addresses = mock.Mock(
+            side_effect=Exception)
+
+        self.assertRaises(n_exc.FloatingIpSetupException,
+                          ri.configure_fip_addresses,
+                          mock.sentinel.interface_name)
+
+        ri.process_floating_ip_addresses.assert_called_once_with(
+            mock.sentinel.interface_name)
+
+    def test_get_router_cidrs_returns_cidrs(self):
+        ri = self._create_router()
+        addresses = ['15.1.2.2/24', '15.1.2.3/32']
+        device = mock.MagicMock()
+        device.addr.list.return_value = [{'cidr': addresses[0]},
+                                         {'cidr': addresses[1]}]
+        self.assertEqual(set(addresses), ri.get_router_cidrs(device))
+
+
+@mock.patch.object(ip_lib, 'IPDevice')
+class TestFloatingIpWithMockDevice(BasicRouterTestCaseFramework):
+
+    def test_process_floating_ip_addresses_remap(self, IPDevice):
+        fip_id = _uuid()
+        fip = {
+            'id': fip_id, 'port_id': _uuid(),
+            'floating_ip_address': '15.1.2.3',
+            'fixed_ip_address': '192.168.0.2'
+        }
+
+        IPDevice.return_value = device = mock.Mock()
+        device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
+        ri = self._create_router()
+        ri.get_floating_ips = mock.Mock(return_value=[fip])
+
+        fip_statuses = ri.process_floating_ip_addresses(
+            mock.sentinel.interface_name)
+        self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE},
+                         fip_statuses)
+
+        self.assertFalse(device.addr.add.called)
+        self.assertFalse(device.addr.delete.called)
+
+    def test_process_router_with_disabled_floating_ip(self, IPDevice):
+        fip_id = _uuid()
+        fip = {
+            'id': fip_id, 'port_id': _uuid(),
+            'floating_ip_address': '15.1.2.3',
+            'fixed_ip_address': '192.168.0.2'
+        }
+
+        ri = self._create_router()
+        ri.floating_ips = [fip]
+        ri.get_floating_ips = mock.Mock(return_value=[])
+
+        fip_statuses = ri.process_floating_ip_addresses(
+            mock.sentinel.interface_name)
+
+        self.assertIsNone(fip_statuses.get(fip_id))
+
+    def test_process_router_floating_ip_with_device_add_error(self, IPDevice):
+        IPDevice.return_value = device = mock.Mock(side_effect=RuntimeError)
+        device.addr.list.return_value = []
+        fip_id = _uuid()
+        fip = {
+            'id': fip_id, 'port_id': _uuid(),
+            'floating_ip_address': '15.1.2.3',
+            'fixed_ip_address': '192.168.0.2'
+        }
+        ri = self._create_router()
+        ri.add_floating_ip = mock.Mock(
+            return_value=l3_constants.FLOATINGIP_STATUS_ERROR)
+        ri.get_floating_ips = mock.Mock(return_value=[fip])
+
+        fip_statuses = ri.process_floating_ip_addresses(
+            mock.sentinel.interface_name)
+
+        self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ERROR},
+                         fip_statuses)
+
+    # TODO(mrsmith): refactor for DVR cases
+    def test_process_floating_ip_addresses_remove(self, IPDevice):
+        IPDevice.return_value = device = mock.Mock()
+        device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
+
+        ri = self._create_router()
+        ri.remove_floating_ip = mock.Mock()
+        ri.router.get = mock.Mock(return_value=[])
+
+        fip_statuses = ri.process_floating_ip_addresses(
+            mock.sentinel.interface_name)
+        self.assertEqual({}, fip_statuses)
+        ri.remove_floating_ip.assert_called_once_with(device, '15.1.2.3/32')
diff --git a/neutron/tests/unit/agent/linux/test_process_monitor.py b/neutron/tests/unit/agent/linux/test_process_monitor.py
deleted file mode 100644 (file)
index fe40938..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-# Copyright 2014 Red Hat Inc.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-#
-
-import mock
-
-from neutron.agent.linux import external_process
-from neutron.tests import base
-
-TEST_UUID = 'test-uuid'
-TEST_SERVICE = 'testsvc'
-TEST_PID = 1234
-
-
-class BaseTestProcessMonitor(base.BaseTestCase):
-
-    def setUp(self):
-        super(BaseTestProcessMonitor, self).setUp()
-        self.log_patch = mock.patch("neutron.agent.linux.external_process."
-                                    "LOG.error")
-        self.error_log = self.log_patch.start()
-
-        self.spawn_patch = mock.patch("eventlet.spawn")
-        self.eventlent_spawn = self.spawn_patch.start()
-
-        # create a default process monitor
-        self.create_child_process_monitor('respawn')
-
-    def create_child_process_monitor(self, action):
-        conf = mock.Mock()
-        conf.AGENT.check_child_processes_action = action
-        conf.AGENT.check_child_processes = True
-        self.pmonitor = external_process.ProcessMonitor(
-            config=conf,
-            resource_type='test')
-
-    def get_monitored_process(self, uuid, service=None):
-        monitored_process = mock.Mock()
-        self.pmonitor.register(uuid=uuid,
-                               service_name=service,
-                               monitored_process=monitored_process)
-        return monitored_process
-
-
-class TestProcessMonitor(BaseTestProcessMonitor):
-
-    def test_error_logged(self):
-        pm = self.get_monitored_process(TEST_UUID)
-        pm.active = False
-        self.pmonitor._check_child_processes()
-        self.assertTrue(self.error_log.called)
-
-    def test_exit_handler(self):
-        self.create_child_process_monitor('exit')
-        pm = self.get_monitored_process(TEST_UUID)
-        pm.active = False
-        with mock.patch.object(external_process.ProcessMonitor,
-                               '_exit_handler') as exit_handler:
-            self.pmonitor._check_child_processes()
-            exit_handler.assert_called_once_with(TEST_UUID, None)
-
-    def test_register(self):
-        pm = self.get_monitored_process(TEST_UUID)
-        self.assertEqual(len(self.pmonitor._monitored_processes), 1)
-        self.assertIn(pm, self.pmonitor._monitored_processes.values())
-
-    def test_register_same_service_twice(self):
-        self.get_monitored_process(TEST_UUID)
-        self.get_monitored_process(TEST_UUID)
-        self.assertEqual(len(self.pmonitor._monitored_processes), 1)
-
-    def test_register_different_service_types(self):
-        self.get_monitored_process(TEST_UUID)
-        self.get_monitored_process(TEST_UUID, TEST_SERVICE)
-        self.assertEqual(len(self.pmonitor._monitored_processes), 2)
-
-    def test_unregister(self):
-        self.get_monitored_process(TEST_UUID)
-        self.pmonitor.unregister(TEST_UUID, None)
-        self.assertEqual(len(self.pmonitor._monitored_processes), 0)
-
-    def test_unregister_unknown_process(self):
-        self.pmonitor.unregister(TEST_UUID, None)
-        self.assertEqual(len(self.pmonitor._monitored_processes), 0)
index dd98d9046f5f5df232f9ea7467fbc9d2635d9b9f..c4b706bd5ab26474da61593e8d0c1fc51c79e515 100644 (file)
@@ -40,8 +40,7 @@ from neutron.tests import fake_notifier
 from neutron.tests.unit import testlib_api
 
 
-ROOTDIR = os.path.dirname(os.path.dirname(__file__))
-EXTDIR = os.path.join(ROOTDIR, 'unit/extensions')
+EXTDIR = os.path.join(base.ROOTDIR, 'unit/extensions')
 
 _uuid = uuidutils.generate_uuid
 
index b62e0f1da6ad60022d7e31ac0ff6109f0144b2b9..842cc50c986d25b794998ea458c33e2a2e9c18b8 100644 (file)
@@ -5458,3 +5458,63 @@ class NeutronDbPluginV2AsMixinTestCase(testlib_api.SqlTestCase):
         self.net_data['network']['status'] = 'BUILD'
         net = self.plugin.create_network(self.context, self.net_data)
         self.assertEqual(net['status'], 'BUILD')
+
+
+class TestNetworks(testlib_api.SqlTestCase):
+    def setUp(self):
+        super(TestNetworks, self).setUp()
+        self._tenant_id = 'test-tenant'
+
+        # Update the plugin
+        self.setup_coreplugin(DB_PLUGIN_KLASS)
+
+    def _create_network(self, plugin, ctx, shared=True):
+        network = {'network': {'name': 'net',
+                               'shared': shared,
+                               'admin_state_up': True,
+                               'tenant_id': self._tenant_id}}
+        created_network = plugin.create_network(ctx, network)
+        return (network, created_network['id'])
+
+    def _create_port(self, plugin, ctx, net_id, device_owner, tenant_id):
+        port = {'port': {'name': 'port',
+                         'network_id': net_id,
+                         'mac_address': attributes.ATTR_NOT_SPECIFIED,
+                         'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
+                         'admin_state_up': True,
+                         'device_id': 'device_id',
+                         'device_owner': device_owner,
+                         'tenant_id': tenant_id}}
+        plugin.create_port(ctx, port)
+
+    def _test_update_shared_net_used(self,
+                                     device_owner,
+                                     expected_exception=None):
+        plugin = manager.NeutronManager.get_plugin()
+        ctx = context.get_admin_context()
+        network, net_id = self._create_network(plugin, ctx)
+
+        self._create_port(plugin,
+                          ctx,
+                          net_id,
+                          device_owner,
+                          self._tenant_id + '1')
+
+        network['network']['shared'] = False
+
+        if (expected_exception):
+            with testlib_api.ExpectedException(expected_exception):
+                plugin.update_network(ctx, net_id, network)
+        else:
+            plugin.update_network(ctx, net_id, network)
+
+    def test_update_shared_net_used_fails(self):
+        self._test_update_shared_net_used('', n_exc.InvalidSharedSetting)
+
+    def test_update_shared_net_used_as_router_gateway(self):
+        self._test_update_shared_net_used(
+            constants.DEVICE_OWNER_ROUTER_GW)
+
+    def test_update_shared_net_used_by_floating_ip(self):
+        self._test_update_shared_net_used(
+            constants.DEVICE_OWNER_FLOATINGIP)
diff --git a/neutron/tests/unit/test_db_plugin_level.py b/neutron/tests/unit/test_db_plugin_level.py
deleted file mode 100644 (file)
index a0ebbb5..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-# Copyright (c) 2014 Red Hat, Inc.
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-from neutron.api.v2 import attributes
-from neutron.common import constants
-from neutron.common import exceptions as n_exc
-from neutron import context
-from neutron import manager
-from neutron.tests.unit import test_db_plugin
-from neutron.tests.unit import testlib_api
-
-
-class TestNetworks(testlib_api.SqlTestCase):
-    def setUp(self):
-        super(TestNetworks, self).setUp()
-        self._tenant_id = 'test-tenant'
-
-        # Update the plugin
-        self.setup_coreplugin(test_db_plugin.DB_PLUGIN_KLASS)
-
-    def _create_network(self, plugin, ctx, shared=True):
-        network = {'network': {'name': 'net',
-                               'shared': shared,
-                               'admin_state_up': True,
-                               'tenant_id': self._tenant_id}}
-        created_network = plugin.create_network(ctx, network)
-        return (network, created_network['id'])
-
-    def _create_port(self, plugin, ctx, net_id, device_owner, tenant_id):
-        port = {'port': {'name': 'port',
-                         'network_id': net_id,
-                         'mac_address': attributes.ATTR_NOT_SPECIFIED,
-                         'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
-                         'admin_state_up': True,
-                         'device_id': 'device_id',
-                         'device_owner': device_owner,
-                         'tenant_id': tenant_id}}
-        plugin.create_port(ctx, port)
-
-    def _test_update_shared_net_used(self,
-                                     device_owner,
-                                     expected_exception=None):
-        plugin = manager.NeutronManager.get_plugin()
-        ctx = context.get_admin_context()
-        network, net_id = self._create_network(plugin, ctx)
-
-        self._create_port(plugin,
-                          ctx,
-                          net_id,
-                          device_owner,
-                          self._tenant_id + '1')
-
-        network['network']['shared'] = False
-
-        if (expected_exception):
-            with testlib_api.ExpectedException(expected_exception):
-                plugin.update_network(ctx, net_id, network)
-        else:
-            plugin.update_network(ctx, net_id, network)
-
-    def test_update_shared_net_used_fails(self):
-        self._test_update_shared_net_used('', n_exc.InvalidSharedSetting)
-
-    def test_update_shared_net_used_as_router_gateway(self):
-        self._test_update_shared_net_used(
-            constants.DEVICE_OWNER_ROUTER_GW)
-
-    def test_update_shared_net_used_by_floating_ip(self):
-        self._test_update_shared_net_used(
-            constants.DEVICE_OWNER_FLOATINGIP)
diff --git a/neutron/tests/unit/test_extension_extended_attribute.py b/neutron/tests/unit/test_extension_extended_attribute.py
deleted file mode 100644 (file)
index 7ff48c8..0000000
+++ /dev/null
@@ -1,154 +0,0 @@
-# Copyright 2013 VMware, Inc
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-"""
-Unit tests for extension extended attribute
-"""
-
-from oslo_config import cfg
-import webob.exc as webexc
-
-import neutron
-from neutron.api import extensions
-from neutron.api.v2 import attributes
-from neutron.common import config
-from neutron import manager
-from neutron.plugins.common import constants
-from neutron.plugins.ml2 import plugin as ml2_plugin
-from neutron import quota
-from neutron.tests import base
-from neutron.tests.unit.extensions import extendedattribute as extattr
-from neutron.tests.unit import test_api_v2
-from neutron.tests.unit import testlib_api
-from neutron import wsgi
-
-_uuid = test_api_v2._uuid
-_get_path = test_api_v2._get_path
-extensions_path = ':'.join(neutron.tests.unit.extensions.__path__)
-
-
-class ExtensionExtendedAttributeTestPlugin(
-    ml2_plugin.Ml2Plugin):
-
-    supported_extension_aliases = [
-        'ext-obj-test', "extended-ext-attr"
-    ]
-
-    def __init__(self, configfile=None):
-        super(ExtensionExtendedAttributeTestPlugin, self)
-        self.objs = []
-        self.objh = {}
-
-    def create_ext_test_resource(self, context, ext_test_resource):
-        obj = ext_test_resource['ext_test_resource']
-        id = _uuid()
-        obj['id'] = id
-        self.objs.append(obj)
-        self.objh.update({id: obj})
-        return obj
-
-    def get_ext_test_resources(self, context, filters=None, fields=None):
-        return self.objs
-
-    def get_ext_test_resource(self, context, id, fields=None):
-        return self.objh[id]
-
-
-class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
-    def setUp(self):
-        super(ExtensionExtendedAttributeTestCase, self).setUp()
-        plugin = (
-            "neutron.tests.unit.test_extension_extended_attribute."
-            "ExtensionExtendedAttributeTestPlugin"
-        )
-
-        # point config file to: neutron/tests/etc/neutron.conf.test
-        self.config_parse()
-
-        self.setup_coreplugin(plugin)
-
-        ext_mgr = extensions.PluginAwareExtensionManager(
-            extensions_path,
-            {constants.CORE: ExtensionExtendedAttributeTestPlugin}
-        )
-        ext_mgr.extend_resources("2.0", {})
-        extensions.PluginAwareExtensionManager._instance = ext_mgr
-
-        app = config.load_paste_app('extensions_test_app')
-        self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
-
-        self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
-        # Save the global RESOURCE_ATTRIBUTE_MAP
-        self.saved_attr_map = {}
-        for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
-            self.saved_attr_map[resource] = attrs.copy()
-        # Add the resources to the global attribute map
-        # This is done here as the setup process won't
-        # initialize the main API router which extends
-        # the global attribute map
-        attributes.RESOURCE_ATTRIBUTE_MAP.update(
-            extattr.EXTENDED_ATTRIBUTES_2_0)
-        self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin()
-        self.addCleanup(self.restore_attribute_map)
-
-        quota.QUOTAS._driver = None
-        cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
-                              group='QUOTAS')
-
-    def restore_attribute_map(self):
-        # Restore the original RESOURCE_ATTRIBUTE_MAP
-        attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
-
-    def _do_request(self, method, path, data=None, params=None, action=None):
-        content_type = 'application/json'
-        body = None
-        if data is not None:  # empty dict is valid
-            body = wsgi.Serializer().serialize(data, content_type)
-
-        req = testlib_api.create_request(
-            path, body, content_type,
-            method, query_string=params)
-        res = req.get_response(self._api)
-        if res.status_code >= 400:
-            raise webexc.HTTPClientError(detail=res.body, code=res.status_code)
-        if res.status_code != webexc.HTTPNoContent.code:
-            return res.json
-
-    def _ext_test_resource_create(self, attr=None):
-        data = {
-            "ext_test_resource": {
-                "tenant_id": self._tenant_id,
-                "name": "test",
-                extattr.EXTENDED_ATTRIBUTE: attr
-            }
-        }
-
-        res = self._do_request('POST', _get_path('ext_test_resources'), data)
-        return res['ext_test_resource']
-
-    def test_ext_test_resource_create(self):
-        ext_test_resource = self._ext_test_resource_create()
-        attr = _uuid()
-        ext_test_resource = self._ext_test_resource_create(attr)
-        self.assertEqual(ext_test_resource[extattr.EXTENDED_ATTRIBUTE], attr)
-
-    def test_ext_test_resource_get(self):
-        attr = _uuid()
-        obj = self._ext_test_resource_create(attr)
-        obj_id = obj['id']
-        res = self._do_request('GET', _get_path(
-            'ext_test_resources/{0}'.format(obj_id)))
-        obj2 = res['ext_test_resource']
-        self.assertEqual(obj2[extattr.EXTENDED_ATTRIBUTE], attr)
index e1407299334cfe67e852fcd12f8d0a57e949ae4e..ae533032b033f00d149b5f963cf84aae00c498fa 100644 (file)
 import abc
 
 import mock
+from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_serialization import jsonutils
 import routes
 import webob
+import webob.exc as webexc
 import webtest
 
+import neutron
 from neutron.api import extensions
+from neutron.api.v2 import attributes
 from neutron.common import config
 from neutron.common import exceptions
 from neutron.db import db_base_plugin_v2
+from neutron import manager
 from neutron.plugins.common import constants
+from neutron.plugins.ml2 import plugin as ml2_plugin
+from neutron import quota
 from neutron.tests import base
 from neutron.tests.unit import extension_stubs as ext_stubs
 import neutron.tests.unit.extensions
+from neutron.tests.unit.extensions import extendedattribute as extattr
+from neutron.tests.unit import test_api_v2
 from neutron.tests.unit import testlib_api
 from neutron import wsgi
 
 
 LOG = logging.getLogger(__name__)
+_uuid = test_api_v2._uuid
+_get_path = test_api_v2._get_path
 extensions_path = ':'.join(neutron.tests.unit.extensions.__path__)
 
 
@@ -677,3 +688,118 @@ class SimpleExtensionManager(object):
         if self.request_ext:
             request_extensions.append(self.request_ext)
         return request_extensions
+
+
+class ExtensionExtendedAttributeTestPlugin(
+    ml2_plugin.Ml2Plugin):
+
+    supported_extension_aliases = [
+        'ext-obj-test', "extended-ext-attr"
+    ]
+
+    def __init__(self, configfile=None):
+        super(ExtensionExtendedAttributeTestPlugin, self)
+        self.objs = []
+        self.objh = {}
+
+    def create_ext_test_resource(self, context, ext_test_resource):
+        obj = ext_test_resource['ext_test_resource']
+        id = _uuid()
+        obj['id'] = id
+        self.objs.append(obj)
+        self.objh.update({id: obj})
+        return obj
+
+    def get_ext_test_resources(self, context, filters=None, fields=None):
+        return self.objs
+
+    def get_ext_test_resource(self, context, id, fields=None):
+        return self.objh[id]
+
+
+class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
+    def setUp(self):
+        super(ExtensionExtendedAttributeTestCase, self).setUp()
+        plugin = (
+            "neutron.tests.unit.test_extensions."
+            "ExtensionExtendedAttributeTestPlugin"
+        )
+
+        # point config file to: neutron/tests/etc/neutron.conf.test
+        self.config_parse()
+
+        self.setup_coreplugin(plugin)
+
+        ext_mgr = extensions.PluginAwareExtensionManager(
+            extensions_path,
+            {constants.CORE: ExtensionExtendedAttributeTestPlugin}
+        )
+        ext_mgr.extend_resources("2.0", {})
+        extensions.PluginAwareExtensionManager._instance = ext_mgr
+
+        app = config.load_paste_app('extensions_test_app')
+        self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
+
+        self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
+        # Save the global RESOURCE_ATTRIBUTE_MAP
+        self.saved_attr_map = {}
+        for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+            self.saved_attr_map[resource] = attrs.copy()
+        # Add the resources to the global attribute map
+        # This is done here as the setup process won't
+        # initialize the main API router which extends
+        # the global attribute map
+        attributes.RESOURCE_ATTRIBUTE_MAP.update(
+            extattr.EXTENDED_ATTRIBUTES_2_0)
+        self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin()
+        self.addCleanup(self.restore_attribute_map)
+
+        quota.QUOTAS._driver = None
+        cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
+                              group='QUOTAS')
+
+    def restore_attribute_map(self):
+        # Restore the original RESOURCE_ATTRIBUTE_MAP
+        attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
+
+    def _do_request(self, method, path, data=None, params=None, action=None):
+        content_type = 'application/json'
+        body = None
+        if data is not None:  # empty dict is valid
+            body = wsgi.Serializer().serialize(data, content_type)
+
+        req = testlib_api.create_request(
+            path, body, content_type,
+            method, query_string=params)
+        res = req.get_response(self._api)
+        if res.status_code >= 400:
+            raise webexc.HTTPClientError(detail=res.body, code=res.status_code)
+        if res.status_code != webexc.HTTPNoContent.code:
+            return res.json
+
+    def _ext_test_resource_create(self, attr=None):
+        data = {
+            "ext_test_resource": {
+                "tenant_id": self._tenant_id,
+                "name": "test",
+                extattr.EXTENDED_ATTRIBUTE: attr
+            }
+        }
+
+        res = self._do_request('POST', _get_path('ext_test_resources'), data)
+        return res['ext_test_resource']
+
+    def test_ext_test_resource_create(self):
+        ext_test_resource = self._ext_test_resource_create()
+        attr = _uuid()
+        ext_test_resource = self._ext_test_resource_create(attr)
+        self.assertEqual(ext_test_resource[extattr.EXTENDED_ATTRIBUTE], attr)
+
+    def test_ext_test_resource_get(self):
+        attr = _uuid()
+        obj = self._ext_test_resource_create(attr)
+        obj_id = obj['id']
+        res = self._do_request('GET', _get_path(
+            'ext_test_resources/{0}'.format(obj_id)))
+        obj2 = res['ext_test_resource']
+        self.assertEqual(obj2[extattr.EXTENDED_ATTRIBUTE], attr)
index c2dd542207e68a13d11c408e5587fcde9e5a5b9d..b6ab45e251eeb71d8492a8fe14a84e3da9f2d1a1 100644 (file)
@@ -20,6 +20,83 @@ from neutron.agent.linux import utils
 from neutron.tests import base
 
 
+TEST_UUID = 'test-uuid'
+TEST_SERVICE = 'testsvc'
+TEST_PID = 1234
+
+
+class BaseTestProcessMonitor(base.BaseTestCase):
+
+    def setUp(self):
+        super(BaseTestProcessMonitor, self).setUp()
+        self.log_patch = mock.patch("neutron.agent.linux.external_process."
+                                    "LOG.error")
+        self.error_log = self.log_patch.start()
+
+        self.spawn_patch = mock.patch("eventlet.spawn")
+        self.eventlent_spawn = self.spawn_patch.start()
+
+        # create a default process monitor
+        self.create_child_process_monitor('respawn')
+
+    def create_child_process_monitor(self, action):
+        conf = mock.Mock()
+        conf.AGENT.check_child_processes_action = action
+        conf.AGENT.check_child_processes = True
+        self.pmonitor = ep.ProcessMonitor(
+            config=conf,
+            resource_type='test')
+
+    def get_monitored_process(self, uuid, service=None):
+        monitored_process = mock.Mock()
+        self.pmonitor.register(uuid=uuid,
+                               service_name=service,
+                               monitored_process=monitored_process)
+        return monitored_process
+
+
+class TestProcessMonitor(BaseTestProcessMonitor):
+
+    def test_error_logged(self):
+        pm = self.get_monitored_process(TEST_UUID)
+        pm.active = False
+        self.pmonitor._check_child_processes()
+        self.assertTrue(self.error_log.called)
+
+    def test_exit_handler(self):
+        self.create_child_process_monitor('exit')
+        pm = self.get_monitored_process(TEST_UUID)
+        pm.active = False
+        with mock.patch.object(ep.ProcessMonitor,
+                               '_exit_handler') as exit_handler:
+            self.pmonitor._check_child_processes()
+            exit_handler.assert_called_once_with(TEST_UUID, None)
+
+    def test_register(self):
+        pm = self.get_monitored_process(TEST_UUID)
+        self.assertEqual(len(self.pmonitor._monitored_processes), 1)
+        self.assertIn(pm, self.pmonitor._monitored_processes.values())
+
+    def test_register_same_service_twice(self):
+        self.get_monitored_process(TEST_UUID)
+        self.get_monitored_process(TEST_UUID)
+        self.assertEqual(len(self.pmonitor._monitored_processes), 1)
+
+    def test_register_different_service_types(self):
+        self.get_monitored_process(TEST_UUID)
+        self.get_monitored_process(TEST_UUID, TEST_SERVICE)
+        self.assertEqual(len(self.pmonitor._monitored_processes), 2)
+
+    def test_unregister(self):
+        self.get_monitored_process(TEST_UUID)
+        self.pmonitor.unregister(TEST_UUID, None)
+        self.assertEqual(len(self.pmonitor._monitored_processes), 0)
+
+    def test_unregister_unknown_process(self):
+        self.pmonitor.unregister(TEST_UUID, None)
+        self.assertEqual(len(self.pmonitor._monitored_processes), 0)
+
+
 class TestProcessManager(base.BaseTestCase):
     def setUp(self):
         super(TestProcessManager, self).setUp()