# under the License.
import random
+import weakref
import netaddr
from oslo.config import cfg
model_hooks[name] = {'query': query_hook, 'filter': filter_hook,
'result_filters': result_filters}
+ @property
+ def safe_reference(self):
+ """Return a weakref to the instance.
+
+ Minimize the potential for the instance persisting
+ unnecessarily in memory by returning a weakref proxy that
+ won't prevent deallocation.
+ """
+ return weakref.proxy(self)
+
def _model_query(self, context, model):
query = context.session.query(model)
# define basic filter condition for model query
# License for the specific language governing permissions and limitations
# under the License.
+import weakref
+
from oslo.config import cfg
from neutron.common import utils
@classmethod
@utils.synchronized("manager")
def _create_instance(cls):
- if cls._instance is None:
+ if not cls.has_instance():
cls._instance = cls()
+ @classmethod
+ def has_instance(cls):
+ return cls._instance is not None
+
+ @classmethod
+ def clear_instance(cls):
+ cls._instance = None
+
@classmethod
def get_instance(cls):
# double checked locking
- if cls._instance is None:
+ if not cls.has_instance():
cls._create_instance()
return cls._instance
@classmethod
def get_plugin(cls):
- return cls.get_instance().plugin
+ # Return a weakref to minimize gc-preventing references.
+ return weakref.proxy(cls.get_instance().plugin)
@classmethod
def get_service_plugins(cls):
- return cls.get_instance().service_plugins
+ # Return weakrefs to minimize gc-preventing references.
+ return dict((x, weakref.proxy(y))
+ for x, y in cls.get_instance().service_plugins.iteritems())
def __init__(self):
super(NECPluginV2, self).__init__()
- self.ofc = ofc_manager.OFCManager(self)
+ self.ofc = ofc_manager.OFCManager(self.safe_reference)
self.base_binding_dict = self._get_base_binding_dict()
portbindings_base.register_port_dict_function()
config.CONF.router_scheduler_driver
)
- nec_router.load_driver(self, self.ofc)
+ nec_router.load_driver(self.safe_reference, self.ofc)
self.port_handlers = {
'create': {
const.DEVICE_OWNER_ROUTER_GW: self.create_router_port,
# NOTE: callback_sg is referred to from the sg unit test.
self.callback_sg = SecurityGroupServerRpcCallback()
- callbacks = [NECPluginV2RPCCallbacks(self),
+ callbacks = [NECPluginV2RPCCallbacks(self.safe_reference),
DhcpRpcCallback(),
L3RpcCallback(),
self.callback_sg,
self.tun_client.create_tunnel_key(net_id, tunnel_key)
def _client_delete_network(self, net_id):
+ RyuNeutronPluginV2._safe_client_delete_network(self.safe_reference,
+ net_id)
+
+ @staticmethod
+ def _safe_client_delete_network(safe_reference, net_id):
+ # Avoid handing naked plugin references to the client. When
+ # the client is mocked for testing, such references can
+ # prevent the plugin from being deallocated.
client.ignore_http_not_found(
- lambda: self.client.delete_network(net_id))
+ lambda: safe_reference.client.delete_network(net_id))
client.ignore_http_not_found(
- lambda: self.tun_client.delete_tunnel_key(net_id))
+ lambda: safe_reference.tun_client.delete_tunnel_key(net_id))
def create_network(self, context, network):
session = context.session
"""Base Test Case for all Unit Tests"""
import contextlib
+import gc
import logging
import os
import os.path
import sys
+import weakref
import eventlet.timeout
import fixtures
import testtools
from neutron.common import config
-from neutron.common import constants as const
+from neutron.db import agentschedulers_db
from neutron import manager
from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common.notifier import test_notifier
class BaseTestCase(testtools.TestCase):
- def _cleanup_coreplugin(self):
- if manager.NeutronManager._instance:
- agent_notifiers = getattr(manager.NeutronManager._instance.plugin,
- 'agent_notifiers', {})
- dhcp_agent_notifier = agent_notifiers.get(const.AGENT_TYPE_DHCP)
- if dhcp_agent_notifier:
- dhcp_agent_notifier._plugin = None
- manager.NeutronManager._instance = self._saved_instance
+ def cleanup_core_plugin(self):
+ """Ensure that the core plugin is deallocated."""
+ nm = manager.NeutronManager
+ if not nm.has_instance():
+ return
+
+ #TODO(marun) Fix plugins that do not properly initialize notifiers
+ agentschedulers_db.AgentSchedulerDbMixin.agent_notifiers = {}
+
+ plugin = weakref.ref(nm._instance.plugin)
+ nm.clear_instance()
+ gc.collect()
+
+ #TODO(marun) Ensure that mocks are deallocated?
+ if plugin() and not isinstance(plugin(), mock.Base):
+ self.fail('The plugin for this test was not deallocated.')
def setup_coreplugin(self, core_plugin=None):
- self._saved_instance = manager.NeutronManager._instance
- self.addCleanup(self._cleanup_coreplugin)
- manager.NeutronManager._instance = None
if core_plugin is not None:
cfg.CONF.set_override('core_plugin', core_plugin)
def setUp(self):
super(BaseTestCase, self).setUp()
+
+ # Ensure plugin cleanup is triggered last so that
+ # test-specific cleanup has a chance to release references.
+ self.addCleanup(self.cleanup_core_plugin)
+
self.addCleanup(self._cleanup_rpc_backend)
# Configure this first to ensure pm debugging support for setUp()
from neutron.plugins.ml2 import config as config
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import mechanism_odl
+from neutron.plugins.ml2 import plugin
+from neutron.tests import base
from neutron.tests.unit import test_db_plugin as test_plugin
PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
self.assertFalse(self.mech.check_segment(self.segment))
-class OpenDayLightMechanismConfigTests(test_plugin.NeutronDbPluginV2TestCase):
+class OpenDayLightMechanismConfigTests(base.BaseTestCase):
- def _setUp(self):
+ def _set_config(self, url='http://127.0.0.1:9999', username='someuser',
+ password='somepass'):
config.cfg.CONF.set_override('mechanism_drivers',
['logger', 'opendaylight'],
'ml2')
- config.cfg.CONF.set_override('url', 'http://127.0.0.1:9999', 'ml2_odl')
- config.cfg.CONF.set_override('username', 'someuser', 'ml2_odl')
- config.cfg.CONF.set_override('password', 'somepass', 'ml2_odl')
+ config.cfg.CONF.set_override('url', url, 'ml2_odl')
+ config.cfg.CONF.set_override('username', username, 'ml2_odl')
+ config.cfg.CONF.set_override('password', password, 'ml2_odl')
+
+ def _test_missing_config(self, **kwargs):
+ self._set_config(**kwargs)
+ self.assertRaises(config.cfg.RequiredOptError,
+ plugin.Ml2Plugin)
+
+ def test_valid_config(self):
+ self._set_config()
+ plugin.Ml2Plugin()
- def test_url_required(self):
- self._setUp()
- config.cfg.CONF.set_override('url', None, 'ml2_odl')
- self.assertRaises(config.cfg.RequiredOptError, self.setUp, PLUGIN_NAME)
+ def test_missing_url_raises_exception(self):
+ self._test_missing_config(url=None)
- def test_username_required(self):
- self._setUp()
- config.cfg.CONF.set_override('username', None, 'ml2_odl')
- self.assertRaises(config.cfg.RequiredOptError, self.setUp, PLUGIN_NAME)
+ def test_missing_username_raises_exception(self):
+ self._test_missing_config(username=None)
- def test_password_required(self):
- self._setUp()
- config.cfg.CONF.set_override('password', None, 'ml2_odl')
- self.assertRaises(config.cfg.RequiredOptError, self.setUp, PLUGIN_NAME)
+ def test_missing_password_raises_exception(self):
+ self._test_missing_config(password=None)
class OpenDaylightMechanismTestBasicGet(test_plugin.TestBasicGet,
self.context = context.get_admin_context()
-class TestMl2BulkToggle(Ml2PluginV2TestCase):
+class TestMl2BulkToggleWithBulkless(Ml2PluginV2TestCase):
+
+ _mechanism_drivers = ['logger', 'test', 'bulkless']
def test_bulk_disable_with_bulkless_driver(self):
- self.tearDown()
- self._mechanism_drivers = ['logger', 'test', 'bulkless']
- self.setUp()
self.assertTrue(self._skip_native_bulk)
+
+class TestMl2BulkToggleWithoutBulkless(Ml2PluginV2TestCase):
+
+ _mechanism_drivers = ['logger', 'test']
+
def test_bulk_enabled_with_bulk_drivers(self):
- self.tearDown()
- self._mechanism_drivers = ['logger', 'test']
- self.setUp()
self.assertFalse(self._skip_native_bulk)
self.skipTest("Plugin does not support native bulk port create")
ctx = context.get_admin_context()
with self.network() as net:
- orig = manager.NeutronManager._instance.plugin.create_port
- with mock.patch.object(manager.NeutronManager._instance.plugin,
- 'create_port') as patched_plugin:
+ plugin = manager.NeutronManager.get_plugin()
+ orig = plugin.create_port
+ with mock.patch.object(plugin, 'create_port') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
def test_create_subnets_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk subnet create")
- orig = manager.NeutronManager._instance.plugin.create_subnet
- with mock.patch.object(manager.NeutronManager._instance.plugin,
- 'create_subnet') as patched_plugin:
+ plugin = manager.NeutronManager.get_plugin()
+ orig = plugin.create_subnet
+ with mock.patch.object(plugin, 'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
*args, **kwargs)