msg = _("%s is not boolean") % data
raise q_exc.InvalidInput(error_message=msg)
+
+def convert_kvp_str_to_list(data):
+ """Convert a value of the form 'key=value' to ['key', 'value'].
+
+ :raises: q_exc.InvalidInput if any of the strings are malformed
+ (e.g. do not contain a key).
+ """
+ kvp = [x.strip() for x in data.split('=', 1)]
+ if len(kvp) == 2 and kvp[0]:
+ return kvp
+ msg = _("'%s' is not of the form <key>=[value]") % data
+ raise q_exc.InvalidInput(error_message=msg)
+
+
+def convert_kvp_list_to_dict(kvp_list):
+ """Convert a list of 'key=value' strings to a dict.
+
+ :raises: q_exc.InvalidInput if any of the strings are malformed
+ (e.g. do not contain a key) or if any
+ of the keys appear more than once.
+ """
+ if kvp_list == ['True']:
+ # No values were provided (i.e. '--flag-name')
+ return {}
+ kvp_map = {}
+ for kvp_str in kvp_list:
+ key, value = convert_kvp_str_to_list(kvp_str)
+ kvp_map.setdefault(key, set())
+ kvp_map[key].add(value)
+ return dict((x, list(y)) for x, y in kvp_map.iteritems())
+
+
HEX_ELEM = '[0-9A-Fa-f]'
UUID_PATTERN = '-'.join([HEX_ELEM + '{8}', HEX_ELEM + '{4}',
HEX_ELEM + '{4}', HEX_ELEM + '{4}',
'is_visible': True},
'fixed_ips': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED,
+ 'convert_list_to': convert_kvp_list_to_dict,
'enforce_policy': True,
'is_visible': True},
'device_id': {'allow_post': True, 'allow_put': True,
if key == 'fields':
continue
values = [v for v in request.GET.getall(key) if v]
- if not attr_info.get(key) and values:
+ key_attr_info = attr_info.get(key, {})
+ if not key_attr_info and values:
res[key] = values
continue
- result_values = []
- convert_to = (attr_info.get(key) and attr_info[key].get('convert_to')
- or None)
- for value in values:
+ convert_list_to = key_attr_info.get('convert_list_to')
+ if not convert_list_to:
+ convert_to = key_attr_info.get('convert_to')
if convert_to:
- try:
- result_values.append(convert_to(value))
- except exceptions.InvalidInput as e:
- raise webob.exc.HTTPBadRequest(str(e))
- else:
- result_values.append(value)
+ convert_list_to = lambda values_: [convert_to(x)
+ for x in values_]
+ if convert_list_to:
+ try:
+ result_values = convert_list_to(values)
+ except exceptions.InvalidInput as e:
+ raise webob.exc.HTTPBadRequest(str(e))
+ else:
+ result_values = values
if result_values:
res[key] = result_values
return res
if key in fields))
return resource
- def _get_collection(self, context, model, dict_func, filters=None,
- fields=None):
- collection = self._model_query(context, model)
+ def _apply_filters_to_query(self, query, model, filters):
if filters:
for key, value in filters.iteritems():
column = getattr(model, key, None)
if column:
- collection = collection.filter(column.in_(value))
+ query = query.filter(column.in_(value))
+ return query
+
+ def _get_collection(self, context, model, dict_func, filters=None,
+ fields=None):
+ collection = self._model_query(context, model)
+ collection = self._apply_filters_to_query(collection, model, filters)
return [dict_func(c, fields) for c in collection.all()]
@staticmethod
return self._make_port_dict(port, fields)
def get_ports(self, context, filters=None, fields=None):
- fixed_ips = filters.pop('fixed_ips', []) if filters else []
- ports = self._get_collection(context, models_v2.Port,
- self._make_port_dict,
- filters=filters, fields=fields)
-
- if ports and fixed_ips:
- filtered_ports = []
- for port in ports:
- if port['fixed_ips']:
- ips = port['fixed_ips']
- for fixed in fixed_ips:
- found = False
- # Convert to dictionary (deserialize)
- fixed = eval(fixed)
- for ip in ips:
- if 'ip_address' in fixed and 'subnet_id' in fixed:
- if (ip['ip_address'] == fixed['ip_address'] and
- ip['subnet_id'] == fixed['subnet_id']):
- found = True
- elif 'ip_address' in fixed:
- if ip['ip_address'] == fixed['ip_address']:
- found = True
- elif 'subnet_id' in fixed:
- if ip['subnet_id'] == fixed['subnet_id']:
- found = True
- if found:
- filtered_ports.append(port)
- break
- if found:
- break
- return filtered_ports
- return ports
+ Port = models_v2.Port
+ IPAllocation = models_v2.IPAllocation
+
+ if not filters:
+ filters = {}
+
+ query = self._model_query(context, Port)
+
+ fixed_ips = filters.pop('fixed_ips', {})
+ ip_addresses = fixed_ips.get('ip_address')
+ subnet_ids = fixed_ips.get('subnet_id')
+ if ip_addresses or subnet_ids:
+ query = query.join(Port.fixed_ips)
+ if ip_addresses:
+ query = query.filter(IPAllocation.ip_address.in_(ip_addresses))
+ if subnet_ids:
+ query = query.filter(IPAllocation.subnet_id.in_(subnet_ids))
+
+ query = self._apply_filters_to_query(query, Port, filters)
+ return [self._make_port_dict(c, fields) for c in query.all()]
filters=filters,
fields=fields)
+ def test_filters_with_convert_to(self):
+ instance = self.plugin.return_value
+ instance.get_ports.return_value = []
+
+ self.api.get(_get_path('ports'), {'admin_state_up': 'true'})
+ filters = {'admin_state_up': [True]}
+ instance.get_ports.assert_called_once_with(mock.ANY,
+ filters=filters,
+ fields=mock.ANY)
+
+ def test_filters_with_convert_list_to(self):
+ instance = self.plugin.return_value
+ instance.get_ports.return_value = []
+
+ self.api.get(_get_path('ports'),
+ {'fixed_ips': ['ip_address=foo', 'subnet_id=bar']})
+ filters = {'fixed_ips': {'ip_address': ['foo'], 'subnet_id': ['bar']}}
+ instance.get_ports.assert_called_once_with(mock.ANY,
+ filters=filters,
+ fields=mock.ANY)
+
# Note: since all resources use the same controller and validation
# logic, we actually get really good coverage from testing just networks.
import unittest2
from quantum.api.v2 import attributes
+from quantum.common import exceptions as q_exc
class TestAttributes(unittest2.TestCase):
attributes.MAC_PATTERN)
error = '%s is not valid' % base_mac
self.assertEquals(msg, error)
+
+
+class TestConvertKvp(unittest2.TestCase):
+
+ def test_convert_kvp_list_to_dict_succeeds_for_missing_values(self):
+ result = attributes.convert_kvp_list_to_dict(['True'])
+ self.assertEqual({}, result)
+
+ def test_convert_kvp_list_to_dict_succeeds_for_multiple_values(self):
+ result = attributes.convert_kvp_list_to_dict(
+ ['a=b', 'a=c', 'a=c', 'b=a'])
+ self.assertEqual({'a': ['c', 'b'], 'b': ['a']}, result)
+
+ def test_convert_kvp_list_to_dict_succeeds_for_values(self):
+ result = attributes.convert_kvp_list_to_dict(['a=b', 'c=d'])
+ self.assertEqual({'a': ['b'], 'c': ['d']}, result)
+
+ def test_convert_kvp_str_to_list_fails_for_missing_key(self):
+ with self.assertRaises(q_exc.InvalidInput):
+ attributes.convert_kvp_str_to_list('=a')
+
+ def test_convert_kvp_str_to_list_fails_for_missing_equals(self):
+ with self.assertRaises(q_exc.InvalidInput):
+ attributes.convert_kvp_str_to_list('a')
+
+ def test_convert_kvp_str_to_list_succeeds_for_one_equals(self):
+ result = attributes.convert_kvp_str_to_list('a=')
+ self.assertEqual(['a', ''], result)
+
+ def test_convert_kvp_str_to_list_succeeds_for_two_equals(self):
+ result = attributes.convert_kvp_str_to_list('a=a=a')
+ self.assertEqual(['a', 'a=a'], result)
self.assertTrue(port1['port']['id'] in ids)
self.assertTrue(port2['port']['id'] in ids)
+ def test_list_ports_filtered_by_fixed_ip(self):
+ with contextlib.nested(self.port(), self.port()) as (port1, port2):
+ fixed_ips = port1['port']['fixed_ips'][0]
+ query_params = """
+fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
+""".strip() % (fixed_ips['ip_address'],
+ '192.168.126.5',
+ fixed_ips['subnet_id'])
+ req = self.new_list_request('ports', 'json', query_params)
+ port_list = self.deserialize('json', req.get_response(self.api))
+ self.assertEqual(len(port_list['ports']), 1)
+ self.assertEqual(port_list['ports'][0]['id'], port1['port']['id'])
+
def test_list_ports_public_network(self):
with self.network(shared=True) as network:
portres_1 = self._create_port('json',