"create_port:port_security_enabled": "rule:admin_or_network_owner",
"create_port:binding:host_id": "rule:admin_only",
"create_port:binding:profile": "rule:admin_only",
- "create_port:binding:vnic_type": "rule:admin_or_owner",
"create_port:mac_learning_enabled": "rule:admin_or_network_owner",
"get_port": "rule:admin_or_owner",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_details": "rule:admin_only",
"get_port:binding:host_id": "rule:admin_only",
"get_port:binding:profile": "rule:admin_only",
- "get_port:binding:vnic_type": "rule:admin_or_owner",
"update_port": "rule:admin_or_owner",
"update_port:fixed_ips": "rule:admin_or_network_owner",
"update_port:port_security_enabled": "rule:admin_or_network_owner",
"update_port:binding:host_id": "rule:admin_only",
"update_port:binding:profile": "rule:admin_only",
- "update_port:binding:vnic_type": "rule:admin_or_owner",
"update_port:mac_learning_enabled": "rule:admin_or_network_owner",
"delete_port": "rule:admin_or_owner",
"create_firewall_rule": "",
"get_firewall_rule": "rule:admin_or_owner or rule:shared_firewalls",
- "create_firewall_rule:shared": "rule:admin_or_owner",
- "get_firewall_rule:shared": "rule:admin_or_owner",
"update_firewall_rule": "rule:admin_or_owner",
"delete_firewall_rule": "rule:admin_or_owner",
% self._plugin.__class__.__name__)
return getattr(self._plugin, native_sorting_attr_name, False)
- def _is_visible(self, context, attr_name, data):
- action = "%s:%s" % (self._plugin_handlers[self.SHOW], attr_name)
- # Optimistically init authz_check to True
- authz_check = True
- try:
- attr = (attributes.RESOURCE_ATTRIBUTE_MAP
- [self._collection].get(attr_name))
- if attr and attr.get('enforce_policy'):
- authz_check = policy.check_if_exists(
- context, action, data)
- except KeyError:
- # The extension was not configured for adding its resources
- # to the global resource attribute map. Policy check should
- # not be performed
- LOG.debug(_("The resource %(resource)s was not found in the "
- "RESOURCE_ATTRIBUTE_MAP; unable to perform authZ "
- "check for attribute %(attr)s"),
- {'resource': self._collection,
- 'attr': attr_name})
- except exceptions.PolicyRuleNotFound:
- # Just ignore the exception. Do not even log it, as this will add
- # a lot of unnecessary info in the log (and take time as well to
- # write info to the logger)
- pass
- attr_val = self._attr_info.get(attr_name)
- return attr_val and attr_val['is_visible'] and authz_check
+ def _exclude_attributes_by_policy(self, context, data):
+ """Identifies attributes to exclude according to authZ policies.
+
+ Return a list of attribute names which should be stripped from the
+ response returned to the user because the user is not authorized
+ to see them.
+ """
+ attributes_to_exclude = []
+ for attr_name in data.keys():
+ attr_data = self._attr_info.get(attr_name)
+ if attr_data and attr_data['is_visible']:
+ if policy.check(
+ context,
+ '%s:%s' % (self._plugin_handlers[self.SHOW], attr_name),
+ None,
+ might_not_exist=True):
+ # this attribute is visible, check next one
+ continue
+ # if the code reaches this point then either the policy check
+ # failed or the attribute was not visible in the first place
+ attributes_to_exclude.append(attr_name)
+ return attributes_to_exclude
def _view(self, context, data, fields_to_strip=None):
- # make sure fields_to_strip is iterable
- if not fields_to_strip:
- fields_to_strip = []
+ """Build a view of an API resource.
+ :param context: the neutron context
+ :param data: the object for which a view is being created
+ :param fields_to_strip: attributes to remove from the view
+
+ :returns: a view of the object which includes only attributes
+ visible according to API resource declaration and authZ policies.
+ """
+ fields_to_strip = ((fields_to_strip or []) +
+ self._exclude_attributes_by_policy(context, data))
+ return self._filter_attributes(context, data, fields_to_strip)
+
+ def _filter_attributes(self, context, data, fields_to_strip=None):
+ if not fields_to_strip:
+ return data
return dict(item for item in data.iteritems()
- if (self._is_visible(context, item[0], data) and
- item[0] not in fields_to_strip))
+ if (item[0] not in fields_to_strip))
def _do_field_list(self, original_fields):
fields_to_add = None
self._plugin_handlers[self.SHOW],
obj,
plugin=self._plugin)]
+ # Use the first element in the list for discriminating which attributes
+ # should be filtered out because of authZ policies
+ # fields_to_add contains a list of attributes added for request policy
+ # checks but that were not required by the user. They should be
+ # therefore stripped
+ fields_to_strip = fields_to_add or []
+ if obj_list:
+ fields_to_strip += self._exclude_attributes_by_policy(
+ request.context, obj_list[0])
collection = {self._collection:
- [self._view(request.context, obj,
- fields_to_strip=fields_to_add)
+ [self._filter_attributes(
+ request.context, obj,
+ fields_to_strip=fields_to_strip)
for obj in obj_list]}
pagination_links = pagination_helper.get_links(obj_list)
if pagination_links:
kwargs = {self._resource: item}
if parent_id:
kwargs[self._parent_id_name] = parent_id
- objs.append(self._view(request.context,
- obj_creator(request.context,
- **kwargs)))
+ fields_to_strip = self._exclude_attributes_by_policy(
+ request.context, item)
+ objs.append(self._filter_attributes(
+ request.context,
+ obj_creator(request.context, **kwargs),
+ fields_to_strip=fields_to_strip))
return objs
# Note(salvatore-orlando): broad catch as in theory a plugin
# could raise any kind of exception
# plugin does atomic bulk create operations
obj_creator = getattr(self._plugin, "%s_bulk" % action)
objs = obj_creator(request.context, body, **kwargs)
- return notify({self._collection: [self._view(request.context, obj)
- for obj in objs]})
+ # Use first element of list to discriminate attributes which
+ # should be removed because of authZ policies
+ fields_to_strip = self._exclude_attributes_by_policy(
+ request.context, objs[0])
+ return notify({self._collection: [self._filter_attributes(
+ request.context, obj, fields_to_strip=fields_to_strip)
+ for obj in objs]})
else:
obj_creator = getattr(self._plugin, action)
if self._collection in body:
self._nova_notifier.send_network_change(
action, {}, {self._resource: obj})
- return notify({self._resource: self._view(request.context,
- obj)})
+ return notify({self._resource: self._view(
+ request.context, obj)})
def delete(self, request, id, **kwargs):
"""Deletes the specified entity."""
return match_rule, target, credentials
-def check(context, action, target, plugin=None):
+def check(context, action, target, plugin=None, might_not_exist=False):
"""Verifies that the action is valid on the target in this context.
:param context: neutron context
location of the object e.g. ``{'project_id': context.project_id}``
:param plugin: currently unused and deprecated.
Kept for backward compatibility.
+ :param might_not_exist: If True the policy check is skipped (and the
+ function returns True) if the specified policy does not exist.
+ Defaults to false.
:return: Returns True if access is permitted else False.
"""
- return policy.check(*(_prepare_check(context, action, target)))
-
-
-def check_if_exists(context, action, target):
- """Verify if the action can be authorized, and raise if it is unknown.
-
- Check whether the action can be performed on the target within this
- context, and raise a PolicyRuleNotFound exception if the action is
- not defined in the policy engine.
- """
- # TODO(salvatore-orlando): Consider modifying oslo policy engine in
- # order to allow to raise distinct exception when check fails and
- # when policy is missing
- # Raise if there's no match for requested action in the policy engine
- if not policy._rules or action not in policy._rules:
- raise exceptions.PolicyRuleNotFound(rule=action)
+ if might_not_exist and not (policy._rules and action in policy._rules):
+ return True
return policy.check(*(_prepare_check(context, action, target)))