]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Cisco plugin db code cleanup, part II
authorHenryGessau <gessau@cisco.com>
Mon, 29 Jul 2013 04:08:50 +0000 (00:08 -0400)
committerHenryGessau <gessau@cisco.com>
Tue, 6 Aug 2013 13:05:17 +0000 (09:05 -0400)
Remove an unused table.
Prefix Cisco tablenames with cisco_.
Make full use of neutron model base class.
DRY out the nexusport binding queries.
Follow coding guidelines for imports.
Improve cisco/db/network_db_v2.py test coverage.
Improve cisco/db/nexus_db_v2.py test coverage.

Change-Id: I8ea110de37552176f2d9bcbff4ca3e0b65dfacdc
Closes-bug: #1202687
Partial-bug: #1190619

neutron/db/migration/alembic_migrations/versions/263772d65691_cisco_db_cleanup_2.py [new file with mode: 0644]
neutron/plugins/cisco/common/cisco_exceptions.py
neutron/plugins/cisco/db/network_db_v2.py
neutron/plugins/cisco/db/network_models_v2.py
neutron/plugins/cisco/db/nexus_db_v2.py
neutron/plugins/cisco/db/nexus_models_v2.py
neutron/plugins/cisco/nexus/cisco_nexus_plugin_v2.py
neutron/tests/unit/cisco/test_network_db.py [new file with mode: 0644]
neutron/tests/unit/cisco/test_nexus_db.py [new file with mode: 0644]

