def get_resource(self, request, tenant_id, network_id, id):
try:
result = self.network_manager.get_port_details(
- tenant_id, network_id, id).get('attachment-id', None)
+ tenant_id, network_id, id).get('attachment-id',
+ None)
return dict(attachment=result)
except exception.NetworkNotFound as e:
return faults.Fault(faults.NetworkNotFound(e))
self.network_id = network_id
self.state = "DOWN"
-
def __repr__(self):
return "<Port(%s,%s,%s,%s)>" % (self.uuid, self.network_id,
- self.state,self.interface_id)
+ self.state, self.interface_id)
class Network(BASE, QuantumBase):
CONFIG_FILE = "plugins.ini"
LOG = logging.getLogger('quantum.manager')
+
def find_config(basepath):
for root, dirs, files in os.walk(basepath):
if CONFIG_FILE in files:
LOG = logging.getLogger('quantum.plugins.SamplePlugin')
+
class QuantumEchoPlugin(object):
"""
def __init__(self):
db_options = {"sql_connection": "sqlite:///fake_plugin.sqllite"}
- db.configure_db(db_options)
+ db.configure_db(db_options)
FakePlugin._net_counter = 0
def _get_network(self, tenant_id, network_id):
def _get_port(self, tenant_id, network_id, port_id):
net = self._get_network(tenant_id, network_id)
- port = db.port_get(port_id)
+ try:
+ port = db.port_get(port_id)
+ except:
+ raise exc.PortNotFound(net_id=network_id, port_id=port_id)
# Port must exist and belong to the appropriate network.
- if not port or port['network_id']!=net['uuid']:
+ if port['network_id'] != net['uuid']:
raise exc.PortNotFound(net_id=network_id, port_id=port_id)
return port
LOG.debug("FakePlugin.get_all_networks() called")
nets = []
for net in db.network_list(tenant_id):
- net_item = {'net-id':str(net.uuid),
- 'net-name':net.name}
+ net_item = {'net-id': str(net.uuid),
+ 'net-name': net.name}
nets.append(net_item)
- return nets
+ return nets
def get_network_details(self, tenant_id, net_id):
"""
"""
LOG.debug("FakePlugin.get_network_details() called")
net = self._get_network(tenant_id, net_id)
- return {'net-id':str(net.uuid),
- 'net-name':net.name}
+ return {'net-id': str(net.uuid),
+ 'net-name': net.name}
def create_network(self, tenant_id, net_name):
"""
# Verify that no attachments are plugged into the network
if net:
for port in db.port_list(net_id):
- if port['interface-id']:
+ if port['interface_id']:
raise exc.NetworkInUse(net_id=net_id)
db.network_destroy(net_id)
return net
port_ids = []
ports = db.port_list(net_id)
for x in ports:
- d = {'port-id':str(x.uuid)}
+ d = {'port-id': str(x.uuid)}
port_ids.append(d)
return port_ids
"""
LOG.debug("FakePlugin.get_port_details() called")
port = self._get_port(tenant_id, net_id, port_id)
- return {'port-id':str(port.uuid),
- 'attachment-id':port.interface_id,
- 'port-state':port.state}
-
+ return {'port-id': str(port.uuid),
+ 'attachment-id': port.interface_id,
+ 'port-state': port.state}
def create_port(self, tenant_id, net_id, port_state=None):
"""
"""
LOG.debug("FakePlugin.create_port() called")
port = db.port_create(net_id)
- port_item = {'port-id':str(port.uuid)}
+ port_item = {'port-id': str(port.uuid)}
return port_item
def update_port(self, tenant_id, net_id, port_id, new_state):
"""
LOG.debug("FakePlugin.update_port() called")
self._validate_port_state(new_state)
- db.port_set_state(port_id,new_state)
- return
+ db.port_set_state(port_id, new_state)
+ return
def delete_port(self, tenant_id, net_id, port_id):
"""
"""
import gettext
-import logging
+import logging
import os
import unittest
import sys
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.DEBUG)
-
+
working_dir = os.path.abspath("tests")
c = config.Config(stream=sys.stdout,
env=os.environ,
# under the License.
import gettext
-import json
-import sys
import unittest
gettext.install('quantum', unicode=1)
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
#resdict = json.loads(output)
#attachment = resdict["attachment"]
- #self.assertEqual(attachment, "fudd", "Attachment: %s" % attachment)
+ #self.assertEqual(attachment, "fudd", "Attachment: %s"
+ #% attachment)
#def test_renameNetwork(self):
#self.create_network(test_network1_data)
# License for the specific language governing permissions and limitations
# under the License.
# @author: Brad Hall, Nicira Networks
-# @author: Salvatore Orlando, Citrix Systems
+# @author: Salvatore Orlando, Citrix Systems
import logging
import unittest
import tests.unit.testlib as testlib
-from quantum import api as server
+from quantum import api as server
from quantum.common.wsgi import Serializer
LOG = logging.getLogger('quantum.tests.test_api')
def tearDown(self):
"""Clear the test environment"""
# Remove all the networks.
- network_req = testlib.create_list_networks_request(self.tenant_id)
+ network_req = testlib.create_network_list_request(self.tenant_id)
network_res = network_req.get_response(self.api)
- network_data = Serializer().deserialize(network_res.body,"application/xml")
+ network_data = Serializer().deserialize(network_res.body,
+ "application/xml")
for network in network_data["networks"].values():
network_delete_req = testlib. \
- create_network_delete_request(self.tenant_id,network['id'])
- network_delete_req.get_response(self.api)
-
-
-
+ create_network_delete_request(self.tenant_id, network['id'])
+ network_delete_req.get_response(self.api)
+
# Fault names copied here for reference
#
# _fault_names = {
# 470: "serviceUnavailable",
# 471: "pluginFault"}
-
def _test_delete_port(self, format):
+ LOG.debug("_test_delete_port - format:%s - START", format)
content_type = "application/" + format
port_state = "ACTIVE"
+ LOG.debug("Creating network and port")
network_req = testlib.create_new_network_request(self.tenant_id,
- self.network_name,
+ self.network_name,
format)
network_res = network_req.get_response(self.api)
self.assertEqual(network_res.status_int, 200)
network_data = Serializer().deserialize(network_res.body,
content_type)
- network_id = network_data["networks"]["network"]["id"]
- port_req = testlib.create_new_port_request(self.tenant_id,
+ network_id = network_data['networks']['network']['id']
+ port_req = testlib.create_new_port_request(self.tenant_id,
network_id, port_state,
format)
port_res = port_req.get_response(self.api)
self.assertEqual(port_res.status_int, 200)
port_data = Serializer().deserialize(port_res.body, content_type)
- port_id = port_data["ports"]["port"]["id"]
+ port_id = port_data['ports']['port']['id']
LOG.debug("Deleting port %(port_id)s for network %(network_id)s"\
" of tenant %(tenant_id)s", locals())
delete_port_req = testlib.create_port_delete_request(self.tenant_id,
network_id,
- port_id,
+ port_id,
format)
delete_port_res = delete_port_req.get_response(self.api)
self.assertEqual(delete_port_res.status_int, 202)
-
- def test_deletePort_xml(self):
+ list_port_req = testlib.create_port_list_request(self.tenant_id,
+ network_id,
+ format)
+ list_port_res = list_port_req.get_response(self.api)
+ port_list_data = Serializer().deserialize(list_port_res.body,
+ content_type)
+ port_count = len(port_list_data['ports'])
+ self.assertEqual(port_count, 0)
+ LOG.debug("_test_delete_port - format:%s - END", format)
+
+ def test_delete_port_xml(self):
self._test_delete_port('xml')
- def test_deletePort_json(self):
+ def test_delete_port_json(self):
self._test_delete_port('json')
-
- #def test_deletePortNegative(self):
- # tenant = "tenant1"
- # network = "test1"
-
- # Check for network not found
- #rv = self.port.delete("", tenant, network, 2)
- #self.assertEqual(rv.wrapped_exc.status_int, 420)
-
- # Create a network to put the port on
- #req = testlib.create_network_request(tenant, network)
- #network_obj = self.network.create(req, tenant)
- #network_id = network_obj["networks"]["network"]["id"]
-
- # Test for portnotfound
- #rv = self.port.delete("", tenant, network_id, 2)
- #self.assertEqual(rv.wrapped_exc.status_int, 430)
-
+ def _test_delete_port_in_use(self):
# Test for portinuse
#rv = self.port.create(req, tenant, network_id)
#port_id = rv["ports"]["port"]["id"]
#self.assertEqual(rv.status_int, 202)
#rv = self.port.delete("", tenant, network_id, port_id)
#self.assertEqual(rv.wrapped_exc.status_int, 432)
-#
\ No newline at end of file
+ pass
+
+
+ def _test_delete_port_with_bad_id(self,format):
+ LOG.debug("_test_delete_port - format:%s - START", format)
+ content_type = "application/" + format
+ port_state = "ACTIVE"
+ LOG.debug("Creating network and port")
+ network_req = testlib.create_new_network_request(self.tenant_id,
+ self.network_name,
+ format)
+ network_res = network_req.get_response(self.api)
+ self.assertEqual(network_res.status_int, 200)
+ network_data = Serializer().deserialize(network_res.body,
+ content_type)
+ network_id = network_data['networks']['network']['id']
+ port_req = testlib.create_new_port_request(self.tenant_id,
+ network_id, port_state,
+ format)
+ port_res = port_req.get_response(self.api)
+ self.assertEqual(port_res.status_int, 200)
+
+ # Test for portnotfound
+ delete_port_req = testlib.create_port_delete_request(self.tenant_id,
+ network_id,
+ "A_BAD_ID",
+ format)
+ delete_port_res = delete_port_req.get_response(self.api)
+ self.assertEqual(delete_port_res.status_int, 430)
+
+ def test_delete_port_with_bad_id_xml(self):
+ self._test_delete_port_wth_bad_id('xml')
+
+ def test_delete_port_with_bad_id_json(self):
+ self._test_delete_port_with_bad_id('json')
+
from quantum.common.wsgi import Serializer
-def create_request(path, body, content_type, method= 'GET'):
+def create_request(path, body, content_type, method='GET'):
req = webob.Request.blank(path)
req.method = method
req.headers = {}
req.body = body
return req
-def create_list_networks_request(tenant_id, format='xml'):
+
+def create_network_list_request(tenant_id, format='xml'):
method = 'GET'
path = "/tenants/%(tenant_id)s/networks.%(format)s" % locals()
- content_type = "application/" + format
+ content_type = "application/" + format
return create_request(path, None, content_type, method)
method = 'POST'
path = "/tenants/%(tenant_id)s/networks.%(format)s" % locals()
data = {'network': {'net-name': '%s' % network_name}}
- content_type = "application/" + format
+ content_type = "application/" + format
body = Serializer().serialize(data, content_type)
return create_request(path, body, content_type, method)
return create_request(path, None, content_type, method)
+def create_port_list_request(tenant_id, network_id, format='xml'):
+ method = 'GET'
+ path = "/tenants/%(tenant_id)s/networks/" \
+ "%(network_id)s/ports.%(format)s" % locals()
+ content_type = "application/" + format
+ return create_request(path, None, content_type, method)
+
+
def create_new_port_request(tenant_id, network_id, port_state, format='xml'):
method = 'POST'
path = "/tenants/%(tenant_id)s/networks/" \
body = Serializer().serialize(data, content_type)
return create_request(path, body, content_type, method)
+
def create_port_delete_request(tenant_id, network_id, port_id, format='xml'):
method = 'DELETE'
path = "/tenants/%(tenant_id)s/networks/" \