#
# @author: Sumit Naiksatam, Cisco Systems, Inc.
-import inspect
import logging
-from sqlalchemy import orm
import webob.exc as wexc
from neutron.api.v2 import base
-from neutron.common import exceptions as exc
from neutron.db import db_base_plugin_v2
-from neutron.db import models_v2
from neutron.openstack.common import importutils
-from neutron.plugins.cisco.common import cisco_constants as const
from neutron.plugins.cisco.common import cisco_exceptions as cexc
from neutron.plugins.cisco.common import config
from neutron.plugins.cisco.db import network_db_v2 as cdb
'create_subnet',
'delete_subnet', 'update_subnet',
'get_subnet', 'get_subnets', ]
- _master = True
CISCO_FAULT_MAP = {
cexc.NetworkSegmentIDNotFound: wexc.HTTPNotFound,
def __init__(self):
"""Load the model class."""
self._model = importutils.import_object(config.CISCO.model_class)
- if hasattr(self._model, "MANAGE_STATE") and self._model.MANAGE_STATE:
- self._master = False
- LOG.debug(_("Model %s manages state"), config.CISCO.model_class)
- native_bulk_attr_name = ("_%s__native_bulk_support"
- % self._model.__class__.__name__)
- self.__native_bulk_support = getattr(self._model,
- native_bulk_attr_name, False)
+ native_bulk_attr_name = ("_%s__native_bulk_support"
+ % self._model.__class__.__name__)
+ self.__native_bulk_support = getattr(self._model,
+ native_bulk_attr_name, False)
if hasattr(self._model, "supported_extension_aliases"):
self.supported_extension_aliases.extend(
def __getattribute__(self, name):
"""Delegate core API calls to the model class.
- When the configured model class offers to manage the state of the
- logical resources, we delegate the core API calls directly to it.
+ Core API calls are delegated directly to the configured model class.
Note: Bulking calls will be handled by this class, and turned into
non-bulking calls to be considered for delegation.
"""
- master = object.__getattribute__(self, "_master")
methods = object.__getattribute__(self, "_methods_to_delegate")
- if not master and name in methods:
+ if name in methods:
return getattr(object.__getattribute__(self, "_model"),
name)
else:
"""
base.FAULT_MAP.update(self.CISCO_FAULT_MAP)
- """
- Core API implementation
- """
- def create_network(self, context, network):
- """Create new Virtual Network, and assigns it a symbolic name."""
- LOG.debug(_("create_network() called"))
- new_network = super(PluginV2, self).create_network(context,
- network)
- try:
- self._invoke_device_plugins(self._func_name(), [context,
- new_network])
- return new_network
- except Exception:
- super(PluginV2, self).delete_network(context,
- new_network['id'])
- raise
-
- def update_network(self, context, id, network):
- """Update network.
-
- Updates the symbolic name belonging to a particular Virtual Network.
- """
- LOG.debug(_("update_network() called"))
- upd_net_dict = super(PluginV2, self).update_network(context, id,
- network)
- self._invoke_device_plugins(self._func_name(), [context, id,
- upd_net_dict])
- return upd_net_dict
-
- def delete_network(self, context, id):
- """Delete network.
-
- Deletes the network with the specified network identifier
- belonging to the specified tenant.
- """
- LOG.debug(_("delete_network() called"))
- #We first need to check if there are any ports on this network
- with context.session.begin():
- network = self._get_network(context, id)
- filter = {'network_id': [id]}
- ports = self.get_ports(context, filters=filter)
-
- # check if there are any tenant owned ports in-use
- prefix = db_base_plugin_v2.AGENT_OWNER_PREFIX
- only_svc = all(p['device_owner'].startswith(prefix) for p in ports)
- if not only_svc:
- raise exc.NetworkInUse(net_id=id)
- context.session.close()
- #Network does not have any ports, we can proceed to delete
- network = self._get_network(context, id)
- kwargs = {const.NETWORK: network,
- const.BASE_PLUGIN_REF: self}
- self._invoke_device_plugins(self._func_name(), [context, id,
- kwargs])
- return super(PluginV2, self).delete_network(context, id)
-
- def get_network(self, context, id, fields=None):
- """Get a particular network."""
- LOG.debug(_("get_network() called"))
- return super(PluginV2, self).get_network(context, id, fields)
-
- def get_networks(self, context, filters=None, fields=None):
- """Get all networks."""
- LOG.debug(_("get_networks() called"))
- return super(PluginV2, self).get_networks(context, filters, fields)
-
- def create_port(self, context, port):
- """Create a port on the specified Virtual Network."""
- LOG.debug(_("create_port() called"))
- new_port = super(PluginV2, self).create_port(context, port)
- try:
- self._invoke_device_plugins(self._func_name(), [context, new_port])
- return new_port
- except Exception:
- super(PluginV2, self).delete_port(context, new_port['id'])
- raise
-
- def delete_port(self, context, id):
- LOG.debug(_("delete_port() called"))
- port = self._get_port(context, id)
- """Delete port.
-
- TODO (Sumit): Disabling this check for now, check later
- Allow deleting a port only if the administrative state is down,
- and its operation status is also down
- #if port['admin_state_up'] or port['status'] == 'ACTIVE':
- # raise exc.PortInUse(port_id=id, net_id=port['network_id'],
- # att_id=port['device_id'])
- """
- kwargs = {const.PORT: port}
- # TODO(Sumit): Might first need to check here if port is active
- self._invoke_device_plugins(self._func_name(), [context, id,
- kwargs])
- return super(PluginV2, self).delete_port(context, id)
-
- def update_port(self, context, id, port):
- """Update the state of a port and return the updated port."""
- LOG.debug(_("update_port() called"))
- self._invoke_device_plugins(self._func_name(), [context, id,
- port])
- return super(PluginV2, self).update_port(context, id, port)
-
- def create_subnet(self, context, subnet):
- """Create subnet.
-
- Create a subnet, which represents a range of IP addresses
- that can be allocated to devices.
- """
- LOG.debug(_("create_subnet() called"))
- new_subnet = super(PluginV2, self).create_subnet(context, subnet)
- try:
- self._invoke_device_plugins(self._func_name(), [context,
- new_subnet])
- return new_subnet
- except Exception:
- super(PluginV2, self).delete_subnet(context, new_subnet['id'])
- raise
-
- def update_subnet(self, context, id, subnet):
- """Updates the state of a subnet and returns the updated subnet."""
- LOG.debug(_("update_subnet() called"))
- self._invoke_device_plugins(self._func_name(), [context, id,
- subnet])
- return super(PluginV2, self).update_subnet(context, id, subnet)
-
- def delete_subnet(self, context, id):
- LOG.debug(_("delete_subnet() called"))
- with context.session.begin():
- subnet = self._get_subnet(context, id)
- # Check if ports are using this subnet
- allocated_qry = context.session.query(models_v2.IPAllocation)
- allocated_qry = allocated_qry.options(orm.joinedload('ports'))
- allocated = allocated_qry.filter_by(subnet_id=id)
-
- prefix = db_base_plugin_v2.AGENT_OWNER_PREFIX
- if not all(not a.port_id or a.ports.device_owner.startswith(prefix)
- for a in allocated):
- raise exc.SubnetInUse(subnet_id=id)
- context.session.close()
- kwargs = {const.SUBNET: subnet}
- self._invoke_device_plugins(self._func_name(), [context, id,
- kwargs])
- return super(PluginV2, self).delete_subnet(context, id)
-
"""
Extension API implementation
"""
def get_credential_details(self, credential_id):
"""Get a particular credential."""
LOG.debug(_("get_credential_details() called"))
- try:
- credential = cdb.get_credential(credential_id)
- except exc.NotFound:
- raise cexc.CredentialNotFound(credential_id=credential_id)
- return credential
+ return cdb.get_credential(credential_id)
- def rename_credential(self, credential_id, new_name):
+ def rename_credential(self, credential_id, new_name, new_password):
"""Rename the particular credential resource."""
LOG.debug(_("rename_credential() called"))
- try:
- credential = cdb.get_credential(credential_id)
- except exc.NotFound:
- raise cexc.CredentialNotFound(credential_id=credential_id)
- credential = cdb.update_credential(credential_id, new_name)
- return credential
-
- def schedule_host(self, tenant_id, instance_id, instance_desc):
- """Provides the hostname on which a dynamic vnic is reserved."""
- LOG.debug(_("schedule_host() called"))
- host_list = self._invoke_device_plugins(self._func_name(),
- [tenant_id,
- instance_id,
- instance_desc])
- return host_list
-
- def associate_port(self, tenant_id, instance_id, instance_desc):
- """Associate port.
-
- Get the portprofile name and the device name for the dynamic vnic.
- """
- LOG.debug(_("associate_port() called"))
- return self._invoke_device_plugins(self._func_name(), [tenant_id,
- instance_id,
- instance_desc])
-
- def detach_port(self, tenant_id, instance_id, instance_desc):
- """Remove the association of the VIF with the dynamic vnic."""
- LOG.debug(_("detach_port() called"))
- return self._invoke_device_plugins(self._func_name(), [tenant_id,
- instance_id,
- instance_desc])
-
- """
- Private functions
- """
- def _invoke_device_plugins(self, function_name, args):
- """Device-specific calls.
-
- Including core API and extensions are delegated to the model.
- """
- if hasattr(self._model, function_name):
- return getattr(self._model, function_name)(*args)
-
- def _func_name(self, offset=0):
- """Getting the name of the calling funciton."""
- return inspect.stack()[1 + offset][3]
+ return cdb.update_credential(credential_id, new_name,
+ new_password=new_password)
# under the License.
import collections
+import mock
import testtools
from neutron.db import api as db
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.db import network_db_v2 as cdb
+from neutron.plugins.cisco import network_plugin
from neutron.tests import base
-class CiscoNetworkQosDbTest(base.BaseTestCase):
+class CiscoNetworkDbTest(base.BaseTestCase):
- """Unit tests for cisco.db.network_models_v2.QoS model."""
-
- QosObj = collections.namedtuple('QosObj', 'tenant qname desc')
+ """Base class for Cisco network database unit tests."""
def setUp(self):
- super(CiscoNetworkQosDbTest, self).setUp()
+ super(CiscoNetworkDbTest, self).setUp()
db.configure_db()
self.session = db.get_session()
+
+ # The Cisco network plugin includes a thin layer of QoS and
+ # credential API methods which indirectly call Cisco QoS and
+ # credential database access methods. For better code coverage,
+ # this test suite will make calls to the QoS and credential database
+ # access methods indirectly through the network plugin. The network
+ # plugin's init function can be mocked out for this purpose.
+ def new_network_plugin_init(instance):
+ pass
+ with mock.patch.object(network_plugin.PluginV2,
+ '__init__', new=new_network_plugin_init):
+ self._network_plugin = network_plugin.PluginV2()
+
self.addCleanup(db.clear_db)
+
+class CiscoNetworkQosDbTest(CiscoNetworkDbTest):
+
+ """Unit tests for Cisco network QoS database model."""
+
+ QosObj = collections.namedtuple('QosObj', 'tenant qname desc')
+
def _qos_test_obj(self, tnum, qnum, desc=None):
"""Create a Qos test object from a pair of numbers."""
if desc is None:
def test_qos_add_remove(self):
qos11 = self._qos_test_obj(1, 1)
- qos = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc)
+ qos = self._network_plugin.create_qos(qos11.tenant, qos11.qname,
+ qos11.desc)
self._assert_equal(qos, qos11)
qos_id = qos.qos_id
- qos = cdb.remove_qos(qos11.tenant, qos_id)
+ qos = self._network_plugin.delete_qos(qos11.tenant, qos_id)
self._assert_equal(qos, qos11)
- qos = cdb.remove_qos(qos11.tenant, qos_id)
+ qos = self._network_plugin.delete_qos(qos11.tenant, qos_id)
self.assertIsNone(qos)
def test_qos_add_dup(self):
qos22 = self._qos_test_obj(2, 2)
- qos = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc)
+ qos = self._network_plugin.create_qos(qos22.tenant, qos22.qname,
+ qos22.desc)
self._assert_equal(qos, qos22)
qos_id = qos.qos_id
with testtools.ExpectedException(c_exc.QosNameAlreadyExists):
- cdb.add_qos(qos22.tenant, qos22.qname, "duplicate 22")
- qos = cdb.remove_qos(qos22.tenant, qos_id)
+ self._network_plugin.create_qos(qos22.tenant, qos22.qname,
+ "duplicate 22")
+ qos = self._network_plugin.delete_qos(qos22.tenant, qos_id)
self._assert_equal(qos, qos22)
- qos = cdb.remove_qos(qos22.tenant, qos_id)
+ qos = self._network_plugin.delete_qos(qos22.tenant, qos_id)
self.assertIsNone(qos)
def test_qos_get(self):
qos11 = self._qos_test_obj(1, 1)
- qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id
+ qos11_id = self._network_plugin.create_qos(qos11.tenant, qos11.qname,
+ qos11.desc).qos_id
qos21 = self._qos_test_obj(2, 1)
- qos21_id = cdb.add_qos(qos21.tenant, qos21.qname, qos21.desc).qos_id
+ qos21_id = self._network_plugin.create_qos(qos21.tenant, qos21.qname,
+ qos21.desc).qos_id
qos22 = self._qos_test_obj(2, 2)
- qos22_id = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc).qos_id
+ qos22_id = self._network_plugin.create_qos(qos22.tenant, qos22.qname,
+ qos22.desc).qos_id
- qos = cdb.get_qos(qos11.tenant, qos11_id)
+ qos = self._network_plugin.get_qos_details(qos11.tenant, qos11_id)
self._assert_equal(qos, qos11)
- qos = cdb.get_qos(qos21.tenant, qos21_id)
+ qos = self._network_plugin.get_qos_details(qos21.tenant, qos21_id)
self._assert_equal(qos, qos21)
- qos = cdb.get_qos(qos21.tenant, qos22_id)
+ qos = self._network_plugin.get_qos_details(qos21.tenant, qos22_id)
self._assert_equal(qos, qos22)
with testtools.ExpectedException(c_exc.QosNotFound):
- cdb.get_qos(qos11.tenant, "dummyQosId")
+ self._network_plugin.get_qos_details(qos11.tenant, "dummyQosId")
with testtools.ExpectedException(c_exc.QosNotFound):
- cdb.get_qos(qos11.tenant, qos21_id)
+ self._network_plugin.get_qos_details(qos11.tenant, qos21_id)
with testtools.ExpectedException(c_exc.QosNotFound):
- cdb.get_qos(qos21.tenant, qos11_id)
+ self._network_plugin.get_qos_details(qos21.tenant, qos11_id)
- qos_all_t1 = cdb.get_all_qoss(qos11.tenant)
+ qos_all_t1 = self._network_plugin.get_all_qoss(qos11.tenant)
self.assertEqual(len(qos_all_t1), 1)
- qos_all_t2 = cdb.get_all_qoss(qos21.tenant)
+ qos_all_t2 = self._network_plugin.get_all_qoss(qos21.tenant)
self.assertEqual(len(qos_all_t2), 2)
- qos_all_t3 = cdb.get_all_qoss("tenant3")
+ qos_all_t3 = self._network_plugin.get_all_qoss("tenant3")
self.assertEqual(len(qos_all_t3), 0)
def test_qos_update(self):
qos11 = self._qos_test_obj(1, 1)
- qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id
- cdb.update_qos(qos11.tenant, qos11_id)
+ qos11_id = self._network_plugin.create_qos(qos11.tenant, qos11.qname,
+ qos11.desc).qos_id
+ self._network_plugin.rename_qos(qos11.tenant, qos11_id,
+ new_name=None)
new_qname = "new qos name"
- new_qos = cdb.update_qos(qos11.tenant, qos11_id, new_qname)
+ new_qos = self._network_plugin.rename_qos(qos11.tenant, qos11_id,
+ new_qname)
expected_qobj = self.QosObj(qos11.tenant, new_qname, qos11.desc)
self._assert_equal(new_qos, expected_qobj)
- new_qos = cdb.get_qos(qos11.tenant, qos11_id)
+ new_qos = self._network_plugin.get_qos_details(qos11.tenant, qos11_id)
self._assert_equal(new_qos, expected_qobj)
with testtools.ExpectedException(c_exc.QosNotFound):
- cdb.update_qos(qos11.tenant, "dummyQosId")
+ self._network_plugin.rename_qos(qos11.tenant, "dummyQosId",
+ new_name=None)
-class CiscoNetworkCredentialDbTest(base.BaseTestCase):
+class CiscoNetworkCredentialDbTest(CiscoNetworkDbTest):
- """Unit tests for cisco.db.network_models_v2.Credential model."""
+ """Unit tests for Cisco network credentials database model."""
CredObj = collections.namedtuple('CredObj', 'cname usr pwd ctype')
- def setUp(self):
- super(CiscoNetworkCredentialDbTest, self).setUp()
- db.configure_db()
- self.session = db.get_session()
- self.addCleanup(db.clear_db)
-
def _cred_test_obj(self, tnum, cnum):
"""Create a Credential test object from a pair of numbers."""
cname = 'credential_%s_%s' % (str(tnum), str(cnum))
cred22_id = cdb.add_credential(
cred22.cname, cred22.usr, cred22.pwd, cred22.ctype).credential_id
- cred = cdb.get_credential(cred11_id)
+ cred = self._network_plugin.get_credential_details(cred11_id)
self._assert_equal(cred, cred11)
- cred = cdb.get_credential(cred21_id)
+ cred = self._network_plugin.get_credential_details(cred21_id)
self._assert_equal(cred, cred21)
- cred = cdb.get_credential(cred22_id)
+ cred = self._network_plugin.get_credential_details(cred22_id)
self._assert_equal(cred, cred22)
with testtools.ExpectedException(c_exc.CredentialNotFound):
- cdb.get_credential("dummyCredentialId")
+ self._network_plugin.get_credential_details("dummyCredentialId")
- cred_all_t1 = cdb.get_all_credentials()
+ cred_all_t1 = self._network_plugin.get_all_credentials()
self.assertEqual(len(cred_all_t1), 3)
def test_credential_get_name(self):
cred11 = self._cred_test_obj(1, 1)
cred11_id = cdb.add_credential(
cred11.cname, cred11.usr, cred11.pwd, cred11.ctype).credential_id
- cdb.update_credential(cred11_id)
+ self._network_plugin.rename_credential(cred11_id, new_name=None,
+ new_password=None)
new_usr = "new user name"
new_pwd = "new password"
- new_credential = cdb.update_credential(
+ new_credential = self._network_plugin.rename_credential(
cred11_id, new_usr, new_pwd)
expected_cred = self.CredObj(
cred11.cname, new_usr, new_pwd, cred11.ctype)
self._assert_equal(new_credential, expected_cred)
- new_credential = cdb.get_credential(cred11_id)
+ new_credential = self._network_plugin.get_credential_details(
+ cred11_id)
self._assert_equal(new_credential, expected_cred)
with testtools.ExpectedException(c_exc.CredentialNotFound):
- cdb.update_credential(
+ self._network_plugin.rename_credential(
"dummyCredentialId", new_usr, new_pwd)
+
+ def test_get_credential_not_found_exception(self):
+ self.assertRaises(c_exc.CredentialNotFound,
+ self._network_plugin.get_credential_details,
+ "dummyCredentialId")