# now, uncomment and include httmock source to UT
from neutron.tests.unit.services.vpn.device_drivers import httmock
-# TODO(pcm) Remove, once verified these have been fixed
-FIXED_CSCum35484 = False
-FIXED_CSCul82396 = False
-FIXED_CSCum10324 = False
-
LOG = logging.getLogger(__name__)
def normal_get(url, request):
if request.method != 'GET':
return
- LOG.debug("DEBUG: GET mock for %s", url)
+ LOG.debug("GET mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
if 'global/host-name' in url.path:
u'priority-id': u'2',
u'version': u'v1',
u'local-auth-method': u'pre-share',
- u'encryption': u'aes',
+ u'encryption': u'aes256',
u'hash': u'sha',
u'dhGroup': 5,
u'lifetime': 3600}
u'mode': u'tunnel',
u'policy-id': u'%s' % ipsec_policy_id,
u'protection-suite': {
- u'esp-encryption': u'esp-aes',
+ u'esp-encryption': u'esp-256-aes',
u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac',
},
- u'anti-replay-window-size': u'128',
+ u'anti-replay-window-size': u'Disable',
u'lifetime-sec': 120,
u'pfs': u'group5',
u'lifetime-kb': 4608000,
@filter_request(['get'], 'vpn-svc/ike/keyrings')
@httmock.urlmatch(netloc=r'localhost')
def get_fqdn(url, request):
- LOG.debug("DEBUG: GET FQDN mock for %s", url)
+ LOG.debug("GET FQDN mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
content = {u'kind': u'object#ike-keyring',
@filter_request(['get'], 'vpn-svc/ipsec/policies/')
@httmock.urlmatch(netloc=r'localhost')
def get_no_ah(url, request):
- LOG.debug("DEBUG: GET No AH mock for %s", url)
+ LOG.debug("GET No AH mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
ipsec_policy_id = url.path.split('/')[-1]
def get_defaults(url, request):
if request.method != 'GET':
return
- LOG.debug("DEBUG: GET mock for %s", url)
+ LOG.debug("GET mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
if 'vpn-svc/ike/policies/2' in url.path:
def post(url, request):
if request.method != 'POST':
return
- LOG.debug("DEBUG: POST mock for %s", url)
+ LOG.debug("POST mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
if 'interfaces/GigabitEthernet' in url.path:
@filter_request(['post'], 'global/local-users')
@httmock.urlmatch(netloc=r'localhost')
def post_change_attempt(url, request):
- LOG.debug("DEBUG: POST change value mock for %s", url)
+ LOG.debug("POST change value mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.NOT_FOUND,
@httmock.urlmatch(netloc=r'localhost')
def post_duplicate(url, request):
- LOG.debug("DEBUG: POST duplicate mock for %s", url)
+ LOG.debug("POST duplicate mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST,
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_missing_ipsec_policy(url, request):
- LOG.debug("DEBUG: POST missing ipsec policy mock for %s", url)
+ LOG.debug("POST missing ipsec policy mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_missing_ike_policy(url, request):
- LOG.debug("DEBUG: POST missing ike policy mock for %s", url)
+ LOG.debug("POST missing ike policy mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_ip(url, request):
- LOG.debug("DEBUG: POST bad IP mock for %s", url)
+ LOG.debug("POST bad IP mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_mtu(url, request):
- LOG.debug("DEBUG: POST bad mtu mock for %s", url)
+ LOG.debug("POST bad mtu mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@filter_request(['post'], 'vpn-svc/ipsec/policies')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_lifetime(url, request):
- LOG.debug("DEBUG: POST bad lifetime mock for %s", url)
+ LOG.debug("POST bad lifetime mock for %s", url)
+ if not request.headers.get('X-auth-token', None):
+ return {'status_code': requests.codes.UNAUTHORIZED}
+ return {'status_code': requests.codes.BAD_REQUEST}
+
+
+@filter_request(['post'], 'vpn-svc/ipsec/policies')
+@httmock.urlmatch(netloc=r'localhost')
+def post_bad_name(url, request):
+ LOG.debug("POST bad IPSec policy name for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
def put(url, request):
if request.method != 'PUT':
return
- LOG.debug("DEBUG: PUT mock for %s", url)
+ LOG.debug("PUT mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
def delete(url, request):
if request.method != 'DELETE':
return
- LOG.debug("DEBUG: DELETE mock for %s", url)
+ LOG.debug("DELETE mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
def delete_unknown(url, request):
if request.method != 'DELETE':
return
- LOG.debug("DEBUG: DELETE unknown mock for %s", url)
+ LOG.debug("DELETE unknown mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
def delete_not_allowed(url, request):
if request.method != 'DELETE':
return
- LOG.debug("DEBUG: DELETE not allowed mock for %s", url)
+ LOG.debug("DELETE not allowed mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
import random
-import httmock
+# TODO(pcm) Uncomment when httmock is added to test requirements.
+# import httmock
import requests
from neutron.openstack.common import log as logging
from neutron.tests import base
from neutron.tests.unit.services.vpn.device_drivers import (
cisco_csr_mock as csr_request)
-# TODO(pcm) Remove once httmock is available. In the meantime, use temp
-# copy of hhtmock source to run UT
-# from neutron.tests.unit.services.vpn.device_drivers import httmock
+# TODO(pcm) Remove once httmock is available. In the meantime, use
+# temporary local copy of httmock source to run UT
+from neutron.tests.unit.services.vpn.device_drivers import httmock
LOG = logging.getLogger(__name__)
logging.CONF.set_override('debug', True)
logging.setup('neutron')
-if csr_request.FIXED_CSCum35484:
- dummy_uuid = '1eb4ee6b-0870-45a0-b554-7b69096'
-else:
- dummy_uuid = '1eb4ee6b-0870-45a0-b554-7b'
+dummy_policy_id = 'dummy-ipsec-policy-id-name'
# Note: Helper functions to test reuse of IDs.
"""Test logging into CSR to obtain token-id."""
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrLoginRestApi, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
def test_get_token(self):
"""Obtain the token and its expiration time."""
"""Test CSR GET REST API."""
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrGetRestApi, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
def test_valid_rest_gets(self):
"""Simple GET requests.
"""Test CSR POST REST API."""
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrPostRestApi, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
def test_post_requests(self):
"""Simple POST requests (repeatable).
if self.csr.status != requests.codes.NO_CONTENT:
self.fail("Unable to restore I/F Ge1 description after test")
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
"""Prepare for PUT API tests."""
super(TestCsrPutRestApi, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
self._save_resources()
self.addCleanup(self._restore_resources, 'stack', 'cisco')
"""Test CSR DELETE REST API."""
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrDeleteRestApi, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
def _make_dummy_user(self):
"""Create a user that will be later deleted."""
the result, without any error handling.
"""
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=0.1):
super(TestCsrRestApiFailures, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco', timeout=0.1)
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
def test_request_for_non_existent_resource(self):
"""Negative test of non-existent resource on REST request."""
"""Test IKE policy create REST requests."""
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIkePolicyCreate, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
def test_create_delete_ike_policy(self):
"""Create and then delete IKE policy."""
csr_request.normal_get):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id,
- u'encryption': u'aes',
+ u'encryption': u'aes256',
u'hash': u'sha',
u'dhGroup': 5,
u'lifetime': 3600}
"""Test IPSec policy create REST requests."""
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIPSecPolicyCreate, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
def test_create_delete_ipsec_policy(self):
"""Create and then delete IPSec policy."""
policy_info = {
u'policy-id': u'%s' % policy_id,
u'protection-suite': {
- u'esp-encryption': u'esp-aes',
+ u'esp-encryption': u'esp-256-aes',
u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac',
},
u'lifetime-sec': 120,
u'pfs': u'group5',
- u'anti-replay-window-size': u'128'
+ u'anti-replay-window-size': u'disable'
}
location = self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.CREATED, self.csr.status)
u'lifetime-kb': 4608000,
u'idle-time': None}
expected_policy.update(policy_info)
+ # CSR will respond with capitalized value
+ expected_policy[u'anti-replay-window-size'] = u'Disable'
self.assertEqual(expected_policy, content)
# Now delete and verify the IPSec policy is gone
with httmock.HTTMock(csr_request.token, csr_request.delete,
with httmock.HTTMock(csr_request.token, csr_request.post,
csr_request.normal_get):
policy_info = {
- u'policy-id': u'%s' % dummy_uuid,
+ u'policy-id': u'%s' % dummy_policy_id,
u'protection-suite': {
- u'esp-encryption': u'esp-aes',
+ u'esp-encryption': u'esp-256-aes',
u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac',
},
u'lifetime-sec': 120,
u'pfs': u'group5',
- u'anti-replay-window-size': u'128'
+ u'anti-replay-window-size': u'disable'
}
location = self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.CREATED, self.csr.status)
- self.assertIn('vpn-svc/ipsec/policies/%s' % dummy_uuid,
+ self.assertIn('vpn-svc/ipsec/policies/%s' % dummy_policy_id,
location)
# Check the hard-coded items that get set as well...
content = self.csr.get_request(location, full_url=True)
u'lifetime-kb': 4608000,
u'idle-time': None}
expected_policy.update(policy_info)
+ # CSR will respond with capitalized value
+ expected_policy[u'anti-replay-window-size'] = u'Disable'
self.assertEqual(expected_policy, content)
def test_create_ipsec_policy_without_ah(self):
self.csr.create_ipsec_policy(policy_info)
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
+ def test_create_ipsec_policy_with_invalid_name(self):
+ """Failure test of creating IPSec policy with name too long."""
+ with httmock.HTTMock(csr_request.token, csr_request.post_bad_name,
+ csr_request.get_defaults):
+ policy_id = 'policy-name-is-too-long-32-chars'
+ policy_info = {
+ u'policy-id': u'%s' % policy_id,
+ }
+ self.csr.create_ipsec_policy(policy_info)
+ self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
+
class TestCsrRestPreSharedKeyCreate(base.BaseTestCase):
"""Test Pre-shared key (PSK) create REST requests."""
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestPreSharedKeyCreate, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
def test_create_delete_pre_shared_key(self):
"""Create and then delete a keyring entry for pre-shared key."""
with a real CSR (as we can't mock out these pre-conditions.
"""
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIPSecConnectionCreate, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
def _make_psk_for_test(self):
psk_id = generate_pre_shared_key_id()
},
u'lifetime-sec': 120,
u'pfs': u'group5',
- u'anti-replay-window-size': u'64'
+ u'anti-replay-window-size': u'disable'
}
self.csr.create_ipsec_policy(policy_info)
if self.csr.status != requests.codes.CREATED:
"""Test IKE keepalive REST requests.
- This is a global configuration that will apply to all VPN tunnels and
- is used to specify Dead Peer Detection information. Currently, the API
- supports DELETE API, but a bug has been created to remove the API and
- add an indicator of when the capability is disabled.
+ Note: On the Cisco CSR, the IKE keepalive for v1 is a global configuration
+ that applies to all VPN tunnels to specify Dead Peer Detection information.
+ As a result, this REST API is not used in the OpenStack device driver, and
+ the keepalive will default to zero (disabled).
"""
- def setUp(self):
+ def _save_dpd_info(self):
+ with httmock.HTTMock(csr_request.token, csr_request.normal_get):
+ details = self.csr.get_request('vpn-svc/ike/keepalive')
+ if self.csr.status == requests.codes.OK:
+ self.dpd = details
+ self.addCleanup(self._restore_dpd_info)
+ elif self.csr.status != requests.codes.NOT_FOUND:
+ self.fail("Unable to save original DPD info")
+
+ def _restore_dpd_info(self):
+ with httmock.HTTMock(csr_request.token, csr_request.put):
+ payload = {'interval': self.dpd['interval'],
+ 'retry': self.dpd['retry']}
+ self.csr.put_request('vpn-svc/ike/keepalive', payload=payload)
+ if self.csr.status != requests.codes.NO_CONTENT:
+ self.fail("Unable to restore DPD info after test")
+
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIkeKeepaliveCreate, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
+ self._save_dpd_info()
+ self.csr.token = None
def test_configure_ike_keepalive(self):
"""Set IKE keep-alive (aka Dead Peer Detection) for the CSR."""
"""Disable IKE keep-alive (aka Dead Peer Detection) for the CSR."""
with httmock.HTTMock(csr_request.token, csr_request.delete,
csr_request.put, csr_request.get_not_configured):
- if csr_request.FIXED_CSCum10324:
- # TODO(pcm) Is this how to disable?
- keepalive_info = {'interval': 0, 'retry': 4}
- self.csr.configure_ike_keepalive(keepalive_info)
- self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
- else:
- self.csr.delete_request('vnc-svc/ike/keepalive')
- self.assertIn(self.csr.status,
- (requests.codes.NO_CONTENT,
- requests.codes.NOT_FOUND))
- self.csr.get_request('vpn-svc/ike/keepalive')
- self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
+ keepalive_info = {'interval': 0, 'retry': 4}
+ self.csr.configure_ike_keepalive(keepalive_info)
+ self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
class TestCsrRestStaticRoute(base.BaseTestCase):
a route for each of the peer CIDRs specified for the VPN connection.
"""
- def setUp(self):
+ def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestStaticRoute, self).setUp()
- self.csr = csr_client.CsrRestClient('localhost', '10.10.10.10',
- 'stack', 'cisco')
+ self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
+ timeout)
def test_create_delete_static_route(self):
"""Create and then delete a static route for the tunnel."""
self.conn_info)
self.assertEqual(expected, policy_info)
+ def test_create_ike_policy_info_different_encryption(self):
+ """Ensure that IKE policy info is mapped/created correctly."""
+ self.conn_info['ike_policy']['encryption_algorithm'] = 'aes-192'
+ expected = {u'priority-id': 222,
+ u'encryption': u'aes192',
+ u'hash': u'sha',
+ u'dhGroup': 5,
+ u'version': u'v1',
+ u'lifetime': 3600}
+ policy_id = self.conn_info['cisco']['ike_policy_id']
+ policy_info = self.ipsec_conn.create_ike_policy_info(policy_id,
+ self.conn_info)
+ self.assertEqual(expected, policy_info)
+
def test_create_ike_policy_info_non_defaults(self):
"""Ensure that IKE policy info with different values."""
self.conn_info['ike_policy'] = {
'lifetime_value': 60
}
expected = {u'priority-id': 222,
- u'encryption': u'aes', # TODO(pcm): fix
+ u'encryption': u'aes256',
u'hash': u'sha',
u'dhGroup': 14,
u'version': u'v1',
self.assertEqual(expected, policy_info)
def test_ipsec_policy_info(self):
- """Ensure that IPSec policy info is mapped/created correctly."""
+ """Ensure that IPSec policy info is mapped/created correctly.
+
+ Note: That although the default for anti-replay-window-size on the
+ CSR is 64, we force it to disabled, for OpenStack use.
+ """
expected = {u'policy-id': 333,
u'protection-suite': {
u'esp-encryption': u'esp-aes',
},
u'lifetime-sec': 3600,
u'pfs': u'group5',
- u'anti-replay-window-size': u'64'}
+ u'anti-replay-window-size': u'disable'}
+ ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id']
+ policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id,
+ self.conn_info)
+ self.assertEqual(expected, policy_info)
+
+ def test_ipsec_policy_info_different_encryption(self):
+ """Create IPSec policy with different settings."""
+ self.conn_info['ipsec_policy']['transform_protocol'] = 'ah-esp'
+ self.conn_info['ipsec_policy']['encryption_algorithm'] = 'aes-192'
+ expected = {u'policy-id': 333,
+ u'protection-suite': {
+ u'esp-encryption': u'esp-192-aes',
+ u'esp-authentication': u'esp-sha-hmac',
+ u'ah': u'ah-sha-hmac'
+ },
+ u'lifetime-sec': 3600,
+ u'pfs': u'group5',
+ u'anti-replay-window-size': u'disable'}
ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id']
policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id,
self.conn_info)
'auth_algorithm': 'sha1',
'pfs': 'group14',
'lifetime_units': 'seconds',
- 'lifetime_value': 120}
+ 'lifetime_value': 120,
+ 'anti-replay-window-size': 'disable'}
expected = {u'policy-id': 333,
u'protection-suite': {
u'esp-encryption': u'esp-3des',
},
u'lifetime-sec': 120,
u'pfs': u'group14',
- u'anti-replay-window-size': u'64'}
+ u'anti-replay-window-size': u'disable'}
ipsec_policy_id = self.conn_info['cisco']['ipsec_policy_id']
policy_info = self.ipsec_conn.create_ipsec_policy_info(ipsec_policy_id,
self.conn_info)
Shows the case where all the connections are down, so that the
service should report as DOWN, as well.
"""
- # Simulated one service with two ACTIVE connections
+ # Simulate one service with two ACTIVE connections
conn1_data = {u'id': u'1', u'status': constants.ACTIVE,
u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel1'}}
is deleted and the remaining connection is DOWN, the service will
indicate as DOWN.
"""
- # Simulated one service with one connection up, one down
+ # Simulate one service with one connection up, one down
conn1_data = {u'id': u'1', u'status': constants.ACTIVE,
u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel1'}}
When the service is admin down, all the connections will report
as DOWN.
"""
- # Simulated one service (admin down) with two ACTIVE connections
+ # Simulate one service (admin down) with two ACTIVE connections
conn1_data = {u'id': u'1', u'status': constants.ACTIVE,
u'admin_state_up': True,
u'cisco': {u'site_conn_id': u'Tunnel1'}}