+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import collections
-import re
-from copy import deepcopy
-
-from heat.openstack.common import log as logging
-
-logger = logging.getLogger('heat.engine.checkeddict')
-
-
-class CheckedDict(collections.MutableMapping):
-
- def __init__(self, name):
- self.data = {}
- self.name = name
-
- def addschema(self, key, schema):
- self.data[key] = deepcopy(schema)
-
- def get_attr(self, key, attr):
- return self.data[key].get(attr, '')
-
- def __setitem__(self, key, value):
- '''Since this function gets called whenever we modify the
- dictionary (object), we can (and do) validate those keys that we
- know how to validate.
- '''
- def str_to_num(s):
- try:
- return int(s)
- except ValueError:
- return float(s)
-
- num_converter = {'Integer': int,
- 'Number': str_to_num,
- 'Float': float}
-
- if not key in self.data:
- raise KeyError('%s: key %s not found' % (self.name, key))
-
- if 'Type' in self.data[key]:
- t = self.data[key]['Type']
- if t == 'String':
- if not isinstance(value, basestring):
- raise ValueError('%s: %s Value must be a string' %
- (self.name, key))
- if 'MaxLength' in self.data[key]:
- if len(value) > int(self.data[key]['MaxLength']):
- raise ValueError('%s: %s is too long; MaxLength %s' %
- (self.name, key,
- self.data[key]['MaxLength']))
- if 'MinLength' in self.data[key]:
- if len(value) < int(self.data[key]['MinLength']):
- raise ValueError('%s: %s is too short; MinLength %s' %
- (self.name, key,
- self.data[key]['MinLength']))
- if 'AllowedPattern' in self.data[key]:
- rc = re.match('^%s$' % self.data[key]['AllowedPattern'],
- value)
- if rc is None:
- raise ValueError('%s: Pattern %s does not match %s' %
- (self.name,
- self.data[key]['AllowedPattern'],
- key))
-
- elif t in ['Integer', 'Number', 'Float']:
- # just try convert and see if it will throw a ValueError
- num = num_converter[t](value)
- minn = num
- maxn = num
- if 'MaxValue' in self.data[key]:
- maxn = num_converter[t](self.data[key]['MaxValue'])
- if 'MinValue' in self.data[key]:
- minn = num_converter[t](self.data[key]['MinValue'])
- if num > maxn or num < minn:
- raise ValueError('%s: %s is out of range' % (self.name,
- key))
-
- elif t == 'Map':
- if not isinstance(value, dict):
- raise ValueError('%s: %s Value must be a map' %
- (self.name, key))
- if 'Schema' in self.data[key]:
- cdict = CheckedDict(key)
- schema = self.data[key]['Schema']
- for n, s in schema.items():
- cdict.addschema(n, s)
- for k, v in value.items():
- cdict[k] = v
-
- elif t == 'List':
- if not isinstance(value, (list, tuple)):
- raise ValueError('%s: %s Value must be a list, not %s' %
- (self.name, key, value))
- if 'Schema' in self.data[key]:
- for item in value:
- cdict = CheckedDict(key)
- schema = self.data[key]['Schema']
- for n, s in schema.items():
- cdict.addschema(n, s)
- for k, v in item.items():
- cdict[k] = v
-
- elif t == 'CommaDelimitedList':
- sp = value.split(',')
-
- else:
- logger.warn('Unknown value type "%s"' % t)
-
- if 'AllowedValues' in self.data[key]:
- if not value in self.data[key]['AllowedValues']:
- raise ValueError('%s: %s Value must be one of %s' %
- (self.name, key,
- str(self.data[key]['AllowedValues'])))
-
- self.data[key]['Value'] = value
-
- def __getitem__(self, key):
- if not key in self.data:
- raise KeyError('%s: key %s not found' % (self.name, key))
-
- if 'Value' in self.data[key]:
- return self.data[key]['Value']
- elif 'Default' in self.data[key]:
- return self.data[key]['Default']
- elif 'Required' in self.data[key]:
- if not self.data[key]['Required']:
- return None
- else:
- raise ValueError('%s: Property %s not assigned' % (self.name,
- key))
- else:
- raise ValueError('%s: Property %s not assigned' % (self.name, key))
-
- def __len__(self):
- return len(self.data)
-
- def __contains__(self, key):
- return key in self.data
-
- def __iter__(self):
- return iter(self.data)
-
- def __delitem__(self, k):
- del self.data[k]
+++ /dev/null
-import sys
-import os
-
-import nose
-import unittest
-import mox
-import json
-
-from nose.plugins.attrib import attr
-from nose import with_setup
-
-from heat.engine import checkeddict
-
-
-@attr(tag=['unit', 'checkeddict'])
-@attr(speed='fast')
-class CheckedDictTest(unittest.TestCase):
-
- def test_paramerters(self):
- parms = '''
-{
- "Parameters" : {
- "TODOList" : {
- "Description" : "stuff",
- "Type" : "CommaDelimitedList"
- },
- "SomeNumber" : {
- "Type" : "Number",
- "Default" : "56",
- "MaxValue": "6778",
- "MinValue": "15.78"
- },
- "DBUsername": {
- "Default": "admin",
- "NoEcho": "true",
- "Description" : "The WordPress database admin account username",
- "Type": "String",
- "MinLength": "1",
- "MaxLength": "16",
- "AllowedPattern" : "[a-zA-Z][a-zA-Z0-9]*",
- "ConstraintDescription" : "begin with a letter & \
- contain only alphanumeric characters."
- },
- "LinuxDistribution": {
- "Default": "F16",
- "Description" : "Distribution of choice",
- "Type": "String",
- "AllowedValues" : [ "F16", "F17", "U10", "RHEL-6.1", "RHEL-6.3" ]
- }
- }
-}
-'''
- ps = json.loads(parms)
- cd = checkeddict.CheckedDict('test_paramerters')
- for p in ps['Parameters']:
- cd.addschema(p, ps['Parameters'][p])
-
- # AllowedValues
- self.assertRaises(ValueError, cd.__setitem__, 'LinuxDistribution',
- 'f16')
- # MaxLength
- self.assertRaises(ValueError, cd.__setitem__, 'DBUsername',
- 'Farstarststrststrstrstrst144')
- # MinLength
- self.assertRaises(ValueError, cd.__setitem__, 'DBUsername', '')
- # AllowedPattern
- self.assertRaises(ValueError, cd.__setitem__, 'DBUsername', '4me')
-
- cd['DBUsername'] = 'wtf'
- self.assertTrue(cd['DBUsername'] == 'wtf')
- cd['LinuxDistribution'] = 'U10'
- self.assertTrue(cd['LinuxDistribution'] == 'U10')
-
- # int
- cd['SomeNumber'] = '98'
- self.assertTrue(cd['SomeNumber'] == '98')
-
- # float
- cd['SomeNumber'] = '54.345'
- self.assertTrue(cd['SomeNumber'] == '54.345')
-
- # not a num
- self.assertRaises(ValueError, cd.__setitem__, 'SomeNumber', 'S8')
- # range errors
- self.assertRaises(ValueError, cd.__setitem__, 'SomeNumber', '8')
- self.assertRaises(ValueError, cd.__setitem__, 'SomeNumber', '9048.56')
- # lists
- cd['TODOList'] = "'one', 'two', 'three'"
-
- def test_nested_paramerters(self):
- listeners_schema = {
- 'InstancePort': {'Type': 'Integer',
- 'Required': True},
- 'LoadBalancerPort': {'Type': 'Integer',
- 'Required': True}
- }
-
- healthcheck_schema = {
- 'HealthyThreshold': {'Type': 'Number',
- 'Required': True},
- 'Interval': {'Type': 'Number',
- 'Required': True}
- }
-
- properties_schema = {
- 'HealthCheck': {'Type': 'Map',
- 'Implemented': False,
- 'Schema': healthcheck_schema},
- 'Listeners': {'Type': 'List',
- 'Schema': listeners_schema}
- }
-
- cd = checkeddict.CheckedDict('nested')
- for p, s in properties_schema.items():
- cd.addschema(p, s)
-
- hc = {'HealthyThreshold': 'bla', 'Interval': '45'}
- self.assertRaises(ValueError, cd.__setitem__, 'HealthCheck', hc)
-
- hc = {'HealthyThreshold': '246', 'Interval': '45'}
- cd['HealthCheck'] = hc
- self.assertTrue(cd['HealthCheck']['HealthyThreshold'] == '246')
- self.assertTrue(cd['HealthCheck']['Interval'] == '45')
-
- li = [{'InstancePort': 'bla', 'LoadBalancerPort': '45'},
- {'InstancePort': '66', 'LoadBalancerPort': '775'}]
- self.assertRaises(ValueError, cd.__setitem__, 'Listeners', li)
-
- li2 = [{'InstancePort': '674', 'LoadBalancerPort': '45'},
- {'InstancePort': '66', 'LoadBalancerPort': '775'}]
- cd['Listeners'] = li2
-
- self.assertTrue(cd['Listeners'][0]['LoadBalancerPort'] == '45')
- self.assertTrue(cd['Listeners'][1]['LoadBalancerPort'] == '775')