SHARED = 'shared'
-def _verify_dict_keys(expected_keys, target_dict):
+def _verify_dict_keys(expected_keys, target_dict, strict=True):
+ """ Allows to verify keys in a dictionary.
+ :param expected_keys: A list of keys expected to be present.
+ :param target_dict: The dictionary which should be verified.
+ :param strict: Specifies whether additional keys are allowed to be present.
+ :return: True, if keys in the dictionary correspond to the specification.
+ """
if not isinstance(target_dict, dict):
msg = (_("Invalid input. '%(target_dict)s' must be a dictionary "
"with keys: %(expected_keys)s") %
dict(target_dict=target_dict, expected_keys=expected_keys))
return msg
- provided_keys = target_dict.keys()
- if set(expected_keys) != set(provided_keys):
- msg = (_("Expected keys not found. Expected: '%(expected_keys)s', "
- "Provided: '%(provided_keys)s'") %
- dict(expected_keys=expected_keys, provided_keys=provided_keys))
+ expected_keys = set(expected_keys)
+ provided_keys = set(target_dict.keys())
+
+ predicate = expected_keys.__eq__ if strict else expected_keys.issubset
+
+ if not predicate(provided_keys):
+ msg = (_("Validation of dictionary's keys failed."
+ "Expected keys: %(expected_keys)s "
+ "Provided keys: %(provided_keys)s") % locals())
return msg
return msg
-def _validate_dict(data, valid_values=None):
+def _validate_dict(data, key_specs=None):
if not isinstance(data, dict):
msg = _("'%s' is not a dictionary") % data
LOG.debug(msg)
return msg
+ # Do not perform any further validation, if no constraints are supplied
+ if not key_specs:
+ return
+
+ # Check whether all required keys are present
+ required_keys = [key for key, spec in key_specs.iteritems()
+ if spec.get('required')]
+
+ if required_keys:
+ msg = _verify_dict_keys(required_keys, data, False)
+ if msg:
+ LOG.debug(msg)
+ return msg
+
+ # Perform validation of all values according to the specifications.
+ for key, key_validator in [(k, v) for k, v in key_specs.iteritems()
+ if k in data]:
+
+ for val_name in [n for n in key_validator.iterkeys()
+ if n.startswith('type:')]:
+ # Check whether specified validator exists.
+ if val_name not in validators:
+ msg = _("Validator '%s' does not exist.") % val_name
+ LOG.debug(msg)
+ return msg
+
+ val_func = validators[val_name]
+ val_params = key_validator[val_name]
+
+ msg = val_func(data.get(key), val_params)
+ if msg:
+ LOG.debug(msg)
+ return msg
+
+
+def _validate_dict_or_none(data, key_specs=None):
+ if data is not None:
+ return _validate_dict(data, key_specs)
+
+
+def _validate_dict_or_empty(data, key_specs=None):
+ if data != {}:
+ return _validate_dict(data, key_specs)
+
def _validate_non_negative(data, valid_values=None):
try:
# Dictionary that maintains a list of validation functions
validators = {'type:dict': _validate_dict,
+ 'type:dict_or_none': _validate_dict_or_none,
+ 'type:dict_or_empty': _validate_dict_or_empty,
'type:fixed_ips': _validate_fixed_ips,
'type:hostroutes': _validate_hostroutes,
'type:ip_address': _validate_ip_address,
'admin_state_up': vip['admin_state_up'],
'status': vip['status']}
if vip['session_persistence']:
- res['session_persistence'] = {
- 'type': vip['session_persistence']['type'],
- 'cookie_name': vip['session_persistence']['cookie_name']
+ s_p = {
+ 'type': vip['session_persistence']['type']
}
+
+ if vip['session_persistence']['type'] == 'APP_COOKIE':
+ s_p['cookie_name'] = vip['session_persistence']['cookie_name']
+
+ res['session_persistence'] = s_p
+
return self._fields(res, fields)
def _update_pool_vip_info(self, context, pool_id, vip_id):
with context.session.begin(subtransactions=True):
pool_db.update({'vip_id': vip_id})
+ def _check_session_persistence_info(self, info):
+ """ Performs sanity check on session persistence info.
+ :param info: Session persistence info
+ """
+ if info['type'] == 'APP_COOKIE':
+ if not info.get('cookie_name'):
+ raise ValueError(_("'cookie_name' should be specified for this"
+ " type of session persistence."))
+ else:
+ if 'cookie_name' in info:
+ raise ValueError(_("'cookie_name' is not allowed for this type"
+ " of session persistence"))
+
+ def _create_session_persistence_db(self, session_info, vip_id):
+ self._check_session_persistence_info(session_info)
+
+ sesspersist_db = SessionPersistence(
+ type=session_info['type'],
+ cookie_name=session_info.get('cookie_name'),
+ vip_id=vip_id)
+ return sesspersist_db
+
def _update_vip_session_persistence(self, context, vip_id, info):
+ self._check_session_persistence_info(info)
+
vip = self._get_resource(context, Vip, vip_id)
with context.session.begin(subtransactions=True):
# Update sessionPersistence table
sess_qry = context.session.query(SessionPersistence)
sesspersist_db = sess_qry.filter_by(vip_id=vip_id).first()
+
+ # Insert a None cookie_info if it is not present to overwrite an
+ # an existing value in the database.
+ if 'cookie_name' not in info:
+ info['cookie_name'] = None
+
if sesspersist_db:
sesspersist_db.update(info)
else:
connection_limit=v['connection_limit'],
admin_state_up=v['admin_state_up'],
status=constants.PENDING_CREATE)
+
vip_id = vip_db['id']
+ session_info = v['session_persistence']
- sessionInfo = v['session_persistence']
- if sessionInfo:
- has_session_persistence = True
- sesspersist_db = SessionPersistence(
- type=sessionInfo['type'],
- cookie_name=sessionInfo['cookie_name'],
- vip_id=vip_id)
- vip_db.session_persistence = sesspersist_db
+ if session_info:
+ s_p = self._create_session_persistence_db(session_info, vip_id)
+ vip_db.session_persistence = s_p
context.session.add(vip_db)
self._update_pool_vip_info(context, v['pool_id'], vip_id)
'session_persistence': {'allow_post': True, 'allow_put': True,
'convert_to': attr.convert_none_to_empty_dict,
'default': {},
- 'validate': {'type:dict': None},
+ 'validate': {
+ 'type:dict_or_empty': {
+ 'type': {'type:values': ['APP_COOKIE',
+ 'HTTP_COOKIE',
+ 'SOURCE_IP'],
+ 'required': True},
+ 'cookie_name': {'type:string': None,
+ 'required': False}}},
'is_visible': True},
'connection_limit': {'allow_post': True, 'allow_put': True,
'default': -1,
('address', "172.16.1.123"),
('port', 80),
('protocol', 'HTTP'),
- ('session_persistence', {'type': "HTTP_COOKIE",
- 'cookie_name': "jessionId"}),
+ ('session_persistence', {'type': "HTTP_COOKIE"}),
('connection_limit', -1),
('admin_state_up', True),
('status', 'PENDING_CREATE')]
with self.vip(name=name,
- session_persistence={'type': "HTTP_COOKIE",
- 'cookie_name': "jessionId"}) as vip:
+ session_persistence={'type': "HTTP_COOKIE"}) as vip:
for k, v in keys:
self.assertEqual(vip['vip'][k], v)
+ def test_create_vip_with_session_persistence_with_app_cookie(self):
+ name = 'vip7'
+ keys = [('name', name),
+ ('subnet_id', self._subnet_id),
+ ('address', "172.16.1.123"),
+ ('port', 80),
+ ('protocol', 'HTTP'),
+ ('session_persistence', {'type': "APP_COOKIE",
+ 'cookie_name': 'sessionId'}),
+ ('connection_limit', -1),
+ ('admin_state_up', True),
+ ('status', 'PENDING_CREATE')]
+
+ with self.vip(name=name,
+ session_persistence={'type': "APP_COOKIE",
+ 'cookie_name': 'sessionId'}) as vip:
+ for k, v in keys:
+ self.assertEqual(vip['vip'][k], v)
+
+ def test_create_vip_with_session_persistence_unsupported_type(self):
+ name = 'vip5'
+
+ vip = self.vip(name=name, session_persistence={'type': "UNSUPPORTED"})
+ self.assertRaises(webob.exc.HTTPClientError, vip.__enter__)
+
+ def test_create_vip_with_unnecessary_cookie_name(self):
+ name = 'vip8'
+
+ s_p = {'type': "SOURCE_IP", 'cookie_name': 'sessionId'}
+ vip = self.vip(name=name, session_persistence=s_p)
+
+ self.assertRaises(webob.exc.HTTPClientError, vip.__enter__)
+
+ def test_create_vip_with_session_persistence_without_cookie_name(self):
+ name = 'vip6'
+
+ vip = self.vip(name=name, session_persistence={'type': "APP_COOKIE"})
+ self.assertRaises(webob.exc.HTTPClientError, vip.__enter__)
+
def test_reset_session_persistence(self):
name = 'vip4'
- session_persistence = {'type': "HTTP_COOKIE",
- 'cookie_name': "cookie_name"}
+ session_persistence = {'type': "HTTP_COOKIE"}
update_info = {'vip': {'session_persistence': None}}
data = {'vip': {'name': name,
'connection_limit': 100,
'session_persistence':
- {'type': "HTTP_COOKIE",
+ {'type': "APP_COOKIE",
'cookie_name': "jesssionId"},
'admin_state_up': False}}
req = self.new_update_request('vips', data, vip['vip']['id'])
class TestAttributes(unittest2.TestCase):
+ def _construct_dict_and_constraints(self):
+ """ Constructs a test dictionary and a definition of constraints.
+ :return: A (dictionary, constraint) tuple
+ """
+ constraints = {'key1': {'type:values': ['val1', 'val2'],
+ 'required': True},
+ 'key2': {'type:string': None,
+ 'required': False},
+ 'key3': {'type:dict': {'k4': {'type:string': None,
+ 'required': True}},
+ 'required': True}}
+
+ dictionary = {'key1': 'val1',
+ 'key2': 'a string value',
+ 'key3': {'k4': 'a string value'}}
+
+ return dictionary, constraints
+
def test_is_attr_set(self):
data = attributes.ATTR_NOT_SPECIFIED
self.assertIs(attributes.is_attr_set(data), False)
msg = attributes._validate_uuid_list(uuid_list)
self.assertEquals(msg, None)
- def test_validate_dict(self):
+ def test_validate_dict_type(self):
for value in (None, True, '1', []):
self.assertEquals(attributes._validate_dict(value),
"'%s' is not a dictionary" % value)
+ def test_validate_dict_without_constraints(self):
msg = attributes._validate_dict({})
self.assertIsNone(msg)
+ # Validate a dictionary without constraints.
msg = attributes._validate_dict({'key': 'value'})
self.assertIsNone(msg)
+ def test_validate_a_valid_dict_with_constraints(self):
+ dictionary, constraints = self._construct_dict_and_constraints()
+
+ msg = attributes._validate_dict(dictionary, constraints)
+ self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
+
+ def test_validate_dict_with_invalid_validator(self):
+ dictionary, constraints = self._construct_dict_and_constraints()
+
+ constraints['key1'] = {'type:unsupported': None, 'required': True}
+ msg = attributes._validate_dict(dictionary, constraints)
+ self.assertEqual(msg, "Validator 'type:unsupported' does not exist.")
+
+ def test_validate_dict_not_required_keys(self):
+ dictionary, constraints = self._construct_dict_and_constraints()
+
+ del dictionary['key2']
+ msg = attributes._validate_dict(dictionary, constraints)
+ self.assertIsNone(msg, 'Field that was not required by the specs was'
+ 'required by the validator.')
+
+ def test_validate_dict_required_keys(self):
+ dictionary, constraints = self._construct_dict_and_constraints()
+
+ del dictionary['key1']
+ msg = attributes._validate_dict(dictionary, constraints)
+ self.assertIn('Expected keys:', msg, 'The error was not detected.')
+
+ def test_validate_dict_wrong_values(self):
+ dictionary, constraints = self._construct_dict_and_constraints()
+
+ dictionary['key1'] = 'UNSUPPORTED'
+ msg = attributes._validate_dict(dictionary, constraints)
+ self.assertIsNotNone(msg)
+
+ def test_subdictionary(self):
+ dictionary, constraints = self._construct_dict_and_constraints()
+
+ del dictionary['key3']['k4']
+ dictionary['key3']['k5'] = 'a string value'
+ msg = attributes._validate_dict(dictionary, constraints)
+ self.assertIn('Expected keys:', msg, 'The error was not detected.')
+
+ def test_validate_dict_or_none(self):
+ dictionary, constraints = self._construct_dict_and_constraints()
+
+ # Check whether None is a valid value.
+ msg = attributes._validate_dict_or_none(None, constraints)
+ self.assertIsNone(msg, 'Validation of a None dictionary failed.')
+
+ # Check validation of a regular dictionary.
+ msg = attributes._validate_dict_or_none(dictionary, constraints)
+ self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
+
+ def test_validate_dict_or_empty(self):
+ dictionary, constraints = self._construct_dict_and_constraints()
+
+ # Check whether an empty dictionary is valid.
+ msg = attributes._validate_dict_or_empty({}, constraints)
+ self.assertIsNone(msg, 'Validation of a None dictionary failed.')
+
+ # Check validation of a regular dictionary.
+ msg = attributes._validate_dict_or_none(dictionary, constraints)
+ self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
+ self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
+
def test_validate_non_negative(self):
for value in (-1, '-2'):
self.assertEquals(attributes._validate_non_negative(value),
self.assertEqual(
[], attributes.convert_none_to_empty_list(None))
+ def test_convert_none_to_empty_dict(self):
+ self.assertEqual(
+ {}, attributes.convert_none_to_empty_dict(None))
+
def test_convert_none_to_empty_list_value(self):
values = ['1', 3, [], [1], {}, {'a': 3}]
for value in values:
'port': 80,
'protocol': 'HTTP',
'pool_id': _uuid(),
- 'session_persistence': {'type': 'dummy'},
+ 'session_persistence': {'type': 'HTTP_COOKIE'},
'connection_limit': 100,
'admin_state_up': True,
'tenant_id': _uuid()}}