--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""Cisco plugin db cleanup part II
+
+Revision ID: 263772d65691
+Revises: 35c7c198ddea
+Create Date: 2013-07-29 02:31:26.646343
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '263772d65691'
+down_revision = '35c7c198ddea'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+ 'neutron.plugins.cisco.network_plugin.PluginV2'
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+from neutron.db import migration
+
+
+def upgrade(active_plugin=None, options=None):
+ if not migration.should_run(active_plugin, migration_for_plugins):
+ return
+
+ if 'credentials' in sa.MetaData().tables:
+ op.rename_table('credentials', 'cisco_credentials')
+ if 'nexusport_bindings' in sa.MetaData().tables:
+ op.rename_table('nexusport_bindings', 'cisco_nexusport_bindings')
+ if 'qoss' in sa.MetaData().tables:
+ op.rename_table('qoss', 'cisco_qos_policies')
+
+ op.drop_table('cisco_vlan_ids')
+
+
+def downgrade(active_plugin=None, options=None):
+ if not migration.should_run(active_plugin, migration_for_plugins):
+ return
+
+ op.create_table(
+ 'cisco_vlan_ids',
+ sa.Column('vlan_id', sa.Integer, nullable=False),
+ sa.Column('vlan_used', sa.Boolean),
+ sa.PrimaryKeyConstraint('vlan_id'),
+ )
+
+ if 'cisco_credentials' in sa.MetaData().tables:
+ op.rename_table('cisco_credentials', 'credentials')
+ if 'cisco_nexusport_bindings' in sa.MetaData().tables:
+ op.rename_table('cisco_nexusport_bindings', 'nexusport_bindings')
+ if 'cisco_qos_policies' in sa.MetaData().tables:
+ op.rename_table('cisco_qos_policies', 'qoss')
class CredentialAlreadyExists(exceptions.NeutronException):
- """Credential ID already exists."""
- message = _("Credential %(credential_id)s already exists "
+ """Credential already exists."""
+ message = _("Credential %(credential_name)s already exists "
"for tenant %(tenant_id)s")
from neutron.db import api as db
from neutron.openstack.common import log as logging
+from neutron.openstack.common import uuidutils
from neutron.plugins.cisco.common import cisco_constants as const
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.db import network_models_v2
+# Do NOT remove this import. It is required for all the models to be seen
+# by db.initalize() when called from VirtualPhysicalSwitchModelV2.__init__.
+from neutron.plugins.cisco.db import nexus_models_v2 # noqa
from neutron.plugins.openvswitch import ovs_models_v2
LOG = logging.getLogger(__name__)
-def get_all_vlanids():
- """Gets all the vlanids."""
- LOG.debug(_("get_all_vlanids() called"))
- session = db.get_session()
- return session.query(network_models_v2.VlanID).all()
-
-
-def is_vlanid_used(vlan_id):
- """Checks if a vlanid is in use."""
- LOG.debug(_("is_vlanid_used() called"))
- session = db.get_session()
- try:
- vlanid = (session.query(network_models_v2.VlanID).
- filter_by(vlan_id=vlan_id).one())
- return vlanid["vlan_used"]
- except exc.NoResultFound:
- raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
-
-
-def release_vlanid(vlan_id):
- """Sets the vlanid state to be unused."""
- LOG.debug(_("release_vlanid() called"))
- session = db.get_session()
- try:
- vlanid = (session.query(network_models_v2.VlanID).
- filter_by(vlan_id=vlan_id).one())
- vlanid["vlan_used"] = False
- session.merge(vlanid)
- session.flush()
- return vlanid["vlan_used"]
- except exc.NoResultFound:
- raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
-
-
-def delete_vlanid(vlan_id):
- """Deletes a vlanid entry from db."""
- LOG.debug(_("delete_vlanid() called"))
- session = db.get_session()
- try:
- vlanid = (session.query(network_models_v2.VlanID).
- filter_by(vlan_id=vlan_id).one())
- session.delete(vlanid)
- session.flush()
- return vlanid
- except exc.NoResultFound:
- pass
-
-
-def reserve_vlanid():
- """Reserves the first unused vlanid."""
- LOG.debug(_("reserve_vlanid() called"))
- session = db.get_session()
- try:
- rvlan = (session.query(network_models_v2.VlanID).
- filter_by(vlan_used=False).first())
- if not rvlan:
- raise exc.NoResultFound
- rvlanid = (session.query(network_models_v2.VlanID).
- filter_by(vlan_id=rvlan["vlan_id"]).one())
- rvlanid["vlan_used"] = True
- session.merge(rvlanid)
- session.flush()
- return rvlan["vlan_id"]
- except exc.NoResultFound:
- raise c_exc.VlanIDNotAvailable()
-
-
-def get_all_vlanids_used():
- """Gets all the vlanids used."""
- LOG.debug(_("get_all_vlanids() called"))
- session = db.get_session()
- return (session.query(network_models_v2.VlanID).
- filter_by(vlan_used=True).all())
-
-
def get_all_qoss(tenant_id):
"""Lists all the qos to tenant associations."""
LOG.debug(_("get_all_qoss() called"))
raise c_exc.QosNameAlreadyExists(qos_name=qos_name,
tenant_id=tenant_id)
except exc.NoResultFound:
- qos = network_models_v2.QoS(tenant_id, qos_name, qos_desc)
+ qos = network_models_v2.QoS(qos_id=uuidutils.generate_uuid(),
+ tenant_id=tenant_id,
+ qos_name=qos_name,
+ qos_desc=qos_desc)
session.add(qos)
session.flush()
return qos
raise c_exc.CredentialAlreadyExists(credential_name=credential_name,
tenant_id=tenant_id)
except exc.NoResultFound:
- cred = network_models_v2.Credential(tenant_id, credential_name,
- user_name, password)
+ cred = network_models_v2.Credential(
+ credential_id=uuidutils.generate_uuid(),
+ tenant_id=tenant_id,
+ credential_name=credential_name,
+ user_name=user_name,
+ password=password)
session.add(cred)
session.flush()
return cred
#
# @author: Rohit Agarwalla, Cisco Systems, Inc.
-from sqlalchemy import Column, ForeignKey, Integer, String, Boolean
+import sqlalchemy as sa
from neutron.db import model_base
-from neutron.openstack.common import uuidutils
-
-
-class VlanID(model_base.BASEV2):
- """Represents a vlan_id usage."""
- __tablename__ = 'cisco_vlan_ids'
-
- vlan_id = Column(Integer, primary_key=True)
- vlan_used = Column(Boolean)
-
- def __init__(self, vlan_id):
- self.vlan_id = vlan_id
- self.vlan_used = False
-
- def __repr__(self):
- return "<VlanID(%d,%s)>" % (self.vlan_id, self.vlan_used)
class QoS(model_base.BASEV2):
- """Represents QoS for a tenant."""
+ """Represents QoS policies for a tenant."""
- __tablename__ = 'qoss'
+ __tablename__ = 'cisco_qos_policies'
- qos_id = Column(String(255))
- tenant_id = Column(String(255), primary_key=True)
- qos_name = Column(String(255), primary_key=True)
- qos_desc = Column(String(255))
-
- def __init__(self, tenant_id, qos_name, qos_desc):
- self.qos_id = uuidutils.generate_uuid()
- self.tenant_id = tenant_id
- self.qos_name = qos_name
- self.qos_desc = qos_desc
-
- def __repr__(self):
- return "<QoS(%s,%s,%s,%s)>" % (self.qos_id, self.tenant_id,
- self.qos_name, self.qos_desc)
+ qos_id = sa.Column(sa.String(255))
+ tenant_id = sa.Column(sa.String(255), primary_key=True)
+ qos_name = sa.Column(sa.String(255), primary_key=True)
+ qos_desc = sa.Column(sa.String(255))
class Credential(model_base.BASEV2):
- """Represents credentials for a tenant."""
-
- __tablename__ = 'credentials'
-
- credential_id = Column(String(255))
- tenant_id = Column(String(255), primary_key=True)
- credential_name = Column(String(255), primary_key=True)
- user_name = Column(String(255))
- password = Column(String(255))
+ """Represents credentials for a tenant to control Cisco switches."""
- def __init__(self, tenant_id, credential_name, user_name, password):
- self.credential_id = uuidutils.generate_uuid()
- self.tenant_id = tenant_id
- self.credential_name = credential_name
- self.user_name = user_name
- self.password = password
+ __tablename__ = 'cisco_credentials'
- def __repr__(self):
- return "<Credentials(%s,%s,%s,%s,%s)>" % (self.credential_id,
- self.tenant_id,
- self.credential_name,
- self.user_name,
- self.password)
+ credential_id = sa.Column(sa.String(255))
+ tenant_id = sa.Column(sa.String(255), primary_key=True)
+ credential_name = sa.Column(sa.String(255), primary_key=True)
+ user_name = sa.Column(sa.String(255))
+ password = sa.Column(sa.String(255))
class ProviderNetwork(model_base.BASEV2):
__tablename__ = 'cisco_provider_networks'
- network_id = Column(String(36),
- ForeignKey('networks.id', ondelete="CASCADE"),
- primary_key=True)
- network_type = Column(String(255), nullable=False)
- segmentation_id = Column(Integer, nullable=False)
+ network_id = sa.Column(sa.String(36),
+ sa.ForeignKey('networks.id', ondelete="CASCADE"),
+ primary_key=True)
+ network_type = sa.Column(sa.String(255), nullable=False)
+ segmentation_id = sa.Column(sa.Integer, nullable=False)
# @author: Arvind Somya, Cisco Systems, Inc. (asomya@cisco.com)
#
-from sqlalchemy.orm import exc
+import sqlalchemy.orm.exc as sa_exc
import neutron.db.api as db
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
-def get_all_nexusport_bindings():
- """Lists all the nexusport bindings."""
- LOG.debug(_("get_all_nexusport_bindings() called"))
- session = db.get_session()
- return session.query(nexus_models_v2.NexusPortBinding).all()
-
-
def get_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
"""Lists a nexusport binding."""
LOG.debug(_("get_nexusport_binding() called"))
- session = db.get_session()
-
- filters = dict(port_id=port_id, vlan_id=vlan_id, switch_ip=switch_ip,
- instance_id=instance_id)
- bindings = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(**filters).all())
- if not bindings:
- raise c_exc.NexusPortBindingNotFound(**filters)
-
- return bindings
+ return _lookup_all_nexus_bindings(port_id=port_id,
+ vlan_id=vlan_id,
+ switch_ip=switch_ip,
+ instance_id=instance_id)
def get_nexusvlan_binding(vlan_id, switch_ip):
"""Lists a vlan and switch binding."""
LOG.debug(_("get_nexusvlan_binding() called"))
- session = db.get_session()
-
- filters = dict(vlan_id=vlan_id, switch_ip=switch_ip)
- bindings = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(**filters).all())
- if not bindings:
- raise c_exc.NexusPortBindingNotFound(**filters)
-
- return bindings
+ return _lookup_all_nexus_bindings(vlan_id=vlan_id, switch_ip=switch_ip)
def add_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
"""Adds a nexusport binding."""
LOG.debug(_("add_nexusport_binding() called"))
session = db.get_session()
- binding = nexus_models_v2.NexusPortBinding(
- port_id, vlan_id, switch_ip, instance_id)
+ binding = nexus_models_v2.NexusPortBinding(port_id=port_id,
+ vlan_id=vlan_id,
+ switch_ip=switch_ip,
+ instance_id=instance_id)
session.add(binding)
session.flush()
return binding
"""Removes a nexusport binding."""
LOG.debug(_("remove_nexusport_binding() called"))
session = db.get_session()
- binding = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(vlan_id=vlan_id).filter_by(switch_ip=switch_ip).
- filter_by(port_id=port_id).
- filter_by(instance_id=instance_id).all())
-
+ binding = _lookup_all_nexus_bindings(session=session,
+ vlan_id=vlan_id,
+ switch_ip=switch_ip,
+ port_id=port_id,
+ instance_id=instance_id)
for bind in binding:
session.delete(bind)
session.flush()
def update_nexusport_binding(port_id, new_vlan_id):
"""Updates nexusport binding."""
+ if not new_vlan_id:
+ LOG.warning(_("update_nexusport_binding called with no vlan"))
+ return
LOG.debug(_("update_nexusport_binding called"))
session = db.get_session()
- try:
- binding = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(port_id=port_id).one())
- if new_vlan_id:
- binding["vlan_id"] = new_vlan_id
- session.merge(binding)
- session.flush()
- return binding
- except exc.NoResultFound:
- raise c_exc.NexusPortBindingNotFound(port_id=port_id)
+ binding = _lookup_one_nexus_binding(session=session, port_id=port_id)
+ binding.vlan_id = new_vlan_id
+ session.merge(binding)
+ session.flush()
+ return binding
def get_nexusvm_binding(vlan_id, instance_id):
"""Lists nexusvm bindings."""
LOG.debug(_("get_nexusvm_binding() called"))
- session = db.get_session()
-
- filters = dict(instance_id=instance_id, vlan_id=vlan_id)
- binding = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(**filters).first())
- if not binding:
- raise c_exc.NexusPortBindingNotFound(**filters)
-
- return binding
+ return _lookup_first_nexus_binding(instance_id=instance_id,
+ vlan_id=vlan_id)
def get_port_vlan_switch_binding(port_id, vlan_id, switch_ip):
"""Lists nexusvm bindings."""
LOG.debug(_("get_port_vlan_switch_binding() called"))
- session = db.get_session()
-
- filters = dict(port_id=port_id, switch_ip=switch_ip, vlan_id=vlan_id)
- bindings = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(**filters).all())
- if not bindings:
- raise c_exc.NexusPortBindingNotFound(**filters)
-
- return bindings
+ return _lookup_all_nexus_bindings(port_id=port_id,
+ switch_ip=switch_ip,
+ vlan_id=vlan_id)
def get_port_switch_bindings(port_id, switch_ip):
LOG.debug(_("get_port_switch_bindings() called, "
"port:'%(port_id)s', switch:'%(switch_ip)s'"),
{'port_id': port_id, 'switch_ip': switch_ip})
- session = db.get_session()
try:
- binding = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(port_id=port_id).
- filter_by(switch_ip=switch_ip).all())
- return binding
- except exc.NoResultFound:
- return
+ return _lookup_all_nexus_bindings(port_id=port_id,
+ switch_ip=switch_ip)
+ except c_exc.NexusPortBindingNotFound:
+ pass
def get_nexussvi_bindings():
"""Lists nexus svi bindings."""
LOG.debug(_("get_nexussvi_bindings() called"))
- session = db.get_session()
+ return _lookup_all_nexus_bindings(port_id='router')
+
+
+def _lookup_nexus_bindings(query_type, session=None, **bfilter):
+ """Look up 'query_type' Nexus bindings matching the filter.
+
+ :param query_type: 'all', 'one' or 'first'
+ :param session: db session
+ :param bfilter: filter for bindings query
+ :return: bindings if query gave a result, else
+ raise NexusPortBindingNotFound.
+ """
+ if session is None:
+ session = db.get_session()
+ query_method = getattr(session.query(
+ nexus_models_v2.NexusPortBinding).filter_by(**bfilter), query_type)
+ try:
+ bindings = query_method()
+ if bindings:
+ return bindings
+ except sa_exc.NoResultFound:
+ pass
+ raise c_exc.NexusPortBindingNotFound(**bfilter)
+
+
+def _lookup_all_nexus_bindings(session=None, **bfilter):
+ return _lookup_nexus_bindings('all', session, **bfilter)
+
+
+def _lookup_one_nexus_binding(session=None, **bfilter):
+ return _lookup_nexus_bindings('one', session, **bfilter)
- filters = {'port_id': 'router'}
- bindings = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(**filters).all())
- if not bindings:
- raise c_exc.NexusPortBindingNotFound(**filters)
- return bindings
+def _lookup_first_nexus_binding(session=None, **bfilter):
+ return _lookup_nexus_bindings('first', session, **bfilter)
# under the License.
# @author: Rohit Agarwalla, Cisco Systems, Inc.
-from sqlalchemy import Column, Integer, String
+import sqlalchemy as sa
from neutron.db import model_base
class NexusPortBinding(model_base.BASEV2):
"""Represents a binding of VM's to nexus ports."""
- __tablename__ = "nexusport_bindings"
+ __tablename__ = "cisco_nexusport_bindings"
- id = Column(Integer, primary_key=True, autoincrement=True)
- port_id = Column(String(255))
- vlan_id = Column(Integer, nullable=False)
- switch_ip = Column(String(255))
- instance_id = Column(String(255))
-
- def __init__(self, port_id, vlan_id, switch_ip, instance_id):
- self.port_id = port_id
- self.vlan_id = vlan_id
- self.switch_ip = switch_ip
- self.instance_id = instance_id
+ id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
+ port_id = sa.Column(sa.String(255))
+ vlan_id = sa.Column(sa.Integer, nullable=False)
+ switch_ip = sa.Column(sa.String(255))
+ instance_id = sa.Column(sa.String(255))
def __repr__(self):
- return "<NexusPortBinding (%s,%d, %s, %s)>" % \
- (self.port_id, self.vlan_id, self.switch_ip, self.instance_id)
+ """Just the binding, without the id key."""
+ return ("<NexusPortBinding(%s,%s,%s,%s)>" %
+ (self.port_id, self.vlan_id, self.switch_ip, self.instance_id))
def __eq__(self, other):
+ """Compare only the binding, without the id key."""
return (
self.port_id == other.port_id and
self.vlan_id == other.vlan_id and
"""Remove VLAN SVI from the Nexus Switch."""
# Grab switch_ip from database
switch_ip = nxos_db.get_nexusvm_binding(vlan_id,
- router_id)['switch_ip']
+ router_id).switch_ip
# Delete the SVI interface from the switch
self._client.delete_vlan_svi(switch_ip, vlan_id)
auto_untrunk = conf.CISCO.provider_vlan_auto_trunk
LOG.debug("delete_network(): provider vlan %s" % vlan_id)
- switch_ip = row['switch_ip']
+ switch_ip = row.switch_ip
nexus_port = None
- if row['port_id'] != 'router':
- nexus_port = row['port_id']
+ if row.port_id != 'router':
+ nexus_port = row.port_id
- nxos_db.remove_nexusport_binding(row['port_id'], row['vlan_id'],
- row['switch_ip'],
- row['instance_id'])
+ nxos_db.remove_nexusport_binding(row.port_id, row.vlan_id,
+ row.switch_ip,
+ row.instance_id)
# Check for any other bindings with the same vlan_id and switch_ip
try:
- nxos_db.get_nexusvlan_binding(row['vlan_id'], row['switch_ip'])
+ nxos_db.get_nexusvlan_binding(row.vlan_id, row.switch_ip)
except cisco_exc.NexusPortBindingNotFound:
try:
# Delete this vlan from this switch
if nexus_port and auto_untrunk:
self._client.disable_vlan_on_trunk_int(
- switch_ip, row['vlan_id'], nexus_port)
+ switch_ip, row.vlan_id, nexus_port)
if auto_delete:
- self._client.delete_vlan(switch_ip, row['vlan_id'])
+ self._client.delete_vlan(switch_ip, row.vlan_id)
except Exception:
# The delete vlan operation on the Nexus failed,
# so this delete_port request has failed. For
# consistency, roll back the Nexus database to what
# it was before this request.
with excutils.save_and_reraise_exception():
- nxos_db.add_nexusport_binding(row['port_id'],
- row['vlan_id'],
- row['switch_ip'],
- row['instance_id'])
+ nxos_db.add_nexusport_binding(row.port_id,
+ row.vlan_id,
+ row.switch_ip,
+ row.instance_id)
- return row['instance_id']
+ return row.instance_id
def update_port(self, tenant_id, net_id, port_id, port_state, **kwargs):
"""Update port.
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import testtools
+
+from neutron.db import api as db
+from neutron.plugins.cisco.common import cisco_exceptions as c_exc
+from neutron.plugins.cisco.db import network_db_v2 as cdb
+from neutron.tests import base
+
+
+class CiscoNetworkQosDbTest(base.BaseTestCase):
+
+ """Unit tests for cisco.db.network_models_v2.QoS model."""
+
+ QosObj = collections.namedtuple('QosObj', 'tenant qname desc')
+
+ def setUp(self):
+ super(CiscoNetworkQosDbTest, self).setUp()
+ db.configure_db()
+ self.session = db.get_session()
+ self.addCleanup(db.clear_db)
+
+ def _qos_test_obj(self, tnum, qnum, desc=None):
+ """Create a Qos test object from a pair of numbers."""
+ if desc is None:
+ desc = 'test qos %s-%s' % (str(tnum), str(qnum))
+ tenant = 'tenant_%s' % str(tnum)
+ qname = 'qos_%s' % str(qnum)
+ return self.QosObj(tenant, qname, desc)
+
+ def _assert_equal(self, qos, qos_obj):
+ self.assertEqual(qos.tenant_id, qos_obj.tenant)
+ self.assertEqual(qos.qos_name, qos_obj.qname)
+ self.assertEqual(qos.qos_desc, qos_obj.desc)
+
+ def test_qos_add_remove(self):
+ qos11 = self._qos_test_obj(1, 1)
+ qos = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc)
+ self._assert_equal(qos, qos11)
+ qos_id = qos.qos_id
+ qos = cdb.remove_qos(qos11.tenant, qos_id)
+ self._assert_equal(qos, qos11)
+ qos = cdb.remove_qos(qos11.tenant, qos_id)
+ self.assertIsNone(qos)
+
+ def test_qos_add_dup(self):
+ qos22 = self._qos_test_obj(2, 2)
+ qos = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc)
+ self._assert_equal(qos, qos22)
+ qos_id = qos.qos_id
+ with testtools.ExpectedException(c_exc.QosNameAlreadyExists):
+ cdb.add_qos(qos22.tenant, qos22.qname, "duplicate 22")
+ qos = cdb.remove_qos(qos22.tenant, qos_id)
+ self._assert_equal(qos, qos22)
+ qos = cdb.remove_qos(qos22.tenant, qos_id)
+ self.assertIsNone(qos)
+
+ def test_qos_get(self):
+ qos11 = self._qos_test_obj(1, 1)
+ qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id
+ qos21 = self._qos_test_obj(2, 1)
+ qos21_id = cdb.add_qos(qos21.tenant, qos21.qname, qos21.desc).qos_id
+ qos22 = self._qos_test_obj(2, 2)
+ qos22_id = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc).qos_id
+
+ qos = cdb.get_qos(qos11.tenant, qos11_id)
+ self._assert_equal(qos, qos11)
+ qos = cdb.get_qos(qos21.tenant, qos21_id)
+ self._assert_equal(qos, qos21)
+ qos = cdb.get_qos(qos21.tenant, qos22_id)
+ self._assert_equal(qos, qos22)
+
+ with testtools.ExpectedException(c_exc.QosNotFound):
+ cdb.get_qos(qos11.tenant, "dummyQosId")
+ with testtools.ExpectedException(c_exc.QosNotFound):
+ cdb.get_qos(qos11.tenant, qos21_id)
+ with testtools.ExpectedException(c_exc.QosNotFound):
+ cdb.get_qos(qos21.tenant, qos11_id)
+
+ qos_all_t1 = cdb.get_all_qoss(qos11.tenant)
+ self.assertEqual(len(qos_all_t1), 1)
+ qos_all_t2 = cdb.get_all_qoss(qos21.tenant)
+ self.assertEqual(len(qos_all_t2), 2)
+ qos_all_t3 = cdb.get_all_qoss("tenant3")
+ self.assertEqual(len(qos_all_t3), 0)
+
+ def test_qos_update(self):
+ qos11 = self._qos_test_obj(1, 1)
+ qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id
+ cdb.update_qos(qos11.tenant, qos11_id)
+ new_qname = "new qos name"
+ new_qos = cdb.update_qos(qos11.tenant, qos11_id, new_qname)
+ expected_qobj = self.QosObj(qos11.tenant, new_qname, qos11.desc)
+ self._assert_equal(new_qos, expected_qobj)
+ new_qos = cdb.get_qos(qos11.tenant, qos11_id)
+ self._assert_equal(new_qos, expected_qobj)
+ with testtools.ExpectedException(c_exc.QosNotFound):
+ cdb.update_qos(qos11.tenant, "dummyQosId")
+
+
+class CiscoNetworkCredentialDbTest(base.BaseTestCase):
+
+ """Unit tests for cisco.db.network_models_v2.Credential model."""
+
+ CredObj = collections.namedtuple('CredObj', 'tenant cname usr pwd')
+
+ def setUp(self):
+ super(CiscoNetworkCredentialDbTest, self).setUp()
+ db.configure_db()
+ self.session = db.get_session()
+ self.addCleanup(db.clear_db)
+
+ def _cred_test_obj(self, tnum, cnum):
+ """Create a Credential test object from a pair of numbers."""
+ tenant = 'tenant_%s' % str(tnum)
+ cname = 'credential_%s' % str(cnum)
+ usr = 'User_%s_%s' % (str(tnum), str(cnum))
+ pwd = 'Password_%s_%s' % (str(tnum), str(cnum))
+ return self.CredObj(tenant, cname, usr, pwd)
+
+ def _assert_equal(self, credential, cred_obj):
+ self.assertEqual(credential.tenant_id, cred_obj.tenant)
+ self.assertEqual(credential.credential_name, cred_obj.cname)
+ self.assertEqual(credential.user_name, cred_obj.usr)
+ self.assertEqual(credential.password, cred_obj.pwd)
+
+ def test_credential_add_remove(self):
+ cred11 = self._cred_test_obj(1, 1)
+ cred = cdb.add_credential(
+ cred11.tenant, cred11.cname, cred11.usr, cred11.pwd)
+ self._assert_equal(cred, cred11)
+ cred_id = cred.credential_id
+ cred = cdb.remove_credential(cred11.tenant, cred_id)
+ self._assert_equal(cred, cred11)
+ cred = cdb.remove_credential(cred11.tenant, cred_id)
+ self.assertIsNone(cred)
+
+ def test_credential_add_dup(self):
+ cred22 = self._cred_test_obj(2, 2)
+ cred = cdb.add_credential(
+ cred22.tenant, cred22.cname, cred22.usr, cred22.pwd)
+ self._assert_equal(cred, cred22)
+ cred_id = cred.credential_id
+ with testtools.ExpectedException(c_exc.CredentialAlreadyExists):
+ cdb.add_credential(
+ cred22.tenant, cred22.cname, cred22.usr, cred22.pwd)
+ cred = cdb.remove_credential(cred22.tenant, cred_id)
+ self._assert_equal(cred, cred22)
+ cred = cdb.remove_credential(cred22.tenant, cred_id)
+ self.assertIsNone(cred)
+
+ def test_credential_get_id(self):
+ cred11 = self._cred_test_obj(1, 1)
+ cred11_id = cdb.add_credential(
+ cred11.tenant, cred11.cname, cred11.usr, cred11.pwd).credential_id
+ cred21 = self._cred_test_obj(2, 1)
+ cred21_id = cdb.add_credential(
+ cred21.tenant, cred21.cname, cred21.usr, cred21.pwd).credential_id
+ cred22 = self._cred_test_obj(2, 2)
+ cred22_id = cdb.add_credential(
+ cred22.tenant, cred22.cname, cred22.usr, cred22.pwd).credential_id
+
+ cred = cdb.get_credential(cred11.tenant, cred11_id)
+ self._assert_equal(cred, cred11)
+ cred = cdb.get_credential(cred21.tenant, cred21_id)
+ self._assert_equal(cred, cred21)
+ cred = cdb.get_credential(cred21.tenant, cred22_id)
+ self._assert_equal(cred, cred22)
+
+ with testtools.ExpectedException(c_exc.CredentialNotFound):
+ cdb.get_credential(cred11.tenant, "dummyCredentialId")
+ with testtools.ExpectedException(c_exc.CredentialNotFound):
+ cdb.get_credential(cred11.tenant, cred21_id)
+ with testtools.ExpectedException(c_exc.CredentialNotFound):
+ cdb.get_credential(cred21.tenant, cred11_id)
+
+ cred_all_t1 = cdb.get_all_credentials(cred11.tenant)
+ self.assertEqual(len(cred_all_t1), 1)
+ cred_all_t2 = cdb.get_all_credentials(cred21.tenant)
+ self.assertEqual(len(cred_all_t2), 2)
+ cred_all_t3 = cdb.get_all_credentials("dummyTenant")
+ self.assertEqual(len(cred_all_t3), 0)
+
+ def test_credential_get_name(self):
+ cred11 = self._cred_test_obj(1, 1)
+ cred11_id = cdb.add_credential(
+ cred11.tenant, cred11.cname, cred11.usr, cred11.pwd).credential_id
+ cred21 = self._cred_test_obj(2, 1)
+ cred21_id = cdb.add_credential(
+ cred21.tenant, cred21.cname, cred21.usr, cred21.pwd).credential_id
+ cred22 = self._cred_test_obj(2, 2)
+ cred22_id = cdb.add_credential(
+ cred22.tenant, cred22.cname, cred22.usr, cred22.pwd).credential_id
+ self.assertNotEqual(cred11_id, cred21_id)
+ self.assertNotEqual(cred11_id, cred22_id)
+ self.assertNotEqual(cred21_id, cred22_id)
+
+ cred = cdb.get_credential_name(cred11.tenant, cred11.cname)
+ self._assert_equal(cred, cred11)
+ cred = cdb.get_credential_name(cred21.tenant, cred21.cname)
+ self._assert_equal(cred, cred21)
+ cred = cdb.get_credential_name(cred22.tenant, cred22.cname)
+ self._assert_equal(cred, cred22)
+
+ with testtools.ExpectedException(c_exc.CredentialNameNotFound):
+ cdb.get_credential_name(cred11.tenant, "dummyCredentialName")
+ with testtools.ExpectedException(c_exc.CredentialNameNotFound):
+ cdb.get_credential_name(cred11.tenant, cred22.cname)
+
+ def test_credential_update(self):
+ cred11 = self._cred_test_obj(1, 1)
+ cred11_id = cdb.add_credential(
+ cred11.tenant, cred11.cname, cred11.usr, cred11.pwd).credential_id
+ cdb.update_credential(cred11.tenant, cred11_id)
+ new_usr = "new user name"
+ new_pwd = "new password"
+ new_credential = cdb.update_credential(
+ cred11.tenant, cred11_id, new_usr, new_pwd)
+ expected_cred = self.CredObj(
+ cred11.tenant, cred11.cname, new_usr, new_pwd)
+ self._assert_equal(new_credential, expected_cred)
+ new_credential = cdb.get_credential(cred11.tenant, cred11_id)
+ self._assert_equal(new_credential, expected_cred)
+ with testtools.ExpectedException(c_exc.CredentialNotFound):
+ cdb.update_credential(
+ cred11.tenant, "dummyCredentialId", new_usr, new_pwd)
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import testtools
+
+from neutron.db import api as db
+from neutron.plugins.cisco.common import cisco_exceptions as c_exc
+from neutron.plugins.cisco.db import nexus_db_v2 as nxdb
+from neutron.tests import base
+
+
+class CiscoNexusDbTest(base.BaseTestCase):
+
+ """Unit tests for cisco.db.nexus_models_v2.NexusPortBinding model."""
+
+ NpbObj = collections.namedtuple('NpbObj', 'port vlan switch instance')
+
+ def setUp(self):
+ super(CiscoNexusDbTest, self).setUp()
+ db.configure_db()
+ self.session = db.get_session()
+ self.addCleanup(db.clear_db)
+
+ def _npb_test_obj(self, pnum, vnum, switch=None, instance=None):
+ """Create a Nexus port binding test object from a pair of numbers."""
+ if pnum is 'router':
+ port = pnum
+ else:
+ port = '1/%s' % str(pnum)
+ vlan = str(vnum)
+ if switch is None:
+ switch = '10.9.8.7'
+ if instance is None:
+ instance = 'instance_%s_%s' % (str(pnum), str(vnum))
+ return self.NpbObj(port, vlan, switch, instance)
+
+ def _assert_equal(self, npb, npb_obj):
+ self.assertEqual(npb.port_id, npb_obj.port)
+ self.assertEqual(int(npb.vlan_id), int(npb_obj.vlan))
+ self.assertEqual(npb.switch_ip, npb_obj.switch)
+ self.assertEqual(npb.instance_id, npb_obj.instance)
+
+ def _add_to_db(self, npbs):
+ for npb in npbs:
+ nxdb.add_nexusport_binding(
+ npb.port, npb.vlan, npb.switch, npb.instance)
+
+ def test_nexusportbinding_add_remove(self):
+ npb11 = self._npb_test_obj(10, 100)
+ npb = nxdb.add_nexusport_binding(
+ npb11.port, npb11.vlan, npb11.switch, npb11.instance)
+ self._assert_equal(npb, npb11)
+ npb = nxdb.remove_nexusport_binding(
+ npb11.port, npb11.vlan, npb11.switch, npb11.instance)
+ self.assertEqual(len(npb), 1)
+ self._assert_equal(npb[0], npb11)
+ with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+ nxdb.remove_nexusport_binding(
+ npb11.port, npb11.vlan, npb11.switch, npb11.instance)
+
+ def test_nexusportbinding_get(self):
+ npb11 = self._npb_test_obj(10, 100)
+ npb21 = self._npb_test_obj(20, 100)
+ npb22 = self._npb_test_obj(20, 200)
+ self._add_to_db([npb11, npb21, npb22])
+
+ npb = nxdb.get_nexusport_binding(
+ npb11.port, npb11.vlan, npb11.switch, npb11.instance)
+ self.assertEqual(len(npb), 1)
+ self._assert_equal(npb[0], npb11)
+ npb = nxdb.get_nexusport_binding(
+ npb21.port, npb21.vlan, npb21.switch, npb21.instance)
+ self.assertEqual(len(npb), 1)
+ self._assert_equal(npb[0], npb21)
+ npb = nxdb.get_nexusport_binding(
+ npb22.port, npb22.vlan, npb22.switch, npb22.instance)
+ self.assertEqual(len(npb), 1)
+ self._assert_equal(npb[0], npb22)
+
+ with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+ nxdb.get_nexusport_binding(
+ npb21.port, npb21.vlan, npb21.switch, "dummyInstance")
+
+ def test_nexusvlanbinding_get(self):
+ npb11 = self._npb_test_obj(10, 100)
+ npb21 = self._npb_test_obj(20, 100)
+ npb22 = self._npb_test_obj(20, 200)
+ self._add_to_db([npb11, npb21, npb22])
+
+ npb_all_v100 = nxdb.get_nexusvlan_binding(npb11.vlan, npb11.switch)
+ self.assertEqual(len(npb_all_v100), 2)
+ npb_v200 = nxdb.get_nexusvlan_binding(npb22.vlan, npb22.switch)
+ self.assertEqual(len(npb_v200), 1)
+ self._assert_equal(npb_v200[0], npb22)
+
+ with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+ nxdb.get_nexusvlan_binding(npb21.vlan, "dummySwitch")
+
+ def test_nexusvmbinding_get(self):
+ npb11 = self._npb_test_obj(10, 100)
+ npb21 = self._npb_test_obj(20, 100)
+ npb22 = self._npb_test_obj(20, 200)
+ self._add_to_db([npb11, npb21, npb22])
+
+ npb = nxdb.get_nexusvm_binding(npb21.vlan, npb21.instance)
+ self._assert_equal(npb, npb21)
+ npb = nxdb.get_nexusvm_binding(npb22.vlan, npb22.instance)
+ self._assert_equal(npb, npb22)
+
+ with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+ nxdb.get_nexusvm_binding(npb21.vlan, "dummyInstance")
+
+ def test_nexusportvlanswitchbinding_get(self):
+ npb11 = self._npb_test_obj(10, 100)
+ npb21 = self._npb_test_obj(20, 100)
+ self._add_to_db([npb11, npb21])
+
+ npb = nxdb.get_port_vlan_switch_binding(
+ npb11.port, npb11.vlan, npb11.switch)
+ self.assertEqual(len(npb), 1)
+ self._assert_equal(npb[0], npb11)
+
+ with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+ nxdb.get_port_vlan_switch_binding(
+ npb21.port, npb21.vlan, "dummySwitch")
+
+ def test_nexusportswitchbinding_get(self):
+ npb11 = self._npb_test_obj(10, 100)
+ npb21 = self._npb_test_obj(20, 100, switch='2.2.2.2')
+ npb22 = self._npb_test_obj(20, 200, switch='2.2.2.2')
+ self._add_to_db([npb11, npb21, npb22])
+
+ npb = nxdb.get_port_switch_bindings(npb11.port, npb11.switch)
+ self.assertEqual(len(npb), 1)
+ self._assert_equal(npb[0], npb11)
+ npb_all_p20 = nxdb.get_port_switch_bindings(npb21.port, npb21.switch)
+ self.assertEqual(len(npb_all_p20), 2)
+
+ npb = nxdb.get_port_switch_bindings(npb21.port, "dummySwitch")
+ self.assertIsNone(npb)
+
+ def test_nexussvibinding_get(self):
+ npbr1 = self._npb_test_obj('router', 100)
+ npb21 = self._npb_test_obj(20, 100)
+ self._add_to_db([npbr1, npb21])
+
+ npb_svi = nxdb.get_nexussvi_bindings()
+ self.assertEqual(len(npb_svi), 1)
+ self._assert_equal(npb_svi[0], npbr1)
+
+ npbr2 = self._npb_test_obj('router', 200)
+ self._add_to_db([npbr2])
+ npb_svi = nxdb.get_nexussvi_bindings()
+ self.assertEqual(len(npb_svi), 2)
+
+ def test_nexusbinding_update(self):
+ npb11 = self._npb_test_obj(10, 100, switch='1.1.1.1', instance='test')
+ npb21 = self._npb_test_obj(20, 100, switch='1.1.1.1', instance='test')
+ self._add_to_db([npb11, npb21])
+
+ npb_all_v100 = nxdb.get_nexusvlan_binding(npb11.vlan, '1.1.1.1')
+ self.assertEqual(len(npb_all_v100), 2)
+
+ npb22 = self._npb_test_obj(20, 200, switch='1.1.1.1', instance='test')
+ npb = nxdb.update_nexusport_binding(npb21.port, 200)
+ self._assert_equal(npb, npb22)
+
+ npb_all_v100 = nxdb.get_nexusvlan_binding(npb11.vlan, '1.1.1.1')
+ self.assertEqual(len(npb_all_v100), 1)
+ self._assert_equal(npb_all_v100[0], npb11)
+
+ npb = nxdb.update_nexusport_binding(npb21.port, 0)
+ self.assertIsNone(npb)
+
+ npb33 = self._npb_test_obj(30, 300, switch='1.1.1.1', instance='test')
+ with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+ nxdb.update_nexusport_binding(npb33.port, 200)