diff --git a/neutron/db/migration/alembic_migrations/versions/263772d65691_cisco_db_cleanup_2.py b/neutron/db/migration/alembic_migrations/versions/263772d65691_cisco_db_cleanup_2.py
new file mode 100644 (file)
index 0000000..646cbac
--- /dev/null
@@ -0,0 +1,72 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+"""Cisco plugin db cleanup part II
+
+Revision ID: 263772d65691
+Revises: 35c7c198ddea
+Create Date: 2013-07-29 02:31:26.646343
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '263772d65691'
+down_revision = '35c7c198ddea'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+    'neutron.plugins.cisco.network_plugin.PluginV2'
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+from neutron.db import migration
+
+
+def upgrade(active_plugin=None, options=None):
+    if not migration.should_run(active_plugin, migration_for_plugins):
+        return
+
+    if 'credentials' in sa.MetaData().tables:
+        op.rename_table('credentials', 'cisco_credentials')
+    if 'nexusport_bindings' in sa.MetaData().tables:
+        op.rename_table('nexusport_bindings', 'cisco_nexusport_bindings')
+    if 'qoss' in sa.MetaData().tables:
+        op.rename_table('qoss', 'cisco_qos_policies')
+
+    op.drop_table('cisco_vlan_ids')
+
+
+def downgrade(active_plugin=None, options=None):
+    if not migration.should_run(active_plugin, migration_for_plugins):
+        return
+
+    op.create_table(
+        'cisco_vlan_ids',
+        sa.Column('vlan_id', sa.Integer, nullable=False),
+        sa.Column('vlan_used', sa.Boolean),
+        sa.PrimaryKeyConstraint('vlan_id'),
+    )
+
+    if 'cisco_credentials' in sa.MetaData().tables:
+        op.rename_table('cisco_credentials', 'credentials')
+    if 'cisco_nexusport_bindings' in sa.MetaData().tables:
+        op.rename_table('cisco_nexusport_bindings', 'nexusport_bindings')
+    if 'cisco_qos_policies' in sa.MetaData().tables:
+        op.rename_table('cisco_qos_policies', 'qoss')
index 37dc52ab3e79f45f1674d16f9ac7103b85537b20..f9610a4698d3d325a1b30547d262b171b3ab2828 100644 (file)
@@ -74,8 +74,8 @@ class CredentialNameNotFound(exceptions.NeutronException):
 
 
 class CredentialAlreadyExists(exceptions.NeutronException):
-    """Credential ID already exists."""
-    message = _("Credential %(credential_id)s already exists "
+    """Credential already exists."""
+    message = _("Credential %(credential_name)s already exists "
                 "for tenant %(tenant_id)s")
 
 
index 6be48ec4d8abe5addc29377345be69a8577d6a9d..00ccdc31e04511912ea4b0f9158b956166688ddc 100644 (file)
@@ -20,90 +20,19 @@ from sqlalchemy.orm import exc
 
 from neutron.db import api as db
 from neutron.openstack.common import log as logging
+from neutron.openstack.common import uuidutils
 from neutron.plugins.cisco.common import cisco_constants as const
 from neutron.plugins.cisco.common import cisco_exceptions as c_exc
 from neutron.plugins.cisco.db import network_models_v2
+# Do NOT remove this import. It is required for all the models to be seen
+# by db.initalize() when called from VirtualPhysicalSwitchModelV2.__init__.
+from neutron.plugins.cisco.db import nexus_models_v2  # noqa
 from neutron.plugins.openvswitch import ovs_models_v2
 
 
 LOG = logging.getLogger(__name__)
 
 
-def get_all_vlanids():
-    """Gets all the vlanids."""
-    LOG.debug(_("get_all_vlanids() called"))
-    session = db.get_session()
-    return session.query(network_models_v2.VlanID).all()
-
-
-def is_vlanid_used(vlan_id):
-    """Checks if a vlanid is in use."""
-    LOG.debug(_("is_vlanid_used() called"))
-    session = db.get_session()
-    try:
-        vlanid = (session.query(network_models_v2.VlanID).
-                  filter_by(vlan_id=vlan_id).one())
-        return vlanid["vlan_used"]
-    except exc.NoResultFound:
-        raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
-
-
-def release_vlanid(vlan_id):
-    """Sets the vlanid state to be unused."""
-    LOG.debug(_("release_vlanid() called"))
-    session = db.get_session()
-    try:
-        vlanid = (session.query(network_models_v2.VlanID).
-                  filter_by(vlan_id=vlan_id).one())
-        vlanid["vlan_used"] = False
-        session.merge(vlanid)
-        session.flush()
-        return vlanid["vlan_used"]
-    except exc.NoResultFound:
-        raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
-
-
-def delete_vlanid(vlan_id):
-    """Deletes a vlanid entry from db."""
-    LOG.debug(_("delete_vlanid() called"))
-    session = db.get_session()
-    try:
-        vlanid = (session.query(network_models_v2.VlanID).
-                  filter_by(vlan_id=vlan_id).one())
-        session.delete(vlanid)
-        session.flush()
-        return vlanid
-    except exc.NoResultFound:
-        pass
-
-
-def reserve_vlanid():
-    """Reserves the first unused vlanid."""
-    LOG.debug(_("reserve_vlanid() called"))
-    session = db.get_session()
-    try:
-        rvlan = (session.query(network_models_v2.VlanID).
-                 filter_by(vlan_used=False).first())
-        if not rvlan:
-            raise exc.NoResultFound
-        rvlanid = (session.query(network_models_v2.VlanID).
-                   filter_by(vlan_id=rvlan["vlan_id"]).one())
-        rvlanid["vlan_used"] = True
-        session.merge(rvlanid)
-        session.flush()
-        return rvlan["vlan_id"]
-    except exc.NoResultFound:
-        raise c_exc.VlanIDNotAvailable()
-
-
-def get_all_vlanids_used():
-    """Gets all the vlanids used."""
-    LOG.debug(_("get_all_vlanids() called"))
-    session = db.get_session()
-    return (session.query(network_models_v2.VlanID).
-            filter_by(vlan_used=True).all())
-
-
 def get_all_qoss(tenant_id):
     """Lists all the qos to tenant associations."""
     LOG.debug(_("get_all_qoss() called"))
@@ -137,7 +66,10 @@ def add_qos(tenant_id, qos_name, qos_desc):
         raise c_exc.QosNameAlreadyExists(qos_name=qos_name,
                                          tenant_id=tenant_id)
     except exc.NoResultFound:
-        qos = network_models_v2.QoS(tenant_id, qos_name, qos_desc)
+        qos = network_models_v2.QoS(qos_id=uuidutils.generate_uuid(),
+                                    tenant_id=tenant_id,
+                                    qos_name=qos_name,
+                                    qos_desc=qos_desc)
         session.add(qos)
         session.flush()
         return qos
@@ -217,8 +149,12 @@ def add_credential(tenant_id, credential_name, user_name, password):
         raise c_exc.CredentialAlreadyExists(credential_name=credential_name,
                                             tenant_id=tenant_id)
     except exc.NoResultFound:
-        cred = network_models_v2.Credential(tenant_id, credential_name,
-                                            user_name, password)
+        cred = network_models_v2.Credential(
+            credential_id=uuidutils.generate_uuid(),
+            tenant_id=tenant_id,
+            credential_name=credential_name,
+            user_name=user_name,
+            password=password)
         session.add(cred)
         session.flush()
         return cred
index 65a8a96f71e63cc137937febd4b6f12e2acf295c..a3e751d3af29c41e706be8762a0f836d56e6af9a 100644 (file)
 #
 # @author: Rohit Agarwalla, Cisco Systems, Inc.
 
-from sqlalchemy import Column, ForeignKey, Integer, String, Boolean
+import sqlalchemy as sa
 
 from neutron.db import model_base
-from neutron.openstack.common import uuidutils
-
-
-class VlanID(model_base.BASEV2):
-    """Represents a vlan_id usage."""
-    __tablename__ = 'cisco_vlan_ids'
-
-    vlan_id = Column(Integer, primary_key=True)
-    vlan_used = Column(Boolean)
-
-    def __init__(self, vlan_id):
-        self.vlan_id = vlan_id
-        self.vlan_used = False
-
-    def __repr__(self):
-        return "<VlanID(%d,%s)>" % (self.vlan_id, self.vlan_used)
 
 
 class QoS(model_base.BASEV2):
-    """Represents QoS for a tenant."""
+    """Represents QoS policies for a tenant."""
 
-    __tablename__ = 'qoss'
+    __tablename__ = 'cisco_qos_policies'
 
-    qos_id = Column(String(255))
-    tenant_id = Column(String(255), primary_key=True)
-    qos_name = Column(String(255), primary_key=True)
-    qos_desc = Column(String(255))
-
-    def __init__(self, tenant_id, qos_name, qos_desc):
-        self.qos_id = uuidutils.generate_uuid()
-        self.tenant_id = tenant_id
-        self.qos_name = qos_name
-        self.qos_desc = qos_desc
-
-    def __repr__(self):
-        return "<QoS(%s,%s,%s,%s)>" % (self.qos_id, self.tenant_id,
-                                       self.qos_name, self.qos_desc)
+    qos_id = sa.Column(sa.String(255))
+    tenant_id = sa.Column(sa.String(255), primary_key=True)
+    qos_name = sa.Column(sa.String(255), primary_key=True)
+    qos_desc = sa.Column(sa.String(255))
 
 
 class Credential(model_base.BASEV2):
-    """Represents credentials for a tenant."""
-
-    __tablename__ = 'credentials'
-
-    credential_id = Column(String(255))
-    tenant_id = Column(String(255), primary_key=True)
-    credential_name = Column(String(255), primary_key=True)
-    user_name = Column(String(255))
-    password = Column(String(255))
+    """Represents credentials for a tenant to control Cisco switches."""
 
-    def __init__(self, tenant_id, credential_name, user_name, password):
-        self.credential_id = uuidutils.generate_uuid()
-        self.tenant_id = tenant_id
-        self.credential_name = credential_name
-        self.user_name = user_name
-        self.password = password
+    __tablename__ = 'cisco_credentials'
 
-    def __repr__(self):
-        return "<Credentials(%s,%s,%s,%s,%s)>" % (self.credential_id,
-                                                  self.tenant_id,
-                                                  self.credential_name,
-                                                  self.user_name,
-                                                  self.password)
+    credential_id = sa.Column(sa.String(255))
+    tenant_id = sa.Column(sa.String(255), primary_key=True)
+    credential_name = sa.Column(sa.String(255), primary_key=True)
+    user_name = sa.Column(sa.String(255))
+    password = sa.Column(sa.String(255))
 
 
 class ProviderNetwork(model_base.BASEV2):
@@ -89,8 +49,8 @@ class ProviderNetwork(model_base.BASEV2):
 
     __tablename__ = 'cisco_provider_networks'
 
-    network_id = Column(String(36),
-                        ForeignKey('networks.id', ondelete="CASCADE"),
-                        primary_key=True)
-    network_type = Column(String(255), nullable=False)
-    segmentation_id = Column(Integer, nullable=False)
+    network_id = sa.Column(sa.String(36),
+                           sa.ForeignKey('networks.id', ondelete="CASCADE"),
+                           primary_key=True)
+    network_type = sa.Column(sa.String(255), nullable=False)
+    segmentation_id = sa.Column(sa.Integer, nullable=False)
index 8c99d7ae2deb6dddd52d3a012244f0bdcc09f7e6..6ee7ca91119847557c7670c72557f8c3600e15c5 100644 (file)
@@ -18,7 +18,7 @@
 # @author: Arvind Somya, Cisco Systems, Inc. (asomya@cisco.com)
 #
 
-from sqlalchemy.orm import exc
+import sqlalchemy.orm.exc as sa_exc
 
 import neutron.db.api as db
 from neutron.openstack.common import log as logging
@@ -29,48 +29,29 @@ from neutron.plugins.cisco.db import nexus_models_v2
 LOG = logging.getLogger(__name__)
 
 
-def get_all_nexusport_bindings():
-    """Lists all the nexusport bindings."""
-    LOG.debug(_("get_all_nexusport_bindings() called"))
-    session = db.get_session()
-    return session.query(nexus_models_v2.NexusPortBinding).all()
-
-
 def get_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
     """Lists a nexusport binding."""
     LOG.debug(_("get_nexusport_binding() called"))
-    session = db.get_session()
-
-    filters = dict(port_id=port_id, vlan_id=vlan_id, switch_ip=switch_ip,
-                   instance_id=instance_id)
-    bindings = (session.query(nexus_models_v2.NexusPortBinding).
-                filter_by(**filters).all())
-    if not bindings:
-        raise c_exc.NexusPortBindingNotFound(**filters)
-
-    return bindings
+    return _lookup_all_nexus_bindings(port_id=port_id,
+                                      vlan_id=vlan_id,
+                                      switch_ip=switch_ip,
+                                      instance_id=instance_id)
 
 
 def get_nexusvlan_binding(vlan_id, switch_ip):
     """Lists a vlan and switch binding."""
     LOG.debug(_("get_nexusvlan_binding() called"))
-    session = db.get_session()
-
-    filters = dict(vlan_id=vlan_id, switch_ip=switch_ip)
-    bindings = (session.query(nexus_models_v2.NexusPortBinding).
-                filter_by(**filters).all())
-    if not bindings:
-        raise c_exc.NexusPortBindingNotFound(**filters)
-
-    return bindings
+    return _lookup_all_nexus_bindings(vlan_id=vlan_id, switch_ip=switch_ip)
 
 
 def add_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
     """Adds a nexusport binding."""
     LOG.debug(_("add_nexusport_binding() called"))
     session = db.get_session()
-    binding = nexus_models_v2.NexusPortBinding(
-        port_id, vlan_id, switch_ip, instance_id)
+    binding = nexus_models_v2.NexusPortBinding(port_id=port_id,
+                                               vlan_id=vlan_id,
+                                               switch_ip=switch_ip,
+                                               instance_id=instance_id)
     session.add(binding)
     session.flush()
     return binding
@@ -80,11 +61,11 @@ def remove_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
     """Removes a nexusport binding."""
     LOG.debug(_("remove_nexusport_binding() called"))
     session = db.get_session()
-    binding = (session.query(nexus_models_v2.NexusPortBinding).
-               filter_by(vlan_id=vlan_id).filter_by(switch_ip=switch_ip).
-               filter_by(port_id=port_id).
-               filter_by(instance_id=instance_id).all())
-
+    binding = _lookup_all_nexus_bindings(session=session,
+                                         vlan_id=vlan_id,
+                                         switch_ip=switch_ip,
+                                         port_id=port_id,
+                                         instance_id=instance_id)
     for bind in binding:
         session.delete(bind)
     session.flush()
@@ -93,46 +74,31 @@ def remove_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
 
 def update_nexusport_binding(port_id, new_vlan_id):
     """Updates nexusport binding."""
+    if not new_vlan_id:
+        LOG.warning(_("update_nexusport_binding called with no vlan"))
+        return
     LOG.debug(_("update_nexusport_binding called"))
     session = db.get_session()
-    try:
-        binding = (session.query(nexus_models_v2.NexusPortBinding).
-                   filter_by(port_id=port_id).one())
-        if new_vlan_id:
-            binding["vlan_id"] = new_vlan_id
-        session.merge(binding)
-        session.flush()
-        return binding
-    except exc.NoResultFound:
-        raise c_exc.NexusPortBindingNotFound(port_id=port_id)
+    binding = _lookup_one_nexus_binding(session=session, port_id=port_id)
+    binding.vlan_id = new_vlan_id
+    session.merge(binding)
+    session.flush()
+    return binding
 
 
 def get_nexusvm_binding(vlan_id, instance_id):
     """Lists nexusvm bindings."""
     LOG.debug(_("get_nexusvm_binding() called"))
-    session = db.get_session()
-
-    filters = dict(instance_id=instance_id, vlan_id=vlan_id)
-    binding = (session.query(nexus_models_v2.NexusPortBinding).
-               filter_by(**filters).first())
-    if not binding:
-        raise c_exc.NexusPortBindingNotFound(**filters)
-
-    return binding
+    return _lookup_first_nexus_binding(instance_id=instance_id,
+                                       vlan_id=vlan_id)
 
 
 def get_port_vlan_switch_binding(port_id, vlan_id, switch_ip):
     """Lists nexusvm bindings."""
     LOG.debug(_("get_port_vlan_switch_binding() called"))
-    session = db.get_session()
-
-    filters = dict(port_id=port_id, switch_ip=switch_ip, vlan_id=vlan_id)
-    bindings = (session.query(nexus_models_v2.NexusPortBinding).
-                filter_by(**filters).all())
-    if not bindings:
-        raise c_exc.NexusPortBindingNotFound(**filters)
-
-    return bindings
+    return _lookup_all_nexus_bindings(port_id=port_id,
+                                      switch_ip=switch_ip,
+                                      vlan_id=vlan_id)
 
 
 def get_port_switch_bindings(port_id, switch_ip):
@@ -140,25 +106,48 @@ def get_port_switch_bindings(port_id, switch_ip):
     LOG.debug(_("get_port_switch_bindings() called, "
                 "port:'%(port_id)s', switch:'%(switch_ip)s'"),
               {'port_id': port_id, 'switch_ip': switch_ip})
-    session = db.get_session()
     try:
-        binding = (session.query(nexus_models_v2.NexusPortBinding).
-                   filter_by(port_id=port_id).
-                   filter_by(switch_ip=switch_ip).all())
-        return binding
-    except exc.NoResultFound:
-        return
+        return _lookup_all_nexus_bindings(port_id=port_id,
+                                          switch_ip=switch_ip)
+    except c_exc.NexusPortBindingNotFound:
+        pass
 
 
 def get_nexussvi_bindings():
     """Lists nexus svi bindings."""
     LOG.debug(_("get_nexussvi_bindings() called"))
-    session = db.get_session()
+    return _lookup_all_nexus_bindings(port_id='router')
+
+
+def _lookup_nexus_bindings(query_type, session=None, **bfilter):
+    """Look up 'query_type' Nexus bindings matching the filter.
+
+    :param query_type: 'all', 'one' or 'first'
+    :param session: db session
+    :param bfilter: filter for bindings query
+    :return: bindings if query gave a result, else
+             raise NexusPortBindingNotFound.
+    """
+    if session is None:
+        session = db.get_session()
+    query_method = getattr(session.query(
+        nexus_models_v2.NexusPortBinding).filter_by(**bfilter), query_type)
+    try:
+        bindings = query_method()
+        if bindings:
+            return bindings
+    except sa_exc.NoResultFound:
+        pass
+    raise c_exc.NexusPortBindingNotFound(**bfilter)
+
+
+def _lookup_all_nexus_bindings(session=None, **bfilter):
+    return _lookup_nexus_bindings('all', session, **bfilter)
+
+
+def _lookup_one_nexus_binding(session=None, **bfilter):
+    return _lookup_nexus_bindings('one', session, **bfilter)
 
-    filters = {'port_id': 'router'}
-    bindings = (session.query(nexus_models_v2.NexusPortBinding).
-                filter_by(**filters).all())
-    if not bindings:
-        raise c_exc.NexusPortBindingNotFound(**filters)
 
-    return bindings
+def _lookup_first_nexus_binding(session=None, **bfilter):
+    return _lookup_nexus_bindings('first', session, **bfilter)
index b690f928ce709d5b9f79b0c90af92eb5334214dc..bbfaece42cda0b21f15767371c4726cf28b8d01f 100644 (file)
@@ -15,7 +15,7 @@
 #    under the License.
 # @author: Rohit Agarwalla, Cisco Systems, Inc.
 
-from sqlalchemy import Column, Integer, String
+import sqlalchemy as sa
 
 from neutron.db import model_base
 
@@ -23,25 +23,21 @@ from neutron.db import model_base
 class NexusPortBinding(model_base.BASEV2):
     """Represents a binding of VM's to nexus ports."""
 
