1 # Copyright 2012 OpenStack Foundation
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
25 from oslo_log import log as logging
26 from oslo_serialization import jsonutils as json
27 from oslo_utils import importutils
29 from six.moves.urllib import parse
33 from neutron.tests.api import clients
34 from neutron.tests.tempest.common import credentials
35 import neutron.tests.tempest.common.generator.valid_generator as valid
36 from neutron.tests.tempest import config
37 from neutron.tests.tempest import exceptions
39 LOG = logging.getLogger(__name__)
def attr(*args, **kwargs):
    """A decorator which applies the testtools attr decorator

    This decorator applies the testtools.testcase.attr if it is in the list of
    attributes to testtools we want to apply.

    Only the ``type`` keyword argument is inspected; positional arguments
    are accepted but ignored. ``type`` may be a single string or a
    list/tuple of strings. Each entry is applied as a testtools attribute,
    and every ``'smoke'`` entry additionally applies the ``'gate'``
    attribute. Any other value of ``type`` leaves the function untouched.

    :returns: a decorator returning the function produced by the testtools
        attr decorator(s).
    """

    def decorator(f):
        test_type = kwargs.get('type')
        # Normalise to a list of tags so the string and the sequence forms
        # share one code path. six.string_types keeps unicode strings
        # working on Python 2, matching the check done in idempotent_id.
        if isinstance(test_type, six.string_types):
            tags = [test_type]
        elif isinstance(test_type, (list, tuple)):
            tags = list(test_type)
        else:
            tags = []
        for tag in tags:
            f = testtools.testcase.attr(tag)(f)
            if tag == 'smoke':
                f = testtools.testcase.attr('gate')(f)
        return f

    return decorator
def idempotent_id(id):
    """Stub for metadata decorator

    Tags the decorated test with an ``id-<id>`` testtools attribute and
    prefixes its docstring with the idempotent id.

    :param id: the test id; must be a string that ``uuid.UUID`` accepts.
    :raises TypeError: if ``id`` is not a string.
    :raises ValueError: if ``uuid.UUID`` rejects ``id``.
    """
    if not isinstance(id, six.string_types):
        raise TypeError('Test idempotent_id must be string not %s'
                        '' % type(id).__name__)
    # Only used for validation: UUID() raises on a malformed id.
    uuid.UUID(id)

    def decorator(f):
        tagged = testtools.testcase.attr('id-%s' % id)(f)
        existing_doc = tagged.__doc__
        tagged.__doc__ = (
            'Test idempotent id: %s\n%s' % (id, existing_doc)
            if existing_doc
            else 'Test idempotent id: %s' % id)
        return tagged
    return decorator
def get_service_list():
    """Return a dict mapping each service tag to its availability flag.

    The flags are read from ``CONF.service_available`` at call time, except
    for ``network`` and ``identity`` which are always reported as True.
    """
    available = CONF.service_available
    return {
        'compute': available.nova,
        'image': available.glance,
        'baremetal': available.ironic,
        'volume': available.cinder,
        'orchestration': available.heat,
        # NOTE(mtreinish) nova-network will provide networking functionality
        # if neutron isn't available, so always set to True.
        'network': True,
        'identity': True,
        'object_storage': available.swift,
        'dashboard': available.horizon,
        'telemetry': available.ceilometer,
        'data_processing': available.sahara,
        'database': available.trove,
    }
def services(*args, **kwargs):
    """A decorator used to set an attr for each service used in a test case

    This decorator applies a testtools attr for each service that gets
    exercised by a test case.

    Each positional argument is a service tag. Unknown tags raise
    ``exceptions.InvalidServiceTag`` when the decorator is applied. When the
    decorated test runs, it is skipped if ``get_service_list()`` reports any
    of the requested services as unavailable.
    """
    def decorator(f):
        known_tags = ['compute', 'image', 'baremetal', 'volume',
                      'orchestration', 'network', 'identity',
                      'object_storage', 'dashboard', 'telemetry',
                      'data_processing', 'database']
        for tag in args:
            if tag not in known_tags:
                raise exceptions.InvalidServiceTag('%s is not a valid '
                                                   'service' % tag)
        # The return value is not needed: the tags are recorded on f itself.
        attr(type=list(args))(f)

        @functools.wraps(f)
        def wrapper(self, *func_args, **func_kwargs):
            # Availability is looked up on every test run, not at
            # decoration time.
            availability = get_service_list()

            for tag in args:
                if availability[tag]:
                    continue
                msg = 'Skipped because the %s service is not available' % (
                    tag)
                raise testtools.TestCase.skipException(msg)
            return f(self, *func_args, **func_kwargs)
        return wrapper
    return decorator
