class TestWSGIServerWithSSL(base.BaseTestCase):
"""WSGI server tests."""
+ @mock.patch("exceptions.RuntimeError")
+ @mock.patch("os.path.exists")
+ def test__check_ssl_settings(self, exists_mock, runtime_error_mock):
+ exists_mock.return_value = True
+ CONF.set_default('use_ssl', True)
+ CONF.set_default("ssl_cert_file", 'certificate.crt')
+ CONF.set_default("ssl_key_file", 'privatekey.key')
+ CONF.set_default("ssl_ca_file", 'cacert.pem')
+ wsgi.Server("test_app")
+ self.assertFalse(runtime_error_mock.called)
+
+ @mock.patch("os.path.exists")
+ def test__check_ssl_settings_no_ssl_cert_file_fails(self, exists_mock):
+ exists_mock.side_effect = [False]
+ CONF.set_default('use_ssl', True)
+ CONF.set_default("ssl_cert_file", "/no/such/file")
+ self.assertRaises(RuntimeError, wsgi.Server, "test_app")
+
+ @mock.patch("os.path.exists")
+ def test__check_ssl_settings_no_ssl_key_file_fails(self, exists_mock):
+ exists_mock.side_effect = [True, False]
+ CONF.set_default('use_ssl', True)
+ CONF.set_default("ssl_cert_file", 'certificate.crt')
+ CONF.set_default("ssl_key_file", "/no/such/file")
+ self.assertRaises(RuntimeError, wsgi.Server, "test_app")
+
+ @mock.patch("os.path.exists")
+ def test__check_ssl_settings_no_ssl_ca_file_fails(self, exists_mock):
+ exists_mock.side_effect = [True, True, False]
+ CONF.set_default('use_ssl', True)
+ CONF.set_default("ssl_cert_file", 'certificate.crt')
+ CONF.set_default("ssl_key_file", 'privatekey.key')
+ CONF.set_default("ssl_ca_file", "/no/such/file")
+ self.assertRaises(RuntimeError, wsgi.Server, "test_app")
+
+ @mock.patch("ssl.wrap_socket")
+ @mock.patch("os.path.exists")
+ def _test_wrap_ssl(self, exists_mock, wrap_socket_mock, **kwargs):
+ exists_mock.return_value = True
+ sock = mock.Mock()
+ CONF.set_default("ssl_cert_file", 'certificate.crt')
+ CONF.set_default("ssl_key_file", 'privatekey.key')
+ ssl_kwargs = {'server_side': True,
+ 'certfile': CONF.ssl_cert_file,
+ 'keyfile': CONF.ssl_key_file,
+ 'cert_reqs': ssl.CERT_NONE,
+ }
+ if kwargs:
+ ssl_kwargs.update(**kwargs)
+ server = wsgi.Server("test_app")
+ server.wrap_ssl(sock)
+ wrap_socket_mock.assert_called_once_with(sock, **ssl_kwargs)
+
+ def test_wrap_ssl(self):
+ self._test_wrap_ssl()
+
+ def test_wrap_ssl_ca_file(self):
+ CONF.set_default("ssl_ca_file", 'cacert.pem')
+ ssl_kwargs = {'ca_certs': CONF.ssl_ca_file,
+ 'cert_reqs': ssl.CERT_REQUIRED
+ }
+ self._test_wrap_ssl(**ssl_kwargs)
+
def test_app_using_ssl(self):
CONF.set_default('use_ssl', True)
CONF.set_default("ssl_cert_file",
# existing sql connections avoids producing 500 errors later when they
# are discovered to be broken.
api.dispose()
+ if CONF.use_ssl:
+ self._service._socket = self._service.wrap_ssl(
+ self._service._socket)
self._server = self._service.pool.spawn(self._service._run,
self._application,
self._service._socket)
# A value of 0 is converted to None because None is what causes the
# wsgi server to wait forever.
self.client_socket_timeout = CONF.client_socket_timeout or None
+ if CONF.use_ssl:
+ self._check_ssl_settings()
def _get_socket(self, host, port, backlog):
bind_addr = (host, port)
{'host': host, 'port': port})
sys.exit(1)
- if CONF.use_ssl:
- if not os.path.exists(CONF.ssl_cert_file):
- raise RuntimeError(_("Unable to find ssl_cert_file "
- ": %s") % CONF.ssl_cert_file)
-
- # ssl_key_file is optional because the key may be embedded in the
- # certificate file
- if CONF.ssl_key_file and not os.path.exists(CONF.ssl_key_file):
- raise RuntimeError(_("Unable to find "
- "ssl_key_file : %s") % CONF.ssl_key_file)
-
- # ssl_ca_file is optional
- if CONF.ssl_ca_file and not os.path.exists(CONF.ssl_ca_file):
- raise RuntimeError(_("Unable to find ssl_ca_file "
- ": %s") % CONF.ssl_ca_file)
-
- def wrap_ssl(sock):
- ssl_kwargs = {
- 'server_side': True,
- 'certfile': CONF.ssl_cert_file,
- 'keyfile': CONF.ssl_key_file,
- 'cert_reqs': ssl.CERT_NONE,
- }
-
- if CONF.ssl_ca_file:
- ssl_kwargs['ca_certs'] = CONF.ssl_ca_file
- ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
-
- return ssl.wrap_socket(sock, **ssl_kwargs)
-
sock = None
retry_until = time.time() + CONF.retry_until_window
while not sock and time.time() < retry_until:
sock = eventlet.listen(bind_addr,
backlog=backlog,
family=family)
- if CONF.use_ssl:
- sock = wrap_ssl(sock)
-
except socket.error as err:
with excutils.save_and_reraise_exception() as ctxt:
if err.errno == errno.EADDRINUSE:
return sock
+ @staticmethod
+ def _check_ssl_settings():
+ if not os.path.exists(CONF.ssl_cert_file):
+ raise RuntimeError(_("Unable to find ssl_cert_file "
+ ": %s") % CONF.ssl_cert_file)
+
+ # ssl_key_file is optional because the key may be embedded in the
+ # certificate file
+ if CONF.ssl_key_file and not os.path.exists(CONF.ssl_key_file):
+ raise RuntimeError(_("Unable to find "
+ "ssl_key_file : %s") % CONF.ssl_key_file)
+
+ # ssl_ca_file is optional
+ if CONF.ssl_ca_file and not os.path.exists(CONF.ssl_ca_file):
+ raise RuntimeError(_("Unable to find ssl_ca_file "
+ ": %s") % CONF.ssl_ca_file)
+
+ @staticmethod
+ def wrap_ssl(sock):
+ ssl_kwargs = {'server_side': True,
+ 'certfile': CONF.ssl_cert_file,
+ 'keyfile': CONF.ssl_key_file,
+ 'cert_reqs': ssl.CERT_NONE,
+ }
+
+ if CONF.ssl_ca_file:
+ ssl_kwargs['ca_certs'] = CONF.ssl_ca_file
+ ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
+
+ return ssl.wrap_socket(sock, **ssl_kwargs)
+
def start(self, application, port, host='0.0.0.0', workers=0):
"""Run a WSGI server with the given application."""
self._host = host