--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011, Cisco Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Rohit Agarwalla, Cisco Systems, Inc.
+
+"""
+stubs.py provides interface methods for
+the database test cases
+"""
+import logging
+import unittest
+
+from quantum.db import api as db
+
+
+LOG = logging.getLogger('quantum.tests.database_stubs')
+
+
+class QuantumDB(object):
+ """Class conisting of methods to call Quantum db methods"""
+ def get_all_networks(self, tenant_id):
+ """Get all networks"""
+ nets = []
+ try:
+ for net in db.network_list(tenant_id):
+ LOG.debug("Getting network: %s" % net.uuid)
+ net_dict = {}
+ net_dict["tenant-id"] = net.tenant_id
+ net_dict["net-id"] = str(net.uuid)
+ net_dict["net-name"] = net.name
+ nets.append(net_dict)
+ except Exception, exc:
+ LOG.error("Failed to get all networks: %s" % str(exc))
+ return nets
+
+ def get_network(self, network_id):
+ """Get a network"""
+ net = []
+ try:
+ for net in db.network_get(network_id):
+ LOG.debug("Getting network: %s" % net.uuid)
+ net_dict = {}
+ net_dict["tenant-id"] = net.tenant_id
+ net_dict["net-id"] = str(net.uuid)
+ net_dict["net-name"] = net.name
+ net.append(net_dict)
+ except Exception, exc:
+ LOG.error("Failed to get network: %s" % str(exc))
+ return net
+
+ def create_network(self, tenant_id, net_name):
+ """Create a network"""
+ net_dict = {}
+ try:
+ res = db.network_create(tenant_id, net_name)
+ LOG.debug("Created network: %s" % res.uuid)
+ net_dict["tenant-id"] = res.tenant_id
+ net_dict["net-id"] = str(res.uuid)
+ net_dict["net-name"] = res.name
+ return net_dict
+ except Exception, exc:
+ LOG.error("Failed to create network: %s" % str(exc))
+
+ def delete_network(self, net_id):
+ """Delete a network"""
+ try:
+ net = db.network_destroy(net_id)
+ LOG.debug("Deleted network: %s" % net.uuid)
+ net_dict = {}
+ net_dict["net-id"] = str(net.uuid)
+ return net_dict
+ except Exception, exc:
+ raise Exception("Failed to delete port: %s" % str(exc))
+
+ def rename_network(self, tenant_id, net_id, new_name):
+ """Rename a network"""
+ try:
+ net = db.network_rename(net_id, tenant_id, new_name)
+ LOG.debug("Renamed network: %s" % net.uuid)
+ net_dict = {}
+ net_dict["net-id"] = str(net.uuid)
+ net_dict["net-name"] = net.name
+ return net_dict
+ except Exception, exc:
+ raise Exception("Failed to rename network: %s" % str(exc))
+
+ def get_all_ports(self, net_id):
+ """Get all ports"""
+ ports = []
+ try:
+ for port in db.port_list(net_id):
+ LOG.debug("Getting port: %s" % port.uuid)
+ port_dict = {}
+ port_dict["port-id"] = str(port.uuid)
+ port_dict["net-id"] = str(port.network_id)
+ port_dict["int-id"] = port.interface_id
+ port_dict["state"] = port.state
+ ports.append(port_dict)
+ return ports
+ except Exception, exc:
+ LOG.error("Failed to get all ports: %s" % str(exc))
+
+ def get_port(self, net_id, port_id):
+ """Get a port"""
+ port_list = []
+ port = db.port_get(port_id, net_id)
+ try:
+ LOG.debug("Getting port: %s" % port.uuid)
+ port_dict = {}
+ port_dict["port-id"] = str(port.uuid)
+ port_dict["net-id"] = str(port.network_id)
+ port_dict["int-id"] = port.interface_id
+ port_dict["state"] = port.state
+ port_list.append(port_dict)
+ return port_list
+ except Exception, exc:
+ LOG.error("Failed to get port: %s" % str(exc))
+
+ def create_port(self, net_id):
+ """Add a port"""
+ port_dict = {}
+ try:
+ port = db.port_create(net_id)
+ LOG.debug("Creating port %s" % port.uuid)
+ port_dict["port-id"] = str(port.uuid)
+ port_dict["net-id"] = str(port.network_id)
+ port_dict["int-id"] = port.interface_id
+ port_dict["state"] = port.state
+ return port_dict
+ except Exception, exc:
+ LOG.error("Failed to create port: %s" % str(exc))
+
+ def delete_port(self, net_id, port_id):
+ """Delete a port"""
+ try:
+ port = db.port_destroy(port_id, net_id)
+ LOG.debug("Deleted port %s" % port.uuid)
+ port_dict = {}
+ port_dict["port-id"] = str(port.uuid)
+ return port_dict
+ except Exception, exc:
+ raise Exception("Failed to delete port: %s" % str(exc))
+
+ def update_port(self, net_id, port_id, port_state):
+ """Update a port"""
+ try:
+ port = db.port_set_state(net_id, port_id, port_state)
+ LOG.debug("Updated port %s" % port.uuid)
+ port_dict = {}
+ port_dict["port-id"] = str(port.uuid)
+ port_dict["net-id"] = str(port.network_id)
+ port_dict["int-id"] = port.interface_id
+ port_dict["state"] = port.state
+ return port_dict
+ except Exception, exc:
+ raise Exception("Failed to update port state: %s" % str(exc))
+
+ def plug_interface(self, net_id, port_id, int_id):
+ """Plug interface to a port"""
+ try:
+ port = db.port_set_attachment(port_id, net_id, int_id)
+ LOG.debug("Attached interface to port %s" % port.uuid)
+ port_dict = {}
+ port_dict["port-id"] = str(port.uuid)
+ port_dict["net-id"] = str(port.network_id)
+ port_dict["int-id"] = port.interface_id
+ port_dict["state"] = port.state
+ return port_dict
+ except Exception, exc:
+ raise Exception("Failed to plug interface: %s" % str(exc))
+
+ def unplug_interface(self, net_id, port_id):
+ """Unplug interface to a port"""
+ try:
+ db.port_unset_attachment(port_id, net_id)
+ LOG.debug("Detached interface from port %s" % port_id)
+ except Exception, exc:
+ raise Exception("Failed to unplug interface: %s" % str(exc))
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011, Cisco Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Rohit Agarwalla, Cisco Systems, Inc.
+
+"""
+test_database.py is an independent test suite
+that tests the database api method calls
+"""
+import logging
+import unittest
+
+
+from quantum.db import api as db
+from tests.unit import database_stubs as db_stubs
+
+
+LOG = logging.getLogger('quantum.tests.test_database')
+
+
+class QuantumDBTest(unittest.TestCase):
+ """Class conisting of Quantum DB unit tests"""
+ def setUp(self):
+ """Setup for tests"""
+ db.configure_db({'sql_connection': 'sqlite:///:memory:'})
+ self.dbtest = db_stubs.QuantumDB()
+ self.tenant_id = "t1"
+ LOG.debug("Setup")
+
+ def tearDown(self):
+ """Tear Down"""
+ db.clear_db()
+
+ def testa_create_network(self):
+ """test to create network"""
+ net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+ self.assertTrue(net1["net-name"] == "plugin_test1")
+ self.teardown_network_port()
+
+ def testb_get_networks(self):
+ """test to get all networks"""
+ net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+ self.assertTrue(net1["net-name"] == "plugin_test1")
+ net2 = self.dbtest.create_network(self.tenant_id, "plugin_test2")
+ self.assertTrue(net2["net-name"] == "plugin_test2")
+ nets = self.dbtest.get_all_networks(self.tenant_id)
+ count = 0
+ for net in nets:
+ if "plugin_test" in net["net-name"]:
+ count += 1
+ self.assertTrue(count == 2)
+ self.teardown_network_port()
+
+ def testc_delete_network(self):
+ """test to delete network"""
+ net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+ self.assertTrue(net1["net-name"] == "plugin_test1")
+ self.dbtest.delete_network(net1["net-id"])
+ nets = self.dbtest.get_all_networks(self.tenant_id)
+ count = 0
+ for net in nets:
+ if "plugin_test1" in net["net-name"]:
+ count += 1
+ self.assertTrue(count == 0)
+ self.teardown_network_port()
+
+ def testd_rename_network(self):
+ """test to rename network"""
+ net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+ self.assertTrue(net1["net-name"] == "plugin_test1")
+ net = self.dbtest.rename_network(self.tenant_id, net1["net-id"],
+ "plugin_test1_renamed")
+ self.assertTrue(net["net-name"] == "plugin_test1_renamed")
+ self.teardown_network_port()
+
+ def teste_create_port(self):
+ """test to create port"""
+ net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+ port = self.dbtest.create_port(net1["net-id"])
+ self.assertTrue(port["net-id"] == net1["net-id"])
+ ports = self.dbtest.get_all_ports(net1["net-id"])
+ count = 0
+ for por in ports:
+ count += 1
+ self.assertTrue(count == 1)
+ self.teardown_network_port()
+
+ def testf_delete_port(self):
+ """test to delete port"""
+ net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+ port = self.dbtest.create_port(net1["net-id"])
+ self.assertTrue(port["net-id"] == net1["net-id"])
+ ports = self.dbtest.get_all_ports(net1["net-id"])
+ count = 0
+ for por in ports:
+ count += 1
+ self.assertTrue(count == 1)
+ for por in ports:
+ self.dbtest.delete_port(net1["net-id"], por["port-id"])
+ ports = self.dbtest.get_all_ports(net1["net-id"])
+ count = 0
+ for por in ports:
+ count += 1
+ self.assertTrue(count == 0)
+ self.teardown_network_port()
+
+ def testg_plug_unplug_interface(self):
+ """test to plug/unplug interface"""
+ net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
+ port1 = self.dbtest.create_port(net1["net-id"])
+ self.dbtest.plug_interface(net1["net-id"], port1["port-id"], "vif1.1")
+ port = self.dbtest.get_port(net1["net-id"], port1["port-id"])
+ self.assertTrue(port[0]["int-id"] == "vif1.1")
+ self.dbtest.unplug_interface(net1["net-id"], port1["port-id"])
+ port = self.dbtest.get_port(net1["net-id"], port1["port-id"])
+ self.assertTrue(port[0]["int-id"] == None)
+ self.teardown_network_port()
+
+ def teardown_network_port(self):
+ """tearDown for Network and Port table"""
+ networks = self.dbtest.get_all_networks(self.tenant_id)
+ for net in networks:
+ netid = net["net-id"]
+ name = net["net-name"]
+ if "plugin_test" in name:
+ ports = self.dbtest.get_all_ports(netid)
+ for por in ports:
+ self.dbtest.delete_port(netid, por["port-id"])
+ self.dbtest.delete_network(netid)