From: rohitagarwalla Date: Mon, 15 Aug 2011 16:51:58 +0000 (-0700) Subject: pylint changes - pylint score for cisco/db folder - 8.27/10 X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=bc296bb35cbbe433a37b8d74b7ae863e3474ff53;p=openstack-build%2Fneutron-build.git pylint changes - pylint score for cisco/db folder - 8.27/10 pep8 checks done --- diff --git a/quantum/plugins/cisco/db/l2network_db.py b/quantum/plugins/cisco/db/l2network_db.py index 119900bd6..0c2907391 100644 --- a/quantum/plugins/cisco/db/l2network_db.py +++ b/quantum/plugins/cisco/db/l2network_db.py @@ -15,10 +15,7 @@ # under the License. # @author: Rohit Agarwalla, Cisco Systems, Inc. -import logging as LOG -import os - -from sqlalchemy.orm import exc, joinedload +from sqlalchemy.orm import exc from quantum.common import exceptions as q_exc from quantum.plugins.cisco import l2network_plugin_configuration as conf @@ -28,7 +25,8 @@ import l2network_models import quantum.plugins.cisco.db.api as db -def initialize(configfile=None): +def initialize(): + 'Establish database connection and load models' options = {"sql_connection": "mysql://%s:%s@%s/%s" % (conf.DB_USER, conf.DB_PASS, conf.DB_HOST, conf.DB_NAME)} db.configure_db(options) @@ -54,6 +52,7 @@ def create_vlanids(): def get_all_vlanids(): + """Gets all the vlanids""" session = db.get_session() try: vlanids = session.query(l2network_models.VlanID).\ @@ -64,6 +63,7 @@ def get_all_vlanids(): def is_vlanid_used(vlan_id): + """Checks if a vlanid is in use""" session = db.get_session() try: vlanid = session.query(l2network_models.VlanID).\ @@ -75,6 +75,7 @@ def is_vlanid_used(vlan_id): def release_vlanid(vlan_id): + """Sets the vlanid state to be unused""" session = db.get_session() try: vlanid = session.query(l2network_models.VlanID).\ @@ -90,6 +91,7 @@ def release_vlanid(vlan_id): def delete_vlanid(vlan_id): + """Deletes a vlanid entry from db""" session = db.get_session() try: vlanid = session.query(l2network_models.VlanID).\ @@ -103,6 +105,7 @@ def delete_vlanid(vlan_id): def reserve_vlanid(): + """Reserves the first unused vlanid""" session = db.get_session() try: vlanids = session.query(l2network_models.VlanID).\ @@ -117,7 +120,7 @@ def reserve_vlanid(): session.flush() return vlanids[0]["vlan_id"] except exc.NoResultFound: - raise VlanIDNotAvailable() + raise c_exc.VlanIDNotAvailable() def get_all_vlan_bindings(): diff --git a/quantum/plugins/cisco/db/l2network_models.py b/quantum/plugins/cisco/db/l2network_models.py index d04f7dd6f..8bc29d398 100644 --- a/quantum/plugins/cisco/db/l2network_models.py +++ b/quantum/plugins/cisco/db/l2network_models.py @@ -18,7 +18,6 @@ import uuid from sqlalchemy import Column, Integer, String, ForeignKey, Boolean -from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relation, object_mapper from quantum.plugins.cisco.db.models import BASE @@ -30,19 +29,24 @@ class L2NetworkBase(object): __table_args__ = {'mysql_engine': 'InnoDB'} def __setitem__(self, key, value): + """Internal Dict set method""" setattr(self, key, value) def __getitem__(self, key): + """Internal Dict get method""" return getattr(self, key) def get(self, key, default=None): + """Dict get method""" return getattr(self, key, default) def __iter__(self): + """Iterate over table columns""" self._i = iter(object_mapper(self).columns) return self def next(self): + """Next method for the iterator""" n = self._i.next().name return n, getattr(self, n) @@ -52,7 +56,7 @@ class L2NetworkBase(object): setattr(self, k, v) def iteritems(self): - """Make the model object behave like a dict. + """Make the model object behave like a dict" Includes attributes from joins.""" local = dict(self) joined = dict([(k, v) for k, v in self.__dict__.iteritems() diff --git a/quantum/plugins/cisco/db/test_database.py b/quantum/plugins/cisco/db/test_database.py index 88c1bb624..12668b605 100644 --- a/quantum/plugins/cisco/db/test_database.py +++ b/quantum/plugins/cisco/db/test_database.py @@ -15,8 +15,6 @@ # under the License. # @author: Rohit Agarwalla, Cisco Systems, Inc. -import ConfigParser -import os import logging as LOG import unittest @@ -25,42 +23,47 @@ from quantum.plugins.cisco.common import cisco_constants as const import quantum.plugins.cisco.db.api as db import quantum.plugins.cisco.db.l2network_db as l2network_db -import quantum.plugins.cisco.db.l2network_models LOG.getLogger(const.LOGGER_COMPONENT_NAME) class L2networkDB(object): + """Class conisting of methods to call L2network db methods""" def get_all_vlan_bindings(self): + """Get all vlan binding into a list of dict""" vlans = [] try: - for x in l2network_db.get_all_vlan_bindings(): - LOG.debug("Getting vlan bindings for vlan: %s" % x.vlan_id) + for vlan_bind in l2network_db.get_all_vlan_bindings(): + LOG.debug("Getting vlan bindings for vlan: %s" % \ + vlan_bind.vlan_id) vlan_dict = {} - vlan_dict["vlan-id"] = str(x.vlan_id) - vlan_dict["vlan-name"] = x.vlan_name - vlan_dict["net-id"] = str(x.network_id) + vlan_dict["vlan-id"] = str(vlan_bind.vlan_id) + vlan_dict["vlan-name"] = vlan_bind.vlan_name + vlan_dict["net-id"] = str(vlan_bind.network_id) vlans.append(vlan_dict) except Exception, e: LOG.error("Failed to get all vlan bindings: %s" % str(e)) return vlans def get_vlan_binding(self, network_id): + """Get a vlan binding""" vlan = [] try: - for x in l2network_db.get_vlan_binding(network_id): - LOG.debug("Getting vlan binding for vlan: %s" % x.vlan_id) + for vlan_bind in l2network_db.get_vlan_binding(network_id): + LOG.debug("Getting vlan binding for vlan: %s" \ + % vlan_bind.vlan_id) vlan_dict = {} - vlan_dict["vlan-id"] = str(x.vlan_id) - vlan_dict["vlan-name"] = x.vlan_name - vlan_dict["net-id"] = str(x.network_id) + vlan_dict["vlan-id"] = str(vlan_bind.vlan_id) + vlan_dict["vlan-name"] = vlan_bind.vlan_name + vlan_dict["net-id"] = str(vlan_bind.network_id) vlan.append(vlan_dict) except Exception, e: LOG.error("Failed to get vlan binding: %s" % str(e)) return vlan def create_vlan_binding(self, vlan_id, vlan_name, network_id): + """Create a vlan binding""" vlan_dict = {} try: res = l2network_db.add_vlan_binding(vlan_id, vlan_name, network_id) @@ -73,6 +76,7 @@ class L2networkDB(object): LOG.error("Failed to create vlan binding: %s" % str(e)) def delete_vlan_binding(self, network_id): + """Delete a vlan binding""" try: res = l2network_db.remove_vlan_binding(network_id) LOG.debug("Deleted vlan binding for vlan: %s" % res.vlan_id) @@ -83,6 +87,7 @@ class L2networkDB(object): raise Exception("Failed to delete vlan binding: %s" % str(e)) def update_vlan_binding(self, network_id, vlan_id, vlan_name): + """Update a vlan binding""" try: res = l2network_db.update_vlan_binding(network_id, vlan_id, \ vlan_name) @@ -96,36 +101,39 @@ class L2networkDB(object): raise Exception("Failed to update vlan binding: %s" % str(e)) def get_all_portprofiles(self): + """Get all portprofiles""" pps = [] try: - for x in l2network_db.get_all_portprofiles(): - LOG.debug("Getting port profile : %s" % x.uuid) + for portprof in l2network_db.get_all_portprofiles(): + LOG.debug("Getting port profile : %s" % portprof.uuid) pp_dict = {} - pp_dict["portprofile-id"] = str(x.uuid) - pp_dict["portprofile-name"] = x.name - pp_dict["vlan-id"] = str(x.vlan_id) - pp_dict["qos"] = x.qos + pp_dict["portprofile-id"] = str(portprof.uuid) + pp_dict["portprofile-name"] = portprof.name + pp_dict["vlan-id"] = str(portprof.vlan_id) + pp_dict["qos"] = portprof.qos pps.append(pp_dict) except Exception, e: LOG.error("Failed to get all port profiles: %s" % str(e)) return pps def get_portprofile(self, tenant_id, pp_id): + """Get a portprofile""" pp = [] try: - for x in l2network_db.get_portprofile(tenant_id, pp_id): - LOG.debug("Getting port profile : %s" % x.uuid) + for portprof in l2network_db.get_portprofile(tenant_id, pp_id): + LOG.debug("Getting port profile : %s" % portprof.uuid) pp_dict = {} - pp_dict["portprofile-id"] = str(x.uuid) - pp_dict["portprofile-name"] = x.name - pp_dict["vlan-id"] = str(x.vlan_id) - pp_dict["qos"] = x.qos + pp_dict["portprofile-id"] = str(portprof.uuid) + pp_dict["portprofile-name"] = portprof.name + pp_dict["vlan-id"] = str(portprof.vlan_id) + pp_dict["qos"] = portprof.qos pp.append(pp_dict) except Exception, e: LOG.error("Failed to get port profile: %s" % str(e)) return pp def create_portprofile(self, tenant_id, name, vlan_id, qos): + """Create a portprofile""" pp_dict = {} try: res = l2network_db.add_portprofile(tenant_id, name, vlan_id, qos) @@ -139,6 +147,7 @@ class L2networkDB(object): LOG.error("Failed to create port profile: %s" % str(e)) def delete_portprofile(self, tenant_id, pp_id): + """Delete a portprofile""" try: res = l2network_db.remove_portprofile(tenant_id, pp_id) LOG.debug("Deleted port profile : %s" % res.uuid) @@ -149,6 +158,7 @@ class L2networkDB(object): raise Exception("Failed to delete port profile: %s" % str(e)) def update_portprofile(self, tenant_id, pp_id, name, vlan_id, qos): + """Update a portprofile""" try: res = l2network_db.update_portprofile(tenant_id, pp_id, name, vlan_id, qos) @@ -163,38 +173,41 @@ class L2networkDB(object): raise Exception("Failed to update port profile: %s" % str(e)) def get_all_pp_bindings(self): + """Get all portprofile bindings""" pp_bindings = [] try: - for x in l2network_db.get_all_pp_bindings(): + for pp_bind in l2network_db.get_all_pp_bindings(): LOG.debug("Getting port profile binding: %s" % \ - x.portprofile_id) + pp_bind.portprofile_id) ppbinding_dict = {} - ppbinding_dict["portprofile-id"] = str(x.portprofile_id) - ppbinding_dict["port-id"] = str(x.port_id) - ppbinding_dict["tenant-id"] = x.tenant_id - ppbinding_dict["default"] = x.default + ppbinding_dict["portprofile-id"] = str(pp_bind.portprofile_id) + ppbinding_dict["port-id"] = str(pp_bind.port_id) + ppbinding_dict["tenant-id"] = pp_bind.tenant_id + ppbinding_dict["default"] = pp_bind.default pp_bindings.append(ppbinding_dict) except Exception, e: LOG.error("Failed to get all port profiles: %s" % str(e)) return pp_bindings def get_pp_binding(self, tenant_id, pp_id): + """Get a portprofile binding""" pp_binding = [] try: - for x in l2network_db.get_pp_binding(tenant_id, pp_id): + for pp_bind in l2network_db.get_pp_binding(tenant_id, pp_id): LOG.debug("Getting port profile binding: %s" % \ - x.portprofile_id) + pp_bind.portprofile_id) ppbinding_dict = {} - ppbinding_dict["portprofile-id"] = str(x.portprofile_id) - ppbinding_dict["port-id"] = str(x.port_id) - ppbinding_dict["tenant-id"] = x.tenant_id - ppbinding_dict["default"] = x.default - pp_bindings.append(ppbinding_dict) + ppbinding_dict["portprofile-id"] = str(pp_bind.portprofile_id) + ppbinding_dict["port-id"] = str(pp_bind.port_id) + ppbinding_dict["tenant-id"] = pp_bind.tenant_id + ppbinding_dict["default"] = pp_bind.default + pp_binding.append(ppbinding_dict) except Exception, e: LOG.error("Failed to get port profile binding: %s" % str(e)) return pp_binding def create_pp_binding(self, tenant_id, port_id, pp_id, default): + """Add a portprofile binding""" ppbinding_dict = {} try: res = l2network_db.add_pp_binding(tenant_id, port_id, pp_id, \ @@ -209,6 +222,7 @@ class L2networkDB(object): LOG.error("Failed to create port profile binding: %s" % str(e)) def delete_pp_binding(self, tenant_id, port_id, pp_id): + """Delete a portprofile binding""" try: res = l2network_db.remove_pp_binding(tenant_id, port_id, pp_id) LOG.debug("Deleted port profile binding : %s" % res.portprofile_id) @@ -220,6 +234,7 @@ class L2networkDB(object): def update_pp_binding(self, tenant_id, pp_id, newtenant_id, \ port_id, default): + """Update portprofile binding""" try: res = l2network_db.update_pp_binding(tenant_id, pp_id, newtenant_id, port_id, default) @@ -235,35 +250,39 @@ class L2networkDB(object): class QuantumDB(object): + """Class conisting of methods to call Quantum db methods""" def get_all_networks(self, tenant_id): + """Get all networks""" nets = [] try: - for x in db.network_list(tenant_id): - LOG.debug("Getting network: %s" % x.uuid) + for net in db.network_list(tenant_id): + LOG.debug("Getting network: %s" % net.uuid) net_dict = {} - net_dict["tenant-id"] = x.tenant_id - net_dict["net-id"] = str(x.uuid) - net_dict["net-name"] = x.name + net_dict["tenant-id"] = net.tenant_id + net_dict["net-id"] = str(net.uuid) + net_dict["net-name"] = net.name nets.append(net_dict) except Exception, e: LOG.error("Failed to get all networks: %s" % str(e)) return nets def get_network(self, network_id): + """Get a network""" net = [] try: - for x in db.network_get(network_id): - LOG.debug("Getting network: %s" % x.uuid) + for net in db.network_get(network_id): + LOG.debug("Getting network: %s" % net.uuid) net_dict = {} - net_dict["tenant-id"] = x.tenant_id - net_dict["net-id"] = str(x.uuid) - net_dict["net-name"] = x.name - nets.append(net_dict) + net_dict["tenant-id"] = net.tenant_id + net_dict["net-id"] = str(net.uuid) + net_dict["net-name"] = net.name + net.append(net_dict) except Exception, e: LOG.error("Failed to get network: %s" % str(e)) return net def create_network(self, tenant_id, net_name): + """Create a network""" net_dict = {} try: res = db.network_create(tenant_id, net_name) @@ -276,6 +295,7 @@ class QuantumDB(object): LOG.error("Failed to create network: %s" % str(e)) def delete_network(self, net_id): + """Delete a network""" try: net = db.network_destroy(net_id) LOG.debug("Deleted network: %s" % net.uuid) @@ -286,6 +306,7 @@ class QuantumDB(object): raise Exception("Failed to delete port: %s" % str(e)) def rename_network(self, tenant_id, net_id, new_name): + """Rename a network""" try: net = db.network_rename(tenant_id, net_id, new_name) LOG.debug("Renamed network: %s" % net.uuid) @@ -297,37 +318,40 @@ class QuantumDB(object): raise Exception("Failed to rename network: %s" % str(e)) def get_all_ports(self, net_id): + """Get all ports""" ports = [] try: - for x in db.port_list(net_id): - LOG.debug("Getting port: %s" % x.uuid) + for port in db.port_list(net_id): + LOG.debug("Getting port: %s" % port.uuid) port_dict = {} - port_dict["port-id"] = str(x.uuid) - port_dict["net-id"] = str(x.network_id) - port_dict["int-id"] = x.interface_id - port_dict["state"] = x.state - port_dict["net"] = x.network + port_dict["port-id"] = str(port.uuid) + port_dict["net-id"] = str(port.network_id) + port_dict["int-id"] = port.interface_id + port_dict["state"] = port.state + port_dict["net"] = port.network ports.append(port_dict) return ports except Exception, e: LOG.error("Failed to get all ports: %s" % str(e)) def get_port(self, net_id, port_id): - port = [] - x = db.port_get(net_id, port_id) + """Get a port""" + port_list = [] + port = db.port_get(net_id, port_id) try: - LOG.debug("Getting port: %s" % x.uuid) + LOG.debug("Getting port: %s" % port.uuid) port_dict = {} - port_dict["port-id"] = str(x.uuid) - port_dict["net-id"] = str(x.network_id) - port_dict["int-id"] = x.interface_id - port_dict["state"] = x.state - port.append(port_dict) - return port + port_dict["port-id"] = str(port.uuid) + port_dict["net-id"] = str(port.network_id) + port_dict["int-id"] = port.interface_id + port_dict["state"] = port.state + port_list.append(port_dict) + return port_list except Exception, e: LOG.error("Failed to get port: %s" % str(e)) def create_port(self, net_id): + """Add a port""" port_dict = {} try: port = db.port_create(net_id) @@ -341,6 +365,7 @@ class QuantumDB(object): LOG.error("Failed to create port: %s" % str(e)) def delete_port(self, net_id, port_id): + """Delete a port""" try: port = db.port_destroy(net_id, port_id) LOG.debug("Deleted port %s" % port.uuid) @@ -351,6 +376,7 @@ class QuantumDB(object): raise Exception("Failed to delete port: %s" % str(e)) def update_port(self, net_id, port_id, port_state): + """Update a port""" try: port = db.port_set_state(net_id, port_id, port_state) LOG.debug("Updated port %s" % port.uuid) @@ -364,6 +390,7 @@ class QuantumDB(object): raise Exception("Failed to update port state: %s" % str(e)) def plug_interface(self, net_id, port_id, int_id): + """Plug interface to a port""" try: port = db.port_set_attachment(net_id, port_id, int_id) LOG.debug("Attached interface to port %s" % port.uuid) @@ -377,6 +404,7 @@ class QuantumDB(object): raise Exception("Failed to plug interface: %s" % str(e)) def unplug_interface(self, net_id, port_id): + """Unplug interface to a port""" try: port = db.port_unset_attachment(net_id, port_id) LOG.debug("Detached interface from port %s" % port.uuid) @@ -391,12 +419,15 @@ class QuantumDB(object): class L2networkDBTest(unittest.TestCase): + """Class conisting of L2network DB unit tests""" def setUp(self): + """Setup for tests""" self.dbtest = L2networkDB() self.quantum = QuantumDB() LOG.debug("Setup") def testACreateVlanBinding(self): + """test add vlan binding""" net1 = self.quantum.create_network("t1", "netid1") vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"]) self.assertTrue(vlan1["vlan-id"] == "10") @@ -404,86 +435,102 @@ class L2networkDBTest(unittest.TestCase): self.tearDownNetwork() def testBGetAllVlanBindings(self): + """test get all vlan binding""" net1 = self.quantum.create_network("t1", "netid1") net2 = self.quantum.create_network("t1", "netid2") vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"]) + self.assertTrue(vlan1["vlan-id"] == "10") vlan2 = self.dbtest.create_vlan_binding(20, "vlan2", net2["net-id"]) + self.assertTrue(vlan2["vlan-id"] == "20") vlans = self.dbtest.get_all_vlan_bindings() count = 0 - for x in vlans: - if "vlan" in x["vlan-name"]: + for vlan in vlans: + if "vlan" in vlan["vlan-name"]: count += 1 self.assertTrue(count == 2) self.tearDownVlanBinding() self.tearDownNetwork() def testCDeleteVlanBinding(self): + """test delete vlan binding""" net1 = self.quantum.create_network("t1", "netid1") vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"]) + self.assertTrue(vlan1["vlan-id"] == "10") self.dbtest.delete_vlan_binding(net1["net-id"]) vlans = self.dbtest.get_all_vlan_bindings() count = 0 - for x in vlans: - if "vlan " in x["vlan-name"]: + for vlan in vlans: + if "vlan " in vlan["vlan-name"]: count += 1 self.assertTrue(count == 0) self.tearDownVlanBinding() self.tearDownNetwork() def testDUpdateVlanBinding(self): + """test update vlan binding""" net1 = self.quantum.create_network("t1", "netid1") vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"]) + self.assertTrue(vlan1["vlan-id"] == "10") vlan1 = self.dbtest.update_vlan_binding(net1["net-id"], 11, "newvlan1") vlans = self.dbtest.get_all_vlan_bindings() count = 0 - for x in vlans: - if "new" in x["vlan-name"]: + for vlan in vlans: + if "new" in vlan["vlan-name"]: count += 1 self.assertTrue(count == 1) self.tearDownVlanBinding() self.tearDownNetwork() def testICreatePortProfile(self): + """test add port profile""" pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1") self.assertTrue(pp1["portprofile-name"] == "portprofile1") self.tearDownPortProfile() self.tearDownNetwork() def testJGetAllPortProfile(self): + """test get all portprofiles""" pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1") + self.assertTrue(pp1["portprofile-name"] == "portprofile1") pp2 = self.dbtest.create_portprofile("t1", "portprofile2", 20, "qos2") + self.assertTrue(pp2["portprofile-name"] == "portprofile2") pps = self.dbtest.get_all_portprofiles() count = 0 - for x in pps: - if "portprofile" in x["portprofile-name"]: + for pprofile in pps: + if "portprofile" in pprofile["portprofile-name"]: count += 1 self.assertTrue(count == 2) self.tearDownPortProfile() def testKDeletePortProfile(self): + """test delete portprofile""" pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1") + self.assertTrue(pp1["portprofile-name"] == "portprofile1") self.dbtest.delete_portprofile("t1", pp1["portprofile-id"]) pps = self.dbtest.get_all_portprofiles() count = 0 - for x in pps: - if "portprofile " in x["portprofile-name"]: + for pprofile in pps: + if "portprofile " in pprofile["portprofile-name"]: count += 1 self.assertTrue(count == 0) self.tearDownPortProfile() def testLUpdatePortProfile(self): + """test update portprofile""" pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1") + self.assertTrue(pp1["portprofile-name"] == "portprofile1") pp1 = self.dbtest.update_portprofile("t1", pp1["portprofile-id"], \ "newportprofile1", 20, "qos2") pps = self.dbtest.get_all_portprofiles() count = 0 - for x in pps: - if "new" in x["portprofile-name"]: + for pprofile in pps: + if "new" in pprofile["portprofile-name"]: count += 1 self.assertTrue(count == 1) self.tearDownPortProfile() def testMCreatePortProfileBinding(self): + """test create portprofile binding""" net1 = self.quantum.create_network("t1", "netid1") port1 = self.quantum.create_port(net1["net-id"]) pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1") @@ -496,6 +543,7 @@ class L2networkDBTest(unittest.TestCase): self.tearDownPortProfile() def testNGetAllPortProfileBinding(self): + """test get all portprofile binding""" net1 = self.quantum.create_network("t1", "netid1") port1 = self.quantum.create_port(net1["net-id"]) port2 = self.quantum.create_port(net1["net-id"]) @@ -503,12 +551,14 @@ class L2networkDBTest(unittest.TestCase): pp2 = self.dbtest.create_portprofile("t1", "portprofile2", 20, "qos2") pp_binding1 = self.dbtest.create_pp_binding("t1", port1["port-id"], \ pp1["portprofile-id"], "0") + self.assertTrue(pp_binding1["tenant-id"] == "t1") pp_binding2 = self.dbtest.create_pp_binding("t1", port2["port-id"], \ pp2["portprofile-id"], "0") + self.assertTrue(pp_binding2["tenant-id"] == "t1") pp_bindings = self.dbtest.get_all_pp_bindings() count = 0 - for x in pp_bindings: - if "t1" in x["tenant-id"]: + for pp_bind in pp_bindings: + if "t1" in pp_bind["tenant-id"]: count += 1 self.assertTrue(count == 2) self.tearDownPortProfileBinding() @@ -517,17 +567,19 @@ class L2networkDBTest(unittest.TestCase): self.tearDownPortProfile() def testODeletePortProfileBinding(self): + """test delete portprofile binding""" net1 = self.quantum.create_network("t1", "netid1") port1 = self.quantum.create_port(net1["net-id"]) pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1") pp_binding1 = self.dbtest.create_pp_binding("t1", port1["port-id"], \ pp1["portprofile-id"], "0") + self.assertTrue(pp_binding1["tenant-id"] == "t1") self.dbtest.delete_pp_binding("t1", port1["port-id"], \ pp_binding1["portprofile-id"]) pp_bindings = self.dbtest.get_all_pp_bindings() count = 0 - for x in pp_bindings: - if "t1 " in x["tenant-id"]: + for pp_bind in pp_bindings: + if "t1 " in pp_bind["tenant-id"]: count += 1 self.assertTrue(count == 0) self.tearDownPortProfileBinding() @@ -536,17 +588,19 @@ class L2networkDBTest(unittest.TestCase): self.tearDownPortProfile() def testPUpdatePortProfileBinding(self): + """test update portprofile binding""" net1 = self.quantum.create_network("t1", "netid1") port1 = self.quantum.create_port(net1["net-id"]) pp1 = self.dbtest.create_portprofile("t1", "portprofile1", 10, "qos1") pp_binding1 = self.dbtest.create_pp_binding("t1", port1["port-id"], \ pp1["portprofile-id"], "0") + self.assertTrue(pp_binding1["tenant-id"] == "t1") pp_binding1 = self.dbtest.update_pp_binding("t1", \ pp1["portprofile-id"], "newt1", port1["port-id"], "1") pp_bindings = self.dbtest.get_all_pp_bindings() count = 0 - for x in pp_bindings: - if "new" in x["tenant-id"]: + for pp_bind in pp_bindings: + if "new" in pp_bind["tenant-id"]: count += 1 self.assertTrue(count == 1) self.tearDownPortProfileBinding() @@ -555,6 +609,7 @@ class L2networkDBTest(unittest.TestCase): self.tearDownPortProfile() def testQtest_vlanids(self): + """test vlanid methods""" l2network_db.create_vlanids() vlanids = l2network_db.get_all_vlanids() self.assertTrue(len(vlanids) > 0) @@ -566,45 +621,51 @@ class L2networkDBTest(unittest.TestCase): self.tearDownVlanID() def tearDownNetwork(self): + """tearDown Network table""" LOG.debug("Tearing Down Network") nets = self.quantum.get_all_networks("t1") for net in nets: - id = net["net-id"] - self.quantum.delete_network(id) + netid = net["net-id"] + self.quantum.delete_network(netid) def tearDownPort(self): + """tearDown Port table""" LOG.debug("Tearing Down Port") nets = self.quantum.get_all_networks("t1") for net in nets: - id = net["net-id"] - ports = self.quantum.get_all_ports(id) + netid = net["net-id"] + ports = self.quantum.get_all_ports(netid) for port in ports: portid = port["port-id"] - self.quantum.delete_port(id, portid) + self.quantum.delete_port(netid, portid) def tearDownVlanBinding(self): + """tearDown VlanBinding table""" LOG.debug("Tearing Down Vlan Binding") vlans = self.dbtest.get_all_vlan_bindings() for vlan in vlans: - id = vlan["net-id"] - self.dbtest.delete_vlan_binding(id) + netid = vlan["net-id"] + self.dbtest.delete_vlan_binding(netid) def tearDownPortProfile(self): + """tearDown PortProfile table""" LOG.debug("Tearing Down Port Profile") pps = self.dbtest.get_all_portprofiles() - for pp in pps: - id = pp["portprofile-id"] - self.dbtest.delete_portprofile("t1", id) + for pprofile in pps: + ppid = pprofile["portprofile-id"] + self.dbtest.delete_portprofile("t1", ppid) def tearDownPortProfileBinding(self): + """tearDown PortProfileBinding table""" LOG.debug("Tearing Down Port Profile Binding") pp_bindings = self.dbtest.get_all_pp_bindings() for pp_binding in pp_bindings: - id = pp_binding["portprofile-id"] + ppid = pp_binding["portprofile-id"] portid = pp_binding["port-id"] - self.dbtest.delete_pp_binding("t1", portid, id) + self.dbtest.delete_pp_binding("t1", portid, ppid) def tearDownVlanID(self): + """tearDown VlanID table""" LOG.debug("Tearing Down Vlan IDs") vlanids = l2network_db.get_all_vlanids() for vlanid in vlanids: @@ -613,73 +674,88 @@ class L2networkDBTest(unittest.TestCase): class QuantumDBTest(unittest.TestCase): + """Class conisting of Quantum DB unit tests""" def setUp(self): + """Setup for tests""" self.dbtest = QuantumDB() self.tenant_id = "t1" LOG.debug("Setup") def testACreateNetwork(self): + """test to create network""" net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1") self.assertTrue(net1["net-name"] == "plugin_test1") self.tearDownNetworkPort() def testBGetNetworks(self): + """test to get all networks""" net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1") + self.assertTrue(net1["net-name"] == "plugin_test1") net2 = self.dbtest.create_network(self.tenant_id, "plugin_test2") + self.assertTrue(net2["net-name"] == "plugin_test2") nets = self.dbtest.get_all_networks(self.tenant_id) count = 0 - for x in nets: - if "plugin_test" in x["net-name"]: + for net in nets: + if "plugin_test" in net["net-name"]: count += 1 self.assertTrue(count == 2) self.tearDownNetworkPort() def testCDeleteNetwork(self): + """test to delete network""" net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1") + self.assertTrue(net1["net-name"] == "plugin_test1") self.dbtest.delete_network(net1["net-id"]) nets = self.dbtest.get_all_networks(self.tenant_id) count = 0 - for x in nets: - if "plugin_test1" in x["net-name"]: + for net in nets: + if "plugin_test1" in net["net-name"]: count += 1 self.assertTrue(count == 0) self.tearDownNetworkPort() def testDRenameNetwork(self): + """test to rename network""" net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1") + self.assertTrue(net1["net-name"] == "plugin_test1") net = self.dbtest.rename_network(self.tenant_id, net1["net-id"], "plugin_test1_renamed") self.assertTrue(net["net-name"] == "plugin_test1_renamed") self.tearDownNetworkPort() def testECreatePort(self): + """test to create port""" net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1") port = self.dbtest.create_port(net1["net-id"]) + self.assertTrue(port["net-id"] == net1["net-id"]) ports = self.dbtest.get_all_ports(net1["net-id"]) count = 0 - for p in ports: + for por in ports: count += 1 self.assertTrue(count == 1) self.tearDownNetworkPort() def testFDeletePort(self): + """test to delete port""" net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1") port = self.dbtest.create_port(net1["net-id"]) + self.assertTrue(port["net-id"] == net1["net-id"]) ports = self.dbtest.get_all_ports(net1["net-id"]) count = 0 - for p in ports: + for por in ports: count += 1 self.assertTrue(count == 1) - for p in ports: - self.dbtest.delete_port(net1["net-id"], p["port-id"]) + for por in ports: + self.dbtest.delete_port(net1["net-id"], por["port-id"]) ports = self.dbtest.get_all_ports(net1["net-id"]) count = 0 - for p in ports: + for por in ports: count += 1 self.assertTrue(count == 0) self.tearDownNetworkPort() def testGPlugUnPlugInterface(self): + """test to plug/unplug interface""" net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1") port1 = self.dbtest.create_port(net1["net-id"]) self.dbtest.plug_interface(net1["net-id"], port1["port-id"], "vif1.1") @@ -691,9 +767,12 @@ class QuantumDBTest(unittest.TestCase): self.tearDownNetworkPort() def testIJoinedTest(self): + """test to get network and port""" net1 = self.dbtest.create_network("t1", "net1") port1 = self.dbtest.create_port(net1["net-id"]) + self.assertTrue(port1["net-id"] == net1["net-id"]) port2 = self.dbtest.create_port(net1["net-id"]) + self.assertTrue(port2["net-id"] == net1["net-id"]) ports = self.dbtest.get_all_ports(net1["net-id"]) for port in ports: net = port["net"] @@ -701,26 +780,27 @@ class QuantumDBTest(unittest.TestCase): self.tearDownJoinedTest() def tearDownNetworkPort(self): + """tearDown for Network and Port table""" networks = self.dbtest.get_all_networks(self.tenant_id) for net in networks: - id = net["net-id"] + netid = net["net-id"] name = net["net-name"] if "plugin_test" in name: - # Clean up any test ports lying around - ports = self.dbtest.get_all_ports(id) - for p in ports: - self.dbtest.delete_port(id, p["port-id"]) - self.dbtest.delete_network(id) + ports = self.dbtest.get_all_ports(netid) + for por in ports: + self.dbtest.delete_port(netid, por["port-id"]) + self.dbtest.delete_network(netid) def tearDownJoinedTest(self): + """tearDown for joined Network and Port test""" LOG.debug("Tearing Down Network and Ports") nets = self.dbtest.get_all_networks("t1") for net in nets: - id = net["net-id"] - ports = self.dbtest.get_all_ports(id) + netid = net["net-id"] + ports = self.dbtest.get_all_ports(netid) for port in ports: self.dbtest.delete_port(port["net-id"], port["port-id"]) - self.dbtest.delete_network(id) + self.dbtest.delete_network(netid) if __name__ == "__main__":