import six
import testtools
-from neutron.tests import sub_base
+from neutron.tests import base
@six.add_metaclass(abc.ABCMeta)
pass
-class BaseTestApi(sub_base.SubBaseTestCase):
+class BaseTestApi(base.BaseTestCase):
scenarios = ()
self.client.setUp(self)
def test_network_lifecycle(self):
- net = self.client.create_network(name=sub_base.get_rand_name())
+ net = self.client.create_network(name=base.get_rand_name())
listed_networks = dict((x.id, x.name)
for x in self.client.get_networks())
self.assertIn(net.id, listed_networks)
import testscenarios
from neutron.tests.api import base_v2
-from neutron.tests import sub_base
-from neutron.tests.tempest import test as t_test
+from neutron.tests import base
+from neutron.tests.tempest.api.network import base as t_base
# Required to generate tests from scenarios. Not compatible with nose.
load_tests = testscenarios.load_tests_apply_scenarios
@property
def client(self):
if not hasattr(self, '_client'):
- manager = t_test.BaseTestCase.get_client_manager()
+ manager = t_base.BaseNetworkTest.get_client_manager()
self._client = manager.network_client
return self._client
def _create_network(self, **kwargs):
# Internal method - use create_network() instead
body = self.client.create_network(**kwargs)
- return sub_base.AttributeDict(body['network'])
+ return base.AttributeDict(body['network'])
def update_network(self, id_, **kwargs):
body = self.client.update_network(id_, **kwargs)
- return sub_base.AttributeDict(body['network'])
+ return base.AttributeDict(body['network'])
def get_network(self, id_, **kwargs):
body = self.client.show_network(id_, **kwargs)
- return sub_base.AttributeDict(body['network'])
+ return base.AttributeDict(body['network'])
def get_networks(self, **kwargs):
body = self.client.list_networks(**kwargs)
- return [sub_base.AttributeDict(x) for x in body['networks']]
+ return [base.AttributeDict(x) for x in body['networks']]
def delete_network(self, id_):
self.client.delete_network(id_)
class TestApiWithRestClient(base_v2.BaseTestApi):
scenarios = [('tempest', {'client': TempestRestClient()})]
+
+ def setUp(self):
+ raise self.skipException(
+ 'Tempest fixture requirements prevent this test from running')
# License for the specific language governing permissions and limitations
# under the License.
-"""Base test case for tests that do not rely on Tempest.
-
-To change behavoir that is common to all tests, please target
-the neutron.tests.sub_base module instead.
-
-If a test needs to import a dependency like Tempest, see
-neutron.tests.sub_base for a base test class that can be used without
-errors due to duplicate configuration definitions.
+"""Base test cases for all neutron tests.
"""
+import contextlib
+import gc
import logging as std_logging
+import os
import os.path
+import random
+import traceback
+import weakref
+import eventlet.timeout
import fixtures
import mock
from oslo_concurrency.fixture import lockutils
from oslo_config import cfg
from oslo_messaging import conffixture as messaging_conffixture
+from oslo_utils import strutils
+import testtools
from neutron.agent.linux import external_process
from neutron.common import config
from neutron.common import rpc as n_rpc
+from neutron.db import agentschedulers_db
+from neutron import manager
from neutron import policy
from neutron.tests import fake_notifier
-from neutron.tests import sub_base
+from neutron.tests import post_mortem_debug
CONF = cfg.CONF
CONF.import_opt('state_path', 'neutron.common.config')
-LOG_FORMAT = sub_base.LOG_FORMAT
+LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
TEST_ROOT_DIR = os.path.dirname(__file__)
return []
-bool_from_env = sub_base.bool_from_env
+def get_rand_name(max_length=None, prefix='test'):
+ name = prefix + str(random.randint(1, 0x7fffffff))
+ return name[:max_length] if max_length is not None else name
+
+
+def bool_from_env(key, strict=False, default=False):
+ value = os.environ.get(key)
+ return strutils.bool_from_string(value, strict=strict, default=default)
+
+
+class AttributeDict(dict):
+
+ """
+ Provide attribute access (dict.key) to dictionary values.
+ """
+
+ def __getattr__(self, name):
+ """Allow attribute access for all keys in the dict."""
+ if name in self:
+ return self[name]
+ raise AttributeError(_("Unknown attribute '%s'.") % name)
+
+
+class DietTestCase(testtools.TestCase):
+ """Same great taste, less filling.
+
+ BaseTestCase is responsible for doing lots of plugin-centric setup
+ that not all tests require (or can tolerate). This class provides
+ only functionality that is common across all tests.
+ """
+
+ def setUp(self):
+ super(DietTestCase, self).setUp()
+
+ # Configure this first to ensure pm debugging support for setUp()
+ debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
+ if debugger:
+ self.addOnException(post_mortem_debug.get_exception_handler(
+ debugger))
+
+ if bool_from_env('OS_DEBUG'):
+ _level = std_logging.DEBUG
+ else:
+ _level = std_logging.INFO
+ capture_logs = bool_from_env('OS_LOG_CAPTURE')
+ if not capture_logs:
+ std_logging.basicConfig(format=LOG_FORMAT, level=_level)
+ self.log_fixture = self.useFixture(
+ fixtures.FakeLogger(
+ format=LOG_FORMAT,
+ level=_level,
+ nuke_handlers=capture_logs,
+ ))
+
+ test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
+ if test_timeout == -1:
+ test_timeout = 0
+ if test_timeout > 0:
+ self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
+
+ # If someone does use tempfile directly, ensure that it's cleaned up
+ self.useFixture(fixtures.NestedTempfile())
+ self.useFixture(fixtures.TempHomeDir())
+
+ self.addCleanup(mock.patch.stopall)
+
+ if bool_from_env('OS_STDOUT_CAPTURE'):
+ stdout = self.useFixture(fixtures.StringStream('stdout')).stream
+ self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
+ if bool_from_env('OS_STDERR_CAPTURE'):
+ stderr = self.useFixture(fixtures.StringStream('stderr')).stream
+ self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
+
+ self.addOnException(self.check_for_systemexit)
+
+ def check_for_systemexit(self, exc_info):
+ if isinstance(exc_info[1], SystemExit):
+ self.fail("A SystemExit was raised during the test. %s"
+ % traceback.format_exception(*exc_info))
+
+ @contextlib.contextmanager
+ def assert_max_execution_time(self, max_execution_time=5):
+ with eventlet.timeout.Timeout(max_execution_time, False):
+ yield
+ return
+ self.fail('Execution of this test timed out')
+
+ def assertOrderedEqual(self, expected, actual):
+ expect_val = self.sort_dict_lists(expected)
+ actual_val = self.sort_dict_lists(actual)
+ self.assertEqual(expect_val, actual_val)
+
+ def sort_dict_lists(self, dic):
+ for key, value in dic.iteritems():
+ if isinstance(value, list):
+ dic[key] = sorted(value)
+ elif isinstance(value, dict):
+ dic[key] = self.sort_dict_lists(value)
+ return dic
+
+ def assertDictSupersetOf(self, expected_subset, actual_superset):
+ """Checks that actual dict contains the expected dict.
+
+ After checking that the arguments are of the right type, this checks
+ that each item in expected_subset is in, and matches, what is in
+ actual_superset. Separate tests are done, so that detailed info can
+ be reported upon failure.
+ """
+ if not isinstance(expected_subset, dict):
+ self.fail("expected_subset (%s) is not an instance of dict" %
+ type(expected_subset))
+ if not isinstance(actual_superset, dict):
+ self.fail("actual_superset (%s) is not an instance of dict" %
+ type(actual_superset))
+ for k, v in expected_subset.items():
+ self.assertIn(k, actual_superset)
+ self.assertEqual(v, actual_superset[k],
+ "Key %(key)s expected: %(exp)r, actual %(act)r" %
+ {'key': k, 'exp': v, 'act': actual_superset[k]})
class ProcessMonitorFixture(fixtures.Fixture):
self.instances.append(instance)
-class BaseTestCase(sub_base.SubBaseTestCase):
+class BaseTestCase(DietTestCase):
@staticmethod
def config_parse(conf=None, args=None):
group = kw.pop('group', None)
for k, v in kw.iteritems():
CONF.set_override(k, v, group)
+
+ def setup_coreplugin(self, core_plugin=None):
+ self.useFixture(PluginFixture(core_plugin))
+
+ def setup_notification_driver(self, notification_driver=None):
+ self.addCleanup(fake_notifier.reset)
+ if notification_driver is None:
+ notification_driver = [fake_notifier.__name__]
+ cfg.CONF.set_override("notification_driver", notification_driver)
+
+
+class PluginFixture(fixtures.Fixture):
+
+ def __init__(self, core_plugin=None):
+ self.core_plugin = core_plugin
+
+ def setUp(self):
+ super(PluginFixture, self).setUp()
+ self.dhcp_periodic_p = mock.patch(
+ 'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
+ 'start_periodic_dhcp_agent_status_check')
+ self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
+ # Plugin cleanup should be triggered last so that
+ # test-specific cleanup has a chance to release references.
+ self.addCleanup(self.cleanup_core_plugin)
+ if self.core_plugin is not None:
+ cfg.CONF.set_override('core_plugin', self.core_plugin)
+
+ def cleanup_core_plugin(self):
+ """Ensure that the core plugin is deallocated."""
+ nm = manager.NeutronManager
+ if not nm.has_instance():
+ return
+
+ # TODO(marun) Fix plugins that do not properly initialize notifiers
+ agentschedulers_db.AgentSchedulerDbMixin.agent_notifiers = {}
+
+ # Perform a check for deallocation only if explicitly
+ # configured to do so since calling gc.collect() after every
+ # test increases test suite execution time by ~50%.
+ check_plugin_deallocation = (
+ bool_from_env('OS_CHECK_PLUGIN_DEALLOCATION'))
+ if check_plugin_deallocation:
+ plugin = weakref.ref(nm._instance.plugin)
+
+ nm.clear_instance()
+
+ if check_plugin_deallocation:
+ gc.collect()
+
+ # TODO(marun) Ensure that mocks are deallocated?
+ if plugin() and not isinstance(plugin(), mock.Base):
+ raise AssertionError(
+ 'The plugin for this test was not deallocated.')
#
from neutron.common import constants as n_const
-from neutron.tests import sub_base
+from neutron.tests import base
def create_resource(prefix, creation_func, *args, **kwargs):
:param *args *kwargs: These will be passed to the create function.
"""
while True:
- name = sub_base.get_rand_name(
+ name = base.get_rand_name(
max_length=n_const.DEVICE_NAME_MAX_LEN,
prefix=prefix)
try:
from neutron.agent.linux import bridge_lib
from neutron.agent.linux import ip_lib
from neutron.common import constants as n_const
-from neutron.tests.common import base
+from neutron.tests import base as tests_base
+from neutron.tests.common import base as common_base
from neutron.tests.common import net_helpers
from neutron.tests.functional import base as functional_base
-from neutron.tests import sub_base
BR_PREFIX = 'test-br'
#TODO(jschwarz): Move these two functions to neutron/tests/common/
-get_rand_name = sub_base.get_rand_name
+get_rand_name = tests_base.get_rand_name
def get_rand_bridge_name():
self.ip = ip_lib.IPWrapper()
def create_ovs_bridge(self, br_prefix=BR_PREFIX):
- br = base.create_resource(br_prefix, self.ovs.add_bridge)
+ br = common_base.create_resource(br_prefix, self.ovs.add_bridge)
self.addCleanup(br.destroy)
return br
br.replace_port(name, ('type', 'internal'))
self.addCleanup(br.delete_port, name)
return name
- port_name = base.create_resource(PORT_PREFIX, create_port)
+ port_name = common_base.create_resource(PORT_PREFIX, create_port)
port_dev = self.ip.device(port_name)
ns.add_device_to_namespace(port_dev)
port_dev.link.set_up()
from neutron import context
from neutron import manager
from neutron.tests.api import base_v2
-from neutron.tests import sub_base
+from neutron.tests import base
from neutron.tests.unit.ml2 import test_ml2_plugin
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
# Each plugin must add a class to plugin_configurations that can configure the
kwargs.setdefault('shared', False)
data = dict(network=kwargs)
result = self.plugin.create_network(self.ctx, data)
- return sub_base.AttributeDict(result)
+ return base.AttributeDict(result)
def update_network(self, id_, **kwargs):
data = dict(network=kwargs)
result = self.plugin.update_network(self.ctx, id_, data)
- return sub_base.AttributeDict(result)
+ return base.AttributeDict(result)
def get_network(self, *args, **kwargs):
result = self.plugin.get_network(self.ctx, *args, **kwargs)
- return sub_base.AttributeDict(result)
+ return base.AttributeDict(result)
def get_networks(self, *args, **kwargs):
result = self.plugin.get_networks(self.ctx, *args, **kwargs)
- return [sub_base.AttributeDict(x) for x in result]
+ return [base.AttributeDict(x) for x in result]
def delete_network(self, id_):
self.plugin.delete_network(self.ctx, id_)
class TestPluginApi(base_v2.BaseTestApi,
- testlib_api.SqlTestCase,
- testlib_plugin.PluginSetupHelper):
+ testlib_api.SqlTestCase):
scenarios = get_scenarios()
# setUp, since that setup will be called by SqlTestCase.setUp.
super(TestPluginApi, self).setUp(setup_parent=False)
testlib_api.SqlTestCase.setUp(self)
- self.setup_coreplugin(self.plugin_conf.plugin_name)
+ self.useFixture(base.PluginFixture(self.plugin_conf.plugin_name))
self.plugin_conf.setUp(self)
from neutron.openstack.common import uuidutils
from neutron.scheduler import l3_agent_scheduler
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
_uuid = uuidutils.generate_uuid
pass
-class L3HATestFramework(testlib_api.SqlTestCase,
- testlib_plugin.PluginSetupHelper):
+class L3HATestFramework(testlib_api.SqlTestCase):
def setUp(self):
super(L3HATestFramework, self).setUp()
from neutron.services.metering.agents import metering_agent
from neutron.tests import base
from neutron.tests import fake_notifier
-from neutron.tests.unit import testlib_plugin
_uuid = uuidutils.generate_uuid
'id': _uuid()}]
-class TestMeteringOperations(base.BaseTestCase,
- testlib_plugin.NotificationSetupHelper):
+class TestMeteringOperations(base.BaseTestCase):
def setUp(self):
super(TestMeteringOperations, self).setUp()
from neutron.tests import base
from neutron.tests import fake_notifier
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
self.assertEqual(link['rel'], 'self')
-class APIv2TestBase(base.BaseTestCase, testlib_plugin.PluginSetupHelper):
+class APIv2TestBase(base.BaseTestCase):
def setUp(self):
super(APIv2TestBase, self).setUp()
self.assertEqual(res.status_int, 400)
-class SubresourceTest(base.BaseTestCase, testlib_plugin.PluginSetupHelper):
+class SubresourceTest(base.BaseTestCase):
def setUp(self):
super(SubresourceTest, self).setUp()
self.assertEqual(res.status_int, exc.HTTPCreated.code)
-class ExtensionTestCase(base.BaseTestCase, testlib_plugin.PluginSetupHelper):
+class ExtensionTestCase(base.BaseTestCase):
def setUp(self):
super(ExtensionTestCase, self).setUp()
plugin = 'neutron.neutron_plugin_base_v2.NeutronPluginBaseV2'
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
-class ExtensionTestCase(testlib_api.WebTestCase,
- testlib_plugin.PluginSetupHelper):
+class ExtensionTestCase(testlib_api.WebTestCase):
def _resotre_attr_map(self):
attributes.RESOURCE_ATTRIBUTE_MAP = self._saved_attr_map
from neutron.tests import base
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
return 'create_%s' % resource
-class NeutronDbPluginV2TestCase(testlib_api.WebTestCase,
- testlib_plugin.PluginSetupHelper):
+class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
fmt = 'json'
resource_prefix_map = {}
from neutron import manager
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
-class TestNetworks(testlib_api.SqlTestCase,
- testlib_plugin.PluginSetupHelper):
+class TestNetworks(testlib_api.SqlTestCase):
def setUp(self):
super(TestNetworks, self).setUp()
self._tenant_id = 'test-tenant'
from neutron.extensions import dhcpagentscheduler
from neutron.scheduler import dhcp_agent_scheduler
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
# Required to generate tests from scenarios. Not compatible with nose.
load_tests = testscenarios.load_tests_apply_scenarios
self.assertIn('foo4', res_ids)
-class DHCPAgentWeightSchedulerTestCase(TestDhcpSchedulerBaseTestCase,
- testlib_plugin.PluginSetupHelper):
+class DHCPAgentWeightSchedulerTestCase(TestDhcpSchedulerBaseTestCase):
"""Unit test scenarios for WeightScheduler.schedule."""
hostc = {
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
_uuid = uuidutils.generate_uuid
FAKE_GW_PORT_ID = _uuid()
supported_extension_aliases = ["router", "ext-gw-mode"]
-class TestL3GwModeMixin(testlib_api.SqlTestCase,
- testlib_plugin.PluginSetupHelper):
+class TestL3GwModeMixin(testlib_api.SqlTestCase):
def setUp(self):
super(TestL3GwModeMixin, self).setUp()
from neutron.tests.unit.extensions import extendedattribute as extattr
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
from neutron import wsgi
_uuid = test_api_v2._uuid
return self.objh[id]
-class ExtensionExtendedAttributeTestCase(base.BaseTestCase,
- testlib_plugin.PluginSetupHelper):
+class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
def setUp(self):
super(ExtensionExtendedAttributeTestCase, self).setUp()
plugin = (
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
class ProviderExtensionManager(object):
return pnet.get_extended_resources(version)
-class ProvidernetExtensionTestCase(testlib_api.WebTestCase,
- testlib_plugin.PluginSetupHelper):
+class ProvidernetExtensionTestCase(testlib_api.WebTestCase):
fmt = 'json'
def setUp(self):
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_api_v2_extension
from neutron.tests.unit import test_db_plugin
-from neutron.tests.unit import testlib_plugin
LOG = logging.getLogger(__name__)
self._test_notify_op_agent(self._test_floatingips_op_agent)
-class L3BaseForIntTests(test_db_plugin.NeutronDbPluginV2TestCase,
- testlib_plugin.NotificationSetupHelper):
+class L3BaseForIntTests(test_db_plugin.NeutronDbPluginV2TestCase):
mock_rescheduling = True
self.setup_notification_driver()
-class L3BaseForSepTests(test_db_plugin.NeutronDbPluginV2TestCase,
- testlib_plugin.NotificationSetupHelper):
+class L3BaseForSepTests(test_db_plugin.NeutronDbPluginV2TestCase):
def setUp(self, plugin=None, ext_mgr=None):
# the plugin without L3 support
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
# the below code is required for the following reason
# (as documented in testscenarios)
pass
-class L3DvrSchedulerTestCase(testlib_api.SqlTestCase,
- testlib_plugin.PluginSetupHelper):
+class L3DvrSchedulerTestCase(testlib_api.SqlTestCase):
def setUp(self):
plugin = 'neutron.plugins.ml2.plugin.Ml2Plugin'
class L3HATestCaseMixin(testlib_api.SqlTestCase,
- L3SchedulerBaseMixin,
- testlib_plugin.PluginSetupHelper):
+ L3SchedulerBaseMixin):
def setUp(self):
super(L3HATestCaseMixin, self).setUp()
class TestGetL3AgentsWithAgentModeFilter(testlib_api.SqlTestCase,
- testlib_plugin.PluginSetupHelper,
L3SchedulerBaseMixin):
"""Test cases to test get_l3_agents.
from neutron.plugins.common import constants
from neutron.tests import base
from neutron.tests.unit import dummy_plugin
-from neutron.tests.unit import testlib_plugin
LOG = logging.getLogger(__name__)
'dhcp': 'dhcp_agent_notifier'}
-class NeutronManagerTestCase(base.BaseTestCase,
- testlib_plugin.PluginSetupHelper):
+class NeutronManagerTestCase(base.BaseTestCase):
def setUp(self):
super(NeutronManagerTestCase, self).setUp()
from neutron.tests import base
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
TARGET_PLUGIN = 'neutron.plugins.ml2.plugin.Ml2Plugin'
_get_path = test_api_v2._get_path
-class QuotaExtensionTestCase(testlib_api.WebTestCase,
- testlib_plugin.PluginSetupHelper):
+class QuotaExtensionTestCase(testlib_api.WebTestCase):
def setUp(self):
super(QuotaExtensionTestCase, self).setUp()
from oslo_messaging import conffixture as messaging_conffixture
from neutron.common import rpc
-from neutron.tests import sub_base
+from neutron.tests import base
CONF = cfg.CONF
CONF.import_opt('state_path', 'neutron.common.config')
-class ServiceTestCase(sub_base.SubBaseTestCase):
+class ServiceTestCase(base.DietTestCase):
# the class cannot be based on BaseTestCase since it mocks rpc.Connection
def setUp(self):
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
-from neutron.tests.unit import testlib_plugin
DEFAULT_SERVICE_DEFS = [{'service_class': constants.DUMMY,
return []
-class ServiceTypeExtensionTestCaseBase(testlib_api.WebTestCase,
- testlib_plugin.PluginSetupHelper):
+class ServiceTypeExtensionTestCaseBase(testlib_api.WebTestCase):
fmt = 'json'
def setUp(self):
+++ /dev/null
-# Copyright 2014 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import gc
-import weakref
-
-import mock
-from oslo_config import cfg
-
-from neutron.db import agentschedulers_db
-from neutron import manager
-from neutron.tests import base
-from neutron.tests import fake_notifier
-
-
-class PluginSetupHelper(object):
- """Mixin for use with testtools.TestCase."""
-
- def cleanup_core_plugin(self):
- """Ensure that the core plugin is deallocated."""
- nm = manager.NeutronManager
- if not nm.has_instance():
- return
-
- # TODO(marun) Fix plugins that do not properly initialize notifiers
- agentschedulers_db.AgentSchedulerDbMixin.agent_notifiers = {}
-
- # Perform a check for deallocation only if explicitly
- # configured to do so since calling gc.collect() after every
- # test increases test suite execution time by ~50%.
- check_plugin_deallocation = (
- base.bool_from_env('OS_CHECK_PLUGIN_DEALLOCATION'))
- if check_plugin_deallocation:
- plugin = weakref.ref(nm._instance.plugin)
-
- nm.clear_instance()
-
- if check_plugin_deallocation:
- gc.collect()
-
- # TODO(marun) Ensure that mocks are deallocated?
- if plugin() and not isinstance(plugin(), mock.Base):
- self.fail('The plugin for this test was not deallocated.')
-
- def setup_coreplugin(self, core_plugin=None):
- self.dhcp_periodic_p = mock.patch(
- 'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
- 'start_periodic_dhcp_agent_status_check')
- self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
- # Plugin cleanup should be triggered last so that
- # test-specific cleanup has a chance to release references.
- self.addCleanup(self.cleanup_core_plugin)
- if core_plugin is not None:
- cfg.CONF.set_override('core_plugin', core_plugin)
-
-
-class NotificationSetupHelper(object):
- """Mixin for use with testtools.TestCase."""
-
- def setup_notification_driver(self, notification_driver=None):
- self.addCleanup(fake_notifier.reset)
- if notification_driver is None:
- notification_driver = [fake_notifier.__name__]
- cfg.CONF.set_override("notification_driver", notification_driver)