"admin_or_network_owner": [["role:admin"], ["tenant_id:%(network_tenant_id)s"]],
"admin_only": [["role:admin"]],
"regular_user": [],
+ "shared": [["field:networks:shared=True"]],
+ "external": [["field:networks:router:external=True"]],
"default": [["rule:admin_or_owner"]],
"extension:provider_network:view": [["rule:admin_only"]],
"extension:router:view": [["rule:regular_user"]],
"extension:router:set": [["rule:admin_only"]],
- "networks:private:read": [["rule:admin_or_owner"]],
- "networks:private:write": [["rule:admin_or_owner"]],
- "networks:shared:read": [["rule:regular_user"]],
- "networks:shared:write": [["rule:admin_only"]],
-
"subnets:private:read": [["rule:admin_or_owner"]],
"subnets:private:write": [["rule:admin_or_owner"]],
"subnets:shared:read": [["rule:regular_user"]],
"subnets:shared:write": [["rule:admin_only"]],
"create_subnet": [["rule:admin_or_network_owner"]],
- "get_subnet": [],
+ "get_subnet": [["rule:admin_or_owner"], ["rule:shared"]],
"update_subnet": [["rule:admin_or_network_owner"]],
"delete_subnet": [["rule:admin_or_network_owner"]],
"create_network": [],
- "get_network": [],
+ "get_network": [["rule:admin_or_owner"], ["rule:shared"], ["rule:external"]],
"create_network:shared": [["rule:admin_only"]],
- "update_network": [],
- "update_network:shared": [["rule:admin_only"]],
- "delete_network": [],
+ "create_network:router:external": [["rule:admin_only"]],
+ "update_network": [["rule:admin_or_owner"]],
+ "delete_network": [["rule:admin_or_owner"]],
"create_port": [],
"create_port:mac_address": [["rule:admin_or_network_owner"]],
exceptions.InvalidSharedSetting: webob.exc.HTTPConflict,
exceptions.HostRoutesExhausted: webob.exc.HTTPBadRequest,
exceptions.DNSNameServersExhausted: webob.exc.HTTPBadRequest,
+ # Some plugins enforce policies as well
+ exceptions.PolicyNotAuthorized: webob.exc.HTTPForbidden
}
QUOTAS = quota.QUOTAS
# bulk operations. Name mangling is used in order to ensure it
# is qualified by class
__native_bulk_support = True
+ # Plugins, mixin classes implementing extension will register
+ # hooks into the dict below for "augmenting" the "core way" of
+ # building a query for retrieving objects from a model class.
+ # To this aim, the register_model_query_hook and unregister_query_hook
+ # from this class should be invoked
+ _model_query_hooks = {}
def __init__(self):
# NOTE(jkoelker) This is an incomlete implementation. Subclasses
def _model_query(self, context, model):
query = context.session.query(model)
-
+ # define basic filter condition for model query
# NOTE(jkoelker) non-admin queries are scoped to their tenant_id
# NOTE(salvatore-orlando): unless the model allows for shared objects
+ query_filter = None
if not context.is_admin and hasattr(model, 'tenant_id'):
if hasattr(model, 'shared'):
- query = query.filter((model.tenant_id == context.tenant_id) |
- (model.shared))
+ query_filter = ((model.tenant_id == context.tenant_id) |
+ (model.shared))
else:
- query = query.filter(model.tenant_id == context.tenant_id)
+ query_filter = (model.tenant_id == context.tenant_id)
+ # Execute query hooks registered from mixins and plugins
+ for _name, hooks in self._model_query_hooks.get(model,
+ {}).iteritems():
+ query_hook = hooks.get('query')
+ filter_hook = hooks.get('filter')
+ if query_hook:
+ query = query_hook(self, context, model, query)
+ if filter_hook:
+ query_filter = filter_hook(self, context, model, query_filter)
+
+ # NOTE(salvatore-orlando): 'if query_filter' will try to evaluate the
+ # condition, raising an exception
+ if query_filter is not None:
+ query = query.filter(query_filter)
return query
+ @classmethod
+ def register_model_query_hook(cls, model, name, query_hook, filter_hook):
+ """ register an hook to be invoked when a query is executed.
+
+ Add the hooks to the _model_query_hooks dict. Models are the keys
+ of this dict, whereas the value is another dict mapping hook names to
+ callables performing the hook.
+ Each hook has a "query" component, used to build the query expression
+ and a "filter" component, which is used to build the filter expression.
+
+ Query hooks take as input the query being built and return a
+ transformed query expression.
+
+ Filter hooks take as input the filter expression being built and return
+ a transformed filter expression
+ """
+ model_hooks = cls._model_query_hooks.get(model)
+ if not model_hooks:
+ # add key to dict
+ model_hooks = {}
+ cls._model_query_hooks[model] = model_hooks
+ model_hooks[name] = {'query': query_hook, 'filter': filter_hook}
+
def _get_by_id(self, context, model, id):
query = self._model_query(context, model)
- return query.filter_by(id=id).one()
+ return query.filter(model.id == id).one()
def _get_network(self, context, id):
try:
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
+from sqlalchemy.sql import expression as expr
import webob.exc as w_exc
from quantum.api.v2 import attributes
from quantum.common import exceptions as q_exc
from quantum.common import utils
+from quantum.db import db_base_plugin_v2
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import l3
name = sa.Column(sa.String(255))
status = sa.Column(sa.String(16))
admin_state_up = sa.Column(sa.Boolean)
- gw_port_id = sa.Column(sa.String(36), sa.ForeignKey('ports.id',
- ondelete="CASCADE"))
+ gw_port_id = sa.Column(sa.String(36), sa.ForeignKey('ports.id'))
gw_port = orm.relationship(models_v2.Port)
class L3_NAT_db_mixin(l3.RouterPluginBase):
"""Mixin class to add L3/NAT router methods to db_plugin_base_v2"""
+ def _network_model_hook(self, context, original_model, query):
+ query = query.outerjoin(ExternalNetwork,
+ (original_model.id ==
+ ExternalNetwork.network_id))
+ return query
+
+ def _network_filter_hook(self, context, original_model, conditions):
+ if conditions is not None and not hasattr(conditions, '__iter__'):
+ conditions = (conditions, )
+ # Apply the external network filter only in non-admin context
+ if not context.is_admin and hasattr(original_model, 'tenant_id'):
+ conditions = expr.or_(ExternalNetwork.network_id != expr.null(),
+ *conditions)
+ return conditions
+
+ # TODO(salvatore-orlando): Perform this operation without explicitly
+ # referring to db_base_plugin_v2, as plugins that do not extend from it
+ # might exist in the future
+ db_base_plugin_v2.QuantumDbPluginV2.register_model_query_hook(
+ models_v2.Network,
+ "external_net",
+ _network_model_hook,
+ _network_filter_hook)
+
def _get_router(self, context, id):
try:
router = self._get_by_id(context, Router, id)
with context.session.begin(subtransactions=True):
router.update({'gw_port_id': None})
context.session.add(router)
- self.delete_port(context, gw_port['id'], l3_port_check=False)
+ self.delete_port(context.elevated(), gw_port['id'],
+ l3_port_check=False)
if network_id is not None and (gw_port is None or
gw_port['network_id'] != network_id):
# Port has no 'tenant-id', as it is hidden from user
- gw_port = self.create_port(context, {
+ gw_port = self.create_port(context.elevated(), {
'port':
- {'network_id': network_id,
+ {'tenant_id': '', # intentionally not set
+ 'network_id': network_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
'device_id': router_id,
'name': ''}})
if not len(gw_port['fixed_ips']):
- self.delete_port(context, gw_port['id'], l3_port_check=False)
+ self.delete_port(context.elevated(), gw_port['id'],
+ l3_port_check=False)
msg = ('No IPs available for external network %s' %
network_id)
raise q_exc.BadRequest(resource='router', msg=msg)
ports = self.get_ports(context, filters=device_filter)
if ports:
raise l3.RouterInUse(router_id=id)
- # NOTE(salvatore-orlando): gw port will be automatically deleted
- # thanks to cascading on the ORM relationship
+
+ # delete any gw port
+ device_filter = {'device_id': [id],
+ 'device_owner': [DEVICE_OWNER_ROUTER_GW]}
+ ports = self.get_ports(context.elevated(), filters=device_filter)
+ if ports:
+ self._delete_port(context.elevated(), ports[0]['id'])
+
context.session.delete(router)
def get_router(self, context, id, fields=None):
'subnet_id': subnet['id']}
port = self.create_port(context, {
'port':
- {'network_id': subnet['network_id'],
+ {'tenant_id': subnet['tenant_id'],
+ 'network_id': subnet['network_id'],
'fixed_ips': [fixed_ip],
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
# confirm that this router has a floating
# ip enabled gateway with support for this floating IP network
try:
- port_qry = context.session.query(models_v2.Port)
+ port_qry = context.elevated().session.query(models_v2.Port)
ports = port_qry.filter_by(
network_id=floating_network_id,
device_id=router_id,
def _update_fip_assoc(self, context, fip, floatingip_db, external_port):
port_id = internal_ip_address = router_id = None
+ if (('fixed_ip_address' in fip and fip['fixed_ip_address']) and
+ not ('port_id' in fip and fip['port_id'])):
+ msg = "fixed_ip_address cannot be specified without a port_id"
+ raise q_exc.BadRequest(resource='floatingip', msg=msg)
if 'port_id' in fip and fip['port_id']:
port_qry = context.session.query(FloatingIP)
try:
# This external port is never exposed to the tenant.
# it is used purely for internal system and admin use when
# managing floating IPs.
- external_port = self.create_port(context, {
+ external_port = self.create_port(context.elevated(), {
'port':
- {'network_id': f_net_id,
+ {'tenant_id': '', # tenant intentionally not set
+ 'network_id': f_net_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
if not external_port['fixed_ips']:
msg = "Unable to find any IP address on external network"
# remove the external port
- self.delete_port(context, external_port['id'], l3_port_check=False)
+ self.delete_port(context.elevated(), external_port['id'],
+ l3_port_check=False)
raise q_exc.BadRequest(resource='floatingip', msg=msg)
floating_ip_address = external_port['fixed_ips'][0]['ip_address']
context.session.add(floatingip_db)
# TODO(salvatore-orlando): Avoid broad catch
# Maybe by introducing base class for L3 exceptions
+ except q_exc.BadRequest:
+ LOG.exception("Unable to create Floating ip due to a "
+ "malformed request")
+ raise
except Exception:
LOG.exception("Floating IP association failed")
# Remove the port created for internal purposes
fip['id'] = id
fip_port_id = floatingip_db['floating_port_id']
self._update_fip_assoc(context, fip, floatingip_db,
- self.get_port(context, fip_port_id))
+ self.get_port(context.elevated(),
+ fip_port_id))
return self._make_floatingip_dict(floatingip_db)
def delete_floatingip(self, context, id):
floatingip = self._get_floatingip(context, id)
with context.session.begin(subtransactions=True):
context.session.delete(floatingip)
- self.delete_port(context, floatingip['floating_port_id'],
+ self.delete_port(context.elevated(),
+ floatingip['floating_port_id'],
l3_port_check=False)
def get_floatingip(self, context, id, fields=None):
def _extend_network_dict_l3(self, context, network):
if self._check_l3_view_auth(context, network):
- network['router:external'] = self._network_is_external(
+ network[l3.EXTERNAL] = self._network_is_external(
context, network['id'])
def _process_l3_create(self, context, net_data, net_id):
- external = net_data.get('router:external')
+ external = net_data.get(l3.EXTERNAL)
external_set = attributes.is_attr_set(external)
if not external_set:
def _process_l3_update(self, context, net_data, net_id):
- new_value = net_data.get('router:external')
+ new_value = net_data.get(l3.EXTERNAL)
if not attributes.is_attr_set(new_value):
return
context.session.query(models_v2.Port).filter_by(
device_owner=DEVICE_OWNER_ROUTER_GW,
network_id=net_id).first()
- raise ExternalNetworkInUse(net_id=net_id)
+ raise l3.ExternalNetworkInUse(net_id=net_id)
except exc.NoResultFound:
pass # expected
},
}
+EXTERNAL = 'router:external'
EXTENDED_ATTRIBUTES_2_0 = {
- 'networks': {'router:external': {'allow_post': True,
- 'allow_put': True,
- 'default': attr.ATTR_NOT_SPECIFIED,
- 'is_visible': True,
- 'convert_to': attr.convert_to_boolean,
- 'validate': {'type:boolean': None}}}}
+ 'networks': {EXTERNAL: {'allow_post': True,
+ 'allow_put': True,
+ 'default': attr.ATTR_NOT_SPECIFIED,
+ 'is_visible': True,
+ 'convert_to': attr.convert_to_boolean,
+ 'validate': {'type:boolean': None},
+ 'enforce_policy': True,
+ 'required_by_policy': True}}}
l3_quota_opts = [
cfg.IntOpt('quota_router',
"""
Policy engine for quantum. Largely copied from nova.
"""
+import logging
+
from quantum.api.v2 import attributes
from quantum.common import exceptions
from quantum.openstack.common import cfg
import quantum.common.utils as utils
from quantum.openstack.common import policy
+
+LOG = logging.getLogger(__name__)
_POLICY_PATH = None
_POLICY_CACHE = {}
reload_func=_set_brain)
+def get_resource_and_action(action):
+ """ Extract resource and action (write, read) from api operation """
+ data = action.split(':', 1)[0].split('_', 1)
+ return ("%ss" % data[-1], data[0] != 'get')
+
+
def _set_brain(data):
default_rule = 'default'
policy.set_brain(policy.Brain.load_json(data, default_rule))
-def _get_resource_and_action(action):
- data = action.split(':', 1)[0].split('_', 1)
- return ("%ss" % data[-1], data[0] != 'get')
-
-
def _is_attribute_explicitly_set(attribute_name, resource, target):
"""Verify that an attribute is present and has a non-default value"""
if ('default' in resource[attribute_name] and
"parent" resource of the targeted one.
"""
target = original_target.copy()
- resource, _w = _get_resource_and_action(action)
+ resource, _a = get_resource_and_action(action)
hierarchy_info = attributes.RESOURCE_HIERARCHY_MAP.get(resource, None)
if hierarchy_info and plugin:
# use the 'singular' version of the resource name
return target
-def _create_access_rule_match(resource, is_write, shared):
- if shared == resource[attributes.SHARED]:
- return ('rule:%s:%s:%s' % (resource,
- shared and 'shared' or 'private',
- is_write and 'write' or 'read'), )
-
-
-def _build_perm_match(action, target):
- """Create the permission rule match.
-
- Given the current access right on a network (shared/private), and
- the type of the current operation (read/write), builds a match
- rule of the type <resource>:<sharing_mode>:<operation_type>
- """
- resource, is_write = _get_resource_and_action(action)
- res_map = attributes.RESOURCE_ATTRIBUTE_MAP
- if (resource in res_map and
- attributes.SHARED in res_map[resource] and
- attributes.SHARED in target):
- return ('rule:%s:%s:%s' % (resource,
- target[attributes.SHARED]
- and 'shared' or 'private',
- is_write and 'write' or 'read'), )
-
-
def _build_match_list(action, target):
"""Create the list of rules to match for a given action.
"""
match_list = ('rule:%s' % action,)
- resource, is_write = _get_resource_and_action(action)
- # assigning to variable with short name for improving readability
- res_map = attributes.RESOURCE_ATTRIBUTE_MAP
- if resource in res_map:
- for attribute_name in res_map[resource]:
- if _is_attribute_explicitly_set(attribute_name,
- res_map[resource],
- target):
- attribute = res_map[resource][attribute_name]
- if 'enforce_policy' in attribute and is_write:
- match_list += ('rule:%s:%s' % (action,
- attribute_name),)
- # add permission-based rule (for shared resources)
- perm_match = _build_perm_match(action, target)
- if perm_match:
- match_list += perm_match
- # the policy engine must AND between all the rules
+ resource, is_write = get_resource_and_action(action)
+ if is_write:
+ # assigning to variable with short name for improving readability
+ res_map = attributes.RESOURCE_ATTRIBUTE_MAP
+ if resource in res_map:
+ for attribute_name in res_map[resource]:
+ if _is_attribute_explicitly_set(attribute_name,
+ res_map[resource],
+ target):
+ attribute = res_map[resource][attribute_name]
+ if 'enforce_policy' in attribute and is_write:
+ match_list += ('rule:%s:%s' % (action,
+ attribute_name),)
return [match_list]
+@policy.register('field')
+def check_field(brain, match_kind, match, target_dict, cred_dict):
+ # If this method is invoked for the wrong kind of match
+ # which should never happen, just skip the check and don't
+ # fail the policy evaluation
+ if match_kind != 'field':
+ LOG.warning("Field check function invoked with wrong match_kind:%s",
+ match_kind)
+ return True
+ resource, field_value = match.split(':', 1)
+ field, value = field_value.split('=', 1)
+ target_value = target_dict.get(field)
+ # target_value might be a boolean, explicitly compare with None
+ if target_value is None:
+ LOG.debug("Unable to find requested field: %s in target: %s",
+ field, target_dict)
+ return False
+ # Value migth need conversion - we need help from the attribute map
+ conv_func = attributes.RESOURCE_ATTRIBUTE_MAP[resource][field].get(
+ 'convert_to', lambda x: x)
+ if target_value != conv_func(value):
+ LOG.debug("%s does not match the value in the target object:%s",
+ conv_func(value), target_value)
+ return False
+ # If we manage to get here, the policy check is successful
+ return True
+
+
def check(context, action, target, plugin=None):
"""Verifies that the action is valid on the target in this context.
"""
init()
-
real_target = _build_target(action, target, plugin, context)
match_list = _build_match_list(action, real_target)
credentials = context.to_dict()
-
policy.enforce(match_list, real_target, credentials,
exceptions.PolicyNotAuthorized, action=action)
req.environ['quantum.context'] = kwargs['context']
return req.get_response(self.api)
- def _create_network(self, fmt, name, admin_status_up, **kwargs):
+ def _create_network(self, fmt, name, admin_status_up,
+ arg_list=None, **kwargs):
data = {'network': {'name': name,
'admin_state_up': admin_status_up,
'tenant_id': self._tenant_id}}
- for arg in ('admin_state_up', 'tenant_id', 'shared'):
+ for arg in (('admin_state_up', 'tenant_id', 'shared') +
+ (arg_list or ())):
# Arg must be present and not empty
if arg in kwargs and kwargs[arg]:
data['network'][arg] = kwargs[arg]
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
- def _make_port(self, fmt, net_id, **kwargs):
- res = self._create_port(fmt, net_id, **kwargs)
- return self.deserialize(fmt, res)
-
def _api_for_resource(self, resource):
if resource in ['networks', 'subnets', 'ports']:
return self.api
if not subnet:
with self.subnet() as subnet:
net_id = subnet['subnet']['network_id']
- port = self._make_port(fmt, net_id, fixed_ips=fixed_ips,
- **kwargs)
+ res = self._create_port(fmt, net_id, **kwargs)
+ port = self.deserialize(fmt, res)
+ # Things can go wrong - raise HTTP exc with res code only
+ # so it can be caught by unit tests
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+
yield port
if not no_delete:
self._delete('ports', port['port']['id'])
else:
net_id = subnet['subnet']['network_id']
- port = self._make_port(fmt, net_id, fixed_ips=fixed_ips,
- **kwargs)
+ res = self._create_port(fmt, net_id, **kwargs)
+ port = self.deserialize(fmt, res)
+ # Things can go wrong - raise HTTP exc with res code only
+ # so it can be caught by unit tests
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
yield port
if not no_delete:
self._delete('ports', port['port']['id'])
admin_status_up=True)
network = self.deserialize(fmt, res)
network_id = network['network']['id']
- port = self._make_port(fmt, network_id, device_owner='network:dhcp')
+ self._create_port(fmt, network_id, device_owner='network:dhcp')
req = self.new_delete_request('networks', network_id)
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
network_id = network['network']['id']
subnet = self._make_subnet(fmt, network, gateway_ip,
cidr, ip_version=4)
- port = self._make_port(fmt, network['network']['id'],
- device_owner='network:dhcp')
+ self._create_port(fmt,
+ network['network']['id'],
+ device_owner='network:dhcp')
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
import contextlib
import copy
+import itertools
import logging
import unittest
from quantum.common import config
from quantum.common.test_lib import test_config
from quantum.common import utils
+from quantum import context
from quantum.db import db_base_plugin_v2
from quantum.db import l3_db
+from quantum.db import models_v2
from quantum.extensions import extensions
from quantum.extensions import l3
from quantum import manager
class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
+ def _create_network(self, fmt, name, admin_status_up, **kwargs):
+ """ Override the routine for allowing the router:external attribute """
+ # attributes containing a colon should be passed with
+ # a double underscore
+ new_args = dict(itertools.izip(map(lambda x: x.replace('__', ':'),
+ kwargs),
+ kwargs.values()))
+ arg_list = (l3.EXTERNAL,)
+ return super(L3NatDBTestCase, self)._create_network(fmt,
+ name,
+ admin_status_up,
+ arg_list=arg_list,
+ **new_args)
+
def setUp(self):
test_config['plugin_name_v2'] = (
'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin')
def _set_net_external(self, net_id):
self._update('networks', net_id,
- {'network': {'router:external': True}})
+ {'network': {l3.EXTERNAL: True}})
def _create_floatingip(self, fmt, network_id, port_id=None,
fixed_ip=None):
self.assertEquals(len(body['networks']), 2)
body = self._list('networks',
- query_params="router:external=True")
+ query_params="%s=True" % l3.EXTERNAL)
self.assertEquals(len(body['networks']), 1)
body = self._list('networks',
- query_params="router:external=False")
+ query_params="%s=False" % l3.EXTERNAL)
self.assertEquals(len(body['networks']), 1)
+
+ def test_network_filter_hook_admin_context(self):
+ plugin = manager.QuantumManager.get_plugin()
+ ctx = context.Context(None, None, is_admin=True)
+ model = models_v2.Network
+ conditions = plugin._network_filter_hook(ctx, model, [])
+ self.assertEqual(conditions, [])
+
+ def test_network_filter_hook_nonadmin_context(self):
+ plugin = manager.QuantumManager.get_plugin()
+ ctx = context.Context('edinson', 'cavani')
+ model = models_v2.Network
+ txt = "externalnetworks.network_id IS NOT NULL"
+ conditions = plugin._network_filter_hook(ctx, model, [])
+ self.assertEqual(conditions.__str__(), txt)
+ # Try to concatenate confitions
+ conditions = plugin._network_filter_hook(ctx, model, conditions)
+ self.assertEqual(conditions.__str__(), "%s OR %s" % (txt, txt))
+
+ def test_create_port_external_network_non_admin_fails(self):
+ with self.network(router__external=True) as ext_net:
+ with self.subnet(network=ext_net) as ext_subnet:
+ with self.assertRaises(exc.HTTPClientError) as ctx_manager:
+ with self.port(subnet=ext_subnet,
+ set_context='True',
+ tenant_id='noadmin'):
+ pass
+ self.assertEquals(ctx_manager.exception.code, 403)
+
+ def test_create_port_external_network_admin_suceeds(self):
+ with self.network(router__external=True) as ext_net:
+ with self.subnet(network=ext_net) as ext_subnet:
+ with self.port(subnet=ext_subnet) as port:
+ self.assertEqual(port['port']['network_id'],
+ ext_net['network']['id'])
+
+ def test_create_external_network_non_admin_fails(self):
+ with self.assertRaises(exc.HTTPClientError) as ctx_manager:
+ with self.network(router__external=True,
+ set_context='True',
+ tenant_id='noadmin'):
+ pass
+ self.assertEquals(ctx_manager.exception.code, 403)
+
+ def test_create_external_network_admin_suceeds(self):
+ with self.network(router__external=True) as ext_net:
+ self.assertEqual(ext_net['network'][l3.EXTERNAL],
+ True)
from quantum.common import exceptions
from quantum.common import utils
from quantum import context
+from quantum.openstack.common import cfg
from quantum.openstack.common import importutils
from quantum.openstack.common import policy as common_policy
from quantum import policy
self.rules = {
"admin_or_network_owner": [["role:admin"],
["tenant_id:%(network_tenant_id)s"]],
+ "admin_or_owner": [["role:admin"], ["tenant_id:%(tenant_id)s"]],
"admin_only": [["role:admin"]],
"regular_user": [["role:user"]],
+ "shared": [["field:networks:shared=True"]],
+ "external": [["field:networks:router:external=True"]],
"default": [],
- "networks:private:read": [["rule:admin_only"]],
- "networks:private:write": [["rule:admin_only"]],
- "networks:shared:read": [["rule:regular_user"]],
- "networks:shared:write": [["rule:admin_only"]],
-
- "create_network": [],
+ "create_network": [["rule:admin_or_owner"]],
"create_network:shared": [["rule:admin_only"]],
"update_network": [],
"update_network:shared": [["rule:admin_only"]],
- "get_network": [],
+ "get_network": [["rule:admin_or_owner"],
+ ["rule:shared"],
+ ["rule:external"]],
"create_port:mac": [["rule:admin_or_network_owner"]],
}
self.patcher.stop()
policy.reset()
- def test_nonadmin_write_on_private_returns_403(self):
- action = "update_network"
- user_context = context.Context('', "user", roles=['user'])
- # 384 is the int value of the bitmask for rw------
- target = {'tenant_id': 'the_owner', 'shared': False}
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
- user_context, action, target, None)
+ def _test_action_on_attr(self, context, action, attr, value,
+ exception=None):
+ action = "%s_network" % action
+ target = {'tenant_id': 'the_owner', attr: value}
+ if exception:
+ self.assertRaises(exception, policy.enforce,
+ context, action, target, None)
+ else:
+ result = policy.enforce(context, action, target, None)
+ self.assertEqual(result, None)
- def test_nonadmin_read_on_private_returns_403(self):
- action = "get_network"
+ def _test_nonadmin_action_on_attr(self, action, attr, value,
+ exception=None):
user_context = context.Context('', "user", roles=['user'])
- # 384 is the int value of the bitmask for rw------
- target = {'tenant_id': 'the_owner', 'shared': False}
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
- user_context, action, target, None)
+ self._test_action_on_attr(user_context, action, attr,
+ value, exception)
- def test_nonadmin_write_on_shared_returns_403(self):
- action = "update_network"
- user_context = context.Context('', "user", roles=['user'])
- # 384 is the int value of the bitmask for rw-r--r--
- target = {'tenant_id': 'the_owner', 'shared': True}
- self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
- user_context, action, target, None)
+ def test_nonadmin_write_on_private_fails(self):
+ self._test_nonadmin_action_on_attr('create', 'shared', False,
+ exceptions.PolicyNotAuthorized)
- def test_nonadmin_read_on_shared_returns_200(self):
- action = "get_network"
- user_context = context.Context('', "user", roles=['user'])
- # 420 is the int value of the bitmask for rw-r--r--
- target = {'tenant_id': 'the_owner', 'shared': True}
- result = policy.enforce(user_context, action, target, None)
- self.assertEqual(result, None)
+ def test_nonadmin_read_on_private_fails(self):
+ self._test_nonadmin_action_on_attr('get', 'shared', False,
+ exceptions.PolicyNotAuthorized)
+
+ def test_nonadmin_write_on_shared_fails(self):
+ self._test_nonadmin_action_on_attr('create', 'shared', True,
+ exceptions.PolicyNotAuthorized)
+
+ def test_nonadmin_read_on_shared_succeeds(self):
+ self._test_nonadmin_action_on_attr('get', 'shared', True)
def _test_enforce_adminonly_attribute(self, action):
admin_context = context.get_admin_context()
def test_enforce_adminoly_attribute_nonadminctx_returns_403(self):
action = "create_network"
- target = {'shared': True}
+ target = {'shared': True, 'tenant_id': 'somebody_else'}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, action, target, None)