--- /dev/null
+import collections
+import re
+
+
+SCHEMA_KEYS = (
+ REQUIRED, IMPLEMENTED, DEFAULT, TYPE, SCHEMA,
+ PATTERN, MIN_VALUE, MAX_VALUE, VALUES,
+) = (
+ 'Required', 'Implemented', 'Default', 'Type', 'Schema',
+ 'AllowedPattern', 'MinValue', 'MaxValue', 'AllowedValues',
+)
+
+SCHEMA_TYPES = (
+ INTEGER,
+ STRING, NUMBER, BOOLEAN,
+ MAP, LIST
+) = (
+ 'Integer',
+ 'String', 'Number', 'Boolean',
+ 'Map', 'List'
+)
+
+
+class Property(object):
+ def __init__(self, schema, name=None):
+ self.schema = schema
+ self.name = name
+
+ for key in self.schema:
+ assert key in SCHEMA_KEYS, 'Unknown schema key "%s"' % key
+
+ assert self.type() in SCHEMA_TYPES, \
+ 'Unknown property type "%s"' % self.type()
+
+ def required(self):
+ return self.schema.get(REQUIRED, False)
+
+ def implemented(self):
+ return self.schema.get(IMPLEMENTED, True)
+
+ def has_default(self):
+ return DEFAULT in self.schema
+
+ def default(self):
+ return self.schema[DEFAULT]
+
+ def type(self):
+ return self.schema[TYPE]
+
+ def _check_allowed(self, value):
+ if VALUES in self.schema:
+ allowed = self.schema[VALUES]
+ if value not in allowed:
+ raise ValueError('"%s" is not an allowed value %s' %
+ (value, str(allowed)))
+
+ @staticmethod
+ def str_to_num(value):
+ try:
+ return int(value)
+ except ValueError:
+ return float(value)
+
+ def _validate_number(self, value):
+ self._check_allowed(value)
+
+ num = self.str_to_num(value)
+
+ minn = self.str_to_num(self.schema.get(MIN_VALUE, value))
+ maxn = self.str_to_num(self.schema.get(MAX_VALUE, value))
+
+ if num > maxn or num < minn:
+ format = '%d' if isinstance(num, int) else '%f'
+ raise ValueError('%s is out of range' % (format % num))
+ return value
+
+ def _validate_string(self, value):
+ if not isinstance(value, basestring):
+ raise ValueError('Value must be a string')
+
+ self._check_allowed(value)
+
+ if PATTERN in self.schema:
+ pattern = self.schema[PATTERN]
+ match = re.match(pattern, value)
+ if match is None or match.end() != len(value):
+ raise ValueError('"%s" does not match pattern "%s"' %
+ (value, pattern))
+
+ return value
+
+ def _validate_map(self, value):
+ if not isinstance(value, collections.Mapping):
+ raise TypeError('"%s" is not a map' % value)
+
+ if SCHEMA in self.schema:
+ children = dict(Properties(self.schema[SCHEMA], value,
+ parent_name=self.name))
+ else:
+ children = value
+
+ return children
+
+ def _validate_list(self, value):
+ if (not isinstance(value, collections.Sequence) or
+ isinstance(value, basestring)):
+ raise TypeError('"%s" is not a list' % repr(value))
+
+ for v in value:
+ self._check_allowed(v)
+
+ if SCHEMA in self.schema:
+ prop = Property({TYPE: MAP, SCHEMA: self.schema[SCHEMA]})
+ children = [prop.validate_data(d) for d in value]
+ else:
+ children = value
+
+ return children
+
+ def _validate_bool(self, value):
+ normalised = value.lower()
+ if normalised not in ['true', 'false']:
+ raise ValueError('"%s" is not a valid boolean')
+
+ return normalised
+
+ def validate_data(self, value):
+ t = self.type()
+ if t == STRING:
+ return self._validate_string(value)
+ elif t == INTEGER:
+ if not isinstance(value, int):
+ raise TypeError('value is not an integer')
+ return self._validate_number(value)
+ elif t == NUMBER:
+ return self._validate_number(value)
+ elif t == MAP:
+ return self._validate_map(value)
+ elif t == LIST:
+ return self._validate_list(value)
+ elif t == BOOLEAN:
+ return self._validate_bool(value)
+
+
+class Properties(collections.Mapping):
+
+ def __init__(self, schema, data, resolver=lambda d: d, parent_name=None):
+ self.props = dict((k, Property(s, k)) for k, s in schema.items())
+ self.resolve = resolver
+ self.data = data
+ if parent_name is None:
+ self.error_prefix = ''
+ else:
+ self.error_prefix = parent_name + ': '
+
+ def validate(self):
+ for (key, prop) in self.props.items():
+ try:
+ self[key]
+ except ValueError as e:
+ return str(e)
+
+ # are there unimplemented Properties
+ if not prop.implemented() and key in self.data:
+ return (self.error_prefix +
+ '%s Property not implemented yet' % key)
+
+ def __getitem__(self, key):
+ if key not in self:
+ raise KeyError(self.error_prefix + 'Invalid Property %s' % key)
+
+ prop = self.props[key]
+
+ if key in self.data:
+ value = self.resolve(self.data[key])
+ try:
+ return prop.validate_data(value)
+ except ValueError as e:
+ raise ValueError(self.error_prefix + '%s %s' % (key, str(e)))
+ elif prop.has_default():
+ return prop.default()
+ elif prop.required():
+ raise ValueError(self.error_prefix +
+ 'Property %s not assigned' % key)
+
+ def __len__(self):
+ return len(self.props)
+
+ def __contains__(self, key):
+ return key in self.props
+
+ def __iter__(self):
+ return iter(self.props)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import unittest
+from nose.plugins.attrib import attr
+import mox
+
+from heat.engine.resources import properties
+
+
+@attr(tag=['unit', 'properties'])
+@attr(speed='fast')
+class PropertyTest(unittest.TestCase):
+ def test_required_default(self):
+ p = properties.Property({'Type': 'String'})
+ self.assertFalse(p.required())
+
+ def test_required_false(self):
+ p = properties.Property({'Type': 'String', 'Required': False})
+ self.assertFalse(p.required())
+
+ def test_required_true(self):
+ p = properties.Property({'Type': 'String', 'Required': True})
+ self.assertTrue(p.required())
+
+ def test_implemented_default(self):
+ p = properties.Property({'Type': 'String'})
+ self.assertTrue(p.implemented())
+
+ def test_implemented_false(self):
+ p = properties.Property({'Type': 'String', 'Implemented': False})
+ self.assertFalse(p.implemented())
+
+ def test_implemented_true(self):
+ p = properties.Property({'Type': 'String', 'Implemented': True})
+ self.assertTrue(p.implemented())
+
+ def test_no_default(self):
+ p = properties.Property({'Type': 'String'})
+ self.assertFalse(p.has_default())
+
+ def test_default(self):
+ p = properties.Property({'Type': 'String', 'Default': 'wibble'})
+ self.assertEqual(p.default(), 'wibble')
+
+ def test_type(self):
+ p = properties.Property({'Type': 'String'})
+ self.assertEqual(p.type(), 'String')
+
+ def test_bad_type(self):
+ self.assertRaises(AssertionError,
+ properties.Property, {'Type': 'Fish'})
+
+ def test_bad_key(self):
+ self.assertRaises(AssertionError,
+ properties.Property,
+ {'Type': 'String', 'Foo': 'Bar'})
+
+ def test_string_pattern_good(self):
+ schema = {'Type': 'String',
+ 'AllowedPattern': '[a-z]*'}
+ p = properties.Property(schema)
+ self.assertEqual(p.validate_data('foo'), 'foo')
+
+ def test_string_pattern_bad_prefix(self):
+ schema = {'Type': 'String',
+ 'AllowedPattern': '[a-z]*'}
+ p = properties.Property(schema)
+ self.assertRaises(ValueError, p.validate_data, '1foo')
+
+ def test_string_pattern_bad_suffix(self):
+ schema = {'Type': 'String',
+ 'AllowedPattern': '[a-z]*'}
+ p = properties.Property(schema)
+ self.assertRaises(ValueError, p.validate_data, 'foo1')
+
+ def test_string_value_list_good(self):
+ schema = {'Type': 'String',
+ 'AllowedValues': ['foo', 'bar', 'baz']}
+ p = properties.Property(schema)
+ self.assertEqual(p.validate_data('bar'), 'bar')
+
+ def test_string_value_list_bad(self):
+ schema = {'Type': 'String',
+ 'AllowedValues': ['foo', 'bar', 'baz']}
+ p = properties.Property(schema)
+ self.assertRaises(ValueError, p.validate_data, 'blarg')
+
+ def test_int_good(self):
+ schema = {'Type': 'Integer',
+ 'MinValue': 3,
+ 'MaxValue': 3}
+ p = properties.Property(schema)
+ self.assertEqual(p.validate_data(3), 3)
+
+ def test_int_bad(self):
+ schema = {'Type': 'Integer'}
+ p = properties.Property(schema)
+ self.assertRaises(TypeError, p.validate_data, '3')
+
+ def test_integer_low(self):
+ schema = {'Type': 'Integer',
+ 'MinValue': 4}
+ p = properties.Property(schema)
+ self.assertRaises(ValueError, p.validate_data, 3)
+
+ def test_integer_high(self):
+ schema = {'Type': 'Integer',
+ 'MaxValue': 2}
+ p = properties.Property(schema)
+ self.assertRaises(ValueError, p.validate_data, 3)
+
+ def test_integer_value_list_good(self):
+ schema = {'Type': 'Integer',
+ 'AllowedValues': [1, 3, 5]}
+ p = properties.Property(schema)
+ self.assertEqual(p.validate_data(5), 5)
+
+ def test_integer_value_list_bad(self):
+ schema = {'Type': 'Integer',
+ 'AllowedValues': [1, 3, 5]}
+ p = properties.Property(schema)
+ self.assertRaises(ValueError, p.validate_data, 2)
+
+ def test_number_good(self):
+ schema = {'Type': 'Number',
+ 'MinValue': '3',
+ 'MaxValue': '3'}
+ p = properties.Property(schema)
+ self.assertEqual(p.validate_data('3'), '3')
+
+ def test_number_value_list_good(self):
+ schema = {'Type': 'Number',
+ 'AllowedValues': ['1', '3', '5']}
+ p = properties.Property(schema)
+ self.assertEqual(p.validate_data('5'), '5')
+
+ def test_number_value_list_bad(self):
+ schema = {'Type': 'Number',
+ 'AllowedValues': ['1', '3', '5']}
+ p = properties.Property(schema)
+ self.assertRaises(ValueError, p.validate_data, '2')
+
+ def test_number_low(self):
+ schema = {'Type': 'Number',
+ 'MinValue': '4'}
+ p = properties.Property(schema)
+ self.assertRaises(ValueError, p.validate_data, '3')
+
+ def test_number_high(self):
+ schema = {'Type': 'Number',
+ 'MaxValue': '2'}
+ p = properties.Property(schema)
+ self.assertRaises(ValueError, p.validate_data, '3')
+
+ def test_boolean_true(self):
+ p = properties.Property({'Type': 'Boolean'})
+ self.assertEqual(p.validate_data('True'), 'true')
+
+ def test_boolean_false(self):
+ p = properties.Property({'Type': 'Boolean'})
+ self.assertEqual(p.validate_data('False'), 'false')
+
+ def test_boolean_invalid(self):
+ p = properties.Property({'Type': 'Boolean'})
+ self.assertRaises(ValueError, p.validate_data, 'fish')
+
+ def test_list_string(self):
+ p = properties.Property({'Type': 'List'})
+ self.assertRaises(TypeError, p.validate_data, 'foo')
+
+ def test_list_good(self):
+ p = properties.Property({'Type': 'List'})
+ self.assertEqual(p.validate_data(['foo', 'bar']), ['foo', 'bar'])
+
+ def test_list_dict(self):
+ p = properties.Property({'Type': 'List'})
+ self.assertRaises(TypeError, p.validate_data, {'foo': 'bar'})
+
+ def test_list_value_list_bad(self):
+ schema = {'Type': 'List',
+ 'AllowedValues': ['foo', 'bar', 'baz']}
+ p = properties.Property(schema)
+ self.assertRaises(ValueError, p.validate_data, ['foo', 'wibble'])
+
+ def test_list_value_list_good(self):
+ schema = {'Type': 'List',
+ 'AllowedValues': ['foo', 'bar', 'baz']}
+ p = properties.Property(schema)
+ self.assertEqual(p.validate_data(['bar', 'foo']), ['bar', 'foo'])
+
+ def test_map_string(self):
+ p = properties.Property({'Type': 'Map'})
+ self.assertRaises(TypeError, p.validate_data, 'foo')
+
+ def test_map_list(self):
+ p = properties.Property({'Type': 'Map'})
+ self.assertRaises(TypeError, p.validate_data, ['foo'])
+
+ def test_map_schema_good(self):
+ map_schema = {'valid': {'Type': 'Boolean'}}
+ p = properties.Property({'Type': 'Map', 'Schema': map_schema})
+ self.assertEqual(p.validate_data({'valid': 'TRUE'}), {'valid': 'true'})
+
+ def test_map_schema_bad_data(self):
+ map_schema = {'valid': {'Type': 'Boolean'}}
+ p = properties.Property({'Type': 'Map', 'Schema': map_schema})
+ self.assertRaises(ValueError, p.validate_data, {'valid': 'fish'})
+
+ def test_map_schema_missing_data(self):
+ map_schema = {'valid': {'Type': 'Boolean'}}
+ p = properties.Property({'Type': 'Map', 'Schema': map_schema})
+ self.assertEqual(p.validate_data({}), {'valid': None})
+
+ def test_map_schema_missing_required_data(self):
+ map_schema = {'valid': {'Type': 'Boolean', 'Required': True}}
+ p = properties.Property({'Type': 'Map', 'Schema': map_schema})
+ self.assertRaises(ValueError, p.validate_data, {})
+
+ def test_list_schema_good(self):
+ map_schema = {'valid': {'Type': 'Boolean'}}
+ p = properties.Property({'Type': 'List', 'Schema': map_schema})
+ self.assertEqual(p.validate_data([{'valid': 'TRUE'},
+ {'valid': 'False'}]),
+ [{'valid': 'true'},
+ {'valid': 'false'}])
+
+ def test_list_schema_bad_data(self):
+ map_schema = {'valid': {'Type': 'Boolean'}}
+ p = properties.Property({'Type': 'List', 'Schema': map_schema})
+ self.assertRaises(ValueError, p.validate_data, [{'valid': 'True'},
+ {'valid': 'fish'}])
+
+
+@attr(tag=['unit', 'properties'])
+@attr(speed='fast')
+class PropertiesTest(unittest.TestCase):
+ def setUp(self):
+ schema = {
+ 'int': {'Type': 'Integer'},
+ 'string': {'Type': 'String'},
+ 'required_int': {'Type': 'Integer', 'Required': True},
+ 'bad_int': {'Type': 'Integer'},
+ 'missing': {'Type': 'Integer'},
+ 'defaulted': {'Type': 'Integer', 'Default': 1},
+ 'default_override': {'Type': 'Integer', 'Default': 1},
+ }
+ data = {
+ 'int': 21,
+ 'string': 'foo',
+ 'bad_int': 'foo',
+ 'default_override': 21,
+ }
+ double = lambda d: d * 2
+ self.props = properties.Properties(schema, data, double, 'wibble')
+
+ def test_integer_good(self):
+ self.assertEqual(self.props['int'], 42)
+
+ def test_string_good(self):
+ self.assertEqual(self.props['string'], 'foofoo')
+
+ def test_missing_required(self):
+ self.assertRaises(ValueError, self.props.get, 'required_int')
+
+ def test_integer_bad(self):
+ self.assertRaises(TypeError, self.props.get, 'bad_int')
+
+ def test_missing(self):
+ self.assertEqual(self.props['missing'], None)
+
+ def test_default(self):
+ self.assertEqual(self.props['defaulted'], 1)
+
+ def test_default_override(self):
+ self.assertEqual(self.props['default_override'], 42)
+
+ def test_bad_key(self):
+ self.assertEqual(self.props.get('foo', 'wibble'), 'wibble')
+
+
+@attr(tag=['unit', 'properties'])
+@attr(speed='fast')
+class PropertiesValidationTest(unittest.TestCase):
+ def test_required(self):
+ schema = {'foo': {'Type': 'String', 'Required': True}}
+ props = properties.Properties(schema, {'foo': 'bar'})
+ self.assertEqual(props.validate(), None)
+
+ def test_missing_required(self):
+ schema = {'foo': {'Type': 'String', 'Required': True}}
+ props = properties.Properties(schema, {})
+ self.assertNotEqual(props.validate(), None)
+
+ def test_missing_unimplemented(self):
+ schema = {'foo': {'Type': 'String', 'Implemented': False}}
+ props = properties.Properties(schema, {})
+ self.assertEqual(props.validate(), None)
+
+ def test_present_unimplemented(self):
+ schema = {'foo': {'Type': 'String', 'Implemented': False}}
+ props = properties.Properties(schema, {'foo': 'bar'})
+ self.assertNotEqual(props.validate(), None)
+
+ def test_missing(self):
+ schema = {'foo': {'Type': 'String'}}
+ props = properties.Properties(schema, {})
+ self.assertEqual(props.validate(), None)
+
+ def test_bad_data(self):
+ schema = {'foo': {'Type': 'String'}}
+ props = properties.Properties(schema, {'foo': 42})
+ self.assertNotEqual(props.validate(), None)
+
+
+# allows testing of the test directly, shown below
+if __name__ == '__main__':
+ sys.argv.append(__file__)
+ nose.main()