-    __tablename__ = "nexusport_bindings"
+    __tablename__ = "cisco_nexusport_bindings"
 
-    id = Column(Integer, primary_key=True, autoincrement=True)
-    port_id = Column(String(255))
-    vlan_id = Column(Integer, nullable=False)
-    switch_ip = Column(String(255))
-    instance_id = Column(String(255))
-
-    def __init__(self, port_id, vlan_id, switch_ip, instance_id):
-        self.port_id = port_id
-        self.vlan_id = vlan_id
-        self.switch_ip = switch_ip
-        self.instance_id = instance_id
+    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
+    port_id = sa.Column(sa.String(255))
+    vlan_id = sa.Column(sa.Integer, nullable=False)
+    switch_ip = sa.Column(sa.String(255))
+    instance_id = sa.Column(sa.String(255))
 
     def __repr__(self):
-        return "<NexusPortBinding (%s,%d, %s, %s)>" % \
-            (self.port_id, self.vlan_id, self.switch_ip, self.instance_id)
+        """Just the binding, without the id key."""
+        return ("<NexusPortBinding(%s,%s,%s,%s)>" %
+                (self.port_id, self.vlan_id, self.switch_ip, self.instance_id))
 
     def __eq__(self, other):
+        """Compare only the binding, without the id key."""
         return (
             self.port_id == other.port_id and
             self.vlan_id == other.vlan_id and
index ac40faf538cc352a7092734a4f2a8dca465f019f..68b8bbf122faebf9d4969a584d9ca6b13a3ea1ef 100644 (file)
@@ -177,7 +177,7 @@ class NexusPlugin(L2DevicePluginBase):
         """Remove VLAN SVI from the Nexus Switch."""
         # Grab switch_ip from database
         switch_ip = nxos_db.get_nexusvm_binding(vlan_id,
-                                                router_id)['switch_ip']
+                                                router_id).switch_ip
 
         # Delete the SVI interface from the switch
         self._client.delete_vlan_svi(switch_ip, vlan_id)
