NETWORKS: 'network_id'
}
+# Store plural/singular mappings
PLURALS = {NETWORKS: NETWORK,
PORTS: PORT,
SUBNETS: SUBNET,
'allocation_pools': 'allocation_pool',
'fixed_ips': 'fixed_ip',
'extensions': 'extension'}
+# Store singular/plural mappings. This dictionary is populated by
+# get_resource_info
+REVERSED_PLURALS = {}
+
+
+def get_collection_info(collection):
+ """Helper function to retrieve attribute info.
+
+ :param collection: Collection or plural name of the resource
+ """
+ return RESOURCE_ATTRIBUTE_MAP.get(collection)
+
+
+def get_resource_info(resource):
+ """Helper function to retrive attribute info
+
+ :param resource: resource name
+ """
+ plural_name = REVERSED_PLURALS.get(resource)
+ if not plural_name:
+ for (plural, singular) in PLURALS.items():
+ if singular == resource:
+ plural_name = plural
+ REVERSED_PLURALS[resource] = plural_name
+ return RESOURCE_ATTRIBUTE_MAP.get(plural_name)
def fill_default_value(attr_info, res_dict,
hooks.ExceptionTranslationHook(), # priority 100
hooks.ContextHook(), # priority 95
hooks.MemberActionHook(), # piority 95
- hooks.AttributePopulationHook(), # priority 120
+ hooks.BodyValidationHook(), # priority 120
hooks.OwnershipValidationHook(), # priority 125
hooks.QuotaEnforcementHook(), # priority 130
hooks.PolicyHook(), # priority 135
versions = [builder.build(version) for version in _get_version_info()]
return dict(versions=versions)
+ @when(index, method='HEAD')
@when(index, method='POST')
+ @when(index, method='PATCH')
@when(index, method='PUT')
@when(index, method='DELETE')
def not_supported(self):
builder = versions_view.get_view_builder(pecan.request)
return dict(version=builder.build(self.version_info))
+ @when(index, method='HEAD')
@when(index, method='POST')
+ @when(index, method='PATCH')
@when(index, method='PUT')
@when(index, method='DELETE')
def not_supported(self):
LOG.warn(_LW("No controller found for: %s - returning response "
"code 404"), collection)
pecan.abort(404)
- # Store resource name in pecan request context so that hooks can
- # leverage it if necessary
+ # Store resource and collection names in pecan request context so that
+ # hooks can leverage them if necessary
request.context['resource'] = controller.resource
+ request.context['collection'] = collection
return controller, remainder
@expose()
def _lookup(self, item, *remainder):
+ # Store resource identifier in request context
+ request.context['resource_id'] = item
return ItemController(self.resource, item), remainder
@expose(generic=True)
filters = {k: _listify(v) for k, v in kwargs.items()}
# TODO(kevinbenton): convert these using api_common.get_filters
lister = getattr(self.plugin, 'get_%s' % self.collection)
- neutron_context = request.context.get('neutron_context')
+ neutron_context = request.context['neutron_context']
return {self.collection: lister(neutron_context, filters=filters)}
+ @when(index, method='HEAD')
+ @when(index, method='PATCH')
+ @when(index, method='PUT')
+ @when(index, method='DELETE')
+ def not_supported(self):
+ pecan.abort(405)
+
@when(index, method='POST')
def post(self, *args, **kwargs):
# TODO(kevinbenton): emulated bulk!
+ resources = request.context['resources']
pecan.response.status = 201
- if request.bulk:
+ return self.create(resources)
+
+ def create(self, resources):
+ if len(resources) > 1:
+ # Bulk!
method = 'create_%s_bulk' % self.resource
+ key = self.collection
+ data = {key: [{self.resource: res} for res in resources]}
else:
method = 'create_%s' % self.resource
+ key = self.resource
+ data = {key: resources[0]}
creator = getattr(self.plugin, method)
- key = self.collection if request.bulk else self.resource
- neutron_context = request.context.get('neutron_context')
- return {key: creator(neutron_context, request.prepared_data)}
+ neutron_context = request.context['neutron_context']
+ return {key: creator(neutron_context, data)}
class ItemController(NeutronPecanController):
def get(self, *args, **kwargs):
getter = getattr(self.plugin, 'get_%s' % self.resource)
- neutron_context = request.context.get('neutron_context')
+ neutron_context = request.context['neutron_context']
return {self.resource: getter(neutron_context, self.item)}
+ @when(index, method='HEAD')
+ @when(index, method='POST')
+ @when(index, method='PATCH')
+ def not_supported(self):
+ pecan.abort(405)
+
@when(index, method='PUT')
def put(self, *args, **kwargs):
- neutron_context = request.context.get('neutron_context')
+ neutron_context = request.context['neutron_context']
+ resources = request.context['resources']
if request.member_action:
member_action_method = getattr(self.plugin,
request.member_action)
return member_action_method(neutron_context, self.item,
- request.prepared_data)
+ resources[0])
# TODO(kevinbenton): bulk?
updater = getattr(self.plugin, 'update_%s' % self.resource)
- return updater(neutron_context, self.item, request.prepared_data)
+ # Bulk update is not supported, 'resources' always contains a single
+ # elemenet
+ data = {self.resource: resources[0]}
+ return updater(neutron_context, self.item, data)
@when(index, method='DELETE')
def delete(self):
# TODO(kevinbenton): setting code could be in a decorator
pecan.response.status = 204
- neutron_context = request.context.get('neutron_context')
+ neutron_context = request.context['neutron_context']
deleter = getattr(self.plugin, 'delete_%s' % self.resource)
return deleter(neutron_context, self.item)
# License for the specific language governing permissions and limitations
# under the License.
-from neutron.pecan_wsgi.hooks import attribute_population
+from neutron.pecan_wsgi.hooks import body_validation
from neutron.pecan_wsgi.hooks import context
from neutron.pecan_wsgi.hooks import member_action
from neutron.pecan_wsgi.hooks import notifier
ExceptionTranslationHook = translation.ExceptionTranslationHook
ContextHook = context.ContextHook
MemberActionHook = member_action.MemberActionHook
-AttributePopulationHook = attribute_population.AttributePopulationHook
+BodyValidationHook = body_validation.BodyValidationHook
OwnershipValidationHook = ownership_validation.OwnershipValidationHook
PolicyHook = policy_enforcement.PolicyHook
QuotaEnforcementHook = quota_enforcement.QuotaEnforcementHook
+++ /dev/null
-# Copyright (c) 2015 Mirantis, Inc.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from pecan import hooks
-
-from neutron.api.v2 import attributes
-from neutron.api.v2 import base as v2base
-from neutron import manager
-
-
-class AttributePopulationHook(hooks.PecanHook):
-
- priority = 120
-
- def before(self, state):
- state.request.prepared_data = {}
- state.request.resources = []
- if state.request.method not in ('POST', 'PUT'):
- return
- is_create = state.request.method == 'POST'
- resource = state.request.context.get('resource')
- neutron_context = state.request.context['neutron_context']
- if not resource:
- return
- if state.request.member_action:
- # Neutron currently does not describe request bodies for member
- # actions in meh. prepare_request_body should not be called for
- # member actions, and the body should be passed as it is. The
- # plugin will do the validation (yuck).
- state.request.prepared_data = state.request.json
- else:
- state.request.prepared_data = (
- v2base.Controller.prepare_request_body(
- neutron_context, state.request.json, is_create,
- resource, _attributes_for_resource(resource),
- allow_bulk=True))
- # TODO(kevinbenton): conditional allow_bulk
-
- state.request.resources = _extract_resources_from_state(state)
- # make the original object available:
- if not is_create and not state.request.member_action:
- obj_id = _pull_id_from_request(state.request, resource)
- attrs = _attributes_for_resource(resource)
- field_list = [name for (name, value) in attrs.items()
- if (value.get('required_by_policy') or
- value.get('primary_key') or
- 'default' not in value)]
- plugin = manager.NeutronManager.get_plugin_for_resource(resource)
- getter = getattr(plugin, 'get_%s' % resource)
- # TODO(kevinbenton): the parent_id logic currently in base.py
- obj = getter(neutron_context, obj_id, fields=field_list)
- state.request.original_object = obj
-
-
-def _attributes_for_resource(resource):
- if resource not in attributes.PLURALS.values():
- return {}
- return attributes.RESOURCE_ATTRIBUTE_MAP.get(
- _plural(resource), {})
-
-
-def _pull_id_from_request(request, resource):
- # NOTE(kevinbenton): this sucks
- # Converting /v2.0/ports/dbbdae29-82f6-49cf-b05e-3365bcc95b7a.json
- # into dbbdae29-82f6-49cf-b05e-3365bcc95b7a
- resources = _plural(resource)
- jsontrail = request.path_info.replace('/v2.0/%s/' % resources, '')
- obj_id = jsontrail.replace('.json', '')
- return obj_id
-
-
-def _plural(rtype):
- for plural, single in attributes.PLURALS.items():
- if rtype == single:
- return plural
-
-
-def _extract_resources_from_state(state):
- resource = state.request.context['resource']
- if not resource:
- return []
- data = state.request.prepared_data
- # single item
- if resource in data:
- state.request.bulk = False
- return [data[resource]]
- # multiple items
- if _plural(resource) in data:
- state.request.bulk = True
- return data[_plural(resource)]
-
- return []
--- /dev/null
+# Copyright (c) 2015 Mirantis, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from pecan import hooks
+
+from neutron.api.v2 import attributes as v2_attributes
+from neutron.api.v2 import base as v2_base
+
+
+class BodyValidationHook(hooks.PecanHook):
+
+ priority = 120
+
+ def before(self, state):
+ if state.request.method not in ('POST', 'PUT'):
+ return
+ resource = state.request.context.get('resource')
+ collection = state.request.context.get('collection')
+ neutron_context = state.request.context['neutron_context']
+ is_create = state.request.method == 'POST'
+ if not resource:
+ return
+ # Prepare data to be passed to the plugin from request body
+ data = v2_base.Controller.prepare_request_body(
+ neutron_context,
+ state.request.json,
+ is_create,
+ resource,
+ v2_attributes.get_collection_info(collection),
+ allow_bulk=is_create)
+ if collection in data:
+ state.request.context['resources'] = [item[resource] for item in
+ data[collection]]
+ else:
+ state.request.context['resources'] = [data[resource]]
def before(self, state):
if state.request.method != 'POST':
return
- items = state.request.resources
- for item in items:
+ for item in state.request.context.get('resources', []):
self._validate_network_tenant_ownership(state, item)
def _validate_network_tenant_ownership(self, state, resource_item):
from oslo_policy import policy as oslo_policy
from oslo_utils import excutils
-import pecan
from pecan import hooks
import webob
from neutron._i18n import _
+from neutron.api.v2 import attributes as v2_attributes
from neutron.common import constants as const
from neutron import manager
-from neutron.pecan_wsgi.hooks import attribute_population
from neutron import policy
ACTION_MAP = {'POST': 'create', 'PUT': 'update', 'GET': 'get',
'DELETE': 'delete'}
+ def _fetch_resource(self, neutron_context, resource, resource_id):
+ attrs = v2_attributes.get_resource_info(resource)
+ field_list = [name for (name, value) in attrs.items()
+ if (value.get('required_by_policy') or
+ value.get('primary_key') or 'default' not in value)]
+ plugin = manager.NeutronManager.get_plugin_for_resource(resource)
+ getter = getattr(plugin, 'get_%s' % resource)
+ # TODO(kevinbenton): the parent_id logic currently in base.py
+ return getter(neutron_context, resource_id, fields=field_list)
+
def before(self, state):
- if state.request.method not in self.ACTION_MAP:
- pecan.abort(405)
+ # This hook should be run only for PUT,POST and DELETE methods and for
+ # requests targeting a neutron resource
+ # FIXME(salv-orlando): DELETE support. It does not work with the
+ # current logic. See LP Bug #1520180
+ items = state.request.context.get('resources')
+ if state.request.method not in ('POST', 'PUT') or not items:
+ return
neutron_context = state.request.context.get('neutron_context')
resource = state.request.context.get('resource')
+ collection = state.request.context.get('collection')
is_update = (state.request.method == 'PUT')
- items = state.request.resources
policy.init()
action = '%s_%s' % (self.ACTION_MAP[state.request.method], resource)
+
+ # NOTE(salv-orlando): As bulk updates are not supported, in case of PUT
+ # requests there will be only a single item to process, and its
+ # identifier would have been already retrieved by the lookup process
for item in items:
if is_update:
- obj = copy.copy(state.request.original_object)
+ resource_id = state.request.context.get('resource_id')
+ obj = copy.copy(self._fetch_resource(neutron_context,
+ resource,
+ resource_id))
obj.update(item)
obj[const.ATTRIBUTES_TO_UPDATE] = item.keys()
item = obj
try:
policy.enforce(
neutron_context, action, item,
- pluralized=attribute_population._plural(resource))
+ pluralized=collection)
except oslo_policy.PolicyNotAuthorized:
with excutils.save_and_reraise_exception() as ctxt:
# If a tenant is modifying it's own object, it's safe to
def after(self, state):
neutron_context = state.request.context.get('neutron_context')
resource = state.request.context.get('resource')
+ collection = state.request.context.get('collection')
if not resource:
# can't filter a resource we don't recognize
return
return
action = '%s_%s' % (self.ACTION_MAP[state.request.method],
resource)
- plural = attribute_population._plural(resource)
- if not data or (resource not in data and plural not in data):
+ if not data or (resource not in data and collection not in data):
return
is_single = resource in data
- key = resource if is_single else plural
- to_process = [data[resource]] if is_single else data[plural]
+ key = resource if is_single else collection
+ to_process = [data[resource]] if is_single else data[collection]
# in the single case, we enforce which raises on violation
# in the plural case, we just check so violating items are hidden
policy_method = policy.enforce if is_single else policy.check
plugin = manager.NeutronManager.get_plugin_for_resource(resource)
- resp = [self._get_filtered_item(state.request, resource, item)
+ resp = [self._get_filtered_item(state.request, resource,
+ collection, item)
for item in to_process
if (state.request.method != 'GET' or
policy_method(neutron_context, action, item,
plugin=plugin,
- pluralized=plural))]
+ pluralized=collection))]
if is_single:
resp = resp[0]
data[key] = resp
state.response.json = data
- def _get_filtered_item(self, request, resource, data):
+ def _get_filtered_item(self, request, resource, collection, data):
neutron_context = request.context.get('neutron_context')
to_exclude = self._exclude_attributes_by_policy(
- neutron_context, resource, data)
+ neutron_context, resource, collection, data)
return self._filter_attributes(request, data, to_exclude)
def _filter_attributes(self, request, data, fields_to_strip):
if (item[0] not in fields_to_strip and
(not user_fields or item[0] in user_fields)))
- def _exclude_attributes_by_policy(self, context, resource, data):
+ def _exclude_attributes_by_policy(self, context, resource,
+ collection, data):
"""Identifies attributes to exclude according to authZ policies.
Return a list of attribute names which should be stripped from the
"""
attributes_to_exclude = []
for attr_name in data.keys():
- attr_data = attribute_population._attributes_for_resource(
+ attr_data = v2_attributes.get_resource_info(
resource).get(attr_name)
if attr_data and attr_data['is_visible']:
if policy.check(
'get_%s:%s' % (resource, attr_name),
data,
might_not_exist=True,
- pluralized=attribute_population._plural(resource)):
+ pluralized=collection):
# this attribute is visible, check next one
continue
# if the code reaches this point then either the policy check
def before(self, state):
# TODO(salv-orlando): This hook must go when adapting the pecan code to
# use reservations.
- if state.request.method != 'POST':
- return
resource = state.request.context.get('resource')
+ if state.request.method != 'POST' or not resource:
+ return
plugin = manager.NeutronManager.get_plugin_for_resource(resource)
- items = state.request.resources
+ items = state.request.context.get('resources')
deltas = {}
for item in items:
tenant_id = item['tenant_id']
self.assertEqual(response.status_int, 500)
-class TestRequestPopulatingHooks(PecanFunctionalTest):
+class TestRequestProcessing(PecanFunctionalTest):
def setUp(self):
- super(TestRequestPopulatingHooks, self).setUp()
+ super(TestRequestProcessing, self).setUp()
# request.context is thread-local storage so it has to be accessed by
# the controller. We can capture it into a list here to assert on after
# the request finishes.
def capture_request_details(*args, **kwargs):
- self.req_stash = {
- 'context': request.context['neutron_context'],
- 'resource_type': request.context['resource'],
- }
- mock.patch(
- 'neutron.pecan_wsgi.controllers.root.CollectionsController.get',
- side_effect=capture_request_details
- ).start()
-
+ self.captured_context = request.context
+
+ mock.patch('neutron.pecan_wsgi.controllers.root.'
+ 'CollectionsController.get',
+ side_effect=capture_request_details).start()
+ mock.patch('neutron.pecan_wsgi.controllers.root.'
+ 'CollectionsController.create',
+ side_effect=capture_request_details).start()
+ mock.patch('neutron.pecan_wsgi.controllers.root.ItemController.get',
+ side_effect=capture_request_details).start()
# TODO(kevinbenton): add context tests for X-Roles etc
def test_context_set_in_request(self):
self.app.get('/v2.0/ports.json',
headers={'X-Project-Id': 'tenant_id'})
- self.assertEqual('tenant_id', self.req_stash['context'].tenant_id)
+ self.assertEqual('tenant_id',
+ self.captured_context['neutron_context'].tenant_id)
def test_core_resource_identified(self):
self.app.get('/v2.0/ports.json')
- self.assertEqual('port', self.req_stash['resource_type'])
+ self.assertEqual('port', self.captured_context['resource'])
+ self.assertEqual('ports', self.captured_context['collection'])
+
+ def test_lookup_identifies_resource_id(self):
+ # We now this will return a 404 but that's not the point as it is
+ # mocked
+ self.app.get('/v2.0/ports/reina.json')
+ self.assertEqual('port', self.captured_context['resource'])
+ self.assertEqual('ports', self.captured_context['collection'])
+ self.assertEqual('reina', self.captured_context['resource_id'])
+
+ def test_resource_processing_post(self):
+ self.app.post_json(
+ '/v2.0/ports.json',
+ params={'port': {'network_id': self.port['network_id'],
+ 'name': 'the_port',
+ 'admin_state_up': True}},
+ headers={'X-Project-Id': 'tenid'})
+ self.assertEqual('port', self.captured_context['resource'])
+ self.assertEqual('ports', self.captured_context['collection'])
+ resources = self.captured_context['resources']
+ self.assertEqual(1, len(resources))
+ self.assertEqual(self.port['network_id'],
+ resources[0]['network_id'])
+ self.assertEqual('the_port', resources[0]['name'])
+
+ def test_resource_processing_post_bulk(self):
+ self.app.post_json(
+ '/v2.0/ports.json',
+ params={'ports': [{'network_id': self.port['network_id'],
+ 'name': 'the_port_1',
+ 'admin_state_up': True},
+ {'network_id': self.port['network_id'],
+ 'name': 'the_port_2',
+ 'admin_state_up': True}]},
+ headers={'X-Project-Id': 'tenid'})
+ resources = self.captured_context['resources']
+ self.assertEqual(2, len(resources))
+ self.assertEqual(self.port['network_id'],
+ resources[0]['network_id'])
+ self.assertEqual('the_port_1', resources[0]['name'])
+ self.assertEqual(self.port['network_id'],
+ resources[1]['network_id'])
+ self.assertEqual('the_port_2', resources[1]['name'])
+
+ def test_resource_processing_post_unknown_attribute_returns_400(self):
+ response = self.app.post_json(
+ '/v2.0/ports.json',
+ params={'port': {'network_id': self.port['network_id'],
+ 'name': 'the_port',
+ 'alien': 'E.T.',
+ 'admin_state_up': True}},
+ headers={'X-Project-Id': 'tenid'},
+ expect_errors=True)
+ self.assertEqual(400, response.status_int)
+
+ def test_resource_processing_post_validation_errori_returns_400(self):
+ response = self.app.post_json(
+ '/v2.0/ports.json',
+ params={'port': {'network_id': self.port['network_id'],
+ 'name': 'the_port',
+ 'admin_state_up': 'invalid_value'}},
+ headers={'X-Project-Id': 'tenid'},
+ expect_errors=True)
+ self.assertEqual(400, response.status_int)
def test_service_plugin_identified(self):
# TODO(kevinbenton): fix the unit test setup to include an l3 plugin
class TestEnforcementHooks(PecanFunctionalTest):
def test_network_ownership_check(self):
- # TODO(kevinbenton): get a scenario that passes attribute population
- self.skipTest("Attribute population blocks this test as-is")
- response = self.app.post_json('/v2.0/ports.json',
+ response = self.app.post_json(
+ '/v2.0/ports.json',
params={'port': {'network_id': self.port['network_id'],
- 'admin_state_up': True,
- 'tenant_id': 'tenid2'}},
+ 'admin_state_up': True}},
headers={'X-Project-Id': 'tenid'})
- self.assertEqual(response.status_int, 200)
+ self.assertEqual(201, response.status_int)
def test_quota_enforcement(self):
# TODO(kevinbenton): this test should do something
self.assertRaises(webob.exc.HTTPBadRequest,
attributes.populate_tenant_id,
ctx, res_dict3, attr_info, True)
+
+
+class TestHelpers(base.DietTestCase):
+
+ def _verify_port_attributes(self, attrs):
+ for test_attribute in ('id', 'name', 'mac_address', 'network_id',
+ 'tenant_id', 'fixed_ips', 'status'):
+ self.assertIn(test_attribute, attrs)
+
+ def test_get_collection_info(self):
+ attrs = attributes.get_collection_info('ports')
+ self._verify_port_attributes(attrs)
+
+ def test_get_collection_info_missing(self):
+ self.assertFalse(attributes.get_collection_info('meh'))
+
+ def test_get_resource_info(self):
+ attributes.REVERSED_PLURALS.pop('port', None)
+ attrs = attributes.get_resource_info('port')
+ self._verify_port_attributes(attrs)
+ # verify side effect
+ self.assertIn('port', attributes.REVERSED_PLURALS)
+
+ def test_get_resource_info_missing(self):
+ self.assertFalse(attributes.get_resource_info('meh'))
+
+ def test_get_resource_info_cached(self):
+ with mock.patch('neutron.api.v2.attributes.PLURALS') as mock_plurals:
+ attributes.REVERSED_PLURALS['port'] = 'ports'
+ attrs = attributes.get_resource_info('port')
+ self._verify_port_attributes(attrs)
+ self.assertEqual(0, mock_plurals.items.call_count)