import random
-# TODO(pcm) Uncomment when httmock is added to test requirements.
-# import httmock
+# TODO(pcm): Remove when update to requests-mock package. Comment out, if use
+# local copy of httmock.py source. Needed for PEP8.
+import httmock
import requests
from neutron.openstack.common import log as logging
from neutron.services.vpn.device_drivers import (
cisco_csr_rest_client as csr_client)
from neutron.tests import base
-from neutron.tests.unit.services.vpn import device_drivers
-# TODO(pcm) Remove once httmock is available. In the meantime, use
-# temporary local copy of httmock source to run UT
+from neutron.tests.unit.services.vpn.device_drivers import (
+ cisco_csr_mock as csr_request)
+# TODO(pcm) Uncomment to run w/local copy of httmock.py source. Remove when
+# update to requests-mock package.
+# from neutron.tests.unit.services.vpn.device_drivers import httmock
LOG = logging.getLogger(__name__)
dummy_policy_id = 'dummy-ipsec-policy-id-name'
-httmock = device_drivers.httmock
-
# Note: Helper functions to test reuse of IDs.
def generate_pre_shared_key_id():
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrLoginRestApi, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
def test_get_token(self):
"""Obtain the token and its expiration time."""
- with httmock.HTTMock(device_drivers.csr_request.token):
+ with httmock.HTTMock(csr_request.token):
self.assertTrue(self.csr.authenticate())
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertIsNotNone(self.csr.token)
def test_unauthorized_token_request(self):
"""Negative test of invalid user/password."""
self.csr.auth = ('stack', 'bogus')
- with httmock.HTTMock(device_drivers.csr_request.token_unauthorized):
+ with httmock.HTTMock(csr_request.token_unauthorized):
self.assertIsNone(self.csr.authenticate())
self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status)
"""Negative test of request to non-existent host."""
self.csr.host = 'wrong-host'
self.csr.token = 'Set by some previously successful access'
- with httmock.HTTMock(device_drivers.csr_request.token_wrong_host):
+ with httmock.HTTMock(csr_request.token_wrong_host):
self.assertIsNone(self.csr.authenticate())
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
self.assertIsNone(self.csr.token)
def test_timeout_on_token_access(self):
"""Negative test of a timeout on a request."""
- with httmock.HTTMock(device_drivers.csr_request.token_timeout):
+ with httmock.HTTMock(csr_request.token_timeout):
self.assertIsNone(self.csr.authenticate())
self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status)
self.assertIsNone(self.csr.token)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrGetRestApi, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
def test_valid_rest_gets(self):
"""Simple GET requests.
that there are two interfaces on the CSR.
"""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.normal_get):
content = self.csr.get_request('global/host-name')
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertIn('host-name', content)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrPostRestApi, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
def test_post_requests(self):
"""Simple POST requests (repeatable).
that there are two interfaces (Ge1 and Ge2) on the CSR.
"""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post):
content = self.csr.post_request(
'interfaces/GigabitEthernet1/statistics',
payload={'action': 'clear'})
def test_post_with_location(self):
"""Create a user and verify that location returned."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post):
location = self.csr.post_request(
'global/local-users',
payload={'username': 'test-user',
def test_post_missing_required_attribute(self):
"""Negative test of POST with missing mandatory info."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post):
self.csr.post_request('global/local-users',
payload={'password': 'pass12345',
'privilege': 15})
def test_post_invalid_attribute(self):
"""Negative test of POST with invalid info."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post):
self.csr.post_request('global/local-users',
payload={'username': 'test-user',
'password': 'pass12345',
Uses the lower level _do_request() API to just perform the POST and
obtain the response, without any error processing.
"""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post):
location = self.csr._do_request(
'POST',
'global/local-users',
more_headers=csr_client.HEADER_CONTENT_TYPE_JSON)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('global/local-users/test-user', location)
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post_change_attempt):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post_change_attempt):
self.csr._do_request(
'POST',
'global/local-users',
def test_post_changing_value(self):
"""Negative test of a POST trying to change a value."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post):
location = self.csr.post_request(
'global/local-users',
payload={'username': 'test-user',
'privilege': 15})
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('global/local-users/test-user', location)
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post_change_attempt):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post_change_attempt):
content = self.csr.post_request('global/local-users',
payload={'username': 'test-user',
'password': 'changed',
"""Test CSR PUT REST API."""
def _save_resources(self):
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.normal_get):
details = self.csr.get_request('global/host-name')
if self.csr.status != requests.codes.OK:
self.fail("Unable to save original host name")
self.csr.auth = (user, password)
self.csr.token = None
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.put):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.put):
payload = {'host-name': self.original_host}
self.csr.put_request('global/host-name', payload=payload)
if self.csr.status != requests.codes.NO_CONTENT:
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
"""Prepare for PUT API tests."""
super(TestCsrPutRestApi, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
+
self._save_resources()
self.addCleanup(self._restore_resources, 'stack', 'cisco')
that there are two interfaces on the CSR (Ge1 and Ge2).
"""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.put,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.put,
+ csr_request.normal_get):
payload = {'host-name': 'TestHost'}
content = self.csr.put_request('global/host-name',
payload=payload)
This was a problem with an earlier version of the CSR image and is
here to prevent regression.
"""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.put,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.put,
+ csr_request.normal_get):
payload = {'description': u'Changed description',
'if-name': self.original_if['if-name'],
'ip-address': self.original_if['ip-address'],
test setup to change the description to a non-empty string to
avoid failures in other tests.
"""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.put,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.put,
+ csr_request.normal_get):
payload = {'description': '',
'if-name': self.original_if['if-name'],
'ip-address': self.original_if['ip-address'],
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrDeleteRestApi, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
def _make_dummy_user(self):
"""Create a user that will be later deleted."""
def test_delete_requests(self):
"""Simple DELETE requests (creating entry first)."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.delete):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.delete):
self._make_dummy_user()
self.csr.token = None # Force login
self.csr.delete_request('global/local-users/dummy')
def test_delete_non_existent_entry(self):
"""Negative test of trying to delete a non-existent user."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.delete_unknown):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.delete_unknown):
content = self.csr.delete_request('global/local-users/unknown')
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
expected = {u'error-code': -1,
def test_delete_not_allowed(self):
"""Negative test of trying to delete the host-name."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.delete_not_allowed):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.delete_not_allowed):
self.csr.delete_request('global/host-name')
self.assertEqual(requests.codes.METHOD_NOT_ALLOWED,
self.csr.status)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=0.1):
super(TestCsrRestApiFailures, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
def test_request_for_non_existent_resource(self):
"""Negative test of non-existent resource on REST request."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.no_such_resource):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.no_such_resource):
self.csr.post_request('no/such/request')
self.assertEqual(requests.codes.NOT_FOUND, self.csr.status)
# The result is HTTP 404 message, so no error content to check
def test_timeout_during_request(self):
"""Negative test of timeout during REST request."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.timeout):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.timeout):
self.csr._do_request('GET', 'global/host-name')
self.assertEqual(requests.codes.REQUEST_TIMEOUT, self.csr.status)
token by changing it.
"""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.expired_request,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.expired_request,
+ csr_request.normal_get):
self.csr.token = '123' # These are 44 characters, so won't match
content = self.csr._do_request('GET', 'global/host-name')
self.assertEqual(requests.codes.OK, self.csr.status)
def test_failed_to_obtain_token_for_request(self):
"""Negative test of unauthorized user for REST request."""
self.csr.auth = ('stack', 'bogus')
- with httmock.HTTMock(device_drivers.csr_request.token_unauthorized):
+ with httmock.HTTMock(csr_request.token_unauthorized):
self.csr._do_request('GET', 'global/host-name')
self.assertEqual(requests.codes.UNAUTHORIZED, self.csr.status)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIkePolicyCreate, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
def test_create_delete_ike_policy(self):
"""Create and then delete IKE policy."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id,
u'encryption': u'aes256',
expected_policy.update(policy_info)
self.assertEqual(expected_policy, content)
# Now delete and verify the IKE policy is gone
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.delete,
- device_drivers.csr_request.no_such_resource):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.delete,
+ csr_request.no_such_resource):
self.csr.delete_ike_policy(policy_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(location, full_url=True)
def test_create_ike_policy_with_defaults(self):
"""Create IKE policy using defaults for all optional values."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.get_defaults):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.get_defaults):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id}
location = self.csr.create_ike_policy(policy_info)
def test_create_duplicate_ike_policy(self):
"""Negative test of trying to create a duplicate IKE policy."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
policy_id = '2'
policy_info = {u'priority-id': u'%s' % policy_id,
u'encryption': u'aes',
location = self.csr.create_ike_policy(policy_info)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/ike/policies/%s' % policy_id, location)
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post_duplicate):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post_duplicate):
location = self.csr.create_ike_policy(policy_info)
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
expected = {u'error-code': -1,
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIPSecPolicyCreate, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
def test_create_delete_ipsec_policy(self):
"""Create and then delete IPSec policy."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
policy_id = '123'
policy_info = {
u'policy-id': u'%s' % policy_id,
expected_policy[u'anti-replay-window-size'] = u'Disable'
self.assertEqual(expected_policy, content)
# Now delete and verify the IPSec policy is gone
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.delete,
- device_drivers.csr_request.no_such_resource):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.delete,
+ csr_request.no_such_resource):
self.csr.delete_ipsec_policy(policy_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(location, full_url=True)
def test_create_ipsec_policy_with_defaults(self):
"""Create IPSec policy with default for all optional values."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.get_defaults):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.get_defaults):
policy_id = '123'
policy_info = {
u'policy-id': u'%s' % policy_id,
def test_create_ipsec_policy_with_uuid(self):
"""Create IPSec policy using UUID for id."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
policy_info = {
u'policy-id': u'%s' % dummy_policy_id,
u'protection-suite': {
def test_create_ipsec_policy_without_ah(self):
"""Create IPSec policy."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.get_no_ah):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.get_no_ah):
policy_id = '10'
policy_info = {
u'policy-id': u'%s' % policy_id,
def test_invalid_ipsec_policy_lifetime(self):
"""Failure test of IPSec policy with unsupported lifetime."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post_bad_lifetime):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post_bad_lifetime):
policy_id = '123'
policy_info = {
u'policy-id': u'%s' % policy_id,
def test_create_ipsec_policy_with_invalid_name(self):
"""Failure test of creating IPSec policy with name too long."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post_bad_name,
- device_drivers.csr_request.get_defaults):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post_bad_name,
+ csr_request.get_defaults):
policy_id = 'policy-name-is-too-long-32-chars'
policy_info = {
u'policy-id': u'%s' % policy_id,
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestPreSharedKeyCreate, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
def test_create_delete_pre_shared_key(self):
"""Create and then delete a keyring entry for pre-shared key."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
psk_id = '5'
psk_info = {u'keyring-name': u'%s' % psk_id,
u'pre-shared-key-list': [
u'10.10.10.20 255.255.255.0')
self.assertEqual(expected_policy, content)
# Now delete and verify pre-shared key is gone
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.delete,
- device_drivers.csr_request.no_such_resource):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.delete,
+ csr_request.no_such_resource):
self.csr.delete_pre_shared_key(psk_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(location, full_url=True)
def test_create_pre_shared_key_with_fqdn_peer(self):
"""Create pre-shared key using FQDN for peer address."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.get_fqdn):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.get_fqdn):
psk_id = '5'
psk_info = {u'keyring-name': u'%s' % psk_id,
u'pre-shared-key-list': [
def test_create_pre_shared_key_with_duplicate_peer_address(self):
"""Negative test of creating a second pre-shared key with same peer."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
psk_id = '5'
psk_info = {u'keyring-name': u'%s' % psk_id,
u'pre-shared-key-list': [
location = self.csr.create_pre_shared_key(psk_info)
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/ike/keyrings/%s' % psk_id, location)
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post_duplicate):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post_duplicate):
psk_id = u'6'
another_psk_info = {u'keyring-name': psk_id,
u'pre-shared-key-list': [
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIPSecConnectionCreate, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
def _make_psk_for_test(self):
psk_id = generate_pre_shared_key_id()
self._remove_resource_for_test(self.csr.delete_pre_shared_key,
psk_id)
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post):
psk_info = {u'keyring-name': u'%d' % psk_id,
u'pre-shared-key-list': [
{u'key': u'super-secret',
policy_id = generate_ike_policy_id()
self._remove_resource_for_test(self.csr.delete_ike_policy,
policy_id)
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post):
policy_info = {u'priority-id': u'%d' % policy_id,
u'encryption': u'aes',
u'hash': u'sha',
policy_id = generate_ipsec_policy_id()
self._remove_resource_for_test(self.csr.delete_ipsec_policy,
policy_id)
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post):
policy_info = {
u'policy-id': u'%d' % policy_id,
u'protection-suite': {
return policy_id
def _remove_resource_for_test(self, delete_resource, resource_id):
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.delete):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.delete):
delete_resource(resource_id)
def _prepare_for_site_conn_create(self, skip_psk=False, skip_ike=False,
def test_create_delete_ipsec_connection(self):
"""Create and then delete an IPSec connection."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
expected_connection.update(connection_info)
self.assertEqual(expected_connection, content)
# Now delete and verify that site-to-site connection is gone
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.delete,
- device_drivers.csr_request.no_such_resource):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.delete,
+ csr_request.no_such_resource):
# Only delete connection. Cleanup will take care of prerequisites
self.csr.delete_ipsec_connection('Tunnel%d' % tunnel_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
def test_create_ipsec_connection_with_no_tunnel_subnet(self):
"""Create an IPSec connection without an IP address on tunnel."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.get_unnumbered):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.get_unnumbered):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
skip_psk=True)
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
skip_ike=True)
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
"""Create IPSec connection in admin down state."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
tunnel = u'Tunnel%d' % tunnel_id
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post):
connection_info = {
u'vpn-interface-name': tunnel,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
u'vpn-interface-name': tunnel,
u'line-protocol-state': u'down',
u'enabled': False}
- with httmock.HTTMock(device_drivers.csr_request.put,
- device_drivers.csr_request.get_admin_down):
+ with httmock.HTTMock(csr_request.put,
+ csr_request.get_admin_down):
self.csr.set_ipsec_connection_state(tunnel, admin_up=False)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(state_uri, full_url=True)
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertEqual(expected_state, content)
- with httmock.HTTMock(device_drivers.csr_request.put,
- device_drivers.csr_request.get_admin_up):
+ with httmock.HTTMock(csr_request.put,
+ csr_request.get_admin_up):
self.csr.set_ipsec_connection_state(tunnel, admin_up=True)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
content = self.csr.get_request(state_uri, full_url=True)
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create(
skip_ipsec=True)
with httmock.HTTMock(
- device_drivers.csr_request.token,
- device_drivers.csr_request.post_missing_ipsec_policy):
+ csr_request.token,
+ csr_request.post_missing_ipsec_policy):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
self.assertEqual(requests.codes.BAD_REQUEST, self.csr.status)
def _determine_conflicting_ip(self):
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.get_local_ip):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.get_local_ip):
details = self.csr.get_request('interfaces/GigabitEthernet3')
if self.csr.status != requests.codes.OK:
self.fail("Unable to obtain interface GigabitEthernet3's IP")
conflicting_ip = self._determine_conflicting_ip()
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post_bad_ip):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post_bad_ip):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
def test_create_ipsec_connection_with_max_mtu(self):
"""Create an IPSec connection with max MTU value."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.get_mtu):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.get_mtu):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
def test_create_ipsec_connection_with_bad_mtu(self):
"""Negative test of connection create with unsupported MTU value."""
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post_bad_mtu):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post_bad_mtu):
connection_info = {
u'vpn-interface-name': u'Tunnel%d' % tunnel_id,
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
def test_status_when_no_tunnels_exist(self):
"""Get status, when there are no tunnels."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.get_none):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.get_none):
tunnels = self.csr.read_tunnel_statuses()
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertEqual([], tunnels)
# Create the IPsec site-to-site connection first
tunnel_id, ipsec_policy_id = self._prepare_for_site_conn_create()
tunnel_id = 123 # Must hard code to work with mock
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
connection_info = {
u'vpn-interface-name': u'Tunnel123',
u'ipsec-policy-id': u'%d' % ipsec_policy_id,
self.assertEqual(requests.codes.CREATED, self.csr.status)
self.assertIn('vpn-svc/site-to-site/Tunnel%d' % tunnel_id,
location)
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.normal_get):
tunnels = self.csr.read_tunnel_statuses()
self.assertEqual(requests.codes.OK, self.csr.status)
self.assertEqual([(u'Tunnel123', u'DOWN-NEGOTIATING'), ], tunnels)
"""
def _save_dpd_info(self):
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.normal_get):
details = self.csr.get_request('vpn-svc/ike/keepalive')
if self.csr.status == requests.codes.OK:
self.dpd = details
self.fail("Unable to save original DPD info")
def _restore_dpd_info(self):
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.put):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.put):
payload = {'interval': self.dpd['interval'],
'retry': self.dpd['retry']}
self.csr.put_request('vpn-svc/ike/keepalive', payload=payload)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestIkeKeepaliveCreate, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
self._save_dpd_info()
self.csr.token = None
def test_configure_ike_keepalive(self):
"""Set IKE keep-alive (aka Dead Peer Detection) for the CSR."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.put,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.put,
+ csr_request.normal_get):
keepalive_info = {'interval': 60, 'retry': 4}
self.csr.configure_ike_keepalive(keepalive_info)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
def test_disable_ike_keepalive(self):
"""Disable IKE keep-alive (aka Dead Peer Detection) for the CSR."""
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.delete,
- device_drivers.csr_request.put,
- device_drivers.csr_request.get_not_configured):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.delete,
+ csr_request.put,
+ csr_request.get_not_configured):
keepalive_info = {'interval': 0, 'retry': 4}
self.csr.configure_ike_keepalive(keepalive_info)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)
def setUp(self, host='localhost', tunnel_ip='10.10.10.10', timeout=None):
super(TestCsrRestStaticRoute, self).setUp()
- self.csr = csr_client.CsrRestClient(host, tunnel_ip, 'stack', 'cisco',
- timeout)
+ info = {'rest_mgmt': host, 'tunnel_ip': tunnel_ip,
+ 'username': 'stack', 'password': 'cisco', 'timeout': timeout}
+ self.csr = csr_client.CsrRestClient(info)
def test_create_delete_static_route(self):
"""Create and then delete a static route for the tunnel."""
cidr = u'10.1.0.0/24'
interface = u'GigabitEthernet1'
expected_id = '10.1.0.0_24_GigabitEthernet1'
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.post,
- device_drivers.csr_request.normal_get):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.post,
+ csr_request.normal_get):
route_info = {u'destination-network': cidr,
u'outgoing-interface': interface}
location = self.csr.create_static_route(route_info)
expected_route.update(route_info)
self.assertEqual(expected_route, content)
# Now delete and verify that static route is gone
- with httmock.HTTMock(device_drivers.csr_request.token,
- device_drivers.csr_request.delete,
- device_drivers.csr_request.no_such_resource):
+ with httmock.HTTMock(csr_request.token,
+ csr_request.delete,
+ csr_request.no_such_resource):
route_id = csr_client.make_route_id(cidr, interface)
self.csr.delete_static_route(route_id)
self.assertEqual(requests.codes.NO_CONTENT, self.csr.status)