+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
#
-# Licensed under the Apache License, Version 2.0 (the 'License');
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the 'License'); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an 'AS IS' BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-ATTR_NOT_SPECIFIED = object()
-# Defining a constant to avoid repeating string literal in several modules
-SHARED = 'shared'
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
import netaddr
import re
LOG = logging.getLogger(__name__)
+ATTR_NOT_SPECIFIED = object()
+# Defining a constant to avoid repeating string literal in several modules
+SHARED = 'shared'
+
+
+def _verify_dict_keys(expected_keys, target_dict):
+ if not isinstance(target_dict, dict):
+ msg = _("Invalid input. %s must be a dictionary.") % target_dict
+ return msg
+
+ provided_keys = target_dict.keys()
+ if set(expected_keys) != set(provided_keys):
+ msg = (_("Expected keys not found. Expected: %(expected_keys)s "
+ "Provided: %(provided_keys)s") % locals())
+ return msg
+
def is_attr_set(attribute):
return not (attribute is None or attribute is ATTR_NOT_SPECIFIED)
def _validate_boolean(data, valid_values=None):
- if isinstance(data, bool):
- return
- else:
+ if not isinstance(data, bool):
msg = _("'%s' is not boolean") % data
LOG.debug("validate_boolean: %s", msg)
return msg
def _validate_mac_address(data, valid_values=None):
try:
netaddr.EUI(data)
- return
except Exception:
msg = _("'%s' is not a valid MAC address") % data
LOG.debug("validate_mac_address: %s", msg)
def _validate_ip_address(data, valid_values=None):
try:
netaddr.IPAddress(data)
- return
except Exception:
msg = _("'%s' is not a valid IP address") % data
LOG.debug("validate_ip_address: %s", msg)
def _validate_ip_pools(data, valid_values=None):
"""Validate that start and end IP addresses are present
- In addition to this the IP addresses will also be validated"""
+ In addition to this the IP addresses will also be validated
+
+ """
if not isinstance(data, list):
- msg = _("'%s' in not a valid IP pool") % data
+ msg = _("'%s' is not a valid IP pool") % data
LOG.debug("validate_ip_pools: %s", msg)
return msg
- expected_keys = set(['start', 'end'])
- try:
- for ip_pool in data:
- if set(ip_pool.keys()) != expected_keys:
- msg = _("Expected keys not found. Expected: %s "
- "Provided: %s") % (expected_keys, ip_pool.keys())
+ expected_keys = ['start', 'end']
+ for ip_pool in data:
+ msg = _verify_dict_keys(expected_keys, ip_pool)
+ if msg:
+ LOG.debug("validate_ip_pools: %s", msg)
+ return msg
+ for k in expected_keys:
+ msg = _validate_ip_address(ip_pool[k])
+ if msg:
LOG.debug("validate_ip_pools: %s", msg)
return msg
- for k in expected_keys:
- msg = _validate_ip_address(ip_pool[k])
- if msg:
- LOG.debug("validate_ip_pools: %s", msg)
- return msg
- except KeyError, e:
- args = {'key_name': e.message, 'ip_pool': ip_pool}
- msg = _("Invalid input. Required key: '%(key_name)s' "
- "missing from %(ip_pool)s.") % args
- LOG.debug("validate_ip_pools: %s", msg)
- return msg
- except TypeError, e:
- msg = _("Invalid input. Pool %s must be a dictionary.") % ip_pool
- LOG.debug("validate_ip_pools: %s", msg)
- return msg
- except Exception:
- msg = _("'%s' in not a valid IP pool") % data
- LOG.debug("validate_ip_pools: %s", msg)
- return msg
def _validate_fixed_ips(data, valid_values=None):
if not isinstance(data, list):
- msg = _("'%s' in not a valid fixed IP") % data
+ msg = _("'%s' is not a valid fixed IP") % data
LOG.debug("validate_fixed_ips: %s", msg)
return msg
+
ips = []
- try:
- for fixed_ip in data:
- if 'ip_address' in fixed_ip:
- msg = _validate_ip_address(fixed_ip['ip_address'])
- if msg:
- LOG.debug("validate_fixed_ips: %s", msg)
- return msg
- if 'subnet_id' in fixed_ip:
- msg = _validate_regex(fixed_ip['subnet_id'], UUID_PATTERN)
- if msg:
- LOG.debug("validate_fixed_ips: %s", msg)
- return msg
+ for fixed_ip in data:
+ if 'ip_address' in fixed_ip:
# Ensure that duplicate entries are not set - just checking IP
# suffices. Duplicate subnet_id's are legitimate.
- if 'ip_address' in fixed_ip:
- if fixed_ip['ip_address'] in ips:
- msg = _("Duplicate entry %s") % fixed_ip
- LOG.debug("validate_fixed_ips: %s", msg)
- return msg
- ips.append(fixed_ip['ip_address'])
- except Exception:
- msg = _("'%s' in not a valid fixed IP") % data
- LOG.debug("validate_fixed_ips: %s", msg)
- return msg
+ fixed_ip_address = fixed_ip['ip_address']
+ if fixed_ip_address in ips:
+ msg = _("Duplicate entry %s") % fixed_ip
+ else:
+ msg = _validate_ip_address(fixed_ip_address)
+ if msg:
+ LOG.debug("validate_fixed_ips: %s", msg)
+ return msg
+ ips.append(fixed_ip_address)
+ if 'subnet_id' in fixed_ip:
+ msg = _validate_uuid(fixed_ip['subnet_id'])
+ if msg:
+ LOG.debug("validate_fixed_ips: %s", msg)
+ return msg
def _validate_nameservers(data, valid_values=None):
if not hasattr(data, '__iter__'):
- msg = _("'%s' in not a valid nameserver") % data
+ msg = _("'%s' is not a valid nameserver") % data
LOG.debug("validate_nameservers: %s", msg)
return msg
- ips = set()
+
+ ips = []
for ip in data:
msg = _validate_ip_address(ip)
if msg:
# This may be a hostname
msg = _validate_regex(ip, HOSTNAME_PATTERN)
if msg:
- msg = _("'%s' in not a valid nameserver") % ip
+ msg = _("'%s' is not a valid nameserver") % ip
LOG.debug("validate_nameservers: %s", msg)
return msg
if ip in ips:
msg = _("Duplicate nameserver %s") % ip
LOG.debug("validate_nameservers: %s", msg)
return msg
- ips.add(ip)
+ ips.append(ip)
def _validate_hostroutes(data, valid_values=None):
if not isinstance(data, list):
- msg = _("'%s' in not a valid hostroute") % data
+ msg = _("'%s' is not a valid hostroute") % data
LOG.debug("validate_hostroutes: %s", msg)
return msg
+
+ expected_keys = ['destination', 'nexthop']
hostroutes = []
- try:
- for hostroute in data:
- msg = _validate_subnet(hostroute['destination'])
- if msg:
- LOG.debug("validate_hostroutes: %s", msg)
- return msg
- msg = _validate_ip_address(hostroute['nexthop'])
- if msg:
- LOG.debug("validate_hostroutes: %s", msg)
- return msg
- if hostroute in hostroutes:
- msg = _("Duplicate hostroute %s") % hostroute
- LOG.debug("validate_hostroutes: %s", msg)
- if msg:
- return msg
- hostroutes.append(hostroute)
- except:
- msg = _("'%s' in not a valid hostroute") % data
- LOG.debug("validate_hostroutes: %s", msg)
- return msg
+ for hostroute in data:
+ msg = _verify_dict_keys(expected_keys, hostroute)
+ if msg:
+ LOG.debug("validate_hostroutes: %s", msg)
+ return msg
+ msg = _validate_subnet(hostroute['destination'])
+ if msg:
+ LOG.debug("validate_hostroutes: %s", msg)
+ return msg
+ msg = _validate_ip_address(hostroute['nexthop'])
+ if msg:
+ LOG.debug("validate_hostroutes: %s", msg)
+ return msg
+ if hostroute in hostroutes:
+ msg = _("Duplicate hostroute %s") % hostroute
+ LOG.debug("validate_hostroutes: %s", msg)
+ return msg
+ hostroutes.append(hostroute)
def _validate_ip_address_or_none(data, valid_values=None):
except TypeError:
pass
- msg = _("'%s' is not valid input") % data
+ msg = _("'%s' is not a valid input") % data
LOG.debug("validate_regex: %s", msg)
return msg