if data in [True, False]:
return
else:
- msg = _("%s is not boolean") % data
+ msg = _("'%s' is not boolean") % data
LOG.debug("validate_boolean: %s", msg)
return msg
return msg
+def _validate_string(data, max_len=None):
+ if not isinstance(data, basestring):
+ msg = _("'%s' is not a valid string") % data
+ LOG.debug("validate_string: %s", msg)
+ return msg
+ if max_len is not None:
+ if len(data) > max_len:
+ msg = _("'%(data)s' exceeds maximum length of "
+ "%(max_len)s.") % locals()
+ LOG.debug("validate_string: %s", msg)
+ return msg
+
+
def _validate_range(data, valid_values=None):
min_value = valid_values[0]
max_value = valid_values[1]
netaddr.EUI(data)
return
except Exception:
- msg = _("%s is not a valid MAC address") % data
+ msg = _("'%s' is not a valid MAC address") % data
LOG.debug("validate_mac_address: %s", msg)
return msg
netaddr.IPAddress(data)
return
except Exception:
- msg = _("%s is not a valid IP address") % data
+ msg = _("'%s' is not a valid IP address") % data
LOG.debug("validate_ip_address: %s", msg)
return msg
+def _validate_ip_pools(data, valid_values=None):
+ """Validate that start and end IP addresses are present
+
+ In addition to this the IP addresses will also be validated"""
+ if not isinstance(data, list):
+ msg = _("'%s' in not a valid IP pool") % data
+ LOG.debug("validate_ip_pools: %s", msg)
+ return msg
+
+ expected_keys = set(['start', 'end'])
+ try:
+ for ip_pool in data:
+ if set(ip_pool.keys()) != expected_keys:
+ msg = _("Expected keys not found. Expected: %s "
+ "Provided: %s") % (expected_keys, ip_pool.keys())
+ LOG.debug("validate_ip_pools: %s", msg)
+ return msg
+ for k in expected_keys:
+ msg = _validate_ip_address(ip_pool[k])
+ if msg:
+ LOG.debug("validate_ip_pools: %s", msg)
+ return msg
+ except KeyError, e:
+ args = {'key_name': e.message, 'ip_pool': ip_pool}
+ msg = _("Invalid input. Required key: '%(key_name)s' "
+ "missing from %(ip_pool)s.") % args
+ LOG.debug("validate_ip_pools: %s", msg)
+ return msg
+ except TypeError, e:
+ msg = _("Invalid input. Pool %s must be a dictionary.") % ip_pool
+ LOG.debug("validate_ip_pools: %s", msg)
+ return msg
+ except Exception:
+ msg = _("'%s' in not a valid IP pool") % data
+ LOG.debug("validate_ip_pools: %s", msg)
+ return msg
+
+
+def _validate_fixed_ips(data, valid_values=None):
+ if not isinstance(data, list):
+ msg = _("'%s' in not a valid fixed IP") % data
+ LOG.debug("validate_fixed_ips: %s", msg)
+ return msg
+ ips = []
+ try:
+ for fixed_ip in data:
+ if 'ip_address' in fixed_ip:
+ msg = _validate_ip_address(fixed_ip['ip_address'])
+ if msg:
+ LOG.debug("validate_fixed_ips: %s", msg)
+ return msg
+ if 'subnet_id' in fixed_ip:
+ msg = _validate_regex(fixed_ip['subnet_id'], UUID_PATTERN)
+ if msg:
+ LOG.debug("validate_fixed_ips: %s", msg)
+ return msg
+ # Ensure that duplicate entries are not set - just checking IP
+ # suffices. Duplicate subnet_id's are legitimate.
+ if 'ip_address' in fixed_ip:
+ if fixed_ip['ip_address'] in ips:
+ msg = _("Duplicate entry %s") % fixed_ip
+ LOG.debug("validate_fixed_ips: %s", msg)
+ return msg
+ ips.append(fixed_ip['ip_address'])
+ except Exception:
+ msg = _("'%s' in not a valid fixed IP") % data
+ LOG.debug("validate_fixed_ips: %s", msg)
+ return msg
+
+
+def _validate_nameservers(data, valid_values=None):
+ if not hasattr(data, '__iter__'):
+ msg = _("'%s' in not a valid nameserver") % data
+ LOG.debug("validate_nameservers: %s", msg)
+ return msg
+ ips = set()
+ for ip in data:
+ msg = _validate_ip_address(ip)
+ if msg:
+ # This may be a hostname
+ msg = _validate_regex(ip, HOSTNAME_PATTERN)
+ if msg:
+ msg = _("'%s' in not a valid nameserver") % ip
+ LOG.debug("validate_nameservers: %s", msg)
+ return msg
+ if ip in ips:
+ msg = _("Duplicate nameserver %s") % ip
+ LOG.debug("validate_nameservers: %s", msg)
+ return msg
+ ips.add(ip)
+
+
+def _validate_hostroutes(data, valid_values=None):
+ if not isinstance(data, list):
+ msg = _("'%s' in not a valid hostroute") % data
+ LOG.debug("validate_hostroutes: %s", msg)
+ return msg
+ hostroutes = []
+ try:
+ for hostroute in data:
+ msg = _validate_subnet(hostroute['destination'])
+ if msg:
+ LOG.debug("validate_hostroutes: %s", msg)
+ return msg
+ msg = _validate_ip_address(hostroute['nexthop'])
+ if msg:
+ LOG.debug("validate_hostroutes: %s", msg)
+ return msg
+ if hostroute in hostroutes:
+ msg = _("Duplicate hostroute %s") % hostroute
+ LOG.debug("validate_hostroutes: %s", msg)
+ if msg:
+ return msg
+ hostroutes.append(hostroute)
+ except:
+ msg = _("'%s' in not a valid hostroute") % data
+ LOG.debug("validate_hostroutes: %s", msg)
+ return msg
+
+
def _validate_ip_address_or_none(data, valid_values=None):
if data is None:
return None
except Exception:
pass
- msg = _("%s is not a valid IP subnet") % data
+ msg = _("'%s' is not a valid IP subnet") % data
LOG.debug("validate_subnet: %s", msg)
return msg
def _validate_regex(data, valid_values=None):
- match = re.match(valid_values, data)
- if match:
- return
- else:
- msg = _("%s is not valid") % data
- LOG.debug("validate_regex: %s", msg)
- return msg
+ try:
+ if re.match(valid_values, data):
+ return
+ except TypeError:
+ pass
+
+ msg = _("'%s' is not valid input") % data
+ LOG.debug("validate_regex: %s", msg)
+ return msg
def _validate_uuid(data, valid_values=None):
return True
else:
return False
- except ValueError, TypeError:
+ except (ValueError, TypeError):
if (data == "True" or data == "true"):
return True
if (data == "False" or data == "false"):
return False
- msg = _("%s is not boolean") % data
+ msg = _("'%s' is not boolean") % data
raise q_exc.InvalidInput(error_message=msg)
+def convert_to_int(data):
+ try:
+ return int(data)
+ except (ValueError, TypeError):
+ msg = _("'%s' is not a integer") % data
+ raise q_exc.InvalidInput(error_message=msg)
+
+
def convert_kvp_str_to_list(data):
"""Convert a value of the form 'key=value' to ['key', 'value'].
kvp_map[key].add(value)
return dict((x, list(y)) for x, y in kvp_map.iteritems())
+HOSTNAME_PATTERN = ("(?=^.{1,254}$)(^(?:(?!\d+\.|-)[a-zA-Z0-9_\-]"
+ "{1,63}(?<!-)\.?)+(?:[a-zA-Z]{2,})$)")
HEX_ELEM = '[0-9A-Fa-f]'
UUID_PATTERN = '-'.join([HEX_ELEM + '{8}', HEX_ELEM + '{4}',
# Dictionary that maintains a list of validation functions
validators = {'type:boolean': _validate_boolean,
+ 'type:values': _validate_values,
+ 'type:string': _validate_string,
+ 'type:range': _validate_range,
+ 'type:mac_address': _validate_mac_address,
+ 'type:fixed_ips': _validate_fixed_ips,
'type:ip_address': _validate_ip_address,
'type:ip_address_or_none': _validate_ip_address_or_none,
'type:mac_address': _validate_mac_address,
'type:regex': _validate_regex,
'type:subnet': _validate_subnet,
'type:uuid': _validate_uuid,
- 'type:values': _validate_values}
+ 'type:regex': _validate_regex,
+ 'type:ip_pools': _validate_ip_pools,
+ 'type:hostroutes': _validate_hostroutes,
+ 'type:nameservers': _validate_nameservers}
# Note: a default of ATTR_NOT_SPECIFIED indicates that an
# attribute is not required, but will be generated by the plugin
'validate': {'type:uuid': None},
'is_visible': True},
'name': {'allow_post': True, 'allow_put': True,
+ 'validate': {'type:string': None},
'default': '', 'is_visible': True},
'subnets': {'allow_post': False, 'allow_put': False,
'default': [],
'status': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
+ 'validate': {'type:string': None},
'required_by_policy': True,
'is_visible': True},
SHARED: {'allow_post': True,
'validate': {'type:uuid': None},
'is_visible': True},
'name': {'allow_post': True, 'allow_put': True, 'default': '',
+ 'validate': {'type:string': None},
'is_visible': True},
'network_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'fixed_ips': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED,
'convert_list_to': convert_kvp_list_to_dict,
+ 'validate': {'type:fixed_ips': None},
'enforce_policy': True,
'is_visible': True},
'device_id': {'allow_post': True, 'allow_put': True,
+ 'validate': {'type:string': None},
'default': '',
'is_visible': True},
'device_owner': {'allow_post': True, 'allow_put': True,
+ 'validate': {'type:string': None},
'default': '',
'is_visible': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
+ 'validate': {'type:string': None},
'required_by_policy': True,
'is_visible': True},
'status': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'name': {'allow_post': True, 'allow_put': True, 'default': '',
+ 'validate': {'type:string': None},
'is_visible': True},
'ip_version': {'allow_post': True, 'allow_put': False,
- 'convert_to': int,
+ 'convert_to': convert_to_int,
'validate': {'type:values': [4, 6]},
'is_visible': True},
'network_id': {'allow_post': True, 'allow_put': False,
#TODO(salvatore-orlando): Enable PUT on allocation_pools
'allocation_pools': {'allow_post': True, 'allow_put': False,
'default': ATTR_NOT_SPECIFIED,
+ 'validate': {'type:ip_pools': None},
'is_visible': True},
'dns_nameservers': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED,
+ 'validate': {'type:nameservers': None},
'is_visible': True},
'host_routes': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED,
+ 'validate': {'type:hostroutes': None},
'is_visible': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
+ 'validate': {'type:string': None},
'required_by_policy': True,
'is_visible': True},
'enable_dhcp': {'allow_post': True, 'allow_put': True,
import socket
+import netaddr
import webob.exc
from quantum.api.v2 import attributes
exceptions.HostRoutesExhausted: webob.exc.HTTPBadRequest,
exceptions.DNSNameServersExhausted: webob.exc.HTTPBadRequest,
# Some plugins enforce policies as well
- exceptions.PolicyNotAuthorized: webob.exc.HTTPForbidden
+ exceptions.PolicyNotAuthorized: webob.exc.HTTPForbidden,
+ netaddr.AddrFormatError: webob.exc.HTTPBadRequest,
+ AttributeError: webob.exc.HTTPBadRequest,
+ ValueError: webob.exc.HTTPBadRequest,
}
QUOTAS = quota.QUOTAS
Utility methods for working with WSGI servers redux
"""
+import netaddr
import webob
import webob.dec
import webob.exc
method = getattr(controller, action)
result = method(request=request, **args)
- except exceptions.QuantumException as e:
+ except (ValueError, AttributeError,
+ exceptions.QuantumException,
+ netaddr.AddrFormatError) as e:
LOG.exception('%s failed' % action)
body = serializer({'QuantumError': str(e)})
kwargs = {'body': body, 'content_type': content_type}
s['gateway_ip'] != attributes.ATTR_NOT_SPECIFIED):
self._validate_ip_version(ip_ver, s['gateway_ip'], 'gateway_ip')
- if 'dns_nameservers' in s and \
- s['dns_nameservers'] != attributes.ATTR_NOT_SPECIFIED:
+ if ('dns_nameservers' in s and
+ s['dns_nameservers'] != attributes.ATTR_NOT_SPECIFIED):
if len(s['dns_nameservers']) > cfg.CONF.max_dns_nameservers:
raise q_exc.DNSNameServersExhausted(
subnet_id=id,
error_message=("error parsing dns address %s" % dns))
self._validate_ip_version(ip_ver, dns, 'dns_nameserver')
- if 'host_routes' in s and \
- s['host_routes'] != attributes.ATTR_NOT_SPECIFIED:
+ if ('host_routes' in s and
+ s['host_routes'] != attributes.ATTR_NOT_SPECIFIED):
if len(s['host_routes']) > cfg.CONF.max_subnet_host_routes:
raise q_exc.HostRoutesExhausted(
subnet_id=id,
class TestAttributes(unittest2.TestCase):
+ def test_strings(self):
+ msg = attributes._validate_string(None, None)
+ self.assertEquals(msg, "'None' is not a valid string")
+
+ msg = attributes._validate_string("OK", None)
+ self.assertEquals(msg, None)
+
+ msg = attributes._validate_string("123456789", 9)
+ self.assertIsNone(msg)
+
+ msg = attributes._validate_string("1234567890", 9)
+ self.assertIsNotNone(msg)
+
+ def test_ip_pools(self):
+ pools = [[{'end': '10.0.0.254'}],
+ [{'start': '10.0.0.254'}],
+ [{'start': '1000.0.0.254',
+ 'end': '1.1.1.1'}],
+ [{'start': '10.0.0.2', 'end': '10.0.0.254',
+ 'forza': 'juve'}],
+ [{'start': '10.0.0.2', 'end': '10.0.0.254'},
+ {'end': '10.0.0.254'}],
+ None]
+ for pool in pools:
+ msg = attributes._validate_ip_pools(pool, None)
+ self.assertIsNotNone(msg)
+
+ pools = [[{'end': '10.0.0.254', 'start': '10.0.0.2'},
+ {'start': '11.0.0.2', 'end': '11.1.1.1'}],
+ [{'start': '11.0.0.2', 'end': '11.0.0.100'}]]
+ for pool in pools:
+ msg = attributes._validate_ip_pools(pool, None)
+ self.assertIsNone(msg)
+
+ def test_fixed_ips(self):
+ fixed_ips = [[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
+ 'ip_address': '1111.1.1.1'}],
+ [{'subnet_id': 'invalid'}],
+ None,
+ [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000',
+ 'ip_address': '1.1.1.1'},
+ {'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
+ 'ip_address': '1.1.1.1'}],
+ [{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
+ 'ip_address': '1.1.1.1'},
+ {'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
+ 'ip_address': '1.1.1.1'}]]
+ for fixed in fixed_ips:
+ msg = attributes._validate_fixed_ips(fixed, None)
+ self.assertIsNotNone(msg)
+
+ def test_nameservers(self):
+ ns_pools = [['1.1.1.2', '1.1.1.2'],
+ ['www.hostname.com', 'www.hostname.com'],
+ ['77.hostname.com'],
+ ['1000.0.0.1'],
+ None]
+
+ for ns in ns_pools:
+ msg = attributes._validate_nameservers(ns, None)
+ self.assertIsNotNone(msg)
+
+ ns_pools = [['100.0.0.2'],
+ ['www.hostname.com'],
+ ['www.great.marathons.to.travel'],
+ ['valid'],
+ ['www.internal.hostname.com']]
+
+ for ns in ns_pools:
+ msg = attributes._validate_nameservers(ns, None)
+ self.assertIsNone(msg)
+
+ def test_hostroutes(self):
+ hostroute_pools = [[{'destination': '100.0.0.0/24'}],
+ [{'nexthop': '10.0.2.20'}],
+ [{'nexthop': '10.0.2.20',
+ 'destination': '100.0.0.0/8'},
+ {'nexthop': '10.0.2.20',
+ 'destination': '100.0.0.0/8'}],
+ None]
+ for host_routes in hostroute_pools:
+ msg = attributes._validate_hostroutes(host_routes, None)
+ self.assertIsNotNone(msg)
+
def test_mac_addresses(self):
# Valid - 3 octets
base_mac = "fa:16:3e:00:00:00"
base_mac = "01:16:3e:4f:00:00"
msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN)
- error = '%s is not valid' % base_mac
- self.assertEquals(msg, error)
+ self.assertIsNotNone(msg)
# Invalid - invalid format
base_mac = "a:16:3e:4f:00:00"
msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN)
- error = '%s is not valid' % base_mac
- self.assertEquals(msg, error)
+ self.assertIsNotNone(msg)
# Invalid - invalid format
base_mac = "ffa:16:3e:4f:00:00"
msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN)
- error = '%s is not valid' % base_mac
- self.assertEquals(msg, error)
+ self.assertIsNotNone(msg)
# Invalid - invalid format
base_mac = "01163e4f0000"
msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN)
- error = '%s is not valid' % base_mac
- self.assertEquals(msg, error)
+ self.assertIsNotNone(msg)
# Invalid - invalid format
base_mac = "01-16-3e-4f-00-00"
msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN)
- error = '%s is not valid' % base_mac
- self.assertEquals(msg, error)
+ self.assertIsNotNone(msg)
# Invalid - invalid format
base_mac = "00:16:3:f:00:00"
msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN)
- error = '%s is not valid' % base_mac
- self.assertEquals(msg, error)
+ self.assertIsNotNone(msg)
# Invalid - invalid format
base_mac = "12:3:4:5:67:89ab"
msg = attributes._validate_regex(base_mac,
attributes.MAC_PATTERN)
- error = '%s is not valid' % base_mac
- self.assertEquals(msg, error)
+ self.assertIsNotNone(msg)
def test_cidr(self):
# Valid - IPv4
cidr = "10.0.2.0"
msg = attributes._validate_subnet(cidr,
None)
- error = "%s is not a valid IP subnet" % cidr
+ error = "'%s' is not a valid IP subnet" % cidr
self.assertEquals(msg, error)
# Invalid - IPv6 without final octets, missing mask
cidr = "fe80::"
msg = attributes._validate_subnet(cidr,
None)
- error = "%s is not a valid IP subnet" % cidr
+ error = "'%s' is not a valid IP subnet" % cidr
self.assertEquals(msg, error)
# Invalid - IPv6 with final octets, missing mask
cidr = "fe80::0"
msg = attributes._validate_subnet(cidr,
None)
- error = "%s is not a valid IP subnet" % cidr
+ error = "'%s' is not a valid IP subnet" % cidr
self.assertEquals(msg, error)
def _do_side_effect(self, patched_plugin, orig, *args, **kwargs):
""" Invoked by test cases for injecting failures in plugin """
def second_call(*args, **kwargs):
- raise Exception('boom')
+ raise AttributeError
patched_plugin.side_effect = second_call
return orig(*args, **kwargs)
def _validate_behavior_on_bulk_failure(self, res, collection):
- self.assertEqual(res.status_int, 500)
+ self.assertEqual(res.status_int, 400)
req = self.new_list_request(collection)
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
+ def test_update_invalid_json_400(self):
+ with self.network() as net:
+ req = self.new_update_request('networks',
+ '{{"name": "aaa"}}',
+ net['network']['id'])
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
def test_bad_route_404(self):
req = self.new_list_request('doohickeys')
res = req.get_response(self.api)
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
+ def test_update_device_id_null(self):
+ with self.port() as port:
+ data = {'port': {'device_id': None}}
+ req = self.new_update_request('ports', data, port['port']['id'])
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
def test_delete_network_if_port_exists(self):
fmt = 'json'
with self.port() as port:
res = self._create_port(fmt, net_id=net_id, **kwargs)
self.assertEquals(res.status_int, 400)
+ def test_overlapping_subnets(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ tenant_id = subnet['subnet']['tenant_id']
+ net_id = subnet['subnet']['network_id']
+ res = self._create_subnet(fmt,
+ tenant_id=tenant_id,
+ net_id=net_id,
+ cidr='10.0.0.225/28',
+ ip_version=4,
+ gateway_ip=ATTR_NOT_SPECIFIED)
+ self.assertEquals(res.status_int, 400)
+
def test_requested_subnet_id_v4_and_v6(self):
fmt = 'json'
with self.subnet() as subnet:
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port2['port']['id'])
+ def test_invalid_ip(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ # Allocate specific IP
+ kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
+ 'ip_address': '1011.0.0.5'}]}
+ net_id = subnet['subnet']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port = self.deserialize(fmt, res)
+ self.assertEquals(res.status_int, 400)
+
def test_requested_split(self):
fmt = 'json'
with self.subnet() as subnet:
net_id = subnet['subnet']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
- self.assertEquals(res.status_int, 500)
+ self.assertEquals(res.status_int, 400)
+
+ def test_fixed_ip_invalid_subnet_id(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ # Allocate specific IP
+ kwargs = {"fixed_ips": [{'subnet_id': 'i am invalid',
+ 'ip_address': '10.0.0.5'}]}
+ net_id = subnet['subnet']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port2 = self.deserialize(fmt, res)
+ self.assertEquals(res.status_int, 400)
+
+ def test_fixed_ip_invalid_ip(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ # Allocate specific IP
+ kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
+ 'ip_address': '10.0.0.55555'}]}
+ net_id = subnet['subnet']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port2 = self.deserialize(fmt, res)
+ self.assertEquals(res.status_int, 400)
def test_requested_ips_only(self):
fmt = 'json'
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 403)
+ def test_create_subnet_bad_ip_version(self):
+ with self.network() as network:
+ # Check bad IP version
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': 'abc',
+ 'tenant_id': network['network']['tenant_id'],
+ 'gateway_ip': '10.0.2.1'}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_bad_ip_version_null(self):
+ with self.network() as network:
+ # Check bad IP version
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': None,
+ 'tenant_id': network['network']['tenant_id'],
+ 'gateway_ip': '10.0.2.1'}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_bad_uuid(self):
+ with self.network() as network:
+ # Check invalid UUID
+ data = {'subnet': {'network_id': None,
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': 4,
+ 'tenant_id': network['network']['tenant_id'],
+ 'gateway_ip': '10.0.2.1'}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_bad_boolean(self):
+ with self.network() as network:
+ # Check invalid boolean
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': '4',
+ 'enable_dhcp': None,
+ 'tenant_id': network['network']['tenant_id'],
+ 'gateway_ip': '10.0.2.1'}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_bad_pools(self):
+ with self.network() as network:
+ # Check allocation pools
+ allocation_pools = [[{'end': '10.0.0.254'}],
+ [{'start': '10.0.0.254'}],
+ [{'start': '1000.0.0.254'}],
+ [{'start': '10.0.0.2', 'end': '10.0.0.254'},
+ {'end': '10.0.0.254'}],
+ None,
+ [{'start': '10.0.0.2', 'end': '10.0.0.3'},
+ {'start': '10.0.0.2', 'end': '10.0.0.3'}]]
+ tenant_id = network['network']['tenant_id']
+ for pool in allocation_pools:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': '4',
+ 'tenant_id': tenant_id,
+ 'gateway_ip': '10.0.2.1',
+ 'allocation_pools': pool}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_bad_nameserver(self):
+ with self.network() as network:
+ # Check nameservers
+ nameserver_pools = [['1100.0.0.2'],
+ ['1.1.1.2', '1.1000.1.3'],
+ ['1.1.1.2', '1.1.1.2'],
+ None]
+ tenant_id = network['network']['tenant_id']
+ for nameservers in nameserver_pools:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': '4',
+ 'tenant_id': tenant_id,
+ 'gateway_ip': '10.0.2.1',
+ 'dns_nameservers': nameservers}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
+ def test_create_subnet_bad_hostroutes(self):
+ with self.network() as network:
+ # Check hostroutes
+ hostroute_pools = [[{'destination': '100.0.0.0/24'}],
+ [{'nexthop': '10.0.2.20'}],
+ [{'nexthop': '10.0.2.20',
+ 'destination': '100.0.0.0/8'},
+ {'nexthop': '10.0.2.20',
+ 'destination': '100.0.0.0/8'}],
+ None]
+ tenant_id = network['network']['tenant_id']
+ for hostroutes in hostroute_pools:
+ data = {'subnet': {'network_id': network['network']['id'],
+ 'cidr': '10.0.2.0/24',
+ 'ip_version': '4',
+ 'tenant_id': tenant_id,
+ 'gateway_ip': '10.0.2.1',
+ 'host_routes': hostroutes}}
+ subnet_req = self.new_create_request('subnets', data)
+ res = subnet_req.get_response(self.api)
+ self.assertEquals(res.status_int, 400)
+
def test_create_subnet_defaults(self):
gateway = '10.0.0.1'
cidr = '10.0.0.0/24'