@@ -263,37 +263,37 @@ class NexusPlugin(L2DevicePluginBase):
             auto_untrunk = conf.CISCO.provider_vlan_auto_trunk
             LOG.debug("delete_network(): provider vlan %s" % vlan_id)
 
-        switch_ip = row['switch_ip']
+        switch_ip = row.switch_ip
         nexus_port = None
-        if row['port_id'] != 'router':
-            nexus_port = row['port_id']
+        if row.port_id != 'router':
+            nexus_port = row.port_id
 
-        nxos_db.remove_nexusport_binding(row['port_id'], row['vlan_id'],
-                                         row['switch_ip'],
-                                         row['instance_id'])
+        nxos_db.remove_nexusport_binding(row.port_id, row.vlan_id,
+                                         row.switch_ip,
+                                         row.instance_id)
         # Check for any other bindings with the same vlan_id and switch_ip
         try:
-            nxos_db.get_nexusvlan_binding(row['vlan_id'], row['switch_ip'])
+            nxos_db.get_nexusvlan_binding(row.vlan_id, row.switch_ip)
         except cisco_exc.NexusPortBindingNotFound:
             try:
                 # Delete this vlan from this switch
                 if nexus_port and auto_untrunk:
                     self._client.disable_vlan_on_trunk_int(
-                        switch_ip, row['vlan_id'], nexus_port)
+                        switch_ip, row.vlan_id, nexus_port)
                 if auto_delete:
