import unittest
import quantum.db.api as db
-from quantum.plugins.linuxbridge.common import constants as const
import quantum.plugins.linuxbridge.db.l2network_db as l2network_db
LOG.error("Failed to get all networks: %s" % str(exc))
return nets
- def get_network(self, network_id):
- """Get a network"""
- net = []
- try:
- for net in db.network_get(network_id):
- LOG.debug("Getting network: %s" % net.uuid)
- net_dict = {}
- net_dict["tenant-id"] = net.tenant_id
- net_dict["net-id"] = str(net.uuid)
- net_dict["net-name"] = net.name
- net.append(net_dict)
- except Exception, exc:
- LOG.error("Failed to get network: %s" % str(exc))
- return net
-
def create_network(self, tenant_id, net_name):
"""Create a network"""
net_dict = {}
except Exception, exc:
raise Exception("Failed to delete port: %s" % str(exc))
- def update_network(self, tenant_id, net_id, **kwargs):
- """Update a network"""
- try:
- net = db.network_update(net_id, tenant_id, **kwargs)
- LOG.debug("Updated network: %s" % net.uuid)
- net_dict = {}
- net_dict["net-id"] = str(net.uuid)
- net_dict["net-name"] = net.name
- return net_dict
- except Exception, exc:
- raise Exception("Failed to update network: %s" % str(exc))
-
- def get_all_ports(self, net_id):
- """Get all ports"""
- ports = []
- try:
- for port in db.port_list(net_id):
- LOG.debug("Getting port: %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- port_dict["net"] = port.network
- ports.append(port_dict)
- return ports
- except Exception, exc:
- LOG.error("Failed to get all ports: %s" % str(exc))
-
- def get_port(self, net_id, port_id):
- """Get a port"""
- port_list = []
- port = db.port_get(net_id, port_id)
- try:
- LOG.debug("Getting port: %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- port_list.append(port_dict)
- return port_list
- except Exception, exc:
- LOG.error("Failed to get port: %s" % str(exc))
-
- def create_port(self, net_id):
- """Add a port"""
- port_dict = {}
- try:
- port = db.port_create(net_id)
- LOG.debug("Creating port %s" % port.uuid)
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- return port_dict
- except Exception, exc:
- LOG.error("Failed to create port: %s" % str(exc))
-
- def delete_port(self, net_id, port_id):
- """Delete a port"""
- try:
- port = db.port_destroy(net_id, port_id)
- LOG.debug("Deleted port %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- return port_dict
- except Exception, exc:
- raise Exception("Failed to delete port: %s" % str(exc))
-
- def update_port(self, net_id, port_id, port_state):
- """Update a port"""
- try:
- port = db.port_set_state(net_id, port_id, port_state)
- LOG.debug("Updated port %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- return port_dict
- except Exception, exc:
- raise Exception("Failed to update port state: %s" % str(exc))
-
- def plug_interface(self, net_id, port_id, int_id):
- """Plug interface to a port"""
- try:
- port = db.port_set_attachment(net_id, port_id, int_id)
- LOG.debug("Attached interface to port %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- return port_dict
- except Exception, exc:
- raise Exception("Failed to plug interface: %s" % str(exc))
-
- def unplug_interface(self, net_id, port_id):
- """Unplug interface to a port"""
- try:
- port = db.port_unset_attachment(net_id, port_id)
- LOG.debug("Detached interface from port %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- return port_dict
- except Exception, exc:
- raise Exception("Failed to unplug interface: %s" % str(exc))
-
class L2networkDBTest(unittest.TestCase):
"""Class conisting of L2network DB unit tests"""