From 6d0d72973152bb45587437c80d4ffe0fe7bba761 Mon Sep 17 00:00:00 2001 From: Elena Ezhova Date: Tue, 7 Apr 2015 14:58:13 +0300 Subject: [PATCH] Handle SIGHUP: neutron-server (multiprocess) and metadata agent All launchers implemented in common.service require each service to implement reset method because it is called in case a process receives a SIGHUP. This change adds the reset method to neutron.service.RpcWorker and neutron.wsgi.WorkerService which are used to wrap rpc and api workers correspondingly. Now neutron-server running in multiprocess mode (api_workers > 0 and rpc_workers > 0) and metadata agent don't die on receiving SIGHUP and support reloading policy_path and logging options in config. Note that reset is called only in case a service is running in daemon mode. Other changes made in the scope of this patch that need to be mentioned: * Don't empty self._servers list in RpcWorker's stop method When a service is restarted all services are gracefully shutdowned, resetted and started again (see openstack.common.service code). As graceful shutdown implies calling service.stop() and then service.wait() we don't want to clean self._servers list because it would be impossible to wait for them to stop processing requests and cleaning up their resources. Otherwise, this would lead to problems with rpc after starting the rpc server again. * Create a duplicate socket each time WorkerService starts When api worker is stopped it kills the eventlet wsgi server which internally closes the wsgi server socket object. This server socket object becomes not usable which leads to "Bad file descriptor" errors on service restart. Added functional and unit tests. DocImpact Partial-Bug: #1276694 Change-Id: I75b00946b7cae891c6eb192e853118e7d49e4a24 --- neutron/common/config.py | 9 + neutron/service.py | 9 +- neutron/tests/functional/requirements.txt | 1 + neutron/tests/functional/test_server.py | 247 ++++++++++++++++++++++ neutron/tests/unit/test_service.py | 33 +++ neutron/tests/unit/test_wsgi.py | 14 +- neutron/wsgi.py | 16 +- 7 files changed, 321 insertions(+), 8 deletions(-) create mode 100644 neutron/tests/functional/test_server.py create mode 100644 neutron/tests/unit/test_service.py diff --git a/neutron/common/config.py b/neutron/common/config.py index 93f57159f..c8e4eebf5 100644 --- a/neutron/common/config.py +++ b/neutron/common/config.py @@ -31,6 +31,7 @@ from paste import deploy from neutron.api.v2 import attributes from neutron.common import utils from neutron.i18n import _LI +from neutron import policy from neutron import version @@ -210,6 +211,14 @@ def setup_logging(): LOG.debug("command line: %s", " ".join(sys.argv)) +def reset_service(): + # Reset worker in case SIGHUP is called. + # Note that this is called only in case a service is running in + # daemon mode. + setup_logging() + policy.refresh() + + def load_paste_app(app_name): """Builds and returns a WSGI app from a paste config file. diff --git a/neutron/service.py b/neutron/service.py index 708882b73..e27dd5cdc 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -32,7 +32,6 @@ from neutron.i18n import _LE, _LI from neutron import manager from neutron.openstack.common import loopingcall from neutron.openstack.common import service as common_service -from neutron import policy from neutron import wsgi @@ -128,7 +127,10 @@ class RpcWorker(object): for server in self._servers: if isinstance(server, rpc_server.MessageHandlingServer): server.stop() - self._servers = [] + + @staticmethod + def reset(): + config.reset_service() def serve_rpc(): @@ -288,8 +290,7 @@ class Service(n_rpc.Service): LOG.exception(_LE("Exception occurs when waiting for timer")) def reset(self): - config.setup_logging() - policy.refresh() + config.reset_service() def periodic_tasks(self, raise_on_error=False): """Tasks to be run at a periodic interval.""" diff --git a/neutron/tests/functional/requirements.txt b/neutron/tests/functional/requirements.txt index 0c5f2215b..f98f475bc 100644 --- a/neutron/tests/functional/requirements.txt +++ b/neutron/tests/functional/requirements.txt @@ -4,5 +4,6 @@ # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. +psutil>=1.1.1,<2.0.0 psycopg2 MySQL-python diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py new file mode 100644 index 000000000..8f81f6849 --- /dev/null +++ b/neutron/tests/functional/test_server.py @@ -0,0 +1,247 @@ +# Copyright 2015 Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import httplib2 +import mock +import os +import signal +import socket +import time +import traceback + +from oslo_config import cfg +import psutil + +from neutron.agent.linux import utils +from neutron import service +from neutron.tests import base +from neutron import wsgi + + +CONF = cfg.CONF + +# This message will be written to temporary file each time +# reset method is called. +FAKE_RESET_MSG = "reset".encode("utf-8") + +TARGET_PLUGIN = 'neutron.plugins.ml2.plugin.Ml2Plugin' + + +class TestNeutronServer(base.BaseTestCase): + def setUp(self): + super(TestNeutronServer, self).setUp() + self.service_pid = None + self.workers = None + self.temp_file = self.get_temp_file_path("test_server.tmp") + self.health_checker = None + self.pipein, self.pipeout = os.pipe() + self.addCleanup(self._destroy_workers) + + def _destroy_workers(self): + if self.service_pid: + # Make sure all processes are stopped + os.kill(self.service_pid, signal.SIGKILL) + + def _start_server(self, callback, workers): + """Run a given service. + + :param callback: callback that will start the required service + :param workers: number of service workers + :returns: list of spawned workers' pids + """ + + self.workers = workers + + # Fork a new process in which server will be started + pid = os.fork() + if pid == 0: + status = 0 + try: + callback(workers) + except SystemExit as exc: + status = exc.code + except BaseException: + traceback.print_exc() + status = 2 + + # Really exit + os._exit(status) + + self.service_pid = pid + + if self.workers > 0: + # Wait at most 10 seconds to spawn workers + condition = lambda: self.workers == len(self._get_workers()) + + utils.wait_until_true( + condition, timeout=10, sleep=0.1, + exception=RuntimeError( + "Failed to start %d workers." % self.workers)) + + workers = self._get_workers() + self.assertEqual(len(workers), self.workers) + return workers + + # Wait for a service to start. + utils.wait_until_true(self.health_checker, timeout=10, sleep=0.1, + exception=RuntimeError( + "Failed to start service.")) + + return [self.service_pid] + + def _get_workers(self): + """Get the list of processes in which WSGI server is running.""" + + if self.workers > 0: + return [proc.pid for proc in psutil.process_iter() + if proc.ppid == self.service_pid] + else: + return [proc.pid for proc in psutil.process_iter() + if proc.pid == self.service_pid] + + def _fake_reset(self): + """Writes FAKE_RESET_MSG to temporary file on each call.""" + + with open(self.temp_file, 'a') as f: + f.write(FAKE_RESET_MSG) + + def _test_restart_service_on_sighup(self, service, workers=0): + """Test that a service correctly restarts on receiving SIGHUP. + + 1. Start a service with a given number of workers. + 2. Send SIGHUP to the service. + 3. Wait for workers (if any) to restart. + 4. Assert that the pids of the workers didn't change after restart. + """ + + start_workers = self._start_server(callback=service, workers=workers) + + os.kill(self.service_pid, signal.SIGHUP) + + # Wait for temp file to be created and its size become equal + # to size of FAKE_RESET_MSG repeated (workers + 1) times. + expected_size = len(FAKE_RESET_MSG) * (workers + 1) + condition = lambda: (os.path.isfile(self.temp_file) + and os.stat(self.temp_file).st_size == + expected_size) + + utils.wait_until_true( + condition, timeout=5, sleep=0.1, + exception=RuntimeError( + "Timed out waiting for file %(filename)s to be created and " + "its size become equal to %(size)s." % + {'filename': self.temp_file, + 'size': expected_size})) + + # Verify that reset has been called for parent process in which + # a service was started and for each worker by checking that + # FAKE_RESET_MSG has been written to temp file workers + 1 times. + with open(self.temp_file, 'r') as f: + res = f.readline() + self.assertEqual(FAKE_RESET_MSG * (workers + 1), res) + + # Make sure worker pids don't change + end_workers = self._get_workers() + self.assertEqual(start_workers, end_workers) + + +class TestWsgiServer(TestNeutronServer): + """Tests for neutron.wsgi.Server.""" + + def setUp(self): + super(TestWsgiServer, self).setUp() + self.health_checker = self._check_active + self.port = None + + @staticmethod + def application(environ, start_response): + """A primitive test application.""" + + response_body = 'Response' + status = '200 OK' + response_headers = [('Content-Type', 'text/plain'), + ('Content-Length', str(len(response_body)))] + start_response(status, response_headers) + return [response_body] + + def _check_active(self): + """Check a wsgi service is active by making a GET request.""" + port = int(os.read(self.pipein, 5)) + conn = httplib2.HTTPConnectionWithTimeout("localhost", port) + try: + conn.request("GET", "/") + resp = conn.getresponse() + return resp.status == 200 + except socket.error: + return False + + def _run_wsgi(self, workers=0): + """Start WSGI server with a test application.""" + + # Mock reset method to check that it is being called + # on receiving SIGHUP. + with mock.patch("neutron.wsgi.WorkerService.reset") as reset_method: + reset_method.side_effect = self._fake_reset + + server = wsgi.Server("Test") + server.start(self.application, 0, "0.0.0.0", + workers=workers) + + # Memorize a port that was chosen for the service + self.port = server.port + os.write(self.pipeout, str(self.port)) + + server.wait() + + def test_restart_wsgi_on_sighup_multiple_workers(self): + self._test_restart_service_on_sighup(service=self._run_wsgi, + workers=2) + + +class TestRPCServer(TestNeutronServer): + """Tests for neutron RPC server.""" + + def setUp(self): + super(TestRPCServer, self).setUp() + self.setup_coreplugin(TARGET_PLUGIN) + self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True) + self.plugin = self._plugin_patcher.start() + self.plugin.return_value.rpc_workers_supported = True + self.health_checker = self._check_active + + def _check_active(self): + time.sleep(5) + return True + + def _serve_rpc(self, workers=0): + """Start RPC server with a given number of workers.""" + + # Mock reset method to check that it is being called + # on receiving SIGHUP. + with mock.patch("neutron.service.RpcWorker.reset") as reset_method: + with mock.patch( + "neutron.manager.NeutronManager.get_plugin" + ) as get_plugin: + reset_method.side_effect = self._fake_reset + get_plugin.return_value = self.plugin + + CONF.set_override("rpc_workers", workers) + + launcher = service.serve_rpc() + launcher.wait() + + def test_restart_rpc_on_sighup_multiple_workers(self): + self._test_restart_service_on_sighup(service=self._serve_rpc, + workers=2) diff --git a/neutron/tests/unit/test_service.py b/neutron/tests/unit/test_service.py new file mode 100644 index 000000000..582449f5a --- /dev/null +++ b/neutron/tests/unit/test_service.py @@ -0,0 +1,33 @@ +# Copyright 2015 Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron import service +from neutron.tests import base + + +class TestRpcWorker(base.BaseTestCase): + + @mock.patch("neutron.policy.refresh") + @mock.patch("neutron.common.config.setup_logging") + def test_reset(self, setup_logging_mock, refresh_mock): + _plugin = mock.Mock() + + rpc_worker = service.RpcWorker(_plugin) + rpc_worker.reset() + + setup_logging_mock.assert_called_once_with() + refresh_mock.assert_called_once_with() diff --git a/neutron/tests/unit/test_wsgi.py b/neutron/tests/unit/test_wsgi.py index 584a66610..3331f450f 100644 --- a/neutron/tests/unit/test_wsgi.py +++ b/neutron/tests/unit/test_wsgi.py @@ -65,6 +65,18 @@ class TestWorkerService(base.BaseTestCase): workerservice.start() self.assertFalse(apimock.called) + @mock.patch("neutron.policy.refresh") + @mock.patch("neutron.common.config.setup_logging") + def test_reset(self, setup_logging_mock, refresh_mock): + _service = mock.Mock() + _app = mock.Mock() + + worker_service = wsgi.WorkerService(_service, _app) + worker_service.reset() + + setup_logging_mock.assert_called_once_with() + refresh_mock.assert_called_once_with() + class TestWSGIServer(base.BaseTestCase): """WSGI server tests.""" @@ -132,7 +144,7 @@ class TestWSGIServer(base.BaseTestCase): mock.call( server._run, None, - mock_listen.return_value) + mock_listen.return_value.dup.return_value) ]) def test_app(self): diff --git a/neutron/wsgi.py b/neutron/wsgi.py index 437e57b09..0aecc8069 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -37,6 +37,7 @@ import six import webob.dec import webob.exc +from neutron.common import config from neutron.common import exceptions as exception from neutron import context from neutron.db import api @@ -99,12 +100,17 @@ class WorkerService(object): self._server = None def start(self): + # When api worker is stopped it kills the eventlet wsgi server which + # internally closes the wsgi server socket object. This server socket + # object becomes not usable which leads to "Bad file descriptor" + # errors on service restart. + # Duplicate a socket object to keep a file descriptor usable. + dup_sock = self._service._socket.dup() if CONF.use_ssl: - self._service._socket = self._service.wrap_ssl( - self._service._socket) + dup_sock = self._service.wrap_ssl(dup_sock) self._server = self._service.pool.spawn(self._service._run, self._application, - self._service._socket) + dup_sock) def wait(self): if isinstance(self._server, eventlet.greenthread.GreenThread): @@ -115,6 +121,10 @@ class WorkerService(object): self._server.kill() self._server = None + @staticmethod + def reset(): + config.reset_service() + class Server(object): """Server class to manage multiple WSGI sockets and applications.""" -- 2.45.2