]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
updated README file to include persistence framework setup instructions
authorrohitagarwalla <roagarwa@cisco.com>
Sat, 13 Aug 2011 20:37:20 +0000 (13:37 -0700)
committerrohitagarwalla <roagarwa@cisco.com>
Sat, 13 Aug 2011 20:37:20 +0000 (13:37 -0700)
updated db api.py unset_attachment method to return port
moved db_conn.ini into cisco/conf/ with other configuration files
updated l2network_plugin_configuration.py to get db config
cleaned up l2network_db.py - removed config parser code as using cisco config parser
updated l2network_db.py to raise specific exceptions in error cases
updated create_vlanid method in l2network_db.py to not raise exception if vlan rows exist
updated portprofile and portprofile_binding methods to include tenant_id as an argument
added cisco/db/test_database.py containing unit tests for quantum and l2network_plugin tables
edited get_pp_binding method in l2network_db.py to return empty list when no results found
pep8 checks done

quantum/plugins/cisco/README
quantum/plugins/cisco/common/cisco_exceptions.py
quantum/plugins/cisco/conf/db_conn.ini [new file with mode: 0644]
quantum/plugins/cisco/db/api.py
quantum/plugins/cisco/db/db_conn.ini [deleted file]
quantum/plugins/cisco/db/l2network_db.py
quantum/plugins/cisco/db/test_database.py [new file with mode: 0644]
quantum/plugins/cisco/l2network_plugin_configuration.py

index 68d1b5cf42439c18fdf27075906ab94310d544c4..f67c453f81bbbea80acb284a50e0c74730f8f36f 100755 (executable)
@@ -108,6 +108,14 @@ nexus_plugin=quantum.plugins.cisco.nexus.cisco_nexus_plugin.NexusPlugin
 \r
     4b.  Enter the relevant configuration in the \r
          quantum/plugins/cisco/conf/nexus.ini file.  Example:\r
+         \r
+5.  Plugin Persistence framework setup:\r
+       5a.  Create quantum_l2network database in mysql with the following command -\r
+       \r
+mysql -u<mysqlusername> -p<mysqlpassword> -e "create database quantum_l2network"\r
+\r
+       5b.  Enter the quantum_l2netowrk database configuration info in the \r
+         quantum/plugins/cisco/conf/db_conn.ini file.\r
 \r
 [SWITCH]\r
 # Change the following to reflect the IP address of the Nexus switch.\r
index 06e5e53639ea56d8bf6ebd2088a4a357aa693f7f..7426f260f9679a1c52a6bfb38cc6cff1c5c07b33 100644 (file)
@@ -15,7 +15,7 @@
 #    under the License.
 #
 # @author: Sumit Naiksatam, Cisco Systems, Inc.
