]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Handle SIGHUP: neutron-server (multiprocess) and metadata agent
authorElena Ezhova <eezhova@mirantis.com>
Tue, 7 Apr 2015 11:58:13 +0000 (14:58 +0300)
committerElena Ezhova <eezhova@mirantis.com>
Tue, 9 Jun 2015 13:15:37 +0000 (16:15 +0300)
All launchers implemented in common.service require each service to
implement reset method because it is called in case a process
receives a SIGHUP.

This change adds the reset method to neutron.service.RpcWorker and
neutron.wsgi.WorkerService which are used to wrap rpc and api
workers correspondingly.

Now neutron-server running in multiprocess mode (api_workers > 0 and
rpc_workers > 0) and metadata agent don't die on receiving SIGHUP and support
reloading policy_path and logging options in config.

Note that reset is called only in case a service is running in daemon mode.

Other changes made in the scope of this patch that need to be mentioned:

* Don't empty self._servers list in RpcWorker's stop method

  When a service is restarted all services are gracefully shutdowned,
  resetted and started again (see openstack.common.service code).
  As graceful shutdown implies calling service.stop() and then
  service.wait() we don't want to clean self._servers list because
  it would be impossible to wait for them to stop processing
  requests and cleaning up their resources.
  Otherwise, this would lead to problems with rpc after starting
  the rpc server again.

* Create a duplicate socket each time WorkerService starts

  When api worker is stopped it kills the eventlet wsgi server
  which internally closes the wsgi server socket object. This server
  socket object becomes not usable which leads to "Bad file
  descriptor" errors on service restart.

Added functional and unit tests.

DocImpact
Partial-Bug: #1276694
Change-Id: I75b00946b7cae891c6eb192e853118e7d49e4a24

neutron/common/config.py
neutron/service.py
neutron/tests/functional/requirements.txt
neutron/tests/functional/test_server.py [new file with mode: 0644]
neutron/tests/unit/test_service.py [new file with mode: 0644]
neutron/tests/unit/test_wsgi.py
neutron/wsgi.py

index 93f57159f3e29a1d0b1dbf9947818c4659a06697..c8e4eebf52cfdba45c470b22e0983e3e62603052 100644 (file)
@@ -31,6 +31,7 @@ from paste import deploy
 from neutron.api.v2 import attributes
 from neutron.common import utils
 from neutron.i18n import _LI
+from neutron import policy
 from neutron import version
 
 
@@ -210,6 +211,14 @@ def setup_logging():
     LOG.debug("command line: %s", " ".join(sys.argv))
 
 
