quantum.egg-info/
quantum/vcsversion.py
quantum/versioninfo
-run_tests.err.log
-run_tests.log
setuptools*.egg/
-subunit.log
+*.log
*.mo
*.sw?
*~
--- /dev/null
+# Copyright 2014, Red Hat Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+This module defines functional tests for the Neutron V2 API in the
+BaseTestApi class. The intention is that the class will be overridden
+and configured for use with testscenarios as follows:
+
+ - A subclass should override the 'scenarios' class member with a
+ list of tuple pairs, e.g.
+
+ scenarios = [('scenario_id', dict(client=Client())]
+
+ The first element of each scenario tuple is a user-defined textual
+ id, and the second element is a dictionary whose client parameter
+ should be a subclass of BaseNeutronClient.
+
+ - The module containing the test class should defines a 'load_tests'
+ variable as follows:
+
+ load_tests = testscenarios.load_tests_apply_scenarios
+
+Examples of use include:
+
+ neutron.tests.functional.api.test_v2_plugin - targets the plugin api
+ for each configured plugin
+
+ neutron.tests.api.test_v2_rest_client - targets neutron server
+ via the tempest rest client
+
+ The tests in neutron.tests.api depend on Neutron and Tempest being
+ deployed (e.g. with Devstack) and are intended to be run in advisory
+ check jobs.
+
+Reference: https://pypi.python.org/pypi/testscenarios/
+"""
+
+import testtools
+
+from neutron.tests import sub_base
+
+
+class AttributeDict(dict):
+
+ """
+ Provide attribute access (dict.key) to dictionary values.
+ """
+
+ def __getattr__(self, name):
+ """Allow attribute access for all keys in the dict."""
+ if name in self:
+ return self[name]
+ raise AttributeError(_("Unknown attribute '%s'.") % name)
+
+
+class BaseNeutronClient(object):
+ """
+ Base class for a client that can interact the neutron api in some
+ manner.
+
+ Reference: :file:`neutron/neutron_plugin_base_v2.py`
+ """
+
+ def setUp(self, test_case):
+ """Configure the api for use with a test case
+
+ :param test_case: The test case that will exercise the api
+ """
+ self.test_case = test_case
+
+ @property
+ def NotFound(self):
+ """The exception that indicates a resource could not be found.
+
+ Tests can use this property to assert for a missing resource
+ in a client-agnostic way.
+ """
+ raise NotImplementedError()
+
+ def create_network(self, **kwargs):
+ raise NotImplementedError()
+
+ def update_network(self, id_, **kwargs):
+ raise NotImplementedError()
+
+ def get_network(self, id_, fields=None):
+ raise NotImplementedError()
+
+ def get_networks(self, filters=None, fields=None,
+ sorts=None, limit=None, marker=None, page_reverse=False):
+ raise NotImplementedError()
+
+ def delete_network(self, id_):
+ raise NotImplementedError()
+
+
+class BaseTestApi(sub_base.SubBaseTestCase):
+
+ scenarios = ()
+
+ def setUp(self, setup_parent=True):
+ # Calling the parent setUp is optional - the subclass may be
+ # calling it already via a different ancestor.
+ if setup_parent:
+ super(BaseTestApi, self).setUp()
+ self.client.setUp(self)
+
+ def test_network_lifecycle(self):
+ net = self.client.create_network(name=sub_base.get_rand_name())
+ listed_networks = dict((x.id, x.name)
+ for x in self.client.get_networks())
+ self.assertIn(net.id, listed_networks)
+ self.assertEqual(listed_networks[net.id], net.name,
+ 'Listed network name is not as expected.')
+ updated_name = 'new %s' % net.name
+ updated_net = self.client.update_network(net.id, name=updated_name)
+ self.assertEqual(updated_name, updated_net.name,
+ 'Updated network name is not as expected.')
+ self.client.delete_network(net.id)
+ with testtools.ExpectedException(self.client.NotFound,
+ msg='Network was not deleted'):
+ self.client.get_network(net.id)
--- /dev/null
+# Copyright 2014, Red Hat Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+This module implements BaseNeutronClient for the Tempest rest client
+and configures the api tests with scenarios targeting the Neutron API.
+"""
+
+from tempest import exceptions
+from tempest import test as t_test
+import testscenarios
+
+from neutron.tests.api import base_v2
+
+
+# Required to generate tests from scenarios. Not compatible with nose.
+load_tests = testscenarios.load_tests_apply_scenarios
+
+
+class TempestRestClient(base_v2.BaseNeutronClient):
+
+ @property
+ def client(self):
+ if not hasattr(self, '_client'):
+ manager = t_test.BaseTestCase.get_client_manager(interface='json')
+ self._client = manager.network_client
+ return self._client
+
+ @property
+ def NotFound(self):
+ return exceptions.NotFound
+
+ def _cleanup_network(self, id_):
+ try:
+ self.delete_network(id_)
+ except self.NotFound:
+ pass
+
+ def create_network(self, **kwargs):
+ network = self._create_network(**kwargs)
+ self.test_case.addCleanup(self._cleanup_network, network.id)
+ return network
+
+ def _create_network(self, **kwargs):
+ # Internal method - use create_network() instead
+ body = self.client.create_network(**kwargs)
+ return base_v2.AttributeDict(body['network'])
+
+ def update_network(self, id_, **kwargs):
+ body = self.client.update_network(id_, **kwargs)
+ return base_v2.AttributeDict(body['network'])
+
+ def get_network(self, id_, **kwargs):
+ body = self.client.show_network(id_, **kwargs)
+ return base_v2.AttributeDict(body['network'])
+
+ def get_networks(self, **kwargs):
+ body = self.client.list_networks(**kwargs)
+ return [base_v2.AttributeDict(x) for x in body['networks']]
+
+ def delete_network(self, id_):
+ self.client.delete_network(id_)
+
+
+class TestApiWithRestClient(base_v2.BaseTestApi):
+ scenarios = [('tempest', {'client': TempestRestClient()})]
# License for the specific language governing permissions and limitations
# under the License.
-import random
-
import netaddr
from oslo.config import cfg
from neutron.openstack.common import uuidutils
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional import base as functional_base
+from neutron.tests import sub_base
BR_PREFIX = 'test-br'
#TODO(jschwarz): Move these two functions to neutron/tests/common/
-def get_rand_name(max_length=None, prefix='test'):
- name = prefix + str(random.randint(1, 0x7fffffff))
- return name[:max_length] if max_length is not None else name
+get_rand_name = sub_base.get_rand_name
def get_rand_veth_name():
--- /dev/null
+# Copyright 2014, Red Hat Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+This module implements BaseNeutronClient for the programmatic plugin
+api and configures the api tests with scenarios targeting individual
+plugins.
+"""
+
+import testscenarios
+
+from neutron.common import exceptions as q_exc
+from neutron import context
+from neutron import manager
+from neutron.tests.api import base_v2
+from neutron.tests.unit.ml2 import test_ml2_plugin
+from neutron.tests.unit import testlib_api
+from neutron.tests.unit import testlib_plugin
+
+
+# Each plugin must add a class to plugin_configurations that can configure the
+# plugin for use with PluginClient. For a given plugin, the setup
+# used for NeutronDbPluginV2TestCase can usually be reused. See the
+# configuration classes listed below for examples of this reuse.
+
+#TODO(marun) Discover plugin conf via a metaclass
+plugin_configurations = [
+ test_ml2_plugin.Ml2PluginConf,
+]
+
+
+# Required to generate tests from scenarios. Not compatible with nose.
+load_tests = testscenarios.load_tests_apply_scenarios
+
+
+class PluginClient(base_v2.BaseNeutronClient):
+
+ @property
+ def ctx(self):
+ if not hasattr(self, '_ctx'):
+ self._ctx = context.Context('', 'test-tenant')
+ return self._ctx
+
+ @property
+ def plugin(self):
+ return manager.NeutronManager.get_plugin()
+
+ @property
+ def NotFound(self):
+ return q_exc.NetworkNotFound
+
+ def create_network(self, **kwargs):
+ # Supply defaults that are expected to be set by the api
+ # framwork
+ kwargs.setdefault('admin_state_up', True)
+ kwargs.setdefault('shared', False)
+ data = dict(network=kwargs)
+ result = self.plugin.create_network(self.ctx, data)
+ return base_v2.AttributeDict(result)
+
+ def update_network(self, id_, **kwargs):
+ data = dict(network=kwargs)
+ result = self.plugin.update_network(self.ctx, id_, data)
+ return base_v2.AttributeDict(result)
+
+ def get_network(self, *args, **kwargs):
+ result = self.plugin.get_network(self.ctx, *args, **kwargs)
+ return base_v2.AttributeDict(result)
+
+ def get_networks(self, *args, **kwargs):
+ result = self.plugin.get_networks(self.ctx, *args, **kwargs)
+ return [base_v2.AttributeDict(x) for x in result]
+
+ def delete_network(self, id_):
+ self.plugin.delete_network(self.ctx, id_)
+
+
+def get_scenarios():
+ scenarios = []
+ client = PluginClient()
+ for conf in plugin_configurations:
+ name = conf.plugin_name
+ class_name = name[name.rfind('.') + 1:]
+ scenarios.append((class_name, {'client': client, 'plugin_conf': conf}))
+ return scenarios
+
+
+class TestPluginApi(base_v2.BaseTestApi,
+ testlib_api.SqlTestCase,
+ testlib_plugin.PluginSetupHelper):
+
+ scenarios = get_scenarios()
+
+ def setUp(self):
+ # BaseTestApi is not based on BaseTestCase to avoid import
+ # errors when importing Tempest. When targeting the plugin
+ # api, it is necessary to avoid calling BaseTestApi's parent
+ # setUp, since that setup will be called by SqlTestCase.setUp.
+ super(TestPluginApi, self).setUp(setup_parent=False)
+ testlib_api.SqlTestCase.setUp(self)
+ self.setup_coreplugin(self.plugin_conf.plugin_name)
+ self.plugin_conf.setUp(self)
import logging as std_logging
import os
import os.path
+import random
import traceback
import eventlet.timeout
LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
+def get_rand_name(max_length=None, prefix='test'):
+ name = prefix + str(random.randint(1, 0x7fffffff))
+ return name[:max_length] if max_length is not None else name
+
+
def bool_from_env(key, strict=False, default=False):
value = os.environ.get(key)
return strutils.bool_from_string(value, strict=strict, default=default)
# under the License.
import contextlib
+import functools
import mock
import testtools
import uuid
PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
+class Ml2PluginConf(object):
+ """Plugin configuration shared across the unit and functional tests.
+
+ TODO(marun) Evolve a configuration interface usable across all plugins.
+ """
+
+ plugin_name = PLUGIN_NAME
+
+ @staticmethod
+ def setUp(test_case, parent_setup=None):
+ """Perform additional configuration around the parent's setUp."""
+ if parent_setup:
+ parent_setup()
+ test_case.port_create_status = 'DOWN'
+
+
class Ml2PluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
- _plugin_name = PLUGIN_NAME
_mechanism_drivers = ['logger', 'test']
- def setUp(self):
- # We need a L3 service plugin
+ def setup_parent(self):
+ """Perform parent setup with the common plugin configuration class."""
l3_plugin = ('neutron.tests.unit.test_l3_plugin.'
'TestL3NatServicePlugin')
service_plugins = {'l3_plugin_name': l3_plugin}
+ # Ensure that the parent setup can be called without arguments
+ # by the common configuration setUp.
+ parent_setup = functools.partial(
+ super(Ml2PluginV2TestCase, self).setUp,
+ plugin=Ml2PluginConf.plugin_name,
+ service_plugins=service_plugins,
+ )
+ Ml2PluginConf.setUp(self, parent_setup)
+
+ def setUp(self):
# Enable the test mechanism driver to ensure that
# we can successfully call through to all mechanism
# driver apis.
config.cfg.CONF.set_override('network_vlan_ranges',
[self.phys_vrange, self.phys2_vrange],
group='ml2_type_vlan')
- super(Ml2PluginV2TestCase, self).setUp(PLUGIN_NAME,
- service_plugins=service_plugins)
- self.port_create_status = 'DOWN'
+ self.setup_parent()
self.driver = ml2_plugin.Ml2Plugin()
self.context = context.get_admin_context()
# tox --hashseed 1235130571 -e hashtest
setenv = VIRTUAL_ENV={envdir}
+[testenv:api]
+setenv = OS_TEST_PATH=./neutron/tests/api
+
[testenv:functional]
setenv = OS_TEST_PATH=./neutron/tests/functional
OS_TEST_TIMEOUT=90