self._collection = collection
self._resource = resource
self._attr_info = attr_info
+ self._policy_attrs = [name for (name, info) in self._attr_info.items()
+ if 'required_by_policy' in info
+ and info['required_by_policy']]
self._view = getattr(views, self._resource)
+ def _do_field_list(self, original_fields):
+ fields_to_add = None
+ # don't do anything if fields were not specified in the request
+ if original_fields:
+ fields_to_add = [attr for attr in self._policy_attrs
+ if attr not in original_fields]
+ original_fields.extend(self._policy_attrs)
+ return original_fields, fields_to_add
+
def _items(self, request, do_authz=False):
"""Retrieves and formats a list of elements of the requested entity"""
+ # NOTE(salvatore-orlando): The following ensures that fields which
+ # are needed for authZ policy validation are not stripped away by the
+ # plugin before returning.
+ original_fields, fields_to_add = self._do_field_list(fields(request))
kwargs = {'filters': filters(request),
'verbose': verbose(request),
- 'fields': fields(request)}
-
+ 'fields': original_fields}
obj_getter = getattr(self._plugin, "get_%s" % self._collection)
obj_list = obj_getter(request.context, **kwargs)
-
# Check authz
if do_authz:
# Omit items from list that should not be visible
"get_%s" % self._resource,
obj)]
- return {self._collection: [self._view(obj) for obj in obj_list]}
+ return {self._collection: [self._view(obj,
+ fields_to_strip=fields_to_add)
+ for obj in obj_list]}
def _item(self, request, id, do_authz=False):
"""Retrieves and formats a single element of the requested entity"""
+ # NOTE(salvatore-orlando): The following ensures that fields which
+ # are needed for authZ policy validation are not stripped away by the
+ # plugin before returning.
+ field_list, added_fields = self._do_field_list(fields(request))
kwargs = {'verbose': verbose(request),
- 'fields': fields(request)}
+ 'fields': field_list}
action = "get_%s" % self._resource
obj_getter = getattr(self._plugin, action)
obj = obj_getter(request.context, id, **kwargs)
if do_authz:
policy.enforce(request.context, action, obj)
- return {self._resource: self._view(obj)}
+ return {self._resource: self._view(obj, fields_to_strip=added_fields)}
def index(self, request):
"""Returns a list of the requested entity"""
# and the default gateway_ip will be generated.
# However, if gateway_ip is specified as None, this means that
# the subnet does not have a gateway IP.
+# Some of the following attributes are used by the policy engine.
+# They are explicitly marked with the required_by_policy flag to ensure
+# they are always returned by a plugin for policy processing, even if
+# they are not specified in the 'fields' query param
RESOURCE_ATTRIBUTE_MAP = {
'networks': {
'admin_state_up': {'allow_post': True, 'allow_put': True,
'default': True},
'status': {'allow_post': False, 'allow_put': False},
- 'tenant_id': {'allow_post': True, 'allow_put': False},
+ 'tenant_id': {'allow_post': True, 'allow_put': False,
+ 'required_by_policy': True},
},
'ports': {
'id': {'allow_post': False, 'allow_put': False},
'host_routes': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
'device_id': {'allow_post': True, 'allow_put': True, 'default': ''},
- 'tenant_id': {'allow_post': True, 'allow_put': False},
+ 'tenant_id': {'allow_post': True, 'allow_put': False,
+ 'required_by_policy': True},
},
'subnets': {
'id': {'allow_post': False, 'allow_put': False},
'default': ATTR_NOT_SPECIFIED},
'additional_host_routes': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
- 'tenant_id': {'allow_post': True, 'allow_put': False},
+ 'tenant_id': {'allow_post': True, 'allow_put': False,
+ 'required_by_policy': True},
}
}
# limitations under the License.
-def resource(data, keys):
+def resource(data, keys, fields_to_strip=None):
"""Formats the specified entity"""
- return dict(item for item in data.iteritems() if item[0] in keys)
+ # make sure fields_to_strip is iterable
+ if not fields_to_strip:
+ fields_to_strip = []
+ return dict(item for item in data.iteritems()
+ if item[0] in keys and not item[0] in fields_to_strip)
-def port(port_data):
+def port(port_data, fields_to_strip=None):
"""Represents a view for a port object"""
keys = ('id', 'network_id', 'mac_address', 'fixed_ips',
'device_id', 'admin_state_up', 'tenant_id', 'status')
- return resource(port_data, keys)
+ return resource(port_data, keys, fields_to_strip)
-def network(network_data):
+def network(network_data, fields_to_strip=None):
"""Represents a view for a network object"""
keys = ('id', 'name', 'subnets', 'admin_state_up', 'status',
'tenant_id', 'mac_ranges')
- return resource(network_data, keys)
+ return resource(network_data, keys, fields_to_strip)
-def subnet(subnet_data):
+def subnet(subnet_data, fields_to_strip=None):
"""Represents a view for a subnet object"""
keys = ('id', 'network_id', 'tenant_id', 'gateway_ip', 'ip_version',
'cidr', 'allocation_pools')
- return resource(subnet_data, keys)
+ return resource(subnet_data, keys, fields_to_strip)
:return: Returns True if access is permitted else False.
"""
-
init()
-
match_list = ('rule:%s' % action,)
credentials = context.to_dict()
-
return policy.enforce(match_list, target, credentials)
self.api.get(_get_path('networks'), {'fields': 'foo'})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
- fields=['foo'],
+ fields=['foo',
+ 'tenant_id'],
verbose=mock.ANY)
def test_fields_multiple(self):
self.api.get(_get_path('networks'), {'fields': ['foo', 'bar']})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
- fields=['foo', 'bar'],
+ fields=['foo', 'bar',
+ 'tenant_id'],
verbose=mock.ANY)
def test_fields_multiple_with_empty(self):
self.api.get(_get_path('networks'), {'fields': ['foo', '']})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
- fields=['foo'],
+ fields=['foo',
+ 'tenant_id'],
verbose=mock.ANY)
def test_fields_empty(self):
filters = {'foo': ['bar']}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
- fields=['foo'],
+ fields=['foo',
+ 'tenant_id'],
verbose=mock.ANY)
def test_filters_with_verbose(self):
filters = {'foo': ['bar']}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
- fields=['foo'],
+ fields=['foo',
+ 'tenant_id'],
verbose=True)
db._MAKER = None
cfg.CONF.reset()
- def _req(self, method, resource, data=None, fmt='json', id=None):
+ def _req(self, method, resource, data=None, fmt='json',
+ id=None, params=None):
if id:
path = '/%(resource)s/%(id)s.%(fmt)s' % locals()
else:
body = None
if data:
body = Serializer().serialize(data, content_type)
- return create_request(path, body, content_type, method)
+ return create_request(path,
+ body,
+ content_type,
+ method,
+ query_string=params)
def new_create_request(self, resource, data, fmt='json'):
return self._req('POST', resource, data, fmt)
- def new_list_request(self, resource, fmt='json'):
- return self._req('GET', resource, None, fmt)
+ def new_list_request(self, resource, fmt='json', params=None):
+ return self._req('GET', resource, None, fmt, params=params)
def new_show_request(self, resource, id, fmt='json'):
return self._req('GET', resource, None, fmt, id=id)
data = self._deserializers[ctype].deserialize(response.body)['body']
return data
- def _create_network(self, fmt, name, admin_status_up):
+ def _create_network(self, fmt, name, admin_status_up, **kwargs):
data = {'network': {'name': name,
'admin_state_up': admin_status_up,
'tenant_id': self._tenant_id}}
+ for arg in ('admin_state_up', 'tenant_id', 'public'):
+ # Arg must be present and not empty
+ if arg in kwargs and kwargs[arg]:
+ data['network'][arg] = kwargs[arg]
network_req = self.new_create_request('networks', data, fmt)
+ if ('set_context' in kwargs and
+ kwargs['set_context'] is True and
+ 'tenant_id' in kwargs):
+ # create a specific auth context for this request
+ network_req.environ['quantum.context'] = context.Context(
+ '', kwargs['tenant_id'])
+
return network_req.get_response(self.api)
def _create_subnet(self, fmt, tenant_id, net_id, gateway_ip, cidr,
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
+ def _check_list_with_fields(self, res, field_name):
+ self.assertEquals(res.status_int, 200)
+ body = self.deserialize('json', res)
+ # further checks: 1 networks
+ self.assertEquals(len(body['networks']), 1)
+ # 1 field in the network record
+ self.assertEquals(len(body['networks'][0]), 1)
+ # field is 'name'
+ self.assertIn(field_name, body['networks'][0])
+
+ def test_list_with_fields(self):
+ self._create_network('json', 'some_net', True)
+ req = self.new_list_request('networks', params="fields=name")
+ res = req.get_response(self.api)
+ self._check_list_with_fields(res, 'name')
+
+ def test_list_with_fields_noadmin(self):
+ tenant_id = 'some_tenant'
+ self._create_network('json',
+ 'some_net',
+ True,
+ tenant_id=tenant_id,
+ set_context=True)
+ req = self.new_list_request('networks', params="fields=name")
+ req.environ['quantum.context'] = context.Context('', tenant_id)
+ res = req.get_response(self.api)
+ self._check_list_with_fields(res, 'name')
+
+ def test_list_with_fields_noadmin_and_policy_field(self):
+ """ If a field used by policy is selected, do not duplicate it.
+
+ Verifies that if the field parameter explicitly specifies a field
+ which is used by the policy engine, then it is not duplicated
+ in the response.
+
+ """
+ tenant_id = 'some_tenant'
+ self._create_network('json',
+ 'some_net',
+ True,
+ tenant_id=tenant_id,
+ set_context=True)
+ req = self.new_list_request('networks', params="fields=tenant_id")
+ req.environ['quantum.context'] = context.Context('', tenant_id)
+ res = req.get_response(self.api)
+ self._check_list_with_fields(res, 'tenant_id')
+
def test_show_returns_200(self):
with self.network() as net:
req = self.new_show_request('networks', net['network']['id'])