def stresstest(*args, **kwargs):
    """Add stress test decorator

    For all functions with this decorator a attr stress will be
    set automatically.

    @param class_setup_per: allowed values are application, process, action
           ``application``: once in the stress job lifetime
           ``process``: once in the worker process lifetime
           ``action``: on each action
           Stored on the function as ``st_class_setup_per``; defaults to
           'process' when not given.
    @param allow_inheritance: allows inheritance of this attribute
           Stored on the function as ``st_allow_inheritance``; defaults to
           False when not given.
    """
    def decorator(f):
        f.st_class_setup_per = kwargs.get('class_setup_per', 'process')
        f.st_allow_inheritance = kwargs.get('allow_inheritance', False)
        attr(type='stress')(f)
        return f
    return decorator
def requires_ext(*args, **kwargs):
    """A decorator to skip tests if an extension is not enabled

    @param extension: name of the extension the test depends on
    @param service: service tag passed to ``is_extension_enabled``

    Both values are read from the keyword arguments each time the decorated
    test is called; a missing one raises KeyError at that point.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*func_args, **func_kwargs):
            if is_extension_enabled(kwargs['extension'], kwargs['service']):
                return func(*func_args, **func_kwargs)
            msg = "Skipped because %s extension: %s is not enabled" % (
                kwargs['service'], kwargs['extension'])
            raise testtools.TestCase.skipException(msg)
        return wrapper
    return decorator
def is_extension_enabled(extension_name, service):
    """A function that will check the list of enabled extensions from config

    :param extension_name: the extension to look for.
    :param service: one of 'compute', 'volume', 'network' or 'object'; any
        other value raises KeyError.
    :returns: False when the configured list is empty; True when its first
        entry is 'all' or when ``extension_name`` is in the list; False
        otherwise.
    """
    config_dict = {
        'compute': CONF.compute_feature_enabled.api_extensions,
        'volume': CONF.volume_feature_enabled.api_extensions,
        'network': CONF.network_feature_enabled.api_extensions,
        'object': CONF.object_storage_feature_enabled.discoverable_apis,
    }
    enabled = config_dict[service]
    if len(enabled) == 0:
        return False
    # 'all' is only honoured as the first entry of the configured list.
    return enabled[0] == 'all' or extension_name in enabled
def validate_tearDownClass():
    """Log an error listing the classes still present in ``at_exit_set``.

    A class is added to the set by ``BaseTestCase.setUp`` and removed by
    ``BaseTestCase.tearDownClass``, so anything left over did not reach the
    base ``tearDownClass``.
    """
    if not at_exit_set:
        return
    LOG.error(
        "tearDownClass does not call the super's "
        "tearDownClass in these classes: \n"
        + str(at_exit_set))


# Run the check once, when the interpreter exits.
atexit.register(validate_tearDownClass)
class BaseTestCase(testtools.testcase.WithAttributes,
                   testtools.TestCase):
    """The test base class defines Tempest framework for class level fixtures.
    `setUpClass` and `tearDownClass` are defined here and cannot be overwritten
    by subclasses (enforced via hacking rule T105).

    Set-up is split in a series of steps (setup stages), which can be
    overwritten by test classes. Set-up stages are:
    - skip_checks
    - setup_credentials
    - setup_clients
    - resource_setup

    Tear-down is also split in a series of steps (teardown stages), which are
    stacked for execution only if the corresponding setup stage had been
    reached during the setup phase. Tear-down stages are:
    - clear_isolated_creds (defined in the base test class)
    - resource_cleanup
    """

    # Flipped to True by setUpClass; setUp refuses to run otherwise.
    setUpClassCalled = False
    # Passed as ``service`` to the client managers.
    _service = None

    # Re-bound (never mutated in place) by set_network_resources.
    network_resources = {}

    # NOTE(sdague): log_format is defined inline here instead of using the oslo
    # default because going through the config path recouples config to the
    # stress tests too early, and depending on testr order will fail unit tests
    log_format = ('%(asctime)s %(process)d %(levelname)-8s '
                  '[%(name)s] %(message)s')

    @classmethod
    def setUpClass(cls):
        """Run the setup stages, unwinding through tearDownClass on failure.

        Any exception raised after ``skip_checks`` triggers
        ``tearDownClass`` and is then re-raised with its original traceback.
        """
        # It should never be overridden by descendants
        if hasattr(super(BaseTestCase, cls), 'setUpClass'):
            super(BaseTestCase, cls).setUpClass()
        cls.setUpClassCalled = True
        # Stack of (name, callable) to be invoked in reverse order at teardown
        cls.teardowns = []
        # All the configuration checks that may generate a skip
        cls.skip_checks()
        try:
            # Allocation of all required credentials and client managers
            cls.teardowns.append(('credentials', cls.clear_isolated_creds))
            cls.setup_credentials()
            # Shortcuts to clients
            cls.setup_clients()
            # Additional class-wide test resources
            cls.teardowns.append(('resources', cls.resource_cleanup))
            cls.resource_setup()
        except Exception:
            etype, value, trace = sys.exc_info()
            LOG.info("%s raised in %s.setUpClass. Invoking tearDownClass.",
                     etype, cls.__name__)
            cls.tearDownClass()
            try:
                # six.reraise keeps the original traceback and, unlike the
                # three-argument raise statement, also parses on Python 3.
                six.reraise(etype, value, trace)
            finally:
                del trace  # to avoid circular refs

    @classmethod
    def tearDownClass(cls):
        """Run the stacked teardown stages in reverse order.

        Every stage runs even if an earlier one fails. If no exception was
        already being handled on entry, the first teardown failure is
        re-raised once all stages have run.
        """
        at_exit_set.discard(cls)
        # It should never be overridden by descendants
        if hasattr(super(BaseTestCase, cls), 'tearDownClass'):
            super(BaseTestCase, cls).tearDownClass()
        # Save any existing exception, we always want to re-raise the original
        # exception only
        etype, value, trace = sys.exc_info()
        # If there was no exception during setup we shall re-raise the first
        # exception in teardown
        re_raise = (etype is None)
        while cls.teardowns:
            name, teardown = cls.teardowns.pop()
            # Catch any exception in tearDown so we can re-raise the original
            # exception at the end
            try:
                teardown()
            except Exception as te:
                sys_exec_info = sys.exc_info()
                tetype = sys_exec_info[0]
                # TODO(andreaf): Till we have the ability to cleanup only
                # resources that were successfully setup in resource_cleanup,
                # log AttributeError as info instead of exception.
                if tetype is AttributeError and name == 'resources':
                    LOG.info("tearDownClass of %s failed: %s", name, te)
                else:
                    LOG.exception("teardown of %s failed: %s", name, te)
                # Only the first failure is remembered.
                if not etype:
                    etype, value, trace = sys_exec_info
        # If exceptions were raised during teardown, and not before, re-raise
        # the first one
        if re_raise and etype is not None:
            try:
                six.reraise(etype, value, trace)
            finally:
                del trace  # to avoid circular refs

    @classmethod
    def skip_checks(cls):
        """Class level skip checks. Subclasses verify in here all
        conditions that might prevent the execution of the entire test class.
        Checks implemented here may not make use API calls, and should rely on
        configuration alone.
        In general skip checks that require an API call are discouraged.
        If one is really needed it may be implemented either in the
        resource_setup or at test level.
        """
        pass

    @classmethod
    def setup_credentials(cls):
        """Allocate credentials and the client managers from them."""
        # TODO(andreaf) There is a fair amount of code that could be moved from
        # base / test classes in here. Ideally tests should be able to only
        # specify a list of (additional) credentials they need to use.
        pass

    @classmethod
    def setup_clients(cls):
        """Create links to the clients into the test object."""
        # TODO(andreaf) There is a fair amount of code that could be moved from
        # base / test classes in here. Ideally tests should be able to only
        # specify which client is `client` and nothing else.
        pass

    @classmethod
    def resource_setup(cls):
        """Class level resource setup for test cases.
        """
        pass

    @classmethod
    def resource_cleanup(cls):
        """Class level resource cleanup for test cases.
        Resource cleanup must be able to handle the case of partially setup
        resources, in case a failure during `resource_setup` should happen.
        """
        pass

    def setUp(self):
        """Install the per-test fixtures selected through OS_* env variables.

        :raises RuntimeError: if the class-level setUpClass of this base
            class was never run.
        """
        super(BaseTestCase, self).setUp()
        if not self.setUpClassCalled:
            raise RuntimeError("setUpClass does not calls the super's"
                               "setUpClass in the "
                               + self.__class__.__name__)
        at_exit_set.add(self.__class__)
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # A non-numeric value disables the timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
                os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
                os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        # Log capture is on unless explicitly disabled.
        if (os.environ.get('OS_LOG_CAPTURE') != 'False' and
                os.environ.get('OS_LOG_CAPTURE') != '0'):
            self.useFixture(fixtures.LoggerFixture(nuke_handlers=False,
                                                   format=self.log_format,
                                                   level=None))

    @classmethod
    def get_client_manager(cls):
        """
        Returns an OpenStack client manager
        """
        force_tenant_isolation = getattr(cls, 'force_tenant_isolation', None)

        # (Re)create the isolated credentials when none exist yet or when
        # the existing ones were created under a different class name.
        if (not hasattr(cls, 'isolated_creds') or
                not cls.isolated_creds.name == cls.__name__):
            cls.isolated_creds = credentials.get_isolated_credentials(
                name=cls.__name__, network_resources=cls.network_resources,
                force_tenant_isolation=force_tenant_isolation,
            )

        creds = cls.isolated_creds.get_primary_creds()
        os = clients.Manager(credentials=creds, service=cls._service)
        return os

    @classmethod
    def clear_isolated_creds(cls):
        """
        Clears isolated creds if set
        """
        if hasattr(cls, 'isolated_creds'):
            cls.isolated_creds.clear_isolated_creds()

    @classmethod
    def _get_identity_admin_client(cls):
        """
        Returns an instance of the Identity Admin API client
        """
        os = clients.AdminManager(service=cls._service)
        admin_client = os.identity_client
        return admin_client

    @classmethod
    def set_network_resources(cls, network=False, router=False, subnet=False,
                              dhcp=False):
        """Specify which network resources should be created

        @param network
        @param router
        @param subnet
        @param dhcp
        """
        # network resources should be set only once from callers
        # in order to ensure that even if it's called multiple times in
        # a chain of overloaded methods, the attribute is set only
        # in the leaf class
        if not cls.network_resources:
            cls.network_resources = {
                'network': network,
                'router': router,
                'subnet': subnet,
                'dhcp': dhcp}

    def assertEmpty(self, list, msg=None):
        """Assert that ``len(list)`` is zero."""
        self.assertTrue(len(list) == 0, msg)

    def assertNotEmpty(self, list, msg=None):
        """Assert that ``len(list)`` is greater than zero."""
        self.assertTrue(len(list) > 0, msg)
class NegativeAutoTest(BaseTestCase):
    """Base class for negative API tests generated from a description."""

    # Registry filled by set_resource(); defined on this class, so
    # subclasses share it unless they define their own.
    _resources = {}

    @classmethod
    def setUpClass(cls):
        """Bind the regular and the admin negative clients to the class."""
        super(NegativeAutoTest, cls).setUpClass()
        manager = cls.get_client_manager()
        cls.client = manager.negative_client
        admin_manager = clients.AdminManager(service=cls._service)
        cls.admin_client = admin_manager.negative_client

    @staticmethod
    def load_tests(*args):
        """
        Wrapper for testscenarios to set the mandatory scenarios variable
        only in case a real test loader is in place. Will be automatically
        called in case the variable "load_tests" is set.
        """
        # The argument order depends on the caller: a first argument with a
        # suiteClass is treated as the loader. Both unpackings also enforce
        # that exactly three arguments were passed.
        if getattr(args[0], 'suiteClass', None) is not None:
            _loader, standard_tests, _pattern = args
        else:
            standard_tests, _module, _loader = args
        for test in testtools.iterate_tests(standard_tests):
            schema = getattr(test, '_schema', None)
            if schema is None:
                continue
            setattr(test, 'scenarios',
                    NegativeAutoTest.generate_scenario(schema))
        return testscenarios.load_tests_apply_scenarios(*args)

    @staticmethod
    def generate_scenario(description):
        """
        Generates the test scenario list for a given description.

        :param description: A file or dictionary with the following entries:
            name (required) name for the api
            http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
            url (required) the url to be appended to the catalog url with '%s'
                for each resource mentioned
            resources: (optional) A list of resource names such as "server",
                "flavor", etc. with an element for each '%s' in the url. This
                method will call self.get_resource for each element when
                constructing the positive test case template so negative
                subclasses are expected to return valid resource ids when
                appropriate.
            json-schema (optional) A valid json schema that will be used to
                create invalid data for the api calls. For "GET" and "HEAD",
                the data is used to generate query strings appended to the url,
                otherwise for the body of the http call.
        :returns: a list of (scenario name, scenario attributes) tuples.
        """
        LOG.debug(description)
        generator_cls = importutils.import_class(CONF.negative.test_generator)
        generator = generator_cls()
        generator.validate_schema(description)
        schema = description.get("json-schema", None)
        resources = description.get("resources", [])
        scenario_list = []
        # NOTE(review): expected_result is not reset per resource, so a value
        # taken from a dict entry also applies to the entries after it.
        expected_result = None
        for resource in resources:
            if isinstance(resource, dict):
                expected_result = resource['expected_result']
                resource = resource['name']
            LOG.debug("Add resource to test %s", resource)
            scn_name = "inv_res_%s" % (resource)
            # A fresh random uuid stands in for the resource id.
            scn_attrs = {
                "resource": (resource, str(uuid.uuid4())),
                "expected_result": expected_result,
            }
            scenario_list.append((scn_name, scn_attrs))
        if schema is not None:
            scenario_list.extend(
                (scenario['_negtest_name'], scenario)
                for scenario in generator.generate_scenarios(schema))
        LOG.debug(scenario_list)
        return scenario_list

    def execute(self, description):
        """
        Execute a http call on an api that are expected to
        result in client errors. First it uses invalid resources that are part
        of the url, and then invalid data for queries and http request bodies.

        :param description: A json file or dictionary with the following
        entries:
            name (required) name for the api
            http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
            url (required) the url to be appended to the catalog url with '%s'
                for each resource mentioned
            resources: (optional) A list of resource names such as "server",
                "flavor", etc. with an element for each '%s' in the url. This
                method will call self.get_resource for each element when
                constructing the positive test case template so negative
                subclasses are expected to return valid resource ids when
                appropriate.
            json-schema (optional) A valid json schema that will be used to
                create invalid data for the api calls. For "GET" and "HEAD",
                the data is used to generate query strings appended to the url,
                otherwise for the body of the http call.

        """
        LOG.info("Executing %s", description["name"])
        LOG.debug(description)
        generator_cls = importutils.import_class(CONF.negative.test_generator)
        generator = generator_cls()
        schema = description.get("json-schema", None)
        method = description["http-method"]
        url = description["url"]
        expected_result = None
        if "default_result_code" in description:
            expected_result = description["default_result_code"]

        resources = []
        for resource_spec in description.get("resources", []):
            resources.append(self.get_resource(resource_spec))

        if hasattr(self, "resource"):
            # Note(mkoderer): The resources list already contains an invalid
            # entry (see get_resource).
            # We just send a valid json-schema with it
            valid_schema = None
            if schema:
                valid_schema = (
                    valid.ValidTestGenerator().generate_valid(schema))
            new_url, body = self._http_arguments(valid_schema, url, method)
        elif hasattr(self, "_negtest_name"):
            schema_under_test = (
                valid.ValidTestGenerator().generate_valid(schema))
            local_expected_result = generator.generate_payload(
                self, schema_under_test)
            # A result code supplied by the generator overrides the default.
            if local_expected_result is not None:
                expected_result = local_expected_result
            new_url, body = self._http_arguments(
                schema_under_test, url, method)
        else:
            raise Exception("testscenarios are not active. Please make sure "
                            "that your test runner supports the load_tests "
                            "mechanism")

        if "admin_client" in description and description["admin_client"]:
            client = self.admin_client
        else:
            client = self.client
        resp, resp_body = client.send_request(method, new_url,
                                              resources, body=body)
        self._check_negative_response(expected_result, resp.status, resp_body)

    def _http_arguments(self, json_dict, url, method):
        """Return the ``(url, body)`` pair to send for ``json_dict``.

        An empty ``json_dict`` yields the unchanged url and no body. For
        GET, HEAD, PUT and DELETE the data is url-encoded into the query
        string; for any other method it is serialised as the body.
        """
        LOG.debug("dict: %s url: %s method: %s" % (json_dict, url, method))
        if not json_dict:
            return url, None
        if method in ["GET", "HEAD", "PUT", "DELETE"]:
            query = parse.urlencode(json_dict)
            return "%s?%s" % (url, query), None
        return url, json.dumps(json_dict)

    def _check_negative_response(self, expected_result, result, body):
        """Assert ``result`` is a 4xx status other than 413.

        When ``expected_result`` is not None the status must also equal it.
        """
        is_client_error = result >= 400 and result < 500 and result != 413
        self.assertTrue(is_client_error,
                        "Expected client error, got %s:%s" %
                        (result, body))
        matches_expected = expected_result is None or expected_result == result
        self.assertTrue(matches_expected,
                        "Expected %s, got %s:%s" %
                        (expected_result, result, body))

    @classmethod
    def set_resource(cls, name, resource):
        """
        This function can be used in setUpClass context to register a resource
        for a test.

        :param name: The name of the kind of resource such as "flavor", "role",
            etc.
        :resource: The id of the resource
        """
        cls._resources[name] = resource

    def get_resource(self, name):
        """
        Return a valid uuid for a type of resource. If a real resource is
        needed as part of a url then this method should return one. Otherwise
        it can return None.

        :param name: The name of the kind of resource such as "flavor", "role",
            etc.
        """
        if isinstance(name, dict):
            name = name['name']
        # The scenario's invalid resource takes precedence over a
        # registered one.
        if hasattr(self, "resource") and self.resource[0] == name:
            invalid_name, invalid_value = self.resource[0], self.resource[1]
            LOG.debug("Return invalid resource (%s) value: %s" %
                      (invalid_name, invalid_value))
            return invalid_value
        if name in self._resources:
            return self._resources[name]
        return None
def SimpleNegativeAutoTest(klass):
    """
    This decorator registers a test function on basis of the class name.

    The method name is ``test_`` followed by the class name with 'JSON' and
    'Test' removed and converted to lower case with underscores. The
    generated test calls ``self.execute(self._schema)`` when the instance
    has a ``_schema`` attribute and does nothing otherwise.

    :returns: ``klass`` itself, with the generated method attached.
    """
    @attr(type=['negative', 'gate'])
    def generic_test(self):
        if hasattr(self, '_schema'):
            self.execute(self._schema)

    stripped_name = klass.__name__.replace('JSON', '').replace('Test', '')
    # NOTE(mkoderer): replaces uppercase chars inside the class name with '_'
    snake_name = re.sub('(?<!^)(?=[A-Z])', '_', stripped_name).lower()
    setattr(klass, 'test_%s' % snake_name, generic_test)
    return klass
def call_until_true(func, duration, sleep_for, *args, **kwargs):
    """
    Call the given function until it returns True (and return True) or
    until the specified duration (in seconds) elapses (and return
    False).

    :param func: A callable that returns True on success. It is invoked as
        ``func(*args, **kwargs)``, so a zero argument callable still works.
    :param duration: The number of seconds for which to attempt a
        successful call of the function.
    :param sleep_for: The number of seconds to sleep after an unsuccessful
        invocation of the function.
    :param args: Extra positional arguments forwarded to ``func``.
    :param kwargs: Extra keyword arguments forwarded to ``func``.
    :returns: True as soon as ``func`` returns a truthy value, False if the
        deadline passes first. ``func`` is never called when ``duration``
        is zero or negative.
    """
    now = time.time()
    timeout = now + duration
    while now < timeout:
        if func(*args, **kwargs):
            return True
        time.sleep(sleep_for)
        # Refresh the clock after sleeping so the deadline check is current.
        now = time.time()
    return False