#max_networks=65568
#model_class=quantum.plugins.cisco.models.virt_phy_sw_v2.VirtualPhysicalSwitchModelV2
#manager_class=quantum.plugins.cisco.segmentation.l2network_vlan_mgr_v2.L2NetworkVLANMgr
-#nexus_driver=quantum.plugins.cisco.tests.unit.v2.nexus.fake_nexus_driver.CiscoNEXUSFakeDriver
+#nexus_driver=quantum.plugins.cisco.test.nexus.fake_nexus_driver.CiscoNEXUSFakeDriver
#svi_round_robin=False
# IMPORTANT: Comment out the following two lines for production deployments
'l2network_vlan_mgr_v2.L2NetworkVLANMgr',
help=_("Manager Class")),
cfg.StrOpt('nexus_driver',
- default='quantum.plugins.cisco.tests.unit.v2.nexus.'
+ default='quantum.plugins.cisco.test.nexus.'
'fake_nexus_driver.CiscoNEXUSFakeDriver',
help=_("Nexus Driver Name")),
]
+++ /dev/null
-#!/usr/bin/env python
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-"""
-Unittest runner for quantum Cisco plugin
-
-export PLUGIN_DIR=quantum/plugins/cisco
-./run_tests.sh -N
-"""
-
-import os
-import sys
-
-from nose import config
-
-sys.path.append(os.getcwd())
-sys.path.append(os.path.dirname(__file__))
-
-from quantum.common.test_lib import run_tests, test_config
-
-
-def main():
-
- test_config['plugin_name'] = "l2network_plugin.L2Network"
- cwd = os.getcwd()
- os.chdir(cwd)
- working_dir = os.path.abspath("quantum/plugins/cisco")
- c = config.Config(stream=sys.stdout,
- env=os.environ,
- verbosity=3,
- workingDir=working_dir)
- sys.exit(run_tests(c))
-
-if __name__ == '__main__':
- main()
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# See http://code.google.com/p/python-nose/issues/detail?id=373
-# The code below enables nosetests to work with i18n _() blocks
-
-import __builtin__
-setattr(__builtin__, '_', lambda x: x)
+++ /dev/null
-[pipeline:extensions_app_with_filter]
-pipeline = extensions extensions_test_app
-
-[filter:extensions]
-paste.filter_factory = quantum.common.extensions:plugin_aware_extension_middleware_factory
-
-[app:extensions_test_app]
-paste.app_factory = quantum.plugins.cisco.tests.unit.test_cisco_extension:app_factory
+++ /dev/null
-[DEFAULT]
-# Show more verbose log output (sets INFO log level output)
-verbose = True
-
-# Show debugging output in logs (sets DEBUG log level output)
-debug = False
-
-# Address to bind the API server
-bind_host = 0.0.0.0
-
-# Port the bind the API server to
-bind_port = 9696
-
-# Path to the extensions
-api_extensions_path = ../../../../extensions
-
-# Paste configuration file
-api_paste_config = api-paste.ini.cisco.test
-
-core_plugin = quantum.plugins.cisco.l2network_plugin.L2Network
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# @authors: Shweta Padubidri, Cisco Systems, Inc.
-# Peter Strunk , Cisco Systems, Inc.
-# Shubhangi Satras , Cisco Systems, Inc.
-
-import logging
-import os.path
-
-import routes
-import webob
-from webtest import TestApp
-
-from quantum.api import extensions
-from quantum.api.extensions import (
- ExtensionMiddleware,
- PluginAwareExtensionManager,
-)
-from quantum.common import config
-from quantum.extensions import (
- credential,
- qos,
-)
-from quantum.manager import QuantumManager
-from quantum.openstack.common import jsonutils
-from quantum.plugins.cisco.db import api as db
-from quantum.plugins.cisco import l2network_plugin
-from quantum.plugins.cisco.l2network_plugin import L2Network
-from quantum.tests import base
-from quantum.tests.unit.extension_stubs import StubBaseAppController
-from quantum import wsgi
-
-
-LOG = logging.getLogger('quantum.plugins.cisco.tests.test_cisco_extensions')
-
-
-EXTENSIONS_PATH = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
- os.pardir, os.pardir, "extensions")
-
-ROOTDIR = os.path.dirname(os.path.dirname(__file__))
-UNITDIR = os.path.join(ROOTDIR, 'unit')
-
-
-def testsdir(*p):
- return os.path.join(UNITDIR, *p)
-
-config_file = 'quantum.conf.cisco.test'
-args = ['--config-file', testsdir(config_file)]
-config.parse(args=args)
-
-
-class ExtensionsTestApp(wsgi.Router):
-
- def __init__(self, options=None):
- options = options or {}
- mapper = routes.Mapper()
- controller = StubBaseAppController()
- mapper.resource("dummy_resource", "/dummy_resources",
- controller=controller)
- super(ExtensionsTestApp, self).__init__(mapper)
-
- def create_request(self, path, body, content_type, method='GET'):
- """Test create request."""
-
- LOG.debug("test_create_request - START")
- req = webob.Request.blank(path)
- req.method = method
- req.headers = {}
- req.headers['Accept'] = content_type
- req.body = body
- LOG.debug("test_create_request - END")
- return req
-
- def _create_network(self, name=None):
- """Test create network."""
-
- LOG.debug("Creating network - START")
- if name:
- net_name = name
- else:
- net_name = self.network_name
- net_path = "/tenants/tt/networks"
- net_data = {'network': {'name': '%s' % net_name}}
- req_body = wsgi.Serializer().serialize(net_data, self.contenttype)
- network_req = self.create_request(net_path, req_body,
- self.contenttype, 'POST')
- network_res = network_req.get_response(self.api)
- network_data = wsgi.Serializer().deserialize(network_res.body,
- self.contenttype)
- LOG.debug("Creating network - END")
- return network_data['network']['id']
-
- def _create_port(self, network_id, port_state):
- """Test create port."""
-
- LOG.debug("Creating port for network %s - START", network_id)
- port_path = "/tenants/tt/networks/%s/ports" % network_id
- port_req_data = {'port': {'state': '%s' % port_state}}
- req_body = wsgi.Serializer().serialize(port_req_data,
- self.contenttype)
- port_req = self.create_request(port_path, req_body,
- self.contenttype, 'POST')
- port_res = port_req.get_response(self.api)
- port_data = wsgi.Serializer().deserialize(port_res.body,
- self.contenttype)
- LOG.debug("Creating port for network - END")
- return port_data['port']['id']
-
- def _delete_port(self, network_id, port_id):
- """Delete port."""
- LOG.debug("Deleting port for network %s - START", network_id)
- data = {'network_id': network_id, 'port_id': port_id}
- port_path = ("/tenants/tt/networks/%(network_id)s/ports/%(port_id)s" %
- data)
- port_req = self.create_request(port_path, None,
- self.contenttype, 'DELETE')
- port_req.get_response(self.api)
- LOG.debug("Deleting port for network - END")
-
- def _delete_network(self, network_id):
- """Delete network."""
- LOG.debug("Deleting network %s - START", network_id)
- network_path = "/tenants/tt/networks/%s" % network_id
- network_req = self.create_request(network_path, None,
- self.contenttype, 'DELETE')
- network_req.get_response(self.api)
- LOG.debug("Deleting network - END")
-
- def tear_down_port_network(self, net_id, port_id):
- """Tear down port and network."""
-
- self._delete_port(net_id, port_id)
- self._delete_network(net_id)
-
-
-class QosExtensionTest(base.BaseTestCase):
-
- def setUp(self):
- """Set up function."""
-
- super(QosExtensionTest, self).setUp()
- parent_resource = dict(member_name="tenant",
- collection_name="extensions/csco/tenants")
- controller = qos.QosController(QuantumManager.get_plugin())
- res_ext = extensions.ResourceExtension('qos', controller,
- parent=parent_resource)
-
- self.test_app = setup_extensions_test_app(
- SimpleExtensionManager(res_ext))
- self.contenttype = 'application/json'
- self.qos_path = '/extensions/csco/tenants/tt/qos'
- self.qos_second_path = '/extensions/csco/tenants/tt/qos/'
- self.test_qos_data = {
- 'qos': {
- 'qos_name': 'cisco_test_qos',
- 'qos_desc': {
- 'PPS': 50,
- 'TTL': 5,
- },
- },
- }
- self._l2network_plugin = l2network_plugin.L2Network()
-
- def test_create_qos(self):
- """Test create qos."""
-
- LOG.debug("test_create_qos - START")
- req_body = jsonutils.dumps(self.test_qos_data)
- index_response = self.test_app.post(self.qos_path,
- req_body,
- content_type=self.contenttype)
- self.assertEqual(200, index_response.status_int)
-
- # Clean Up - Delete the qos
- resp_body = wsgi.Serializer().deserialize(index_response.body,
- self.contenttype)
- qos_path_temp = self.qos_second_path + resp_body['qoss']['qos']['id']
- qos_path = str(qos_path_temp)
- self.tearDownQos(qos_path)
- LOG.debug("test_create_qos - END")
-
- def test_create_qosBADRequest(self):
- """Test create qos bad request."""
-
- LOG.debug("test_create_qosBADRequest - START")
- index_response = self.test_app.post(self.qos_path,
- 'BAD_REQUEST',
- content_type=self.contenttype,
- status='*')
- self.assertEqual(400, index_response.status_int)
- LOG.debug("test_create_qosBADRequest - END")
-
- def test_list_qoss(self):
- """Test list qoss."""
-
- LOG.debug("test_list_qoss - START")
- req_body1 = jsonutils.dumps(self.test_qos_data)
- create_resp1 = self.test_app.post(self.qos_path, req_body1,
- content_type=self.contenttype)
- req_body2 = jsonutils.dumps({
- 'qos': {
- 'qos_name': 'cisco_test_qos2',
- 'qos_desc': {
- 'PPS': 50,
- 'TTL': 5,
- },
- },
- })
- create_resp2 = self.test_app.post(self.qos_path, req_body2,
- content_type=self.contenttype)
- index_response = self.test_app.get(self.qos_path)
- index_resp_body = wsgi.Serializer().deserialize(index_response.body,
- self.contenttype)
- self.assertEqual(200, index_response.status_int)
-
- # Clean Up - Delete the qos's
- resp_body1 = wsgi.Serializer().deserialize(create_resp1.body,
- self.contenttype)
- qos_path1_temp = self.qos_second_path + resp_body1['qoss']['qos']['id']
- qos_path1 = str(qos_path1_temp)
- resp_body2 = wsgi.Serializer().deserialize(create_resp2.body,
- self.contenttype)
- list_all_qos = [resp_body1['qoss']['qos'], resp_body2['qoss']['qos']]
- self.assertTrue(index_resp_body['qoss'][0] in list_all_qos)
- self.assertTrue(index_resp_body['qoss'][1] in list_all_qos)
- qos_path2_temp = self.qos_second_path + resp_body2['qoss']['qos']['id']
- qos_path2 = str(qos_path2_temp)
- self.tearDownQos(qos_path1)
- self.tearDownQos(qos_path2)
- LOG.debug("test_list_qoss - END")
-
- def test_show_qos(self):
- """Test show qos."""
-
- LOG.debug("test_show_qos - START")
- req_body = jsonutils.dumps(self.test_qos_data)
- index_response = self.test_app.post(self.qos_path, req_body,
- content_type=self.contenttype)
- resp_body = wsgi.Serializer().deserialize(index_response.body,
- self.contenttype)
- show_path_temp = self.qos_second_path + resp_body['qoss']['qos']['id']
- show_qos_path = str(show_path_temp)
- show_response = self.test_app.get(show_qos_path)
- show_resp_dict = wsgi.Serializer().deserialize(show_response.body,
- self.contenttype)
- self.assertEqual(show_resp_dict['qoss']['qos']['name'],
- self.test_qos_data['qos']['qos_name'])
-
- self.assertEqual(200, show_response.status_int)
-
- # Clean Up - Delete the qos
- self.tearDownQos(show_qos_path)
- LOG.debug("test_show_qos - END")
-
- def test_show_qosDNE(self, qos_id='100'):
- """Test show qos does not exist."""
-
- LOG.debug("test_show_qosDNE - START")
- show_path_temp = self.qos_second_path + qos_id
- show_qos_path = str(show_path_temp)
- show_response = self.test_app.get(show_qos_path, status='*')
- self.assertEqual(452, show_response.status_int)
- LOG.debug("test_show_qosDNE - END")
-
- def test_update_qos(self):
- """Test update qos."""
-
- LOG.debug("test_update_qos - START")
- req_body = jsonutils.dumps(self.test_qos_data)
- index_response = self.test_app.post(self.qos_path, req_body,
- content_type=self.contenttype)
- resp_body = wsgi.Serializer().deserialize(index_response.body,
- self.contenttype)
- rename_req_body = jsonutils.dumps({
- 'qos': {
- 'qos_name': 'cisco_rename_qos',
- 'qos_desc': {
- 'PPS': 50,
- 'TTL': 5,
- },
- },
- })
- rename_path_temp = (self.qos_second_path +
- resp_body['qoss']['qos']['id'])
- rename_path = str(rename_path_temp)
- rename_response = self.test_app.put(rename_path, rename_req_body,
- content_type=self.contenttype)
- self.assertEqual(200, rename_response.status_int)
- rename_resp_dict = wsgi.Serializer().deserialize(rename_response.body,
- self.contenttype)
- self.assertEqual(rename_resp_dict['qoss']['qos']['name'],
- 'cisco_rename_qos')
- self.tearDownQos(rename_path)
- LOG.debug("test_update_qos - END")
-
- def test_update_qosDNE(self, qos_id='100'):
- """Test update qos does not exist."""
-
- LOG.debug("test_update_qosDNE - START")
- rename_req_body = jsonutils.dumps({
- 'qos': {
- 'qos_name': 'cisco_rename_qos',
- 'qos_desc': {
- 'PPS': 50,
- 'TTL': 5,
- },
- },
- })
- rename_path_temp = self.qos_second_path + qos_id
- rename_path = str(rename_path_temp)
- rename_response = self.test_app.put(rename_path, rename_req_body,
- content_type=self.contenttype,
- status='*')
- self.assertEqual(452, rename_response.status_int)
- LOG.debug("test_update_qosDNE - END")
-
- def test_update_qosBADRequest(self):
- """Test update qos bad request."""
-
- LOG.debug("test_update_qosBADRequest - START")
- req_body = jsonutils.dumps(self.test_qos_data)
- index_response = self.test_app.post(self.qos_path, req_body,
- content_type=self.contenttype)
- resp_body = wsgi.Serializer().deserialize(index_response.body,
- self.contenttype)
- rename_path_temp = (self.qos_second_path +
- resp_body['qoss']['qos']['id'])
- rename_path = str(rename_path_temp)
- rename_response = self.test_app.put(rename_path, 'BAD_REQUEST',
- status="*")
- self.assertEqual(400, rename_response.status_int)
-
- # Clean Up - Delete the Port Profile
- self.tearDownQos(rename_path)
- LOG.debug("test_update_qosBADRequest - END")
-
- def test_delete_qos(self):
- """Test delte qos."""
-
- LOG.debug("test_delete_qos - START")
- req_body = jsonutils.dumps({
- 'qos': {
- 'qos_name': 'cisco_test_qos',
- 'qos_desc': {
- 'PPS': 50,
- 'TTL': 5,
- },
- },
- })
- index_response = self.test_app.post(self.qos_path, req_body,
- content_type=self.contenttype)
- resp_body = wsgi.Serializer().deserialize(index_response.body,
- self.contenttype)
- delete_path_temp = (self.qos_second_path +
- resp_body['qoss']['qos']['id'])
- delete_path = str(delete_path_temp)
- delete_response = self.test_app.delete(delete_path)
- self.assertEqual(200, delete_response.status_int)
- LOG.debug("test_delete_qos - END")
-
- def test_delete_qosDNE(self, qos_id='100'):
- """Test delte qos does not exist."""
-
- LOG.debug("test_delete_qosDNE - START")
- delete_path_temp = self.qos_second_path + qos_id
- delete_path = str(delete_path_temp)
- delete_response = self.test_app.delete(delete_path, status='*')
- self.assertEqual(452, delete_response.status_int)
- LOG.debug("test_delete_qosDNE - END")
-
- def tearDownQos(self, delete_profile_path):
- """Tear Down Qos."""
-
- self.test_app.delete(delete_profile_path)
-
- def tearDown(self):
- db.clear_db()
-
-
-class CredentialExtensionTest(base.BaseTestCase):
-
- def setUp(self):
- """Set up function."""
-
- super(CredentialExtensionTest, self).setUp()
- parent_resource = dict(member_name="tenant",
- collection_name="extensions/csco/tenants")
- controller = credential.CredentialController(QuantumManager.
- get_plugin())
- res_ext = extensions.ResourceExtension('credentials', controller,
- parent=parent_resource)
- self.test_app = setup_extensions_test_app(SimpleExtensionManager(
- res_ext))
- self.contenttype = 'application/json'
- self.credential_path = '/extensions/csco/tenants/tt/credentials'
- self.cred_second_path = '/extensions/csco/tenants/tt/credentials/'
- self.test_credential_data = {
- 'credential': {
- 'credential_name': 'cred8',
- 'user_name': 'newUser2',
- 'password': 'newPasswd1',
- },
- }
- self._l2network_plugin = l2network_plugin.L2Network()
-
- def test_list_credentials(self):
- """Test list credentials."""
-
- #Create Credential before listing
- LOG.debug("test_list_credentials - START")
- req_body1 = jsonutils.dumps(self.test_credential_data)
- create_response1 = self.test_app.post(
- self.credential_path, req_body1,
- content_type=self.contenttype)
- req_body2 = jsonutils.dumps({
- 'credential': {
- 'credential_name': 'cred9',
- 'user_name': 'newUser2',
- 'password': 'newPasswd2',
- },
- })
- create_response2 = self.test_app.post(
- self.credential_path, req_body2,
- content_type=self.contenttype)
- index_response = self.test_app.get(self.credential_path)
- index_resp_body = wsgi.Serializer().deserialize(index_response.body,
- self.contenttype)
- self.assertEqual(200, index_response.status_int)
- #CLean Up - Deletion of the Credentials
- resp_body1 = wsgi.Serializer().deserialize(create_response1.body,
- self.contenttype)
- delete_path1_temp = (self.cred_second_path +
- resp_body1['credentials']['credential']['id'])
- delete_path1 = str(delete_path1_temp)
- resp_body2 = wsgi.Serializer().deserialize(create_response2.body,
- self.contenttype)
- list_all_credential = [resp_body1['credentials']['credential'],
- resp_body2['credentials']['credential']]
- self.assertTrue(
- index_resp_body['credentials'][0] in list_all_credential)
- self.assertTrue(
- index_resp_body['credentials'][1] in list_all_credential)
- delete_path2_temp = (self.cred_second_path +
- resp_body2['credentials']['credential']['id'])
- delete_path2 = str(delete_path2_temp)
- self.tearDownCredential(delete_path1)
- self.tearDownCredential(delete_path2)
- LOG.debug("test_list_credentials - END")
-
- def test_create_credential(self):
- """Test create credential."""
-
- LOG.debug("test_create_credential - START")
- req_body = jsonutils.dumps(self.test_credential_data)
- index_response = self.test_app.post(
- self.credential_path, req_body,
- content_type=self.contenttype)
- self.assertEqual(200, index_response.status_int)
- #CLean Up - Deletion of the Credentials
- resp_body = wsgi.Serializer().deserialize(
- index_response.body, self.contenttype)
- delete_path_temp = (self.cred_second_path +
- resp_body['credentials']['credential']['id'])
- delete_path = str(delete_path_temp)
- self.tearDownCredential(delete_path)
- LOG.debug("test_create_credential - END")
-
- def test_create_credentialBADRequest(self):
- """Test create credential bad request."""
-
- LOG.debug("test_create_credentialBADRequest - START")
- index_response = self.test_app.post(
- self.credential_path, 'BAD_REQUEST',
- content_type=self.contenttype, status='*')
- self.assertEqual(400, index_response.status_int)
- LOG.debug("test_create_credentialBADRequest - END")
-
- def test_show_credential(self):
- """Test show credential."""
-
- LOG.debug("test_show_credential - START")
- req_body = jsonutils.dumps(self.test_credential_data)
- index_response = self.test_app.post(
- self.credential_path, req_body,
- content_type=self.contenttype)
- resp_body = wsgi.Serializer().deserialize(index_response.body,
- self.contenttype)
- show_path_temp = (self.cred_second_path +
- resp_body['credentials']['credential']['id'])
- show_cred_path = str(show_path_temp)
- show_response = self.test_app.get(show_cred_path)
- show_resp_dict = wsgi.Serializer().deserialize(show_response.body,
- self.contenttype)
- self.assertEqual(show_resp_dict['credentials']['credential']['name'],
- self.test_credential_data['credential']['user_name'])
- self.assertEqual(
- show_resp_dict['credentials']['credential']['password'],
- self.test_credential_data['credential']['password'])
- self.assertEqual(200, show_response.status_int)
- LOG.debug("test_show_credential - END")
-
- def test_show_credentialDNE(self, credential_id='100'):
- """Test show credential does not exist."""
-
- LOG.debug("test_show_credentialDNE - START")
- show_path_temp = self.cred_second_path + credential_id
- show_cred_path = str(show_path_temp)
- show_response = self.test_app.get(show_cred_path, status='*')
- self.assertEqual(451, show_response.status_int)
- LOG.debug("test_show_credentialDNE - END")
-
- def test_update_credential(self):
- """Test update credential."""
-
- LOG.debug("test_update_credential - START")
- req_body = jsonutils.dumps(self.test_credential_data)
-
- index_response = self.test_app.post(
- self.credential_path, req_body,
- content_type=self.contenttype)
- resp_body = wsgi.Serializer().deserialize(
- index_response.body, self.contenttype)
- rename_req_body = jsonutils.dumps({
- 'credential': {
- 'credential_name': 'cred3',
- 'user_name': 'RenamedUser',
- 'password': 'Renamedpassword',
- },
- })
- rename_path_temp = (self.cred_second_path +
- resp_body['credentials']['credential']['id'])
- rename_path = str(rename_path_temp)
- rename_response = self.test_app.put(rename_path, rename_req_body,
- content_type=self.contenttype)
- rename_resp_dict = wsgi.Serializer().deserialize(rename_response.body,
- self.contenttype)
- self.assertEqual(rename_resp_dict['credentials']['credential']['name'],
- 'cred3')
- self.assertEqual(
- rename_resp_dict['credentials']['credential']['password'],
- self.test_credential_data['credential']['password'])
- self.assertEqual(200, rename_response.status_int)
- # Clean Up - Delete the Credentials
- self.tearDownCredential(rename_path)
- LOG.debug("test_update_credential - END")
-
- def test_update_credBADReq(self):
- """Test update credential bad request."""
-
- LOG.debug("test_update_credBADReq - START")
- req_body = jsonutils.dumps(self.test_credential_data)
- index_response = self.test_app.post(
- self.credential_path, req_body,
- content_type=self.contenttype)
- resp_body = wsgi.Serializer().deserialize(
- index_response.body, self.contenttype)
- rename_path_temp = (self.cred_second_path +
- resp_body['credentials']['credential']['id'])
- rename_path = str(rename_path_temp)
- rename_response = self.test_app.put(rename_path, 'BAD_REQUEST',
- status='*')
- self.assertEqual(400, rename_response.status_int)
- LOG.debug("test_update_credBADReq - END")
-
- def test_update_credentialDNE(self, credential_id='100'):
- """Test update credential does not exist."""
-
- LOG.debug("test_update_credentialDNE - START")
- rename_req_body = jsonutils.dumps({
- 'credential': {
- 'credential_name': 'cred3',
- 'user_name': 'RenamedUser',
- 'password': 'Renamedpassword',
- },
- })
- rename_path_temp = self.cred_second_path + credential_id
- rename_path = str(rename_path_temp)
- rename_response = self.test_app.put(rename_path, rename_req_body,
- content_type=self.contenttype,
- status='*')
- self.assertEqual(451, rename_response.status_int)
- LOG.debug("test_update_credentialDNE - END")
-
- def test_delete_credential(self):
- """Test delete credential."""
-
- LOG.debug("test_delete_credential - START")
- req_body = jsonutils.dumps(self.test_credential_data)
- index_response = self.test_app.post(
- self.credential_path, req_body,
- content_type=self.contenttype)
- resp_body = wsgi.Serializer().deserialize(
- index_response.body, self.contenttype)
- delete_path_temp = (self.cred_second_path +
- resp_body['credentials']['credential']['id'])
- delete_path = str(delete_path_temp)
- delete_response = self.test_app.delete(delete_path)
- self.assertEqual(200, delete_response.status_int)
- LOG.debug("test_delete_credential - END")
-
- def test_delete_credentialDNE(self, credential_id='100'):
- """Test delete credential does not exist."""
-
- LOG.debug("test_delete_credentialDNE - START")
- delete_path_temp = self.cred_second_path + credential_id
- delete_path = str(delete_path_temp)
- delete_response = self.test_app.delete(delete_path, status='*')
- self.assertEqual(451, delete_response.status_int)
- LOG.debug("test_delete_credentialDNE - END")
-
- def tearDownCredential(self, delete_path):
- self.test_app.delete(delete_path)
-
- def tearDown(self):
- db.clear_db()
-
-
-def app_factory(global_conf, **local_conf):
- conf = global_conf.copy()
- conf.update(local_conf)
- return ExtensionsTestApp(conf)
-
-
-def setup_extensions_middleware(extension_manager=None):
- extension_manager = (extension_manager or
- PluginAwareExtensionManager(EXTENSIONS_PATH,
- L2Network()))
- app = config.load_paste_app('extensions_test_app')
- return ExtensionMiddleware(app, ext_mgr=extension_manager)
-
-
-def setup_extensions_test_app(extension_manager=None):
- return TestApp(setup_extensions_middleware(extension_manager))
-
-
-class SimpleExtensionManager(object):
-
- def __init__(self, resource_ext=None, action_ext=None, request_ext=None):
- self.resource_ext = resource_ext
- self.action_ext = action_ext
- self.request_ext = request_ext
-
- def get_resources(self):
- resource_exts = []
- if self.resource_ext:
- resource_exts.append(self.resource_ext)
- return resource_exts
-
- def get_actions(self):
- action_exts = []
- if self.action_ext:
- action_exts.append(self.action_ext)
- return action_exts
-
- def get_request_extensions(self):
- request_extensions = []
- if self.request_ext:
- request_extensions.append(self.request_ext)
- return request_extensions
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011, Cisco Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# @author: Rohit Agarwalla, Cisco Systems, Inc.
-
-"""
-test_database.py is an independent test suite
-that tests the database api method calls
-"""
-
-import mock
-import sqlalchemy
-
-from quantum.openstack.common import log as logging
-import quantum.plugins.cisco.common.cisco_exceptions as c_exc
-import quantum.plugins.cisco.db.api as db
-import quantum.plugins.cisco.db.l2network_db as l2network_db
-import quantum.plugins.cisco.db.nexus_db_v2 as nexus_db
-from quantum.tests import base
-
-
-LOG = logging.getLogger(__name__)
-
-
-class NexusDB(object):
- """Class consisting of methods to call nexus db methods."""
-
- def get_all_nexusportbindings(self):
- """Get all nexus port bindings."""
- bindings = []
- try:
- for bind in nexus_db.get_all_nexusport_bindings():
- LOG.debug("Getting nexus port binding : %s" % bind.port_id)
- bind_dict = {}
- bind_dict["port-id"] = str(bind.port_id)
- bind_dict["vlan-id"] = str(bind.vlan_id)
- bindings.append(bind_dict)
- except Exception as exc:
- LOG.error("Failed to get all bindings: %s" % str(exc))
- return bindings
-
- def get_nexusportbinding(self, vlan_id):
- """Get nexus port binding."""
- binding = []
- try:
- for bind in nexus_db.get_nexusport_binding(vlan_id):
- LOG.debug("Getting nexus port binding : %s" % bind.port_id)
- bind_dict = {}
- bind_dict["port-id"] = str(bind.port_id)
- bind_dict["vlan-id"] = str(bind.vlan_id)
- binding.append(bind_dict)
- except Exception as exc:
- LOG.error("Failed to get all bindings: %s" % str(exc))
- return binding
-
- def create_nexusportbinding(self, port_id, vlan_id):
- """Create nexus port binding."""
- bind_dict = {}
- try:
- res = nexus_db.add_nexusport_binding(port_id, vlan_id)
- LOG.debug("Created nexus port binding : %s" % res.port_id)
- bind_dict["port-id"] = str(res.port_id)
- bind_dict["vlan-id"] = str(res.vlan_id)
- return bind_dict
- except Exception as exc:
- LOG.error("Failed to create nexus binding: %s" % str(exc))
-
- def delete_nexusportbinding(self, vlan_id):
- """Delete nexus port binding."""
- bindings = []
- try:
- bind = nexus_db.remove_nexusport_binding(vlan_id)
- for res in bind:
- LOG.debug("Deleted nexus port binding: %s" % res.vlan_id)
- bind_dict = {}
- bind_dict["port-id"] = res.port_id
- bindings.append(bind_dict)
- return bindings
- except Exception as exc:
- raise Exception("Failed to delete nexus port binding: %s"
- % str(exc))
-
- def update_nexusport_binding(self, port_id, new_vlan_id):
- """Update nexus port binding."""
- try:
- res = nexus_db.update_nexusport_binding(port_id, new_vlan_id)
- LOG.debug("Updating nexus port binding : %s" % res.port_id)
- bind_dict = {}
- bind_dict["port-id"] = str(res.port_id)
- bind_dict["vlan-id"] = str(res.vlan_id)
- return bind_dict
- except Exception as exc:
- raise Exception("Failed to update nexus port binding vnic: %s"
- % str(exc))
-
-
-class L2networkDB(object):
- """Class conisting of methods to call L2network db methods."""
- def get_all_vlan_bindings(self):
- """Get all vlan binding into a list of dict."""
- vlans = []
- try:
- for vlan_bind in l2network_db.get_all_vlan_bindings():
- LOG.debug("Getting vlan bindings for vlan: %s" %
- vlan_bind.vlan_id)
- vlan_dict = {}
- vlan_dict["vlan-id"] = str(vlan_bind.vlan_id)
- vlan_dict["vlan-name"] = vlan_bind.vlan_name
- vlan_dict["net-id"] = str(vlan_bind.network_id)
- vlans.append(vlan_dict)
- except Exception as exc:
- LOG.error("Failed to get all vlan bindings: %s" % str(exc))
- return vlans
-
- def get_vlan_binding(self, network_id):
- """Get a vlan binding."""
- vlan = []
- try:
- for vlan_bind in l2network_db.get_vlan_binding(network_id):
- LOG.debug("Getting vlan binding for vlan: %s" %
- vlan_bind.vlan_id)
- vlan_dict = {}
- vlan_dict["vlan-id"] = str(vlan_bind.vlan_id)
- vlan_dict["vlan-name"] = vlan_bind.vlan_name
- vlan_dict["net-id"] = str(vlan_bind.network_id)
- vlan.append(vlan_dict)
- except Exception as exc:
- LOG.error("Failed to get vlan binding: %s" % str(exc))
- return vlan
-
- def create_vlan_binding(self, vlan_id, vlan_name, network_id):
- """Create a vlan binding."""
- vlan_dict = {}
- try:
- res = l2network_db.add_vlan_binding(vlan_id, vlan_name, network_id)
- LOG.debug("Created vlan binding for vlan: %s" % res.vlan_id)
- vlan_dict["vlan-id"] = str(res.vlan_id)
- vlan_dict["vlan-name"] = res.vlan_name
- vlan_dict["net-id"] = str(res.network_id)
- return vlan_dict
- except Exception as exc:
- LOG.error("Failed to create vlan binding: %s" % str(exc))
-
- def delete_vlan_binding(self, network_id):
- """Delete a vlan binding."""
- try:
- res = l2network_db.remove_vlan_binding(network_id)
- LOG.debug("Deleted vlan binding for vlan: %s" % res.vlan_id)
- vlan_dict = {}
- vlan_dict["vlan-id"] = str(res.vlan_id)
- return vlan_dict
- except Exception as exc:
- raise Exception("Failed to delete vlan binding: %s" % str(exc))
-
- def update_vlan_binding(self, network_id, vlan_id, vlan_name):
- """Update a vlan binding."""
- try:
- res = l2network_db.update_vlan_binding(network_id, vlan_id,
- vlan_name)
- LOG.debug("Updating vlan binding for vlan: %s" % res.vlan_id)
- vlan_dict = {}
- vlan_dict["vlan-id"] = str(res.vlan_id)
- vlan_dict["vlan-name"] = res.vlan_name
- vlan_dict["net-id"] = str(res.network_id)
- return vlan_dict
- except Exception as exc:
- raise Exception("Failed to update vlan binding: %s" % str(exc))
-
-
-class QuantumDB(object):
- """Class conisting of methods to call Quantum db methods."""
- def get_all_networks(self, tenant_id):
- """Get all networks."""
- nets = []
- try:
- for net in db.network_list(tenant_id):
- LOG.debug("Getting network: %s" % net.uuid)
- net_dict = {}
- net_dict["tenant-id"] = net.tenant_id
- net_dict["net-id"] = str(net.uuid)
- net_dict["net-name"] = net.name
- nets.append(net_dict)
- except Exception as exc:
- LOG.error("Failed to get all networks: %s" % str(exc))
- return nets
-
- def get_network(self, network_id):
- """Get a network."""
- net = []
- try:
- for net in db.network_get(network_id):
- LOG.debug("Getting network: %s" % net.uuid)
- net_dict = {}
- net_dict["tenant-id"] = net.tenant_id
- net_dict["net-id"] = str(net.uuid)
- net_dict["net-name"] = net.name
- net.append(net_dict)
- except Exception as exc:
- LOG.error("Failed to get network: %s" % str(exc))
- return net
-
- def create_network(self, tenant_id, net_name):
- """Create a network."""
- net_dict = {}
- try:
- res = db.network_create(tenant_id, net_name)
- LOG.debug("Created network: %s" % res.uuid)
- net_dict["tenant-id"] = res.tenant_id
- net_dict["net-id"] = str(res.uuid)
- net_dict["net-name"] = res.name
- return net_dict
- except Exception as exc:
- LOG.error("Failed to create network: %s" % str(exc))
-
- def delete_network(self, net_id):
- """Delete a network."""
- try:
- net = db.network_destroy(net_id)
- LOG.debug("Deleted network: %s" % net.uuid)
- net_dict = {}
- net_dict["net-id"] = str(net.uuid)
- return net_dict
- except Exception as exc:
- raise Exception("Failed to delete port: %s" % str(exc))
-
- def update_network(self, tenant_id, net_id, **kwargs):
- """Update a network."""
- try:
- net = db.network_update(net_id, tenant_id, **kwargs)
- LOG.debug("Updated network: %s" % net.uuid)
- net_dict = {}
- net_dict["net-id"] = str(net.uuid)
- net_dict["net-name"] = net.name
- return net_dict
- except Exception as exc:
- raise Exception("Failed to update network: %s" % str(exc))
-
- def get_all_ports(self, net_id):
- """Get all ports."""
- ports = []
- try:
- for port in db.port_list(net_id):
- LOG.debug("Getting port: %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- port_dict["net"] = port.network
- ports.append(port_dict)
- return ports
- except Exception as exc:
- LOG.error("Failed to get all ports: %s" % str(exc))
-
- def get_port(self, net_id, port_id):
- """Get a port."""
- port_list = []
- port = db.port_get(net_id, port_id)
- try:
- LOG.debug("Getting port: %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- port_list.append(port_dict)
- return port_list
- except Exception as exc:
- LOG.error("Failed to get port: %s" % str(exc))
-
- def create_port(self, net_id):
- """Add a port."""
- port_dict = {}
- try:
- port = db.port_create(net_id)
- LOG.debug("Creating port %s" % port.uuid)
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- return port_dict
- except Exception as exc:
- LOG.error("Failed to create port: %s" % str(exc))
-
- def delete_port(self, net_id, port_id):
- """Delete a port."""
- try:
- port = db.port_destroy(net_id, port_id)
- LOG.debug("Deleted port %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- return port_dict
- except Exception as exc:
- raise Exception("Failed to delete port: %s" % str(exc))
-
- def update_port(self, net_id, port_id, port_state):
- """Update a port."""
- try:
- port = db.port_set_state(net_id, port_id, port_state)
- LOG.debug("Updated port %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- return port_dict
- except Exception as exc:
- raise Exception("Failed to update port state: %s" % str(exc))
-
- def plug_interface(self, net_id, port_id, int_id):
- """Plug interface to a port."""
- try:
- port = db.port_set_attachment(net_id, port_id, int_id)
- LOG.debug("Attached interface to port %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- return port_dict
- except Exception as exc:
- raise Exception("Failed to plug interface: %s" % str(exc))
-
- def unplug_interface(self, net_id, port_id):
- """Unplug interface to a port."""
- try:
- port = db.port_unset_attachment(net_id, port_id)
- LOG.debug("Detached interface from port %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- return port_dict
- except Exception as exc:
- raise Exception("Failed to unplug interface: %s" % str(exc))
-
-
-class NexusDBTest(base.BaseTestCase):
- """Class conisting of nexus DB unit tests."""
- def setUp(self):
- super(NexusDBTest, self).setUp()
- """Setup for nexus db tests."""
- l2network_db.initialize()
- self.addCleanup(db.clear_db)
- self.dbtest = NexusDB()
- LOG.debug("Setup")
-
- def testa_create_nexusportbinding(self):
- """Create nexus port binding."""
- binding1 = self.dbtest.create_nexusportbinding("port1", 10)
- self.assertTrue(binding1["port-id"] == "port1")
- self.tearDown_nexusportbinding()
-
- def testb_getall_nexusportbindings(self):
- """Get all nexus port bindings."""
- self.dbtest.create_nexusportbinding("port1", 10)
- self.dbtest.create_nexusportbinding("port2", 10)
- bindings = self.dbtest.get_all_nexusportbindings()
- count = 0
- for bind in bindings:
- if "port" in bind["port-id"]:
- count += 1
- self.assertTrue(count == 2)
- self.tearDown_nexusportbinding()
-
- def testc_delete_nexusportbinding(self):
- """Delete nexus port binding."""
- self.dbtest.create_nexusportbinding("port1", 10)
- self.dbtest.delete_nexusportbinding(10)
- bindings = self.dbtest.get_all_nexusportbindings()
- count = 0
- for bind in bindings:
- if "port " in bind["port-id"]:
- count += 1
- self.assertTrue(count == 0)
- self.tearDown_nexusportbinding()
-
- def testd_update_nexusportbinding(self):
- """Update nexus port binding."""
- binding1 = self.dbtest.create_nexusportbinding("port1", 10)
- binding1 = self.dbtest.update_nexusport_binding(binding1["port-id"],
- 20)
- bindings = self.dbtest.get_all_nexusportbindings()
- count = 0
- for bind in bindings:
- if "20" in str(bind["vlan-id"]):
- count += 1
- self.assertTrue(count == 1)
- self.tearDown_nexusportbinding()
-
- def test_get_nexusport_binding_no_result_found_handling(self):
- with mock.patch('sqlalchemy.orm.Query.all') as mock_all:
- mock_all.return_value = []
-
- with self.assertRaises(c_exc.NexusPortBindingNotFound):
- nexus_db.get_nexusport_binding(port_id=10,
- vlan_id=20,
- switch_ip='10.0.0.1',
- instance_id=1)
-
- def test_get_nexusvlan_binding_no_result_found_handling(self):
- with mock.patch('sqlalchemy.orm.Query.all') as mock_all:
- mock_all.return_value = []
-
- with self.assertRaises(c_exc.NexusPortBindingNotFound):
- nexus_db.get_nexusvlan_binding(vlan_id=10,
- switch_ip='10.0.0.1')
-
- def test_update_nexusport_binding_no_result_found_handling(self):
- with mock.patch('sqlalchemy.orm.Query.one') as mock_one:
- mock_one.side_effect = sqlalchemy.orm.exc.NoResultFound
-
- with self.assertRaises(c_exc.NexusPortBindingNotFound):
- nexus_db.update_nexusport_binding(port_id=10,
- vlan_id=20,
- switch_ip='10.0.0.1',
- instance_id=1)
-
- def test_get_nexusvm_binding_no_result_found_handling(self):
- with mock.patch('sqlalchemy.orm.Query.first') as mock_first:
- mock_first.return_value = None
-
- with self.assertRaises(c_exc.NexusPortBindingNotFound):
- nexus_db.get_nexusvm_binding(port_id=10,
- vlan_id=20,
- switch_ip='10.0.0.1')
-
- def test_nexusport_binding_not_found_exception_message_formatting(self):
- try:
- raise c_exc.NexusPortBindingNotFound(a=1, b='test')
- except c_exc.NexusPortBindingNotFound as e:
- self.assertIn('(a=1,b=test)', str(e))
-
- def tearDown_nexusportbinding(self):
- """Tear down nexus port binding table."""
- LOG.debug("Tearing Down Nexus port Bindings")
- binds = self.dbtest.get_all_nexusportbindings()
- for bind in binds:
- vlan_id = bind["vlan-id"]
- self.dbtest.delete_nexusportbinding(vlan_id)
-
-
-class L2networkDBTest(base.BaseTestCase):
- """Class conisting of L2network DB unit tests."""
- def setUp(self):
- """Setup for tests."""
- super(L2networkDBTest, self).setUp()
- l2network_db.initialize()
- self.dbtest = L2networkDB()
- self.quantum = QuantumDB()
- self.addCleanup(db.clear_db)
- LOG.debug("Setup")
-
- def testa_create_vlanbinding(self):
- """Test add vlan binding."""
- net1 = self.quantum.create_network("t1", "netid1")
- vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
- self.assertTrue(vlan1["vlan-id"] == "10")
- self.teardown_vlanbinding()
- self.teardown_network()
-
- def testb_getall_vlanbindings(self):
- """Test get all vlan bindings."""
- net1 = self.quantum.create_network("t1", "netid1")
- net2 = self.quantum.create_network("t1", "netid2")
- vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
- self.assertTrue(vlan1["vlan-id"] == "10")
- vlan2 = self.dbtest.create_vlan_binding(20, "vlan2", net2["net-id"])
- self.assertTrue(vlan2["vlan-id"] == "20")
- vlans = self.dbtest.get_all_vlan_bindings()
- count = 0
- for vlan in vlans:
- if "vlan" in vlan["vlan-name"]:
- count += 1
- self.assertTrue(count == 2)
- self.teardown_vlanbinding()
- self.teardown_network()
-
- def testc_delete_vlanbinding(self):
- """Test delete vlan binding."""
- net1 = self.quantum.create_network("t1", "netid1")
- vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
- self.assertTrue(vlan1["vlan-id"] == "10")
- self.dbtest.delete_vlan_binding(net1["net-id"])
- vlans = self.dbtest.get_all_vlan_bindings()
- count = 0
- for vlan in vlans:
- if "vlan " in vlan["vlan-name"]:
- count += 1
- self.assertTrue(count == 0)
- self.teardown_vlanbinding()
- self.teardown_network()
-
- def testd_update_vlanbinding(self):
- """Test update vlan binding."""
- net1 = self.quantum.create_network("t1", "netid1")
- vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
- self.assertTrue(vlan1["vlan-id"] == "10")
- vlan1 = self.dbtest.update_vlan_binding(net1["net-id"], 11, "newvlan1")
- vlans = self.dbtest.get_all_vlan_bindings()
- count = 0
- for vlan in vlans:
- if "new" in vlan["vlan-name"]:
- count += 1
- self.assertTrue(count == 1)
- self.teardown_vlanbinding()
- self.teardown_network()
-
- def testm_test_vlanids(self):
- """Test vlanid methods."""
- l2network_db.create_vlanids()
- vlanids = l2network_db.get_all_vlanids()
- self.assertTrue(len(vlanids) > 0)
- vlanid = l2network_db.reserve_vlanid()
- used = l2network_db.is_vlanid_used(vlanid)
- self.assertTrue(used)
- used = l2network_db.release_vlanid(vlanid)
- self.assertFalse(used)
- #counting on default teardown here to clear db
-
- def teardown_network(self):
- """tearDown Network table."""
- LOG.debug("Tearing Down Network")
- nets = self.quantum.get_all_networks("t1")
- for net in nets:
- netid = net["net-id"]
- self.quantum.delete_network(netid)
-
- def teardown_port(self):
- """tearDown Port table."""
- LOG.debug("Tearing Down Port")
- nets = self.quantum.get_all_networks("t1")
- for net in nets:
- netid = net["net-id"]
- ports = self.quantum.get_all_ports(netid)
- for port in ports:
- portid = port["port-id"]
- self.quantum.delete_port(netid, portid)
-
- def teardown_vlanbinding(self):
- """tearDown VlanBinding table."""
- LOG.debug("Tearing Down Vlan Binding")
- vlans = self.dbtest.get_all_vlan_bindings()
- for vlan in vlans:
- netid = vlan["net-id"]
- self.dbtest.delete_vlan_binding(netid)
-
-
-class QuantumDBTest(base.BaseTestCase):
- """Class conisting of Quantum DB unit tests."""
- def setUp(self):
- """Setup for tests."""
- super(QuantumDBTest, self).setUp()
- l2network_db.initialize()
- self.addCleanup(db.clear_db)
- self.dbtest = QuantumDB()
- self.tenant_id = "t1"
- LOG.debug("Setup")
-
- def testa_create_network(self):
- """Test to create network."""
- net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
- self.assertTrue(net1["net-name"] == "plugin_test1")
- self.teardown_network_port()
-
- def testb_get_networks(self):
- """Test to get all networks."""
- net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
- self.assertTrue(net1["net-name"] == "plugin_test1")
- net2 = self.dbtest.create_network(self.tenant_id, "plugin_test2")
- self.assertTrue(net2["net-name"] == "plugin_test2")
- nets = self.dbtest.get_all_networks(self.tenant_id)
- count = 0
- for net in nets:
- if "plugin_test" in net["net-name"]:
- count += 1
- self.assertTrue(count == 2)
- self.teardown_network_port()
-
- def testc_delete_network(self):
- """Test to delete network."""
- net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
- self.assertTrue(net1["net-name"] == "plugin_test1")
- self.dbtest.delete_network(net1["net-id"])
- nets = self.dbtest.get_all_networks(self.tenant_id)
- count = 0
- for net in nets:
- if "plugin_test1" in net["net-name"]:
- count += 1
- self.assertTrue(count == 0)
- self.teardown_network_port()
-
- def testd_update_network(self):
- """Test to update (rename) network."""
- net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
- self.assertTrue(net1["net-name"] == "plugin_test1")
- net = self.dbtest.update_network(self.tenant_id, net1["net-id"],
- name="plugin_test1_renamed")
- self.assertTrue(net["net-name"] == "plugin_test1_renamed")
- self.teardown_network_port()
-
- def teste_create_port(self):
- """Test to create port."""
- net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
- port = self.dbtest.create_port(net1["net-id"])
- self.assertTrue(port["net-id"] == net1["net-id"])
- ports = self.dbtest.get_all_ports(net1["net-id"])
- count = 0
- for por in ports:
- count += 1
- self.assertTrue(count == 1)
- self.teardown_network_port()
-
- def testf_delete_port(self):
- """Test to delete port."""
- net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
- port = self.dbtest.create_port(net1["net-id"])
- self.assertTrue(port["net-id"] == net1["net-id"])
- ports = self.dbtest.get_all_ports(net1["net-id"])
- count = 0
- for por in ports:
- count += 1
- self.assertTrue(count == 1)
- for por in ports:
- self.dbtest.delete_port(net1["net-id"], por["port-id"])
- ports = self.dbtest.get_all_ports(net1["net-id"])
- count = 0
- for por in ports:
- count += 1
- self.assertTrue(count == 0)
- self.teardown_network_port()
-
- def testg_plug_unplug_interface(self):
- """Test to plug/unplug interface."""
- net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
- port1 = self.dbtest.create_port(net1["net-id"])
- self.dbtest.plug_interface(net1["net-id"], port1["port-id"], "vif1.1")
- port = self.dbtest.get_port(net1["net-id"], port1["port-id"])
- self.assertTrue(port[0]["int-id"] == "vif1.1")
- self.dbtest.unplug_interface(net1["net-id"], port1["port-id"])
- port = self.dbtest.get_port(net1["net-id"], port1["port-id"])
- self.assertTrue(port[0]["int-id"] is None)
- self.teardown_network_port()
-
- def testh_joined_test(self):
- """Test to get network and port."""
- net1 = self.dbtest.create_network("t1", "net1")
- port1 = self.dbtest.create_port(net1["net-id"])
- self.assertTrue(port1["net-id"] == net1["net-id"])
- port2 = self.dbtest.create_port(net1["net-id"])
- self.assertTrue(port2["net-id"] == net1["net-id"])
- ports = self.dbtest.get_all_ports(net1["net-id"])
- for port in ports:
- net = port["net"]
- LOG.debug("Port id %s Net id %s" % (port["port-id"], net.uuid))
- self.teardown_joined_test()
-
- def teardown_network_port(self):
- """tearDown for Network and Port table."""
- networks = self.dbtest.get_all_networks(self.tenant_id)
- for net in networks:
- netid = net["net-id"]
- name = net["net-name"]
- if "plugin_test" in name:
- ports = self.dbtest.get_all_ports(netid)
- for por in ports:
- self.dbtest.delete_port(netid, por["port-id"])
- self.dbtest.delete_network(netid)
-
- def teardown_joined_test(self):
- """tearDown for joined Network and Port test."""
- LOG.debug("Tearing Down Network and Ports")
- nets = self.dbtest.get_all_networks("t1")
- for net in nets:
- netid = net["net-id"]
- ports = self.dbtest.get_all_ports(netid)
- for port in ports:
- self.dbtest.delete_port(port["net-id"], port["port-id"])
- self.dbtest.delete_network(netid)
+++ /dev/null
-[pipeline:extensions_app_with_filter]
-pipeline = extensions extensions_test_app
-
-[filter:extensions]
-paste.filter_factory = quantum.common.extensions:plugin_aware_extension_middleware_factory
-
-[app:extensions_test_app]
-paste.app_factory = quantum.plugins.cisco.tests.unit.test_cisco_extension:app_factory
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import __builtin__
-setattr(__builtin__, '_', lambda x: x)
+++ /dev/null
-[DEFAULT]
-# Show more verbose log output (sets INFO log level output)
-verbose = True
-
-# Show debugging output in logs (sets DEBUG log level output)
-debug = False
-
-# Address to bind the API server
-bind_host = 0.0.0.0
-
-# Port the bind the API server to
-bind_port = 9696
-
-# Path to the extensions
-api_extensions_path = ../../../../../extensions
-
-# Paste configuration file
-api_paste_config = api-paste.ini.cisco.test
-
-core_plugin = quantum.plugins.cisco.network_plugin.PluginV2
-
-# The messaging module to use, defaults to kombu.
-rpc_backend = quantum.openstack.common.rpc.impl_fake
-
-[QUOTAS]
-# resource name(s) that are supported in quota features
-quota_items = network,subnet,port
-
-# default number of resource allowed per tenant, minus for unlimited
-default_quota = -1
-
-# number of networks allowed per tenant, and minus means unlimited
-# quota_network = 10
-
-# number of subnets allowed per tenant, and minus means unlimited
-# quota_subnet = 10
-
-# number of ports allowed per tenant, and minus means unlimited
-# quota_port = 50
-
-# default driver to use for quota checks
-# quota_driver = quantum.quota.ConfDriver
-quota_driver = quantum.db.quota_db.DbQuotaDriver
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (c) 2012 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import inspect
-import logging
-import os
-
-import mock
-from oslo.config import cfg
-import webtest
-
-from quantum.api.extensions import PluginAwareExtensionManager
-from quantum.api.v2 import router
-from quantum.common import config
-from quantum.manager import QuantumManager
-from quantum.tests.unit import test_api_v2
-
-
-LOG = logging.getLogger(__name__)
-
-
-def curdir(*p):
- return os.path.join(os.path.dirname(__file__), *p)
-
-
-class APIv2TestCase(test_api_v2.APIv2TestCase):
-
- def setUp(self):
- super(APIv2TestCase, self).setUp()
- plugin = 'quantum.plugins.cisco.network_plugin.PluginV2'
- # Ensure 'stale' patched copies of the plugin are never returned
- QuantumManager._instance = None
- # Ensure existing ExtensionManager is not used
- PluginAwareExtensionManager._instance = None
- # Create the default configurations
- args = ['--config-file', curdir('quantumv2.conf.cisco.test')]
- config.parse(args=args)
- # Update the plugin
- cfg.CONF.set_override('core_plugin', plugin)
-
- self._plugin_patcher = mock.patch(plugin, autospec=True)
- self.plugin = self._plugin_patcher.start()
-
- api = router.APIRouter()
- self.api = webtest.TestApp(api)
- LOG.debug("%s.%s.%s done" % (__name__, self.__class__.__name__,
- inspect.stack()[0][3]))
-
-
-class JSONV2TestCase(APIv2TestCase, test_api_v2.JSONV2TestCase):
-
- pass
]
-file_black_list = ["./quantum/plugins/cisco/tests/unit",
- "./quantum/tests/unit",
+file_black_list = ["./quantum/tests/unit",
"./quantum/openstack",
"./quantum/plugins/bigswitch/tests"]