def _verify_dict_keys(expected_keys, target_dict):
if not isinstance(target_dict, dict):
- msg = _("Invalid input. %s must be a dictionary.") % target_dict
+ msg = (_("Invalid input. '%(target_dict)s' must be a dictionary "
+ "with keys: %(expected_keys)s") %
+ dict(target_dict=target_dict, expected_keys=expected_keys))
return msg
provided_keys = target_dict.keys()
if set(expected_keys) != set(provided_keys):
- msg = (_("Expected keys not found. Expected: %(expected_keys)s "
- "Provided: %(provided_keys)s") % locals())
+ msg = (_("Expected keys not found. Expected: '%(expected_keys)s', "
+ "Provided: '%(provided_keys)s'") %
+ dict(expected_keys=expected_keys, provided_keys=provided_keys))
return msg
def _validate_values(data, valid_values=None):
if data not in valid_values:
- msg = _("'%(data)s' is not in %(valid_values)s") % locals()
+ msg = (_("'%(data)s' is not in %(valid_values)s") %
+ dict(data=data, valid_values=valid_values))
LOG.debug(msg)
return msg
return msg
if max_len is not None and len(data) > max_len:
- msg = _("'%(data)s' exceeds maximum length of %(max_len)s") % locals()
+ msg = (_("'%(data)s' exceeds maximum length of %(max_len)s") %
+ dict(data=data, max_len=max_len))
LOG.debug(msg)
return msg
max_value = valid_values[1]
if not min_value <= data <= max_value:
msg = _("'%(data)s' is not in range %(min_value)s through "
- "%(max_value)s") % locals()
+ "%(max_value)s") % dict(data=data,
+ min_value=min_value,
+ max_value=max_value)
LOG.debug(msg)
return msg
"""
if not isinstance(data, list):
- msg = _("'%s' is not a valid IP pool") % data
+ msg = _("Invalid data format for IP pool: '%s'") % data
LOG.debug(msg)
return msg
def _validate_fixed_ips(data, valid_values=None):
if not isinstance(data, list):
- msg = _("'%s' is not a valid fixed IP") % data
+ msg = _("Invalid data format for fixed IP: '%s'") % data
LOG.debug(msg)
return msg
ips = []
for fixed_ip in data:
+ if not isinstance(fixed_ip, dict):
+ msg = _("Invalid data format for fixed IP: '%s'") % fixed_ip
+ LOG.debug(msg)
+ return msg
if 'ip_address' in fixed_ip:
# Ensure that duplicate entries are not set - just checking IP
# suffices. Duplicate subnet_id's are legitimate.
fixed_ip_address = fixed_ip['ip_address']
if fixed_ip_address in ips:
- msg = _("Duplicate entry %s") % fixed_ip
+ msg = _("Duplicate IP address '%s'") % fixed_ip_address
else:
msg = _validate_ip_address(fixed_ip_address)
if msg:
def _validate_nameservers(data, valid_values=None):
if not hasattr(data, '__iter__'):
- msg = _("'%s' is not a valid nameserver") % data
+ msg = _("Invalid data format for nameserver: '%s'") % data
LOG.debug(msg)
return msg
LOG.debug(msg)
return msg
if ip in ips:
- msg = _("Duplicate nameserver %s") % ip
+ msg = _("Duplicate nameserver '%s'") % ip
LOG.debug(msg)
return msg
ips.append(ip)
def _validate_hostroutes(data, valid_values=None):
if not isinstance(data, list):
- msg = _("'%s' is not a valid hostroute") % data
+ msg = _("Invalid data format for hostroute: '%s'") % data
LOG.debug(msg)
return msg
LOG.debug(msg)
return msg
if hostroute in hostroutes:
- msg = _("Duplicate hostroute %s") % hostroute
+ msg = _("Duplicate hostroute '%s'") % hostroute
LOG.debug(msg)
return msg
hostroutes.append(hostroute)
return msg
if len(set(data)) != len(data):
- msg = _("Duplicate items in the list: %s") % ', '.join(data)
+ msg = _("Duplicate items in the list: '%s'") % ', '.join(data)
LOG.debug(msg)
return msg
exceptions.ServiceUnavailable: webob.exc.HTTPServiceUnavailable,
exceptions.NotAuthorized: webob.exc.HTTPForbidden,
netaddr.AddrFormatError: webob.exc.HTTPBadRequest,
- AttributeError: webob.exc.HTTPBadRequest,
- ValueError: webob.exc.HTTPBadRequest,
}
def update(self, request, id, body=None, **kwargs):
"""Updates the specified entity's attributes"""
parent_id = kwargs.get(self._parent_id_name)
- payload = body.copy()
+ try:
+ payload = body.copy()
+ except AttributeError:
+ msg = _("Invalid format: %s") % request.body
+ raise exceptions.BadRequest(resource='body', msg=msg)
payload['id'] = id
notifier_api.notify(request.context,
self._publisher_id,
method = getattr(controller, action)
result = method(request=request, **args)
- except (ValueError, AttributeError,
- exceptions.QuantumException,
+ except (exceptions.QuantumException,
netaddr.AddrFormatError) as e:
LOG.exception(_('%s failed'), action)
body = serializer.serialize({'QuantumError': str(e)})
'test',
True)
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'ports')
+ self._validate_behavior_on_bulk_failure(res, 'ports', 500)
def test_create_ports_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
'test', True, context=ctx)
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'ports')
+ self._validate_behavior_on_bulk_failure(res, 'ports', 500)
class TestCiscoNetworksV2(CiscoNetworkPluginV2TestCase,
res = self._create_network_bulk(self.fmt, 2, 'test', True)
LOG.debug("response is %s" % res)
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'networks')
+ self._validate_behavior_on_bulk_failure(res, 'networks', 500)
def test_create_networks_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
patched_plugin.side_effect = side_effect
res = self._create_network_bulk(self.fmt, 2, 'test', True)
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'networks')
+ self._validate_behavior_on_bulk_failure(res, 'networks', 500)
class TestCiscoSubnetsV2(CiscoNetworkPluginV2TestCase,
net['network']['id'],
'test')
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'subnets')
+ self._validate_behavior_on_bulk_failure(res, 'subnets', 500)
def test_create_subnets_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
'test')
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'subnets')
+ self._validate_behavior_on_bulk_failure(res, 'subnets', 500)
class TestCiscoPortsV2XML(TestCiscoPortsV2):
self.assertIsNone(msg)
def test_validate_fixed_ips(self):
- fixed_ips = [[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
+ fixed_ips = [
+ {'data': [{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
'ip_address': '1111.1.1.1'}],
- [{'subnet_id': 'invalid'}],
- None,
- [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000',
+ 'error_msg': "'1111.1.1.1' is not a valid IP address"},
+ {'data': [{'subnet_id': 'invalid',
+ 'ip_address': '1.1.1.1'}],
+ 'error_msg': "'invalid' is not a valid UUID"},
+ {'data': None,
+ 'error_msg': "Invalid data format for fixed IP: 'None'"},
+ {'data': "1.1.1.1",
+ 'error_msg': "Invalid data format for fixed IP: '1.1.1.1'"},
+ {'data': ['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1'],
+ 'error_msg': "Invalid data format for fixed IP: "
+ "'00000000-ffff-ffff-ffff-000000000000'"},
+ {'data': [['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1']],
+ 'error_msg': "Invalid data format for fixed IP: "
+ "'['00000000-ffff-ffff-ffff-000000000000', "
+ "'1.1.1.1']'"},
+ {'data': [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000',
'ip_address': '1.1.1.1'},
{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
- 'ip_address': '1.1.1.1'}]]
+ 'ip_address': '1.1.1.1'}],
+ 'error_msg': "Duplicate IP address '1.1.1.1'"}]
for fixed in fixed_ips:
- msg = attributes._validate_fixed_ips(fixed)
- self.assertIsNotNone(msg)
+ msg = attributes._validate_fixed_ips(fixed['data'])
+ self.assertEqual(msg, fixed['error_msg'])
fixed_ips = [[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
'ip_address': '1.1.1.1'}],
'f3eeab00-8367-4524-b662-55e64d4cacb5',
'e5069610-744b-42a7-8bd8-ceac1a229cd4']
msg = attributes._validate_uuid_list(duplicate_uuids)
- error = "Duplicate items in the list: %s" % ', '.join(duplicate_uuids)
+ error = ("Duplicate items in the list: "
+ "'%s'" % ', '.join(duplicate_uuids))
self.assertEquals(msg, error)
# check valid uuid lists
def _do_side_effect(self, patched_plugin, orig, *args, **kwargs):
""" Invoked by test cases for injecting failures in plugin """
def second_call(*args, **kwargs):
- raise AttributeError
+ raise q_exc.QuantumException
patched_plugin.side_effect = second_call
return orig(*args, **kwargs)
'test',
True)
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'ports')
+ self._validate_behavior_on_bulk_failure(res, 'ports', 500)
def test_create_ports_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
'test', True, context=ctx)
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'ports')
+ self._validate_behavior_on_bulk_failure(res, 'ports', 500)
def test_list_ports(self):
# for this test we need to enable overlapping ips
patched_plugin.side_effect = side_effect
res = self._create_network_bulk(self.fmt, 2, 'test', True)
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'networks')
+ self._validate_behavior_on_bulk_failure(res, 'networks', 500)
def test_create_networks_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
patched_plugin.side_effect = side_effect
res = self._create_network_bulk(self.fmt, 2, 'test', True)
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'networks')
+ self._validate_behavior_on_bulk_failure(res, 'networks', 500)
def test_list_networks(self):
with contextlib.nested(self.network(),
net['network']['id'],
'test')
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'subnets')
+ self._validate_behavior_on_bulk_failure(res, 'subnets', 500)
def test_create_subnets_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
'test')
# We expect a 500 as we injected a fault in the plugin
- self._validate_behavior_on_bulk_failure(res, 'subnets')
+ self._validate_behavior_on_bulk_failure(res, 'subnets', 500)
def test_delete_subnet(self):
gateway_ip = '10.0.0.1'