from oslo.config import cfg
from oslo.db.sqlalchemy import session
-import sqlalchemy as sql
-
-from neutron.db import model_base
-from neutron.openstack.common import log as logging
-
-LOG = logging.getLogger(__name__)
-
-BASE = model_base.BASEV2
_FACADE = None
return _FACADE
-def configure_db():
- """Configure database.
-
- Establish the database, create an engine if needed, and register
- the models.
- """
- register_models()
-
-
-def clear_db(base=BASE):
- unregister_models(base)
-
-
def get_engine():
"""Helper method to grab engine."""
facade = _create_facade_lazily()
facade = _create_facade_lazily()
return facade.get_session(autocommit=autocommit,
expire_on_commit=expire_on_commit)
-
-
-def register_models(base=BASE):
- """Register Models and create properties."""
- try:
- facade = _create_facade_lazily()
- engine = facade.get_engine()
- base.metadata.create_all(engine)
- except sql.exc.OperationalError as e:
- LOG.info(_("Database registration exception: %s"), e)
- return False
- return True
-
-
-def unregister_models(base=BASE):
- """Unregister Models, useful clearing out data before testing."""
- try:
- facade = _create_facade_lazily()
- engine = facade.get_engine()
- base.metadata.drop_all(engine)
- except Exception:
- LOG.exception(_("Database exception"))
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron import context as ctx
-from neutron.db import api as db
from neutron.db import common_db_mixin
from neutron.db import models_v2
from neutron.db import sqlalchemyutils
__native_sorting_support = True
def __init__(self):
- db.configure_db()
if cfg.CONF.notify_nova_on_port_status_changes:
from neutron.notifiers import nova
# NOTE(arosen) These event listeners are here to hook into when
from neutron.api.rpc.agentnotifiers import metering_rpc_agent_api
from neutron.common import constants
-from neutron.db import api as dbapi
from neutron.db import common_db_mixin as base_db
from neutron.db import l3_db
from neutron.db import model_base
base_db.CommonDbMixin):
def __init__(self):
- dbapi.register_models()
-
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
def _make_metering_label_dict(self, metering_label, fields=None):
)
-def upgrade_quota(options=None):
- if not (options or {}).get('folsom_quota_db_enabled'):
- return
-
+def upgrade_quota():
op.create_table(
'quotas',
sa.Column('id', sa.String(length=36), nullable=False),
op.drop_table(table)
-def downgrade_quota(options=None):
- if (options or {}).get('folsom_quota_db_enabled'):
- op.drop_table('quotas')
+def downgrade_quota():
+ op.drop_table('quotas')
MYSQL_ENGINE = None
-DATABASE_QUOTA_DRIVER = 'neutron.extensions._quotav2_driver.DbQuotaDriver'
-
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
def build_options():
- return {'folsom_quota_db_enabled': is_db_quota_enabled()}
-
-
-def is_db_quota_enabled():
- return neutron_config.QUOTAS.quota_driver == DATABASE_QUOTA_DRIVER
+ return
if context.is_offline_mode():
common_ext_ops.upgrade_l3()
if migration.should_run(active_plugins, FOLSOM_QUOTA):
- common_ext_ops.upgrade_quota(options)
+ common_ext_ops.upgrade_quota()
if PLUGINS['lbr'] in active_plugins:
upgrade_linuxbridge()
downgrade_linuxbridge()
if migration.should_run(active_plugins, FOLSOM_QUOTA):
- common_ext_ops.downgrade_quota(options)
+ common_ext_ops.downgrade_quota()
if migration.should_run(active_plugins, L3_CAPABLE):
common_ext_ops.downgrade_l3()
import sqlalchemy as sa
-from neutron.db import api as db
from neutron.db import model_base
-from neutron.db import models_v2
from neutron.openstack.common import log as logging
from neutron.services import provider_configuration as pconf
return cls._instance
def __init__(self):
- self._initialize_db()
self._load_conf()
- def _initialize_db(self):
- db.configure_db()
- db.register_models(models_v2.model_base.BASEV2)
-
def _load_conf(self):
self.conf = pconf.ProviderConfiguration(
pconf.parse_service_provider_opt())
from sqlalchemy.orm import exc
from neutron.common import constants as n_constants
-from neutron.db import api as qdbapi
from neutron.db import common_db_mixin as base_db
from neutron.db import l3_agentschedulers_db as l3_agent_db
from neutron.db import l3_db
class VPNPluginDb(vpnaas.VPNPluginBase, base_db.CommonDbMixin):
"""VPN plugin database class using SQLAlchemy models."""
- def __init__(self):
- """Do the initialization for the vpn service plugin here."""
- qdbapi.register_models()
-
def _get_validator(self):
"""Obtain validator to use for attribute validation.
from neutron.plugins.cisco.common import cisco_constants as const
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.db import network_models_v2
-# Do NOT remove this import. It is required for all the models to be seen
-# by db.initialize() when called from VirtualPhysicalSwitchModelV2.__init__.
-from neutron.plugins.cisco.db import nexus_models_v2 # noqa
from neutron.plugins.openvswitch import ovs_models_v2
import sys
from neutron.api.v2 import attributes
-from neutron.db import api as db_api
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
from neutron import neutron_plugin_base_v2
self.supported_extension_aliases.extend(
self._plugins[const.VSWITCH_PLUGIN].
supported_extension_aliases)
- # At this point, all the database models should have been loaded. It's
- # possible that configure_db() may have been called by one of the
- # plugins loaded in above. Otherwise, this call is to make sure that
- # the database is initialized
- db_api.configure_db()
# Initialize credential store after database initialization
cred.Store.initialize()
class HyperVPluginDB(object):
- def initialize(self):
- db_api.configure_db()
def reserve_vlan(self, session):
with session.begin(subtransactions=True):
def __init__(self, configfile=None):
self._db = hyperv_db.HyperVPluginDB()
- self._db.initialize()
self.base_binding_dict = {
portbindings.VIF_TYPE: portbindings.VIF_TYPE_HYPERV}
portbindings_base.register_port_dict_function()
from neutron.common import exceptions as exc
from neutron.common import topics
from neutron import context as neutron_context
-from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db
from neutron.db import extraroute_db
cfg._is_opt_registered = _is_opt_registered
- # Keep existing tables if multiple plugin use same table name.
- db.model_base.NeutronBase.__table_args__ = {'keep_existing': True}
-
self.plugins = {}
plugin_list = [plugin_set.split(':')
import sqlalchemy as sa
from neutron.db import api as db
-from neutron.db import model_base
from neutron.db import models_v2
from neutron.db import securitygroups_db as sg_db
from neutron.extensions import securitygroup as ext_sg
return resource_map[resource]
-def clear_db(base=model_base.BASEV2):
- db.clear_db(base)
-
-
def get_ofc_item(session, resource, neutron_id):
model = _get_resource_model(resource)
if not model:
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron import context as neutron_context
-from neutron.db import api as qdbapi
from neutron.db.firewall import firewall_db
from neutron.extensions import firewall as fw_ext
from neutron.openstack.common import log as logging
def __init__(self):
"""Do the initialization for the firewall service plugin here."""
- qdbapi.register_models()
self.endpoints = [FirewallCallbacks(self)]
#
# @author: Arvind Somya (asomya@cisco.com), Cisco Systems Inc.
-from neutron.db import api as qdbapi
from neutron.db import db_base_plugin_v2
from neutron.db import extraroute_db
from neutron.db import l3_gwmode_db
-from neutron.db import model_base
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
def __init__(self):
super(ApicL3ServicePlugin, self).__init__()
- qdbapi.register_models(base=model_base.BASEV2)
self.manager = apic_manager.APICManager()
@staticmethod
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
-from neutron.db import api as qdbapi
from neutron.db import common_db_mixin
from neutron.db import extraroute_db
from neutron.db import l3_dvr_db
from neutron.db import l3_dvrscheduler_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
-from neutron.db import model_base
from neutron.openstack.common import importutils
from neutron.plugins.common import constants
"extraroute", "l3_agent_scheduler"]
def __init__(self):
- qdbapi.register_models(base=model_base.BASEV2)
self.setup_rpc()
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
-import neutron.db.api as db
from neutron.db import models_v2 as nmodel
from neutron.services.loadbalancer.drivers.embrane import models
-def initialize():
- db.configure_db()
-
-
def add_pool_port(context, pool_id, port_id):
session = context.session
with session.begin(subtransactions=True):
class EmbraneLbaas(abstract_driver.LoadBalancerAbstractDriver):
def __init__(self, plugin):
- edb.initialize()
config_esm_mgmt = get_conf('esm_mgmt')
config_admin_username = get_conf('admin_username')
config_admin_password = get_conf('admin_password')
from neutron.api.v2 import attributes as attrs
from neutron.common import exceptions as n_exc
from neutron import context
-from neutron.db import api as qdbapi
from neutron.db.loadbalancer import loadbalancer_db as ldb
from neutron.db import servicetype_db as st_db
from neutron.extensions import loadbalancer
def __init__(self):
"""Initialization for the loadbalancer service plugin."""
- qdbapi.register_models()
self.service_type_manager = st_db.ServiceTypeManager.get_instance()
self._load_drivers()
from oslo.config import cfg
import neutron.common.test_lib as test_lib
-from neutron.db import api as db
from neutron.plugins.bigswitch import config
from neutron.tests.unit.bigswitch import fake_server
-# REVISIT(kevinbenton): This needs to be imported here to create the
-# portbindings table since it's not imported until function call time
-# in the porttracker_db module, which will cause unit test failures when
-# the unit tests are being run by testtools
-from neutron.db import portbindings_db # noqa
RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
self.spawn_p = mock.patch(SPAWN, new=lambda *args, **kwargs: None)
# prevent the consistency watchdog from starting
self.watch_p = mock.patch(CWATCH, new=lambda *args, **kwargs: None)
- self.addCleanup(db.clear_db)
self.plugin_notifier_p.start()
self.spawn_p.start()
self.watch_p.start()
from neutron.db import api as db
from neutron.openstack.common import context
from neutron.plugins.brocade import vlanbm as vlan_bitmap
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class TestVlanBitmap(base.BaseTestCase):
+class TestVlanBitmap(testlib_api.SqlTestCase):
"""exercise Vlan bitmap ."""
def setUp(self):
super(TestVlanBitmap, self).setUp()
- db.configure_db()
- self.addCleanup(db.clear_db)
self.context = context.get_admin_context()
self.context.session = db.get_session()
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.db import n1kv_db_v2
from neutron.plugins.cisco.db import n1kv_models_v2
-from neutron.tests import base
from neutron.tests.unit import test_db_plugin as test_plugin
+from neutron.tests.unit import testlib_api
PHYS_NET = 'physnet1'
return _profile
-class VlanAllocationsTest(base.BaseTestCase):
+class VlanAllocationsTest(testlib_api.SqlTestCase):
def setUp(self):
super(VlanAllocationsTest, self).setUp()
- db.configure_db()
self.session = db.get_session()
self.net_p = _create_test_network_profile_if_not_there(self.session)
n1kv_db_v2.sync_vlan_allocations(self.session, self.net_p)
- self.addCleanup(db.clear_db)
def test_sync_vlan_allocations_outside_segment_range(self):
self.assertRaises(c_exc.VlanIDNotFound,
vlan_id)
-class VxlanAllocationsTest(base.BaseTestCase,
+class VxlanAllocationsTest(testlib_api.SqlTestCase,
n1kv_db_v2.NetworkProfile_db_mixin):
def setUp(self):
super(VxlanAllocationsTest, self).setUp()
- db.configure_db()
self.session = db.get_session()
self.net_p = _create_test_network_profile_if_not_there(
self.session, TEST_NETWORK_PROFILE_VXLAN)
n1kv_db_v2.sync_vxlan_allocations(self.session, self.net_p)
- self.addCleanup(db.clear_db)
def test_sync_vxlan_allocations_outside_segment_range(self):
self.assertRaises(c_exc.VxlanIDNotFound,
def setUp(self):
super(NetworkBindingsTest, self).setUp()
- db.configure_db()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_add_network_binding(self):
with self.network() as network:
self.assertEqual(t_members, [])
-class NetworkProfileTests(base.BaseTestCase,
+class NetworkProfileTests(testlib_api.SqlTestCase,
n1kv_db_v2.NetworkProfile_db_mixin):
def setUp(self):
super(NetworkProfileTests, self).setUp()
- db.configure_db()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_create_network_profile(self):
_db_profile = n1kv_db_v2.create_network_profile(self.session,
self.assertEqual(len(test_profiles), len(list(profiles)))
-class PolicyProfileTests(base.BaseTestCase):
+class PolicyProfileTests(testlib_api.SqlTestCase):
def setUp(self):
super(PolicyProfileTests, self).setUp()
- db.configure_db()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_create_policy_profile(self):
_db_profile = n1kv_db_v2.create_policy_profile(TEST_POLICY_PROFILE)
self.assertEqual(profile.name, got_profile.name)
-class ProfileBindingTests(base.BaseTestCase,
+class ProfileBindingTests(testlib_api.SqlTestCase,
n1kv_db_v2.NetworkProfile_db_mixin,
common_db_mixin.CommonDbMixin):
def setUp(self):
super(ProfileBindingTests, self).setUp()
- db.configure_db()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def _create_test_binding_if_not_there(self, tenant_id, profile_id,
profile_type):
attributes.RESOURCE_ATTRIBUTE_MAP["ports"].update(
n1kv.EXTENDED_ATTRIBUTES_2_0["ports"])
self.addCleanup(self.restore_resource_attribute_map)
- self.addCleanup(db.clear_db)
super(N1kvPluginTestCase, self).setUp(self._plugin_name,
ext_mgr=ext_mgr)
# Create some of the database entries that we require.
import mock
import testtools
-from neutron.db import api as db
from neutron.plugins.cisco.common import cisco_constants
from neutron.plugins.cisco.common import cisco_credentials_v2
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.common import config as config
from neutron.plugins.cisco.db import network_db_v2 as cdb
from neutron.plugins.cisco import network_plugin
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class CiscoNetworkDbTest(base.BaseTestCase):
+class CiscoNetworkDbTest(testlib_api.SqlTestCase):
"""Base class for Cisco network database unit tests."""
def setUp(self):
super(CiscoNetworkDbTest, self).setUp()
- db.configure_db()
# The Cisco network plugin includes a thin layer of QoS and
# credential API methods which indirectly call Cisco QoS and
'__init__', new=new_network_plugin_init):
self._network_plugin = network_plugin.PluginV2()
- self.addCleanup(db.clear_db)
-
class CiscoNetworkQosDbTest(CiscoNetworkDbTest):
cdb.get_credential, cred_n1kv_2_id)
-class CiscoCredentialStoreTest(base.BaseTestCase):
+class CiscoCredentialStoreTest(testlib_api.SqlTestCase):
"""Cisco Credential Store unit tests."""
- def setUp(self):
- super(CiscoCredentialStoreTest, self).setUp()
- db.configure_db()
- self.addCleanup(db.clear_db)
-
def test_cred_store_init_duplicate_creds_ignored(self):
"""Check that with multi store instances, dup creds are ignored."""
# Create a device dictionary containing credentials for 1 switch.
from neutron.plugins.cisco.common import config
from neutron.plugins.cisco.db import nexus_db_v2 as nxdb
from neutron.plugins.cisco.nexus import cisco_nexus_plugin_v2
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class CiscoNexusDbTest(base.BaseTestCase):
+class CiscoNexusDbTest(testlib_api.SqlTestCase):
"""Unit tests for cisco.db.nexus_models_v2.NexusPortBinding model."""
def setUp(self):
super(CiscoNexusDbTest, self).setUp()
- db.configure_db()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def _npb_test_obj(self, pnum, vnum, switch=None, instance=None):
"""Create a Nexus port binding test object from a pair of numbers."""
from oslo.config import cfg
-from neutron.db import api as db
from neutron.extensions import providernet as provider
from neutron.openstack.common import importutils
from neutron.plugins.cisco.common import cisco_constants as const
from neutron.plugins.cisco.common import config as cisco_config
from neutron.plugins.cisco.db import network_db_v2 as cdb
from neutron.plugins.cisco.nexus import cisco_nexus_plugin_v2
-from neutron.tests import base
-
+from neutron.tests.unit import testlib_api
NEXUS_IP_ADDRESS = '1.1.1.1'
HOSTNAME1 = 'testhost1'
const.NET_VLAN_ID]
-class TestCiscoNexusPlugin(base.BaseTestCase):
+class TestCiscoNexusPlugin(testlib_api.SqlTestCase):
def setUp(self):
"""Set up function."""
'password': 'password'
},
}
- db.configure_db()
- self.addCleanup(db.clear_db)
# Use a mock netconf client
self.mock_ncclient = mock.Mock()
from neutron.plugins.cisco.common import config as cisco_config
from neutron.plugins.cisco.models import virt_phy_sw_v2
from neutron.plugins.cisco.nexus import cisco_nexus_plugin_v2
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class TestCiscoPluginModel(base.BaseTestCase):
+class TestCiscoPluginModel(testlib_api.SqlTestCase):
def setUp(self):
# Point config file to: neutron/tests/etc/neutron.conf.test
from neutron.common import config
from neutron.common import exceptions as n_exc
from neutron import context
-import neutron.db.l3_db # noqa
from neutron.db.loadbalancer import loadbalancer_db as ldb
from neutron.db import servicetype_db as sdb
import neutron.extensions
from neutron import context
from neutron.db import agents_db
-from neutron.db import api as db
from neutron.db import db_base_plugin_v2 as base_plugin
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
class FakePlugin(base_plugin.NeutronDbPluginV2, agents_db.AgentDbMixin):
"""A fake plugin class containing all DB methods."""
-class TestAgentsDbMixin(base.BaseTestCase):
+class TestAgentsDbMixin(testlib_api.SqlTestCase):
def setUp(self):
super(TestAgentsDbMixin, self).setUp()
self.context = context.get_admin_context()
self.plugin = FakePlugin()
- self.addCleanup(db.clear_db)
self.agent_status = {
'agent_type': 'Open vSwitch agent',
from oslo.config import cfg
from neutron import context
-from neutron.db import api as db
from neutron.db import dvr_mac_db
from neutron.extensions import dvr
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
class DVRDbMixinImpl(dvr_mac_db.DVRDbMixin):
self.notifier = notifier
-class DvrDbMixinTestCase(base.BaseTestCase):
+class DvrDbMixinTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(DvrDbMixinTestCase, self).setUp()
- db.configure_db()
self.ctx = context.get_admin_context()
- self.addCleanup(db.clear_db)
self.mixin = DVRDbMixinImpl(mock.Mock())
def _create_dvr_mac_entry(self, host, mac_address):
from neutron.common import constants as l3_const
from neutron import context
-from neutron.db import api as db
from neutron.db import l3_dvr_db
from neutron import manager
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class L3DvrTestCase(base.BaseTestCase):
+class L3DvrTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(L3DvrTestCase, self).setUp()
- db.configure_db()
self.ctx = context.get_admin_context()
- self.addCleanup(db.clear_db)
self.mixin = l3_dvr_db.L3_NAT_with_dvr_db_mixin()
def _create_router(self, router):
from neutron.common import exceptions
from neutron import context
-from neutron.db import api as db
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.db import quota_db
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
class FakePlugin(base_plugin.NeutronDbPluginV2, quota_db.DbQuotaDriver):
RESOURCE = 'res_test'
-class TestDbQuotaDriver(base.BaseTestCase):
+class TestDbQuotaDriver(testlib_api.SqlTestCase):
def setUp(self):
super(TestDbQuotaDriver, self).setUp()
self.plugin = FakePlugin()
self.context = context.get_admin_context()
- self.addCleanup(db.clear_db)
def test_create_quota_limit(self):
defaults = {RESOURCE: TestResource(RESOURCE, 4)}
from oslo.config import cfg
-from neutron.db import api as db
from neutron.plugins.embrane.common import config # noqa
from neutron.tests.unit import test_extension_extraroute as extraroute_test
from neutron.tests.unit import test_l3_plugin as router_test
def setUp(self):
cfg.CONF.set_override('admin_password', "admin123", 'heleos')
- self.addCleanup(db.clear_db)
super(TestEmbraneL3NatDBTestCase, self).setUp()
import mock
from oslo.config import cfg
-from neutron.db import api as db
from neutron.plugins.embrane.common import config # noqa
from neutron.tests.unit import test_db_plugin as test_plugin
cfg.CONF.set_override('admin_password', "admin123", 'heleos')
p = mock.patch.dict(sys.modules, {'heleosapi': mock.Mock()})
p.start()
- self.addCleanup(db.clear_db)
# dict patches must be explicitly stopped
self.addCleanup(p.stop)
super(EmbranePluginV2TestCase, self).setUp(self._plugin_name)
from neutron.common import exceptions as n_exc
from neutron.db import api as db
from neutron.plugins.linuxbridge.db import l2network_db_v2 as lb_db
-from neutron.tests import base
from neutron.tests.unit import test_db_plugin as test_plugin
+from neutron.tests.unit import testlib_api
PHYS_NET = 'physnet1'
PHYS_NET_2 = 'physnet2'
'lb_neutron_plugin.LinuxBridgePluginV2')
-class NetworkStatesTest(base.BaseTestCase):
+class NetworkStatesTest(testlib_api.SqlTestCase):
def setUp(self):
super(NetworkStatesTest, self).setUp()
- db.configure_db()
lb_db.sync_network_states(VLAN_RANGES)
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_sync_network_states(self):
self.assertIsNone(lb_db.get_network_state(PHYS_NET,
cfg.CONF.set_override('network_vlan_ranges', ['physnet1:1000:2999'],
group='VLANS')
super(NetworkBindingsTest, self).setUp(plugin=PLUGIN_NAME)
- db.configure_db()
self.session = db.get_session()
def test_add_network_binding(self):
from neutron.common import exceptions as exc
from neutron.common import topics
from neutron import context
-from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import models_v2
from neutron.extensions import flavor as ext_flavor
from neutron.openstack.common import uuidutils
from neutron.plugins.metaplugin import meta_neutron_plugin
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
CONF_FILE = ""
META_PATH = "neutron.plugins.metaplugin"
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
#TODO(nati) remove this after subnet quota change is merged
cfg.CONF.set_override('max_dns_nameservers', 10)
- cfg.CONF.set_override('rpc_backend',
- 'neutron.openstack.common.rpc.impl_fake')
# Hooks registered by metaplugin must not exist for other plugins UT.
models_v2.Port, 'metaplugin_port', None, None, None)
-class MetaNeutronPluginV2Test(base.BaseTestCase):
+class MetaNeutronPluginV2Test(testlib_api.SqlTestCase):
"""Class conisting of MetaNeutronPluginV2 unit tests."""
has_l3 = True
def setUp(self):
super(MetaNeutronPluginV2Test, self).setUp()
- db._ENGINE = None
- db._MAKER = None
self.fake_tenant_id = uuidutils.generate_uuid()
self.context = context.get_admin_context()
- db.configure_db()
- self.addCleanup(db.clear_db)
self.addCleanup(unregister_meta_hooks)
setup_metaplugin_conf(self.has_l3)
self.skipTest("Test case without router")
-class MetaNeutronPluginV2TestRpcFlavor(base.BaseTestCase):
+class MetaNeutronPluginV2TestRpcFlavor(testlib_api.SqlTestCase):
"""Tests for rpc_flavor."""
def setUp(self):
super(MetaNeutronPluginV2TestRpcFlavor, self).setUp()
- db._ENGINE = None
- db._MAKER = None
- db.configure_db()
- self.addCleanup(db.clear_db)
self.addCleanup(unregister_meta_hooks)
def test_rpc_flavor(self):
from sqlalchemy.orm import query
from neutron import context
-from neutron.db import api as db_api
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.extensions import portbindings
from neutron.plugins.ml2 import db as ml2_db
from neutron.plugins.ml2 import models as ml2_models
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class Ml2DBTestCase(base.BaseTestCase):
+class Ml2DBTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(Ml2DBTestCase, self).setUp()
- db_api.configure_db()
self.ctx = context.get_admin_context()
- self.addCleanup(db_api.clear_db)
def _setup_neutron_network(self, network_id, port_ids):
with self.ctx.session.begin(subtransactions=True):
from oslo.config import cfg
from neutron.common import constants as n_const
-import neutron.db.api as ndb
from neutron.extensions import portbindings
from neutron.plugins.ml2.drivers.arista import db
from neutron.plugins.ml2.drivers.arista import exceptions as arista_exc
from neutron.plugins.ml2.drivers.arista import mechanism_arista as arista
from neutron.tests import base
+from neutron.tests.unit import testlib_api
def setup_arista_wrapper_config(value=''):
setup_arista_wrapper_config('value')
-class AristaProvisionedVlansStorageTestCase(base.BaseTestCase):
+class AristaProvisionedVlansStorageTestCase(testlib_api.SqlTestCase):
"""Test storing and retriving functionality of Arista mechanism driver.
Tests all methods of this class by invoking them separately as well
as a group.
"""
- def setUp(self):
- super(AristaProvisionedVlansStorageTestCase, self).setUp()
- ndb.configure_db()
- self.addCleanup(ndb.clear_db)
-
def test_tenant_is_remembered(self):
tenant_id = 'test'
self.assertRaises(arista_exc.AristaRpcError, drv.get_tenants)
-class RealNetStorageAristaDriverTestCase(base.BaseTestCase):
+class RealNetStorageAristaDriverTestCase(testlib_api.SqlTestCase):
"""Main test cases for Arista Mechanism driver.
Tests all mechanism driver APIs supported by Arista Driver. It invokes
def setUp(self):
super(RealNetStorageAristaDriverTestCase, self).setUp()
self.fake_rpc = mock.MagicMock()
- ndb.configure_db()
self.drv = arista.AristaDriver(self.fake_rpc)
def tearDown(self):
import testtools
from neutron.common import constants as n_const
-from neutron.db import api as db
from neutron.extensions import portbindings
from neutron.openstack.common import importutils
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers.cisco.nexus import mech_cisco_nexus
from neutron.plugins.ml2.drivers.cisco.nexus import nexus_db_v2
from neutron.plugins.ml2.drivers.cisco.nexus import nexus_network_driver
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
NEXUS_IP_ADDRESS = '1.1.1.1'
return self._segment
-class TestCiscoNexusDevice(base.BaseTestCase):
+class TestCiscoNexusDevice(testlib_api.SqlTestCase):
"""Unit tests for Cisco ML2 Nexus device driver."""
mech_instance.driver.nexus_switches = (
mech_instance._nexus_switches)
- db.configure_db()
-
mock.patch.object(mech_cisco_nexus.CiscoNexusMechanismDriver,
'__init__', new=new_nexus_init).start()
self._cisco_mech_driver = (mech_cisco_nexus.
CiscoNexusMechanismDriver())
- self.addCleanup(db.clear_db)
-
def _create_delete_port(self, port_config):
"""Tests creation and deletion of a virtual port."""
nexus_ip_addr = port_config.nexus_ip_addr
import collections
import testtools
-from neutron.db import api as db
from neutron.plugins.ml2.drivers.cisco.nexus import exceptions
from neutron.plugins.ml2.drivers.cisco.nexus import nexus_db_v2
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class CiscoNexusDbTest(base.BaseTestCase):
+class CiscoNexusDbTest(testlib_api.SqlTestCase):
"""Unit tests for Cisco mechanism driver's Nexus port binding database."""
NpbObj = collections.namedtuple('NpbObj', 'port vlan switch instance')
- def setUp(self):
- super(CiscoNexusDbTest, self).setUp()
- db.configure_db()
- self.addCleanup(db.clear_db)
-
def _npb_test_obj(self, pnum, vnum, switch='10.9.8.7', instance=None):
"""Creates a Nexus port binding test object from a pair of numbers."""
if pnum is 'router':
from neutron.common import topics
from neutron import context
from neutron.db import agents_db
-from neutron.db import api as db_api
from neutron.extensions import portbindings
from neutron.extensions import providernet as pnet
from neutron import manager
uptime_patch = mock.patch(uptime, return_value=190)
uptime_patch.start()
- self.addCleanup(db_api.clear_db)
-
def tearDown(self):
l2_consts.SUPPORTED_AGENT_TYPES = self.orig_supported_agents
super(TestL2PopulationRpcTestCase, self).tearDown()
from neutron.plugins.ml2.drivers import helpers
from neutron.plugins.ml2.drivers import type_vlan
from neutron.tests import base
+from neutron.tests.unit import testlib_api
TENANT_NET = 'phys_net2'
}
-class HelpersTest(base.BaseTestCase):
+class HelpersTest(testlib_api.SqlTestCase):
def setUp(self):
super(HelpersTest, self).setUp()
- db.configure_db()
self.driver = type_vlan.VlanTypeDriver()
self.driver.network_vlan_ranges = NETWORK_VLAN_RANGES
self.driver._sync_vlan_allocations()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
self.useFixture(
fixtures.FakeLogger(
name=helpers.__name__,
from neutron.plugins.ml2 import plugin
from neutron.tests import base
from neutron.tests.unit import test_db_plugin as test_plugin
+from neutron.tests.unit import testlib_api
PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
self.assertFalse(self.mech.check_segment(self.segment))
-class OpenDayLightMechanismConfigTests(base.BaseTestCase):
+class OpenDayLightMechanismConfigTests(testlib_api.SqlTestCase):
def _set_config(self, url='http://127.0.0.1:9999', username='someuser',
password='somepass'):
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_flat
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
from oslo.config import cfg
FLAT_NETWORKS = ['flat_net1', 'flat_net2']
-class FlatTypeTest(base.BaseTestCase):
+class FlatTypeTest(testlib_api.SqlTestCase):
def setUp(self):
super(FlatTypeTest, self).setUp()
- db.configure_db()
cfg.CONF.set_override('flat_networks', FLAT_NETWORKS,
group='ml2_type_flat')
self.driver = type_flat.FlatTypeDriver()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def _get_allocation(self, session, segment):
return session.query(type_flat.FlatAllocation).filter_by(
import neutron.db.api as db
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_gre
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
TUNNEL_IP_ONE = "10.10.10.10"
TUNNEL_IP_TWO = "10.10.10.20"
UPDATED_TUNNEL_RANGES = [(TUN_MIN + 5, TUN_MAX + 5)]
-class GreTypeTest(base.BaseTestCase):
+class GreTypeTest(testlib_api.SqlTestCase):
def setUp(self):
super(GreTypeTest, self).setUp()
- db.configure_db()
self.driver = type_gre.GreTypeDriver()
self.driver.gre_id_ranges = TUNNEL_RANGES
self.driver._sync_gre_allocations()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_validate_provider_segment(self):
segment = {api.NETWORK_TYPE: 'gre',
log_warn.assert_called_once_with(mock.ANY, TUNNEL_IP_ONE)
-class GreTypeMultiRangeTest(base.BaseTestCase):
+class GreTypeMultiRangeTest(testlib_api.SqlTestCase):
TUN_MIN0 = 100
TUN_MAX0 = 101
def setUp(self):
super(GreTypeMultiRangeTest, self).setUp()
- db.configure_db()
self.driver = type_gre.GreTypeDriver()
self.driver.gre_id_ranges = self.TUNNEL_MULTI_RANGES
self.driver._sync_gre_allocations()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_release_segment(self):
segments = [self.driver.allocate_tenant_segment(self.session)
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_vlan
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
PROVIDER_NET = 'phys_net1'
TENANT_NET = 'phys_net2'
}
-class VlanTypeTest(base.BaseTestCase):
+class VlanTypeTest(testlib_api.SqlTestCase):
def setUp(self):
super(VlanTypeTest, self).setUp()
- db.configure_db()
self.driver = type_vlan.VlanTypeDriver()
self.driver.network_vlan_ranges = NETWORK_VLAN_RANGES
self.driver._sync_vlan_allocations()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def _get_allocation(self, session, segment):
return session.query(type_vlan.VlanAllocation).filter_by(
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_vxlan
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
TUNNEL_IP_ONE = "10.10.10.10"
VXLAN_UDP_PORT_TWO = 8888
-class VxlanTypeTest(base.BaseTestCase):
+class VxlanTypeTest(testlib_api.SqlTestCase):
def setUp(self):
super(VxlanTypeTest, self).setUp()
- db.configure_db()
cfg.CONF.set_override('vni_ranges', [TUNNEL_RANGES],
group='ml2_type_vxlan')
cfg.CONF.set_override('vxlan_group', MULTICAST_GROUP,
self.driver.vxlan_vni_ranges = TUNNEL_RANGES
self.driver._sync_vxlan_allocations()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_vxlan_tunnel_type(self):
self.assertEqual(self.driver.get_type(), p_const.TYPE_VXLAN)
log_warn.assert_called_once_with(mock.ANY, TUNNEL_IP_ONE)
-class VxlanTypeMultiRangeTest(base.BaseTestCase):
+class VxlanTypeMultiRangeTest(testlib_api.SqlTestCase):
TUN_MIN0 = 100
TUN_MAX0 = 101
def setUp(self):
super(VxlanTypeMultiRangeTest, self).setUp()
- db.configure_db()
self.driver = type_vxlan.VxlanTypeDriver()
self.driver.vxlan_vni_ranges = self.TUNNEL_MULTI_RANGES
self.driver._sync_vxlan_allocations()
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_release_segment(self):
segments = [self.driver.allocate_tenant_segment(self.session)
from neutron.common import exceptions as n_exc
from neutron.db import api as db
from neutron.plugins.mlnx.db import mlnx_db_v2 as mlnx_db
-from neutron.tests import base
from neutron.tests.unit import test_db_plugin as test_plugin
+from neutron.tests.unit import testlib_api
PHYS_NET = 'physnet1'
PHYS_NET_2 = 'physnet2'
TEST_NETWORK_ID = 'abcdefghijklmnopqrstuvwxyz'
-class SegmentationIdAllocationTest(base.BaseTestCase):
+class SegmentationIdAllocationTest(testlib_api.SqlTestCase):
def setUp(self):
super(SegmentationIdAllocationTest, self).setUp()
- db.configure_db()
mlnx_db.sync_network_states(VLAN_RANGES)
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_sync_segmentationIdAllocation(self):
self.assertIsNone(mlnx_db.get_network_state(PHYS_NET,
class NetworkBindingsTest(test_plugin.NeutronDbPluginV2TestCase):
def setUp(self):
super(NetworkBindingsTest, self).setUp()
- db.configure_db()
self.session = db.get_session()
def test_add_network_binding(self):
from neutron.plugins.mlnx.common import config # noqa
from neutron.plugins.mlnx.common import constants
from neutron.plugins.mlnx import mlnx_plugin
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class TestMlnxPluginConfig(base.BaseTestCase):
+class TestMlnxPluginConfig(testlib_api.SqlTestCase):
expected_vlan_mappings = {'physnet1': [(1, 1000)],
'physnet2': [(1, 1000)]}
expected_network_types = {'physnet1': constants.TYPE_ETH,
from neutron.openstack.common import uuidutils
from neutron.plugins.nec.common import exceptions as nexc
from neutron.plugins.nec.db import api as ndb
-from neutron.plugins.nec.db import models as nmodels # noqa
from neutron.tests.unit.nec import test_nec_plugin
import mock
from neutron import context
-from neutron.db import api as db
from neutron.openstack.common import uuidutils
from neutron.plugins.nec.common import config
from neutron.plugins.nec.db import api as ndb
-from neutron.plugins.nec.db import models as nmodels # noqa
from neutron.plugins.nec import ofc_manager
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
class FakePortInfo(object):
raise AttributeError(name)
-class OFCManagerTestBase(base.BaseTestCase):
+class OFCManagerTestBase(testlib_api.SqlTestCase):
"""Class conisting of OFCManager unit tests."""
def setUp(self):
super(OFCManagerTestBase, self).setUp()
- db.configure_db()
driver = "neutron.tests.unit.nec.stub_ofc_driver.StubOFCDriver"
config.CONF.set_override('driver', driver, 'OFC')
- self.addCleanup(ndb.clear_db)
self.plugin = mock.Mock()
self.plugin.get_packet_filters_for_port.return_value = None
self.ofc = ofc_manager.OFCManager(self.plugin)
from neutron.api.v2 import attributes
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
-from neutron.plugins.nec.db import api as ndb # noqa
from neutron.tests.unit.nec import test_nec_plugin
from neutron.tests.unit import test_extension_security_group as test_sg
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
from neutron.api.v2 import base as api_base
from neutron.common import exceptions as exc
from neutron import context as neutron_context
-from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db
from neutron.db import l3_db
-from neutron.db import quota_db # noqa
from neutron.db import securitygroups_db
from neutron.extensions import portbindings
from neutron.extensions import securitygroup as ext_sg
cfg.CONF.keystone_authtoken = KeyStoneInfo()
mock.patch('requests.post').start().side_effect = FAKE_SERVER.request
- db.configure_db()
super(ContrailPluginTestCase, self).setUp(self._plugin_name)
from neutron.db import api as db
from neutron.plugins.openvswitch import ovs_db_v2
from neutron.plugins.openvswitch import ovs_models_v2 as ovs_models
-from neutron.tests import base
from neutron.tests.unit import test_db_plugin as test_plugin
+from neutron.tests.unit import testlib_api
PHYS_NET = 'physnet1'
PHYS_NET_2 = 'physnet2'
'ovs_neutron_plugin.OVSNeutronPluginV2')
-class VlanAllocationsTest(base.BaseTestCase):
+class VlanAllocationsTest(testlib_api.SqlTestCase):
def setUp(self):
super(VlanAllocationsTest, self).setUp()
- db.configure_db()
ovs_db_v2.sync_vlan_allocations(VLAN_RANGES)
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_sync_vlan_allocations(self):
self.assertIsNone(ovs_db_v2.get_vlan_allocation(PHYS_NET,
ovs_db_v2.sync_vlan_allocations({})
-class TunnelAllocationsTest(base.BaseTestCase):
+class TunnelAllocationsTest(testlib_api.SqlTestCase):
def setUp(self):
super(TunnelAllocationsTest, self).setUp()
- db.configure_db()
ovs_db_v2.sync_tunnel_allocations(TUNNEL_RANGES)
self.session = db.get_session()
- self.addCleanup(db.clear_db)
def test_sync_tunnel_allocations(self):
self.assertIsNone(ovs_db_v2.get_tunnel_allocation(TUN_MIN - 1))
cfg.CONF.set_override('network_vlan_ranges', ['physnet1:1000:2999'],
group='OVS')
super(NetworkBindingsTest, self).setUp(plugin=PLUGIN_NAME)
- db.configure_db()
self.session = db.get_session()
def test_add_network_binding(self):
from neutron.db import api as db
from neutron.plugins.ryu.common import config # noqa
from neutron.plugins.ryu.db import api_v2 as db_api_v2
-from neutron.plugins.ryu.db import models_v2 as ryu_models_v2 # noqa
from neutron.tests.unit import test_db_plugin as test_plugin
import mock
from neutron import manager
-from neutron.plugins.ryu.db import models_v2 as ryu_models_v2 # noqa
from neutron.tests.unit.ryu import fake_ryu
from neutron.tests.unit import test_db_plugin as test_plugin
import mock
from neutron.services.l3_router import l3_apic
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
TENANT = 'tenant1'
TENANT_CONTRACT = 'abcd'
self.subnet_id = SUBNET
-class TestCiscoApicL3Plugin(base.BaseTestCase):
+class TestCiscoApicL3Plugin(testlib_api.SqlTestCase):
def setUp(self):
super(TestCiscoApicL3Plugin, self).setUp()
# from oslo.config import cfg
from neutron import context as n_ctx
-from neutron.db import api as dbapi
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
# from neutron.services.vpn import plugin as vpn_plugin
from neutron.services.vpn.service_drivers import cisco_ipsec as ipsec_driver
from neutron.services.vpn.service_drivers import cisco_validator as validator
from neutron.tests import base
+from neutron.tests.unit import testlib_api
_uuid = uuidutils.generate_uuid
tenant_id='1000')
-class TestCiscoIPsecDriver(base.BaseTestCase):
+class TestCiscoIPsecDriver(testlib_api.SqlTestCase):
"""Test that various incoming requests are sent to device driver."""
def setUp(self):
super(TestCiscoIPsecDriver, self).setUp()
- dbapi.configure_db()
- self.addCleanup(dbapi.clear_db)
mock.patch('neutron.common.rpc.create_connection').start()
l3_agent = mock.Mock()
from neutron.common import test_lib
from neutron.common import utils
from neutron import context
-from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import models_v2
from neutron import manager
if not plugin:
plugin = DB_PLUGIN_KLASS
- # Create the default configurations
- args = ['--config-file', base.etcdir('neutron.conf.test')]
- # If test_config specifies some config-file, use it, as well
- for config_file in test_lib.test_config.get('config_files', []):
- args.extend(['--config-file', config_file])
- self.config_parse(args=args)
# Update the plugin
self.setup_coreplugin(plugin)
cfg.CONF.set_override(
self._skip_native_pagination = None
self._skip_native_sortin = None
self.ext_api = None
- # NOTE(jkoelker) for a 'pluggable' framework, Neutron sure
- # doesn't like when the plugin changes ;)
- db.clear_db()
# Restore the original attribute map
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
super(NeutronDbPluginV2TestCase, self).tearDown()
+ def setup_config(self):
+ # Create the default configurations
+ args = ['--config-file', base.etcdir('neutron.conf.test')]
+ # If test_config specifies some config-file, use it, as well
+ for config_file in test_lib.test_config.get('config_files', []):
+ args.extend(['--config-file', config_file])
+ self.config_parse(args=args)
+
def _req(self, method, resource, data=None, fmt=None, id=None, params=None,
action=None, subresource=None, sub_id=None, context=None):
fmt = fmt or self.fmt
['b', '192.168.1.112', '192.168.1.120']], actual)
-class NeutronDbPluginV2AsMixinTestCase(base.BaseTestCase):
+class NeutronDbPluginV2AsMixinTestCase(testlib_api.SqlTestCase):
"""Tests for NeutronDbPluginV2 as Mixin.
While NeutronDbPluginV2TestCase checks NeutronDbPlugin and all plugins as
'admin_state_up': True,
'tenant_id': 'test-tenant',
'shared': False}}
- self.addCleanup(db.clear_db)
def test_create_network_with_default_status(self):
net = self.plugin.create_network(self.context, self.net_data)
from neutron.common import exceptions as n_exc
from neutron import context
from neutron import manager
-from neutron.tests import base
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import testlib_api
-class TestNetworks(base.BaseTestCase):
+class TestNetworks(testlib_api.SqlTestCase):
def setUp(self):
super(TestNetworks, self).setUp()
self._tenant_id = 'test-tenant'
from neutron import context
from neutron.db import agents_db
from neutron.db import agentschedulers_db
-from neutron.db import api as db
from neutron.db import models_v2
from neutron.openstack.common import timeutils
from neutron.scheduler import dhcp_agent_scheduler
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class DhcpSchedulerTestCase(base.BaseTestCase):
+class DhcpSchedulerTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(DhcpSchedulerTestCase, self).setUp()
- db.configure_db()
self.ctx = context.get_admin_context()
self.network_id = 'foo_network_id'
self._save_networks([self.network_id])
- self.addCleanup(db.clear_db)
def _get_agents(self, hosts):
return [
from neutron.extensions import l3
from neutron.extensions import l3_ext_gw_mode
from neutron.openstack.common import uuidutils
-from neutron.tests import base
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin
+from neutron.tests.unit import testlib_api
_uuid = uuidutils.generate_uuid
FAKE_GW_PORT_ID = _uuid()
supported_extension_aliases = ["router", "ext-gw-mode"]
-class TestL3GwModeMixin(base.BaseTestCase):
+class TestL3GwModeMixin(testlib_api.SqlTestCase):
def setUp(self):
super(TestL3GwModeMixin, self).setUp()
# Patch the context
ctx_patcher = mock.patch('neutron.context', autospec=True)
mock_context = ctx_patcher.start()
- self.addCleanup(db_api.clear_db)
self.context = mock_context.get_admin_context()
# This ensure also calls to elevated work in unit tests
self.context.elevated.return_value = self.context
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron import context
-from neutron.db import api as qdbapi
from neutron.db import common_db_mixin
from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db
from neutron.db import l3_db
from neutron.db import l3_dvr_db
from neutron.db import l3_rpc_base
-from neutron.db import model_base
from neutron.extensions import external_net
from neutron.extensions import l3
from neutron.extensions import portbindings
supported_extension_aliases = ["router"]
- def __init__(self):
- qdbapi.register_models(base=model_base.BASEV2)
-
def get_plugin_type(self):
return service_constants.L3_ROUTER_NAT
from neutron import manager
from neutron.openstack.common import timeutils
from neutron.scheduler import l3_agent_scheduler
-from neutron.tests import base
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin
+from neutron.tests.unit import testlib_api
HOST = 'my_l3_host'
FIRST_L3_AGENT = {
pass
-class L3DvrSchedulerTestCase(base.BaseTestCase):
+class L3DvrSchedulerTestCase(testlib_api.SqlTestCase):
def setUp(self):
plugin = 'neutron.plugins.ml2.plugin.Ml2Plugin'
from neutron.common import config
from neutron.common import exceptions
from neutron import context
-from neutron.db import api as db
from neutron.db import quota_db
from neutron import quota
from neutron.tests import base
# extra1 here is added later, so have to do it manually
quota.QUOTAS.register_resource_by_name('extra1')
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
- db.configure_db()
app = config.load_paste_app('extensions_test_app')
ext_middleware = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
self.api = webtest.TestApp(ext_middleware)
def tearDown(self):
self.api = None
self.plugin = None
- db.clear_db()
-
# Restore the global RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
super(QuotaExtensionTestCase, self).tearDown()
from neutron.api.v2 import router
from neutron.common import config
from neutron import context as q_context
-from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import l3_db
from neutron.db.loadbalancer import loadbalancer_db as lb_db
from neutron.extensions import routedserviceinsertion as rsi
from neutron.extensions import routerservicetype as rst
from neutron.plugins.common import constants
-from neutron.tests import base
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import testlib_api
from neutron import wsgi
pass
-class RouterServiceInsertionTestCase(base.BaseTestCase):
+class RouterServiceInsertionTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(RouterServiceInsertionTestCase, self).setUp()
plugin = (
def tearDown(self):
self.api = None
- db.clear_db()
super(RouterServiceInsertionTestCase, self).tearDown()
def _setup_core_resources(self):
from neutron.api import extensions
from neutron.common import exceptions as n_exc
from neutron import context
-from neutron.db import api as db_api
from neutron.db import servicetype_db as st_db
from neutron.extensions import servicetype
from neutron.plugins.common import constants
from neutron.services import provider_configuration as provconf
-from neutron.tests import base
from neutron.tests.unit import dummy_plugin as dp
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_db_plugin
_get_path = test_api_v2._get_path
-class ServiceTypeManagerTestCase(base.BaseTestCase):
+class ServiceTypeManagerTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(ServiceTypeManagerTestCase, self).setUp()
st_db.ServiceTypeManager._instance = None
':lbaas:driver_path',
constants.DUMMY + ':dummy:dummy_dr'],
'service_providers')
- self.addCleanup(db_api.clear_db)
super(ServiceTypeManagerExtTestCase, self).setUp()
def _list_service_providers(self):
import testtools
from neutron.api.v2 import attributes
+from neutron.db import api as db_api
+# Import all data models
+from neutron.db.migration.models import head # noqa
+from neutron.db import model_base
from neutron.tests import base
from neutron import wsgi
return req
-class WebTestCase(base.BaseTestCase):
+class SqlTestCase(base.BaseTestCase):
+
+ def setUp(self):
+ super(SqlTestCase, self).setUp()
+ # Register all data models
+ engine = db_api.get_engine()
+ model_base.BASEV2.metadata.create_all(engine)
+
+ def unregister_models():
+ """Unregister all data models."""
+ model_base.BASEV2.metadata.drop_all(engine)
+
+ self.addCleanup(unregister_models)
+
+
+class WebTestCase(SqlTestCase):
fmt = 'json'
def setUp(self):
from sqlalchemy import orm
from neutron import context
-from neutron.db import api as db
from neutron.plugins.vmware.common import exceptions as p_exc
from neutron.plugins.vmware.dbexts import lsn_db
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class LSNTestCase(base.BaseTestCase):
+class LSNTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(LSNTestCase, self).setUp()
- db.configure_db()
self.ctx = context.get_admin_context()
- self.addCleanup(db.clear_db)
self.net_id = 'foo_network_id'
self.lsn_id = 'foo_lsn_id'
self.lsn_port_id = 'foo_port_id'
from oslo.db import exception as d_exc
from neutron import context
-from neutron.db import api as db
from neutron.db import models_v2
from neutron.plugins.vmware.dbexts import db as nsx_db
from neutron.plugins.vmware.dbexts import models
-from neutron.tests import base
+from neutron.tests.unit import testlib_api
-class NsxDBTestCase(base.BaseTestCase):
+class NsxDBTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(NsxDBTestCase, self).setUp()
- db.configure_db()
self.ctx = context.get_admin_context()
- self.addCleanup(db.clear_db)
def _setup_neutron_network_and_port(self, network_id, port_id):
with self.ctx.session.begin(subtransactions=True):
from neutron.common import constants as n_consts
from neutron.common import exceptions as n_exc
from neutron import context
-from neutron.db import api as db
from neutron.plugins.vmware.api_client import exception
from neutron.plugins.vmware.common import exceptions as p_exc
from neutron.plugins.vmware.dbexts import lsn_db
from neutron.plugins.vmware.dhcp_meta import nsx
from neutron.plugins.vmware.dhcp_meta import rpc
from neutron.tests import base
+from neutron.tests.unit import testlib_api
class DhcpMetadataBuilderTestCase(base.BaseTestCase):
mock.ANY, mock.ANY, mock.ANY, mock.ANY)
-class PersistentLsnManagerTestCase(base.BaseTestCase):
+class PersistentLsnManagerTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(PersistentLsnManagerTestCase, self).setUp()
self.mac = 'aa:bb:cc:dd:ee:ff'
self.lsn_port_id = 'foo_lsn_port_id'
self.tenant_id = 'foo_tenant_id'
- db.configure_db()
nsx.register_dhcp_opts(cfg)
nsx.register_metadata_opts(cfg)
lsn_man.register_lsn_opts(cfg)
self.context = context.get_admin_context()
self.mock_lsn_api_p = mock.patch.object(lsn_man, 'lsn_api')
self.mock_lsn_api = self.mock_lsn_api_p.start()
- self.addCleanup(db.clear_db)
def test_lsn_get(self):
lsn_db.lsn_add(self.context, self.net_id, self.lsn_id)
from neutron.plugins.vmware import plugin
from neutron.tests import base
from neutron.tests.unit import test_api_v2
+from neutron.tests.unit import testlib_api
from neutron.tests.unit import vmware
from neutron.tests.unit.vmware.apiclient import fake
self.assertTrue(synchronizer._synchronize_state.call_count)
-class SyncTestCase(base.BaseTestCase):
+class SyncTestCase(testlib_api.SqlTestCase):
def setUp(self):
# mock api client