import logging
from webob import exc
-
+from quantum import wsgi
from quantum.extensions import _credential_view as credential_view
from quantum.api import api_common as common
from quantum.extensions import extensions
parent=parent_resource)]
-class CredentialController(common.QuantumController):
+class CredentialController(common.QuantumController, wsgi.Controller):
""" credential API controller
based on QuantumController """
def create(self, request, tenant_id):
""" Creates a new credential for a given tenant """
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, request.get_content_type())
+ req_body = \
+ self._prepare_request_body(body,
self._credential_ops_param_list)
+ req_params = req_body[self._resource_name]
+
except exc.HTTPError as exp:
return faults.Fault(exp)
credential = self._plugin.\
def update(self, request, tenant_id, id):
""" Updates the name for the credential with the given id """
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, request.get_content_type())
+ req_body = \
+ self._prepare_request_body(body,
self._credential_ops_param_list)
+ req_params = req_body[self._resource_name]
except exc.HTTPError as exp:
return faults.Fault(exp)
try:
import logging
from webob import exc
-
+from quantum import wsgi
from quantum.api import api_common as common
from quantum.api.views import ports as port_view
from quantum.extensions import extensions
parent=parent_resource)]
-class MultiportController(common.QuantumController):
+class MultiportController(common.QuantumController, wsgi.Controller):
""" multiport API controller
based on QuantumController """
def __init__(self, plugin):
self._resource_name = 'multiport'
self._plugin = plugin
+ self.version = "1.0"
# pylint: disable-msg=E1101,W0613
def create(self, request, tenant_id):
""" Creates a new multiport for a given tenant """
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, request.get_content_type())
+ req_body = \
+ self._prepare_request_body(body,
self._multiport_ops_param_list)
+ req_params = req_body[self._resource_name]
+
except exc.HTTPError as exp:
return faults.Fault(exp)
multiports = self._plugin.\
req_params['net_id_list'],
req_params['status'],
req_params['ports_desc'])
- builder = port_view.get_view_builder(request)
+ builder = port_view.get_view_builder(request, self.version)
result = [builder.build(port)['port']
for port in multiports]
return dict(ports=result)
#
"""
from webob import exc
-
+from quantum import wsgi
from quantum.extensions import _novatenant_view as novatenant_view
from quantum.api import api_common as common
from quantum.common import exceptions as qexception
member_actions=member_actions)]
-class NovatenantsController(common.QuantumController):
+class NovatenantsController(common.QuantumController, wsgi.Controller):
""" Novatenant API controller
based on QuantumController """
content_type = request.best_match_content_type()
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, content_type)
+ req_body = \
+ self._prepare_request_body(body,
self._schedule_host_ops_param_list)
+ req_params = req_body[self._resource_name]
+
except exc.HTTPError as exp:
return faults.Fault(exp)
instance_id = req_params['instance_id']
def associate_port(self, request, tenant_id, id):
content_type = request.best_match_content_type()
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, content_type)
+ req_body = \
+ self._prepare_request_body(body,
self._schedule_host_ops_param_list)
+ req_params = req_body[self._resource_name]
+
except exc.HTTPError as exp:
return faults.Fault(exp)
instance_id = req_params['instance_id']
def detach_port(self, request, tenant_id, id):
content_type = request.best_match_content_type()
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, content_type)
+ req_body = \
+ self._prepare_request_body(body,
self._schedule_host_ops_param_list)
+ req_params = req_body[self._resource_name]
+
except exc.HTTPError as exp:
return faults.Fault(exp)
"""
from webob import exc
-
+from quantum import wsgi
from quantum.extensions import _pprofiles as pprofiles_view
from quantum.api import api_common as common
from quantum.common import exceptions as qexception
member_actions=member_actions)]
-class PortprofilesController(common.QuantumController):
+class PortprofilesController(common.QuantumController, wsgi.Controller):
""" portprofile API controller
based on QuantumController """
""" Creates a new portprofile for a given tenant """
#look for portprofile name in request
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, request.get_content_type())
+ req_body = \
+ self._prepare_request_body(body,
self._portprofile_ops_param_list)
+ req_params = req_body[self._resource_name]
except exc.HTTPError as exp:
return faults.Fault(exp)
portprofile = self._plugin.\
def update(self, request, tenant_id, id):
""" Updates the name for the portprofile with the given id """
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, request.get_content_type())
+ req_body = \
+ self._prepare_request_body(body,
self._portprofile_ops_param_list)
+ req_params = req_body[self._resource_name]
except exc.HTTPError as exp:
return faults.Fault(exp)
try:
content_type = request.best_match_content_type()
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, content_type)
+ req_body = \
+ self._prepare_request_body(body,
self._assignprofile_ops_param_list)
+ req_params = req_body[self._resource_name]
except exc.HTTPError as exp:
return faults.Fault(exp)
net_id = req_params['network-id'].strip()
""" Disassociate a portprofile from a port """
content_type = request.best_match_content_type()
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, content_type)
+ req_body = \
+ self._prepare_request_body(body,
self._assignprofile_ops_param_list)
+ req_params = req_body[self._resource_name]
except exc.HTTPError as exp:
return faults.Fault(exp)
net_id = req_params['network-id'].strip()
"""
import logging
+from quantum import wsgi
from webob import exc
from quantum.extensions import _qos_view as qos_view
from quantum.api import api_common as common
parent=parent_resource)]
-class QosController(common.QuantumController):
+class QosController(common.QuantumController, wsgi.Controller):
""" qos API controller
based on QuantumController """
""" Creates a new qos for a given tenant """
#look for qos name in request
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, request.get_content_type())
+ req_body = \
+ self._prepare_request_body(body,
self._qos_ops_param_list)
+ req_params = req_body[self._resource_name]
except exc.HTTPError as exp:
return faults.Fault(exp)
qos = self._plugin.\
def update(self, request, tenant_id, id):
""" Updates the name for the qos with the given id """
try:
- req_params = \
- self._parse_request_params(request,
+ body = self._deserialize(request.body, request.get_content_type())
+ req_body = \
+ self._prepare_request_body(body,
self._qos_ops_param_list)
+ req_params = req_body[self._resource_name]
except exc.HTTPError as exp:
return faults.Fault(exp)
try:
'/extensions/csco/tenants/{tenant_id}'
TENANT_ID = 'nova'
CSCO_EXT_NAME = 'Cisco Nova Tenant'
+DEFAULT_QUANTUM_VERSION = '1.1'
def help():
action="store_true", default=False, help="turn on verbose logging")
PARSER.add_option("-f", "--logfile", dest="logfile",
type="string", default="syslog", help="log file path")
+ PARSER.add_option('--version', default=DEFAULT_QUANTUM_VERSION,
+ help='Accepts 1.1 and 1.0, defaults to env[QUANTUM_VERSION].')
options, args = PARSER.parse_args()
if options.verbose:
LOG.addHandler(logging.handlers.WatchedFileHandler(options.logfile))
os.chmod(options.logfile, 0644)
+ version = options.version
if len(args) < 1:
PARSER.print_help()
- qcli.help()
+ qcli.help(version)
help()
sys.exit(1)
sys.exit(1)
if CMD not in COMMANDS.keys():
LOG.error("Unknown command: %s" % CMD)
- qcli.help()
+ qcli.help(version)
help()
sys.exit(1)
def validate_port_ownership(tenant_id, net_id, port_id, session=None):
validate_network_ownership(tenant_id, net_id)
- port_get(port_id, net_id)
+ port_get(net_id, port_id)
vlan_id = Column(Integer, primary_key=True)
vlan_name = Column(String(255))
- network_id = Column(String(255), ForeignKey("networks.uuid"),
+ network_id = Column(String(255),
nullable=False)
- network = relation(models.Network, uselist=False)
def __init__(self, vlan_id, vlan_name, network_id):
self.vlan_id = vlan_id
def _invoke_plugin(self, plugin_key, function_name, args, kwargs):
"""Invoke only the device plugin"""
- # If there are more args than needed, add them to kwargs
func = getattr(self._plugins[plugin_key], function_name)
- if args.__len__() + 1 > inspect.getargspec(func).args.__len__():
- kwargs.update(args.pop())
+ # If there are more args than needed, add them to kwargs
+ args_copy = deepcopy(args)
+ if args.__len__() + 1 > \
+ inspect.getargspec(func).args.__len__():
+ kwargs.update(args_copy.pop())
- return func(*args, **kwargs)
+ return func(*args_copy, **kwargs)
def get_all_networks(self, args):
"""Not implemented for this model"""
"""
Unittest runner for quantum Cisco plugin
-export PLUGIN_DIR=plugins/cisco-plugin/lib/quantum/plugins/cisco/
+export PLUGIN_DIR=quantum/plugins/cisco
./run_tests.sh -N
"""
import os
-import unittest
import sys
from nose import config
-from nose import core
-
sys.path.append(os.getcwd())
-
-from quantum.common.test_lib import run_tests
-
-import quantum.plugins.cisco.tests.unit
+sys.path.append(os.path.dirname(__file__))
+from quantum.common.test_lib import run_tests, test_config
+import quantum.tests.unit
def main():
+
+ test_config['plugin_name'] = "l2network_plugin.L2Network"
+ cwd = os.getcwd()
+ os.chdir(cwd)
+ working_dir = os.path.abspath("quantum/plugins/cisco")
c = config.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
- includeExe=True,
- traverseNamespace=True,
- plugins=core.DefaultPluginManager())
- c.configureWhere(quantum.plugins.cisco.tests.unit.__path__)
+ workingDir=working_dir)
sys.exit(run_tests(c))
if __name__ == '__main__':
TEST_CONF_FILE = config.find_config_file({'plugin': 'cisco'}, None,
'quantum.conf.ciscoext')
EXTENSIONS_PATH = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
- os.pardir, os.pardir, os.pardir, "extensions")
+ os.pardir, os.pardir, "extensions")
LOG = logging.getLogger('quantum.plugins.cisco.tests.test_cisco_extensions')
options = {}
options['plugin_provider'] = 'quantum.plugins.cisco.l2network_plugin'\
'.L2Network'
- self.api = server.APIRouterV1(options)
+ self.api = server.APIRouterV10(options)
self._l2network_plugin = l2network_plugin.L2Network()
def test_list_portprofile(self):
rename_path_temp = self.portprofile_path +\
resp_body['portprofiles']['portprofile']['id']
rename_path = str(rename_path_temp)
- rename_response = self.test_app.put(rename_path, rename_req_body)
+ rename_response = self.test_app.put(rename_path, rename_req_body,
+ content_type=self.contenttype)
rename_resp_dict = wsgi.Serializer().deserialize(rename_response.body,
self.contenttype)
self.assertEqual(
update_path_temp = self.portprofile_path + portprofile_id
update_path = str(update_path_temp)
update_response = self.test_app.put(update_path, rename_req_body,
+ content_type=self.contenttype,
status='*')
self.assertEqual(450, update_response.status_int)
LOG.debug("test_update_portprofileDNE - START")
rename_path_temp = self.qos_second_path +\
resp_body['qoss']['qos']['id']
rename_path = str(rename_path_temp)
- rename_response = self.test_app.put(rename_path, rename_req_body)
+ rename_response = self.test_app.put(rename_path, rename_req_body,
+ content_type=self.contenttype)
self.assertEqual(200, rename_response.status_int)
rename_resp_dict = wsgi.Serializer().deserialize(rename_response.body,
self.contenttype)
rename_path_temp = self.qos_second_path + qos_id
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
+ content_type=self.contenttype,
status='*')
self.assertEqual(452, rename_response.status_int)
LOG.debug("test_update_qosDNE - END")
rename_path_temp = self.cred_second_path +\
resp_body['credentials']['credential']['id']
rename_path = str(rename_path_temp)
- rename_response = self.test_app.put(rename_path, rename_req_body)
+ rename_response = self.test_app.put(rename_path, rename_req_body,
+ content_type=self.contenttype)
rename_resp_dict = wsgi.Serializer().deserialize(rename_response.body,
self.contenttype)
self.assertEqual(
rename_path_temp = self.cred_second_path + credential_id
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
+ content_type=self.contenttype,
status='*')
self.assertEqual(451, rename_response.status_int)
LOG.debug("test_update_credentialDNE - END")
options = {}
options['plugin_provider'] = 'quantum.plugins.cisco.l2network_plugin'\
'.L2Network'
- self.api = server.APIRouterV1(options)
+ self.api = server.APIRouterV10(options)
self._l2network_plugin = l2network_plugin.L2Network()
def create_request(self, path, body, content_type, method='GET'):
def vlan_name(id):
- return "q-%svlan" % id
+ return "q-%svlan" % id[0:10]
class TestMultiBlade(unittest.TestCase):
self.net_id,
vlan_name(self.net_id),
vlan_id])
+ cdb.add_vlan_binding(vlan_id, vlan_name(self.net_id), self.net_id)
- self.assertEqual(networks.__len__(), self.ucs_count)
for network in networks:
self.assertEqual(network[const.NET_ID], self.net_id)
self.assertEqual(network[const.NET_NAME], net_name)
LOG.debug("test_create_network - END")
- def test_create_networkDNE(self):
- """Support for the Quantum core API call"""
- LOG.debug("test_create_networkDNE - START")
-
- self.assertRaises(exc.NetworkNotFound,
- self._l2network_multiblade.create_network,
- [tenant_id, net_name, net_id,
- vlan_name(net_id), vlan_id])
-
- LOG.debug("test_create_networkDNE - END")
-
def test_delete_network(self):
"""Support for the Quantum core API call"""
LOG.debug("test_delete_network - START")
networks = self._l2network_multiblade.delete_network([tenant_id,
self.net_id])
-
- self.assertEqual(networks.__len__(), self.ucs_count)
+ cdb.remove_vlan_binding(self.net_id)
+ db.network_destroy(self.net_id)
for network in networks:
self.assertEqual(network[const.NET_ID], self.net_id)
self.assertEqual(network[const.NET_NAME], net_name)
self.net_id,
vlan_name(self.net_id),
vlan_id])
+ cdb.add_vlan_binding(vlan_id, vlan_name(self.net_id), self.net_id)
- db.network_update(self.net_id, tenant_id, name=new_net_name)
+ net_details = db.network_update(self.net_id, tenant_id,
+ name=new_net_name)
networks = self._l2network_multiblade.update_network([tenant_id,
self.net_id,
{'name': new_net_name}])
- self.assertEqual(networks.__len__(), self.ucs_count)
for network in networks:
self.assertEqual(network[const.NET_ID], self.net_id)
self.assertEqual(network[const.NET_NAME], new_net_name)
self.net_id,
vlan_name(self.net_id),
vlan_id])
+ cdb.add_vlan_binding(vlan_id, vlan_name(self.net_id), self.net_id)
self.port_id = db.port_create(self.net_id, port_state)[const.UUID]
port = self._l2network_multiblade.create_port([tenant_id,
self.net_id,
vlan_name(self.net_id),
vlan_id])
+ cdb.add_vlan_binding(vlan_id, vlan_name(self.net_id), self.net_id)
self.port_id = db.port_create(self.net_id, port_state)[const.UUID]
self._l2network_multiblade.create_port([tenant_id,
self.net_id = 000007
self.vlan_name = "q-" + str(self.net_id) + "vlan"
self.vlan_id = 267
+ self.second_vlan_id = 265
self.port_id = "9"
db.configure_db({'sql_connection': 'sqlite:///:memory:'})
cdb.initialize()
tenant_id, self.net_name, network_created["net-id"],
self.vlan_name, self.vlan_id)
network_created2 = self.create_network(tenant_id, 'test_network2')
- cdb.add_vlan_binding(265, 'second_vlan', network_created2["net-id"])
+ cdb.add_vlan_binding(self.second_vlan_id, 'second_vlan',
+ network_created2["net-id"])
new_net_dict2 = self._cisco_nexus_plugin.create_network(
- tenant_id, "New_Network2",
- network_created2["net-id"], "second_vlan", "2003")
+ tenant_id, "New_Network2", network_created2["net-id"],
+ "second_vlan", self.second_vlan_id)
list_net_dict = self._cisco_nexus_plugin.get_all_networks(tenant_id)
net_temp_list = [new_net_dict1, new_net_dict2]
self.assertTrue(net_temp_list[0] in list_net_dict)