-#
+# @author: Rohit Agarwalla, Cisco Systems, Inc.
 
 """
 Exceptions used by the Cisco plugin
@@ -55,3 +55,26 @@ class PortProfileNotFound(exceptions.QuantumException):
 class PortProfileInvalidDelete(exceptions.QuantumException):
     message = _("Port profile %(profile_id)s could not be deleted " \
                 "for tenant %(tenant_id)s since port associations exist")
+
+
+class NetworkVlanBindingAlreadyExists(exceptions.QuantumException):
+    message = _("NetworkVlanBinding for %(vlan_id)s and network " \
+                "%(network_id)s already exists")
+
+
+class PortProfileAlreadyExists(exceptions.QuantumException):
+    message = _("PortProfile %(pp_name) for %(tenant_id)s " \
+                "already exists")
+
+
+class PortProfileBindingAlreadyExists(exceptions.QuantumException):
+    message = _("PortProfileBinding for port profile %(pp_id)s to " \
+                 "port %(port_id) already exists")
+
+
+class VlanIDNotFound(exceptions.QuantumException):
+    message = _("Vlan ID %(vlan_id)s not found")
+
+
+class VlanIDNotAvailable(exceptions.QuantumException):
+    message = _("No available Vlan ID found")
diff --git a/quantum/plugins/cisco/conf/db_conn.ini b/quantum/plugins/cisco/conf/db_conn.ini
new file mode 100644 (file)
index 0000000..4a5d7e3
--- /dev/null
@@ -0,0 +1,5 @@
+[DATABASE]
+name = quantum_l2network
+user = <put_db_user_name_here>
+pass = <put_db_password_here>
+host = <put_quantum_mysql_host_here>
index fc9a69ea29058a445f7d730a65e48de372e907a7..db4e631a286628c1a90f55f5715ec9c3ad8bf860 100644 (file)
@@ -231,6 +231,7 @@ def port_unset_attachment(net_id, port_id):
     port.interface_id = None
     session.merge(port)
     session.flush()
+    return port
 
 
 def port_destroy(net_id, port_id):
diff --git a/quantum/plugins/cisco/db/db_conn.ini b/quantum/plugins/cisco/db/db_conn.ini
deleted file mode 100644 (file)
index 55066aa..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-[DATABASE]
-name = quantum_l2network
-user = root
-pass = nova
-host = 127.0.0.1
index b11c6567765d92746d99d77854b6644c7fa1a28f..119900bd608793c6796da99db70f4cbea5d3585b 100644 (file)
 #    under the License.
 # @author: Rohit Agarwalla, Cisco Systems, Inc.
 
-import ConfigParser
-import os
 import logging as LOG
+import os
 
-from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker, exc, joinedload
+from sqlalchemy.orm import exc, joinedload
 
+from quantum.common import exceptions as q_exc
 from quantum.plugins.cisco import l2network_plugin_configuration as conf
-
-import quantum.plugins.cisco.db.api as db
+from quantum.plugins.cisco.common import cisco_exceptions as c_exc
 
 import l2network_models
-
-
-CONF_FILE = "db_conn.ini"
-
-
-def find_config(basepath):
-    for root, dirs, files in os.walk(basepath):
-        if CONF_FILE in files:
-            return os.path.join(root, CONF_FILE)
-    return None
+import quantum.plugins.cisco.db.api as db
 
 
 def initialize(configfile=None):
-    config = ConfigParser.ConfigParser()
-    if configfile == None:
-        if os.path.exists(CONF_FILE):
-            configfile = CONF_FILE
-        else:
-            configfile = \
-           find_config(os.path.abspath(os.path.dirname(__file__)))
-    if configfile == None:
-        raise Exception("Configuration file \"%s\" doesn't exist" %
-              (configfile))
-    LOG.debug("Using configuration file: %s" % configfile)
-    config.read(configfile)
-
-    DB_NAME = config.get("DATABASE", "name")
-    DB_USER = config.get("DATABASE", "user")
-    DB_PASS = config.get("DATABASE", "pass")
-    DB_HOST = config.get("DATABASE", "host")
-    options = {"sql_connection": "mysql://%s:%s@%s/%s" % (DB_USER,
-    DB_PASS, DB_HOST, DB_NAME)}
+    options = {"sql_connection": "mysql://%s:%s@%s/%s" % (conf.DB_USER,
+    conf.DB_PASS, conf.DB_HOST, conf.DB_NAME)}
     db.configure_db(options)
 
 
@@ -68,7 +40,8 @@ def create_vlanids():
     try:
         vlanid = session.query(l2network_models.VlanID).\
           one()
-        raise Exception("Vlan table not empty id for prepopulation")
+    except exc.MultipleResultsFound:
+        pass
     except exc.NoResultFound:
         start = int(conf.VLAN_START)
         end = int(conf.VLAN_END)
@@ -98,7 +71,7 @@ def is_vlanid_used(vlan_id):
           one()
         return vlanid["vlan_used"]
     except exc.NoResultFound:
-        raise Exception("No vlan found with vlan-id = %s" % vlan_id)
+        raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
 
 
 def release_vlanid(vlan_id):
@@ -112,7 +85,7 @@ def release_vlanid(vlan_id):
         session.flush()
         return vlanid["vlan_used"]
     except exc.NoResultFound:
-        raise Exception("Vlan id %s not present in table" % vlan_id)
+        raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
     return
 
 
@@ -144,7 +117,7 @@ def reserve_vlanid():
         session.flush()
         return vlanids[0]["vlan_id"]
     except exc.NoResultFound:
-        raise Exception("All vlan id's are used")
+        raise VlanIDNotAvailable()
 
 
 def get_all_vlan_bindings():
@@ -167,7 +140,7 @@ def get_vlan_binding(netid):
           one()
         return binding
     except exc.NoResultFound:
-        raise Exception("No network found with net-id = %s" % network_id)
+        raise q_exc.NetworkNotFound(net_id=netid)
 
 
 def add_vlan_binding(vlanid, vlanname, netid):
@@ -177,7 +150,8 @@ def add_vlan_binding(vlanid, vlanname, netid):
         binding = session.query(l2network_models.VlanBinding).\
           filter_by(vlan_id=vlanid).\
           one()
-        raise Exception("Vlan with id \"%s\" already exists" % vlanid)
+        raise c_exc.NetworkVlanBindingAlreadyExists(vlan_id=vlanid,
+                                                    network_id=netid)
     except exc.NoResultFound:
         binding = l2network_models.VlanBinding(vlanid, vlanname, netid)
         session.add(binding)
@@ -214,7 +188,7 @@ def update_vlan_binding(netid, newvlanid=None, newvlanname=None):
         session.flush()
         return binding
     except exc.NoResultFound:
-        raise Exception("No vlan binding found with network_id = %s" % netid)
+        raise q_exc.NetworkNotFound(net_id=netid)
 
 
 def get_all_portprofiles():
@@ -228,7 +202,7 @@ def get_all_portprofiles():
         return []
 
 
-def get_portprofile(ppid):
+def get_portprofile(tenantid, ppid):
     """Lists a port profile"""
     session = db.get_session()
     try:
@@ -237,17 +211,19 @@ def get_portprofile(ppid):
           one()
         return pp
     except exc.NoResultFound:
-        raise Exception("No portprofile found with id = %s" % ppid)
+        raise c_exc.PortProfileNotFound(tenant_id=tenantid,
+                                portprofile_id=ppid)
 
 
-def add_portprofile(ppname, vlanid, qos):
+def add_portprofile(tenantid, ppname, vlanid, qos):
     """Adds a port profile"""
     session = db.get_session()
     try:
         pp = session.query(l2network_models.PortProfile).\
           filter_by(name=ppname).\
           one()
-        raise Exception("Port profile with name %s already exists" % ppname)
+        raise c_exc.PortProfileAlreadyExists(tenant_id=tenantid,
+                                       pp_name=ppname)
     except exc.NoResultFound:
         pp = l2network_models.PortProfile(ppname, vlanid, qos)
         session.add(pp)
@@ -255,7 +231,7 @@ def add_portprofile(ppname, vlanid, qos):
         return pp
 
 
-def remove_portprofile(ppid):
+def remove_portprofile(tenantid, ppid):
     """Removes a port profile"""
     session = db.get_session()
     try:
@@ -269,7 +245,8 @@ def remove_portprofile(ppid):
             pass
 
 
-def update_portprofile(ppid, newppname=None, newvlanid=None, newqos=None):
+def update_portprofile(tenantid, ppid, newppname=None, newvlanid=None, \
+                       newqos=None):
     """Updates port profile"""
     session = db.get_session()
     try:
@@ -286,7 +263,8 @@ def update_portprofile(ppid, newppname=None, newvlanid=None, newqos=None):
         session.flush()
         return pp
     except exc.NoResultFound:
-        raise Exception("No port profile with id = %s" % ppid)
+        raise c_exc.PortProfileNotFound(tenant_id=tenantid,
+                                portprofile_id=ppid)
 
 
 def get_all_pp_bindings():
@@ -300,7 +278,7 @@ def get_all_pp_bindings():
         return []
 
 
-def get_pp_binding(ppid):
+def get_pp_binding(tenantid, ppid):
     """Lists a port profile binding"""
     session = db.get_session()
     try:
@@ -309,7 +287,7 @@ def get_pp_binding(ppid):
           one()
         return binding
     except exc.NoResultFound:
-        raise Exception("No portprofile binding found with id = %s" % ppid)
+        return []
 
 
 def add_pp_binding(tenantid, portid, ppid, default):
@@ -319,8 +297,8 @@ def add_pp_binding(tenantid, portid, ppid, default):
         binding = session.query(l2network_models.PortProfileBinding).\
           filter_by(portprofile_id=ppid).\
           one()
-        raise Exception("Port profile binding with id \"%s\" already \
-                                                         exists" % ppid)
+        raise c_exc.PortProfileBindingAlreadyExists(pp_id=ppid,
+                                                    port_id=portid)
     except exc.NoResultFound:
         binding = l2network_models.PortProfileBinding(tenantid, portid, \
                                                             ppid, default)
@@ -329,7 +307,7 @@ def add_pp_binding(tenantid, portid, ppid, default):
         return binding
 
 
-def remove_pp_binding(portid, ppid):
+def remove_pp_binding(tenantid, portid, ppid):
     """Removes a port profile binding"""
     session = db.get_session()
     try:
@@ -344,7 +322,7 @@ def remove_pp_binding(portid, ppid):
             pass
 
 
-def update_pp_binding(ppid, newtenantid=None, newportid=None, \
+def update_pp_binding(tenantid, ppid, newtenantid=None, newportid=None, \
                                                     newdefault=None):
     """Updates port profile binding"""
     session = db.get_session()
@@ -362,4 +340,5 @@ def update_pp_binding(ppid, newtenantid=None, newportid=None, \
         session.flush()
         return binding
     except exc.NoResultFound:
-        raise Exception("No port profile binding with id = %s" % ppid)
+        raise c_exc.PortProfileNotFound(tenant_id=tenantid,
+                                portprofile_id=ppid)
diff --git a/quantum/plugins/cisco/db/test_database.py b/quantum/plugins/cisco/db/test_database.py
new file mode 100644 (file)
index 0000000..88c1bb6
--- /dev/null
@@ -0,0 +1,745 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011, Cisco Systems, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+# @author: Rohit Agarwalla, Cisco Systems, Inc.
+
+import ConfigParser
+import os
+import logging as LOG
+import unittest
+
+from optparse import OptionParser
+from quantum.plugins.cisco.common import cisco_constants as const
+
+import quantum.plugins.cisco.db.api as db
+import quantum.plugins.cisco.db.l2network_db as l2network_db
+import quantum.plugins.cisco.db.l2network_models
+
+
+LOG.getLogger(const.LOGGER_COMPONENT_NAME)
+
+
+class L2networkDB(object):
+    def get_all_vlan_bindings(self):
+        vlans = []
+        try:
+            for x in l2network_db.get_all_vlan_bindings():
+                LOG.debug("Getting vlan bindings for vlan: %s" % x.vlan_id)
+                vlan_dict = {}
+                vlan_dict["vlan-id"] = str(x.vlan_id)
+                vlan_dict["vlan-name"] = x.vlan_name
+                vlan_dict["net-id"] = str(x.network_id)
+                vlans.append(vlan_dict)
+        except Exception, e:
+            LOG.error("Failed to get all vlan bindings: %s" % str(e))
+        return vlans
+
+    def get_vlan_binding(self, network_id):
+        vlan = []
+        try:
+            for x in l2network_db.get_vlan_binding(network_id):
+                LOG.debug("Getting vlan binding for vlan: %s" % x.vlan_id)
+                vlan_dict = {}
+                vlan_dict["vlan-id"] = str(x.vlan_id)
+                vlan_dict["vlan-name"] = x.vlan_name
+                vlan_dict["net-id"] = str(x.network_id)
+                vlan.append(vlan_dict)
+        except Exception, e:
+            LOG.error("Failed to get vlan binding: %s" % str(e))
+        return vlan
+
+    def create_vlan_binding(self, vlan_id, vlan_name, network_id):
+        vlan_dict = {}
+        try:
+            res = l2network_db.add_vlan_binding(vlan_id, vlan_name, network_id)
+            LOG.debug("Created vlan binding for vlan: %s" % res.vlan_id)
+            vlan_dict["vlan-id"] = str(res.vlan_id)
+            vlan_dict["vlan-name"] = res.vlan_name
+            vlan_dict["net-id"] = str(res.network_id)
+            return vlan_dict
+        except Exception, e:
+            LOG.error("Failed to create vlan binding: %s" % str(e))
+
+    def delete_vlan_binding(self, network_id):
+        try:
+            res = l2network_db.remove_vlan_binding(network_id)
+            LOG.debug("Deleted vlan binding for vlan: %s" % res.vlan_id)
+            vlan_dict = {}
+            vlan_dict["vlan-id"] = str(res.vlan_id)
+            return vlan_dict
+        except Exception, e:
+            raise Exception("Failed to delete vlan binding: %s" % str(e))
+
+    def update_vlan_binding(self, network_id, vlan_id, vlan_name):
+        try:
+            res = l2network_db.update_vlan_binding(network_id, vlan_id, \
+                                                            vlan_name)
+            LOG.debug("Updating vlan binding for vlan: %s" % res.vlan_id)
+            vlan_dict = {}
+            vlan_dict["vlan-id"] = str(res.vlan_id)
+            vlan_dict["vlan-name"] = res.vlan_name
+            vlan_dict["net-id"] = str(res.network_id)
+            return vlan_dict
+        except Exception, e:
+            raise Exception("Failed to update vlan binding: %s" % str(e))
+
+    def get_all_portprofiles(self):
+        pps = []
+        try:
+            for x in l2network_db.get_all_portprofiles():
+                LOG.debug("Getting port profile : %s" % x.uuid)
+                pp_dict = {}
+                pp_dict["portprofile-id"] = str(x.uuid)
+                pp_dict["portprofile-name"] = x.name
+                pp_dict["vlan-id"] = str(x.vlan_id)
+                pp_dict["qos"] = x.qos
+                pps.append(pp_dict)
+        except Exception, e:
+            LOG.error("Failed to get all port profiles: %s" % str(e))
+        return pps
+
+    def get_portprofile(self, tenant_id, pp_id):
+        pp = []
+        try:
+            for x in l2network_db.get_portprofile(tenant_id, pp_id):
+                LOG.debug("Getting port profile : %s" % x.uuid)
+                pp_dict = {}
+                pp_dict["portprofile-id"] = str(x.uuid)
+                pp_dict["portprofile-name"] = x.name
+                pp_dict["vlan-id"] = str(x.vlan_id)
+                pp_dict["qos"] = x.qos
+                pp.append(pp_dict)
+        except Exception, e:
+            LOG.error("Failed to get port profile: %s" % str(e))
+        return pp
+
+    def create_portprofile(self, tenant_id, name, vlan_id, qos):
+        pp_dict = {}
+        try:
+            res = l2network_db.add_portprofile(tenant_id, name, vlan_id, qos)
+            LOG.debug("Created port profile: %s" % res.uuid)
+            pp_dict["portprofile-id"] = str(res.uuid)
+            pp_dict["portprofile-name"] = res.name
+            pp_dict["vlan-id"] = str(res.vlan_id)
+            pp_dict["qos"] = res.qos
+            return pp_dict
+        except Exception, e:
+            LOG.error("Failed to create port profile: %s" % str(e))
+
+    def delete_portprofile(self, tenant_id, pp_id):
+        try:
+            res = l2network_db.remove_portprofile(tenant_id, pp_id)
+            LOG.debug("Deleted port profile : %s" % res.uuid)
+            pp_dict = {}
+            pp_dict["pp-id"] = str(res.uuid)
+            return pp_dict
+        except Exception, e:
+            raise Exception("Failed to delete port profile: %s" % str(e))
+
+    def update_portprofile(self, tenant_id, pp_id, name, vlan_id, qos):
+        try:
+            res = l2network_db.update_portprofile(tenant_id, pp_id, name,
+                                                  vlan_id, qos)
+            LOG.debug("Updating port profile : %s" % res.uuid)
+            pp_dict = {}
+            pp_dict["portprofile-id"] = str(res.uuid)
+            pp_dict["portprofile-name"] = res.name
+            pp_dict["vlan-id"] = str(res.vlan_id)
+            pp_dict["qos"] = res.qos
+            return pp_dict
+        except Exception, e:
+            raise Exception("Failed to update port profile: %s" % str(e))
+
+    def get_all_pp_bindings(self):
+        pp_bindings = []
+        try:
+            for x in l2network_db.get_all_pp_bindings():
+                LOG.debug("Getting port profile binding: %s" % \
+                                               x.portprofile_id)
+                ppbinding_dict = {}
+                ppbinding_dict["portprofile-id"] = str(x.portprofile_id)
+                ppbinding_dict["port-id"] = str(x.port_id)
+                ppbinding_dict["tenant-id"] = x.tenant_id
+                ppbinding_dict["default"] = x.default
+                pp_bindings.append(ppbinding_dict)
+        except Exception, e:
+            LOG.error("Failed to get all port profiles: %s" % str(e))
+        return pp_bindings
+
+    def get_pp_binding(self, tenant_id, pp_id):
+        pp_binding = []
+        try:
+            for x in l2network_db.get_pp_binding(tenant_id, pp_id):
+                LOG.debug("Getting port profile binding: %s" % \
+                                                 x.portprofile_id)
+                ppbinding_dict = {}
+                ppbinding_dict["portprofile-id"] = str(x.portprofile_id)
+                ppbinding_dict["port-id"] = str(x.port_id)
+                ppbinding_dict["tenant-id"] = x.tenant_id
+                ppbinding_dict["default"] = x.default
+                pp_bindings.append(ppbinding_dict)
+        except Exception, e:
+            LOG.error("Failed to get port profile binding: %s" % str(e))
+        return pp_binding
+
+    def create_pp_binding(self, tenant_id, port_id, pp_id, default):
+        ppbinding_dict = {}
+        try:
+            res = l2network_db.add_pp_binding(tenant_id, port_id, pp_id, \
+                                                                default)
+            LOG.debug("Created port profile binding: %s" % res.portprofile_id)
+            ppbinding_dict["portprofile-id"] = str(res.portprofile_id)
+            ppbinding_dict["port-id"] = str(res.port_id)
+            ppbinding_dict["tenant-id"] = res.tenant_id
+            ppbinding_dict["default"] = res.default
+            return ppbinding_dict
+        except Exception, e:
+            LOG.error("Failed to create port profile binding: %s" % str(e))
+
+    def delete_pp_binding(self, tenant_id, port_id, pp_id):
+        try:
+            res = l2network_db.remove_pp_binding(tenant_id, port_id, pp_id)
+            LOG.debug("Deleted port profile binding : %s" % res.portprofile_id)
+            ppbinding_dict = {}
+            ppbinding_dict["portprofile-id"] = str(res.portprofile_id)
+            return ppbinding_dict
+        except Exception, e:
+            raise Exception("Failed to delete port profile: %s" % str(e))
+
+    def update_pp_binding(self, tenant_id, pp_id, newtenant_id, \
+                          port_id, default):
+        try:
+            res = l2network_db.update_pp_binding(tenant_id, pp_id,
+                                            newtenant_id, port_id, default)
+            LOG.debug("Updating port profile binding: %s" % res.portprofile_id)
+            ppbinding_dict = {}
+            ppbinding_dict["portprofile-id"] = str(res.portprofile_id)
+            ppbinding_dict["port-id"] = str(res.port_id)
+            ppbinding_dict["tenant-id"] = res.tenant_id
+            ppbinding_dict["default"] = res.default
+            return ppbinding_dict
+        except Exception, e:
+            raise Exception("Failed to update portprofile binding:%s" % str(e))
+
+
+class QuantumDB(object):
+    def get_all_networks(self, tenant_id):
+        nets = []
+        try:
+            for x in db.network_list(tenant_id):
+                LOG.debug("Getting network: %s" % x.uuid)
+                net_dict = {}
+                net_dict["tenant-id"] = x.tenant_id
+                net_dict["net-id"] = str(x.uuid)
+                net_dict["net-name"] = x.name
+                nets.append(net_dict)
+        except Exception, e:
+            LOG.error("Failed to get all networks: %s" % str(e))
+        return nets
+
+    def get_network(self, network_id):
+        net = []
+        try:
+            for x in db.network_get(network_id):
+                LOG.debug("Getting network: %s" % x.uuid)
+                net_dict = {}
+                net_dict["tenant-id"] = x.tenant_id
+                net_dict["net-id"] = str(x.uuid)
+                net_dict["net-name"] = x.name
+                nets.append(net_dict)
+        except Exception, e:
+            LOG.error("Failed to get network: %s" % str(e))
+        return net
+
+    def create_network(self, tenant_id, net_name):
+        net_dict = {}
+        try:
+            res = db.network_create(tenant_id, net_name)
+            LOG.debug("Created network: %s" % res.uuid)
+            net_dict["tenant-id"] = res.tenant_id
+            net_dict["net-id"] = str(res.uuid)
+            net_dict["net-name"] = res.name
+            return net_dict
+        except Exception, e:
+            LOG.error("Failed to create network: %s" % str(e))
+
+    def delete_network(self, net_id):
+        try:
+            net = db.network_destroy(net_id)
+            LOG.debug("Deleted network: %s" % net.uuid)
+            net_dict = {}
+            net_dict["net-id"] = str(net.uuid)
+            return net_dict
+        except Exception, e:
+            raise Exception("Failed to delete port: %s" % str(e))
+
+    def rename_network(self, tenant_id, net_id, new_name):
+        try:
+            net = db.network_rename(tenant_id, net_id, new_name)
+            LOG.debug("Renamed network: %s" % net.uuid)
+            net_dict = {}
+            net_dict["net-id"] = str(net.uuid)
+            net_dict["net-name"] = net.name
+            return net_dict
+        except Exception, e:
+            raise Exception("Failed to rename network: %s" % str(e))
+
+    def get_all_ports(self, net_id):
+        ports = []
+        try:
+            for x in db.port_list(net_id):
+                LOG.debug("Getting port: %s" % x.uuid)
+                port_dict = {}
+                port_dict["port-id"] = str(x.uuid)
+                port_dict["net-id"] = str(x.network_id)
+                port_dict["int-id"] = x.interface_id
+                port_dict["state"] = x.state
+                port_dict["net"] = x.network
+                ports.append(port_dict)
+            return ports
+        except Exception, e:
+            LOG.error("Failed to get all ports: %s" % str(e))
+
+    def get_port(self, net_id, port_id):
+        port = []
+        x = db.port_get(net_id, port_id)
+        try:
+            LOG.debug("Getting port: %s" % x.uuid)
+            port_dict = {}
+            port_dict["port-id"] = str(x.uuid)
+            port_dict["net-id"] = str(x.network_id)
+            port_dict["int-id"] = x.interface_id
+            port_dict["state"] = x.state
+            port.append(port_dict)
+            return port
+        except Exception, e:
+            LOG.error("Failed to get port: %s" % str(e))
+
+    def create_port(self, net_id):
+        port_dict = {}
+        try:
+            port = db.port_create(net_id)
+            LOG.debug("Creating port %s" % port.uuid)
+            port_dict["port-id"] = str(port.uuid)
+            port_dict["net-id"] = str(port.network_id)
+            port_dict["int-id"] = port.interface_id
+            port_dict["state"] = port.state
+            return port_dict
+        except Exception, e:
+            LOG.error("Failed to create port: %s" % str(e))
+
+    def delete_port(self, net_id, port_id):
+        try:
+            port = db.port_destroy(net_id, port_id)
+            LOG.debug("Deleted port %s" % port.uuid)
+            port_dict = {}
+            port_dict["port-id"] = str(port.uuid)
+            return port_dict
+        except Exception, e:
+            raise Exception("Failed to delete port: %s" % str(e))
+
+    def update_port(self, net_id, port_id, port_state):
+        try:
+            port = db.port_set_state(net_id, port_id, port_state)
+            LOG.debug("Updated port %s" % port.uuid)
+            port_dict = {}
+            port_dict["port-id"] = str(port.uuid)
+            port_dict["net-id"] = str(port.network_id)
+            port_dict["int-id"] = port.interface_id
+            port_dict["state"] = port.state
+            return port_dict
+        except Exception, e:
+            raise Exception("Failed to update port state: %s" % str(e))
+
+    def plug_interface(self, net_id, port_id, int_id):
+        try:
+            port = db.port_set_attachment(net_id, port_id, int_id)
+            LOG.debug("Attached interface to port %s" % port.uuid)
+            port_dict = {}
+            port_dict["port-id"] = str(port.uuid)
+            port_dict["net-id"] = str(port.network_id)
+            port_dict["int-id"] = port.interface_id
+            port_dict["state"] = port.state
+            return port_dict
+        except Exception, e:
+            raise Exception("Failed to plug interface: %s" % str(e))
+
+    def unplug_interface(self, net_id, port_id):
+        try:
+            port = db.port_unset_attachment(net_id, port_id)
+            LOG.debug("Detached interface from port %s" % port.uuid)
+            port_dict = {}
+            port_dict["port-id"] = str(port.uuid)
+            port_dict["net-id"] = str(port.network_id)
+            port_dict["int-id"] = port.interface_id
+            port_dict["state"] = port.state
+            return port_dict
+        except Exception, e:
+            raise Exception("Failed to unplug interface: %s" % str(e))
+
+
+class L2networkDBTest(unittest.TestCase):
+    def setUp(self):
+        self.dbtest = L2networkDB()
+        self.quantum = QuantumDB()
+        LOG.debug("Setup")
+
+    def testACreateVlanBinding(self):
+        net1 = self.quantum.create_network("t1", "netid1")
+        vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
+        self.assertTrue(vlan1["vlan-id"] == "10")
+        self.tearDownVlanBinding()
+        self.tearDownNetwork()
+
+    def testBGetAllVlanBindings(self):
+        net1 = self.quantum.create_network("t1", "netid1")
+        net2 = self.quantum.create_network("t1", "netid2")
+        vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
+        vlan2 = self.dbtest.create_vlan_binding(20, "vlan2", net2["net-id"])
+        vlans = self.dbtest.get_all_vlan_bindings()
+        count = 0
+        for x in vlans:
+            if "vlan" in x["vlan-name"]:
+                count += 1
+        self.assertTrue(count == 2)
+        self.tearDownVlanBinding()
+        self.tearDownNetwork()
+
+    def testCDeleteVlanBinding(self):
+        net1 = self.quantum.create_network("t1", "netid1")
+        vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
+        self.dbtest.delete_vlan_binding(net1["net-id"])
+        vlans = self.dbtest.get_all_vlan_bindings()
+        count = 0
+        for x in vlans:
+            if "vlan " in x["vlan-name"]:
+                count += 1
+        self.assertTrue(count == 0)
+        self.tearDownVlanBinding()
+        self.tearDownNetwork()
+
+    def testDUpdateVlanBinding(self):
+        net1 = self.quantum.create_network("t1", "netid1")
+        vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
+        vlan1 = self.dbtest.update_vlan_binding(net1["net-id"], 11, "newvlan1")
+        vlans = self.dbtest.get_all_vlan_bindings()
+        count = 0
+        for x in vlans:
+            if "new" in x["vlan-name"]:
+                count += 1
+        self.assertTrue(count == 1)
+        self.tearDownVlanBinding()
+        self.tearDownNetwork()
+
+    def testICreatePortProfile(self):
+        pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1")
+        self.assertTrue(pp1["portprofile-name"] == "portprofile1")
+        self.tearDownPortProfile()
+        self.tearDownNetwork()
+
+    def testJGetAllPortProfile(self):
+        pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1")
+        pp2 = self.dbtest.create_portprofile("t1", "portprofile2", 20, "qos2")
+        pps = self.dbtest.get_all_portprofiles()
+        count = 0
+        for x in pps:
+            if "portprofile" in x["portprofile-name"]:
+                count += 1
+        self.assertTrue(count == 2)
+        self.tearDownPortProfile()
+
+    def testKDeletePortProfile(self):
+        pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1")
+        self.dbtest.delete_portprofile("t1", pp1["portprofile-id"])
+        pps = self.dbtest.get_all_portprofiles()
+        count = 0
+        for x in pps:
+            if "portprofile " in x["portprofile-name"]:
+                count += 1
+        self.assertTrue(count == 0)
+        self.tearDownPortProfile()
+
+    def testLUpdatePortProfile(self):
+        pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1")
+        pp1 = self.dbtest.update_portprofile("t1", pp1["portprofile-id"], \
+                                          "newportprofile1", 20, "qos2")
+        pps = self.dbtest.get_all_portprofiles()
+        count = 0
+        for x in pps:
+            if "new" in x["portprofile-name"]:
+                count += 1
+        self.assertTrue(count == 1)
+        self.tearDownPortProfile()
+
+    def testMCreatePortProfileBinding(self):
+        net1 = self.quantum.create_network("t1", "netid1")
+        port1 = self.quantum.create_port(net1["net-id"])
+        pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1")
+        pp_binding1 = self.dbtest.create_pp_binding("t1", port1["port-id"], \
+                                              pp1["portprofile-id"], "0")
+        self.assertTrue(pp_binding1["tenant-id"] == "t1")
+        self.tearDownPortProfileBinding()
+        self.tearDownPort()
+        self.tearDownNetwork()
+        self.tearDownPortProfile()
+
+    def testNGetAllPortProfileBinding(self):
+        net1 = self.quantum.create_network("t1", "netid1")
+        port1 = self.quantum.create_port(net1["net-id"])
+        port2 = self.quantum.create_port(net1["net-id"])
+        pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1")
+        pp2 = self.dbtest.create_portprofile("t1", "portprofile2", 20, "qos2")
+        pp_binding1 = self.dbtest.create_pp_binding("t1", port1["port-id"], \
+                                               pp1["portprofile-id"], "0")
+        pp_binding2 = self.dbtest.create_pp_binding("t1", port2["port-id"], \
+                                               pp2["portprofile-id"], "0")
+        pp_bindings = self.dbtest.get_all_pp_bindings()
+        count = 0
+        for x in pp_bindings:
+            if "t1" in x["tenant-id"]:
+                count += 1
+        self.assertTrue(count == 2)
+        self.tearDownPortProfileBinding()
+        self.tearDownPort()
+        self.tearDownNetwork()
+        self.tearDownPortProfile()
+
+    def testODeletePortProfileBinding(self):
+        net1 = self.quantum.create_network("t1", "netid1")
+        port1 = self.quantum.create_port(net1["net-id"])
+        pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1")
+        pp_binding1 = self.dbtest.create_pp_binding("t1", port1["port-id"], \
+                                                pp1["portprofile-id"], "0")
+        self.dbtest.delete_pp_binding("t1", port1["port-id"], \
+                                      pp_binding1["portprofile-id"])
+        pp_bindings = self.dbtest.get_all_pp_bindings()
+        count = 0
+        for x in pp_bindings:
+            if "t1 " in x["tenant-id"]:
+                count += 1
+        self.assertTrue(count == 0)
+        self.tearDownPortProfileBinding()
+        self.tearDownPort()
+        self.tearDownNetwork()
+        self.tearDownPortProfile()
+
+    def testPUpdatePortProfileBinding(self):
+        net1 = self.quantum.create_network("t1", "netid1")
+        port1 = self.quantum.create_port(net1["net-id"])
+        pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1")
+        pp_binding1 = self.dbtest.create_pp_binding("t1", port1["port-id"], \
+                                                pp1["portprofile-id"], "0")
+        pp_binding1 = self.dbtest.update_pp_binding("t1", \
+                      pp1["portprofile-id"], "newt1", port1["port-id"], "1")
+        pp_bindings = self.dbtest.get_all_pp_bindings()
+        count = 0
+        for x in pp_bindings:
+            if "new" in x["tenant-id"]:
+                count += 1
+        self.assertTrue(count == 1)
+        self.tearDownPortProfileBinding()
+        self.tearDownPort()
+        self.tearDownNetwork()
+        self.tearDownPortProfile()
+
+    def testQtest_vlanids(self):
+        l2network_db.create_vlanids()
+        vlanids = l2network_db.get_all_vlanids()
+        self.assertTrue(len(vlanids) > 0)
+        vlanid = l2network_db.reserve_vlanid()
+        used = l2network_db.is_vlanid_used(vlanid)
+        self.assertTrue(used == True)
+        used = l2network_db.release_vlanid(vlanid)
+        self.assertTrue(used == False)
+        self.tearDownVlanID()
+
+    def tearDownNetwork(self):
+        LOG.debug("Tearing Down Network")
+        nets = self.quantum.get_all_networks("t1")
+        for net in nets:
+            id = net["net-id"]
+            self.quantum.delete_network(id)
+
+    def tearDownPort(self):
+        LOG.debug("Tearing Down Port")
+        nets = self.quantum.get_all_networks("t1")
+        for net in nets:
+            id = net["net-id"]
+            ports = self.quantum.get_all_ports(id)
+            for port in ports:
+                portid = port["port-id"]
+                self.quantum.delete_port(id, portid)
+
+    def tearDownVlanBinding(self):
+        LOG.debug("Tearing Down Vlan Binding")
+        vlans = self.dbtest.get_all_vlan_bindings()
+        for vlan in vlans:
+            id = vlan["net-id"]
+            self.dbtest.delete_vlan_binding(id)
+
+    def tearDownPortProfile(self):
+        LOG.debug("Tearing Down Port Profile")
+        pps = self.dbtest.get_all_portprofiles()
+        for pp in pps:
+            id = pp["portprofile-id"]
+            self.dbtest.delete_portprofile("t1", id)
+
+    def tearDownPortProfileBinding(self):
+        LOG.debug("Tearing Down Port Profile Binding")
+        pp_bindings = self.dbtest.get_all_pp_bindings()
+        for pp_binding in pp_bindings:
+            id = pp_binding["portprofile-id"]
+            portid = pp_binding["port-id"]
+            self.dbtest.delete_pp_binding("t1", portid, id)
+
+    def tearDownVlanID(self):
+        LOG.debug("Tearing Down Vlan IDs")
+        vlanids = l2network_db.get_all_vlanids()
+        for vlanid in vlanids:
+            vlan_id = vlanid["vlan_id"]
+            l2network_db.delete_vlanid(vlan_id)
+
+
+class QuantumDBTest(unittest.TestCase):
+    def setUp(self):
+        self.dbtest = QuantumDB()
+        self.tenant_id = "t1"
+        LOG.debug("Setup")
+
+    def testACreateNetwork(self):
+        net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+        self.assertTrue(net1["net-name"] == "plugin_test1")
+        self.tearDownNetworkPort()
+
+    def testBGetNetworks(self):
+        net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+        net2 = self.dbtest.create_network(self.tenant_id, "plugin_test2")
+        nets = self.dbtest.get_all_networks(self.tenant_id)
+        count = 0
+        for x in nets:
+            if "plugin_test" in x["net-name"]:
+                count += 1
+        self.assertTrue(count == 2)
+        self.tearDownNetworkPort()
+
+    def testCDeleteNetwork(self):
+        net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+        self.dbtest.delete_network(net1["net-id"])
+        nets = self.dbtest.get_all_networks(self.tenant_id)
+        count = 0
+        for x in nets:
+            if "plugin_test1" in x["net-name"]:
+                count += 1
+        self.assertTrue(count == 0)
+        self.tearDownNetworkPort()
+
+    def testDRenameNetwork(self):
+        net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+        net = self.dbtest.rename_network(self.tenant_id, net1["net-id"],
+          "plugin_test1_renamed")
+        self.assertTrue(net["net-name"] == "plugin_test1_renamed")
+        self.tearDownNetworkPort()
+
+    def testECreatePort(self):
+        net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+        port = self.dbtest.create_port(net1["net-id"])
+        ports = self.dbtest.get_all_ports(net1["net-id"])
+        count = 0
+        for p in ports:
+            count += 1
+        self.assertTrue(count == 1)
+        self.tearDownNetworkPort()
+
+    def testFDeletePort(self):
+        net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+        port = self.dbtest.create_port(net1["net-id"])
+        ports = self.dbtest.get_all_ports(net1["net-id"])
+        count = 0
+        for p in ports:
+            count += 1
+        self.assertTrue(count == 1)
+        for p in ports:
+            self.dbtest.delete_port(net1["net-id"], p["port-id"])
+        ports = self.dbtest.get_all_ports(net1["net-id"])
+        count = 0
+        for p in ports:
+            count += 1
+        self.assertTrue(count == 0)
+        self.tearDownNetworkPort()
+
+    def testGPlugUnPlugInterface(self):
+        net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+        port1 = self.dbtest.create_port(net1["net-id"])
+        self.dbtest.plug_interface(net1["net-id"], port1["port-id"], "vif1.1")
+        port = self.dbtest.get_port(net1["net-id"], port1["port-id"])
+        self.assertTrue(port[0]["int-id"] == "vif1.1")
+        self.dbtest.unplug_interface(net1["net-id"], port1["port-id"])
+        port = self.dbtest.get_port(net1["net-id"], port1["port-id"])
+        self.assertTrue(port[0]["int-id"] == None)
+        self.tearDownNetworkPort()
+
+    def testIJoinedTest(self):
+        net1 = self.dbtest.create_network("t1", "net1")
+        port1 = self.dbtest.create_port(net1["net-id"])
+        port2 = self.dbtest.create_port(net1["net-id"])
+        ports = self.dbtest.get_all_ports(net1["net-id"])
+        for port in ports:
+            net = port["net"]
+            LOG.debug("Port id %s Net id %s" % (port["port-id"], net.uuid))
+        self.tearDownJoinedTest()
+
+    def tearDownNetworkPort(self):
+        networks = self.dbtest.get_all_networks(self.tenant_id)
+        for net in networks:
+            id = net["net-id"]
+            name = net["net-name"]
+            if "plugin_test" in name:
+                # Clean up any test ports lying around
+                ports = self.dbtest.get_all_ports(id)
+                for p in ports:
+                    self.dbtest.delete_port(id, p["port-id"])
+                self.dbtest.delete_network(id)
+
+    def tearDownJoinedTest(self):
+        LOG.debug("Tearing Down Network and Ports")
+        nets = self.dbtest.get_all_networks("t1")
+        for net in nets:
+            id = net["net-id"]
+            ports = self.dbtest.get_all_ports(id)
+            for port in ports:
+                self.dbtest.delete_port(port["net-id"], port["port-id"])
+            self.dbtest.delete_network(id)
+
+
+if __name__ == "__main__":
+    usagestr = "Usage: %prog [OPTIONS] <command> [args]"
+    parser = OptionParser(usage=usagestr)
+    parser.add_option("-v", "--verbose", dest="verbose",
+      action="store_true", default=False, help="turn on verbose logging")
+
+    options, args = parser.parse_args()
+
+    if options.verbose:
+        LOG.basicConfig(level=LOG.DEBUG)
+    else:
+        LOG.basicConfig(level=LOG.WARN)
+
+    l2network_db.initialize()
+
+    # Run the tests
+    suite = unittest.TestLoader().loadTestsFromTestCase(QuantumDBTest)
+    unittest.TextTestRunner(verbosity=2).run(suite)
+    suite = unittest.TestLoader().loadTestsFromTestCase(L2networkDBTest)
+    unittest.TextTestRunner(verbosity=2).run(suite)
index 81f221f0393fe36151bd080e6dcaa256d1c98a66..36ac116215d487f8da324be0da39d9116484a5d7 100644 (file)
@@ -15,7 +15,7 @@
 #    under the License.
 #
 # @author: Sumit Naiksatam, Cisco Systems, Inc.
-#
+# @author: Rohit Agarwalla, Cisco Systems, Inc.
 
 import os
 
@@ -49,6 +49,16 @@ cp = confp.CiscoConfigParser(os.path.dirname(os.path.realpath(__file__)) \
                              + "/" + CONF_FILE)
 plugins = cp.walk(cp.dummy)
 
+CONF_FILE = "conf/db_conn.ini"
+
+cp = confp.CiscoConfigParser(os.path.dirname(os.path.realpath(__file__)) \
+                             + "/" + CONF_FILE)
+
+section = cp['DATABASE']
+DB_NAME = section['name']
+DB_USER = section['user']
+DB_PASS = section['pass']
+DB_HOST = section['host']
 
 def main():
     print plugins['PLUGINS']