class Controller(object):
+ LIST = 'list'
+ SHOW = 'show'
+ CREATE = 'create'
+ UPDATE = 'update'
+ DELETE = 'delete'
def __init__(self, plugin, collection, resource, attr_info,
- allow_bulk=False, member_actions=None):
+ allow_bulk=False, member_actions=None, parent=None):
if member_actions is None:
member_actions = []
self._plugin = plugin
self._publisher_id = notifier_api.publisher_id('network')
self._member_actions = member_actions
+ if parent:
+ self._parent_id_name = '%s_id' % parent['member_name']
+ parent_part = '_%s' % parent['member_name']
+ else:
+ self._parent_id_name = None
+ parent_part = ''
+ self._plugin_handlers = {
+ self.LIST: 'get%s_%s' % (parent_part, self._collection),
+ self.SHOW: 'get%s_%s' % (parent_part, self._resource)
+ }
+ for action in [self.CREATE, self.UPDATE, self.DELETE]:
+ self._plugin_handlers[action] = '%s%s_%s' % (action, parent_part,
+ self._resource)
+
def _is_native_bulk_supported(self):
native_bulk_attr_name = ("_%s__native_bulk_support"
% self._plugin.__class__.__name__)
else:
raise AttributeError
- def _items(self, request, do_authz=False):
+ def _items(self, request, do_authz=False, parent_id=None):
"""Retrieves and formats a list of elements of the requested entity"""
# NOTE(salvatore-orlando): The following ensures that fields which
# are needed for authZ policy validation are not stripped away by the
original_fields, fields_to_add = self._do_field_list(_fields(request))
kwargs = {'filters': _filters(request, self._attr_info),
'fields': original_fields}
- obj_getter = getattr(self._plugin, "get_%s" % self._collection)
+ if parent_id:
+ kwargs[self._parent_id_name] = parent_id
+ obj_getter = getattr(self._plugin, self._plugin_handlers[self.LIST])
obj_list = obj_getter(request.context, **kwargs)
# Check authz
if do_authz:
# Omit items from list that should not be visible
obj_list = [obj for obj in obj_list
if policy.check(request.context,
- "get_%s" % self._resource,
+ self._plugin_handlers[self.SHOW],
obj,
plugin=self._plugin)]
return {self._collection: [self._view(obj,
fields_to_strip=fields_to_add)
for obj in obj_list]}
- def _item(self, request, id, do_authz=False, field_list=None):
+ def _item(self, request, id, do_authz=False, field_list=None,
+ parent_id=None):
"""Retrieves and formats a single element of the requested entity"""
kwargs = {'fields': field_list}
- action = "get_%s" % self._resource
+ action = self._plugin_handlers[self.SHOW]
+ if parent_id:
+ kwargs[self._parent_id_name] = parent_id
obj_getter = getattr(self._plugin, action)
obj = obj_getter(request.context, id, **kwargs)
# Check authz
policy.enforce(request.context, action, obj, plugin=self._plugin)
return obj
- def index(self, request):
+ def index(self, request, **kwargs):
"""Returns a list of the requested entity"""
- return self._items(request, True)
+ parent_id = kwargs.get(self._parent_id_name)
+ return self._items(request, True, parent_id)
- def show(self, request, id):
+ def show(self, request, id, **kwargs):
"""Returns detailed information about the requested entity"""
try:
# NOTE(salvatore-orlando): The following ensures that fields
# which are needed for authZ policy validation are not stripped
# away by the plugin before returning.
field_list, added_fields = self._do_field_list(_fields(request))
+ parent_id = kwargs.get(self._parent_id_name)
return {self._resource:
self._view(self._item(request,
id,
do_authz=True,
- field_list=field_list),
+ field_list=field_list,
+ parent_id=parent_id),
fields_to_strip=added_fields)}
except exceptions.PolicyNotAuthorized:
# To avoid giving away information, pretend that it
# doesn't exist
raise webob.exc.HTTPNotFound()
- def _emulate_bulk_create(self, obj_creator, request, body):
+ def _emulate_bulk_create(self, obj_creator, request, body, parent_id=None):
objs = []
try:
for item in body[self._collection]:
kwargs = {self._resource: item}
+ if parent_id:
+ kwargs[self._parent_id_name] = parent_id
objs.append(self._view(obj_creator(request.context,
**kwargs)))
return objs
# could raise any kind of exception
except Exception as ex:
for obj in objs:
- delete_action = "delete_%s" % self._resource
- obj_deleter = getattr(self._plugin, delete_action)
+ obj_deleter = getattr(self._plugin,
+ self._plugin_handlers[self.DELETE])
try:
- obj_deleter(request.context, obj['id'])
+ kwargs = ({self._parent_id_name: parent_id} if parent_id
+ else {})
+ obj_deleter(request.context, obj['id'], **kwargs)
except Exception:
# broad catch as our only purpose is to log the exception
LOG.exception(_("Unable to undo add for "
# it is then deleted
raise
- def create(self, request, body=None):
+ def create(self, request, body=None, **kwargs):
"""Creates a new instance of the requested entity"""
+ parent_id = kwargs.get(self._parent_id_name)
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.create.start',
body = Controller.prepare_request_body(request.context, body, True,
self._resource, self._attr_info,
allow_bulk=self._allow_bulk)
- action = "create_%s" % self._resource
+ action = self._plugin_handlers[self.CREATE]
# Check authz
try:
if self._collection in body:
create_result)
return create_result
+ kwargs = {self._parent_id_name: parent_id} if parent_id else {}
if self._collection in body and self._native_bulk:
# plugin does atomic bulk create operations
obj_creator = getattr(self._plugin, "%s_bulk" % action)
- objs = obj_creator(request.context, body)
+ objs = obj_creator(request.context, body, **kwargs)
return notify({self._collection: [self._view(obj)
for obj in objs]})
else:
obj_creator = getattr(self._plugin, action)
if self._collection in body:
# Emulate atomic bulk behavior
- objs = self._emulate_bulk_create(obj_creator, request, body)
+ objs = self._emulate_bulk_create(obj_creator, request,
+ body, parent_id)
return notify({self._collection: objs})
else:
- kwargs = {self._resource: body}
+ kwargs.update({self._resource: body})
obj = obj_creator(request.context, **kwargs)
return notify({self._resource: self._view(obj)})
- def delete(self, request, id):
+ def delete(self, request, id, **kwargs):
"""Deletes the specified entity"""
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.delete.start',
notifier_api.INFO,
{self._resource + '_id': id})
- action = "delete_%s" % self._resource
+ action = self._plugin_handlers[self.DELETE]
# Check authz
- obj = self._item(request, id)
+ parent_id = kwargs.get(self._parent_id_name)
+ obj = self._item(request, id, parent_id=parent_id)
try:
policy.enforce(request.context,
action,
raise webob.exc.HTTPNotFound()
obj_deleter = getattr(self._plugin, action)
- obj_deleter(request.context, id)
+ obj_deleter(request.context, id, **kwargs)
notifier_api.notify(request.context,
self._publisher_id,
self._resource + '.delete.end',
notifier_api.INFO,
{self._resource + '_id': id})
- def update(self, request, id, body=None):
+ def update(self, request, id, body=None, **kwargs):
"""Updates the specified entity's attributes"""
+ parent_id = kwargs.get(self._parent_id_name)
payload = body.copy()
payload['id'] = id
notifier_api.notify(request.context,
body = Controller.prepare_request_body(request.context, body, False,
self._resource, self._attr_info,
allow_bulk=self._allow_bulk)
- action = "update_%s" % self._resource
+ action = self._plugin_handlers[self.UPDATE]
# Load object to check authz
# but pass only attributes in the original body and required
# by the policy engine to the policy 'brain'
if ('required_by_policy' in value and
value['required_by_policy'] or
not 'default' in value)]
- orig_obj = self._item(request, id, field_list=field_list)
+ orig_obj = self._item(request, id, field_list=field_list,
+ parent_id=parent_id)
orig_obj.update(body[self._resource])
try:
policy.enforce(request.context,
obj_updater = getattr(self._plugin, action)
kwargs = {self._resource: body}
+ if parent_id:
+ kwargs[self._parent_id_name] = parent_id
obj = obj_updater(request.context, id, **kwargs)
result = {self._resource: self._view(obj)}
notifier_api.notify(request.context,
def create_resource(collection, resource, plugin, params, allow_bulk=False,
- member_actions=None):
+ member_actions=None, parent=None):
controller = Controller(plugin, collection, resource, params, allow_bulk,
- member_actions=member_actions)
+ member_actions=member_actions, parent=parent)
# NOTE(jkoelker) To anyone wishing to add "proper" xml support
# this is where you do it
LOG = logging.getLogger(__name__)
+
+RESOURCES = {'network': 'networks',
+ 'subnet': 'subnets',
+ 'port': 'ports'}
+SUB_RESOURCES = {}
COLLECTION_ACTIONS = ['index', 'create']
MEMBER_ACTIONS = ['show', 'update', 'delete']
REQUIREMENTS = {'id': attributes.UUID_PATTERN, 'format': 'xml|json'}
col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
member_actions=MEMBER_ACTIONS)
- resources = {'network': 'networks',
- 'subnet': 'subnets',
- 'port': 'ports'}
-
- def _map_resource(collection, resource, params):
+ def _map_resource(collection, resource, params, parent=None):
allow_bulk = cfg.CONF.allow_bulk
controller = base.create_resource(collection, resource,
plugin, params,
- allow_bulk=allow_bulk)
+ allow_bulk=allow_bulk,
+ parent=parent)
+ path_prefix = None
+ if parent:
+ path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
+ parent['member_name'],
+ collection)
mapper_kwargs = dict(controller=controller,
requirements=REQUIREMENTS,
+ path_prefix=path_prefix,
**col_kwargs)
return mapper.collection(collection, resource,
**mapper_kwargs)
- mapper.connect('index', '/', controller=Index(resources))
- for resource in resources:
- _map_resource(resources[resource], resource,
+ mapper.connect('index', '/', controller=Index(RESOURCES))
+ for resource in RESOURCES:
+ _map_resource(RESOURCES[resource], resource,
+ attributes.RESOURCE_ATTRIBUTE_MAP.get(
+ RESOURCES[resource], dict()))
+
+ for resource in SUB_RESOURCES:
+ _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
attributes.RESOURCE_ATTRIBUTE_MAP.get(
- resources[resource], dict()))
+ SUB_RESOURCES[resource]['collection_name'],
+ dict()),
+ SUB_RESOURCES[resource]['parent'])
super(APIRouter, self).__init__(mapper)
self.assertEqual(res.status_int, 400)
+class SubresourceTest(unittest.TestCase):
+ def setUp(self):
+ plugin = 'quantum.tests.unit.test_api_v2.TestSubresourcePlugin'
+ QuantumManager._instance = None
+ PluginAwareExtensionManager._instance = None
+ args = ['--config-file', etcdir('quantum.conf.test')]
+ config.parse(args=args)
+ cfg.CONF.set_override('core_plugin', plugin)
+
+ self._plugin_patcher = mock.patch(plugin, autospec=True)
+ self.plugin = self._plugin_patcher.start()
+
+ router.SUB_RESOURCES['dummy'] = {
+ 'collection_name': 'dummies',
+ 'parent': {'collection_name': 'networks',
+ 'member_name': 'network'}
+ }
+
+ api = router.APIRouter()
+ self.api = webtest.TestApp(api)
+
+ def tearDown(self):
+ self._plugin_patcher.stop()
+ self.api = None
+ self.plugin = None
+ cfg.CONF.reset()
+
+ def test_index_sub_resource(self):
+ instance = self.plugin.return_value
+
+ self.api.get('/networks/id1/dummies')
+ instance.get_network_dummies.assert_called_once_with(mock.ANY,
+ filters=mock.ANY,
+ fields=mock.ANY,
+ network_id='id1')
+
+ def test_show_sub_resource(self):
+ instance = self.plugin.return_value
+
+ dummy_id = _uuid()
+ self.api.get('/networks/id1' + _get_path('dummies', id=dummy_id))
+ instance.get_network_dummy.assert_called_once_with(mock.ANY,
+ dummy_id,
+ network_id='id1',
+ fields=mock.ANY)
+
+ def test_create_sub_resource(self):
+ instance = self.plugin.return_value
+
+ body = {'dummy': {'foo': 'bar', 'tenant_id': _uuid()}}
+ self.api.post_json('/networks/id1/dummies', body)
+ instance.create_network_dummy.assert_called_once_with(mock.ANY,
+ network_id='id1',
+ dummy=body)
+
+ def test_update_sub_resource(self):
+ instance = self.plugin.return_value
+
+ dummy_id = _uuid()
+ body = {'dummy': {'foo': 'bar', 'tenant_id': _uuid()}}
+ self.api.put_json('/networks/id1' + _get_path('dummies', id=dummy_id),
+ body)
+ instance.update_network_dummy.assert_called_once_with(mock.ANY,
+ dummy_id,
+ network_id='id1',
+ dummy=body)
+
+ def test_delete_sub_resource(self):
+ instance = self.plugin.return_value
+
+ dummy_id = _uuid()
+ self.api.delete('/networks/id1' + _get_path('dummies', id=dummy_id))
+ instance.delete_network_dummy.assert_called_once_with(mock.ANY,
+ dummy_id,
+ network_id='id1')
+
+
class V2Views(unittest.TestCase):
def _view(self, keys, collection, resource):
data = dict((key, 'value') for key in keys)
self.assertEqual(net['status'], "ACTIVE")
self.assertEqual(net['v2attrs:something'], "123")
self.assertFalse('v2attrs:something_else' in net)
+
+
+class TestSubresourcePlugin():
+ def get_network_dummies(self, context, network_id,
+ filters=None, fields=None):
+ return []
+
+ def get_network_dummy(self, context, id, network_id,
+ fields=None):
+ return {}
+
+ def create_network_dummy(self, context, network_id, dummy):
+ return {}
+
+ def update_network_dummy(self, context, id, network_id, dummy):
+ return {}
+
+ def delete_network_dummy(self, context, id, network_id):
+ return