cfg.CONF.register_opt(cfg.StrOpt('host', default=_get_hostname()))
-def fields(request):
+def _fields(request):
"""
Extracts the list of fields to return
"""
return [v for v in request.GET.getall('fields') if v]
-def filters(request):
+def _filters(request, attr_info):
"""
Extracts the filters from the request string
for key in set(request.GET):
if key in ('verbose', 'fields'):
continue
-
values = [v for v in request.GET.getall(key) if v]
- if values:
+ if not attr_info.get(key) and values:
res[key] = values
+ continue
+ result_values = []
+ convert_to = (attr_info.get(key) and attr_info[key].get('convert_to')
+ or None)
+ for value in values:
+ if convert_to:
+ try:
+ result_values.append(convert_to(value))
+ except exceptions.InvalidInput as e:
+ raise webob.exc.HTTPUnprocessableEntity(str(e))
+ else:
+ result_values.append(value)
+ if result_values:
+ res[key] = result_values
return res
-def verbose(request):
+def _verbose(request):
"""
Determines the verbose fields for a request
# NOTE(salvatore-orlando): The following ensures that fields which
# are needed for authZ policy validation are not stripped away by the
# plugin before returning.
- original_fields, fields_to_add = self._do_field_list(fields(request))
- kwargs = {'filters': filters(request),
- 'verbose': verbose(request),
+ original_fields, fields_to_add = self._do_field_list(_fields(request))
+ kwargs = {'filters': _filters(request, self._attr_info),
+ 'verbose': _verbose(request),
'fields': original_fields}
obj_getter = getattr(self._plugin, "get_%s" % self._collection)
obj_list = obj_getter(request.context, **kwargs)
def _item(self, request, id, do_authz=False, field_list=None):
"""Retrieves and formats a single element of the requested entity"""
- kwargs = {'verbose': verbose(request),
+ kwargs = {'verbose': _verbose(request),
'fields': field_list}
action = "get_%s" % self._resource
obj_getter = getattr(self._plugin, action)
# NOTE(salvatore-orlando): The following ensures that fields
# which are needed for authZ policy validation are not stripped
# away by the plugin before returning.
- field_list, added_fields = self._do_field_list(fields(request))
+ field_list, added_fields = self._do_field_list(_fields(request))
return {self._resource:
self._view(self._item(request,
id,
self.assertEquals(res['networks'][1]['name'],
net2['network']['name'])
+ def test_list_networks_with_parameters(self):
+ with self.network(name='net1', admin_status_up=False) as net1:
+ with self.network(name='net2') as net2:
+ req = self.new_list_request('networks',
+ params='admin_state_up=False')
+ res = self.deserialize('json', req.get_response(self.api))
+ self.assertEquals(1, len(res['networks']))
+ self.assertEquals(res['networks'][0]['name'],
+ net1['network']['name'])
+ req = self.new_list_request('networks',
+ params='admin_state_up=true')
+ res = self.deserialize('json', req.get_response(self.api))
+ self.assertEquals(1, len(res['networks']))
+ self.assertEquals(res['networks'][0]['name'],
+ net2['network']['name'])
+
+ def test_list_networks_with_parameters_invalid_values(self):
+ with self.network(name='net1', admin_status_up=False) as net1:
+ with self.network(name='net2') as net2:
+ req = self.new_list_request('networks',
+ params='admin_state_up=fake')
+ res = req.get_response(self.api)
+ self.assertEquals(422, res.status_int)
+
def test_show_network(self):
with self.network(name='net1') as net:
req = self.new_show_request('networks', net['network']['id'])
self.assertEquals(res2['cidr'],
subnet2['subnet']['cidr'])
+ def test_list_subnets_with_parameter(self):
+ # NOTE(jkoelker) This would be a good place to use contextlib.nested
+ # or just drop 2.6 support ;)
+ with self.network() as network:
+ with self.subnet(network=network, gateway_ip='10.0.0.1',
+ cidr='10.0.0.0/24') as subnet:
+ with self.subnet(network=network, gateway_ip='10.0.1.1',
+ cidr='10.0.1.0/24') as subnet2:
+ req = self.new_list_request(
+ 'subnets',
+ params='ip_version=4&ip_version=6')
+ res = self.deserialize('json',
+ req.get_response(self.api))
+ self.assertEquals(2, len(res['subnets']))
+ req = self.new_list_request('subnets',
+ params='ip_version=6')
+ res = self.deserialize('json',
+ req.get_response(self.api))
+ self.assertEquals(0, len(res['subnets']))
+
def test_invalid_ip_version(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],