"external": "field:networks:router:external=True",
"default": "rule:admin_or_owner",
- "extension:provider_network:view": "rule:admin_only",
- "extension:provider_network:set": "rule:admin_only",
-
- "extension:router:view": "rule:regular_user",
-
- "extension:port_binding:view": "rule:admin_only",
- "extension:port_binding:set": "rule:admin_only",
- "get_port:binding:host_id": "rule:admin_only",
- "get_port:binding:vif_type": "rule:admin_only",
- "get_port:binding:profile": "rule:admin_only",
- "get_port:binding:capabilities": "rule:admin_only",
- "create_port:binding:host_id": "rule:admin_only",
- "update_port:binding:host_id": "rule:admin_only",
-
"subnets:private:read": "rule:admin_or_owner",
"subnets:private:write": "rule:admin_or_owner",
"subnets:shared:read": "rule:regular_user",
"create_network": "",
"get_network": "rule:admin_or_owner or rule:shared or rule:external",
+ "get_network:router:external": "rule:regular_user",
+ "get_network:provider:network_type": "rule:admin_only",
+ "get_network:provider:physical_network": "rule:admin_only",
+ "get_network:provider:segmentation_id": "rule:admin_only",
+ "get_network:queue_id": "rule:admin_only",
"create_network:shared": "rule:admin_only",
"create_network:router:external": "rule:admin_only",
"create_network:provider:network_type": "rule:admin_only",
"create_port:mac_address": "rule:admin_or_network_owner",
"create_port:fixed_ips": "rule:admin_or_network_owner",
"create_port:port_security_enabled": "rule:admin_or_network_owner",
+ "create_port:binding:host_id": "rule:admin_only",
"get_port": "rule:admin_or_owner",
+ "get_port:queue_id": "rule:admin_only",
+ "get_port:binding:vif_type": "rule:admin_only",
+ "get_port:binding:capabilities": "rule:admin_only",
+ "get_port:binding:host_id": "rule:admin_only",
+ "get_port:binding:profile": "rule:admin_only",
"update_port": "rule:admin_or_owner",
"update_port:fixed_ips": "rule:admin_or_network_owner",
"update_port:port_security_enabled": "rule:admin_or_network_owner",
+ "update_port:binding:host_id": "rule:admin_only",
"delete_port": "rule:admin_or_owner",
- "extension:service_type:view_extended": "rule:admin_only",
"create_service_type": "rule:admin_only",
"update_service_type": "rule:admin_only",
"delete_service_type": "rule:admin_only",
"create_qos_queue": "rule:admin_only",
"get_qos_queue": "rule:admin_only",
- "get_qos_queues": "rule:admin_only",
"update_agent": "rule:admin_only",
"delete_agent": "rule:admin_only",
% self._plugin.__class__.__name__)
return getattr(self._plugin, native_sorting_attr_name, False)
- def _is_visible(self, attr):
- attr_val = self._attr_info.get(attr)
- return attr_val and attr_val['is_visible']
-
- def _view(self, data, fields_to_strip=None):
+ def _is_visible(self, context, attr_name, data):
+ action = "%s:%s" % (self._plugin_handlers[self.SHOW], attr_name)
+ # Optimistically init authz_check to True
+ authz_check = True
+ try:
+ attr = (attributes.RESOURCE_ATTRIBUTE_MAP
+ [self._collection].get(attr_name))
+ if attr and attr.get('enforce_policy'):
+ authz_check = policy.check_if_exists(
+ context, action, data)
+ except KeyError:
+ # The extension was not configured for adding its resources
+ # to the global resource attribute map. Policy check should
+ # not be performed
+ LOG.debug(_("The resource %(resource)s was not found in the "
+ "RESOURCE_ATTRIBUTE_MAP; unable to perform authZ "
+ "check for attribute %(attr)s"),
+ {'resource': self._collection,
+ 'attr': attr_name})
+ except exceptions.PolicyRuleNotFound:
+ LOG.debug(_("Policy rule:%(action)s not found. Assuming no "
+ "authZ check is defined for %(attr)s"),
+ {'action': action,
+ 'attr': attr_name})
+ attr_val = self._attr_info.get(attr_name)
+ return attr_val and attr_val['is_visible'] and authz_check
+
+ def _view(self, context, data, fields_to_strip=None):
# make sure fields_to_strip is iterable
if not fields_to_strip:
fields_to_strip = []
return dict(item for item in data.iteritems()
- if (self._is_visible(item[0]) and
+ if (self._is_visible(context, item[0], data) and
item[0] not in fields_to_strip))
def _do_field_list(self, original_fields):
obj_list = obj_getter(request.context, **kwargs)
obj_list = sorting_helper.sort(obj_list)
obj_list = pagination_helper.paginate(obj_list)
-
# Check authz
if do_authz:
# FIXME(salvatore-orlando): obj_getter might return references to
obj,
plugin=self._plugin)]
collection = {self._collection:
- [self._view(obj,
+ [self._view(request.context, obj,
fields_to_strip=fields_to_add)
for obj in obj_list]}
pagination_links = pagination_helper.get_links(obj_list)
api_common.list_args(request, "fields"))
parent_id = kwargs.get(self._parent_id_name)
return {self._resource:
- self._view(self._item(request,
+ self._view(request.context,
+ self._item(request,
id,
do_authz=True,
field_list=field_list,
kwargs = {self._resource: item}
if parent_id:
kwargs[self._parent_id_name] = parent_id
- objs.append(self._view(obj_creator(request.context,
+ objs.append(self._view(request.context,
+ obj_creator(request.context,
**kwargs)))
return objs
# Note(salvatore-orlando): broad catch as in theory a plugin
# plugin does atomic bulk create operations
obj_creator = getattr(self._plugin, "%s_bulk" % action)
objs = obj_creator(request.context, body, **kwargs)
- return notify({self._collection: [self._view(obj)
+ return notify({self._collection: [self._view(request.context, obj)
for obj in objs]})
else:
obj_creator = getattr(self._plugin, action)
else:
kwargs.update({self._resource: body})
obj = obj_creator(request.context, **kwargs)
- return notify({self._resource: self._view(obj)})
+ return notify({self._resource: self._view(request.context,
+ obj)})
def delete(self, request, id, **kwargs):
"""Deletes the specified entity."""
notifier_method,
notifier_api.CONF.default_notification_level,
{self._resource + '_id': id})
- result = {self._resource: self._view(obj)}
+ result = {self._resource: self._view(request.context, obj)}
self._send_dhcp_notification(request.context,
result,
notifier_method)
if parent_id:
kwargs[self._parent_id_name] = parent_id
obj = obj_updater(request.context, id, **kwargs)
- result = {self._resource: self._view(obj)}
+ result = {self._resource: self._view(request.context, obj)}
notifier_method = self._resource + '.update.end'
notifier_api.notify(request.context,
self._publisher_id,
"on network %(net_id)s")
-class PolicyNotFound(NotFound):
+class PolicyFileNotFound(NotFound):
message = _("Policy configuration policy.json could not be found")
+class PolicyRuleNotFound(NotFound):
+ message = _("Requested rule:%(rule)s cannot be found")
+
+
class StateInvalid(BadRequest):
message = _("Unsupported port state: %(port_state)s")
from quantum.openstack.common import log as logging
from quantum.openstack.common.notifier import api as notifier_api
from quantum.openstack.common import uuidutils
-from quantum import policy
LOG = logging.getLogger(__name__)
routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
- def _check_l3_view_auth(self, context, network):
- return policy.check(context,
- "extension:router:view",
- network)
-
def _network_is_external(self, context, net_id):
try:
context.session.query(ExternalNetwork).filter_by(
return False
def _extend_network_dict_l3(self, context, network):
- if self._check_l3_view_auth(context, network):
- network[l3.EXTERNAL] = self._network_is_external(
- context, network['id'])
+ network[l3.EXTERNAL] = self._network_is_external(
+ context, network['id'])
def _process_l3_create(self, context, net_data, net_id):
external = net_data.get(l3.EXTERNAL)
from quantum.db import models_v2
from quantum.extensions import portbindings
from quantum.openstack.common import log as logging
-from quantum import policy
LOG = logging.getLogger(__name__)
None,
_port_result_filter_hook)
- def _check_portbindings_view_auth(self, context, port):
- #TODO(salv-orlando): Remove this as part of bp/make-authz-orthogonal
- keys_to_delete = []
- for key in port:
- if key.startswith('binding'):
- policy_rule = "get_port:%s" % key
- if not policy.check(context, policy_rule, port):
- keys_to_delete.append(key)
- for key in keys_to_delete:
- del port[key]
- return port
-
def _process_portbindings_create_and_update(self, context, port_data,
port):
host = port_data.get(portbindings.HOST_ID)
from quantum.db import model_base
from quantum.db import models_v2
from quantum.openstack.common import log as logging
-from quantum import policy
LOG = logging.getLogger(__name__)
context.session.add(ServiceDefinition(**svc_def))
return svc_type_db
- def _check_service_type_view_auth(self, context, service_type):
- # FIXME(salvatore-orlando): This should be achieved via policy
- # engine without need for explicit checks in manager code.
- # Also, the policy in this way does not make a lot of sense
- return policy.check(context,
- "extension:service_type:view_extended",
- service_type)
-
def _get_service_type(self, context, svc_type_id):
try:
query = context.session.query(ServiceType)
def _make_svc_def_dict(svc_def_db):
svc_def = {'service_class': svc_def_db['service_class']}
- if self._check_service_type_view_auth(context,
- svc_type.as_dict()):
- svc_def.update({'plugin': svc_def_db['plugin'],
- 'driver': svc_def_db['driver']})
+ svc_def.update({'plugin': svc_def_db['plugin'],
+ 'driver': svc_def_db['driver']})
return svc_def
res = {'id': svc_type['id'],
'name': svc_type['name'],
'default': svc_type['default'],
+ 'num_instances': svc_type['num_instances'],
'service_definitions':
[_make_svc_def_dict(svc_def) for svc_def
in svc_type['service_definitions']]}
- if self._check_service_type_view_auth(context,
- svc_type.as_dict()):
- res['num_instances'] = svc_type['num_instances']
# Field selection
if fields:
return dict(((k, v) for k, v in res.iteritems()
return [ex]
def get_extended_resources(self, version):
- return {}
+ if version == "2.0":
+ return RESOURCE_ATTRIBUTE_MAP
+ else:
+ return {}
class AgentPluginBase(object):
'ports': {
VIF_TYPE: {'allow_post': False, 'allow_put': False,
'default': attributes.ATTR_NOT_SPECIFIED,
+ 'enforce_policy': True,
'is_visible': True},
HOST_ID: {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'enforce_policy': True},
PROFILE: {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
+ 'enforce_policy': True,
'validate': {'type:dict': None},
'is_visible': True},
CAPABILITIES: {'allow_post': False, 'allow_put': False,
'default': attributes.ATTR_NOT_SPECIFIED,
+ 'enforce_policy': True,
'is_visible': True},
}
}
'networks': {
PORTSECURITY: {'allow_post': True, 'allow_put': True,
'convert_to': attributes.convert_to_boolean,
+ 'enforce_policy': True,
'default': True,
'is_visible': True},
},
PORTSECURITY: {'allow_post': True, 'allow_put': True,
'convert_to': attributes.convert_to_boolean,
'default': attributes.ATTR_NOT_SPECIFIED,
+ 'enforce_policy': True,
'is_visible': True},
}
}
def get_extended_resources(self, version):
if version == "2.0":
- return EXTENDED_ATTRIBUTES_2_0
+ return dict(EXTENDED_ATTRIBUTES_2_0.items() +
+ RESOURCE_ATTRIBUTE_MAP.items())
else:
return {}
LOG = logging.getLogger(__name__)
-RESOURCE_NAME = "service-type"
+RESOURCE_NAME = "service_type"
COLLECTION_NAME = "%ss" % RESOURCE_NAME
SERVICE_ATTR = 'service_class'
PLUGIN_ATTR = 'plugin'
DRIVER_ATTR = 'driver'
-EXT_ALIAS = RESOURCE_NAME
+EXT_ALIAS = 'service-type'
# Attribute Map for Service Type Resource
RESOURCE_ATTRIBUTE_MAP = {
@classmethod
def get_resources(cls):
"""Returns Extended Resource for service type management."""
- my_plurals = [(key.replace('-', '_'),
- key[:-1].replace('-', '_')) for
- key in RESOURCE_ATTRIBUTE_MAP.keys()]
+ my_plurals = [(key, key[:-1]) for key in RESOURCE_ATTRIBUTE_MAP.keys()]
my_plurals.append(('service_definitions', 'service_definition'))
attributes.PLURALS.update(dict(my_plurals))
attr_map = RESOURCE_ATTRIBUTE_MAP[COLLECTION_NAME]
+ collection_name = COLLECTION_NAME.replace('_', '-')
controller = base.create_resource(
- COLLECTION_NAME,
+ collection_name,
RESOURCE_NAME,
servicetype_db.ServiceTypeManager.get_instance(),
attr_map)
- return [extensions.ResourceExtension(COLLECTION_NAME,
+ return [extensions.ResourceExtension(collection_name,
controller,
attr_map=attr_map)]
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.plugins.bigswitch.version import version_string_with_vcs
-from quantum import policy
LOG = logging.getLogger(__name__)
supported_extension_aliases = ["router", "binding"]
- binding_view = "extension:port_binding:view"
- binding_set = "extension:port_binding:set"
-
def __init__(self):
LOG.info(_('QuantumRestProxy: Starting plugin. Version=%s'),
version_string_with_vcs())
return data
- def _check_view_auth(self, context, resource, action):
- return policy.check(context, action, resource)
-
def _extend_port_dict_binding(self, context, port):
- if self._check_view_auth(context, port, self.binding_view):
- port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_OVS
- port[portbindings.CAPABILITIES] = {
- portbindings.CAP_PORT_FILTER:
- 'security-group' in self.supported_extension_aliases}
+ port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_OVS
+ port[portbindings.CAPABILITIES] = {
+ portbindings.CAP_PORT_FILTER:
+ 'security-group' in self.supported_extension_aliases}
return port
from quantum.openstack.common.rpc import proxy
from quantum.plugins.brocade.db import models as brocade_db
from quantum.plugins.brocade import vlanbm as vbm
-from quantum import policy
from quantum import scheduler
self.supported_extension_aliases = ["binding", "security-group",
"agent", "agent_scheduler"]
- self.binding_view = "extension:port_binding:view"
- self.binding_set = "extension:port_binding:set"
self.physical_interface = (cfg.CONF.PHYSICAL_INTERFACE.
physical_interface)
bport.vlan_id)
def _extend_port_dict_binding(self, context, port):
- if self._check_view_auth(context, port, self.binding_view):
- port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_BRIDGE
- port[portbindings.CAPABILITIES] = {
- portbindings.CAP_PORT_FILTER:
- 'security-group' in self.supported_extension_aliases}
+ port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_BRIDGE
+ port[portbindings.CAPABILITIES] = {
+ portbindings.CAP_PORT_FILTER:
+ 'security-group' in self.supported_extension_aliases}
return port
- def _check_view_auth(self, context, resource, action):
- return policy.check(context, action, resource)
-
def get_plugin_version(self):
"""Get version number of the plugin."""
return PLUGIN_VERSION
from quantum.plugins.hyperv.common import constants
from quantum.plugins.hyperv import db as hyperv_db
from quantum.plugins.hyperv import rpc_callbacks
-from quantum import policy
DEFAULT_VLAN_RANGES = []
__native_bulk_support = True
supported_extension_aliases = ["provider", "router", "binding", "quotas"]
- network_view = "extension:provider_network:view"
- network_set = "extension:provider_network:set"
- binding_view = "extension:port_binding:view"
- binding_set = "extension:port_binding:set"
-
def __init__(self, configfile=None):
self._db = hyperv_db.HyperVPluginDB()
self._db.initialize()
# Consume from all consumers in a thread
self.conn.consume_in_thread()
- def _check_view_auth(self, context, resource, action):
- return policy.check(context, action, resource)
-
def _parse_network_vlan_ranges(self):
self._network_vlan_ranges = plugin_utils.parse_network_vlan_ranges(
cfg.CONF.HYPERV.network_vlan_ranges)
return net
def _extend_network_dict_provider(self, context, network):
- if self._check_view_auth(context, network, self.network_view):
- binding = self._db.get_network_binding(
- context.session, network['id'])
- network[provider.NETWORK_TYPE] = binding.network_type
- p = self._network_providers_map[binding.network_type]
- p.extend_network_dict(network, binding)
+ binding = self._db.get_network_binding(
+ context.session, network['id'])
+ network[provider.NETWORK_TYPE] = binding.network_type
+ p = self._network_providers_map[binding.network_type]
+ p.extend_network_dict(network, binding)
def _check_provider_update(self, context, attrs):
network_type = attrs.get(provider.NETWORK_TYPE)
return [self._fields(net, fields) for net in nets]
def _extend_port_dict_binding(self, context, port):
- if self._check_view_auth(context, port, self.binding_view):
- port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_HYPERV
+ port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_HYPERV
return port
def create_port(self, context, port):
from quantum.plugins.common import utils as plugin_utils
from quantum.plugins.linuxbridge.common import constants
from quantum.plugins.linuxbridge.db import l2network_db_v2 as db
-from quantum import policy
LOG = logging.getLogger(__name__)
self._aliases = aliases
return self._aliases
- network_view = "extension:provider_network:view"
- network_set = "extension:provider_network:set"
-
def __init__(self):
self.extra_binding_dict = {
portbindings.VIF_TYPE: portbindings.VIF_TYPE_BRIDGE,
sys.exit(1)
LOG.info(_("Network VLAN ranges: %s"), self.network_vlan_ranges)
- def _check_view_auth(self, context, resource, action):
- return policy.check(context, action, resource)
+ def _add_network_vlan_range(self, physical_network, vlan_min, vlan_max):
+ self._add_network(physical_network)
+ self.network_vlan_ranges[physical_network].append((vlan_min, vlan_max))
- # REVISIT(rkukura) Use core mechanism for attribute authorization
- # when available.
+ def _add_network(self, physical_network):
+ if physical_network not in self.network_vlan_ranges:
+ self.network_vlan_ranges[physical_network] = []
def _extend_network_dict_provider(self, context, network):
- if self._check_view_auth(context, network, self.network_view):
- binding = db.get_network_binding(context.session, network['id'])
- if binding.vlan_id == constants.FLAT_VLAN_ID:
- network[provider.NETWORK_TYPE] = constants.TYPE_FLAT
- network[provider.PHYSICAL_NETWORK] = binding.physical_network
- network[provider.SEGMENTATION_ID] = None
- elif binding.vlan_id == constants.LOCAL_VLAN_ID:
- network[provider.NETWORK_TYPE] = constants.TYPE_LOCAL
- network[provider.PHYSICAL_NETWORK] = None
- network[provider.SEGMENTATION_ID] = None
- else:
- network[provider.NETWORK_TYPE] = constants.TYPE_VLAN
- network[provider.PHYSICAL_NETWORK] = binding.physical_network
- network[provider.SEGMENTATION_ID] = binding.vlan_id
+ binding = db.get_network_binding(context.session, network['id'])
+ if binding.vlan_id == constants.FLAT_VLAN_ID:
+ network[provider.NETWORK_TYPE] = constants.TYPE_FLAT
+ network[provider.PHYSICAL_NETWORK] = binding.physical_network
+ network[provider.SEGMENTATION_ID] = None
+ elif binding.vlan_id == constants.LOCAL_VLAN_ID:
+ network[provider.NETWORK_TYPE] = constants.TYPE_LOCAL
+ network[provider.PHYSICAL_NETWORK] = None
+ network[provider.SEGMENTATION_ID] = None
+ else:
+ network[provider.NETWORK_TYPE] = constants.TYPE_VLAN
+ network[provider.PHYSICAL_NETWORK] = binding.physical_network
+ network[provider.SEGMENTATION_ID] = binding.vlan_id
def _process_provider_create(self, context, attrs):
network_type = attrs.get(provider.NETWORK_TYPE)
return [self._fields(net, fields) for net in nets]
- def get_port(self, context, id, fields=None):
- with context.session.begin(subtransactions=True):
- port = super(LinuxBridgePluginV2, self).get_port(context,
- id,
- fields)
- return self._check_portbindings_view_auth(context, port)
-
- def get_ports(self, context, filters=None, fields=None,
- sorts=None, limit=None, marker=None, page_reverse=False):
- res_ports = []
- with context.session.begin(subtransactions=True):
- ports = super(LinuxBridgePluginV2,
- self).get_ports(context, filters, fields, sorts,
- limit, marker, page_reverse)
- for port in ports:
- self._check_portbindings_view_auth(context, port)
- res_ports.append(port)
- return res_ports
-
def create_port(self, context, port):
session = context.session
port_data = port['port']
self._process_port_create_security_group(
context, port, sgids)
self.notify_security_groups_member_updated(context, port)
- return self._check_portbindings_view_auth(context, port)
+ return port
def update_port(self, context, id, port):
original_port = self.get_port(context, id)
if need_port_update_notify:
self._notify_port_updated(context, updated_port)
- return self._check_portbindings_view_auth(context, updated_port)
+ return updated_port
def delete_port(self, context, id, l3_port_check=True):
from quantum.plugins.mlnx.common import constants
from quantum.plugins.mlnx.db import mlnx_db_v2 as db
from quantum.plugins.mlnx import rpc_callbacks
-from quantum import policy
LOG = logging.getLogger(__name__)
sys.exit(1)
LOG.info(_("Network VLAN ranges: %s"), self.network_vlan_ranges)
- def _check_view_auth(self, context, resource, action):
- return policy.check(context, action, resource)
-
- def _enforce_set_auth(self, context, resource, action):
- policy.enforce(context, action, resource)
-
def _add_network_vlan_range(self, physical_network, vlan_min, vlan_max):
self._add_network(physical_network)
self.network_vlan_ranges[physical_network].append((vlan_min, vlan_max))
self.network_vlan_ranges[physical_network] = []
def _extend_network_dict_provider(self, context, network):
- if self._check_view_auth(context, network, self.network_view):
- binding = db.get_network_binding(context.session, network['id'])
- network[provider.NETWORK_TYPE] = binding.network_type
- if binding.network_type == constants.TYPE_FLAT:
- network[provider.PHYSICAL_NETWORK] = binding.physical_network
- network[provider.SEGMENTATION_ID] = None
- elif binding.network_type == constants.TYPE_LOCAL:
- network[provider.PHYSICAL_NETWORK] = None
- network[provider.SEGMENTATION_ID] = None
- else:
- network[provider.PHYSICAL_NETWORK] = binding.physical_network
- network[provider.SEGMENTATION_ID] = binding.segmentation_id
+ binding = db.get_network_binding(context.session, network['id'])
+ network[provider.NETWORK_TYPE] = binding.network_type
+ if binding.network_type == constants.TYPE_FLAT:
+ network[provider.PHYSICAL_NETWORK] = binding.physical_network
+ network[provider.SEGMENTATION_ID] = None
+ elif binding.network_type == constants.TYPE_LOCAL:
+ network[provider.PHYSICAL_NETWORK] = None
+ network[provider.SEGMENTATION_ID] = None
+ else:
+ network[provider.PHYSICAL_NETWORK] = binding.physical_network
+ network[provider.SEGMENTATION_ID] = binding.segmentation_id
def _set_tenant_network_type(self):
self.tenant_network_type = cfg.CONF.MLNX.tenant_network_type
if not (network_type_set or physical_network_set or
segmentation_id_set):
return (None, None, None)
- # Authorize before exposing plugin details to client
- self._enforce_set_auth(context, attrs, self.network_set)
if not network_type_set:
msg = _("provider:network_type required")
if not (network_type_set or physical_network_set or
segmentation_id_set):
return
- # Authorize before exposing plugin details to client
- self._enforce_set_auth(context, attrs, self.network_set)
msg = _("Plugin does not support updating provider attributes")
raise q_exc.InvalidInput(error_message=msg)
return [self._fields(net, fields) for net in nets]
def _extend_port_dict_binding(self, context, port):
- if self._check_view_auth(context, port, self.binding_view):
- port_binding = db.get_port_profile_binding(context.session,
- port['id'])
- if port_binding:
- port[portbindings.VIF_TYPE] = port_binding.vnic_type
- port[portbindings.CAPABILITIES] = {
- portbindings.CAP_PORT_FILTER:
- 'security-group' in self.supported_extension_aliases}
- binding = db.get_network_binding(context.session,
- port['network_id'])
- fabric = binding.physical_network
- port[portbindings.PROFILE] = {'physical_network': fabric}
+ port_binding = db.get_port_profile_binding(context.session,
+ port['id'])
+ if port_binding:
+ port[portbindings.VIF_TYPE] = port_binding.vnic_type
+ port[portbindings.CAPABILITIES] = {
+ portbindings.CAP_PORT_FILTER:
+ 'security-group' in self.supported_extension_aliases}
+ binding = db.get_network_binding(context.session,
+ port['network_id'])
+ fabric = binding.physical_network
+ port[portbindings.PROFILE] = {'physical_network': fabric}
return port
def create_port(self, context, port):
from quantum.plugins.nec.db import api as ndb
from quantum.plugins.nec.db import nec_plugin_base
from quantum.plugins.nec import ofc_manager
-from quantum import policy
LOG = logging.getLogger(__name__)
self._aliases = aliases
return self._aliases
- binding_view = "extension:port_binding:view"
- binding_set = "extension:port_binding:set"
-
def __init__(self):
ndb.initialize()
self.ofc = ofc_manager.OFCManager()
# Consume from all consumers in a thread
self.conn.consume_in_thread()
- def _check_view_auth(self, context, resource, action):
- return policy.check(context, action, resource)
-
def _update_resource_status(self, context, resource, id, status):
"""Update status of specified resource."""
request = {}
return [self._fields(net, fields) for net in nets]
def _extend_port_dict_binding(self, context, port):
- if self._check_view_auth(context, port, self.binding_view):
- port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_OVS
- port[portbindings.CAPABILITIES] = {
- portbindings.CAP_PORT_FILTER:
- 'security-group' in self.supported_extension_aliases}
+ port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_OVS
+ port[portbindings.CAPABILITIES] = {
+ portbindings.CAP_PORT_FILTER:
+ 'security-group' in self.supported_extension_aliases}
return port
def create_port(self, context, port):
from quantum.plugins.nicira.nvp_plugin_version import PLUGIN_VERSION
from quantum.plugins.nicira import NvpApiClient
from quantum.plugins.nicira import nvplib
-from quantum import policy
+
LOG = logging.getLogger("QuantumPlugin")
NVP_NOSNAT_RULES_ORDER = 10
# Map nova zones to cluster for easy retrieval
novazone_cluster_map = {}
- provider_network_view = "extension:provider_network:view"
port_security_enabled_update = "update_port:port_security_enabled"
def __init__(self, loglevel=None):
nvp_exc.NvpNoMorePortsException:
webob.exc.HTTPBadRequest})
- def _check_view_auth(self, context, resource, action):
- return policy.check(context, action, resource)
-
def _handle_provider_create(self, context, attrs):
# NOTE(salvatore-orlando): This method has been borrowed from
# the OpenvSwtich plugin, altough changed to match NVP specifics.
# which should be specified in physical_network
def _extend_network_dict_provider(self, context, network, binding=None):
- if self._check_view_auth(context, network, self.provider_network_view):
- if not binding:
- binding = nicira_db.get_network_binding(context.session,
- network['id'])
- # With NVP plugin 'normal' overlay networks will have no binding
- # TODO(salvatore-orlando) make sure users can specify a distinct
- # phy_uuid as 'provider network' for STT net type
- if binding:
- network[pnet.NETWORK_TYPE] = binding.binding_type
- network[pnet.PHYSICAL_NETWORK] = binding.phy_uuid
- network[pnet.SEGMENTATION_ID] = binding.vlan_id
+ if not binding:
+ binding = nicira_db.get_network_binding(context.session,
+ network['id'])
+ # With NVP plugin 'normal' overlay networks will have no binding
+ # TODO(salvatore-orlando) make sure users can specify a distinct
+ # phy_uuid as 'provider network' for STT net type
+ if binding:
+ network[pnet.NETWORK_TYPE] = binding.binding_type
+ network[pnet.PHYSICAL_NETWORK] = binding.phy_uuid
+ network[pnet.SEGMENTATION_ID] = binding.vlan_id
def _handle_lswitch_selection(self, cluster, network,
network_binding, max_ports,
return
nvplib.delete_lqueue(self.cluster, id)
return super(NvpPluginV2, self).delete_qos_queue(context, id)
-
- def get_qos_queue(self, context, id, fields=None):
- if not self._check_view_auth(context, {}, ext_qos.qos_queue_get):
- # don't want the user to find out that they guessed the right id
- # so we raise not found if the policy.json file doesn't allow them
- raise ext_qos.QueueNotFound(id=id)
-
- return super(NvpPluginV2, self).get_qos_queue(context, id, fields)
-
- def get_qos_queues(self, context, filters=None, fields=None):
- if not self._check_view_auth(context, {'qos_queue': []},
- ext_qos.qos_queue_list):
- return []
- return super(NvpPluginV2, self).get_qos_queues(context, filters,
- fields)
from quantum import quota
-RESOURCE_NAME = "network-gateway"
+RESOURCE_NAME = "network_gateway"
COLLECTION_NAME = "%ss" % RESOURCE_NAME
EXT_ALIAS = RESOURCE_NAME
DEVICE_ID_ATTR = 'id'
# register quotas for network gateways
quota.QUOTAS.register_resource_by_name(RESOURCE_NAME)
-
- controller = base.create_resource(COLLECTION_NAME,
+ collection_name = COLLECTION_NAME.replace('_', '-')
+ controller = base.create_resource(collection_name,
RESOURCE_NAME,
plugin, params,
member_actions=member_actions)
controller,
member_actions=member_actions)]
+ def get_extended_resources(self, version):
+ if version == "2.0":
+ return RESOURCE_ATTRIBUTE_MAP
+ else:
+ return {}
+
class NetworkGatewayPluginBase(object):
'allow_put': False,
'is_visible': False,
'default': 1,
+ 'enforce_policy': True,
'convert_to': convert_to_unsigned_int_or_none},
QUEUE: {'allow_post': False,
'allow_put': False,
'is_visible': True,
- 'default': False}},
+ 'default': False,
+ 'enforce_policy': True}},
'networks': {QUEUE: {'allow_post': True,
'allow_put': True,
'is_visible': True,
- 'default': False}}
+ 'default': False,
+ 'enforce_policy': True}}
}
context.session.delete(binding)
def _extend_port_qos_queue(self, context, port):
- if self._check_view_auth(context, {'qos_queue': None},
- ext_qos.qos_queue_get):
- filters = {'port_id': [port['id']]}
- fields = ['queue_id']
- port[ext_qos.QUEUE] = None
- queue_id = self._get_port_queue_bindings(
- context, filters, fields)
- if queue_id:
- port[ext_qos.QUEUE] = queue_id[0]['queue_id']
+ filters = {'port_id': [port['id']]}
+ fields = ['queue_id']
+ port[ext_qos.QUEUE] = None
+ queue_id = self._get_port_queue_bindings(
+ context, filters, fields)
+ if queue_id:
+ port[ext_qos.QUEUE] = queue_id[0]['queue_id']
return port
def _extend_network_qos_queue(self, context, network):
- if self._check_view_auth(context, {'qos_queue': None},
- ext_qos.qos_queue_get):
- filters = {'network_id': [network['id']]}
- fields = ['queue_id']
- network[ext_qos.QUEUE] = None
- queue_id = self._get_network_queue_bindings(
- context, filters, fields)
- if queue_id:
- network[ext_qos.QUEUE] = queue_id[0]['queue_id']
+ filters = {'network_id': [network['id']]}
+ fields = ['queue_id']
+ network[ext_qos.QUEUE] = None
+ queue_id = self._get_network_queue_bindings(
+ context, filters, fields)
+ if queue_id:
+ network[ext_qos.QUEUE] = queue_id[0]['queue_id']
return network
def _make_qos_queue_dict(self, queue, fields=None):
from quantum.plugins.openvswitch.common import config # noqa
from quantum.plugins.openvswitch.common import constants
from quantum.plugins.openvswitch import ovs_db_v2
-from quantum import policy
LOG = logging.getLogger(__name__)
self._aliases = aliases
return self._aliases
- network_view = "extension:provider_network:view"
- network_set = "extension:provider_network:set"
-
def __init__(self, configfile=None):
self.extra_binding_dict = {
portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS,
sys.exit(1)
LOG.info(_("Tunnel ID ranges: %s"), self.tunnel_id_ranges)
- # TODO(rkukura) Use core mechanism for attribute authorization
- # when available.
-
- def _check_view_auth(self, context, resource, action):
- return policy.check(context, action, resource)
-
def _extend_network_dict_provider(self, context, network):
- if self._check_view_auth(context, network, self.network_view):
- binding = ovs_db_v2.get_network_binding(context.session,
- network['id'])
- network[provider.NETWORK_TYPE] = binding.network_type
- if binding.network_type == constants.TYPE_GRE:
- network[provider.PHYSICAL_NETWORK] = None
- network[provider.SEGMENTATION_ID] = binding.segmentation_id
- elif binding.network_type == constants.TYPE_FLAT:
- network[provider.PHYSICAL_NETWORK] = binding.physical_network
- network[provider.SEGMENTATION_ID] = None
- elif binding.network_type == constants.TYPE_VLAN:
- network[provider.PHYSICAL_NETWORK] = binding.physical_network
- network[provider.SEGMENTATION_ID] = binding.segmentation_id
- elif binding.network_type == constants.TYPE_LOCAL:
- network[provider.PHYSICAL_NETWORK] = None
- network[provider.SEGMENTATION_ID] = None
+ binding = ovs_db_v2.get_network_binding(context.session,
+ network['id'])
+ network[provider.NETWORK_TYPE] = binding.network_type
+ if binding.network_type == constants.TYPE_GRE:
+ network[provider.PHYSICAL_NETWORK] = None
+ network[provider.SEGMENTATION_ID] = binding.segmentation_id
+ elif binding.network_type == constants.TYPE_FLAT:
+ network[provider.PHYSICAL_NETWORK] = binding.physical_network
+ network[provider.SEGMENTATION_ID] = None
+ elif binding.network_type == constants.TYPE_VLAN:
+ network[provider.PHYSICAL_NETWORK] = binding.physical_network
+ network[provider.SEGMENTATION_ID] = binding.segmentation_id
+ elif binding.network_type == constants.TYPE_LOCAL:
+ network[provider.PHYSICAL_NETWORK] = None
+ network[provider.SEGMENTATION_ID] = None
def _process_provider_create(self, context, attrs):
network_type = attrs.get(provider.NETWORK_TYPE)
port_data, port)
self._process_port_create_security_group(context, port, sgids)
self.notify_security_groups_member_updated(context, port)
- return self._check_portbindings_view_auth(context, port)
-
- def get_port(self, context, id, fields=None):
- with context.session.begin(subtransactions=True):
- port = super(OVSQuantumPluginV2, self).get_port(context,
- id,
- fields)
- return self._check_portbindings_view_auth(context, port)
-
- def get_ports(self, context, filters=None, fields=None,
- sorts=None, limit=None, marker=None, page_reverse=False):
- res_ports = []
- with context.session.begin(subtransactions=True):
- ports = super(OVSQuantumPluginV2,
- self).get_ports(context, filters, fields, sorts,
- limit, marker, page_reverse)
- for port in ports:
- self._check_portbindings_view_auth(context, port)
- res_ports.append(port)
- return res_ports
+ return port
def update_port(self, context, id, port):
session = context.session
binding.network_type,
binding.segmentation_id,
binding.physical_network)
- return self._check_portbindings_view_auth(context, updated_port)
+ return updated_port
def delete_port(self, context, id, l3_port_check=True):
if not _POLICY_PATH:
_POLICY_PATH = utils.find_config_file({}, cfg.CONF.policy_file)
if not _POLICY_PATH:
- raise exceptions.PolicyNotFound(path=cfg.CONF.policy_file)
+ raise exceptions.PolicyFileNotFound(path=cfg.CONF.policy_file)
# pass _set_brain to read_cached_file so that the policy brain
# is reset only if the file has changed
utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE,
def _set_rules(data):
default_rule = 'default'
LOG.debug(_("loading policies from file: %s"), _POLICY_PATH)
+ # TODO(salvatore-orlando): Ensure backward compatibility with
+ # folsom/grizzly style for extension rules (bp/make-authz-orthogonal)
policy.set_rules(policy.Rules.load_json(data, default_rule))
match_rule = policy.RuleCheck('rule', action)
resource, is_write = get_resource_and_action(action)
+ # Attribute-based checks shall not be enforced on GETs
if is_write:
# assigning to variable with short name for improving readability
res_map = attributes.RESOURCE_ATTRIBUTE_MAP
return target_value == self.value
+def _prepare_check(context, action, target, plugin=None):
+ """Prepare rule, target, and credentials for the policy engine."""
+ init()
+ # Compare with None to distinguish case in which target is {}
+ if target is None:
+ target = {}
+ # Update target only if plugin is provided
+ if plugin:
+ target = _build_target(action, target, plugin, context)
+ match_rule = _build_match_rule(action, target)
+ credentials = context.to_dict()
+ return match_rule, target, credentials
+
+
def check(context, action, target, plugin=None):
"""Verifies that the action is valid on the target in this context.
:return: Returns True if access is permitted else False.
"""
- init()
- # Compare with None to distinguish case in which target is {}
- if target is None:
- target = {}
- real_target = _build_target(action, target, plugin, context)
- match_rule = _build_match_rule(action, real_target)
- credentials = context.to_dict()
- return policy.check(match_rule, real_target, credentials)
+ return policy.check(*(_prepare_check(context, action, target, plugin)))
+
+
+def check_if_exists(context, action, target):
+ """Verify if the action can be authorized, and raise if it is unknown.
+
+ Check whether the action can be performed on the target within this
+ context, and raise a PolicyRuleNotFound exception if the action is
+ not defined in the policy engine.
+ """
+ # TODO(salvatore-orlando): Consider modifying oslo policy engine in
+ # order to allow to raise distinct exception when check fails and
+ # when policy is missing
+ # Raise if there's no match for requested action in the policy engine
+ if not policy._rules or action not in policy._rules:
+ raise exceptions.PolicyRuleNotFound(rule=action)
+ return policy.check(*(_prepare_check(context, action, target)))
def enforce(context, action, target, plugin=None):
class TestHyperVVirtualSwitchPortsV2(
test_plugin.TestPortsV2, HyperVQuantumPluginTestCase):
def test_port_vif_details(self):
- plugin = QuantumManager.get_plugin()
with self.port(name='name') as port:
- port_id = port['port']['id']
self.assertEqual(port['port']['binding:vif_type'],
portbindings.VIF_TYPE_HYPERV)
- # By default user is admin - now test non admin user
- ctx = context.Context(user_id=None,
- tenant_id=self._tenant_id,
- is_admin=False,
- read_deleted="no")
- non_admin_port = plugin.get_port(ctx, port_id)
- self.assertTrue('status' in non_admin_port)
- self.assertFalse('binding:vif_type' in non_admin_port)
def test_ports_vif_details(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
for port in ports:
self.assertEqual(port['binding:vif_type'],
portbindings.VIF_TYPE_HYPERV)
- # By default user is admin - now test non admin user
- ctx = context.Context(user_id=None,
- tenant_id=self._tenant_id,
- is_admin=False,
- read_deleted="no")
- ports = plugin.get_ports(ctx)
- self.assertEqual(len(ports), 2)
- for non_admin_port in ports:
- self.assertTrue('status' in non_admin_port)
- self.assertFalse('binding:vif_type' in non_admin_port)
class TestHyperVVirtualSwitchNetworksV2(
# See the License for the specific language governing permissions and
# limitations under the License.
-from quantum import context
-from quantum.manager import QuantumManager
from quantum.plugins.mlnx.common import constants
+from quantum.tests.unit import _test_extension_portbindings as test_bindings
from quantum.tests.unit import test_db_plugin as test_plugin
PLUGIN_NAME = ('quantum.plugins.mlnx.mlnx_plugin.MellanoxEswitchPlugin')
class TestMlnxPortsV2(test_plugin.TestPortsV2,
MlnxPluginV2TestCase):
- VIF_TYPE = constants.VIF_TYPE_DIRECT
- HAS_PORT_FILTER = False
-
- def test_port_vif_details(self):
- plugin = QuantumManager.get_plugin()
- with self.port(name='name') as port:
- port_id = port['port']['id']
- self.assertEqual(port['port']['binding:vif_type'],
- self.VIF_TYPE)
- # By default user is admin - now test non admin user
- ctx = context.Context(user_id=None,
- tenant_id=self._tenant_id,
- is_admin=False,
- read_deleted="no")
- non_admin_port = plugin.get_port(ctx, port_id)
- self.assertIn('status', non_admin_port)
- self.assertNotIn('binding:vif_type', non_admin_port)
+ pass
class TestMlnxNetworksV2(test_plugin.TestNetworksV2, MlnxPluginV2TestCase):
pass
+
+
+class TestMlnxPortBinding(MlnxPluginV2TestCase,
+ test_bindings.PortBindingsTestCase):
+ VIF_TYPE = constants.VIF_TYPE_DIRECT
+ HAS_PORT_FILTER = False
from quantum.api import extensions
from quantum.api.extensions import PluginAwareExtensionManager
+from quantum.api.v2 import attributes
from quantum.common import config
from quantum.common.test_lib import test_config
from quantum import context
class TestExtensionManager(object):
def get_resources(self):
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ networkgw.RESOURCE_ATTRIBUTE_MAP)
return networkgw.Nvp_networkgw.get_resources()
def get_actions(self):
quantum_context = context.Context('', 'not_admin')
port = self._update('ports', port['port']['id'], data,
quantum_context=quantum_context)
- self.assertEqual(ext_qos.QUEUE not in port['port'], True)
+ self.assertFalse(ext_qos.QUEUE in port['port'])
def test_rxtx_factor(self):
with self.qos_queue(max=10) as q1:
from quantum.api import extensions
from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
+from quantum.api.v2 import attributes
from quantum.common import constants
from quantum import context
from quantum.db import agents_db
from quantum.db import dhcp_rpc_base
from quantum.db import l3_rpc_base
+from quantum.extensions import agent
from quantum.extensions import agentscheduler
from quantum import manager
from quantum.openstack.common import timeutils
def _get_agent_id(self, agent_type, host):
agents = self._list_agents()
- for agent in agents['agents']:
- if (agent['agent_type'] == agent_type and
- agent['host'] == host):
- return agent['id']
+ for agent_data in agents['agents']:
+ if (agent_data['agent_type'] == agent_type and
+ agent_data['host'] == host):
+ return agent_data['id']
class OvsAgentSchedulerTestCase(test_l3_plugin.L3NatTestCaseMixin,
'ovs_quantum_plugin.OVSQuantumPluginV2')
def setUp(self):
+ # Save the global RESOURCE_ATTRIBUTE_MAP
+ self.saved_attr_map = {}
+ for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+ self.saved_attr_map[resource] = attrs.copy()
super(OvsAgentSchedulerTestCase, self).setUp(self.plugin_str)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.adminContext = context.get_admin_context()
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ agent.RESOURCE_ATTRIBUTE_MAP)
+ self.addCleanup(self.restore_attribute_map)
self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
+ def restore_attribute_map(self):
+ # Restore the original RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
+
def test_report_states(self):
self._register_agent_states()
agents = self._list_agents()
'DhcpAgentNotifyAPI')
self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
self.dhcp_notifier_cls.return_value = self.dhcp_notifier
+ # Save the global RESOURCE_ATTRIBUTE_MAP
+ self.saved_attr_map = {}
+ for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+ self.saved_attr_map[resource] = attrs.copy()
super(OvsDhcpAgentNotifierTestCase, self).setUp(self.plugin_str)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.adminContext = context.get_admin_context()
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ agent.RESOURCE_ATTRIBUTE_MAP)
+ self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
self.addCleanup(self.dhcp_notifier_cls_p.stop)
+ self.addCleanup(self.restore_attribute_map)
+
+ def restore_attribute_map(self):
+ # Restore the original RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def test_network_add_to_dhcp_agent_notification(self):
with mock.patch.object(self.dhcp_notifier, 'cast') as mock_dhcp:
self.dhcp_notifier = mock.Mock(name='dhcp_notifier')
self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
self.dhcp_notifier_cls.return_value = self.dhcp_notifier
+ # Save the global RESOURCE_ATTRIBUTE_MAP
+ self.saved_attr_map = {}
+ for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+ self.saved_attr_map[resource] = attrs.copy()
super(OvsL3AgentNotifierTestCase, self).setUp(self.plugin_str)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.adminContext = context.get_admin_context()
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ agent.RESOURCE_ATTRIBUTE_MAP)
+ self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
self.addCleanup(self.dhcp_notifier_cls_p.stop)
+ self.addCleanup(self.restore_attribute_map)
+
+ def restore_attribute_map(self):
+ # Restore the original RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def test_router_add_to_l3_agent_notification(self):
plugin = manager.QuantumManager.get_plugin()
from oslo.config import cfg
from webob import exc
+from quantum.api.v2 import attributes
from quantum.common import constants
from quantum.common.test_lib import test_config
from quantum.common import topics
class AgentTestExtensionManager(object):
def get_resources(self):
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ agent.RESOURCE_ATTRIBUTE_MAP)
return agent.Agent.get_resources()
def get_actions(self):
'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
+ # Save the original RESOURCE_ATTRIBUTE_MAP
+ self.saved_attr_map = {}
+ for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+ self.saved_attr_map[resource] = attrs.copy()
ext_mgr = AgentTestExtensionManager()
test_config['extension_manager'] = ext_mgr
+ self.addCleanup(self.restore_resource_attribute_map)
+ self.addCleanup(cfg.CONF.reset)
super(AgentDBTestCase, self).setUp()
+ def restore_resource_attribute_map(self):
+        # Restore the original RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
+
def test_create_agent(self):
data = {'agent': {}}
_req = self.new_create_request('agents', data, self.fmt)
from quantum import context
from quantum.manager import QuantumManager
from quantum.openstack.common.notifier import api as notifer_api
+from quantum.openstack.common import policy as common_policy
from quantum.openstack.common import uuidutils
from quantum.tests import base
from quantum.tests.unit import testlib_api
extra_environ=env,
expect_errors=expect_errors)
self.assertEqual(res.status_int, expected_code)
+ return res
def test_get_noauth(self):
self._test_get(None, _uuid(), 200)
tenant_id = _uuid()
self._test_get(tenant_id + "another", tenant_id, 200)
+ def test_get_keystone_strip_admin_only_attribute(self):
+ tenant_id = _uuid()
+ # Inject rule in policy engine
+ common_policy._rules['get_network:name'] = common_policy.parse_rule(
+ "rule:admin_only")
+ res = self._test_get(tenant_id, tenant_id, 200)
+ res = self.deserialize(res)
+ try:
+ self.assertNotIn('name', res['network'])
+ finally:
+ del common_policy._rules['get_network:name']
+
def _test_update(self, req_tenant_id, real_tenant_id, expected_code,
expect_errors=False):
env = {}
data['fake'] = 'value'
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[collection]
controller = v2_base.Controller(None, collection, resource, attr_info)
- res = controller._view(data)
+ res = controller._view(context.get_admin_context(), data)
self.assertTrue('fake' not in res)
for key in keys:
self.assertTrue(key in res)
import quantum
from quantum.api import extensions
+from quantum.api.v2 import attributes
from quantum.common import config
from quantum import manager
from quantum.plugins.common import constants
self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
+ # Save the global RESOURCE_ATTRIBUTE_MAP
+ self.saved_attr_map = {}
+ for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+ self.saved_attr_map[resource] = attrs.copy()
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ extattr.EXTENDED_ATTRIBUTES_2_0)
+ self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
self.addCleanup(cfg.CONF.reset)
+ self.addCleanup(self.restore_attribute_map)
+
+ def restore_attribute_map(self):
+ # Restore the original RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def _do_request(self, method, path, data=None, params=None, action=None):
content_type = 'application/json'
req.environ['quantum.context'] = context.Context(
'', 'not_network_owner')
res = req.get_response(self.api)
- self.assertEqual(res.status_int, 403)
+ # TODO(salvatore-orlando): Expected error is 404 because
+ # the current API controller always returns this error
+ # code for any policy check failures on update.
+ # It should be 404 when the caller cannot access the whole
+ # resource, and 403 when it cannot access a single attribute
+ self.assertEqual(res.status_int, 404)
class SecurityGroupTestExtensionManager(object):
def get_resources(self):
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attr.RESOURCE_ATTRIBUTE_MAP.update(
+ ext_sg.RESOURCE_ATTRIBUTE_MAP)
return ext_sg.Securitygroup.get_resources()
def get_actions(self):
class L3TestExtensionManager(object):
def get_resources(self):
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ l3.RESOURCE_ATTRIBUTE_MAP)
return l3.L3.get_resources()
def get_actions(self):
import webtest
from quantum.api import extensions
+from quantum.api.v2 import attributes
from quantum.common import config
from quantum.extensions import loadbalancer
from quantum import manager
class LoadBalancerTestExtensionManager(object):
def get_resources(self):
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ loadbalancer.RESOURCE_ATTRIBUTE_MAP)
return loadbalancer.Loadbalancer.get_resources()
def get_actions(self):
result = policy.check(self.context, action, self.target)
self.assertEqual(result, False)
+ def test_check_if_exists_non_existent_action_raises(self):
+ action = "example:idonotexist"
+ self.assertRaises(exceptions.PolicyRuleNotFound,
+ policy.check_if_exists,
+ self.context, action, self.target)
+
def test_enforce_good_action(self):
action = "example:allowed"
result = policy.enforce(self.context, action, self.target)
import webtest
from quantum.api import extensions
+from quantum.api.v2 import attributes
from quantum import context
from quantum.db import api as db_api
from quantum.db import servicetype_db
"""Mock extensions manager."""
def get_resources(self):
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ servicetype.RESOURCE_ATTRIBUTE_MAP)
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(dp.RESOURCE_ATTRIBUTE_MAP)
return (servicetype.Servicetype.get_resources() +
dp.Dummy.get_resources())