# username = <user>
# password = <password>
# timeout = <timeout>
+# host = <hostname>
+# tunnel_if = <tunnel I/F>
#
# where:
# public IP ----- Public IP address of router used with a VPN service (1:1 with CSR)
# tunnel IP ----- Public IP address of the CSR used for the IPSec tunnel
-# mgmt port IP -- IP address of CSR for REST API access (not console port)
+# mgmt port IP -- IP address of CSR for REST API access
# user ---------- Username for REST management port access to Cisco CSR
# password ------ Password for REST management port access to Cisco CSR
# timeout ------- REST request timeout to Cisco CSR (optional)
+# hostname ------ Name of host where CSR is running as a VM
+# tunnel I/F ---- CSR port name used for tunnels' IP address
"""REST CsrRestClient for accessing the Cisco Cloud Services Router."""
def __init__(self, settings):
- self.host = settings['rest_mgmt']
- self.tunnel_ip = settings['tunnel_ip']
+ self.port = str(settings.get('protocol_port', 55443))
+ self.host = ':'.join([settings.get('rest_mgmt_ip', ''), self.port])
+ self.tunnel_ip = settings.get('external_ip', '')
self.auth = (settings['username'], settings['password'])
+ self.tunnel_if_name = settings.get('tunnel_if_name', '')
self.token = None
self.status = requests.codes.OK
self.timeout = settings.get('timeout')
return self.post_request(URI_VPN_IKE_KEYRINGS, payload=psk_info)
def create_ipsec_connection(self, connection_info):
- base_conn_info = {u'vpn-type': u'site-to-site',
- u'ip-version': u'ipv4'}
+ base_conn_info = {
+ u'vpn-type': u'site-to-site',
+ u'ip-version': u'ipv4',
+ u'local-device': {
+ u'tunnel-ip-address': self.tunnel_ip,
+ u'ip-address': self.tunnel_if_name
+ }
+ }
connection_info.update(base_conn_info)
return self.post_request(URI_VPN_SITE_TO_SITE,
payload=connection_info)
import collections
import requests
-import netaddr
from oslo.config import cfg
from oslo import messaging
import six
"attribute %(attr)s of %(resource)s")
-def find_available_csrs_from_config(config_files):
- """Read INI for available Cisco CSRs that driver can use.
-
- Loads management port, tunnel IP, user, and password information for
- available CSRs from configuration file. Driver will use this info to
- configure VPN connections. The CSR is associated 1:1 with a Neutron
- router. To identify which CSR to use for a VPN service, the public
- (GW) IP of the Neutron router will be used as an index into the CSR
- config info.
- """
- multi_parser = cfg.MultiConfigParser()
- LOG.info(_("Scanning config files %s for Cisco CSR configurations"),
- config_files)
- try:
- read_ok = multi_parser.read(config_files)
- except cfg.ParseError as pe:
- LOG.error(_("Config file parse error: %s"), pe)
- return {}
-
- if len(read_ok) != len(config_files):
- raise cfg.Error(_("Unable to parse config files %s for Cisco CSR "
- "info") % config_files)
- csrs_found = {}
- for parsed_file in multi_parser.parsed:
- for parsed_item in parsed_file.keys():
- device_type, sep, for_router = parsed_item.partition(':')
- if device_type.lower() == 'cisco_csr_rest':
- try:
- netaddr.IPNetwork(for_router)
- except netaddr.core.AddrFormatError:
- LOG.error(_("Ignoring Cisco CSR configuration entry - "
- "router IP %s is not valid"), for_router)
- continue
- entry = parsed_file[parsed_item]
- # Check for missing fields
- try:
- rest_mgmt_ip = entry['rest_mgmt'][0]
- tunnel_ip = entry['tunnel_ip'][0]
- username = entry['username'][0]
- password = entry['password'][0]
- except KeyError as ke:
- LOG.error(_("Ignoring Cisco CSR for router %(router)s "
- "- missing %(field)s setting"),
- {'router': for_router, 'field': str(ke)})
- continue
- # Validate fields
- try:
- timeout = float(entry['timeout'][0])
- except ValueError:
- LOG.error(_("Ignoring Cisco CSR for router %s - "
- "timeout is not a floating point number"),
- for_router)
- continue
- except KeyError:
- timeout = csr_client.TIMEOUT
- try:
- netaddr.IPAddress(rest_mgmt_ip)
- except netaddr.core.AddrFormatError:
- LOG.error(_("Ignoring Cisco CSR for subnet %s - "
- "REST management is not an IP address"),
- for_router)
- continue
- try:
- netaddr.IPAddress(tunnel_ip)
- except netaddr.core.AddrFormatError:
- LOG.error(_("Ignoring Cisco CSR for router %s - "
- "local tunnel is not an IP address"),
- for_router)
- continue
- csrs_found[for_router] = {'rest_mgmt': rest_mgmt_ip,
- 'tunnel_ip': tunnel_ip,
- 'username': username,
- 'password': password,
- 'timeout': timeout}
-
- LOG.debug(_("Found CSR for router %(router)s: %(info)s"),
- {'router': for_router,
- 'info': csrs_found[for_router]})
- return csrs_found
-
-
class CiscoCsrIPsecVpnDriverApi(n_rpc.RpcProxy):
"""RPC API for agent to plugin messaging."""
self.report_status, context)
self.periodic_report.start(
interval=agent.conf.cisco_csr_ipsec.status_check_interval)
-
- csrs_found = find_available_csrs_from_config(cfg.CONF.config_file)
- if csrs_found:
- LOG.info(_("Loaded %(num)d Cisco CSR configuration%(plural)s"),
- {'num': len(csrs_found),
- 'plural': 's'[len(csrs_found) == 1:]})
- else:
- raise SystemExit(_('No Cisco CSR configurations found in: %s') %
- cfg.CONF.config_file)
- self.csrs = dict([(k, csr_client.CsrRestClient(v))
- for k, v in csrs_found.items()])
+ LOG.debug("Device driver initialized for %s", node_topic)
def vpnservice_updated(self, context, **kwargs):
"""Handle VPNaaS service driver change notifications."""
def create_vpn_service(self, service_data):
"""Create new entry to track VPN service and its connections."""
+ csr = csr_client.CsrRestClient(service_data['router_info'])
vpn_service_id = service_data['id']
- vpn_service_router = service_data['external_ip']
self.service_state[vpn_service_id] = CiscoCsrVpnService(
- service_data, self.csrs.get(vpn_service_router))
+ service_data, csr)
return self.service_state[vpn_service_id]
def update_connection(self, context, vpn_service_id, conn_data):
def update_service(self, context, service_data):
"""Handle notification for a single VPN Service and its connections."""
vpn_service_id = service_data['id']
- csr_id = service_data['external_ip']
- if csr_id not in self.csrs:
- LOG.error(_("Update: Skipping VPN service %(service)s as it's "
- "router (%(csr_id)s is not associated with a Cisco "
- "CSR"), {'service': vpn_service_id, 'csr_id': csr_id})
- return
-
if vpn_service_id in self.service_state:
LOG.debug(_("Update: Existing VPN service %s detected"),
vpn_service_id)
else:
LOG.debug(_("Update: New VPN service %s detected"), vpn_service_id)
vpn_service = self.create_vpn_service(service_data)
+ if not vpn_service:
+ return
vpn_service.is_dirty = False
vpn_service.connections_removed = False
def create_site_connection_info(self, site_conn_id, ipsec_policy_id,
conn_info):
"""Collect/create attributes needed for the IPSec connection."""
- # TODO(pcm) Enable, once CSR is embedded as a Neutron router
- # gw_ip = vpnservice['external_ip'] (need to pass in)
mtu = conn_info['mtu']
return {
u'vpn-interface-name': site_conn_id,
u'ipsec-policy-id': ipsec_policy_id,
- u'local-device': {
- # TODO(pcm): FUTURE - Get CSR port of interface with
- # local subnet
- u'ip-address': u'GigabitEthernet3',
- # TODO(pcm): FUTURE - Get IP address of router's public
- # I/F, once CSR is used as embedded router.
- u'tunnel-ip-address': self.csr.tunnel_ip
- # u'tunnel-ip-address': u'%s' % gw_ip
- },
u'remote-device': {
u'tunnel-ip-address': conn_info['peer_address']
},
validator = vpn_validator.VpnReferenceValidator()
self.validator = validator
+ @property
+ def l3_plugin(self):
+ return manager.NeutronManager.get_service_plugins().get(
+ constants.L3_ROUTER_NAT)
+
@property
def service_type(self):
pass
class BaseIPsecVpnAgentApi(n_rpc.RpcProxy):
"""Base class for IPSec API to agent."""
- def __init__(self, to_agent_topic, topic, default_version):
- self.to_agent_topic = to_agent_topic
+ def __init__(self, topic, default_version, driver):
+ self.topic = topic
+ self.driver = driver
super(BaseIPsecVpnAgentApi, self).__init__(topic, default_version)
def _agent_notification(self, context, method, router_id,
dispatch notification for the agent.
"""
admin_context = context.is_admin and context or context.elevated()
- plugin = manager.NeutronManager.get_service_plugins().get(
- constants.L3_ROUTER_NAT)
if not version:
version = self.RPC_API_VERSION
- l3_agents = plugin.get_l3_agents_hosting_routers(
+ l3_agents = self.driver.l3_plugin.get_l3_agents_hosting_routers(
admin_context, [router_id],
admin_state_up=True,
active=True)
for l3_agent in l3_agents:
LOG.debug(_('Notify agent at %(topic)s.%(host)s the message '
'%(method)s %(args)s'),
- {'topic': self.to_agent_topic,
+ {'topic': self.topic,
'host': l3_agent.host,
'method': method,
'args': kwargs})
self.cast(
context, self.make_msg(method, **kwargs),
version=version,
- topic='%s.%s' % (self.to_agent_topic, l3_agent.host))
+ topic='%s.%s' % (self.topic, l3_agent.host))
def vpnservice_updated(self, context, router_id, **kwargs):
"""Send update event of vpnservices."""
--- /dev/null
+# Copyright 2014 Cisco Systems, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Interim code to obtain router information for Cisco CSR
+
+This obtains information on the Cisco CSR router from an INI file. This is
+an interim solution, until the Cisco L3 router plugin code is up-streamed.
+Once that happens, this code and UTs will be removed and the API calls to
+the L3 router will be used.
+
+To use this code, the Neutron server is started with a config_file that
+points to an INI file with router configuration. The router would be created
+(manually) in Nova, the INI file is then updated with the router information,
+and then VPN IPSec site-to-site connections can be created using that router.
+"""
+
+import netaddr
+import re
+
+from oslo.config import cfg
+
+from neutron.db import l3_db
+from neutron.db import models_v2
+from neutron.openstack.common.gettextutils import _LE
+from neutron.openstack.common.gettextutils import _LI
+from neutron.openstack.common import log as logging
+from neutron.services.vpn.device_drivers import (
+ cisco_csr_rest_client as csr_client)
+
+
+LOG = logging.getLogger(__name__)
+tunnel_if_re = re.compile(r'^GigabitEthernet[123]')
+
+
+def get_available_csrs_from_config(config_files):
+ """Read INI for available Cisco CSRs that driver can use.
+
+ Loads management port, tunnel IP, user, and password information for
+ available CSRs from configuration file. Driver will use this info to
+ configure VPN connections. The CSR is associated 1:1 with a Neutron
+ router. To identify which CSR to use for a VPN service, the public
+ (GW) IP of the Neutron router will be used as an index into the CSR
+ config info.
+ """
+ multi_parser = cfg.MultiConfigParser()
+ LOG.info(_LI("Scanning config files %s for Cisco CSR configurations"),
+ config_files)
+ try:
+ read_ok = multi_parser.read(config_files)
+ except cfg.ParseError as pe:
+ LOG.error(_LE("Config file parse error: %s"), pe)
+ return {}
+
+ if len(read_ok) != len(config_files):
+ raise cfg.Error(_("Unable to parse config files %s for Cisco CSR "
+ "info") % config_files)
+ csrs_found = {}
+ for parsed_file in multi_parser.parsed:
+ for parsed_item in parsed_file.keys():
+ device_type, sep, for_router = parsed_item.partition(':')
+ if device_type.lower() == 'cisco_csr_rest':
+ try:
+ netaddr.IPNetwork(for_router)
+ except netaddr.core.AddrFormatError:
+ LOG.error(_LE("Ignoring Cisco CSR configuration entry - "
+ "router IP %s is not valid"), for_router)
+ continue
+ entry = parsed_file[parsed_item]
+ # Check for missing fields
+ try:
+ rest_mgmt_ip = entry['rest_mgmt'][0]
+ tunnel_ip = entry['tunnel_ip'][0]
+ username = entry['username'][0]
+ password = entry['password'][0]
+ host = entry['host'][0]
+ tunnel_if = entry['tunnel_if'][0]
+ except KeyError as ke:
+ LOG.error(_LE("Ignoring Cisco CSR for router %(router)s "
+ "- missing %(field)s setting"),
+ {'router': for_router, 'field': str(ke)})
+ continue
+ # Validate fields
+ try:
+ timeout = float(entry['timeout'][0])
+ except ValueError:
+ LOG.error(_LE("Ignoring Cisco CSR for router %s - "
+ "timeout is not a floating point number"),
+ for_router)
+ continue
+ except KeyError:
+ timeout = csr_client.TIMEOUT
+ try:
+ netaddr.IPAddress(rest_mgmt_ip)
+ except netaddr.core.AddrFormatError:
+ LOG.error(_("Ignoring Cisco CSR for subnet %s - "
+ "REST management is not an IP address"),
+ for_router)
+ continue
+ try:
+ netaddr.IPAddress(tunnel_ip)
+ except netaddr.core.AddrFormatError:
+ LOG.error(_LE("Ignoring Cisco CSR for router %s - "
+ "local tunnel is not an IP address"),
+ for_router)
+ continue
+ m = tunnel_if_re.match(tunnel_if)
+ if not m:
+ LOG.error(_LE("Malformed interface name for Cisco "
+ "CSR router entry - %s"), tunnel_if)
+ continue
+ csrs_found[for_router] = {'rest_mgmt_ip': rest_mgmt_ip,
+ 'tunnel_ip': tunnel_ip,
+ 'username': username,
+ 'password': password,
+ 'host': host,
+ 'tunnel_if': tunnel_if,
+ 'timeout': timeout}
+
+ LOG.debug("Found CSR for router %(router)s: %(info)s",
+ {'router': for_router,
+ 'info': csrs_found[for_router]})
+ LOG.debug("Found %d router entries", len(csrs_found))
+ return csrs_found
+
+
+def _get_router_id_via_external_ip(context, external_ip):
+ '''Find router ID for router with matching GW port IP.'''
+ query = context.session.query(l3_db.Router)
+ query = query.join(models_v2.Port,
+ l3_db.Router.gw_port_id == models_v2.Port.id)
+ query = query.join(models_v2.IPAllocation,
+ models_v2.IPAllocation.port_id == models_v2.Port.id)
+ query = query.filter(models_v2.IPAllocation.ip_address == external_ip)
+ router = query.first()
+ if router:
+ return router.id
+
+
+def get_active_routers_for_host(context, host):
+ '''Get list of routers from INI file that use host requested.'''
+ routers = []
+ configured_routers = get_available_csrs_from_config(cfg.CONF.config_file)
+ if not configured_routers:
+ LOG.error(_LE("No routers found in INI file!"))
+ return routers
+ for router_ip, info in configured_routers.items():
+ if host == info['host']:
+ router_id = _get_router_id_via_external_ip(context, router_ip)
+ if router_id:
+ LOG.debug("Found router %(router)s on host %(host)s",
+ {'router': router_id, 'host': host})
+ routers.append({
+ 'id': router_id,
+ 'hosting_device': {
+ 'management_ip_address': info['rest_mgmt_ip'],
+ 'credentials': {'username': info['username'],
+ 'password': info['password']}
+ },
+ 'tunnel_if': info['tunnel_if'],
+ 'tunnel_ip': info['tunnel_ip']
+ })
+ else:
+ LOG.error(_LE("Unable to lookup router ID based on router's "
+ "public IP (%s) in INI file"), router_ip)
+ if not routers:
+ LOG.error(_LE("No matching routers on host %s"), host)
+ return routers
+
+
+def _get_external_ip_for_router(context, router_id):
+ '''Find port that is the gateway port for router.'''
+ query = context.session.query(models_v2.Port)
+ query = query.join(l3_db.Router,
+ l3_db.Router.gw_port_id == models_v2.Port.id)
+ query = query.filter(l3_db.Router.id == router_id)
+ gw_port = query.first()
+ if gw_port:
+ return gw_port.fixed_ips[0]['ip_address']
+
+
+def get_host_for_router(context, router_id):
+ '''Find out GW port for router and look-up in INI file to get host.
+
+ For this interim solution, there is the possibility that the INI file is
+ not configured correctly and either there are no entries or no matching
+ entry. An error will be reported in log and resource will remain in the
+ same state (e.g. PENDING_CREATE).
+ '''
+ routers = get_available_csrs_from_config(cfg.CONF.config_file)
+ if not routers:
+ LOG.error(_LE("No routers found in INI file!"))
+ return ''
+ router_public_ip = _get_external_ip_for_router(context, router_id)
+ if router_public_ip:
+ router = routers.get(router_public_ip)
+ if router:
+ LOG.debug("Found host %(host)s for router %(router)s",
+ {'host': router['host'], 'router': router_id})
+ return router['host']
+ LOG.error(_LE("Unable to find host for router %s"), router_id)
+ return ''
# License for the specific language governing permissions and limitations
# under the License.
-
from neutron.common import rpc as n_rpc
+from neutron.db.vpn import vpn_db
from neutron.openstack.common import log as logging
from neutron.services.vpn.common import topics
from neutron.services.vpn import service_drivers
+from neutron.services.vpn.service_drivers import (
+ cisco_cfg_loader as via_cfg_file)
from neutron.services.vpn.service_drivers import cisco_csr_db as csr_id_map
from neutron.services.vpn.service_drivers import cisco_validator
-
LOG = logging.getLogger(__name__)
IPSEC = 'ipsec'
BASE_IPSEC_VERSION = '1.0'
+LIFETIME_LIMITS = {'IKE Policy': {'min': 60, 'max': 86400},
+ 'IPSec Policy': {'min': 120, 'max': 2592000}}
+MIN_CSR_MTU = 1500
+MAX_CSR_MTU = 9192
class CiscoCsrIPsecVpnDriverCallBack(n_rpc.RpcCallback):
super(CiscoCsrIPsecVpnDriverCallBack, self).__init__()
self.driver = driver
+ def create_rpc_dispatcher(self):
+ return n_rpc.PluginRpcDispatcher([self])
+
+ def get_vpn_services_using(self, context, router_id):
+ query = context.session.query(vpn_db.VPNService)
+ query = query.join(vpn_db.IPsecSiteConnection)
+ query = query.join(vpn_db.IKEPolicy)
+ query = query.join(vpn_db.IPsecPolicy)
+ query = query.join(vpn_db.IPsecPeerCidr)
+ query = query.filter(vpn_db.VPNService.router_id == router_id)
+ return query.all()
+
def get_vpn_services_on_host(self, context, host=None):
- """Retuns info on the vpnservices on the host."""
- plugin = self.driver.service_plugin
- vpnservices = plugin._get_agent_hosting_vpn_services(
- context, host)
- return [self.driver._make_vpnservice_dict(vpnservice, context)
- for vpnservice in vpnservices]
+ """Returns info on the VPN services on the host."""
+ routers = via_cfg_file.get_active_routers_for_host(context, host)
+ host_vpn_services = []
+ for router in routers:
+ vpn_services = self.get_vpn_services_using(context, router['id'])
+ for vpn_service in vpn_services:
+ host_vpn_services.append(
+ self.driver._make_vpnservice_dict(context, vpn_service,
+ router))
+ return host_vpn_services
def update_status(self, context, status):
"""Update status of all vpnservices."""
RPC_API_VERSION = BASE_IPSEC_VERSION
- def __init__(self, topic, default_version):
+ def __init__(self, topic, default_version, driver):
super(CiscoCsrIPsecVpnAgentApi, self).__init__(
- topics.CISCO_IPSEC_AGENT_TOPIC, topic, default_version)
+ topic, default_version, driver)
+
+ def _agent_notification(self, context, method, router_id,
+ version=None, **kwargs):
+ """Notify update for the agent.
+
+ Find the host for the router being notified and then
+ dispatches a notification for the VPN device driver.
+ """
+ admin_context = context.is_admin and context or context.elevated()
+ if not version:
+ version = self.RPC_API_VERSION
+ host = via_cfg_file.get_host_for_router(admin_context, router_id)
+ if not host:
+ # NOTE: This is a config error for workaround. At this point we
+ # can't set state of resource to error.
+ return
+ LOG.debug(_('Notify agent at %(topic)s.%(host)s the message '
+ '%(method)s %(args)s for router %(router)s'),
+ {'topic': self.topic,
+ 'host': host,
+ 'method': method,
+ 'args': kwargs,
+ 'router': router_id})
+ self.cast(context, self.make_msg(method, **kwargs),
+ version=version,
+ topic='%s.%s' % (self.topic, host))
class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
topics.CISCO_IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
- topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
+ topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION, self)
@property
def service_type(self):
'ike_policy_id': u'%d' % ike_id,
'ipsec_policy_id': u'%s' % ipsec_id}
- def _make_vpnservice_dict(self, vpnservice, context):
- """Collect all info on service, including Cisco info per IPSec conn."""
+ def _create_tunnel_interface(self, router_info):
+ return router_info['tunnel_if']
+
+ def _get_router_info(self, router_info):
+ hosting_device = router_info['hosting_device']
+ return {'rest_mgmt_ip': hosting_device['management_ip_address'],
+ 'external_ip': router_info['tunnel_ip'],
+ 'username': hosting_device['credentials']['username'],
+ 'password': hosting_device['credentials']['password'],
+ 'tunnel_if_name': self._create_tunnel_interface(router_info),
+ # TODO(pcm): Add protocol_port, if avail from L3 router plugin
+ 'timeout': 30} # Hard-coded for now
+
+ def _make_vpnservice_dict(self, context, vpnservice, router_info):
+ """Collect all service info, including Cisco info for IPSec conn."""
vpnservice_dict = dict(vpnservice)
vpnservice_dict['ipsec_conns'] = []
- vpnservice_dict['subnet'] = dict(
- vpnservice.subnet)
- vpnservice_dict['external_ip'] = vpnservice.router.gw_port[
- 'fixed_ips'][0]['ip_address']
+ vpnservice_dict['subnet'] = dict(vpnservice.subnet)
+ vpnservice_dict['router_info'] = self._get_router_info(router_info)
for ipsec_conn in vpnservice.ipsec_site_connections:
ipsec_conn_dict = dict(ipsec_conn)
ipsec_conn_dict['ike_policy'] = dict(ipsec_conn.ikepolicy)
RPC_API_VERSION = BASE_IPSEC_VERSION
- def __init__(self, topic, default_version):
+ def __init__(self, topic, default_version, driver):
super(IPsecVpnAgentApi, self).__init__(
- topics.IPSEC_AGENT_TOPIC, topic, default_version)
+ topic, default_version, driver)
class IPsecVPNDriver(service_drivers.VpnDriver):
topics.IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = IPsecVpnAgentApi(
- topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
+ topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION, self)
@property
def service_type(self):
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrLoginRestApi, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrGetRestApi, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrPostRestApi, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
"""Prepare for PUT API tests."""
super(TestCsrPutRestApi, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrDeleteRestApi, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=0.1):
super(TestCsrRestApiFailures, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIkePolicyCreate, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIPSecPolicyCreate, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestPreSharedKeyCreate, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIPSecConnectionCreate, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIkeKeepaliveCreate, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
self._save_dpd_info()
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestStaticRoute, self).setUp()
- info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ info = {'rest_mgmt_ip': host, 'tunnel_ip': tunnel_ip,
'username': 'stack', 'password': 'cisco', 'timeout': timeout}
self.csr = csr_client.CsrRestClient(info)
import copy
import httplib
-import os
-import tempfile
import mock
'cisco': {'site_conn_id': 'Tunnel0',
'ike_policy_id': 222,
'ipsec_policy_id': 333,
- # TODO(pcm) FUTURE use vpnservice['external_ip']
'router_public_ip': '172.24.4.23'}
}
self.csr = mock.Mock(spec=csr_client.CsrRestClient)
self.fail('Expected exception with invalid delete step')
def test_delete_ipsec_connection(self):
- # TODO(pcm) implement
- pass
+ """Perform delete of IPSec site connection and check steps done."""
+ # Simulate that a create was done with rollback steps stored
+ self.ipsec_conn.steps = [
+ ipsec_driver.RollbackStep(action='pre_shared_key',
+ resource_id='123',
+ title='Pre-Shared Key'),
+ ipsec_driver.RollbackStep(action='ike_policy',
+ resource_id=222,
+ title='IKE Policy'),
+ ipsec_driver.RollbackStep(action='ipsec_policy',
+ resource_id=333,
+ title='IPSec Policy'),
+ ipsec_driver.RollbackStep(action='ipsec_connection',
+ resource_id='Tunnel0',
+ title='IPSec Connection'),
+ ipsec_driver.RollbackStep(action='static_route',
+ resource_id='10.1.0.0_24_Tunnel0',
+ title='Static Route'),
+ ipsec_driver.RollbackStep(action='static_route',
+ resource_id='10.2.0.0_24_Tunnel0',
+ title='Static Route')]
+ expected = ['delete_static_route',
+ 'delete_static_route',
+ 'delete_ipsec_connection',
+ 'delete_ipsec_policy',
+ 'delete_ike_policy',
+ 'delete_pre_shared_key']
+ self.ipsec_conn.delete_ipsec_site_connection(mock.Mock(), 123)
+ client_calls = [c[0] for c in self.csr.method_calls]
+ self.assertEqual(expected, client_calls)
class TestCiscoCsrIPsecConnectionCreateTransforms(base.BaseTestCase):
'cisco': {'site_conn_id': 'Tunnel0',
'ike_policy_id': 222,
'ipsec_policy_id': 333,
- # TODO(pcm) get from vpnservice['external_ip']
'router_public_ip': '172.24.4.23'}
}
self.csr = mock.Mock(spec=csr_client.CsrRestClient)
"""Ensure site-to-site connection info is created/mapped correctly."""
expected = {u'vpn-interface-name': 'Tunnel0',
u'ipsec-policy-id': 333,
- u'local-device': {
- u'ip-address': u'GigabitEthernet3',
- u'tunnel-ip-address': '172.24.4.23'
- },
u'remote-device': {
u'tunnel-ip-address': '192.168.1.2'
},
mock.patch(klass).start()
self.context = context.Context('some_user', 'some_tenant')
self.agent = mock.Mock()
- conf_patch = mock.patch('oslo.config.cfg.CONF').start()
- conf_patch.config_file = ['dummy']
- self.config_load = mock.patch(FIND_CFG_FOR_CSRS).start()
- self.config_load.return_value = {'1.1.1.1': {'rest_mgmt': '2.2.2.2',
- 'tunnel_ip': '1.1.1.3',
- 'username': 'pe',
- 'password': 'password',
- 'timeout': 120}}
self.driver = ipsec_driver.CiscoCsrIPsecDriver(self.agent, FAKE_HOST)
self.driver.agent_rpc = mock.Mock()
self.conn_create = mock.patch.object(
ipsec_driver.CiscoCsrIPSecConnection,
'set_admin_state').start()
self.csr = mock.Mock()
- self.driver.csrs['1.1.1.1'] = self.csr
+ self.router_info = {u'router_info': {'rest_mgmt_ip': '2.2.2.2',
+ 'tunnel_ip': '1.1.1.3',
+ 'username': 'me',
+ 'password': 'password',
+ 'timeout': 120,
+ 'external_ip': u'1.1.1.1'}}
self.service123_data = {u'id': u'123',
u'status': constants.DOWN,
- u'admin_state_up': False,
- u'external_ip': u'1.1.1.1'}
+ u'admin_state_up': False}
+ self.service123_data.update(self.router_info)
self.conn1_data = {u'id': u'1',
u'status': constants.ACTIVE,
u'admin_state_up': True,
# Make existing service, and connection that was active
vpn_service = self.driver.create_vpn_service(self.service123_data)
- connection = vpn_service.create_connection(self.conn1_data)
+ vpn_service.create_connection(self.conn1_data)
# Simulate that notification of connection update received
self.driver.mark_existing_connections_as_dirty()
"""
# Make existing service, and connection that was active
vpn_service = self.driver.create_vpn_service(self.service123_data)
- connection = vpn_service.create_connection(self.conn1_data)
+ vpn_service.create_connection(self.conn1_data)
# Simulate that notification of connection update received
self.driver.mark_existing_connections_as_dirty()
conn_data.update({u'status': constants.DOWN, u'admin_state_up': False})
service_data = {u'id': u'123',
u'status': constants.DOWN,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn_data]}
+ service_data.update(self.router_info)
self.driver.update_service(self.context, service_data)
# Simulate that notification of connection update received
conn_data[u'status'] = constants.PENDING_CREATE
service_data = {u'id': u'123',
u'status': constants.PENDING_CREATE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
self.assertFalse(vpn_service.is_dirty)
self.assertEqual(constants.PENDING_CREATE, vpn_service.last_status)
conn_data[u'status'] = constants.PENDING_CREATE
service_data = {u'id': u'123',
u'status': constants.ACTIVE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
# Should reuse the entry and update the status
self.assertEqual(prev_vpn_service, vpn_service)
# Create notification with conn unchanged and service already created
service_data = {u'id': u'123',
u'status': constants.ACTIVE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [self.conn1_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
# Should reuse the entry and update the status
self.assertEqual(prev_vpn_service, vpn_service)
self.driver.mark_existing_connections_as_dirty()
service_data = {u'id': u'123',
u'status': constants.DOWN,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': False,
u'ipsec_conns': [self.conn1_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
self.assertEqual(prev_vpn_service, vpn_service)
self.assertFalse(vpn_service.is_dirty)
"""
service_data = {u'id': u'123',
u'status': constants.DOWN,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': False,
u'ipsec_conns': [self.conn1_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
self.assertIsNotNone(vpn_service)
self.assertFalse(vpn_service.is_dirty)
u'cisco': {u'site_conn_id': u'Tunnel1'}}
service_data = {u'id': u'123',
u'status': constants.DOWN,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn_data1, conn_data2]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
self.assertEqual(prev_vpn_service, vpn_service)
self.assertFalse(vpn_service.is_dirty)
u'cisco': {u'site_conn_id': u'Tunnel0'}}
service_data = {u'id': u'123',
u'status': constants.ACTIVE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
self.assertFalse(vpn_service.is_dirty)
self.assertEqual(constants.ACTIVE, vpn_service.last_status)
self.assertEqual(constants.DOWN, connection.last_status)
self.assertEqual(1, self.conn_create.call_count)
- def test_update_service_create_no_csr(self):
- """Failure test of sync of service that is not on CSR - ignore.
-
- Ignore the VPN service and its IPSec connection(s) notifications for
- which there is no corresponding Cisco CSR.
- """
- conn_data = {u'id': u'1', u'status': constants.PENDING_CREATE,
- u'admin_state_up': True,
- u'cisco': {u'site_conn_id': u'Tunnel0'}}
- service_data = {u'id': u'123',
- u'status': constants.PENDING_CREATE,
- u'external_ip': u'2.2.2.2',
- u'admin_state_up': True,
- u'ipsec_conns': [conn_data]}
- vpn_service = self.driver.update_service(self.context, service_data)
- self.assertIsNone(vpn_service)
-
def _check_connection_for_service(self, count, vpn_service):
"""Helper to check the connection information for a service."""
connection = vpn_service.get_connection(u'%d' % count)
u'cisco': {u'site_conn_id': u'Tunnel2'}}
service1_data = {u'id': u'123',
u'status': constants.PENDING_CREATE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn1_data, conn2_data]}
+ service1_data.update(self.router_info)
conn3_data = {u'id': u'3', u'status': constants.PENDING_CREATE,
u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel3'}}
u'cisco': {u'site_conn_id': u'Tunnel4'}}
service2_data = {u'id': u'456',
u'status': constants.PENDING_CREATE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn3_data, conn4_data]}
+ service2_data.update(self.router_info)
return service1_data, service2_data
def test_create_two_connections_on_two_services(self):
vpn_service1.create_connection(self.conn1_data)
service456_data = {u'id': u'456',
u'status': constants.ACTIVE,
- u'admin_state_up': False,
- u'external_ip': u'1.1.1.1'}
+ u'admin_state_up': False}
+ service456_data.update(self.router_info)
conn2_data = {u'id': u'2', u'status': constants.ACTIVE,
u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel0'}}
connection_state):
"""Create internal structures for single service with connection.
- The service and connection will be marked as clean, and since
- none are being deleted, the service's connections_removed
- attribute will remain false.
+ Creates a service and corresponding connection. Then, simluates
+ the mark/update/sweep operation by marking both the service and
+ connection as clean and updating their status. Override the REST
+ client created for the service, with a mock, so that all calls
+ can be mocked out.
"""
- # Simulate that we have done mark, update, and sweep.
conn_data = {u'id': u'1', u'status': connection_state,
u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel0'}}
service_data = {u'id': u'123',
- u'status': service_state,
- u'external_ip': u'1.1.1.1',
- u'admin_state_up': True,
- u'ipsec_conns': [conn_data]}
- return self.driver.update_service(self.context, service_data)
+ u'admin_state_up': True}
+ service_data.update(self.router_info)
+ # Create a service and connection
+ vpn_service = self.driver.create_vpn_service(service_data)
+ vpn_service.csr = self.csr # Mocked REST client
+ connection = vpn_service.create_connection(conn_data)
+ # Simulate that the update phase visited both of them
+ vpn_service.is_dirty = False
+ vpn_service.connections_removed = False
+ vpn_service.last_status = service_state
+ vpn_service.is_admin_up = True
+ connection.is_dirty = False
+ connection.last_status = connection_state
+ connection.is_admin_up = True
+ connection.forced_down = False
+ return vpn_service
def test_report_fragment_connection_created(self):
"""Generate report section for a created connection."""
u'cisco': {u'site_conn_id': u'Tunnel0'}}
service_data = {u'id': u'123',
u'status': constants.ACTIVE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
+ vpn_service.csr = self.csr # Mocked REST client
# Tunnel would have been deleted, so simulate no status
self.csr.read_tunnel_statuses.return_value = []
u'cisco': {u'site_conn_id': u'Tunnel2'}}
service_data = {u'id': u'123',
u'status': constants.ACTIVE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn1_data, conn2_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
+ vpn_service.csr = self.csr # Mocked REST client
# Simulate that CSR has reported the connections with diff status
self.csr.read_tunnel_statuses.return_value = [
(u'Tunnel1', u'UP-IDLE'), (u'Tunnel2', u'DOWN-NEGOTIATING')]
u'cisco': {u'site_conn_id': u'Tunnel2'}}
service_data = {u'id': u'123',
u'status': constants.ACTIVE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn1_data, conn2_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
+ vpn_service.csr = self.csr # Mocked REST client
# Simulate that the CSR has reported that the connections are DOWN
self.csr.read_tunnel_statuses.return_value = [
(u'Tunnel1', u'DOWN-NEGOTIATING'), (u'Tunnel2', u'DOWN')]
u'cisco': {u'site_conn_id': u'Tunnel2'}}
service_data = {u'id': u'123',
u'status': constants.ACTIVE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn1_data, conn2_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
self.assertEqual(constants.ACTIVE, vpn_service.last_status)
self.assertEqual(constants.ACTIVE,
self.driver.mark_existing_connections_as_dirty()
service_data = {u'id': u'123',
u'status': constants.ACTIVE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': True,
u'ipsec_conns': [conn2_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
+ vpn_service.csr = self.csr # Mocked REST client
self.driver.remove_unknown_connections(self.context)
self.assertTrue(vpn_service.connections_removed)
self.assertEqual(constants.ACTIVE, vpn_service.last_status)
u'cisco': {u'site_conn_id': u'Tunnel2'}}
service_data = {u'id': u'123',
u'status': constants.ACTIVE,
- u'external_ip': u'1.1.1.1',
u'admin_state_up': False,
u'ipsec_conns': [conn1_data, conn2_data]}
+ service_data.update(self.router_info)
vpn_service = self.driver.update_service(self.context, service_data)
+ vpn_service.csr = self.csr # Mocked REST client
# Since service admin down, connections will have been deleted
self.csr.read_tunnel_statuses.return_value = []
vpn_service1 = self.driver.update_service(self.context, service1_data)
vpn_service2 = self.driver.update_service(self.context, service2_data)
# Simulate that the CSR has created the connections
+ vpn_service1.csr = vpn_service2.csr = self.csr # Mocked REST client
self.csr.read_tunnel_statuses.return_value = [
(u'Tunnel1', u'UP-ACTIVE'), (u'Tunnel2', u'DOWN'),
(u'Tunnel3', u'DOWN-NEGOTIATING'), (u'Tunnel4', u'UP-IDLE')]
context = mock.Mock()
self.driver.vpnservice_updated(context)
sync.assert_called_once_with(context, [])
-
-
-class TestCiscoCsrIPsecDeviceDriverConfigLoading(base.BaseTestCase):
-
- def create_tempfile(self, contents):
- (fd, path) = tempfile.mkstemp(prefix='test', suffix='.conf')
- try:
- os.write(fd, contents.encode('utf-8'))
- finally:
- os.close(fd)
- return path
-
- def test_loading_csr_configuration(self):
- """Ensure that Cisco CSR configs can be loaded from config files."""
- cfg_file = self.create_tempfile('[CISCO_CSR_REST:3.2.1.1]\n'
- 'rest_mgmt = 10.20.30.1\n'
- 'tunnel_ip = 3.2.1.3\n'
- 'username = me\n'
- 'password = secret\n'
- 'timeout = 5.0\n')
- expected = {'3.2.1.1': {'rest_mgmt': '10.20.30.1',
- 'tunnel_ip': '3.2.1.3',
- 'username': 'me',
- 'password': 'secret',
- 'timeout': 5.0}}
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual(expected, csrs_found)
-
- def test_loading_config_without_timeout(self):
- """Cisco CSR config without timeout will use default timeout."""
- cfg_file = self.create_tempfile('[CISCO_CSR_REST:3.2.1.1]\n'
- 'rest_mgmt = 10.20.30.1\n'
- 'tunnel_ip = 3.2.1.3\n'
- 'username = me\n'
- 'password = secret\n')
- expected = {'3.2.1.1': {'rest_mgmt': '10.20.30.1',
- 'tunnel_ip': '3.2.1.3',
- 'username': 'me',
- 'password': 'secret',
- 'timeout': csr_client.TIMEOUT}}
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual(expected, csrs_found)
-
- def test_skip_loading_duplicate_csr_configuration(self):
- """Failure test that duplicate configurations are ignored."""
- cfg_file = self.create_tempfile('[CISCO_CSR_REST:3.2.1.1]\n'
- 'rest_mgmt = 10.20.30.1\n'
- 'tunnel_ip = 3.2.1.3\n'
- 'username = me\n'
- 'password = secret\n'
- 'timeout = 5.0\n'
- '[CISCO_CSR_REST:3.2.1.1]\n'
- 'rest_mgmt = 5.5.5.3\n'
- 'tunnel_ip = 3.2.1.6\n'
- 'username = me\n'
- 'password = secret\n')
- expected = {'3.2.1.1': {'rest_mgmt': '10.20.30.1',
- 'tunnel_ip': '3.2.1.3',
- 'username': 'me',
- 'password': 'secret',
- 'timeout': 5.0}}
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual(expected, csrs_found)
-
- def test_fail_loading_config_with_invalid_timeout(self):
- """Failure test of invalid timeout in config info."""
- cfg_file = self.create_tempfile('[CISCO_CSR_REST:3.2.1.1]\n'
- 'rest_mgmt = 10.20.30.1\n'
- 'tunnel_ip = 3.2.1.3\n'
- 'username = me\n'
- 'password = secret\n'
- 'timeout = yes\n')
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual({}, csrs_found)
-
- def test_fail_loading_config_missing_required_info(self):
- """Failure test of config missing required info."""
- cfg_file = self.create_tempfile('[CISCO_CSR_REST:1.1.1.0]\n'
- 'tunnel_ip = 1.1.1.3\n'
- 'username = me\n'
- 'password = secret\n'
- 'timeout = 5.0\n'
- '[CISCO_CSR_REST:2.2.2.0]\n'
- 'rest_mgmt = 10.20.30.1\n'
- 'username = me\n'
- 'password = secret\n'
- 'timeout = 5.0\n'
- '[CISCO_CSR_REST:3.3.3.0]\n'
- 'rest_mgmt = 10.20.30.1\n'
- 'tunnel_ip = 3.3.3.3\n'
- 'password = secret\n'
- 'timeout = 5.0\n'
- '[CISCO_CSR_REST:4.4.4.0]\n'
- 'rest_mgmt = 10.20.30.1\n'
- 'tunnel_ip = 4.4.4.4\n'
- 'username = me\n'
- 'timeout = 5.0\n')
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual({}, csrs_found)
-
- def test_fail_loading_config_with_invalid_router_id(self):
- """Failure test of config with invalid rotuer ID."""
- cfg_file = self.create_tempfile('[CISCO_CSR_REST:4.3.2.1.9]\n'
- 'rest_mgmt = 10.20.30.1\n'
- 'tunnel_ip = 4.3.2.3\n'
- 'username = me\n'
- 'password = secret\n'
- 'timeout = 5.0\n')
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual({}, csrs_found)
-
- def test_fail_loading_config_with_invalid_mgmt_ip(self):
- """Failure test of configuration with invalid management IP address."""
- cfg_file = self.create_tempfile('[CISCO_CSR_REST:3.2.1.1]\n'
- 'rest_mgmt = 1.1.1.1.1\n'
- 'tunnel_ip = 3.2.1.3\n'
- 'username = me\n'
- 'password = secret\n'
- 'timeout = 5.0\n')
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual({}, csrs_found)
-
- def test_fail_loading_config_with_invalid_tunnel_ip(self):
- """Failure test of configuration with invalid tunnel IP address."""
- cfg_file = self.create_tempfile('[CISCO_CSR_REST:3.2.1.1]\n'
- 'rest_mgmt = 1.1.1.1\n'
- 'tunnel_ip = 3.2.1.4.5\n'
- 'username = me\n'
- 'password = secret\n'
- 'timeout = 5.0\n')
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual({}, csrs_found)
-
- def test_failure_no_configurations_entries(self):
- """Failure test config file without any CSR definitions."""
- cfg_file = self.create_tempfile('NO CISCO SECTION AT ALL\n')
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual({}, csrs_found)
-
- def test_failure_no_csr_configurations_entries(self):
- """Failure test config file without any CSR definitions."""
- cfg_file = self.create_tempfile('[SOME_CONFIG:123]\n'
- 'username = me\n')
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual({}, csrs_found)
-
- def test_missing_config_value(self):
- """Failure test of config file missing a value for attribute."""
- cfg_file = self.create_tempfile('[CISCO_CSR_REST:3.2.1.1]\n'
- 'rest_mgmt = \n'
- 'tunnel_ip = 3.2.1.3\n'
- 'username = me\n'
- 'password = secret\n'
- 'timeout = 5.0\n')
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual({}, csrs_found)
-
- def test_ignores_invalid_attribute_in_config(self):
- """Test ignoring of config file with invalid attribute."""
- cfg_file = self.create_tempfile('[CISCO_CSR_REST:3.2.1.1]\n'
- 'rest_mgmt = 1.1.1.1\n'
- 'bogus = abcdef\n'
- 'tunnel_ip = 3.2.1.3\n'
- 'username = me\n'
- 'password = secret\n'
- 'timeout = 15.5\n')
- expected = {'3.2.1.1': {'rest_mgmt': '1.1.1.1',
- 'tunnel_ip': '3.2.1.3',
- 'username': 'me',
- 'password': 'secret',
- 'timeout': 15.5}}
- csrs_found = ipsec_driver.find_available_csrs_from_config([cfg_file])
- self.assertEqual(expected, csrs_found)
--- /dev/null
+# Copyright 2014 Cisco Systems, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Paul Michali, Cisco Systems, Inc.
+
+"""Test module for interim implementation - to be removed later.
+
+This tests using an INI file to obtain Cisco CSR router information
+for IPSec site-to-site connections. Once the Cisco L3 router plugin
+blueprint has been up-streamed, this can be removed and production code
+switched to use the L3 plugin methods for:
+
+ get_host_for_router()
+ get_active_routers_for_host()
+
+TODO(pcm): remove module, when Cisco L3 router plugin is up-streamed.
+"""
+
+import os
+import tempfile
+
+import mock
+from oslo.config import cfg
+
+from neutron import context as ctx
+from neutron.openstack.common import uuidutils
+from neutron.services.vpn.device_drivers import (
+ cisco_csr_rest_client as csr_client)
+from neutron.services.vpn.service_drivers import (
+ cisco_cfg_loader as cfg_loader)
+from neutron.tests import base
+
+_uuid = uuidutils.generate_uuid
+FAKE_ROUTER_ID = _uuid()
+CISCO_GET_ROUTER_IP = ('neutron.services.vpn.service_drivers.'
+ 'cisco_cfg_loader._get_external_ip_for_router')
+CISCO_GET_ROUTER_ID = ('neutron.services.vpn.service_drivers.'
+ 'cisco_cfg_loader._get_router_id_via_external_ip')
+
+
+def create_tempfile(contents):
+ (fd, path) = tempfile.mkstemp(prefix='test', suffix='.conf')
+ try:
+ os.write(fd, contents.encode('utf-8'))
+ finally:
+ os.close(fd)
+ return path
+
+
+class TestCiscoCsrServiceDriverConfigLoading(base.BaseTestCase):
+
+ def test_loading_csr_configuration(self):
+ """Ensure that Cisco CSR configs can be loaded from config files."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n')
+ expected = {'3.2.1.1': {'rest_mgmt_ip': '10.20.30.1',
+ 'tunnel_ip': '3.2.1.3',
+ 'username': 'me',
+ 'password': 'secret',
+ 'host': 'compute-node',
+ 'tunnel_if': 'GigabitEthernet3',
+ 'timeout': 5.0}}
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual(expected, csrs_found)
+
+ def test_loading_config_without_timeout(self):
+ """Cisco CSR config without timeout will use default timeout."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n')
+ expected = {'3.2.1.1': {'rest_mgmt_ip': '10.20.30.1',
+ 'tunnel_ip': '3.2.1.3',
+ 'username': 'me',
+ 'password': 'secret',
+ 'host': 'compute-node',
+ 'tunnel_if': 'GigabitEthernet3',
+ 'timeout': csr_client.TIMEOUT}}
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual(expected, csrs_found)
+
+ def test_skip_loading_duplicate_csr_configuration(self):
+ """Failure test that duplicate configurations are ignored."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n'
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 5.5.5.3\n'
+ 'tunnel_ip = 3.2.1.6\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n')
+ expected = {'3.2.1.1': {'rest_mgmt_ip': '10.20.30.1',
+ 'tunnel_ip': '3.2.1.3',
+ 'username': 'me',
+ 'password': 'secret',
+ 'host': 'compute-node',
+ 'tunnel_if': 'GigabitEthernet3',
+ 'timeout': 5.0}}
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual(expected, csrs_found)
+
+ def test_fail_loading_config_with_invalid_timeout(self):
+ """Failure test of invalid timeout in config info."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = yes\n')
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual({}, csrs_found)
+
+ def test_fail_loading_config_missing_required_info(self):
+ """Failure test of config missing required info."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:1.1.1.0]\n'
+ # No rest_mgmt
+ 'tunnel_ip = 1.1.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n'
+
+ '[CISCO_CSR_REST:2.2.2.0]\n'
+ 'rest_mgmt = 10.20.30.2\n'
+ # No tunnel_ip
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n'
+
+ '[CISCO_CSR_REST:3.3.3.0]\n'
+ 'rest_mgmt = 10.20.30.3\n'
+ 'tunnel_ip = 3.3.3.3\n'
+ # No username
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n'
+
+ '[CISCO_CSR_REST:4.4.4.0]\n'
+ 'rest_mgmt = 10.20.30.4\n'
+ 'tunnel_ip = 4.4.4.4\n'
+ 'username = me\n'
+ # No password
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n'
+
+ '[CISCO_CSR_REST:5.5.5.0]\n'
+ 'rest_mgmt = 10.20.30.5\n'
+ 'tunnel_ip = 5.5.5.5'
+ 'username = me\n'
+ 'password = secret\n'
+ # No host
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n'
+
+ '[CISCO_CSR_REST:6.6.6.0]\n'
+ 'rest_mgmt = 10.20.30.6\n'
+ 'tunnel_ip = 6.6.6.6'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ # No tunnel_if
+ 'timeout = 5.0\n')
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual({}, csrs_found)
+
+ def test_fail_loading_config_with_invalid_router_id(self):
+ """Failure test of config with invalid rotuer ID."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:4.3.2.1.9]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 4.3.2.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n')
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual({}, csrs_found)
+
+ def test_fail_loading_config_with_invalid_mgmt_ip(self):
+ """Failure test of configuration with invalid management IP address."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 1.1.1.1.1\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n')
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual({}, csrs_found)
+
+ def test_fail_loading_config_with_invalid_tunnel_ip(self):
+ """Failure test of configuration with invalid tunnel IP address."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 1.1.1.1\n'
+ 'tunnel_ip = 3.2.1.4.5\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n')
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual({}, csrs_found)
+
+ def test_failure_no_configurations_entries(self):
+ """Failure test config file without any CSR definitions."""
+ cfg_file = create_tempfile('NO CISCO SECTION AT ALL\n')
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual({}, csrs_found)
+
+ def test_failure_no_csr_configurations_entries(self):
+ """Failure test config file without any CSR definitions."""
+ cfg_file = create_tempfile('[SOME_CONFIG:123]\n'
+ 'username = me\n')
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual({}, csrs_found)
+
+ def test_missing_config_value(self):
+ """Failure test of config file missing a value for attribute."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = \n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n')
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual({}, csrs_found)
+
+ def test_ignores_invalid_attribute_in_config(self):
+ """Test ignoring of config file with invalid attribute."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 1.1.1.1\n'
+ 'bogus = abcdef\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 15.5\n')
+ expected = {'3.2.1.1': {'rest_mgmt_ip': '1.1.1.1',
+ 'tunnel_ip': '3.2.1.3',
+ 'username': 'me',
+ 'password': 'secret',
+ 'host': 'compute-node',
+ 'tunnel_if': 'GigabitEthernet3',
+ 'timeout': 15.5}}
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual(expected, csrs_found)
+
+ def test_invalid_management_interface(self):
+ """Failure test of invalid management interface name."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 1.1.1.1\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = compute-node\n'
+ 'tunnel_if = GigabitEthernet9\n'
+ 'timeout = 5.0\n')
+ csrs_found = cfg_loader.get_available_csrs_from_config([cfg_file])
+ self.assertEqual({}, csrs_found)
+
+
+class TestCiscoCsrRouterInfo(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestCiscoCsrRouterInfo, self).setUp()
+ self.context = ctx.get_admin_context()
+
+ def test_find_host_for_router(self):
+ """Look up host in INI file for a router."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = ubuntu\n'
+ 'tunnel_if = GigabitEthernet1\n'
+ 'mgmt_vlan = 100\n'
+ 'timeout = 5.0\n')
+ cfg.CONF.set_override('config_file', [cfg_file])
+ mock.patch(CISCO_GET_ROUTER_IP, return_value='3.2.1.1').start()
+ self.assertEqual('ubuntu',
+ cfg_loader.get_host_for_router(self.context,
+ FAKE_ROUTER_ID))
+
+ def test_failed_to_find_host_as_no_routers_in_ini(self):
+ """Fail to find host, as no router info in INI file."""
+ cfg_file = create_tempfile('\n')
+ cfg.CONF.set_override('config_file', [cfg_file])
+ mock.patch(CISCO_GET_ROUTER_IP, return_value='5.5.5.5').start()
+ self.assertEqual('',
+ cfg_loader.get_host_for_router(self.context,
+ FAKE_ROUTER_ID))
+
+ def test_failed_no_matching_router_to_obtain_host(self):
+ """Fail to find INI info for router provided."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = ubuntu\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n')
+ cfg.CONF.set_override('config_file', [cfg_file])
+ mock.patch(CISCO_GET_ROUTER_IP, return_value='5.5.5.5').start()
+ self.assertEqual('',
+ cfg_loader.get_host_for_router(self.context,
+ FAKE_ROUTER_ID))
+
+ def test_failed_to_find_router_ip(self):
+ """Fail to lookup router IP, preventing search in INI file."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = ubuntu\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n')
+ cfg.CONF.set_override('config_file', [cfg_file])
+ mock.patch(CISCO_GET_ROUTER_IP, return_value=None).start()
+ self.assertEqual('',
+ cfg_loader.get_host_for_router(self.context,
+ FAKE_ROUTER_ID))
+
+ def _get_router_id_from_external_ip(self, context, ip):
+ if ip == '3.2.1.1':
+ return '123'
+ elif ip == '4.3.2.1':
+ return '456'
+
+ def test_get_one_active_router_for_host(self):
+ """Get router info from INI for host specified."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 3.2.1.3\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = ubuntu\n'
+ 'tunnel_if = GigabitEthernet2\n'
+ 'timeout = 5.0\n')
+ cfg.CONF.set_override('config_file', [cfg_file])
+ mock.patch(CISCO_GET_ROUTER_ID,
+ side_effect=self._get_router_id_from_external_ip).start()
+ expected = {
+ 'id': '123',
+ 'hosting_device': {
+ 'management_ip_address': '10.20.30.1',
+ 'credentials': {'username': 'me', 'password': 'secret'}
+ },
+ 'tunnel_if': 'GigabitEthernet2',
+ 'tunnel_ip': '3.2.1.3'
+ }
+ routers = cfg_loader.get_active_routers_for_host(self.context,
+ "ubuntu")
+ self.assertEqual([expected], routers)
+
+ def test_get_two_active_routers_for_host(self):
+ """Get info for two routers, from INI file, for host specified."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:3.2.1.1]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 3.2.1.1\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = ubuntu\n'
+ 'tunnel_if = GigabitEthernet2\n'
+ 'timeout = 5.0\n'
+ '[CISCO_CSR_REST:4.3.2.1]\n'
+ 'rest_mgmt = 10.20.30.2\n'
+ 'tunnel_ip = 4.3.2.1\n'
+ 'username = you\n'
+ 'password = insecure\n'
+ 'host = ubuntu\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n')
+ cfg.CONF.set_override('config_file', [cfg_file])
+ mock.patch(CISCO_GET_ROUTER_ID,
+ side_effect=self._get_router_id_from_external_ip).start()
+ expected_a = {
+ 'id': '123',
+ 'hosting_device': {
+ 'management_ip_address': '10.20.30.1',
+ 'credentials': {'username': 'me', 'password': 'secret'}
+ },
+ 'tunnel_if': 'GigabitEthernet2',
+ 'tunnel_ip': '3.2.1.1'
+ }
+ expected_b = {
+ 'id': '456',
+ 'hosting_device': {
+ 'management_ip_address': '10.20.30.2',
+ 'credentials': {'username': 'you', 'password': 'insecure'}
+ },
+ 'tunnel_if': 'GigabitEthernet3',
+ 'tunnel_ip': '4.3.2.1'
+ }
+ routers = cfg_loader.get_active_routers_for_host(self.context,
+ "ubuntu")
+ sorted_routers = sorted(routers, key=lambda key: key['id'])
+ self.assertEqual([expected_a, expected_b], sorted_routers)
+
+ def test_failure_to_find_routers_for_host(self):
+ """Fail to find a router in INI with matching host name."""
+ routers = cfg_loader.get_active_routers_for_host(self.context,
+ "bogus")
+ self.assertEqual([], routers)
+
+ def test_failure_to_lookup_router_id_for_host(self):
+ """Fail to get router UUID for router in INI matching host name."""
+ cfg_file = create_tempfile(
+ '[CISCO_CSR_REST:6.6.6.1]\n'
+ 'rest_mgmt = 10.20.30.1\n'
+ 'tunnel_ip = 6.6.6.1\n'
+ 'username = me\n'
+ 'password = secret\n'
+ 'host = ubuntu\n'
+ 'tunnel_if = GigabitEthernet3\n'
+ 'timeout = 5.0\n')
+ cfg.CONF.set_override('config_file', [cfg_file])
+ mock.patch(CISCO_GET_ROUTER_ID,
+ side_effect=self._get_router_id_from_external_ip).start()
+ routers = cfg_loader.get_active_routers_for_host(self.context,
+ "ubuntu")
+ self.assertEqual([], routers)
super(TestCiscoIPsecDriver, self).setUp()
mock.patch('neutron.common.rpc.create_connection').start()
- l3_agent = mock.Mock()
- l3_agent.host = FAKE_HOST
- plugin = mock.Mock()
- plugin.get_l3_agents_hosting_routers.return_value = [l3_agent]
- plugin_p = mock.patch('neutron.manager.NeutronManager.get_plugin')
- get_plugin = plugin_p.start()
- get_plugin.return_value = plugin
- service_plugin_p = mock.patch(
- 'neutron.manager.NeutronManager.get_service_plugins')
- get_service_plugin = service_plugin_p.start()
- get_service_plugin.return_value = {constants.L3_ROUTER_NAT: plugin}
-
service_plugin = mock.Mock()
- service_plugin.get_l3_agents_hosting_routers.return_value = [l3_agent]
+ service_plugin.get_host_for_router.return_value = FAKE_HOST
+ # TODO(pcm): Remove when Cisco L3 router plugin support available
+ mock.patch('neutron.services.vpn.service_drivers.'
+ 'cisco_cfg_loader.get_host_for_router',
+ return_value=FAKE_HOST).start()
service_plugin._get_vpnservice.return_value = {
'router_id': _uuid()
}
- self.db_update_mock = service_plugin.update_ipsec_site_conn_status
+ get_service_plugin = mock.patch(
+ 'neutron.manager.NeutronManager.get_service_plugins').start()
+ get_service_plugin.return_value = {
+ constants.L3_ROUTER_NAT: service_plugin}
self.driver = ipsec_driver.CiscoCsrIPsecVPNDriver(service_plugin)
mock.patch.object(csr_db, 'create_tunnel_mapping').start()
self.context = n_ctx.Context('some_user', 'some_tenant')