"extension:router:view": "rule:regular_user",
"extension:router:set": "rule:admin_only",
- "extension:router:add_router_interface": "rule:admin_or_owner",
- "extension:router:remove_router_interface": "rule:admin_or_owner",
"extension:port_binding:view": "rule:admin_only",
"extension:port_binding:set": "rule:admin_only",
def __getattr__(self, name):
if name in self._member_actions:
def _handle_action(request, id, **kwargs):
- if 'body' in kwargs:
- body = kwargs.pop('body')
- return getattr(self._plugin, name)(request.context, id,
- body, **kwargs)
- else:
- return getattr(self._plugin, name)(request.context, id,
- **kwargs)
+ arg_list = [request.context, id]
+ # Fetch the resource and verify if the user can access it
+ try:
+ resource = self._item(request, id, True)
+ except exceptions.PolicyNotAuthorized:
+ raise webob.exc.HTTPNotFound()
+ body = kwargs.pop('body', None)
+ # Explicit comparison with None to distinguish from {}
+ if body is not None:
+ arg_list.append(body)
+ # TODO(salvatore-orlando): bp/make-authz-ortogonal
+ # The body of the action request should be included
+ # in the info passed to the policy engine
+ # Enforce policy, if any, for this action
+ # It is ok to raise a 403 because accessibility to the
+ # object was checked earlier in this method
+ policy.enforce(request.context, name, resource,
+ plugin=self._plugin)
+ return getattr(self._plugin, name)(*arg_list, **kwargs)
return _handle_action
else:
raise AttributeError
pass
def add_router_interface(self, context, router_id, interface_info):
- # make sure router exists
- router = self._get_router(context, router_id)
if not interface_info:
msg = _("Either subnet_id or port_id must be specified")
raise q_exc.BadRequest(resource='router', msg=msg)
- try:
- policy.enforce(context,
- "extension:router:add_router_interface",
- self._make_router_dict(router))
- except q_exc.PolicyNotAuthorized:
- raise l3.RouterNotFound(router_id=router_id)
-
if 'port_id' in interface_info:
if 'subnet_id' in interface_info:
msg = _("Cannot specify both subnet-id and port-id")
router_id=router_id, subnet_id=subnet_id)
def remove_router_interface(self, context, router_id, interface_info):
- # make sure router exists
- router = self._get_router(context, router_id)
- try:
- policy.enforce(context,
- "extension:router:remove_router_interface",
- self._make_router_dict(router))
- except q_exc.PolicyNotAuthorized:
- raise l3.RouterNotFound(router_id=router_id)
-
if not interface_info:
msg = _("Either subnet_id or port_id must be specified")
raise q_exc.BadRequest(resource='router', msg=msg)
:return: Returns True if access is permitted else False.
"""
init()
+ # Compare with None to distinguish case in which target is {}
+ if target is None:
+ target = {}
real_target = _build_target(action, target, plugin, context)
match_rule = _build_match_rule(action, real_target)
credentials = context.to_dict()
"""
init()
+ # Compare with None to distinguish case in which target is {}
+ if target is None:
+ target = {}
real_target = _build_target(action, target, plugin, context)
match_rule = _build_match_rule(action, real_target)
credentials = context.to_dict()