# License for the specific language governing permissions and limitations
# under the License.
-import netaddr
-from netaddr import core as net_exc
-from neutron.common import exceptions
from neutron.common import rpc as n_rpc
-from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
-from neutron.plugins.common import constants
from neutron.services.vpn.common import topics
from neutron.services.vpn import service_drivers
from neutron.services.vpn.service_drivers import cisco_csr_db as csr_id_map
+from neutron.services.vpn.service_drivers import cisco_validator
LOG = logging.getLogger(__name__)
IPSEC = 'ipsec'
BASE_IPSEC_VERSION = '1.0'
-LIFETIME_LIMITS = {'IKE Policy': {'min': 60, 'max': 86400},
- 'IPSec Policy': {'min': 120, 'max': 2592000}}
-MIN_CSR_MTU = 1500
-MAX_CSR_MTU = 9192
-
-
-class CsrValidationFailure(exceptions.BadRequest):
- message = _("Cisco CSR does not support %(resource)s attribute %(key)s "
- "with value '%(value)s'")
class CiscoCsrIPsecVpnDriverCallBack(n_rpc.RpcCallback):
"""Cisco CSR VPN Service Driver class for IPsec."""
def __init__(self, service_plugin):
- super(CiscoCsrIPsecVPNDriver, self).__init__(service_plugin)
+ super(CiscoCsrIPsecVPNDriver, self).__init__(
+ service_plugin,
+ cisco_validator.CiscoCsrVpnValidator(service_plugin))
self.endpoints = [CiscoCsrIPsecVpnDriverCallBack(self)]
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(
def service_type(self):
return IPSEC
- def validate_lifetime(self, for_policy, policy_info):
- """Ensure lifetime in secs and value is supported, based on policy."""
- units = policy_info['lifetime']['units']
- if units != 'seconds':
- raise CsrValidationFailure(resource=for_policy,
- key='lifetime:units',
- value=units)
- value = policy_info['lifetime']['value']
- if (value < LIFETIME_LIMITS[for_policy]['min'] or
- value > LIFETIME_LIMITS[for_policy]['max']):
- raise CsrValidationFailure(resource=for_policy,
- key='lifetime:value',
- value=value)
-
- def validate_ike_version(self, policy_info):
- """Ensure IKE policy is v1 for current REST API."""
- version = policy_info['ike_version']
- if version != 'v1':
- raise CsrValidationFailure(resource='IKE Policy',
- key='ike_version',
- value=version)
-
- def validate_mtu(self, conn_info):
- """Ensure the MTU value is supported."""
- mtu = conn_info['mtu']
- if mtu < MIN_CSR_MTU or mtu > MAX_CSR_MTU:
- raise CsrValidationFailure(resource='IPSec Connection',
- key='mtu',
- value=mtu)
-
- def validate_public_ip_present(self, vpn_service):
- """Ensure there is one gateway IP specified for the router used."""
- gw_port = vpn_service.router.gw_port
- if not gw_port or len(gw_port.fixed_ips) != 1:
- raise CsrValidationFailure(resource='IPSec Connection',
- key='router:gw_port:ip_address',
- value='missing')
-
- def validate_peer_id(self, ipsec_conn):
- """Ensure that an IP address is specified for peer ID."""
- # TODO(pcm) Should we check peer_address too?
- peer_id = ipsec_conn['peer_id']
- try:
- netaddr.IPAddress(peer_id)
- except net_exc.AddrFormatError:
- raise CsrValidationFailure(resource='IPSec Connection',
- key='peer_id', value=peer_id)
-
- def validate_ipsec_connection(self, context, ipsec_conn, vpn_service):
- """Validate attributes w.r.t. Cisco CSR capabilities."""
- ike_policy = self.service_plugin.get_ikepolicy(
- context, ipsec_conn['ikepolicy_id'])
- ipsec_policy = self.service_plugin.get_ipsecpolicy(
- context, ipsec_conn['ipsecpolicy_id'])
- self.validate_lifetime('IKE Policy', ike_policy)
- self.validate_lifetime('IPSec Policy', ipsec_policy)
- self.validate_ike_version(ike_policy)
- self.validate_mtu(ipsec_conn)
- self.validate_public_ip_present(vpn_service)
- self.validate_peer_id(ipsec_conn)
- LOG.debug(_("IPSec connection %s validated for Cisco CSR"),
- ipsec_conn['id'])
-
def create_ipsec_site_connection(self, context, ipsec_site_connection):
vpnservice = self.service_plugin._get_vpnservice(
context, ipsec_site_connection['vpnservice_id'])
- try:
- self.validate_ipsec_connection(context, ipsec_site_connection,
- vpnservice)
- except CsrValidationFailure:
- with excutils.save_and_reraise_exception():
- self.service_plugin.update_ipsec_site_conn_status(
- context, ipsec_site_connection['id'], constants.ERROR)
csr_id_map.create_tunnel_mapping(context, ipsec_site_connection)
self.agent_rpc.vpnservice_updated(context, vpnservice['router_id'],
reason='ipsec-conn-create')
--- /dev/null
+# Copyright 2014 Cisco Systems, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Paul Michali, Cisco Systems, Inc.
+
+import netaddr
+from netaddr import core as net_exc
+
+from neutron.common import exceptions
+from neutron.db.vpn import vpn_validator
+from neutron.openstack.common import log as logging
+
+
+LIFETIME_LIMITS = {'IKE Policy': {'min': 60, 'max': 86400},
+ 'IPSec Policy': {'min': 120, 'max': 2592000}}
+MIN_CSR_MTU = 1500
+MAX_CSR_MTU = 9192
+
+LOG = logging.getLogger(__name__)
+
+
+class CsrValidationFailure(exceptions.BadRequest):
+ message = _("Cisco CSR does not support %(resource)s attribute %(key)s "
+ "with value '%(value)s'")
+
+
+class CiscoCsrVpnValidator(vpn_validator.VpnReferenceValidator):
+
+ """Validator methods for the Cisco CSR."""
+
+ def __init__(self, service_plugin):
+ self.service_plugin = service_plugin
+ super(CiscoCsrVpnValidator, self).__init__()
+
+ def validate_lifetime(self, for_policy, policy_info):
+ """Ensure lifetime in secs and value is supported, based on policy."""
+ units = policy_info['lifetime']['units']
+ if units != 'seconds':
+ raise CsrValidationFailure(resource=for_policy,
+ key='lifetime:units',
+ value=units)
+ value = policy_info['lifetime']['value']
+ if (value < LIFETIME_LIMITS[for_policy]['min'] or
+ value > LIFETIME_LIMITS[for_policy]['max']):
+ raise CsrValidationFailure(resource=for_policy,
+ key='lifetime:value',
+ value=value)
+
+ def validate_ike_version(self, policy_info):
+ """Ensure IKE policy is v1 for current REST API."""
+ version = policy_info['ike_version']
+ if version != 'v1':
+ raise CsrValidationFailure(resource='IKE Policy',
+ key='ike_version',
+ value=version)
+
+ def validate_mtu(self, conn_info):
+ """Ensure the MTU value is supported."""
+ mtu = conn_info['mtu']
+ if mtu < MIN_CSR_MTU or mtu > MAX_CSR_MTU:
+ raise CsrValidationFailure(resource='IPSec Connection',
+ key='mtu',
+ value=mtu)
+
+ def validate_public_ip_present(self, vpn_service):
+ """Ensure there is one gateway IP specified for the router used."""
+ gw_port = vpn_service.router.gw_port
+ if not gw_port or len(gw_port.fixed_ips) != 1:
+ raise CsrValidationFailure(resource='IPSec Connection',
+ key='router:gw_port:ip_address',
+ value='missing')
+
+ def validate_peer_id(self, ipsec_conn):
+ """Ensure that an IP address is specified for peer ID."""
+ # TODO(pcm) Should we check peer_address too?
+ peer_id = ipsec_conn['peer_id']
+ try:
+ netaddr.IPAddress(peer_id)
+ except net_exc.AddrFormatError:
+ raise CsrValidationFailure(resource='IPSec Connection',
+ key='peer_id', value=peer_id)
+
+ def validate_ipsec_site_connection(self, context, ipsec_sitecon,
+ ip_version):
+ """Validate IPSec site connection for Cisco CSR.
+
+ After doing reference validation, do additional checks that relate
+ to the Cisco CSR.
+ """
+ super(CiscoCsrVpnValidator, self)._check_dpd(ipsec_sitecon)
+
+ ike_policy = self.service_plugin.get_ikepolicy(
+ context, ipsec_sitecon['ikepolicy_id'])
+ ipsec_policy = self.service_plugin.get_ipsecpolicy(
+ context, ipsec_sitecon['ipsecpolicy_id'])
+ vpn_service = self.service_plugin.get_vpnservice(
+ context, ipsec_sitecon['vpnservice_id'])
+ self.validate_lifetime('IKE Policy', ike_policy)
+ self.validate_lifetime('IPSec Policy', ipsec_policy)
+ self.validate_ike_version(ike_policy)
+ self.validate_mtu(ipsec_sitecon)
+ self.validate_public_ip_present(vpn_service)
+ self.validate_peer_id(ipsec_sitecon)
+ LOG.debug("IPSec connection %s validated for Cisco CSR",
+ ipsec_sitecon['id'])
# under the License.
import mock
-
+# from oslo.config import cfg
from neutron import context as n_ctx
from neutron.db import api as dbapi
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
+# from neutron.services.vpn import plugin as vpn_plugin
from neutron.services.vpn.service_drivers import cisco_csr_db as csr_db
from neutron.services.vpn.service_drivers import cisco_ipsec as ipsec_driver
+from neutron.services.vpn.service_drivers import cisco_validator as validator
from neutron.tests import base
_uuid = uuidutils.generate_uuid
FAKE_VPN_CONN_ID = _uuid()
-
FAKE_VPN_CONNECTION = {
'vpnservice_id': _uuid(),
'id': FAKE_VPN_CONN_ID,
'ipsecpolicy_id': _uuid(),
'tenant_id': _uuid()
}
+
+FAKE_SERVICE_ID = _uuid()
+FAKE_VPN_CONNECTION = {
+ 'vpnservice_id': FAKE_SERVICE_ID
+}
+
+FAKE_ROUTER_ID = _uuid()
FAKE_VPN_SERVICE = {
- 'router_id': _uuid()
+ 'router_id': FAKE_ROUTER_ID
}
+
FAKE_HOST = 'fake_host'
+IPV4 = 4
+
+CISCO_IPSEC_SERVICE_DRIVER = ('neutron.services.vpn.service_drivers.'
+ 'cisco_ipsec.CiscoCsrIPsecVPNDriver')
+
+
+# class TestCiscoValidatorSelection(base.BaseTestCase):
+#
+# def setUp(self):
+# super(TestCiscoValidatorSelection, self).setUp()
+# vpnaas_provider = (constants.VPN + ':vpnaas:' +
+# CISCO_IPSEC_SERVICE_DRIVER + ':default')
+# cfg.CONF.set_override('service_provider',
+# [vpnaas_provider],
+# 'service_providers')
+# mock.patch('neutron.common.rpc.create_connection').start()
+# self.vpn_plugin = vpn_plugin.VPNDriverPlugin()
+#
+# def test_reference_driver_used(self):
+# self.assertIsInstance(self.vpn_plugin._get_validator(),
+# validator.CiscoCsrVpnValidator)
class TestCiscoIPsecDriverValidation(base.BaseTestCase):
def setUp(self):
super(TestCiscoIPsecDriverValidation, self).setUp()
mock.patch('neutron.common.rpc.create_connection').start()
- self.service_plugin = mock.Mock()
- self.driver = ipsec_driver.CiscoCsrIPsecVPNDriver(self.service_plugin)
+ self.l3_plugin = mock.Mock()
+ mock.patch(
+ 'neutron.manager.NeutronManager.get_service_plugins',
+ return_value={constants.L3_ROUTER_NAT: self.l3_plugin}).start()
+ self.core_plugin = mock.Mock()
+ mock.patch('neutron.manager.NeutronManager.get_plugin',
+ return_value=self.core_plugin).start()
self.context = n_ctx.Context('some_user', 'some_tenant')
self.vpn_service = mock.Mock()
+ self.service_plugin = mock.Mock()
+ self.validator = validator.CiscoCsrVpnValidator(self.service_plugin)
def test_ike_version_unsupported(self):
"""Failure test that Cisco CSR REST API does not support IKE v2."""
policy_info = {'ike_version': 'v2',
'lifetime': {'units': 'seconds', 'value': 60}}
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_ike_version, policy_info)
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_ike_version,
+ policy_info)
def test_ike_lifetime_not_in_seconds(self):
"""Failure test of unsupported lifetime units for IKE policy."""
policy_info = {'lifetime': {'units': 'kilobytes', 'value': 1000}}
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_lifetime,
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_lifetime,
"IKE Policy", policy_info)
def test_ipsec_lifetime_not_in_seconds(self):
"""Failure test of unsupported lifetime units for IPSec policy."""
policy_info = {'lifetime': {'units': 'kilobytes', 'value': 1000}}
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_lifetime,
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_lifetime,
"IPSec Policy", policy_info)
def test_ike_lifetime_seconds_values_at_limits(self):
"""Test valid lifetime values for IKE policy."""
policy_info = {'lifetime': {'units': 'seconds', 'value': 60}}
- self.driver.validate_lifetime('IKE Policy', policy_info)
+ self.validator.validate_lifetime('IKE Policy', policy_info)
policy_info = {'lifetime': {'units': 'seconds', 'value': 86400}}
- self.driver.validate_lifetime('IKE Policy', policy_info)
+ self.validator.validate_lifetime('IKE Policy', policy_info)
def test_ipsec_lifetime_seconds_values_at_limits(self):
"""Test valid lifetime values for IPSec policy."""
policy_info = {'lifetime': {'units': 'seconds', 'value': 120}}
- self.driver.validate_lifetime('IPSec Policy', policy_info)
+ self.validator.validate_lifetime('IPSec Policy', policy_info)
policy_info = {'lifetime': {'units': 'seconds', 'value': 2592000}}
- self.driver.validate_lifetime('IPSec Policy', policy_info)
+ self.validator.validate_lifetime('IPSec Policy', policy_info)
def test_ike_lifetime_values_invalid(self):
"""Failure test of unsupported lifetime values for IKE policy."""
which = "IKE Policy"
policy_info = {'lifetime': {'units': 'seconds', 'value': 59}}
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_lifetime,
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_lifetime,
which, policy_info)
policy_info = {'lifetime': {'units': 'seconds', 'value': 86401}}
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_lifetime,
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_lifetime,
which, policy_info)
def test_ipsec_lifetime_values_invalid(self):
"""Failure test of unsupported lifetime values for IPSec policy."""
which = "IPSec Policy"
policy_info = {'lifetime': {'units': 'seconds', 'value': 119}}
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_lifetime,
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_lifetime,
which, policy_info)
policy_info = {'lifetime': {'units': 'seconds', 'value': 2592001}}
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_lifetime,
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_lifetime,
which, policy_info)
def test_ipsec_connection_with_mtu_at_limits(self):
"""Test IPSec site-to-site connection with MTU at limits."""
conn_info = {'mtu': 1500}
- self.driver.validate_mtu(conn_info)
+ self.validator.validate_mtu(conn_info)
conn_info = {'mtu': 9192}
- self.driver.validate_mtu(conn_info)
+ self.validator.validate_mtu(conn_info)
def test_ipsec_connection_with_invalid_mtu(self):
"""Failure test of IPSec site connection with unsupported MTUs."""
conn_info = {'mtu': 1499}
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_mtu, conn_info)
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_mtu, conn_info)
conn_info = {'mtu': 9193}
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_mtu, conn_info)
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_mtu, conn_info)
def simulate_gw_ip_available(self):
"""Helper function indicating that tunnel has a gateway IP."""
def test_have_public_ip_for_router(self):
"""Ensure that router for IPSec connection has gateway IP."""
self.simulate_gw_ip_available()
- self.driver.validate_public_ip_present(self.vpn_service)
+ self.validator.validate_public_ip_present(self.vpn_service)
def test_router_with_missing_gateway_ip(self):
"""Failure test of IPSec connection with missing gateway IP."""
self.simulate_gw_ip_available()
self.vpn_service.router.gw_port = None
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_public_ip_present,
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_public_ip_present,
self.vpn_service)
def test_peer_id_is_an_ip_address(self):
"""Ensure peer ID is an IP address for IPsec connection create."""
- ipsec_conn = {'peer_id': '10.10.10.10'}
- self.driver.validate_peer_id(ipsec_conn)
+ ipsec_sitecon = {'peer_id': '10.10.10.10'}
+ self.validator.validate_peer_id(ipsec_sitecon)
def test_peer_id_is_not_ip_address(self):
"""Failure test of peer_id that is not an IP address."""
- ipsec_conn = {'peer_id': 'some-site.com'}
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.validate_peer_id, ipsec_conn)
+ ipsec_sitecon = {'peer_id': 'some-site.com'}
+ self.assertRaises(validator.CsrValidationFailure,
+ self.validator.validate_peer_id, ipsec_sitecon)
def test_validation_for_create_ipsec_connection(self):
"""Ensure all validation passes for IPSec site connection create."""
self.simulate_gw_ip_available()
- # Provide the minimum needed items to validate
- ipsec_conn = {'id': '1',
- 'ikepolicy_id': '123',
- 'ipsecpolicy_id': '2',
- 'mtu': 1500,
- 'peer_id': '10.10.10.10'}
self.service_plugin.get_ikepolicy = mock.Mock(
return_value={'ike_version': 'v1',
'lifetime': {'units': 'seconds', 'value': 60}})
self.service_plugin.get_ipsecpolicy = mock.Mock(
return_value={'lifetime': {'units': 'seconds', 'value': 120}})
- self.driver.validate_ipsec_connection(self.context, ipsec_conn,
- self.vpn_service)
+ self.service_plugin.get_vpnservice = mock.Mock(
+ return_value=self.vpn_service)
+ # Provide the minimum needed items to validate
+ ipsec_sitecon = {'id': '1',
+ 'vpnservice_id': FAKE_SERVICE_ID,
+ 'ikepolicy_id': '123',
+ 'ipsecpolicy_id': '2',
+ 'mtu': 1500,
+ 'peer_id': '10.10.10.10'}
+ # Using defaults for DPD info
+ expected = {'dpd_action': 'hold',
+ 'dpd_interval': 30,
+ 'dpd_timeout': 120}
+ expected.update(ipsec_sitecon)
+ self.validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon)
+ self.validator.validate_ipsec_site_connection(self.context,
+ ipsec_sitecon, IPV4)
+ self.assertEqual(expected, ipsec_sitecon)
class TestCiscoIPsecDriverMapping(base.BaseTestCase):
}
self.db_update_mock = service_plugin.update_ipsec_site_conn_status
self.driver = ipsec_driver.CiscoCsrIPsecVPNDriver(service_plugin)
- self.driver.validate_ipsec_connection = mock.Mock()
mock.patch.object(csr_db, 'create_tunnel_mapping').start()
self.context = n_ctx.Context('some_user', 'some_tenant')
[FAKE_VPN_CONNECTION],
{'reason': 'ipsec-conn-create'})
- def test_failure_validation_ipsec_connection(self):
- """Failure test of validation during IPSec site connection create.
-
- Simulate a validation failure, and ensure that database is
- updated to indicate connection is in error state.
-
- TODO(pcm): FUTURE - remove test case, once vendor plugin
- validation is done before database commit.
- """
- self.driver.validate_ipsec_connection.side_effect = (
- ipsec_driver.CsrValidationFailure(resource='IPSec Connection',
- key='mtu', value=1000))
- self.assertRaises(ipsec_driver.CsrValidationFailure,
- self.driver.create_ipsec_site_connection,
- self.context, FAKE_VPN_CONNECTION)
- self.db_update_mock.assert_called_with(self.context,
- FAKE_VPN_CONN_ID,
- constants.ERROR)
-
def test_update_ipsec_site_connection(self):
self._test_update(self.driver.update_ipsec_site_connection,
[FAKE_VPN_CONNECTION, FAKE_VPN_CONNECTION],