{
"context_is_admin": "role:admin",
- "admin_or_owner": "rule:context_is_admin or tenant_id:%(tenant_id)s",
+ "owner": "tenant_id:%(tenant_id)s",
+ "admin_or_owner": "rule:context_is_admin or rule:owner",
"context_is_advsvc": "role:advsvc",
"admin_or_network_owner": "rule:context_is_admin or tenant_id:%(network:tenant_id)s",
+ "admin_owner_or_network_owner": "rule:admin_or_network_owner or rule:owner",
"admin_only": "rule:context_is_admin",
"regular_user": "",
"shared": "field:networks:shared=True",
"create_port:binding:profile": "rule:admin_only",
"create_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"create_port:allowed_address_pairs": "rule:admin_or_network_owner",
- "get_port": "rule:admin_or_owner or rule:context_is_advsvc",
+ "get_port": "rule:admin_owner_or_network_owner or rule:context_is_advsvc",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
"get_port:binding:vif_details": "rule:admin_only",
"update_port:binding:profile": "rule:admin_only",
"update_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"update_port:allowed_address_pairs": "rule:admin_or_network_owner",
- "delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
+ "delete_port": "rule:admin_owner_or_network_owner or rule:context_is_advsvc",
"get_router:ha": "rule:admin_only",
"create_router": "rule:regular_user",
"get_policy_bandwidth_limit_rule": "rule:regular_user",
"create_policy_bandwidth_limit_rule": "rule:admin_only",
"delete_policy_bandwidth_limit_rule": "rule:admin_only",
- "update_policy_bandwidth_limit_rule": "rule:admin_only"
-
+ "update_policy_bandwidth_limit_rule": "rule:admin_only",
+
+ "restrict_wildcard": "(not field:rbac_policy:target_tenant=*) or rule:admin_only",
+ "create_rbac_policy": "",
+ "create_rbac_policy:target_tenant": "rule:restrict_wildcard",
+ "update_rbac_policy": "rule:admin_or_owner",
+ "update_rbac_policy:target_tenant": "rule:restrict_wildcard and rule:admin_or_owner",
+ "get_rbac_policy": "rule:admin_or_owner",
+ "delete_rbac_policy": "rule:admin_or_owner"
}
import abc
import collections
import imp
-import itertools
import os
from oslo_config import cfg
def _plugins_support(self, extension):
alias = extension.get_alias()
- supports_extension = any((hasattr(plugin,
- "supported_extension_aliases") and
- alias in plugin.supported_extension_aliases)
- for plugin in self.plugins.values())
+ supports_extension = alias in self.get_supported_extension_aliases()
if not supports_extension:
LOG.warn(_LW("Extension %s not supported by any of loaded "
"plugins"),
manager.NeutronManager.get_service_plugins())
return cls._instance
+ def get_supported_extension_aliases(self):
+ """Gets extension aliases supported by all plugins."""
+ aliases = set()
+ for plugin in self.plugins.values():
+ # we also check all classes that the plugins inherit to see if they
+ # directly provide support for an extension
+ for item in [plugin] + plugin.__class__.mro():
+ try:
+ aliases |= set(
+ getattr(item, "supported_extension_aliases", []))
+ except TypeError:
+ # we land here if a class has an @property decorator for
+ # supported extension aliases. They only work on objects.
+ pass
+ return aliases
+
def check_if_plugin_extensions_loaded(self):
"""Check if an extension supported by a plugin has been loaded."""
- plugin_extensions = set(itertools.chain.from_iterable([
- getattr(plugin, "supported_extension_aliases", [])
- for plugin in self.plugins.values()]))
+ plugin_extensions = self.get_supported_extension_aliases()
missing_aliases = plugin_extensions - set(self.extensions)
if missing_aliases:
raise exceptions.ExtensionsNotFound(
return model_query_scope(context, model)
def _model_query(self, context, model):
+ if isinstance(model, UnionModel):
+ return self._union_model_query(context, model)
+ else:
+ return self._single_model_query(context, model)
+
+ def _union_model_query(self, context, model):
+ # A union query is a query that combines multiple sets of data
+ # together and represents them as one. So if a UnionModel was
+ # passed in, we generate the query for each model with the
+ # appropriate filters and then combine them together with the
+ # .union operator. This allows any subsequent users of the query
+ # to handle it like a normal query (e.g. add pagination/sorting/etc)
+ first_query = None
+ remaining_queries = []
+ for name, component_model in model.model_map.items():
+ query = self._single_model_query(context, component_model)
+ if model.column_type_name:
+ query.add_columns(
+ sql.expression.column('"%s"' % name, is_literal=True).
+ label(model.column_type_name)
+ )
+ if first_query is None:
+ first_query = query
+ else:
+ remaining_queries.append(query)
+ return first_query.union(*remaining_queries)
+
+ def _single_model_query(self, context, model):
query = context.session.query(model)
# define basic filter condition for model query
query_filter = None
columns = [c.name for c in model.__table__.columns]
return dict((k, v) for (k, v) in
six.iteritems(data) if k in columns)
+
+
+class UnionModel(object):
+ """Collection of models that _model_query can query as a single table."""
+
+ def __init__(self, model_map, column_type_name=None):
+ # model_map is a dictionary of models keyed by an arbitrary name.
+ # If column_type_name is specified, the resulting records will have a
+ # column with that name which identifies the source of each record
+ self.model_map = model_map
+ self.column_type_name = column_type_name
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.common import utils
+from neutron import context as ctx
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.db import ipam_non_pluggable_backend
from neutron.db import ipam_pluggable_backend
from neutron.db import models_v2
+from neutron.db import rbac_db_mixin as rbac_mixin
from neutron.db import rbac_db_models as rbac_db
from neutron.db import sqlalchemyutils
from neutron.extensions import l3
class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
- neutron_plugin_base_v2.NeutronPluginBaseV2):
+ neutron_plugin_base_v2.NeutronPluginBaseV2,
+ rbac_mixin.RbacPluginMixin):
"""V2 Neutron plugin interface implementation using SQLAlchemy models.
Whenever a non-read call happens the plugin will call an event handler
self.nova_notifier.send_port_status)
event.listen(models_v2.Port.status, 'set',
self.nova_notifier.record_port_status_changed)
+ for e in (events.BEFORE_CREATE, events.BEFORE_UPDATE,
+ events.BEFORE_DELETE):
+ registry.subscribe(self.validate_network_rbac_policy_change,
+ rbac_mixin.RBAC_POLICY, e)
+
+ def validate_network_rbac_policy_change(self, resource, event, trigger,
+ context, object_type, policy,
+ **kwargs):
+ """Validates network RBAC policy changes.
+
+ On creation, verify that the creator is an admin or that it owns the
+ network it is sharing.
+
+ On update and delete, make sure the tenant losing access does not have
+ resources that depend on that access.
+ """
+ if object_type != 'network':
+ # we only care about network policies
+ return
+ # The object a policy targets cannot be changed so we can look
+ # at the original network for the update event as well.
+ net = self._get_network(context, policy['object_id'])
+ if event in (events.BEFORE_CREATE, events.BEFORE_UPDATE):
+ # we still have to verify that the caller owns the network because
+ # _get_network will succeed on a shared network
+ if not context.is_admin and net['tenant_id'] != context.tenant_id:
+ msg = _("Only admins can manipulate policies on networks "
+ "they do not own.")
+ raise n_exc.InvalidInput(error_message=msg)
+
+ tenant_to_check = None
+ if event == events.BEFORE_UPDATE:
+ new_tenant = kwargs['policy_update']['target_tenant']
+ if policy['target_tenant'] != new_tenant:
+ tenant_to_check = policy['target_tenant']
+
+ if event == events.BEFORE_DELETE:
+ tenant_to_check = policy['target_tenant']
+
+ if tenant_to_check:
+ self.ensure_no_tenant_ports_on_network(net['id'], net['tenant_id'],
+ tenant_to_check)
+
+ def ensure_no_tenant_ports_on_network(self, network_id, net_tenant_id,
+ tenant_id):
+ ctx_admin = ctx.get_admin_context()
+ rb_model = rbac_db.NetworkRBAC
+ other_rbac_entries = self._model_query(ctx_admin, rb_model).filter(
+ and_(rb_model.object_id == network_id,
+ rb_model.action == 'access_as_shared'))
+ ports = self._model_query(ctx_admin, models_v2.Port).filter(
+ models_v2.Port.network_id == network_id)
+ if tenant_id == '*':
+ # for the wildcard we need to get all of the rbac entries to
+ # see if any allow the remaining ports on the network.
+ other_rbac_entries = other_rbac_entries.filter(
+ rb_model.target_tenant != tenant_id)
+ # any port with another RBAC entry covering it or one belonging to
+ # the same tenant as the network owner is ok
+ allowed_tenants = [entry['target_tenant']
+ for entry in other_rbac_entries]
+ allowed_tenants.append(net_tenant_id)
+ ports = ports.filter(
+ ~models_v2.Port.tenant_id.in_(allowed_tenants))
+ else:
+ # if there is a wildcard rule, we can return early because it
+ # allows any ports
+ query = other_rbac_entries.filter(rb_model.target_tenant == '*')
+ if query.count():
+ return
+ ports = ports.filter(models_v2.Port.tenant_id == tenant_id)
+ if ports.count():
+ raise n_exc.InvalidSharedSetting(network=network_id)
def set_ipam_backend(self):
if cfg.CONF.ipam_driver:
--- /dev/null
+# Copyright (c) 2015 Mirantis, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy.orm import exc
+
+from neutron.callbacks import events
+from neutron.callbacks import exceptions as c_exc
+from neutron.callbacks import registry
+from neutron.common import exceptions as n_exc
+from neutron.db import common_db_mixin
+from neutron.db import rbac_db_models as models
+from neutron.extensions import rbac as ext_rbac
+
+# resource name using in callbacks
+RBAC_POLICY = 'rbac-policy'
+
+
+class RbacPluginMixin(common_db_mixin.CommonDbMixin):
+ """Plugin mixin that implements the RBAC DB operations."""
+
+ object_type_cache = {}
+ supported_extension_aliases = ['rbac-policies']
+
+ def create_rbac_policy(self, context, rbac_policy):
+ e = rbac_policy['rbac_policy']
+ try:
+ registry.notify(RBAC_POLICY, events.BEFORE_CREATE, self,
+ context=context, object_type=e['object_type'],
+ policy=e)
+ except c_exc.CallbackFailure as e:
+ raise n_exc.InvalidInput(error_message=e)
+ dbmodel = models.get_type_model_map()[e['object_type']]
+ tenant_id = self._get_tenant_id_for_create(context, e)
+ with context.session.begin(subtransactions=True):
+ db_entry = dbmodel(object_id=e['object_id'],
+ target_tenant=e['target_tenant'],
+ action=e['action'],
+ tenant_id=tenant_id)
+ context.session.add(db_entry)
+ return self._make_rbac_policy_dict(db_entry)
+
+ def _make_rbac_policy_dict(self, db_entry, fields=None):
+ res = {f: db_entry[f] for f in ('id', 'tenant_id', 'target_tenant',
+ 'action', 'object_id')}
+ res['object_type'] = db_entry.object_type
+ return self._fields(res, fields)
+
+ def update_rbac_policy(self, context, id, rbac_policy):
+ pol = rbac_policy['rbac_policy']
+ entry = self._get_rbac_policy(context, id)
+ object_type = entry['object_type']
+ try:
+ registry.notify(RBAC_POLICY, events.BEFORE_UPDATE, self,
+ context=context, policy=entry,
+ object_type=object_type, policy_update=pol)
+ except c_exc.CallbackFailure as ex:
+ raise ext_rbac.RbacPolicyInUse(object_id=entry['object_id'],
+ details=ex)
+ with context.session.begin(subtransactions=True):
+ entry.update(pol)
+ return self._make_rbac_policy_dict(entry)
+
+ def delete_rbac_policy(self, context, id):
+ entry = self._get_rbac_policy(context, id)
+ object_type = entry['object_type']
+ try:
+ registry.notify(RBAC_POLICY, events.BEFORE_DELETE, self,
+ context=context, object_type=object_type,
+ policy=entry)
+ except c_exc.CallbackFailure as ex:
+ raise ext_rbac.RbacPolicyInUse(object_id=entry['object_id'],
+ details=ex)
+ with context.session.begin(subtransactions=True):
+ context.session.delete(entry)
+ self.object_type_cache.pop(id, None)
+
+ def _get_rbac_policy(self, context, id):
+ object_type = self._get_object_type(context, id)
+ dbmodel = models.get_type_model_map()[object_type]
+ try:
+ return self._model_query(context,
+ dbmodel).filter(dbmodel.id == id).one()
+ except exc.NoResultFound:
+ raise ext_rbac.RbacPolicyNotFound(id=id, object_type=object_type)
+
+ def get_rbac_policy(self, context, id, fields=None):
+ return self._make_rbac_policy_dict(
+ self._get_rbac_policy(context, id), fields=fields)
+
+ def get_rbac_policies(self, context, filters=None, fields=None,
+ sorts=None, limit=None, page_reverse=False):
+ model = common_db_mixin.UnionModel(
+ models.get_type_model_map(), 'object_type')
+ return self._get_collection(
+ context, model, self._make_rbac_policy_dict, filters=filters,
+ sorts=sorts, limit=limit, page_reverse=page_reverse)
+
+ def _get_object_type(self, context, entry_id):
+ """Scans all RBAC tables for an ID to figure out the type.
+
+ This will be an expensive operation as the number of RBAC tables grows.
+ The result is cached since object types cannot be updated for a policy.
+ """
+ if entry_id in self.object_type_cache:
+ return self.object_type_cache[entry_id]
+ for otype, model in models.get_type_model_map().items():
+ if (context.session.query(model).
+ filter(model.id == entry_id).first()):
+ self.object_type_cache[entry_id] = otype
+ return otype
+ raise ext_rbac.RbacPolicyNotFound(id=entry_id, object_type='unknown')
--- /dev/null
+# Copyright (c) 2015 Mirantis, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from oslo_config import cfg
+
+from neutron.api import extensions
+from neutron.api.v2 import attributes as attr
+from neutron.api.v2 import base
+from neutron.common import exceptions as n_exc
+from neutron.db import rbac_db_models
+from neutron import manager
+from neutron.quota import resource_registry
+
+
+class RbacPolicyNotFound(n_exc.NotFound):
+ message = _("RBAC policy of type %(object_type)s with ID %(id)s not found")
+
+
+class RbacPolicyInUse(n_exc.Conflict):
+ message = _("RBAC policy on object %(object_id)s cannot be removed "
+ "because other objects depend on it.\nDetails: %(details)s")
+
+
+def convert_valid_object_type(otype):
+ normalized = otype.strip().lower()
+ if normalized in rbac_db_models.get_type_model_map():
+ return normalized
+ msg = _("'%s' is not a valid RBAC object type") % otype
+ raise n_exc.InvalidInput(error_message=msg)
+
+
+RESOURCE_NAME = 'rbac_policy'
+RESOURCE_COLLECTION = 'rbac_policies'
+
+RESOURCE_ATTRIBUTE_MAP = {
+ RESOURCE_COLLECTION: {
+ 'id': {'allow_post': False, 'allow_put': False,
+ 'validate': {'type:uuid': None},
+ 'is_visible': True, 'primary_key': True},
+ 'object_type': {'allow_post': True, 'allow_put': False,
+ 'convert_to': convert_valid_object_type,
+ 'is_visible': True, 'default': None,
+ 'enforce_policy': True},
+ 'object_id': {'allow_post': True, 'allow_put': False,
+ 'validate': {'type:uuid': None},
+ 'is_visible': True, 'default': None,
+ 'enforce_policy': True},
+ 'target_tenant': {'allow_post': True, 'allow_put': True,
+ 'is_visible': True, 'enforce_policy': True,
+ 'default': None},
+ 'tenant_id': {'allow_post': True, 'allow_put': False,
+ 'required_by_policy': True, 'is_visible': True},
+ 'action': {'allow_post': True, 'allow_put': False,
+ # action depends on type so validation has to occur in
+ # the extension
+ 'validate': {'type:string': attr.DESCRIPTION_MAX_LEN},
+ 'is_visible': True},
+ }
+}
+
+rbac_quota_opts = [
+ cfg.IntOpt('quota_rbac_entry', default=10,
+ help=_('Default number of RBAC entries allowed per tenant. '
+ 'A negative value means unlimited.'))
+]
+cfg.CONF.register_opts(rbac_quota_opts, 'QUOTAS')
+
+
+class Rbac(extensions.ExtensionDescriptor):
+ """RBAC policy support."""
+
+ @classmethod
+ def get_name(cls):
+ return "RBAC Policies"
+
+ @classmethod
+ def get_alias(cls):
+ return 'rbac-policies'
+
+ @classmethod
+ def get_description(cls):
+ return ("Allows creation and modification of policies that control "
+ "tenant access to resources.")
+
+ @classmethod
+ def get_updated(cls):
+ return "2015-06-17T12:15:12-30:00"
+
+ @classmethod
+ def get_resources(cls):
+ """Returns Ext Resources."""
+ plural_mappings = {'rbac_policies': 'rbac_policy'}
+ attr.PLURALS.update(plural_mappings)
+ plugin = manager.NeutronManager.get_plugin()
+ params = RESOURCE_ATTRIBUTE_MAP['rbac_policies']
+ collection_name = 'rbac-policies'
+ resource_name = 'rbac_policy'
+ resource_registry.register_resource_by_name(resource_name)
+ controller = base.create_resource(collection_name, resource_name,
+ plugin, params, allow_bulk=True,
+ allow_pagination=False,
+ allow_sorting=True)
+ return [extensions.ResourceExtension(collection_name, controller,
+ attr_map=params)]
+
+ def get_extended_resources(self, version):
+ if version == "2.0":
+ return RESOURCE_ATTRIBUTE_MAP
+ return {}
import testtools
from neutron.tests.api import base
+from neutron.tests.api import clients
from neutron.tests.tempest import config
from neutron.tests.tempest import test
from tempest_lib.common.utils import data_utils
with testtools.ExpectedException(lib_exc.Forbidden):
self.update_port(
port, allowed_address_pairs=self.allowed_address_pairs)
+
+
+class RBACSharedNetworksTest(base.BaseAdminNetworkTest):
+
+ force_tenant_isolation = True
+
+ @classmethod
+ def resource_setup(cls):
+ super(RBACSharedNetworksTest, cls).resource_setup()
+ extensions = cls.admin_client.list_extensions()
+ if not test.is_extension_enabled('rbac_policies', 'network'):
+ msg = "rbac extension not enabled."
+ raise cls.skipException(msg)
+ # NOTE(kevinbenton): the following test seems to be necessary
+ # since the default is 'all' for the above check and these tests
+ # need to get into the gate and be disabled until the service plugin
+ # is enabled in devstack. Is there a better way to do this?
+ if 'rbac-policies' not in [x['alias']
+ for x in extensions['extensions']]:
+ msg = "rbac extension is not in extension listing."
+ raise cls.skipException(msg)
+ creds = cls.isolated_creds.get_alt_creds()
+ cls.client2 = clients.Manager(credentials=creds).network_client
+
+ def _make_admin_net_and_subnet_shared_to_tenant_id(self, tenant_id):
+ net = self.admin_client.create_network(
+ name=data_utils.rand_name('test-network-'))['network']
+ self.addCleanup(self.admin_client.delete_network, net['id'])
+ subnet = self.create_subnet(net, client=self.admin_client)
+ # network is shared to first unprivileged client by default
+ pol = self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=tenant_id
+ )['rbac_policy']
+ return {'network': net, 'subnet': subnet, 'policy': pol}
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff1fff')
+ def test_network_only_visible_to_policy_target(self):
+ net = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)['network']
+ self.client.show_network(net['id'])
+ with testtools.ExpectedException(lib_exc.NotFound):
+ # client2 has not been granted access
+ self.client2.show_network(net['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff2fff')
+ def test_subnet_on_network_only_visible_to_policy_target(self):
+ sub = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)['subnet']
+ self.client.show_subnet(sub['id'])
+ with testtools.ExpectedException(lib_exc.NotFound):
+ # client2 has not been granted access
+ self.client2.show_subnet(sub['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff2eee')
+ def test_policy_target_update(self):
+ res = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)
+ # change to client2
+ update_res = self.admin_client.update_rbac_policy(
+ res['policy']['id'], target_tenant=self.client2.tenant_id)
+ self.assertEqual(self.client2.tenant_id,
+ update_res['rbac_policy']['target_tenant'])
+ # make sure everything else stayed the same
+ res['policy'].pop('target_tenant')
+ update_res['rbac_policy'].pop('target_tenant')
+ self.assertEqual(res['policy'], update_res['rbac_policy'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff3fff')
+ def test_port_presence_prevents_network_rbac_policy_deletion(self):
+ res = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)
+ port = self.client.create_port(network_id=res['network']['id'])['port']
+ # a port on the network should prevent the deletion of a policy
+ # required for it to exist
+ with testtools.ExpectedException(lib_exc.Conflict):
+ self.admin_client.delete_rbac_policy(res['policy']['id'])
+
+ # a wildcard policy should allow the specific policy to be deleted
+ # since it allows the remaining port
+ wild = self.admin_client.create_rbac_policy(
+ object_type='network', object_id=res['network']['id'],
+ action='access_as_shared', target_tenant='*')['rbac_policy']
+ self.admin_client.delete_rbac_policy(res['policy']['id'])
+
+ # now that wilcard is the only remainin, it should be subjected to
+ # to the same restriction
+ with testtools.ExpectedException(lib_exc.Conflict):
+ self.admin_client.delete_rbac_policy(wild['id'])
+ # similarily, we can't update the policy to a different tenant
+ with testtools.ExpectedException(lib_exc.Conflict):
+ self.admin_client.update_rbac_policy(
+ wild['id'], target_tenant=self.client2.tenant_id)
+
+ self.client.delete_port(port['id'])
+ # anchor is gone, delete should pass
+ self.admin_client.delete_rbac_policy(wild['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-beefbeefbeef')
+ def test_tenant_can_delete_port_on_own_network(self):
+ # TODO(kevinbenton): make adjustments to the db lookup to
+ # make this work.
+ msg = "Non-admin cannot currently delete other's ports."
+ raise self.skipException(msg)
+ # pylint: disable=unreachable
+ net = self.create_network() # owned by self.client
+ self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=self.client2.tenant_id)
+ port = self.client2.create_port(network_id=net['id'])['port']
+ self.client.delete_port(port['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff4fff')
+ def test_regular_client_shares_to_another_regular_client(self):
+ net = self.create_network() # owned by self.client
+ with testtools.ExpectedException(lib_exc.NotFound):
+ self.client2.show_network(net['id'])
+ pol = self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=self.client2.tenant_id)
+ self.client2.show_network(net['id'])
+
+ self.assertIn(pol['rbac_policy'],
+ self.client.list_rbac_policies()['rbac_policies'])
+ # ensure that 'client2' can't see the policy sharing the network to it
+ # because the policy belongs to 'client'
+ self.assertNotIn(pol['rbac_policy']['id'],
+ [p['id']
+ for p in self.client2.list_rbac_policies()['rbac_policies']])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff5fff')
+ def test_policy_show(self):
+ res = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)
+ p1 = res['policy']
+ p2 = self.admin_client.create_rbac_policy(
+ object_type='network', object_id=res['network']['id'],
+ action='access_as_shared',
+ target_tenant='*')['rbac_policy']
+
+ self.assertEqual(
+ p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy'])
+ self.assertEqual(
+ p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff6fff')
+ def test_regular_client_blocked_from_sharing_anothers_network(self):
+ net = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)['network']
+ with testtools.ExpectedException(lib_exc.BadRequest):
+ self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=self.client.tenant_id)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff7fff')
+ def test_regular_client_blocked_from_sharing_with_wildcard(self):
+ net = self.create_network()
+ with testtools.ExpectedException(lib_exc.Forbidden):
+ self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant='*')
+ # ensure it works on update as well
+ pol = self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=self.client2.tenant_id)
+ with testtools.ExpectedException(lib_exc.Forbidden):
+ self.client.update_rbac_policy(pol['rbac_policy']['id'],
+ target_tenant='*')
{
"context_is_admin": "role:admin",
- "admin_or_owner": "rule:context_is_admin or tenant_id:%(tenant_id)s",
+ "owner": "tenant_id:%(tenant_id)s",
+ "admin_or_owner": "rule:context_is_admin or rule:owner",
"context_is_advsvc": "role:advsvc",
"admin_or_network_owner": "rule:context_is_admin or tenant_id:%(network:tenant_id)s",
+ "admin_owner_or_network_owner": "rule:admin_or_network_owner or rule:owner",
"admin_only": "rule:context_is_admin",
"regular_user": "",
"shared": "field:networks:shared=True",
"create_port:binding:profile": "rule:admin_only",
"create_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"create_port:allowed_address_pairs": "rule:admin_or_network_owner",
- "get_port": "rule:admin_or_owner or rule:context_is_advsvc",
+ "get_port": "rule:admin_owner_or_network_owner or rule:context_is_advsvc",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
"get_port:binding:vif_details": "rule:admin_only",
"update_port:binding:profile": "rule:admin_only",
"update_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"update_port:allowed_address_pairs": "rule:admin_or_network_owner",
- "delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
+ "delete_port": "rule:admin_owner_or_network_owner or rule:context_is_advsvc",
"get_router:ha": "rule:admin_only",
"create_router": "rule:regular_user",
"get_policy_bandwidth_limit_rule": "rule:regular_user",
"create_policy_bandwidth_limit_rule": "rule:admin_only",
"delete_policy_bandwidth_limit_rule": "rule:admin_only",
- "update_policy_bandwidth_limit_rule": "rule:admin_only"
-
+ "update_policy_bandwidth_limit_rule": "rule:admin_only",
+
+ "restrict_wildcard": "(not field:rbac_policy:target_tenant=*) or rule:admin_only",
+ "create_rbac_policy": "",
+ "create_rbac_policy:target_tenant": "rule:restrict_wildcard",
+ "update_rbac_policy": "rule:admin_or_owner",
+ "update_rbac_policy:target_tenant": "rule:restrict_wildcard and rule:admin_or_owner",
+ "get_rbac_policy": "rule:admin_or_owner",
+ "delete_rbac_policy": "rule:admin_or_owner"
}
'policies': 'qos',
'bandwidth_limit_rules': 'qos',
'rule_types': 'qos',
+ 'rbac-policies': '',
}
service_prefix = service_resource_prefix_map.get(
plural_name)
'ipsec_site_connection': 'ipsec-site-connections',
'quotas': 'quotas',
'firewall_policy': 'firewall_policies',
- 'qos_policy': 'policies'
+ 'qos_policy': 'policies',
+ 'rbac_policy': 'rbac_policies',
}
return resource_plural_map.get(resource_name, resource_name + 's')
from neutron.api.v2 import attributes
from neutron.common import config
from neutron.common import exceptions
-from neutron.db import db_base_plugin_v2
from neutron import manager
from neutron.plugins.common import constants
-from neutron.plugins.ml2 import plugin as ml2_plugin
from neutron import quota
from neutron.tests import base
from neutron.tests.unit.api.v2 import test_base
super(ExtensionsTestApp, self).__init__(mapper)
-class FakePluginWithExtension(db_base_plugin_v2.NeutronDbPluginV2):
+class FakePluginWithExtension(object):
"""A fake plugin used only for extension testing in this file."""
supported_extension_aliases = ["FOXNSOX"]
return request_extensions
-class ExtensionExtendedAttributeTestPlugin(
- ml2_plugin.Ml2Plugin):
+class ExtensionExtendedAttributeTestPlugin(object):
supported_extension_aliases = [
'ext-obj-test', "extended-ext-attr"
ext_mgr = extensions.PluginAwareExtensionManager(
extensions_path,
- {constants.CORE: ExtensionExtendedAttributeTestPlugin}
+ {constants.CORE: ExtensionExtendedAttributeTestPlugin()}
)
ext_mgr.extend_resources("2.0", {})
extensions.PluginAwareExtensionManager._instance = ext_mgr