# The following parameters are supported:
# servers : <host:port>[,<host:port>]* (Error if not set)
# server_auth : <username:password> (default: no auth)
-# server_ssl : True | False (default: False)
+# server_ssl : True | False (default: True)
+# ssl_cert_directory : <path> (default: /etc/neutron/plugins/bigswitch/ssl)
+# no_ssl_validation : True | False (default: False)
+# ssl_sticky : True | False (default: True)
# sync_data : True | False (default: False)
# auto_sync_on_failure : True | False (default: True)
# server_timeout : <integer> (default: 10 seconds)
# server_auth=username:password
# Use SSL when connecting to the BigSwitch or Floodlight controller.
-# server_ssl=False
+# server_ssl=True
+
+# Directory which contains the ca_certs and host_certs to be used to validate
+# controller certificates.
+# ssl_cert_directory=/etc/neutron/plugins/bigswitch/ssl/
+
+# If a certificate does not exist for a controller, trust and store the first
+# certificate received for that controller and use it to validate future
+# connections to that controller.
+# ssl_sticky=True
+
+# Do not validate the controller certificates for SSL
+# Warning: This will not provide protection against man-in-the-middle attacks
+# no_ssl_validation=False
# Sync data on connect
# sync_data=False
--- /dev/null
+Certificates in this folder will be used to
+verify signatures for any controllers the plugin
+connects to.
--- /dev/null
+Certificates in this folder must match the name
+of the controller they should be used to authenticate
+with a .pem extension.
+
+For example, the certificate for the controller
+"192.168.0.1" should be named "192.168.0.1.pem".
cfg.StrOpt('server_auth', default=None, secret=True,
help=_("The username and password for authenticating against "
" the BigSwitch or Floodlight controller.")),
- cfg.BoolOpt('server_ssl', default=False,
+ cfg.BoolOpt('server_ssl', default=True,
help=_("If True, Use SSL when connecting to the BigSwitch or "
"Floodlight controller.")),
+ cfg.BoolOpt('ssl_sticky', default=True,
+ help=_("Trust and store the first certificate received for "
+ "each controller address and use it to validate future "
+ "connections to that address.")),
+ cfg.BoolOpt('no_ssl_validation', default=False,
+ help=_("Disables SSL certificate validation for controllers")),
+ cfg.BoolOpt('cache_connections', default=True,
+ help=_("Re-use HTTP/HTTPS connections to the controller.")),
+ cfg.StrOpt('ssl_cert_directory',
+ default='/etc/neutron/plugins/bigswitch/ssl',
+ help=_("Directory containing ca_certs and host_certs "
+ "certificate directories.")),
cfg.BoolOpt('sync_data', default=False,
help=_("Sync data on connect")),
cfg.BoolOpt('auto_sync_on_failure', default=True,
The following functionality is handled by this module:
- Translation of rest_* function calls to HTTP/HTTPS calls to the controllers
- Automatic failover between controllers
+- SSL Certificate enforcement
- HTTP Authentication
"""
import base64
import httplib
import json
+import os
import socket
+import ssl
import time
import eventlet
from neutron.common import exceptions
from neutron.common import utils
+from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch.db import consistency_db as cdb
"""REST server proxy to a network controller."""
def __init__(self, server, port, ssl, auth, neutron_id, timeout,
- base_uri, name, mypool):
+ base_uri, name, mypool, combined_cert):
self.server = server
self.port = port
self.ssl = ssl
self.capabilities = []
# enable server to reference parent pool
self.mypool = mypool
+ # cache connection here to avoid a SSL handshake for every connection
+ self.currentconn = None
if auth:
self.auth = 'Basic ' + base64.encodestring(auth).strip()
+ self.combined_cert = combined_cert
def get_capabilities(self):
try:
'cap': self.capabilities})
return self.capabilities
- def rest_call(self, action, resource, data='', headers={}, timeout=None):
+ def rest_call(self, action, resource, data='', headers={}, timeout=False,
+ reconnect=False):
uri = self.base_uri + resource
body = json.dumps(data)
if not headers:
headers['Instance-ID'] = self.neutron_id
headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash
+ if 'keep-alive' in self.capabilities:
+ headers['Connection'] = 'keep-alive'
+ else:
+ reconnect = True
if self.auth:
headers['Authorization'] = self.auth
{'resource': resource, 'data': data, 'headers': headers,
'action': action})
- conn = None
- timeout = timeout or self.timeout
- if self.ssl:
- conn = httplib.HTTPSConnection(
- self.server, self.port, timeout=timeout)
- if conn is None:
- LOG.error(_('ServerProxy: Could not establish HTTPS '
- 'connection'))
- return 0, None, None, None
- else:
- conn = httplib.HTTPConnection(
- self.server, self.port, timeout=timeout)
- if conn is None:
- LOG.error(_('ServerProxy: Could not establish HTTP '
- 'connection'))
- return 0, None, None, None
+ # unspecified timeout is False because a timeout can be specified as
+ # None to indicate no timeout.
+ if timeout is False:
+ timeout = self.timeout
+
+ if timeout != self.timeout:
+ # need a new connection if timeout has changed
+ reconnect = True
+
+ if not self.currentconn or reconnect:
+ if self.currentconn:
+ self.currentconn.close()
+ if self.ssl:
+ self.currentconn = HTTPSConnectionWithValidation(
+ self.server, self.port, timeout=timeout)
+ self.currentconn.combined_cert = self.combined_cert
+ if self.currentconn is None:
+ LOG.error(_('ServerProxy: Could not establish HTTPS '
+ 'connection'))
+ return 0, None, None, None
+ else:
+ self.currentconn = httplib.HTTPConnection(
+ self.server, self.port, timeout=timeout)
+ if self.currentconn is None:
+ LOG.error(_('ServerProxy: Could not establish HTTP '
+ 'connection'))
+ return 0, None, None, None
try:
- conn.request(action, uri, body, headers)
- response = conn.getresponse()
+ self.currentconn.request(action, uri, body, headers)
+ response = self.currentconn.getresponse()
newhash = response.getheader(HASH_MATCH_HEADER)
if newhash:
self._put_consistency_hash(newhash)
# response was not JSON, ignore the exception
pass
ret = (response.status, response.reason, respstr, respdata)
+ except httplib.ImproperConnectionState:
+ # If we were using a cached connection, try again with a new one.
+ with excutils.save_and_reraise_exception() as ctxt:
+ if not reconnect:
+ ctxt.reraise = False
+
+ if self.currentconn:
+ self.currentconn.close()
+ return self.rest_call(action, resource, data, headers,
+ timeout=timeout, reconnect=True)
except (socket.timeout, socket.error) as e:
LOG.error(_('ServerProxy: %(action)s failure, %(e)r'),
{'action': action, 'e': e})
ret = 0, None, None, None
- conn.close()
LOG.debug(_("ServerProxy: status=%(status)d, reason=%(reason)r, "
"ret=%(ret)s, data=%(data)r"), {'status': ret[0],
'reason': ret[1],
class ServerPool(object):
- def __init__(self, timeout=10,
+ def __init__(self, timeout=False,
base_uri=BASE_URI, name='NeutronRestProxy'):
LOG.debug(_("ServerPool: initializing"))
# 'servers' is the list of network controller REST end-points
self.base_uri = base_uri
self.name = name
self.timeout = cfg.CONF.RESTPROXY.server_timeout
+ self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections
default_port = 8000
- if timeout is not None:
+ if timeout is not False:
self.timeout = timeout
# Function to use to retrieve topology for consistency syncs.
return self.capabilities
def server_proxy_for(self, server, port):
+ combined_cert = self._get_combined_cert_for_server(server, port)
return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
- self.timeout, self.base_uri, self.name, mypool=self)
+ self.timeout, self.base_uri, self.name, mypool=self,
+ combined_cert=combined_cert)
+
+ def _get_combined_cert_for_server(self, server, port):
+ # The ssl library requires a combined file with all trusted certs
+ # so we make one containing the trusted CAs and the corresponding
+ # host cert for this server
+ combined_cert = None
+ if self.ssl and not cfg.CONF.RESTPROXY.no_ssl_validation:
+ base_ssl = cfg.CONF.RESTPROXY.ssl_cert_directory
+ host_dir = os.path.join(base_ssl, 'host_certs')
+ ca_dir = os.path.join(base_ssl, 'ca_certs')
+ combined_dir = os.path.join(base_ssl, 'combined')
+ combined_cert = os.path.join(combined_dir, '%s.pem' % server)
+ if not os.path.exists(base_ssl):
+ raise cfg.Error(_('ssl_cert_directory [%s] does not exist. '
+ 'Create it or disable ssl.') % base_ssl)
+ for automake in [combined_dir, ca_dir, host_dir]:
+ if not os.path.exists(automake):
+ os.makedirs(automake)
+
+ # get all CA certs
+ certs = self._get_ca_cert_paths(ca_dir)
+
+ # check for a host specific cert
+ hcert, exists = self._get_host_cert_path(host_dir, server)
+ if exists:
+ certs.append(hcert)
+ elif cfg.CONF.RESTPROXY.ssl_sticky:
+ self._fetch_and_store_cert(server, port, hcert)
+ certs.append(hcert)
+ if not certs:
+ raise cfg.Error(_('No certificates were found to verify '
+ 'controller %s') % (server))
+ self._combine_certs_to_file(certs, combined_cert)
+ return combined_cert
+
+ def _combine_certs_to_file(certs, cfile):
+ '''
+ Concatenates the contents of each certificate in a list of
+ certificate paths to one combined location for use with ssl
+ sockets.
+ '''
+ with open(cfile, 'w') as combined:
+ for c in certs:
+ with open(c, 'r') as cert_handle:
+ combined.write(cert_handle.read())
+
+ def _get_host_cert_path(self, host_dir, server):
+ '''
+ returns full path and boolean indicating existence
+ '''
+ hcert = os.path.join(host_dir, '%s.pem' % server)
+ if os.path.exists(hcert):
+ return hcert, True
+ return hcert, False
+
+ def _get_ca_cert_paths(self, ca_dir):
+ certs = [os.path.join(root, name)
+ for name in [
+ name for (root, dirs, files) in os.walk(ca_dir)
+ for name in files
+ ]
+ if name.endswith('.pem')]
+ return certs
+
+ def _fetch_and_store_cert(self, server, port, path):
+ '''
+ Grabs a certificate from a server and writes it to
+ a given path.
+ '''
+ try:
+ cert = ssl.get_server_certificate((server, port))
+ except Exception as e:
+ raise cfg.Error(_('Could not retrieve initial '
+ 'certificate from controller %(server)s. '
+ 'Error details: %(error)s'),
+ {'server': server, 'error': e.strerror})
+
+ LOG.warning(_("Storing to certificate for host %(server)s "
+ "at %(path)s") % {'server': server,
+ 'path': path})
+ self._file_put_contents(path, cert)
+
+ return cert
+
+ def _file_put_contents(path, contents):
+ # Simple method to write to file.
+ # Created for easy Mocking
+ with open(path, 'w') as handle:
+ handle.write(contents)
def server_failure(self, resp, ignore_codes=[]):
"""Define failure codes as required.
@utils.synchronized('bsn-rest-call')
def rest_call(self, action, resource, data, headers, ignore_codes,
- timeout=None):
+ timeout=False):
good_first = sorted(self.servers, key=lambda x: x.failed)
first_response = None
for active_server in good_first:
ret = active_server.rest_call(action, resource, data, headers,
- timeout)
+ timeout,
+ reconnect=self.always_reconnect)
# If inconsistent, do a full synchronization
if ret[0] == httplib.CONFLICT:
if not self.get_topo_function:
return first_response
def rest_action(self, action, resource, data='', errstr='%s',
- ignore_codes=[], headers={}, timeout=None):
+ ignore_codes=[], headers={}, timeout=False):
"""
Wrapper for rest_call that verifies success and raises a
RemoteRestError on failure with a provided error string
# that will be handled by the rest_call.
time.sleep(polling_interval)
self.servers.rest_call('GET', HEALTH_PATH)
+
+
+class HTTPSConnectionWithValidation(httplib.HTTPSConnection):
+
+ # If combined_cert is None, the connection will continue without
+ # any certificate validation.
+ combined_cert = None
+
+ def connect(self):
+ sock = socket.create_connection((self.host, self.port),
+ self.timeout, self.source_address)
+ if self._tunnel_host:
+ self.sock = sock
+ self._tunnel()
+
+ if self.combined_cert:
+ self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
+ cert_reqs=ssl.CERT_REQUIRED,
+ ca_certs=self.combined_cert)
+ else:
+ self.sock = ssl.wrap_socket(sock, self.key_file,
+ self.cert_file,
+ cert_reqs=ssl.CERT_NONE)
--- /dev/null
+ca_certs directory for SSL unit tests
+No files will be generated here, but it should exist for the tests
--- /dev/null
+combined certificates directory for SSL unit tests
+No files will be created here, but it should exist for the tests
--- /dev/null
+host_certs directory for SSL unit tests
+No files will be created here, but it should exist for the tests
raise Exception(msg)
super(VerifyMultiTenantFloatingIP,
self).request(action, uri, body, headers)
+
+
+class HTTPSMockBase(HTTPConnectionMock):
+ expected_cert = ''
+ combined_cert = None
+
+ def __init__(self, host, port=None, key_file=None, cert_file=None,
+ strict=None, timeout=None, source_address=None):
+ self.host = host
+ super(HTTPSMockBase, self).__init__(host, port, timeout)
+
+ def request(self, method, url, body=None, headers={}):
+ self.connect()
+ super(HTTPSMockBase, self).request(method, url, body, headers)
+
+
+class HTTPSNoValidation(HTTPSMockBase):
+
+ def connect(self):
+ if self.combined_cert:
+ raise Exception('combined_cert set on NoValidation')
+
+
+class HTTPSCAValidation(HTTPSMockBase):
+ expected_cert = 'DUMMYCERTIFICATEAUTHORITY'
+
+ def connect(self):
+ contents = get_cert_contents(self.combined_cert)
+ if self.expected_cert not in contents:
+ raise Exception('No dummy CA cert in cert_file')
+
+
+class HTTPSHostValidation(HTTPSMockBase):
+ expected_cert = 'DUMMYCERTFORHOST%s'
+
+ def connect(self):
+ contents = get_cert_contents(self.combined_cert)
+ expected = self.expected_cert % self.host
+ if expected not in contents:
+ raise Exception(_('No host cert for %(server)s in cert %(cert)s'),
+ {'server': self.host, 'cert': contents})
+
+
+def get_cert_contents(path):
+ raise Exception('METHOD MUST BE MOCKED FOR TEST')
'restproxy.ini.test')]
self.addCleanup(cfg.CONF.reset)
config.register_config()
+ # Only try SSL on SSL tests
+ cfg.CONF.set_override('server_ssl', False, 'RESTPROXY')
+ cfg.CONF.set_override('ssl_cert_directory',
+ os.path.join(etc_path, 'ssl'), 'RESTPROXY')
+ # The mock interferes with HTTP(S) connection caching
+ cfg.CONF.set_override('cache_connections', False, 'RESTPROXY')
def setup_patches(self):
self.httpPatch = mock.patch(HTTPCON, create=True,
--- /dev/null
+# Copyright 2014 Big Switch Networks, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Kevin Benton, kevin.benton@bigswitch.com
+#
+import os
+
+import mock
+from oslo.config import cfg
+import webob.exc
+
+from neutron.openstack.common import log as logging
+from neutron.tests.unit.bigswitch import fake_server
+from neutron.tests.unit.bigswitch import test_base
+from neutron.tests.unit import test_api_v2
+from neutron.tests.unit import test_db_plugin as test_plugin
+
+LOG = logging.getLogger(__name__)
+
+SERVERMANAGER = 'neutron.plugins.bigswitch.servermanager'
+HTTPS = SERVERMANAGER + '.HTTPSConnectionWithValidation'
+CERTCOMBINER = SERVERMANAGER + '.ServerPool._combine_certs_to_file'
+FILEPUT = SERVERMANAGER + '.ServerPool._file_put_contents'
+GETCACERTS = SERVERMANAGER + '.ServerPool._get_ca_cert_paths'
+GETHOSTCERT = SERVERMANAGER + '.ServerPool._get_host_cert_path'
+FAKECERTGET = 'neutron.tests.unit.bigswitch.fake_server.get_cert_contents'
+SSLGETCERT = 'ssl.get_server_certificate'
+
+
+class test_ssl_certificate_base(test_plugin.NeutronDbPluginV2TestCase,
+ test_base.BigSwitchTestBase):
+
+ plugin_str = ('%s.NeutronRestProxyV2' %
+ test_base.RESTPROXY_PKG_PATH)
+ servername = None
+ cert_base = None
+
+ def _setUp(self):
+ self.servername = test_api_v2._uuid()
+ self.cert_base = cfg.CONF.RESTPROXY.ssl_cert_directory
+ self.host_cert_val = 'DUMMYCERTFORHOST%s' % self.servername
+ self.host_cert_path = os.path.join(
+ self.cert_base,
+ 'host_certs',
+ '%s.pem' % self.servername
+ )
+ self.comb_cert_path = os.path.join(
+ self.cert_base,
+ 'combined',
+ '%s.pem' % self.servername
+ )
+ self.ca_certs_path = os.path.join(
+ self.cert_base,
+ 'ca_certs'
+ )
+ cfg.CONF.set_override('servers', ["%s:443" % self.servername],
+ 'RESTPROXY')
+ self.setup_patches()
+
+ # Mock method SSL lib uses to grab cert from server
+ self.sslgetcert_m = mock.patch(SSLGETCERT, create=True).start()
+ self.sslgetcert_m.return_value = self.host_cert_val
+
+ # Mock methods that write and read certs from the file-system
+ self.fileput_m = mock.patch(FILEPUT, create=True).start()
+ self.certcomb_m = mock.patch(CERTCOMBINER, create=True).start()
+ self.getcacerts_m = mock.patch(GETCACERTS, create=True).start()
+
+ # this is used to configure what certificate contents the fake HTTPS
+ # lib should expect to receive
+ self.fake_certget_m = mock.patch(FAKECERTGET, create=True).start()
+
+ def setUp(self):
+ super(test_ssl_certificate_base, self).setUp(self.plugin_str)
+
+
+class TestSslSticky(test_ssl_certificate_base):
+
+ def setUp(self):
+ self.setup_config_files()
+ cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
+ cfg.CONF.set_override('ssl_sticky', True, 'RESTPROXY')
+ self.httpsPatch = mock.patch(HTTPS, create=True,
+ new=fake_server.HTTPSHostValidation)
+ self.httpsPatch.start()
+ self._setUp()
+ # Set fake HTTPS connection's expectation
+ self.fake_certget_m.return_value = self.host_cert_val
+ # No CA certs for this test
+ self.getcacerts_m.return_value = []
+ super(TestSslSticky, self).setUp()
+
+ def test_sticky_cert(self):
+ # SSL connection should be successful and cert should be cached
+ with self.network():
+ # CA certs should have been checked for
+ self.getcacerts_m.assert_has_calls([mock.call(self.ca_certs_path)])
+ # cert should have been fetched via SSL lib
+ self.sslgetcert_m.assert_has_calls(
+ [mock.call((self.servername, 443))]
+ )
+
+ # cert should have been recorded
+ self.fileput_m.assert_has_calls([mock.call(self.host_cert_path,
+ self.host_cert_val)])
+ # no ca certs, so host cert only for this combined cert
+ self.certcomb_m.assert_has_calls([mock.call([self.host_cert_path],
+ self.comb_cert_path)])
+
+
+class TestSslHostCert(test_ssl_certificate_base):
+
+ def setUp(self):
+ self.setup_config_files()
+ cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
+ cfg.CONF.set_override('ssl_sticky', False, 'RESTPROXY')
+ self.httpsPatch = mock.patch(HTTPS, create=True,
+ new=fake_server.HTTPSHostValidation)
+ self.httpsPatch.start()
+ self._setUp()
+ # Set fake HTTPS connection's expectation
+ self.fake_certget_m.return_value = self.host_cert_val
+ # No CA certs for this test
+ self.getcacerts_m.return_value = []
+ # Pretend host cert exists
+ self.hcertpath_p = mock.patch(GETHOSTCERT,
+ return_value=(self.host_cert_path, True),
+ create=True).start()
+ super(TestSslHostCert, self).setUp()
+
+ def test_host_cert(self):
+ # SSL connection should be successful because of pre-configured cert
+ with self.network():
+ self.hcertpath_p.assert_has_calls([
+ mock.call(os.path.join(self.cert_base, 'host_certs'),
+ self.servername)
+ ])
+ # sticky is disabled, no fetching allowed
+ self.assertFalse(self.sslgetcert_m.call_count)
+ # no ca certs, so host cert is only for this combined cert
+ self.certcomb_m.assert_has_calls([mock.call([self.host_cert_path],
+ self.comb_cert_path)])
+
+
+class TestSslCaCert(test_ssl_certificate_base):
+
+ def setUp(self):
+ self.setup_config_files()
+ cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
+ cfg.CONF.set_override('ssl_sticky', False, 'RESTPROXY')
+ self.httpsPatch = mock.patch(HTTPS, create=True,
+ new=fake_server.HTTPSCAValidation)
+ self.httpsPatch.start()
+ self._setUp()
+
+ # pretend to have a few ca certs
+ self.getcacerts_m.return_value = ['ca1.pem', 'ca2.pem']
+
+ # Set fake HTTPS connection's expectation
+ self.fake_certget_m.return_value = 'DUMMYCERTIFICATEAUTHORITY'
+
+ super(TestSslCaCert, self).setUp()
+
+ def test_ca_cert(self):
+ # SSL connection should be successful because CA cert was present
+ # If not, attempting to create a network would raise an exception
+ with self.network():
+ # sticky is disabled, no fetching allowed
+ self.assertFalse(self.sslgetcert_m.call_count)
+ # 2 CAs and no host cert so combined should only contain both CAs
+ self.certcomb_m.assert_has_calls([mock.call(['ca1.pem', 'ca2.pem'],
+ self.comb_cert_path)])
+
+
+class TestSslWrongHostCert(test_ssl_certificate_base):
+
+ def setUp(self):
+ self.setup_config_files()
+ cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
+ cfg.CONF.set_override('ssl_sticky', True, 'RESTPROXY')
+ self.httpsPatch = mock.patch(HTTPS, create=True,
+ new=fake_server.HTTPSHostValidation)
+ self.httpsPatch.start()
+ self._setUp()
+
+ # Set fake HTTPS connection's expectation to something wrong
+ self.fake_certget_m.return_value = 'OTHERCERT'
+
+ # No CA certs for this test
+ self.getcacerts_m.return_value = []
+
+ # Pretend host cert exists
+ self.hcertpath_p = mock.patch(GETHOSTCERT,
+ return_value=(self.host_cert_path, True),
+ create=True).start()
+ super(TestSslWrongHostCert, self).setUp()
+
+ def test_error_no_cert(self):
+ # since there will already be a host cert, sticky should not take
+ # effect and there will be an error because the host cert's contents
+ # will be incorrect
+ tid = test_api_v2._uuid()
+ data = {}
+ data['network'] = {'tenant_id': tid, 'name': 'name',
+ 'admin_state_up': True}
+ req = self.new_create_request('networks', data, 'json')
+ res = req.get_response(self.api)
+ self.assertEqual(res.status_int,
+ webob.exc.HTTPInternalServerError.code)
+ self.hcertpath_p.assert_has_calls([
+ mock.call(os.path.join(self.cert_base, 'host_certs'),
+ self.servername)
+ ])
+ # sticky is enabled, but a host cert already exists so it shant fetch
+ self.assertFalse(self.sslgetcert_m.call_count)
+ # no ca certs, so host cert only for this combined cert
+ self.certcomb_m.assert_has_calls([mock.call([self.host_cert_path],
+ self.comb_cert_path)])
+
+
+class TestSslNoValidation(test_ssl_certificate_base):
+
+ def setUp(self):
+ self.setup_config_files()
+ cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
+ cfg.CONF.set_override('ssl_sticky', False, 'RESTPROXY')
+ cfg.CONF.set_override('no_ssl_validation', True, 'RESTPROXY')
+ self.httpsPatch = mock.patch(HTTPS, create=True,
+ new=fake_server.HTTPSNoValidation)
+ self.httpsPatch.start()
+ self._setUp()
+ super(TestSslNoValidation, self).setUp()
+
+ def test_validation_disabled(self):
+ # SSL connection should be successful without any certificates
+ # If not, attempting to create a network will raise an exception
+ with self.network():
+ # no sticky grabbing and no cert combining with no enforcement
+ self.assertFalse(self.sslgetcert_m.call_count)
+ self.assertFalse(self.certcomb_m.call_count)
etc/neutron/rootwrap.d/ryu-plugin.filters
etc/neutron/rootwrap.d/vpnaas.filters
etc/init.d = etc/init.d/neutron-server
- etc/neutron/plugins/bigswitch = etc/neutron/plugins/bigswitch/restproxy.ini
+ etc/neutron/plugins/bigswitch =
+ etc/neutron/plugins/bigswitch/restproxy.ini
+ etc/neutron/plugins/bigswitch/ssl/ca_certs/README
+ etc/neutron/plugins/bigswitch/ssl/host_certs/README
etc/neutron/plugins/brocade = etc/neutron/plugins/brocade/brocade.ini
etc/neutron/plugins/cisco = etc/neutron/plugins/cisco/cisco_plugins.ini
etc/neutron/plugins/hyperv = etc/neutron/plugins/hyperv/hyperv_neutron_plugin.ini