dpd_interval=100,
dpd_timeout=100, expected_status_int=400)
- def test_create_ipsec_site_connection(self, **extras):
- """Test case to create an ipsec_site_connection."""
- ikename = "ikepolicy1"
- ipsecname = "ipsecpolicy1"
- vpnsname = "vpnservice1"
- name = "connection1"
- description = "my-ipsec-connection"
- keys = {'name': name,
- 'description': "my-ipsec-connection",
+ def _test_create_ipsec_site_connection(self, key_overrides=None,
+ setup_overrides=None,
+ expected_status_int=200):
+ """Create ipsec_site_connection and check results."""
+ params = {'ikename': 'ikepolicy1',
+ 'ipsecname': 'ipsecpolicy1',
+ 'vpnsname': 'vpnservice1',
+ 'subnet_cidr': '10.2.0.0/24',
+ 'subnet_version': 4}
+ if setup_overrides is not None:
+ params.update(setup_overrides)
+ keys = {'name': 'connection1',
+ 'description': 'my-ipsec-connection',
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
'psk': 'abcd',
'status': 'PENDING_CREATE',
'admin_state_up': True}
+ if key_overrides is not None:
+ keys.update(key_overrides)
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
- keys.update(extras)
with contextlib.nested(
- self.ikepolicy(name=ikename),
- self.ipsecpolicy(name=ipsecname),
- self.subnet(),
+ self.ikepolicy(name=params['ikename']),
+ self.ipsecpolicy(name=params['ipsecname']),
+ self.subnet(cidr=params['subnet_cidr'],
+ ip_version=params['subnet_version']),
self.router()) as (
ikepolicy, ipsecpolicy, subnet, router):
- with self.vpnservice(name=vpnsname, subnet=subnet,
+ with self.vpnservice(name=params['vpnsname'], subnet=subnet,
router=router) as vpnservice1:
keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id']
keys['ipsecpolicy_id'] = (
keys['vpnservice_id'] = (
vpnservice1['vpnservice']['id']
)
- with self.ipsec_site_connection(
- self.fmt,
- name,
- keys['peer_address'],
- keys['peer_id'],
- keys['peer_cidrs'],
- keys['mtu'],
- keys['psk'],
- keys['initiator'],
- dpd['action'],
- dpd['interval'],
- dpd['timeout'],
- vpnservice1,
- ikepolicy,
- ipsecpolicy,
- keys['admin_state_up'],
- description=description,
- **extras
- ) as ipsec_site_connection:
- self._check_ipsec_site_connection(
- ipsec_site_connection['ipsec_site_connection'],
- keys,
- dpd)
+ try:
+ with self.ipsec_site_connection(
+ self.fmt,
+ keys['name'],
+ keys['peer_address'],
+ keys['peer_id'],
+ keys['peer_cidrs'],
+ keys['mtu'],
+ keys['psk'],
+ keys['initiator'],
+ dpd['action'],
+ dpd['interval'],
+ dpd['timeout'],
+ vpnservice1,
+ ikepolicy,
+ ipsecpolicy,
+ keys['admin_state_up'],
+ description=keys['description']
+ ) as ipsec_site_connection:
+ if expected_status_int != 200:
+ self.fail("Expected failure on create")
+ self._check_ipsec_site_connection(
+ ipsec_site_connection['ipsec_site_connection'],
+ keys,
+ dpd)
+ except webob.exc.HTTPClientError as ce:
+ self.assertEqual(ce.code, expected_status_int)
+
+ def test_create_ipsec_site_connection(self, **extras):
+ """Test case to create an ipsec_site_connection."""
+ self._test_create_ipsec_site_connection(key_overrides=extras)
+
+ def test_create_ipsec_site_connection_invalid_mtu(self):
+ """Test creating an ipsec_site_connection with invalid MTU."""
+ self._test_create_ipsec_site_connection(key_overrides={'mtu': 67},
+ expected_status_int=400)
+ ipv6_overrides = {
+ 'peer_address': 'fe80::c0a8:10a',
+ 'peer_id': 'fe80::c0a8:10a',
+ 'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
+ 'mtu': 1279}
+ ipv6_setup_params = {'subnet_cidr': 'fe80::a01:0/120',
+ 'subnet_version': 6}
+ self._test_create_ipsec_site_connection(
+ key_overrides=ipv6_overrides,
+ setup_overrides=ipv6_setup_params,
+ expected_status_int=400)
def _check_ipsec_site_connection(self, ipsec_site_connection, keys, dpd):
self.assertEqual(
+ keys,
dict((k, v) for k, v
in ipsec_site_connection.items()
- if k in keys), keys)
+ if k in keys))
self.assertEqual(
+ dpd,
dict((k, v) for k, v
in ipsec_site_connection['dpd'].items()
- if k in dpd), dpd)
+ if k in dpd))
def test_delete_ipsec_site_connection(self):
"""Test case to delete a ipsec_site_connection."""
self.assertEqual(res.status_int, 204)
def test_update_ipsec_site_connection(self):
+ """Test case for valid updates to IPSec site connection."""
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
- self._test_update_ipsec_site_connection(
- update={'dpd': dpd}
- )
+ self._test_update_ipsec_site_connection(update={'dpd': dpd})
+ self._test_update_ipsec_site_connection(update={'mtu': 2000})
+ ipv6_settings = {
+ 'peer_address': 'fe80::c0a8:10a',
+ 'peer_id': 'fe80::c0a8:10a',
+ 'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
+ 'subnet_cidr': 'fe80::a02:0/120',
+ 'subnet_version': 6}
+ self._test_update_ipsec_site_connection(update={'mtu': 2000},
+ overrides=ipv6_settings)
def test_update_ipsec_site_connection_with_invalid_dpd(self):
+ """Test updates to ipsec_site_connection with invalid DPD settings."""
dpd1 = {'action': 'hold',
'interval': 100,
'timeout': 100}
update={'dpd': dpd3},
expected_status_int=400)
- def _test_update_ipsec_site_connection(
- self, update=None, expected_status_int=200):
- """Test case to update a ipsec_site_connection."""
- name = 'new_ipsec_site_connection'
- ikename = 'ikepolicy1'
- ipsecname = 'ipsecpolicy1'
- vpnsname = 'vpnservice1'
- description = 'my-ipsec-connection'
+ def test_update_ipsec_site_connection_with_invalid_mtu(self):
+ """Test updates to ipsec_site_connection with invalid MTU settings."""
+ self._test_update_ipsec_site_connection(
+ update={'mtu': 67}, expected_status_int=400)
+ ipv6_settings = {
+ 'peer_address': 'fe80::c0a8:10a',
+ 'peer_id': 'fe80::c0a8:10a',
+ 'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
+ 'subnet_cidr': 'fe80::a02:0/120',
+ 'subnet_version': 6}
+ self._test_update_ipsec_site_connection(
+ update={'mtu': 1279},
+ overrides=ipv6_settings,
+ expected_status_int=400)
+
+ def test_update_ipsec_site_connection_with_invalid_state(self):
+ """Test updating an ipsec_site_connection in invalid state."""
+ self._test_update_ipsec_site_connection(
+ overrides={'make_active': False},
+ expected_status_int=400)
+
+ def test_update_ipsec_site_connection_peer_cidrs(self):
+ """Test updating an ipsec_site_connection for peer_cidrs."""
+ new_peers = {'peer_cidrs': ['192.168.4.0/24',
+ '192.168.5.0/24']}
+ self._test_update_ipsec_site_connection(
+ update=new_peers)
+
+ def _test_update_ipsec_site_connection(self,
+ update={'name': 'new name'},
+ overrides=None,
+ expected_status_int=200):
+ """Creates and then updates ipsec_site_connection."""
keys = {'name': 'new_ipsec_site_connection',
- 'description': "my-ipsec-connection",
+ 'ikename': 'ikepolicy1',
+ 'ipsecname': 'ipsecpolicy1',
+ 'vpnsname': 'vpnservice1',
+ 'description': 'my-ipsec-connection',
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
'tenant_id': self._tenant_id,
'psk': 'abcd',
'status': 'ACTIVE',
- 'admin_state_up': True}
- dpd = {'action': 'hold',
- 'interval': 40,
- 'timeout': 120}
+ 'admin_state_up': True,
+ 'action': 'hold',
+ 'interval': 40,
+ 'timeout': 120,
+ 'subnet_cidr': '10.2.0.0/24',
+ 'subnet_version': 4,
+ 'make_active': True}
+ if overrides is not None:
+ keys.update(overrides)
+
with contextlib.nested(
- self.ikepolicy(name=ikename),
- self.ipsecpolicy(name=ipsecname),
- self.subnet(cidr='10.2.0.0/24'),
- self.router()) as (
- ikepolicy, ipsecpolicy, subnet, router):
- with self.vpnservice(name=vpnsname, subnet=subnet,
+ self.ikepolicy(name=keys['ikename']),
+ self.ipsecpolicy(name=keys['ipsecname']),
+ self.subnet(cidr=keys['subnet_cidr'],
+ ip_version=keys['subnet_version']),
+ self.router()) as (
+ ikepolicy, ipsecpolicy, subnet, router):
+ with self.vpnservice(name=keys['vpnsname'], subnet=subnet,
router=router) as vpnservice1:
- keys['vpnservice_id'] = (
- vpnservice1['vpnservice']['id']
- )
- keys['ikepolicy_id'] = (
- ikepolicy['ikepolicy']['id']
- )
- keys['ipsecpolicy_id'] = (
- ipsecpolicy['ipsecpolicy']['id']
- )
+ keys['vpnservice_id'] = vpnservice1['vpnservice']['id']
+ keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id']
+ keys['ipsecpolicy_id'] = ipsecpolicy['ipsecpolicy']['id']
with self.ipsec_site_connection(
self.fmt,
- name,
+ keys['name'],
keys['peer_address'],
keys['peer_id'],
keys['peer_cidrs'],
keys['mtu'],
keys['psk'],
keys['initiator'],
- dpd['action'],
- dpd['interval'],
- dpd['timeout'],
+ keys['action'],
+ keys['interval'],
+ keys['timeout'],
vpnservice1,
ikepolicy,
ipsecpolicy,
keys['admin_state_up'],
- description=description
+ description=keys['description']
) as ipsec_site_connection:
- if not update:
- update = {'name': name}
data = {'ipsec_site_connection': update}
- self._set_active(
- vpn_db.IPsecSiteConnection,
- ipsec_site_connection['ipsec_site_connection']['id'])
+ if keys.get('make_active', None):
+ self._set_active(
+ vpn_db.IPsecSiteConnection,
+ (ipsec_site_connection['ipsec_site_connection']
+ ['id']))
req = self.new_update_request(
'ipsec-site-connections',
data,
- ipsec_site_connection['ipsec_site_connection']['id']
- )
+ ipsec_site_connection['ipsec_site_connection']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(expected_status_int, res.status_int)
if expected_status_int == 200:
- res_dict = self.deserialize(
- self.fmt,
- res
- )
+ res_dict = self.deserialize(self.fmt, res)
for k, v in update.items():
self.assertEqual(
res_dict['ipsec_site_connection'][k], v)
- def test_update_ipsec_site_connection_with_invalid_state(self):
- """Test case to update an ipsec_site_connection in invalid state."""
- name = 'new_ipsec_site_connection'
- ikename = 'ikepolicy1'
- ipsecname = 'ipsecpolicy1'
- vpnsname = 'vpnservice1'
- description = 'my-ipsec-connection'
- keys = {'name': 'new_ipsec_site_connection',
- 'description': "my-ipsec-connection",
- 'peer_address': '192.168.1.10',
- 'peer_id': '192.168.1.10',
- 'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
- 'initiator': 'bi-directional',
- 'mtu': 1500,
- 'tenant_id': self._tenant_id,
- 'psk': 'abcd',
- 'status': 'ACTIVE',
- 'admin_state_up': True}
- dpd = {'action': 'hold',
- 'interval': 40,
- 'timeout': 120}
- with contextlib.nested(
- self.ikepolicy(name=ikename),
- self.ipsecpolicy(name=ipsecname),
- self.subnet(cidr='10.2.0.0/24'),
- self.router()) as (
- ikepolicy, ipsecpolicy, subnet, router):
- with self.vpnservice(name=vpnsname, subnet=subnet,
- router=router) as vpnservice1:
- keys['vpnservice_id'] = (
- vpnservice1['vpnservice']['id']
- )
- keys['ikepolicy_id'] = (
- ikepolicy['ikepolicy']['id']
- )
- keys['ipsecpolicy_id'] = (
- ipsecpolicy['ipsecpolicy']['id']
- )
- with self.ipsec_site_connection(
- self.fmt,
- name,
- keys['peer_address'],
- keys['peer_id'],
- keys['peer_cidrs'],
- keys['mtu'],
- keys['psk'],
- keys['initiator'],
- dpd['action'],
- dpd['interval'],
- dpd['timeout'],
- vpnservice1,
- ikepolicy,
- ipsecpolicy,
- keys['admin_state_up'],
- description=description
- ) as ipsec_site_connection:
- data = {'ipsec_site_connection': {'name': name}}
- req = self.new_update_request(
- 'ipsec-site-connections',
- data,
- ipsec_site_connection['ipsec_site_connection']['id']
- )
- res = req.get_response(self.ext_api)
- self.assertEqual(400, res.status_int)
-
- def test_update_ipsec_site_connection_peer_cidrs(self):
- """Test case to update a ipsec_site_connection for peer_cidrs."""
- name = 'ipsec_site_connection'
- ikename = 'ikepolicy1'
- ipsecname = 'ipsecpolicy1'
- vpnsname = 'vpnservice1'
- description = 'my-ipsec-connection'
- keys = {'name': 'ipsec_site_connection',
- 'description': "my-ipsec-connection",
- 'peer_address': '192.168.1.10',
- 'peer_id': '192.168.1.10',
- 'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
- 'initiator': 'bi-directional',
- 'mtu': 1500,
- 'tenant_id': self._tenant_id,
- 'psk': 'abcd',
- 'status': 'ACTIVE',
- 'admin_state_up': True}
- dpd = {'action': 'hold',
- 'interval': 40,
- 'timeout': 120}
- with contextlib.nested(
- self.ikepolicy(name=ikename),
- self.ipsecpolicy(name=ipsecname),
- self.subnet(cidr='10.2.0.0/24'),
- self.router()) as (
- ikepolicy, ipsecpolicy, subnet, router):
- with self.vpnservice(name=vpnsname, subnet=subnet,
- router=router) as vpnservice1:
- keys['vpnservice_id'] = vpnservice1['vpnservice']['id']
- keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id']
- keys['ipsecpolicy_id'] = ipsecpolicy['ipsecpolicy']['id']
- with self.ipsec_site_connection(
- self.fmt,
- name,
- keys['peer_address'],
- keys['peer_id'],
- keys['peer_cidrs'],
- keys['mtu'],
- keys['psk'],
- keys['initiator'],
- dpd['action'],
- dpd['interval'],
- dpd['timeout'],
- vpnservice1,
- ikepolicy,
- ipsecpolicy,
- keys['admin_state_up'],
- description=description
- ) as ipsec_site_connection:
- data = {'ipsec_site_connection': {
- 'peer_cidrs': ['192.168.2.0/24',
- '192.168.3.0/24']
- }}
- self._set_active(
- vpn_db.IPsecSiteConnection,
- ipsec_site_connection['ipsec_site_connection']['id'])
- req = self.new_update_request(
- 'ipsec-site-connections',
- data,
- ipsec_site_connection[
- 'ipsec_site_connection']['id']
- )
- res = self.deserialize(
- self.fmt,
- req.get_response(self.ext_api)
- )
- self._check_ipsec_site_connection(
- res['ipsec_site_connection'],
- keys,
- dpd)
-
def test_show_ipsec_site_connection(self):
"""Test case to show a ipsec_site_connection."""
ikename = "ikepolicy1"