# under the License.
import collections
+import json
import re
from heat.common import exception
'Description', 'ConstraintDescription'
)
PARAMETER_TYPES = (
- STRING, NUMBER, COMMA_DELIMITED_LIST
+ STRING, NUMBER, COMMA_DELIMITED_LIST, JSON
) = (
- 'String', 'Number', 'CommaDelimitedList'
+ 'String', 'Number', 'CommaDelimitedList', 'Json'
)
PSEUDO_PARAMETERS = (
PARAM_STACK_ID, PARAM_STACK_NAME, PARAM_REGION
ParamClass = NumberParam
elif param_type == COMMA_DELIMITED_LIST:
ParamClass = CommaDelimitedListParam
+ elif param_type == JSON:
+ ParamClass = JsonParam
else:
raise ValueError('Invalid Parameter type "%s"' % param_type)
def _validate(self, value):
'''Check that the supplied value is compatible with the constraints.'''
try:
- sp = value.split(',')
+ value.split(',')
except AttributeError:
raise ValueError('Value must be a comma-delimited list string')
return self.value().split(',')[index]
+class JsonParam(Parameter, collections.Mapping):
+ """A template parameter who's value is valid map."""
+
+ def _validate(self, value):
+ message = 'Value must be valid JSON'
+ if isinstance(value, collections.Mapping):
+ try:
+ self.user_value = json.dumps(value)
+ except (ValueError, TypeError) as err:
+ raise ValueError("%s: %s" % (message, str(err)))
+ self.parsed = value
+ else:
+ try:
+ self.parsed = json.loads(value)
+ except ValueError:
+ raise ValueError(message)
+
+ # check length
+ my_len = len(self.parsed)
+ if MAX_LENGTH in self.schema:
+ max_length = int(self.schema[MAX_LENGTH])
+ if my_len > max_length:
+ message = ('value length (%d) overflows %s %s'
+ % (my_len, MAX_LENGTH, max_length))
+ raise ValueError(self._error_msg(message))
+ if MIN_LENGTH in self.schema:
+ min_length = int(self.schema[MIN_LENGTH])
+ if my_len < min_length:
+ message = ('value length (%d) underflows %s %s'
+ % (my_len, MIN_LENGTH, min_length))
+ raise ValueError(self._error_msg(message))
+ # check valid keys
+ if VALUES in self.schema:
+ allowed = self.schema[VALUES]
+ bad_keys = [k for k in self.parsed if k not in allowed]
+ if bad_keys:
+ message = ('keys %s are not in %s %s'
+ % (bad_keys, VALUES, allowed))
+ raise ValueError(self._error_msg(message))
+
+ def __getitem__(self, key):
+ return self.parsed[key]
+
+ def __iter__(self):
+ return iter(self.parsed)
+
+ def __len__(self):
+ return len(self.parsed)
+
+
class Parameters(collections.Mapping):
'''
The parameters of a stack, with type checking, defaults &c. specified by
p = parameters.Parameter('p', {'Type': 'CommaDelimitedList'})
self.assertTrue(isinstance(p, parameters.CommaDelimitedListParam))
+ def test_new_json(self):
+ p = parameters.Parameter('p', {'Type': 'Json'})
+ self.assertTrue(isinstance(p, parameters.JsonParam))
+
def test_new_bad_type(self):
self.assertRaises(ValueError, parameters.Parameter,
'p', {'Type': 'List'})
else:
self.fail('ValueError not raised')
+ def test_map_value(self):
+ '''Happy path for value thats already a map.'''
+ schema = {'Type': 'Json'}
+ val = {"foo": "bar", "items": [1, 2, 3]}
+ val_s = json.dumps(val)
+ p = parameters.Parameter('p', schema, val)
+ self.assertEqual(val_s, p.value())
+ self.assertEqual(val, p.parsed)
+
+ def test_map_value_bad(self):
+ '''Map value is not JSON parsable.'''
+ schema = {'Type': 'Json',
+ 'ConstraintDescription': 'wibble'}
+ val = {"foo": "bar", "not_json": len}
+ try:
+ parameters.Parameter('p', schema, val)
+ except ValueError as verr:
+ self.assertIn('Value must be valid JSON', str(verr))
+ else:
+ self.fail("Value error not raised")
+
+ def test_map_value_parse(self):
+ '''Happy path for value that's a string.'''
+ schema = {'Type': 'Json'}
+ val = {"foo": "bar", "items": [1, 2, 3]}
+ val_s = json.dumps(val)
+ p = parameters.Parameter('p', schema, val_s)
+ self.assertEqual(val_s, p.value())
+ self.assertEqual(val, p.parsed)
+
+ def test_map_value_bad_parse(self):
+ '''Test value error for unparsable string value.'''
+ schema = {'Type': 'Json',
+ 'ConstraintDescription': 'wibble'}
+ val = "I am not a map"
+ try:
+ parameters.Parameter('p', schema, val)
+ except ValueError as verr:
+ self.assertIn('Value must be valid JSON', str(verr))
+ else:
+ self.fail("Value error not raised")
+
+ def test_map_values_good(self):
+ '''Happy path for map keys.'''
+ schema = {'Type': 'Json',
+ 'AllowedValues': ["foo", "bar", "baz"]}
+ val = {"foo": "bar", "baz": [1, 2, 3]}
+ val_s = json.dumps(val)
+ p = parameters.Parameter('p', schema, val_s)
+ self.assertEqual(val_s, p.value())
+ self.assertEqual(val, p.parsed)
+
+ def test_map_values_bad(self):
+ '''Test failure of invalid map keys.'''
+ schema = {'Type': 'Json',
+ 'AllowedValues': ["foo", "bar", "baz"]}
+ val = {"foo": "bar", "items": [1, 2, 3]}
+ try:
+ parameters.Parameter('p', schema, val)
+ except ValueError as verr:
+ self.assertIn("items", str(verr))
+ else:
+ self.fail("Value error not raised")
+
+ def test_map_underrun(self):
+ '''Test map length under MIN_LEN.'''
+ schema = {'Type': 'Json',
+ 'MinLength': 3}
+ val = {"foo": "bar", "items": [1, 2, 3]}
+ try:
+ parameters.Parameter('p', schema, val)
+ except ValueError as verr:
+ self.assertIn('underflows', str(verr))
+ else:
+ self.fail("Value error not raised")
+
+ def test_map_overrun(self):
+ '''Test map length over MAX_LEN.'''
+ schema = {'Type': 'Json',
+ 'MaxLength': 1}
+ val = {"foo": "bar", "items": [1, 2, 3]}
+ try:
+ parameters.Parameter('p', schema, val)
+ except ValueError as verr:
+ self.assertIn('overflows', str(verr))
+ else:
+ self.fail("Value error not raised")
+
params_schema = json.loads('''{
"Parameters" : {