+def reset_service():
+    # Reset worker in case SIGHUP is called.
+    # Note that this is called only in case a service is running in
+    # daemon mode.
+    setup_logging()
+    policy.refresh()
+
+
 def load_paste_app(app_name):
     """Builds and returns a WSGI app from a paste config file.
 
index 708882b73124d8feedd97fa14198afd94ebcd3d8..e27dd5cdc2ff493d8d81e4d1501e1de58770591d 100644 (file)
@@ -32,7 +32,6 @@ from neutron.i18n import _LE, _LI
 from neutron import manager
 from neutron.openstack.common import loopingcall
 from neutron.openstack.common import service as common_service
-from neutron import policy
 from neutron import wsgi
 
 
@@ -128,7 +127,10 @@ class RpcWorker(object):
         for server in self._servers:
             if isinstance(server, rpc_server.MessageHandlingServer):
                 server.stop()
-            self._servers = []
+
+    @staticmethod
+    def reset():
+        config.reset_service()
 
 
 def serve_rpc():
@@ -288,8 +290,7 @@ class Service(n_rpc.Service):
                 LOG.exception(_LE("Exception occurs when waiting for timer"))
 
     def reset(self):
-        config.setup_logging()
-        policy.refresh()
+        config.reset_service()
 
     def periodic_tasks(self, raise_on_error=False):
         """Tasks to be run at a periodic interval."""
index 0c5f2215b448eaecc4ba07e12345e23c58c78448..f98f475bc617eb8b76bde65603b4a90301942d05 100644 (file)
@@ -4,5 +4,6 @@
 # of appearance. Changing the order has an impact on the overall integration
 # process, which may cause wedges in the gate later.
 
+psutil>=1.1.1,<2.0.0
 psycopg2
 MySQL-python
diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py
new file mode 100644 (file)
index 0000000..8f81f68
--- /dev/null
@@ -0,0 +1,247 @@
+# Copyright 2015 Mirantis Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import httplib2
+import mock
+import os
+import signal
+import socket
+import time
+import traceback
+
+from oslo_config import cfg
+import psutil
+
+from neutron.agent.linux import utils
+from neutron import service
+from neutron.tests import base
+from neutron import wsgi
+
+
+CONF = cfg.CONF
+
+# This message will be written to temporary file each time
+# reset method is called.
+FAKE_RESET_MSG = "reset".encode("utf-8")
+
+TARGET_PLUGIN = 'neutron.plugins.ml2.plugin.Ml2Plugin'
+
+
+class TestNeutronServer(base.BaseTestCase):
+    def setUp(self):
+        super(TestNeutronServer, self).setUp()
+        self.service_pid = None
+        self.workers = None
+        self.temp_file = self.get_temp_file_path("test_server.tmp")
+        self.health_checker = None
+        self.pipein, self.pipeout = os.pipe()
+        self.addCleanup(self._destroy_workers)
+
+    def _destroy_workers(self):
+        if self.service_pid:
+            # Make sure all processes are stopped
+            os.kill(self.service_pid, signal.SIGKILL)
+
+    def _start_server(self, callback, workers):
+        """Run a given service.
+
+        :param callback: callback that will start the required service
+        :param workers: number of service workers
+        :returns: list of spawned workers' pids
+        """
+
+        self.workers = workers
+
+        # Fork a new process in which server will be started
+        pid = os.fork()
+        if pid == 0:
+            status = 0
+            try:
+                callback(workers)
+            except SystemExit as exc:
+                status = exc.code
+            except BaseException:
+                traceback.print_exc()
+                status = 2
+
+            # Really exit
+            os._exit(status)
+
+        self.service_pid = pid
+
+        if self.workers > 0:
+            # Wait at most 10 seconds to spawn workers
+            condition = lambda: self.workers == len(self._get_workers())
+
+            utils.wait_until_true(
+                condition, timeout=10, sleep=0.1,
+                exception=RuntimeError(
+                    "Failed to start %d workers." % self.workers))
+
+            workers = self._get_workers()
+            self.assertEqual(len(workers), self.workers)
+            return workers
+
+        # Wait for a service to start.
+        utils.wait_until_true(self.health_checker, timeout=10, sleep=0.1,
+                              exception=RuntimeError(
+                                  "Failed to start service."))
+
+        return [self.service_pid]
+
+    def _get_workers(self):
+        """Get the list of processes in which WSGI server is running."""
+
+        if self.workers > 0:
+            return [proc.pid for proc in psutil.process_iter()
+                    if proc.ppid == self.service_pid]
+        else:
+            return [proc.pid for proc in psutil.process_iter()
+                    if proc.pid == self.service_pid]
+
+    def _fake_reset(self):
+        """Writes FAKE_RESET_MSG to temporary file on each call."""
+
+        with open(self.temp_file, 'a') as f:
+            f.write(FAKE_RESET_MSG)
+
+    def _test_restart_service_on_sighup(self, service, workers=0):
+        """Test that a service correctly restarts on receiving SIGHUP.
+
+        1. Start a service with a given number of workers.
+        2. Send SIGHUP to the service.
+        3. Wait for workers (if any) to restart.
+        4. Assert that the pids of the workers didn't change after restart.
+        """
+
+        start_workers = self._start_server(callback=service, workers=workers)
+
+        os.kill(self.service_pid, signal.SIGHUP)
+
+        # Wait for temp file to be created and its size become equal
+        # to size of FAKE_RESET_MSG repeated (workers + 1) times.
+        expected_size = len(FAKE_RESET_MSG) * (workers + 1)
+        condition = lambda: (os.path.isfile(self.temp_file)
+                             and os.stat(self.temp_file).st_size ==
+                             expected_size)
+
+        utils.wait_until_true(
+            condition, timeout=5, sleep=0.1,
+            exception=RuntimeError(
+                "Timed out waiting for file %(filename)s to be created and "
+                "its size become equal to %(size)s." %
+                {'filename': self.temp_file,
+                 'size': expected_size}))
+
+        # Verify that reset has been called for parent process in which
+        # a service was started and for each worker by checking that
+        # FAKE_RESET_MSG has been written to temp file workers + 1 times.
+        with open(self.temp_file, 'r') as f:
+            res = f.readline()
+            self.assertEqual(FAKE_RESET_MSG * (workers + 1), res)
+
+        # Make sure worker pids don't change
+        end_workers = self._get_workers()
+        self.assertEqual(start_workers, end_workers)
+
+
+class TestWsgiServer(TestNeutronServer):
+    """Tests for neutron.wsgi.Server."""
+
+    def setUp(self):
+        super(TestWsgiServer, self).setUp()
+        self.health_checker = self._check_active
+        self.port = None
+
+    @staticmethod
+    def application(environ, start_response):
+        """A primitive test application."""
+
+        response_body = 'Response'
+        status = '200 OK'
+        response_headers = [('Content-Type', 'text/plain'),
+                            ('Content-Length', str(len(response_body)))]
+        start_response(status, response_headers)
+        return [response_body]
+
+    def _check_active(self):
+        """Check a wsgi service is active by making a GET request."""
+        port = int(os.read(self.pipein, 5))
+        conn = httplib2.HTTPConnectionWithTimeout("localhost", port)
+        try:
+            conn.request("GET", "/")
+            resp = conn.getresponse()
+            return resp.status == 200
+        except socket.error:
+            return False
+
+    def _run_wsgi(self, workers=0):
+        """Start WSGI server with a test application."""
+
+        # Mock reset method to check that it is being called
+        # on receiving SIGHUP.
+        with mock.patch("neutron.wsgi.WorkerService.reset") as reset_method:
+            reset_method.side_effect = self._fake_reset
+
+            server = wsgi.Server("Test")
+            server.start(self.application, 0, "0.0.0.0",
+                         workers=workers)
+
+            # Memorize a port that was chosen for the service
+            self.port = server.port
+            os.write(self.pipeout, str(self.port))
+
+            server.wait()
+
+    def test_restart_wsgi_on_sighup_multiple_workers(self):
+        self._test_restart_service_on_sighup(service=self._run_wsgi,
+                                             workers=2)
+
+
+class TestRPCServer(TestNeutronServer):
+    """Tests for neutron RPC server."""
+
+    def setUp(self):
+        super(TestRPCServer, self).setUp()
+        self.setup_coreplugin(TARGET_PLUGIN)
+        self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
+        self.plugin = self._plugin_patcher.start()
+        self.plugin.return_value.rpc_workers_supported = True
+        self.health_checker = self._check_active
+
+    def _check_active(self):
+        time.sleep(5)
+        return True
+
+    def _serve_rpc(self, workers=0):
+        """Start RPC server with a given number of workers."""
+
+        # Mock reset method to check that it is being called
+        # on receiving SIGHUP.
+        with mock.patch("neutron.service.RpcWorker.reset") as reset_method:
+            with mock.patch(
+                    "neutron.manager.NeutronManager.get_plugin"
+            ) as get_plugin:
+                reset_method.side_effect = self._fake_reset
+                get_plugin.return_value = self.plugin
+
+                CONF.set_override("rpc_workers", workers)
+
+                launcher = service.serve_rpc()
+                launcher.wait()
+
+    def test_restart_rpc_on_sighup_multiple_workers(self):
+        self._test_restart_service_on_sighup(service=self._serve_rpc,
+                                             workers=2)
diff --git a/neutron/tests/unit/test_service.py b/neutron/tests/unit/test_service.py
new file mode 100644 (file)
index 0000000..582449f
--- /dev/null
@@ -0,0 +1,33 @@
+# Copyright 2015 Mirantis Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+
+from neutron import service
+from neutron.tests import base
+
+
+class TestRpcWorker(base.BaseTestCase):
+
+    @mock.patch("neutron.policy.refresh")
+    @mock.patch("neutron.common.config.setup_logging")
+    def test_reset(self, setup_logging_mock, refresh_mock):
+        _plugin = mock.Mock()
+
+        rpc_worker = service.RpcWorker(_plugin)
+        rpc_worker.reset()
+
+        setup_logging_mock.assert_called_once_with()
+        refresh_mock.assert_called_once_with()
index 584a66610eea0307d0294003ceec0663f9bd07aa..3331f450f8014eb9be1d65a31e4f625b681523b0 100644 (file)
@@ -65,6 +65,18 @@ class TestWorkerService(base.BaseTestCase):
         workerservice.start()
         self.assertFalse(apimock.called)
 
+    @mock.patch("neutron.policy.refresh")
+    @mock.patch("neutron.common.config.setup_logging")
+    def test_reset(self, setup_logging_mock, refresh_mock):
+        _service = mock.Mock()
+        _app = mock.Mock()
+
+        worker_service = wsgi.WorkerService(_service, _app)
+        worker_service.reset()
+
+        setup_logging_mock.assert_called_once_with()
+        refresh_mock.assert_called_once_with()
+
 
 class TestWSGIServer(base.BaseTestCase):
     """WSGI server tests."""
@@ -132,7 +144,7 @@ class TestWSGIServer(base.BaseTestCase):
                         mock.call(
                             server._run,
                             None,
-                            mock_listen.return_value)
+                            mock_listen.return_value.dup.return_value)
                     ])
 
     def test_app(self):
index 437e57b0984390a87cbc1831c1551962d06a780a..0aecc8069df0df0ca50f83529c16a5f4b321f7f5 100644 (file)
@@ -37,6 +37,7 @@ import six
 import webob.dec
 import webob.exc
 
+from neutron.common import config
 from neutron.common import exceptions as exception
 from neutron import context
 from neutron.db import api
@@ -99,12 +100,17 @@ class WorkerService(object):
         self._server = None
 
     def start(self):
+        # When api worker is stopped it kills the eventlet wsgi server which
+        # internally closes the wsgi server socket object. This server socket
+        # object becomes not usable which leads to "Bad file descriptor"
+        # errors on service restart.
+        # Duplicate a socket object to keep a file descriptor usable.
+        dup_sock = self._service._socket.dup()
         if CONF.use_ssl:
-            self._service._socket = self._service.wrap_ssl(
-                self._service._socket)
+            dup_sock = self._service.wrap_ssl(dup_sock)
         self._server = self._service.pool.spawn(self._service._run,
                                                 self._application,
-                                                self._service._socket)
+                                                dup_sock)
 
     def wait(self):
         if isinstance(self._server, eventlet.greenthread.GreenThread):
@@ -115,6 +121,10 @@ class WorkerService(object):
             self._server.kill()
             self._server = None
 
+    @staticmethod
+    def reset():
+        config.reset_service()
+
 
 class Server(object):
     """Server class to manage multiple WSGI sockets and applications."""