1 # Copyright (c) 2015 Mirantis, Inc.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
18 from collections import namedtuple
20 from oslo_config import cfg
21 from oslo_policy import policy as oslo_policy
22 from oslo_serialization import jsonutils
23 from oslo_utils import uuidutils
25 from pecan import request
26 from pecan import set_config
27 from pecan.testing import load_test_app
30 from neutron.api import extensions
31 from neutron.api.v2 import attributes
32 from neutron.common import exceptions as n_exc
33 from neutron import context
34 from neutron import manager
35 from neutron.pecan_wsgi.controllers import root as controllers
36 from neutron import policy
37 from neutron.tests.unit import testlib_api
# Names used by the fake service plugin controller: the singular resource
# name, the collection name derived from it, and the canned body the fake
# controller returns for an index request.
_SERVICE_PLUGIN_RESOURCE = 'serviceplugin'
_SERVICE_PLUGIN_COLLECTION = '%ss' % _SERVICE_PLUGIN_RESOURCE
_SERVICE_PLUGIN_INDEX_BODY = {
    _SERVICE_PLUGIN_COLLECTION: [],
}
44 class FakeServicePluginController(object):
45 resource = _SERVICE_PLUGIN_RESOURCE
47 @pecan.expose(generic=True,
48 content_type='application/json',
51 return _SERVICE_PLUGIN_INDEX_BODY
54 class PecanFunctionalTest(testlib_api.SqlTestCase):
57 self.setup_coreplugin('neutron.plugins.ml2.plugin.Ml2Plugin')
58 super(PecanFunctionalTest, self).setUp()
59 self.addCleanup(extensions.PluginAwareExtensionManager.clear_instance)
60 self.addCleanup(set_config, {}, overwrite=True)
61 self.set_config_overrides()
63 self.setup_service_plugin()
66 self.app = load_test_app(os.path.join(
67 os.path.dirname(__file__),
73 pl = manager.NeutronManager.get_plugin()
74 network_id = pl.create_network(context.get_admin_context(), {
76 {'name': 'pecannet', 'tenant_id': 'tenid', 'shared': False,
77 'admin_state_up': True, 'status': 'ACTIVE'}})['id']
78 self.port = pl.create_port(context.get_admin_context(), {
80 {'tenant_id': 'tenid', 'network_id': network_id,
81 'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
82 'mac_address': '00:11:22:33:44:55',
83 'admin_state_up': True, 'device_id': 'FF',
84 'device_owner': 'pecan', 'name': 'pecan'}})
def set_config_overrides(self):
    """Apply the configuration overrides used by these tests.

    This implementation forces ``auth_strategy`` to ``noauth``; test
    classes that need a different strategy override this method.
    """
    conf = cfg.CONF
    conf.set_override('auth_strategy', 'noauth')
def setup_service_plugin(self):
    """Register the fake controller for the service plugin collection."""
    fake_controller = FakeServicePluginController()
    manager.NeutronManager.set_controller_for_resource(
        _SERVICE_PLUGIN_COLLECTION, fake_controller)
94 class TestV2Controller(PecanFunctionalTest):
97 response = self.app.get('/v2.0/ports.json')
98 self.assertEqual(response.status_int, 200)
101 response = self.app.post_json('/v2.0/ports.json',
102 params={'port': {'network_id': self.port['network_id'],
103 'admin_state_up': True,
104 'tenant_id': 'tenid'}},
105 headers={'X-Project-Id': 'tenid'})
106 self.assertEqual(response.status_int, 201)
109 response = self.app.put_json('/v2.0/ports/%s.json' % self.port['id'],
110 params={'port': {'name': 'test'}},
111 headers={'X-Project-Id': 'tenid'})
112 self.assertEqual(response.status_int, 200)
def test_delete(self):
    """DELETE on the port stored in ``self.port`` answers 204."""
    port_url = '/v2.0/ports/%s.json' % self.port['id']
    response = self.app.delete(port_url,
                               headers={'X-Project-Id': 'tenid'})
    # Expected value first, matching the other assertions in this module,
    # so that a failure labels expected and actual values correctly.
    self.assertEqual(204, response.status_int)
def test_plugin_initialized(self):
    """``NeutronManager._instance`` is populated after test setup."""
    manager_instance = manager.NeutronManager._instance
    self.assertIsNotNone(manager_instance)
def test_get_extensions(self):
    """GET on the extensions collection answers 200."""
    response = self.app.get('/v2.0/extensions.json')
    # Expected value first, matching the other assertions in this module,
    # so that a failure labels expected and actual values correctly.
    self.assertEqual(200, response.status_int)
def test_get_specific_extension(self):
    """GET on the ``allowed-address-pairs`` extension answers 200."""
    extension_url = '/v2.0/extensions/allowed-address-pairs.json'
    response = self.app.get(extension_url)
    # Expected value first, matching the other assertions in this module,
    # so that a failure labels expected and actual values correctly.
    self.assertEqual(200, response.status_int)
def test_service_plugin_uri(self):
    """A collection under a service plugin's path prefix is reachable."""
    # NOTE: the namedtuple *class* itself (not an instance of it) is what
    # gets registered; ``path_prefix`` is set as a class attribute.
    dummy_plugin = namedtuple('DummyServicePlugin', 'path_prefix')
    dummy_plugin.path_prefix = 'dummy'

    neutron_manager = manager.NeutronManager.get_instance()
    neutron_manager.service_plugins['dummy_sp'] = dummy_plugin

    response = self.app.get('/v2.0/dummy/serviceplugins.json')
    self.assertEqual(200, response.status_int)
    self.assertEqual(_SERVICE_PLUGIN_INDEX_BODY, response.json_body)
140 class TestErrors(PecanFunctionalTest):
143 response = self.app.get('/assert_called_once', expect_errors=True)
144 self.assertEqual(response.status_int, 404)
146 def test_bad_method(self):
147 response = self.app.patch('/v2.0/ports/44.json',
149 self.assertEqual(response.status_int, 405)
152 class TestRequestID(PecanFunctionalTest):
154 def test_request_id(self):
155 response = self.app.get('/')
156 self.assertIn('x-openstack-request-id', response.headers)
158 response.headers['x-openstack-request-id'].startswith('req-'))
159 id_part = response.headers['x-openstack-request-id'].split('req-')[1]
160 self.assertTrue(uuidutils.is_uuid_like(id_part))
163 class TestKeystoneAuth(PecanFunctionalTest):
165 def set_config_overrides(self):
166 # default auth strategy is keystone so we pass
def test_auth_enforced(self):
    """A request to the root URI without credentials answers 401."""
    response = self.app.get('/', expect_errors=True)
    # Expected value first, matching the other assertions in this module,
    # so that a failure labels expected and actual values correctly.
    self.assertEqual(401, response.status_int)
174 class TestInvalidAuth(PecanFunctionalTest):
176 # disable normal app setup since it will fail
def test_invalid_auth_strategy(self):
    """Loading the app with an unknown ``auth_strategy`` must raise."""
    cfg.CONF.set_override('auth_strategy', 'badvalue')
    config_path = os.path.join(os.path.dirname(__file__), 'config.py')
    expected_failure = testtools.ExpectedException(
        n_exc.InvalidConfigurationOption)
    with expected_failure:
        load_test_app(config_path)
185 class TestExceptionTranslationHook(PecanFunctionalTest):
187 def test_neutron_nonfound_to_webob_exception(self):
188 # this endpoint raises a Neutron notfound exception. make sure it gets
189 # translated into a 404 error
191 'neutron.pecan_wsgi.controllers.root.CollectionsController.get',
192 side_effect=n_exc.NotFound()
194 response = self.app.get('/v2.0/ports.json', expect_errors=True)
195 self.assertEqual(response.status_int, 404)
def test_unexpected_exception(self):
    """An unexpected error answers 500 without leaking its message."""
    # NOTE(review): the ``with mock.patch(...)`` wrapper lines were not
    # visible in the reviewed text and are reconstructed here; confirm
    # against the original file.
    with mock.patch(
        'neutron.pecan_wsgi.controllers.root.CollectionsController.get',
        side_effect=ValueError('secretpassword')
    ):
        response = self.app.get('/v2.0/ports.json', expect_errors=True)
        # assertNotIn takes (member, container). The original call had
        # them reversed, so it only checked that the whole body was not a
        # substring of 'secretpassword' and could not detect a leak.
        self.assertNotIn(b'secretpassword', response.body)
        self.assertEqual(500, response.status_int)
207 class TestRequestProcessing(PecanFunctionalTest):
210 super(TestRequestProcessing, self).setUp()
212 # request.context is thread-local storage so it has to be accessed by
213 # the controller. We can capture it into a list here to assert on after
214 # the request finishes.
216 def capture_request_details(*args, **kwargs):
217 self.captured_context = request.context
219 mock.patch('neutron.pecan_wsgi.controllers.root.'
220 'CollectionsController.get',
221 side_effect=capture_request_details).start()
222 mock.patch('neutron.pecan_wsgi.controllers.root.'
223 'CollectionsController.create',
224 side_effect=capture_request_details).start()
225 mock.patch('neutron.pecan_wsgi.controllers.root.ItemController.get',
226 side_effect=capture_request_details).start()
227 # TODO(kevinbenton): add context tests for X-Roles etc
def test_context_set_in_request(self):
    """The ``X-Project-Id`` header ends up as the context's tenant_id."""
    project_headers = {'X-Project-Id': 'tenant_id'}
    self.app.get('/v2.0/ports.json', headers=project_headers)
    neutron_context = self.captured_context['neutron_context']
    self.assertEqual('tenant_id', neutron_context.tenant_id)
def test_core_resource_identified(self):
    """A request on ``/ports`` records the resource and collection names."""
    self.app.get('/v2.0/ports.json')
    captured = self.captured_context
    self.assertEqual('port', captured['resource'])
    self.assertEqual('ports', captured['collection'])
def test_lookup_identifies_resource_id(self):
    """An item URI records resource, collection and resource id."""
    # The outcome of the request itself is irrelevant here; only the
    # request context captured while handling it is inspected.
    self.app.get('/v2.0/ports/reina.json')
    captured = self.captured_context
    self.assertEqual('port', captured['resource'])
    self.assertEqual('ports', captured['collection'])
    self.assertEqual('reina', captured['resource_id'])
248 def test_resource_processing_post(self):
251 params={'port': {'network_id': self.port['network_id'],
253 'admin_state_up': True}},
254 headers={'X-Project-Id': 'tenid'})
255 self.assertEqual('port', self.captured_context['resource'])
256 self.assertEqual('ports', self.captured_context['collection'])
257 resources = self.captured_context['resources']
258 self.assertEqual(1, len(resources))
259 self.assertEqual(self.port['network_id'],
260 resources[0]['network_id'])
261 self.assertEqual('the_port', resources[0]['name'])
263 def test_resource_processing_post_bulk(self):
266 params={'ports': [{'network_id': self.port['network_id'],
267 'name': 'the_port_1',
268 'admin_state_up': True},
269 {'network_id': self.port['network_id'],
270 'name': 'the_port_2',
271 'admin_state_up': True}]},
272 headers={'X-Project-Id': 'tenid'})
273 resources = self.captured_context['resources']
274 self.assertEqual(2, len(resources))
275 self.assertEqual(self.port['network_id'],
276 resources[0]['network_id'])
277 self.assertEqual('the_port_1', resources[0]['name'])
278 self.assertEqual(self.port['network_id'],
279 resources[1]['network_id'])
280 self.assertEqual('the_port_2', resources[1]['name'])
282 def test_resource_processing_post_unknown_attribute_returns_400(self):
283 response = self.app.post_json(
285 params={'port': {'network_id': self.port['network_id'],
288 'admin_state_up': True}},
289 headers={'X-Project-Id': 'tenid'},
291 self.assertEqual(400, response.status_int)
293 def test_resource_processing_post_validation_errori_returns_400(self):
294 response = self.app.post_json(
296 params={'port': {'network_id': self.port['network_id'],
298 'admin_state_up': 'invalid_value'}},
299 headers={'X-Project-Id': 'tenid'},
301 self.assertEqual(400, response.status_int)
303 def test_service_plugin_identified(self):
304 # TODO(kevinbenton): fix the unit test setup to include an l3 plugin
305 self.skipTest("A dummy l3 plugin needs to be setup")
306 self.app.get('/v2.0/routers.json')
307 self.assertEqual('router', self.req_stash['resource_type'])
308 # make sure the core plugin was identified as the handler for ports
310 manager.NeutronManager.get_service_plugins()['L3_ROUTER_NAT'],
311 self.req_stash['plugin'])
314 class TestEnforcementHooks(PecanFunctionalTest):
316 def test_network_ownership_check(self):
317 response = self.app.post_json(
319 params={'port': {'network_id': self.port['network_id'],
320 'admin_state_up': True}},
321 headers={'X-Project-Id': 'tenid'})
322 self.assertEqual(201, response.status_int)
324 def test_quota_enforcement(self):
325 # TODO(kevinbenton): this test should do something
329 class TestPolicyEnforcementHook(PecanFunctionalTest):
333 'id': {'allow_post': False, 'allow_put': False,
334 'is_visible': True, 'primary_key': True},
335 'attr': {'allow_post': True, 'allow_put': True,
336 'is_visible': True, 'default': ''},
337 'restricted_attr': {'allow_post': True, 'allow_put': True,
338 'is_visible': True, 'default': ''},
339 'tenant_id': {'allow_post': True, 'allow_put': False,
340 'required_by_policy': True,
341 'validate': {'type:string':
342 attributes.TENANT_ID_MAX_LEN},
348 # Create a controller for a fake resource. This will make the tests
349 # independent from the evolution of the API (so if one changes the API
350 # or the default policies there won't be any risk of breaking these
351 # tests, or at least I hope so)
352 super(TestPolicyEnforcementHook, self).setUp()
353 self.mock_plugin = mock.Mock()
354 attributes.RESOURCE_ATTRIBUTE_MAP.update(self.FAKE_RESOURCE)
355 attributes.PLURALS['mehs'] = 'meh'
356 manager.NeutronManager.set_plugin_for_resource('meh', self.mock_plugin)
357 fake_controller = controllers.CollectionsController('mehs', 'meh')
358 manager.NeutronManager.set_controller_for_resource(
359 'mehs', fake_controller)
360 # Inject policies for the fake resource
362 policy._ENFORCER.set_rules(
363 oslo_policy.Rules.from_dict(
365 'update_meh': 'rule:admin_only',
366 'delete_meh': 'rule:admin_only',
367 'get_meh': 'rule:admin_only or field:mehs:id=xxx',
368 'get_meh:restricted_attr': 'rule:admin_only'}),
371 def test_before_on_create_authorized(self):
# Mock a return value for a hypothetical create operation
373 self.mock_plugin.create_meh.return_value = {
376 'restricted_attr': '',
377 'tenant_id': 'tenid'}
378 response = self.app.post_json('/v2.0/mehs.json',
379 params={'meh': {'attr': 'meh'}},
380 headers={'X-Project-Id': 'tenid'})
381 # We expect this operation to succeed
382 self.assertEqual(201, response.status_int)
383 self.assertEqual(0, self.mock_plugin.get_meh.call_count)
384 self.assertEqual(1, self.mock_plugin.create_meh.call_count)
386 def test_before_on_put_not_authorized(self):
387 # The policy hook here should load the resource, and therefore we must
388 # mock a get response
389 self.mock_plugin.get_meh.return_value = {
392 'restricted_attr': '',
393 'tenant_id': 'tenid'}
394 # The policy engine should trigger an exception in 'before', and the
395 # plugin method should not be called at all
396 response = self.app.put_json('/v2.0/mehs/xxx.json',
397 params={'meh': {'attr': 'meh'}},
398 headers={'X-Project-Id': 'tenid'},
400 self.assertEqual(403, response.status_int)
401 self.assertEqual(1, self.mock_plugin.get_meh.call_count)
402 self.assertEqual(0, self.mock_plugin.update_meh.call_count)
404 def test_before_on_delete_not_authorized(self):
405 # The policy hook here should load the resource, and therefore we must
406 # mock a get response
407 self.mock_plugin.delete_meh.return_value = None
408 self.mock_plugin.get_meh.return_value = {
411 'restricted_attr': '',
412 'tenant_id': 'tenid'}
413 # The policy engine should trigger an exception in 'before', and the
414 # plugin method should not be called
415 response = self.app.delete_json('/v2.0/mehs/xxx.json',
416 headers={'X-Project-Id': 'tenid'},
418 self.assertEqual(403, response.status_int)
419 self.assertEqual(1, self.mock_plugin.get_meh.call_count)
420 self.assertEqual(0, self.mock_plugin.delete_meh.call_count)
422 def test_after_on_get_not_authorized(self):
423 # The GET test policy will deny access to anything whose id is not
424 # 'xxx', so the following request should be forbidden
425 self.mock_plugin.get_meh.return_value = {
428 'restricted_attr': '',
429 'tenant_id': 'tenid'}
430 # The policy engine should trigger an exception in 'after', and the
431 # plugin method should be called
432 response = self.app.get('/v2.0/mehs/yyy.json',
433 headers={'X-Project-Id': 'tenid'},
435 self.assertEqual(403, response.status_int)
436 self.assertEqual(1, self.mock_plugin.get_meh.call_count)
438 def test_after_on_get_excludes_admin_attribute(self):
439 self.mock_plugin.get_meh.return_value = {
442 'restricted_attr': '',
443 'tenant_id': 'tenid'}
444 response = self.app.get('/v2.0/mehs/xxx.json',
445 headers={'X-Project-Id': 'tenid'})
446 self.assertEqual(200, response.status_int)
447 json_response = jsonutils.loads(response.body)
448 self.assertNotIn('restricted_attr', json_response['meh'])
450 def test_after_on_list_excludes_admin_attribute(self):
451 self.mock_plugin.get_mehs.return_value = [{
454 'restricted_attr': '',
455 'tenant_id': 'tenid'}]
456 response = self.app.get('/v2.0/mehs',
457 headers={'X-Project-Id': 'tenid'})
458 self.assertEqual(200, response.status_int)
459 json_response = jsonutils.loads(response.body)
460 self.assertNotIn('restricted_attr', json_response['mehs'][0])
463 class TestRootController(PecanFunctionalTest):
464 """Test version listing on root URI."""
467 response = self.app.get('/')
468 self.assertEqual(response.status_int, 200)
469 json_body = jsonutils.loads(response.body)
470 versions = json_body.get('versions')
471 self.assertEqual(1, len(versions))
472 for (attr, value) in controllers.V2Controller.version_info.items():
473 self.assertIn(attr, versions[0])
474 self.assertEqual(value, versions[0][attr])
def _test_method_returns_405(self, method):
    """Call ``method`` on the root URI and check that it answers 405.

    :param method: name of the test-app attribute to invoke, for
                   example ``'post'`` or ``'patch'``.
    """
    api_method = getattr(self.app, method)
    response = api_method('/', expect_errors=True)
    # Expected value first, matching the other assertions in this module,
    # so that a failure labels expected and actual values correctly.
    self.assertEqual(405, response.status_int)
482 self._test_method_returns_405('post')
485 self._test_method_returns_405('put')
def test_patch(self):
    """PATCH on the root URI is expected to answer 405."""
    self._test_method_returns_405(method='patch')
def test_delete(self):
    """DELETE on the root URI is expected to answer 405."""
    self._test_method_returns_405(method='delete')
494 self._test_method_returns_405('head')
497 class TestQuotasController(TestRootController):
498 """Test quota management API controller."""
500 base_url = '/v2.0/quotas'
501 default_expected_limits = {
def _verify_limits(self, response, limits):
    """Check that each limit in ``limits`` matches ``response['quota']``.

    :param response: decoded response body containing a ``quota`` mapping.
    :param limits: mapping of resource name to the expected limit.
    """
    for resource in limits:
        self.assertEqual(limits[resource], response['quota'][resource])
def _verify_default_limits(self, response):
    """Check ``response`` against ``default_expected_limits``."""
    defaults = self.default_expected_limits
    self._verify_limits(response, defaults)
def _verify_after_update(self, response, updated_limits):
    """Check ``response`` against the defaults overlaid with updates.

    :param response: decoded response body containing a ``quota`` mapping.
    :param updated_limits: mapping of resource name to its new limit;
                           these take precedence over the defaults.
    """
    # Work on a copy so the class-level defaults are never mutated.
    merged_limits = self.default_expected_limits.copy()
    for resource, limit in updated_limits.items():
        merged_limits[resource] = limit
    self._verify_limits(response, merged_limits)
518 def test_index_admin(self):
# NOTE(salv-orlando): The quota controller has a hardcoded check for
520 # admin-ness for this operation, which is supposed to return quotas for
521 # all tenants. Such check is "vestigial" from the home-grown WSGI and
523 response = self.app.get('%s.json' % self.base_url,
524 headers={'X-Project-Id': 'admin',
526 self.assertEqual(200, response.status_int)
def test_index(self):
    """Listing quotas without any request headers answers 403."""
    index_url = '%s.json' % self.base_url
    response = self.app.get(index_url, expect_errors=True)
    self.assertEqual(403, response.status_int)
532 def test_get_admin(self):
533 response = self.app.get('%s/foo.json' % self.base_url,
534 headers={'X-Project-Id': 'admin',
536 self.assertEqual(200, response.status_int)
537 # As quota limits have not been updated, expect default values
538 json_body = jsonutils.loads(response.body)
539 self._verify_default_limits(json_body)
542 # It is not ok to access another tenant's limits
543 url = '%s/foo.json' % self.base_url
544 response = self.app.get(url, expect_errors=True)
545 self.assertEqual(403, response.status_int)
546 # It is however ok to retrieve your own limits
547 response = self.app.get(url, headers={'X-Project-Id': 'foo'})
548 self.assertEqual(200, response.status_int)
549 json_body = jsonutils.loads(response.body)
550 self._verify_default_limits(json_body)
552 def test_put_get_delete(self):
553 # PUT and DELETE actions are in the same test as a meaningful DELETE
554 # test would require a put anyway
555 url = '%s/foo.json' % self.base_url
556 response = self.app.put_json(url,
557 params={'quota': {'network': 99}},
558 headers={'X-Project-Id': 'admin',
560 self.assertEqual(200, response.status_int)
561 json_body = jsonutils.loads(response.body)
562 self._verify_after_update(json_body, {'network': 99})
564 response = self.app.get(url, headers={'X-Project-Id': 'foo'})
565 self.assertEqual(200, response.status_int)
566 json_body = jsonutils.loads(response.body)
567 self._verify_after_update(json_body, {'network': 99})
569 response = self.app.delete(url, headers={'X-Project-Id': 'admin',
571 self.assertEqual(204, response.status_int)
572 # As DELETE does not return a body we need another GET
573 response = self.app.get(url, headers={'X-Project-Id': 'foo'})
574 self.assertEqual(200, response.status_int)
575 json_body = jsonutils.loads(response.body)
576 self._verify_default_limits(json_body)
578 def test_delete(self):