-                    self._client.delete_vlan(switch_ip, row['vlan_id'])
+                    self._client.delete_vlan(switch_ip, row.vlan_id)
             except Exception:
                 # The delete vlan operation on the Nexus failed,
                 # so this delete_port request has failed. For
                 # consistency, roll back the Nexus database to what
                 # it was before this request.
                 with excutils.save_and_reraise_exception():
-                    nxos_db.add_nexusport_binding(row['port_id'],
-                                                  row['vlan_id'],
-                                                  row['switch_ip'],
-                                                  row['instance_id'])
+                    nxos_db.add_nexusport_binding(row.port_id,
+                                                  row.vlan_id,
+                                                  row.switch_ip,
+                                                  row.instance_id)
 
-        return row['instance_id']
+        return row.instance_id
 
     def update_port(self, tenant_id, net_id, port_id, port_state, **kwargs):
         """Update port.
diff --git a/neutron/tests/unit/cisco/test_network_db.py b/neutron/tests/unit/cisco/test_network_db.py
new file mode 100644 (file)
index 0000000..6c0f884
--- /dev/null
@@ -0,0 +1,240 @@
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+import testtools
+
+from neutron.db import api as db
+from neutron.plugins.cisco.common import cisco_exceptions as c_exc
+from neutron.plugins.cisco.db import network_db_v2 as cdb
+from neutron.tests import base
+
+
+class CiscoNetworkQosDbTest(base.BaseTestCase):
+
+    """Unit tests for cisco.db.network_models_v2.QoS model."""
+
+    QosObj = collections.namedtuple('QosObj', 'tenant qname desc')
+
+    def setUp(self):
+        super(CiscoNetworkQosDbTest, self).setUp()
+        db.configure_db()
+        self.session = db.get_session()
+        self.addCleanup(db.clear_db)
+
+    def _qos_test_obj(self, tnum, qnum, desc=None):
+        """Create a Qos test object from a pair of numbers."""
+        if desc is None:
+            desc = 'test qos %s-%s' % (str(tnum), str(qnum))
+        tenant = 'tenant_%s' % str(tnum)
+        qname = 'qos_%s' % str(qnum)
+        return self.QosObj(tenant, qname, desc)
+
+    def _assert_equal(self, qos, qos_obj):
+        self.assertEqual(qos.tenant_id, qos_obj.tenant)
+        self.assertEqual(qos.qos_name, qos_obj.qname)
+        self.assertEqual(qos.qos_desc, qos_obj.desc)
+
+    def test_qos_add_remove(self):
+        qos11 = self._qos_test_obj(1, 1)
+        qos = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc)
+        self._assert_equal(qos, qos11)
+        qos_id = qos.qos_id
+        qos = cdb.remove_qos(qos11.tenant, qos_id)
+        self._assert_equal(qos, qos11)
+        qos = cdb.remove_qos(qos11.tenant, qos_id)
+        self.assertIsNone(qos)
+
+    def test_qos_add_dup(self):
+        qos22 = self._qos_test_obj(2, 2)
+        qos = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc)
+        self._assert_equal(qos, qos22)
+        qos_id = qos.qos_id
+        with testtools.ExpectedException(c_exc.QosNameAlreadyExists):
+            cdb.add_qos(qos22.tenant, qos22.qname, "duplicate 22")
+        qos = cdb.remove_qos(qos22.tenant, qos_id)
+        self._assert_equal(qos, qos22)
+        qos = cdb.remove_qos(qos22.tenant, qos_id)
+        self.assertIsNone(qos)
+
+    def test_qos_get(self):
+        qos11 = self._qos_test_obj(1, 1)
+        qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id
+        qos21 = self._qos_test_obj(2, 1)
+        qos21_id = cdb.add_qos(qos21.tenant, qos21.qname, qos21.desc).qos_id
+        qos22 = self._qos_test_obj(2, 2)
+        qos22_id = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc).qos_id
+
+        qos = cdb.get_qos(qos11.tenant, qos11_id)
+        self._assert_equal(qos, qos11)
+        qos = cdb.get_qos(qos21.tenant, qos21_id)
+        self._assert_equal(qos, qos21)
+        qos = cdb.get_qos(qos21.tenant, qos22_id)
+        self._assert_equal(qos, qos22)
+
+        with testtools.ExpectedException(c_exc.QosNotFound):
+            cdb.get_qos(qos11.tenant, "dummyQosId")
+        with testtools.ExpectedException(c_exc.QosNotFound):
+            cdb.get_qos(qos11.tenant, qos21_id)
+        with testtools.ExpectedException(c_exc.QosNotFound):
+            cdb.get_qos(qos21.tenant, qos11_id)
+
+        qos_all_t1 = cdb.get_all_qoss(qos11.tenant)
+        self.assertEqual(len(qos_all_t1), 1)
+        qos_all_t2 = cdb.get_all_qoss(qos21.tenant)
+        self.assertEqual(len(qos_all_t2), 2)
+        qos_all_t3 = cdb.get_all_qoss("tenant3")
+        self.assertEqual(len(qos_all_t3), 0)
+
+    def test_qos_update(self):
+        qos11 = self._qos_test_obj(1, 1)
+        qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id
+        cdb.update_qos(qos11.tenant, qos11_id)
+        new_qname = "new qos name"
+        new_qos = cdb.update_qos(qos11.tenant, qos11_id, new_qname)
+        expected_qobj = self.QosObj(qos11.tenant, new_qname, qos11.desc)
+        self._assert_equal(new_qos, expected_qobj)
+        new_qos = cdb.get_qos(qos11.tenant, qos11_id)
+        self._assert_equal(new_qos, expected_qobj)
+        with testtools.ExpectedException(c_exc.QosNotFound):
+            cdb.update_qos(qos11.tenant, "dummyQosId")
+
+
+class CiscoNetworkCredentialDbTest(base.BaseTestCase):
+
+    """Unit tests for cisco.db.network_models_v2.Credential model."""
+
+    CredObj = collections.namedtuple('CredObj', 'tenant cname usr pwd')
+
+    def setUp(self):
+        super(CiscoNetworkCredentialDbTest, self).setUp()
+        db.configure_db()
+        self.session = db.get_session()
+        self.addCleanup(db.clear_db)
+
+    def _cred_test_obj(self, tnum, cnum):
+        """Create a Credential test object from a pair of numbers."""
+        tenant = 'tenant_%s' % str(tnum)
+        cname = 'credential_%s' % str(cnum)
+        usr = 'User_%s_%s' % (str(tnum), str(cnum))
+        pwd = 'Password_%s_%s' % (str(tnum), str(cnum))
+        return self.CredObj(tenant, cname, usr, pwd)
+
+    def _assert_equal(self, credential, cred_obj):
+        self.assertEqual(credential.tenant_id, cred_obj.tenant)
+        self.assertEqual(credential.credential_name, cred_obj.cname)
+        self.assertEqual(credential.user_name, cred_obj.usr)
+        self.assertEqual(credential.password, cred_obj.pwd)
+
+    def test_credential_add_remove(self):
+        cred11 = self._cred_test_obj(1, 1)
+        cred = cdb.add_credential(
+            cred11.tenant, cred11.cname, cred11.usr, cred11.pwd)
+        self._assert_equal(cred, cred11)
+        cred_id = cred.credential_id
+        cred = cdb.remove_credential(cred11.tenant, cred_id)
+        self._assert_equal(cred, cred11)
+        cred = cdb.remove_credential(cred11.tenant, cred_id)
+        self.assertIsNone(cred)
+
+    def test_credential_add_dup(self):
+        cred22 = self._cred_test_obj(2, 2)
+        cred = cdb.add_credential(
+            cred22.tenant, cred22.cname, cred22.usr, cred22.pwd)
+        self._assert_equal(cred, cred22)
+        cred_id = cred.credential_id
+        with testtools.ExpectedException(c_exc.CredentialAlreadyExists):
+            cdb.add_credential(
+                cred22.tenant, cred22.cname, cred22.usr, cred22.pwd)
+        cred = cdb.remove_credential(cred22.tenant, cred_id)
+        self._assert_equal(cred, cred22)
+        cred = cdb.remove_credential(cred22.tenant, cred_id)
+        self.assertIsNone(cred)
+
+    def test_credential_get_id(self):
+        cred11 = self._cred_test_obj(1, 1)
+        cred11_id = cdb.add_credential(
+            cred11.tenant, cred11.cname, cred11.usr, cred11.pwd).credential_id
+        cred21 = self._cred_test_obj(2, 1)
+        cred21_id = cdb.add_credential(
+            cred21.tenant, cred21.cname, cred21.usr, cred21.pwd).credential_id
+        cred22 = self._cred_test_obj(2, 2)
+        cred22_id = cdb.add_credential(
+            cred22.tenant, cred22.cname, cred22.usr, cred22.pwd).credential_id
+
+        cred = cdb.get_credential(cred11.tenant, cred11_id)
+        self._assert_equal(cred, cred11)
+        cred = cdb.get_credential(cred21.tenant, cred21_id)
+        self._assert_equal(cred, cred21)
+        cred = cdb.get_credential(cred21.tenant, cred22_id)
+        self._assert_equal(cred, cred22)
+
+        with testtools.ExpectedException(c_exc.CredentialNotFound):
+            cdb.get_credential(cred11.tenant, "dummyCredentialId")
+        with testtools.ExpectedException(c_exc.CredentialNotFound):
+            cdb.get_credential(cred11.tenant, cred21_id)
+        with testtools.ExpectedException(c_exc.CredentialNotFound):
+            cdb.get_credential(cred21.tenant, cred11_id)
+
+        cred_all_t1 = cdb.get_all_credentials(cred11.tenant)
+        self.assertEqual(len(cred_all_t1), 1)
+        cred_all_t2 = cdb.get_all_credentials(cred21.tenant)
+        self.assertEqual(len(cred_all_t2), 2)
+        cred_all_t3 = cdb.get_all_credentials("dummyTenant")
+        self.assertEqual(len(cred_all_t3), 0)
+
+    def test_credential_get_name(self):
+        cred11 = self._cred_test_obj(1, 1)
+        cred11_id = cdb.add_credential(
+            cred11.tenant, cred11.cname, cred11.usr, cred11.pwd).credential_id
+        cred21 = self._cred_test_obj(2, 1)
+        cred21_id = cdb.add_credential(
+            cred21.tenant, cred21.cname, cred21.usr, cred21.pwd).credential_id
+        cred22 = self._cred_test_obj(2, 2)
+        cred22_id = cdb.add_credential(
+            cred22.tenant, cred22.cname, cred22.usr, cred22.pwd).credential_id
+        self.assertNotEqual(cred11_id, cred21_id)
+        self.assertNotEqual(cred11_id, cred22_id)
+        self.assertNotEqual(cred21_id, cred22_id)
+
+        cred = cdb.get_credential_name(cred11.tenant, cred11.cname)
+        self._assert_equal(cred, cred11)
+        cred = cdb.get_credential_name(cred21.tenant, cred21.cname)
+        self._assert_equal(cred, cred21)
+        cred = cdb.get_credential_name(cred22.tenant, cred22.cname)
+        self._assert_equal(cred, cred22)
+
+        with testtools.ExpectedException(c_exc.CredentialNameNotFound):
+            cdb.get_credential_name(cred11.tenant, "dummyCredentialName")
+        with testtools.ExpectedException(c_exc.CredentialNameNotFound):
+            cdb.get_credential_name(cred11.tenant, cred22.cname)
+
+    def test_credential_update(self):
+        cred11 = self._cred_test_obj(1, 1)
+        cred11_id = cdb.add_credential(
+            cred11.tenant, cred11.cname, cred11.usr, cred11.pwd).credential_id
+        cdb.update_credential(cred11.tenant, cred11_id)
+        new_usr = "new user name"
+        new_pwd = "new password"
+        new_credential = cdb.update_credential(
+            cred11.tenant, cred11_id, new_usr, new_pwd)
+        expected_cred = self.CredObj(
+            cred11.tenant, cred11.cname, new_usr, new_pwd)
+        self._assert_equal(new_credential, expected_cred)
+        new_credential = cdb.get_credential(cred11.tenant, cred11_id)
+        self._assert_equal(new_credential, expected_cred)
+        with testtools.ExpectedException(c_exc.CredentialNotFound):
+            cdb.update_credential(
+                cred11.tenant, "dummyCredentialId", new_usr, new_pwd)
diff --git a/neutron/tests/unit/cisco/test_nexus_db.py b/neutron/tests/unit/cisco/test_nexus_db.py
new file mode 100644 (file)
index 0000000..712dffd
--- /dev/null
@@ -0,0 +1,190 @@
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+import testtools
+
+from neutron.db import api as db
+from neutron.plugins.cisco.common import cisco_exceptions as c_exc
+from neutron.plugins.cisco.db import nexus_db_v2 as nxdb
+from neutron.tests import base
+
+
+class CiscoNexusDbTest(base.BaseTestCase):
+
+    """Unit tests for cisco.db.nexus_models_v2.NexusPortBinding model."""
+
+    NpbObj = collections.namedtuple('NpbObj', 'port vlan switch instance')
+
+    def setUp(self):
+        super(CiscoNexusDbTest, self).setUp()
+        db.configure_db()
+        self.session = db.get_session()
+        self.addCleanup(db.clear_db)
+
+    def _npb_test_obj(self, pnum, vnum, switch=None, instance=None):
+        """Create a Nexus port binding test object from a pair of numbers."""
+        if pnum is 'router':
+            port = pnum
+        else:
+            port = '1/%s' % str(pnum)
+        vlan = str(vnum)
+        if switch is None:
+            switch = '10.9.8.7'
+        if instance is None:
+            instance = 'instance_%s_%s' % (str(pnum), str(vnum))
+        return self.NpbObj(port, vlan, switch, instance)
+
+    def _assert_equal(self, npb, npb_obj):
+        self.assertEqual(npb.port_id, npb_obj.port)
+        self.assertEqual(int(npb.vlan_id), int(npb_obj.vlan))
+        self.assertEqual(npb.switch_ip, npb_obj.switch)
+        self.assertEqual(npb.instance_id, npb_obj.instance)
+
+    def _add_to_db(self, npbs):
+        for npb in npbs:
+            nxdb.add_nexusport_binding(
+                npb.port, npb.vlan, npb.switch, npb.instance)
+
+    def test_nexusportbinding_add_remove(self):
+        npb11 = self._npb_test_obj(10, 100)
+        npb = nxdb.add_nexusport_binding(
+            npb11.port, npb11.vlan, npb11.switch, npb11.instance)
+        self._assert_equal(npb, npb11)
+        npb = nxdb.remove_nexusport_binding(
+            npb11.port, npb11.vlan, npb11.switch, npb11.instance)
+        self.assertEqual(len(npb), 1)
+        self._assert_equal(npb[0], npb11)
+        with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+            nxdb.remove_nexusport_binding(
+                npb11.port, npb11.vlan, npb11.switch, npb11.instance)
+
+    def test_nexusportbinding_get(self):
+        npb11 = self._npb_test_obj(10, 100)
+        npb21 = self._npb_test_obj(20, 100)
+        npb22 = self._npb_test_obj(20, 200)
+        self._add_to_db([npb11, npb21, npb22])
+
+        npb = nxdb.get_nexusport_binding(
+            npb11.port, npb11.vlan, npb11.switch, npb11.instance)
+        self.assertEqual(len(npb), 1)
+        self._assert_equal(npb[0], npb11)
+        npb = nxdb.get_nexusport_binding(
+            npb21.port, npb21.vlan, npb21.switch, npb21.instance)
+        self.assertEqual(len(npb), 1)
+        self._assert_equal(npb[0], npb21)
+        npb = nxdb.get_nexusport_binding(
+            npb22.port, npb22.vlan, npb22.switch, npb22.instance)
+        self.assertEqual(len(npb), 1)
+        self._assert_equal(npb[0], npb22)
+
+        with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+            nxdb.get_nexusport_binding(
+                npb21.port, npb21.vlan, npb21.switch, "dummyInstance")
+
+    def test_nexusvlanbinding_get(self):
+        npb11 = self._npb_test_obj(10, 100)
+        npb21 = self._npb_test_obj(20, 100)
+        npb22 = self._npb_test_obj(20, 200)
+        self._add_to_db([npb11, npb21, npb22])
+
+        npb_all_v100 = nxdb.get_nexusvlan_binding(npb11.vlan, npb11.switch)
+        self.assertEqual(len(npb_all_v100), 2)
+        npb_v200 = nxdb.get_nexusvlan_binding(npb22.vlan, npb22.switch)
+        self.assertEqual(len(npb_v200), 1)
+        self._assert_equal(npb_v200[0], npb22)
+
+        with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+            nxdb.get_nexusvlan_binding(npb21.vlan, "dummySwitch")
+
+    def test_nexusvmbinding_get(self):
+        npb11 = self._npb_test_obj(10, 100)
+        npb21 = self._npb_test_obj(20, 100)
+        npb22 = self._npb_test_obj(20, 200)
+        self._add_to_db([npb11, npb21, npb22])
+
+        npb = nxdb.get_nexusvm_binding(npb21.vlan, npb21.instance)
+        self._assert_equal(npb, npb21)
+        npb = nxdb.get_nexusvm_binding(npb22.vlan, npb22.instance)
+        self._assert_equal(npb, npb22)
+
+        with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+            nxdb.get_nexusvm_binding(npb21.vlan, "dummyInstance")
+
+    def test_nexusportvlanswitchbinding_get(self):
+        npb11 = self._npb_test_obj(10, 100)
+        npb21 = self._npb_test_obj(20, 100)
+        self._add_to_db([npb11, npb21])
+
+        npb = nxdb.get_port_vlan_switch_binding(
+            npb11.port, npb11.vlan, npb11.switch)
+        self.assertEqual(len(npb), 1)
+        self._assert_equal(npb[0], npb11)
+
+        with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+            nxdb.get_port_vlan_switch_binding(
+                npb21.port, npb21.vlan, "dummySwitch")
+
+    def test_nexusportswitchbinding_get(self):
+        npb11 = self._npb_test_obj(10, 100)
+        npb21 = self._npb_test_obj(20, 100, switch='2.2.2.2')
+        npb22 = self._npb_test_obj(20, 200, switch='2.2.2.2')
+        self._add_to_db([npb11, npb21, npb22])
+
+        npb = nxdb.get_port_switch_bindings(npb11.port, npb11.switch)
+        self.assertEqual(len(npb), 1)
+        self._assert_equal(npb[0], npb11)
+        npb_all_p20 = nxdb.get_port_switch_bindings(npb21.port, npb21.switch)
+        self.assertEqual(len(npb_all_p20), 2)
+
+        npb = nxdb.get_port_switch_bindings(npb21.port, "dummySwitch")
+        self.assertIsNone(npb)
+
+    def test_nexussvibinding_get(self):
+        npbr1 = self._npb_test_obj('router', 100)
+        npb21 = self._npb_test_obj(20, 100)
+        self._add_to_db([npbr1, npb21])
+
+        npb_svi = nxdb.get_nexussvi_bindings()
+        self.assertEqual(len(npb_svi), 1)
+        self._assert_equal(npb_svi[0], npbr1)
+
+        npbr2 = self._npb_test_obj('router', 200)
+        self._add_to_db([npbr2])
+        npb_svi = nxdb.get_nexussvi_bindings()
+        self.assertEqual(len(npb_svi), 2)
+
+    def test_nexusbinding_update(self):
+        npb11 = self._npb_test_obj(10, 100, switch='1.1.1.1', instance='test')
+        npb21 = self._npb_test_obj(20, 100, switch='1.1.1.1', instance='test')
+        self._add_to_db([npb11, npb21])
+
+        npb_all_v100 = nxdb.get_nexusvlan_binding(npb11.vlan, '1.1.1.1')
+        self.assertEqual(len(npb_all_v100), 2)
+
+        npb22 = self._npb_test_obj(20, 200, switch='1.1.1.1', instance='test')
+        npb = nxdb.update_nexusport_binding(npb21.port, 200)
+        self._assert_equal(npb, npb22)
+
+        npb_all_v100 = nxdb.get_nexusvlan_binding(npb11.vlan, '1.1.1.1')
+        self.assertEqual(len(npb_all_v100), 1)
+        self._assert_equal(npb_all_v100[0], npb11)
+
+        npb = nxdb.update_nexusport_binding(npb21.port, 0)
+        self.assertIsNone(npb)
+
+        npb33 = self._npb_test_obj(30, 300, switch='1.1.1.1', instance='test')
+        with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
+            nxdb.update_nexusport_binding(npb33.port, 200)