ipsec_sitecon['dpd_interval'] = dpd.get('interval', 30)
ipsec_sitecon['dpd_timeout'] = dpd.get('timeout', 120)
tenant_id = self._get_tenant_id_for_create(context, ipsec_sitecon)
- if ipsec_sitecon['dpd_timeout'] < ipsec_sitecon['dpd_interval']:
- raise vpnaas.IPsecSiteConnectionDpdIntervalValueError(
- attribute_a='dpd_timeout')
+ self._check_dpd(ipsec_sitecon)
with context.session.begin(subtransactions=True):
#Check permissions
self._get_resource(context,
context.session.add(peer_cidr_db)
return self._make_ipsec_site_connection_dict(ipsec_site_conn_db)
+ def _check_dpd(self, ipsec_sitecon):
+ if ipsec_sitecon['dpd_timeout'] <= ipsec_sitecon['dpd_interval']:
+ raise vpnaas.IPsecSiteConnectionDpdIntervalValueError(
+ attr='dpd_timeout')
+
def update_ipsec_site_connection(
self, context,
ipsec_site_conn_id, ipsec_site_connection):
- ipsec_sitecon = ipsec_site_connection['ipsec_site_connection']
- dpd = ipsec_sitecon.get('dpd', {})
- if dpd.get('action'):
- ipsec_sitecon['dpd_action'] = dpd.get('action')
- if dpd.get('interval'):
- ipsec_sitecon['dpd_interval'] = dpd.get('interval')
- if dpd.get('timeout'):
- ipsec_sitecon['dpd_timeout'] = dpd.get('timeout')
+ conn = ipsec_site_connection['ipsec_site_connection']
changed_peer_cidrs = False
with context.session.begin(subtransactions=True):
ipsec_site_conn_db = self._get_resource(
context,
IPsecSiteConnection,
ipsec_site_conn_id)
+ dpd = conn.get('dpd', {})
+ if dpd.get('action'):
+ conn['dpd_action'] = dpd.get('action')
+ if dpd.get('interval') or dpd.get('timeout'):
+ conn['dpd_interval'] = dpd.get(
+ 'interval', ipsec_site_conn_db.dpd_interval)
+ conn['dpd_timeout'] = dpd.get(
+ 'timeout', ipsec_site_conn_db.dpd_timeout)
+ self._check_dpd(conn)
+
self.assert_update_allowed(ipsec_site_conn_db)
- if "peer_cidrs" in ipsec_sitecon:
+
+ if "peer_cidrs" in conn:
changed_peer_cidrs = True
old_peer_cidr_list = ipsec_site_conn_db['peer_cidrs']
old_peer_cidr_dict = dict(
(peer_cidr['cidr'], peer_cidr)
for peer_cidr in old_peer_cidr_list)
- new_peer_cidr_set = set(ipsec_sitecon["peer_cidrs"])
+ new_peer_cidr_set = set(conn["peer_cidrs"])
old_peer_cidr_set = set(old_peer_cidr_dict)
new_peer_cidrs = list(new_peer_cidr_set)
cidr=peer_cidr,
ipsec_site_connection_id=ipsec_site_conn_id)
context.session.add(pcidr)
- del ipsec_sitecon["peer_cidrs"]
- if ipsec_sitecon:
- ipsec_site_conn_db.update(ipsec_sitecon)
+ del conn["peer_cidrs"]
+ if conn:
+ ipsec_site_conn_db.update(conn)
result = self._make_ipsec_site_connection_dict(ipsec_site_conn_db)
if changed_peer_cidrs:
result['peer_cidrs'] = new_peer_cidrs
name=name,
dpd_interval=30,
dpd_timeout=20, expected_status_int=400)
+ self._create_ipsec_site_connection(
+ fmt=self.fmt,
+ name=name,
+ dpd_interval=100,
+ dpd_timeout=100, expected_status_int=400)
def test_create_ipsec_site_connection(self, **extras):
"""Test case to create an ipsec_site_connection."""
self.assertEqual(res.status_int, 204)
def test_update_ipsec_site_connection(self):
+ dpd = {'action': 'hold',
+ 'interval': 40,
+ 'timeout': 120}
+ self._test_update_ipsec_site_connection(
+ update={'dpd': dpd}
+ )
+
+ def test_update_ipsec_site_connection_with_invalid_dpd(self):
+ dpd1 = {'action': 'hold',
+ 'interval': 100,
+ 'timeout': 100}
+ self._test_update_ipsec_site_connection(
+ update={'dpd': dpd1},
+ expected_status_int=400)
+ dpd2 = {'action': 'hold',
+ 'interval': 100,
+ 'timeout': 60}
+ self._test_update_ipsec_site_connection(
+ update={'dpd': dpd2},
+ expected_status_int=400)
+ dpd3 = {'action': 'hold',
+ 'interval': -50,
+ 'timeout': -100}
+ self._test_update_ipsec_site_connection(
+ update={'dpd': dpd3},
+ expected_status_int=400)
+
+ def _test_update_ipsec_site_connection(
+ self, update=None, expected_status_int=200):
"""Test case to update a ipsec_site_connection."""
name = 'new_ipsec_site_connection'
ikename = 'ikepolicy1'
keys['admin_state_up'],
description=description
) as ipsec_site_connection:
- data = {'ipsec_site_connection': {'name': name}}
+ if not update:
+ update = {'name': name}
+ data = {'ipsec_site_connection': update}
self._set_active(
vpn_db.IPsecSiteConnection,
ipsec_site_connection['ipsec_site_connection']['id'])
data,
ipsec_site_connection['ipsec_site_connection']['id']
)
- res = self.deserialize(
- self.fmt,
- req.get_response(self.ext_api)
- )
- for k, v in keys.items():
- self.assertEqual(res['ipsec_site_connection'][k], v)
+ res = req.get_response(self.ext_api)
+ self.assertEqual(expected_status_int, res.status_int)
+ if expected_status_int == 200:
+ res_dict = self.deserialize(
+ self.fmt,
+ res
+ )
+ for k, v in update.items():
+ self.assertEqual(
+ res_dict['ipsec_site_connection'][k], v)
def test_update_ipsec_site_connection_with_invalid_state(self):
"""Test case to update an ipsec_site_connection in invalid state."""