from neutron.db import l3_db
from neutron.db import model_base
from neutron.db import models_v2
+from neutron.db.vpn import vpn_validator
from neutron.extensions import vpnaas
from neutron import manager
from neutron.openstack.common import excutils
LOG = logging.getLogger(__name__)
-IP_MIN_MTU = {4: 68, 6: 1280}
-
class IPsecPeerCidr(model_base.BASEV2):
"""Internal representation of a IPsec Peer Cidrs."""
"""Do the initialization for the vpn service plugin here."""
qdbapi.register_models()
+ def _get_validator(self):
+ """Obtain validator to use for attribute validation.
+
+ Subclasses may override this with a different valdiator, as needed.
+ Note: some UTs will directly create a VPNPluginDb object and then
+ call its methods, instead of creating a VPNDriverPlugin, which
+ will have a service driver associated that will provide a
+ validator object. As a result, we use the reference validator here.
+ """
+ return vpn_validator.VpnReferenceValidator()
+
def update_status(self, context, model, v_id, status):
with context.session.begin(subtransactions=True):
v_db = self._get_resource(context, model, v_id)
return self._fields(res, fields)
+ def _get_subnet_ip_version(self, context, vpnservice_id):
+ vpn_service_db = self._get_vpnservice(context, vpnservice_id)
+ subnet = vpn_service_db.subnet['cidr']
+ ip_version = netaddr.IPNetwork(subnet).version
+ return ip_version
+
def create_ipsec_site_connection(self, context, ipsec_site_connection):
ipsec_sitecon = ipsec_site_connection['ipsec_site_connection']
- dpd = ipsec_sitecon['dpd']
- ipsec_sitecon['dpd_action'] = dpd.get('action', 'hold')
- ipsec_sitecon['dpd_interval'] = dpd.get('interval', 30)
- ipsec_sitecon['dpd_timeout'] = dpd.get('timeout', 120)
+ validator = self._get_validator()
+ validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon)
tenant_id = self._get_tenant_id_for_create(context, ipsec_sitecon)
- self._check_dpd(ipsec_sitecon)
with context.session.begin(subtransactions=True):
#Check permissions
self._get_resource(context,
self._get_resource(context,
IPsecPolicy,
ipsec_sitecon['ipsecpolicy_id'])
- self._check_mtu(context,
- ipsec_sitecon['mtu'],
- ipsec_sitecon['vpnservice_id'])
+ vpnservice_id = ipsec_sitecon['vpnservice_id']
+ ip_version = self._get_subnet_ip_version(context, vpnservice_id)
+ validator.validate_ipsec_site_connection(context,
+ ipsec_sitecon,
+ ip_version)
ipsec_site_conn_db = IPsecSiteConnection(
id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
dpd_timeout=ipsec_sitecon['dpd_timeout'],
admin_state_up=ipsec_sitecon['admin_state_up'],
status=constants.PENDING_CREATE,
- vpnservice_id=ipsec_sitecon['vpnservice_id'],
+ vpnservice_id=vpnservice_id,
ikepolicy_id=ipsec_sitecon['ikepolicy_id'],
ipsecpolicy_id=ipsec_sitecon['ipsecpolicy_id']
)
context.session.add(peer_cidr_db)
return self._make_ipsec_site_connection_dict(ipsec_site_conn_db)
- def _check_dpd(self, ipsec_sitecon):
- if ipsec_sitecon['dpd_timeout'] <= ipsec_sitecon['dpd_interval']:
- raise vpnaas.IPsecSiteConnectionDpdIntervalValueError(
- attr='dpd_timeout')
-
- def _check_mtu(self, context, mtu, vpnservice_id):
- vpn_service_db = self._get_vpnservice(context, vpnservice_id)
- subnet = vpn_service_db.subnet['cidr']
- version = netaddr.IPNetwork(subnet).version
- if mtu < IP_MIN_MTU[version]:
- raise vpnaas.IPsecSiteConnectionMtuError(mtu=mtu, version=version)
-
def update_ipsec_site_connection(
self, context,
ipsec_site_conn_id, ipsec_site_connection):
- conn = ipsec_site_connection['ipsec_site_connection']
+ ipsec_sitecon = ipsec_site_connection['ipsec_site_connection']
changed_peer_cidrs = False
+ validator = self._get_validator()
with context.session.begin(subtransactions=True):
ipsec_site_conn_db = self._get_resource(
context,
IPsecSiteConnection,
ipsec_site_conn_id)
- dpd = conn.get('dpd', {})
- if dpd.get('action'):
- conn['dpd_action'] = dpd.get('action')
- if dpd.get('interval') or dpd.get('timeout'):
- conn['dpd_interval'] = dpd.get(
- 'interval', ipsec_site_conn_db.dpd_interval)
- conn['dpd_timeout'] = dpd.get(
- 'timeout', ipsec_site_conn_db.dpd_timeout)
- self._check_dpd(conn)
-
- if 'mtu' in conn:
- self._check_mtu(context,
- conn['mtu'],
- ipsec_site_conn_db.vpnservice_id)
-
+ vpnservice_id = ipsec_site_conn_db['vpnservice_id']
+ ip_version = self._get_subnet_ip_version(context, vpnservice_id)
+ validator.assign_sensible_ipsec_sitecon_defaults(
+ ipsec_sitecon, ipsec_site_conn_db)
+ validator.validate_ipsec_site_connection(
+ context,
+ ipsec_sitecon,
+ ip_version)
self.assert_update_allowed(ipsec_site_conn_db)
- if "peer_cidrs" in conn:
+ if "peer_cidrs" in ipsec_sitecon:
changed_peer_cidrs = True
old_peer_cidr_list = ipsec_site_conn_db['peer_cidrs']
old_peer_cidr_dict = dict(
(peer_cidr['cidr'], peer_cidr)
for peer_cidr in old_peer_cidr_list)
- new_peer_cidr_set = set(conn["peer_cidrs"])
+ new_peer_cidr_set = set(ipsec_sitecon["peer_cidrs"])
old_peer_cidr_set = set(old_peer_cidr_dict)
new_peer_cidrs = list(new_peer_cidr_set)
cidr=peer_cidr,
ipsec_site_connection_id=ipsec_site_conn_id)
context.session.add(pcidr)
- del conn["peer_cidrs"]
- if conn:
- ipsec_site_conn_db.update(conn)
+ del ipsec_sitecon["peer_cidrs"]
+ if ipsec_sitecon:
+ ipsec_site_conn_db.update(ipsec_sitecon)
result = self._make_ipsec_site_connection_dict(ipsec_site_conn_db)
if changed_peer_cidrs:
result['peer_cidrs'] = new_peer_cidrs
'status': vpnservice['status']}
return self._fields(res, fields)
- def _check_router(self, context, router_id):
- l3_plugin = manager.NeutronManager.get_service_plugins().get(
- constants.L3_ROUTER_NAT)
- router = l3_plugin.get_router(context, router_id)
- if not router.get(l3_db.EXTERNAL_GW_INFO):
- raise vpnaas.RouterIsNotExternal(router_id=router_id)
-
- def _check_subnet_id(self, context, router_id, subnet_id):
- core_plugin = manager.NeutronManager.get_plugin()
- ports = core_plugin.get_ports(
- context,
- filters={
- 'fixed_ips': {'subnet_id': [subnet_id]},
- 'device_id': [router_id]})
- if not ports:
- raise vpnaas.SubnetIsNotConnectedToRouter(
- subnet_id=subnet_id,
- router_id=router_id)
-
def create_vpnservice(self, context, vpnservice):
vpns = vpnservice['vpnservice']
tenant_id = self._get_tenant_id_for_create(context, vpns)
- self._check_router(context, vpns['router_id'])
- self._check_subnet_id(context, vpns['router_id'], vpns['subnet_id'])
+ validator = self._get_validator()
with context.session.begin(subtransactions=True):
+ validator.validate_vpnservice(context, vpns)
vpnservice_db = VPNService(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=vpns['name'],
--- /dev/null
+# Copyright 2014 Cisco Systems, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Paul Michali, Cisco Systems, Inc.
+
+from neutron.db import l3_db
+from neutron.extensions import vpnaas
+from neutron import manager
+from neutron.plugins.common import constants
+
+
+class VpnReferenceValidator(object):
+
+ """Baseline validation routines for VPN resources."""
+
+ IP_MIN_MTU = {4: 68, 6: 1280}
+
+ @property
+ def l3_plugin(self):
+ try:
+ return self._l3_plugin
+ except AttributeError:
+ self._l3_plugin = manager.NeutronManager.get_service_plugins().get(
+ constants.L3_ROUTER_NAT)
+ return self._l3_plugin
+
+ @property
+ def core_plugin(self):
+ try:
+ return self._core_plugin
+ except AttributeError:
+ self._core_plugin = manager.NeutronManager.get_plugin()
+ return self._core_plugin
+
+ def _check_dpd(self, ipsec_sitecon):
+ """Ensure that DPD timeout is greater than DPD interval."""
+ if ipsec_sitecon['dpd_timeout'] <= ipsec_sitecon['dpd_interval']:
+ raise vpnaas.IPsecSiteConnectionDpdIntervalValueError(
+ attr='dpd_timeout')
+
+ def _check_mtu(self, context, mtu, ip_version):
+ if mtu < VpnReferenceValidator.IP_MIN_MTU[ip_version]:
+ raise vpnaas.IPsecSiteConnectionMtuError(mtu=mtu,
+ version=ip_version)
+
+ def assign_sensible_ipsec_sitecon_defaults(self, ipsec_sitecon,
+ prev_conn=None):
+ """Provide defaults for optional items, if missing.
+
+ Flatten the nested DPD information, and set default values for
+ any missing information. For connection updates, the previous
+ values will be used as defaults for any missing items.
+ """
+ if not prev_conn:
+ prev_conn = {'dpd_action': 'hold',
+ 'dpd_interval': 30,
+ 'dpd_timeout': 120}
+ dpd = ipsec_sitecon.get('dpd', {})
+ ipsec_sitecon['dpd_action'] = dpd.get('action',
+ prev_conn['dpd_action'])
+ ipsec_sitecon['dpd_interval'] = dpd.get('interval',
+ prev_conn['dpd_interval'])
+ ipsec_sitecon['dpd_timeout'] = dpd.get('timeout',
+ prev_conn['dpd_timeout'])
+
+ def validate_ipsec_site_connection(self, context, ipsec_sitecon,
+ ip_version):
+ """Reference implementation of validation for IPSec connection."""
+ self._check_dpd(ipsec_sitecon)
+ mtu = ipsec_sitecon.get('mtu')
+ if mtu:
+ self._check_mtu(context, mtu, ip_version)
+
+ def _check_router(self, context, router_id):
+ router = self.l3_plugin.get_router(context, router_id)
+ if not router.get(l3_db.EXTERNAL_GW_INFO):
+ raise vpnaas.RouterIsNotExternal(router_id=router_id)
+
+ def _check_subnet_id(self, context, router_id, subnet_id):
+ ports = self.core_plugin.get_ports(
+ context,
+ filters={
+ 'fixed_ips': {'subnet_id': [subnet_id]},
+ 'device_id': [router_id]})
+ if not ports:
+ raise vpnaas.SubnetIsNotConnectedToRouter(
+ subnet_id=subnet_id,
+ router_id=router_id)
+
+ def validate_vpnservice(self, context, vpnservice):
+ self._check_router(context, vpnservice['router_id'])
+ self._check_subnet_id(context, vpnservice['router_id'],
+ vpnservice['subnet_id'])
vpnservice = None
return self._get_driver_for_vpnservice(vpnservice)
+ def _get_validator(self):
+ return self.ipsec_driver.validator
+
def create_ipsec_site_connection(self, context, ipsec_site_connection):
ipsec_site_connection = super(
VPNDriverPlugin, self).create_ipsec_site_connection(
import six
from neutron.common import rpc as n_rpc
+from neutron.db.vpn import vpn_validator
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
@six.add_metaclass(abc.ABCMeta)
class VpnDriver(object):
- def __init__(self, service_plugin):
+ def __init__(self, service_plugin, validator=None):
self.service_plugin = service_plugin
+ if validator is None:
+ validator = vpn_validator.VpnReferenceValidator()
+ self.validator = validator
@property
def service_type(self):
if k in expected),
expected)
- def test_create_vpnservice_with_invalid_router(self):
- """Test case to create a vpnservice with other tenant's router"""
- with self.network(
- set_context=True,
- tenant_id='tenant_a') as network:
- with self.subnet(network=network,
- cidr='10.2.0.0/24') as subnet:
- with self.router(
- set_context=True, tenant_id='tenant_a') as router:
- router_id = router['router']['id']
- subnet_id = subnet['subnet']['id']
- self._create_vpnservice(
- self.fmt, 'fake',
- True, router_id, subnet_id,
- expected_res_status=webob.exc.HTTPNotFound.code,
- set_context=True, tenant_id='tenant_b')
-
- def test_create_vpnservice_with_router_no_external_gateway(self):
- """Test case to create a vpnservice with inner router"""
- error_code = 0
- with self.subnet(cidr='10.2.0.0/24') as subnet:
- with self.router() as router:
- router_id = router['router']['id']
- try:
- with self.vpnservice(subnet=subnet,
- router=router,
- external_router=False):
- pass
- except webob.exc.HTTPClientError as e:
- error_code, error_detail = (
- e.status_code, e.detail['NeutronError']['message'])
- self.assertEqual(400, error_code)
- msg = str(vpnaas.RouterIsNotExternal(router_id=router_id))
- self.assertEqual(msg, error_detail)
-
- def test_create_vpnservice_with_nonconnected_subnet(self):
- """Test case to create a vpnservice with nonconnected subnet."""
- with self.network() as network:
- with self.subnet(network=network,
- cidr='10.2.0.0/24') as subnet:
- with self.router() as router:
- router_id = router['router']['id']
- subnet_id = subnet['subnet']['id']
- self._create_vpnservice(
- self.fmt, 'fake',
- True, router_id, subnet_id,
- expected_res_status=webob.exc.HTTPBadRequest.code)
-
- def test_delete_router_in_use_by_vpnservice(self):
- """Test delete router in use by vpn service."""
- with self.subnet(cidr='10.2.0.0/24') as subnet:
- with self.router() as router:
- with self.vpnservice(subnet=subnet,
- router=router):
- self._delete('routers', router['router']['id'],
- expected_code=webob.exc.HTTPConflict.code)
-
def test_update_vpnservice(self):
"""Test case to update a vpnservice."""
name = 'new_vpnservice1'
self._create_ipsec_site_connection(
fmt=self.fmt,
name=name, initiator='unsupported', expected_status_int=400)
- self._create_ipsec_site_connection(
- fmt=self.fmt,
- name=name,
- dpd_interval=30,
- dpd_timeout=20, expected_status_int=400)
- self._create_ipsec_site_connection(
- fmt=self.fmt,
- name=name,
- dpd_interval=100,
- dpd_timeout=100, expected_status_int=400)
def _test_create_ipsec_site_connection(self, key_overrides=None,
setup_overrides=None,
"""Test case to create an ipsec_site_connection."""
self._test_create_ipsec_site_connection(key_overrides=extras)
- def test_create_ipsec_site_connection_invalid_mtu(self):
- """Test creating an ipsec_site_connection with invalid MTU."""
- self._test_create_ipsec_site_connection(key_overrides={'mtu': 67},
- expected_status_int=400)
- ipv6_overrides = {
- 'peer_address': 'fe80::c0a8:10a',
- 'peer_id': 'fe80::c0a8:10a',
- 'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
- 'mtu': 1279}
- ipv6_setup_params = {'subnet_cidr': 'fe80::a01:0/120',
- 'subnet_version': 6}
- self._test_create_ipsec_site_connection(
- key_overrides=ipv6_overrides,
- setup_overrides=ipv6_setup_params,
- expected_status_int=400)
-
def test_delete_ipsec_site_connection(self):
"""Test case to delete a ipsec_site_connection."""
with self.ipsec_site_connection(
self._test_update_ipsec_site_connection(update={'mtu': 2000},
overrides=ipv6_settings)
- def test_update_ipsec_site_connection_with_invalid_dpd(self):
- """Test updates to ipsec_site_connection with invalid DPD settings."""
- dpd1 = {'action': 'hold',
- 'interval': 100,
- 'timeout': 100}
- self._test_update_ipsec_site_connection(
- update={'dpd': dpd1},
- expected_status_int=400)
- dpd2 = {'action': 'hold',
- 'interval': 100,
- 'timeout': 60}
- self._test_update_ipsec_site_connection(
- update={'dpd': dpd2},
- expected_status_int=400)
- dpd3 = {'action': 'hold',
- 'interval': -50,
- 'timeout': -100}
- self._test_update_ipsec_site_connection(
- update={'dpd': dpd3},
- expected_status_int=400)
-
- def test_update_ipsec_site_connection_with_invalid_mtu(self):
- """Test updates to ipsec_site_connection with invalid MTU settings."""
- self._test_update_ipsec_site_connection(
- update={'mtu': 67}, expected_status_int=400)
- ipv6_settings = {
- 'peer_address': 'fe80::c0a8:10a',
- 'peer_id': 'fe80::c0a8:10a',
- 'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
- 'subnet_cidr': 'fe80::a02:0/120',
- 'subnet_version': 6}
- self._test_update_ipsec_site_connection(
- update={'mtu': 1279},
- overrides=ipv6_settings,
- expected_status_int=400)
-
def test_update_ipsec_site_connection_with_invalid_state(self):
"""Test updating an ipsec_site_connection in invalid state."""
self._test_update_ipsec_site_connection(
# under the License.
import mock
+# TODO(pcm): Uncomment once oslo.messaging 1.4.0.0a2 or newer is available
+# from oslo.config import cfg
-from neutron import context
+from neutron import context as n_ctx
+from neutron.db import l3_db
+from neutron.db.vpn import vpn_validator
+from neutron.extensions import vpnaas
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
+# TODO(pcm): Uncomment once oslo.messaging 1.4.0.0a2 or newer is available
+# from neutron.services.vpn import plugin as vpn_plugin
from neutron.services.vpn.service_drivers import ipsec as ipsec_driver
from neutron.tests import base
_uuid = uuidutils.generate_uuid
+FAKE_SERVICE_ID = _uuid()
FAKE_VPN_CONNECTION = {
- 'vpnservice_id': _uuid()
+ 'vpnservice_id': FAKE_SERVICE_ID
}
+FAKE_ROUTER_ID = _uuid()
FAKE_VPN_SERVICE = {
- 'router_id': _uuid()
+ 'router_id': FAKE_ROUTER_ID
}
FAKE_HOST = 'fake_host'
+FAKE_ROUTER = {l3_db.EXTERNAL_GW_INFO: FAKE_ROUTER_ID}
+FAKE_SUBNET_ID = _uuid()
+IPV4 = 4
+IPV6 = 6
+
+IPSEC_SERVICE_DRIVER = ('neutron.services.vpn.service_drivers.'
+ 'ipsec.IPsecVPNDriver')
+
+# TODO(pcm): Uncomment, once oslo.messaging package 1.4.0.0a2 or
+# newer is released and available for Neutron.
+#
+# class TestValidatorSelection(base.BaseTestCase):
+#
+# def setUp(self):
+# super(TestValidatorSelection, self).setUp()
+# vpnaas_provider = (constants.VPN + ':vpnaas:' +
+# IPSEC_SERVICE_DRIVER + ':default')
+# cfg.CONF.set_override('service_provider',
+# [vpnaas_provider],
+# 'service_providers')
+# mock.patch('neutron.common.rpc.create_connection').start()
+# self.vpn_plugin = vpn_plugin.VPNDriverPlugin()
+#
+# def test_reference_driver_used(self):
+# self.assertIsInstance(self.vpn_plugin._get_validator(),
+# vpn_validator.VpnReferenceValidator)
+
+
+class TestIPsecDriverValidation(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestIPsecDriverValidation, self).setUp()
+ self.l3_plugin = mock.Mock()
+ mock.patch(
+ 'neutron.manager.NeutronManager.get_service_plugins',
+ return_value={constants.L3_ROUTER_NAT: self.l3_plugin}).start()
+ self.core_plugin = mock.Mock()
+ mock.patch('neutron.manager.NeutronManager.get_plugin',
+ return_value=self.core_plugin).start()
+ self.context = n_ctx.Context('some_user', 'some_tenant')
+ self.validator = vpn_validator.VpnReferenceValidator()
+
+ def test_non_public_router_for_vpn_service(self):
+ """Failure test of service validate, when router missing ext. I/F."""
+ self.l3_plugin.get_router.return_value = {} # No external gateway
+ vpnservice = {'router_id': 123, 'subnet_id': 456}
+ self.assertRaises(vpnaas.RouterIsNotExternal,
+ self.validator.validate_vpnservice,
+ self.context, vpnservice)
+
+ def test_subnet_not_connected_for_vpn_service(self):
+ """Failure test of service validate, when subnet not on router."""
+ self.l3_plugin.get_router.return_value = FAKE_ROUTER
+ self.core_plugin.get_ports.return_value = None
+ vpnservice = {'router_id': FAKE_ROUTER_ID, 'subnet_id': FAKE_SUBNET_ID}
+ self.assertRaises(vpnaas.SubnetIsNotConnectedToRouter,
+ self.validator.validate_vpnservice,
+ self.context, vpnservice)
+
+ def test_defaults_for_ipsec_site_connections_on_create(self):
+ """Check that defaults are applied correctly.
+
+ MTU has a default and will always be present on create.
+ However, the DPD settings do not have a default, so
+ database create method will assign default values for any
+ missing. In addition, the DPD dict will be flattened
+ for storage into the database, so we'll do it as part of
+ assigning defaults.
+ """
+ ipsec_sitecon = {}
+ self.validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon)
+ expected = {
+ 'dpd_action': 'hold',
+ 'dpd_timeout': 120,
+ 'dpd_interval': 30
+ }
+ self.assertEqual(expected, ipsec_sitecon)
+
+ ipsec_sitecon = {'dpd': {'interval': 50}}
+ self.validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon)
+ expected = {
+ 'dpd': {'interval': 50},
+ 'dpd_action': 'hold',
+ 'dpd_timeout': 120,
+ 'dpd_interval': 50
+ }
+ self.assertEqual(expected, ipsec_sitecon)
+
+ def test_defaults_for_ipsec_site_connections_on_update(self):
+ """Check that defaults are used for any values not specified."""
+ ipsec_sitecon = {}
+ prev_connection = {'dpd_action': 'clear',
+ 'dpd_timeout': 500,
+ 'dpd_interval': 250}
+ self.validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon,
+ prev_connection)
+ expected = {
+ 'dpd_action': 'clear',
+ 'dpd_timeout': 500,
+ 'dpd_interval': 250
+ }
+ self.assertEqual(expected, ipsec_sitecon)
+
+ ipsec_sitecon = {'dpd': {'timeout': 200}}
+ prev_connection = {'dpd_action': 'clear',
+ 'dpd_timeout': 500,
+ 'dpd_interval': 100}
+ self.validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon,
+ prev_connection)
+ expected = {
+ 'dpd': {'timeout': 200},
+ 'dpd_action': 'clear',
+ 'dpd_timeout': 200,
+ 'dpd_interval': 100
+ }
+ self.assertEqual(expected, ipsec_sitecon)
+
+ def test_bad_dpd_settings_on_create(self):
+ """Failure tests of DPD settings for IPSec conn during create."""
+ ipsec_sitecon = {'mtu': 1500, 'dpd_action': 'hold',
+ 'dpd_interval': 100, 'dpd_timeout': 100}
+ self.assertRaises(vpnaas.IPsecSiteConnectionDpdIntervalValueError,
+ self.validator.validate_ipsec_site_connection,
+ self.context, ipsec_sitecon, IPV4)
+ ipsec_sitecon = {'mtu': 1500, 'dpd_action': 'hold',
+ 'dpd_interval': 100, 'dpd_timeout': 99}
+ self.assertRaises(vpnaas.IPsecSiteConnectionDpdIntervalValueError,
+ self.validator.validate_ipsec_site_connection,
+ self.context, ipsec_sitecon, IPV4)
+
+ def test_bad_dpd_settings_on_update(self):
+ """Failure tests of DPD settings for IPSec conn. during update.
+
+ Note: On an update, the user may specify only some of the DPD settings.
+ Previous values will be assigned for any missing items, so by the
+ time the validation occurs, all items will be available for checking.
+ The MTU may not be provided, during validation and will be ignored,
+ if that is the case.
+ """
+ prev_connection = {'mtu': 2000,
+ 'dpd_action': 'hold',
+ 'dpd_interval': 100,
+ 'dpd_timeout': 120}
+ ipsec_sitecon = {'dpd': {'interval': 120}}
+ self.validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon,
+ prev_connection)
+ self.assertRaises(vpnaas.IPsecSiteConnectionDpdIntervalValueError,
+ self.validator.validate_ipsec_site_connection,
+ self.context, ipsec_sitecon, IPV4)
+
+ prev_connection = {'mtu': 2000,
+ 'dpd_action': 'hold',
+ 'dpd_interval': 100,
+ 'dpd_timeout': 120}
+ ipsec_sitecon = {'dpd': {'timeout': 99}}
+ self.validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon,
+ prev_connection)
+ self.assertRaises(vpnaas.IPsecSiteConnectionDpdIntervalValueError,
+ self.validator.validate_ipsec_site_connection,
+ self.context, ipsec_sitecon, IPV4)
+
+ def test_bad_mtu_for_ipsec_connection(self):
+ """Failure test of invalid MTU values for IPSec conn create/update."""
+ ip_version_limits = vpn_validator.VpnReferenceValidator.IP_MIN_MTU
+ for version, limit in ip_version_limits.items():
+ ipsec_sitecon = {'mtu': limit - 1,
+ 'dpd_action': 'hold',
+ 'dpd_interval': 100,
+ 'dpd_timeout': 120}
+ self.assertRaises(
+ vpnaas.IPsecSiteConnectionMtuError,
+ self.validator.validate_ipsec_site_connection,
+ self.context, ipsec_sitecon, version)
class TestIPsecDriver(base.BaseTestCase):
self.driver = ipsec_driver.IPsecVPNDriver(service_plugin)
def _test_update(self, func, args):
- ctxt = context.Context('', 'somebody')
+ ctxt = n_ctx.Context('', 'somebody')
with mock.patch.object(self.driver.agent_rpc, 'cast') as cast:
func(ctxt, *args)
cast.assert_called_once_with(
from neutron.common import constants
from neutron import context
+from neutron.db.vpn import vpn_validator
from neutron import manager
from neutron.plugins.common import constants as p_constants
from neutron.services.vpn.service_drivers import ipsec as ipsec_driver
class TestVPNDriverPlugin(test_db_vpnaas.TestVpnaas,
test_agent_scheduler.AgentSchedulerTestMixIn,
test_agent_ext_plugin.AgentDBTestMixIn):
+
def setUp(self):
self.adminContext = context.get_admin_context()
driver_cls_p = mock.patch(
driver_cls = driver_cls_p.start()
self.driver = mock.Mock()
self.driver.service_type = ipsec_driver.IPSEC
+ self.driver.validator = vpn_validator.VpnReferenceValidator()
driver_cls.return_value = self.driver
super(TestVPNDriverPlugin, self).setUp(
vpnaas_plugin=VPN_DRIVER_CLASS)