]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Rename workers to api_workers and simplify code
authorCarl Baldwin <carl.baldwin@hp.com>
Tue, 11 Feb 2014 00:58:42 +0000 (00:58 +0000)
committerCarl Baldwin <carl.baldwin@hp.com>
Mon, 15 Sep 2014 22:34:25 +0000 (22:34 +0000)
Refactor a few ugly aspects of the multiple API worker patch to make
way for multiple rpc workers.  This came up as I was trying to add
multiple RPC workers using similar patterns and remembering that some
things were left in a rather awkward state.

Change-Id: I549db67af4af6a2df80e12cf233109dda5213c47

neutron/agent/metadata/agent.py
neutron/service.py
neutron/tests/unit/test_metadata_agent.py
neutron/tests/unit/test_wsgi.py
neutron/wsgi.py

index 569a99171a27253cae9b07c73203a3d30ba65ebd..a678edab71a141a7305c8b6053345e5649385d91 100644 (file)
@@ -40,7 +40,6 @@ from neutron.openstack.common.cache import cache
 from neutron.openstack.common import excutils
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common import service
 from neutron import wsgi
 
 LOG = logging.getLogger(__name__)
@@ -280,16 +279,8 @@ class UnixDomainWSGIServer(wsgi.Server):
         self._socket = eventlet.listen(file_socket,
                                        family=socket.AF_UNIX,
                                        backlog=backlog)
-        if workers < 1:
-            # For the case where only one process is required.
-            self._server = self.pool.spawn_n(self._run, application,
-                                             self._socket)
-        else:
-            # Minimize the cost of checking for child exit by extending the
-            # wait interval past the default of 0.01s.
-            self._launcher = service.ProcessLauncher(wait_interval=1.0)
-            self._server = WorkerService(self, application)
-            self._launcher.launch_service(self._server, workers=workers)
+
+        self._launch(application, workers=workers)
 
     def _run(self, application, socket):
         """Start a WSGI service in a new green thread."""
index 820364f7c106ab79a75474e4c11220b257f0bda6..eb48687cb3772a27be4706ea1b1a15e0d598d913 100644 (file)
@@ -40,7 +40,7 @@ service_opts = [
                help=_('Seconds between running periodic tasks')),
     cfg.IntOpt('api_workers',
                default=0,
-               help=_('Number of separate worker processes for service')),
+               help=_('Number of separate API worker processes for service')),
     cfg.IntOpt('rpc_workers',
                default=0,
                help=_('Number of RPC worker processes for service')),
index dbb3e12306262d42860e758231b5a46cacccd4b0..dff72cf02893af2369a9e8ff36fd375ffe3b3423 100644 (file)
@@ -495,8 +495,8 @@ class TestUnixDomainWSGIServer(base.BaseTestCase):
 
     def test_start(self):
         mock_app = mock.Mock()
-        with mock.patch.object(self.server, 'pool') as pool:
-            self.server.start(mock_app, '/the/path', workers=0, backlog=128)
+        with mock.patch.object(self.server, '_launch') as launcher:
+            self.server.start(mock_app, '/the/path', workers=5, backlog=128)
             self.eventlet.assert_has_calls([
                 mock.call.listen(
                     '/the/path',
@@ -504,27 +504,7 @@ class TestUnixDomainWSGIServer(base.BaseTestCase):
                     backlog=128
                 )]
             )
-            pool.spawn_n.assert_called_once_with(
-                self.server._run,
-                mock_app,
-                self.eventlet.listen.return_value
-            )
-
-    @mock.patch('neutron.openstack.common.service.ProcessLauncher')
-    def test_start_multiple_workers(self, process_launcher):
-        launcher = process_launcher.return_value
-
-        mock_app = mock.Mock()
-        self.server.start(mock_app, '/the/path', workers=2, backlog=128)
-        launcher.running = True
-        launcher.launch_service.assert_called_once_with(self.server._server,
-                                                        workers=2)
-
-        self.server.stop()
-        self.assertFalse(launcher.running)
-
-        self.server.wait()
-        launcher.wait.assert_called_once_with()
+            launcher.assert_called_once_with(mock_app, workers=5)
 
     def test_run(self):
         with mock.patch.object(agent, 'logging') as logging:
index d1229139886612bca00ab27259fcfa56d08a541e..789b27ce62a3aa199f9f085b26e81362ce695263 100644 (file)
@@ -56,12 +56,10 @@ class TestWSGIServer(base.BaseTestCase):
 
         server = wsgi.Server("test_multiple_processes")
         server.start(None, 0, host="127.0.0.1", workers=2)
-        launcher.running = True
-        launcher.launch_service.assert_called_once_with(server._server,
-                                                        workers=2)
+        launcher.launch_service.assert_called_once_with(mock.ANY, workers=2)
 
         server.stop()
-        self.assertFalse(launcher.running)
+        launcher.stop.assert_called_once_with()
 
         server.wait()
         launcher.wait.assert_called_once_with()
index a98e22263758da956939c2d8886248b5df5cf943..137dcb96e5e347c3b6a96e7f6f94443ea81faf0b 100644 (file)
@@ -97,7 +97,8 @@ class WorkerService(object):
                                                 self._service._socket)
 
     def wait(self):
-        self._service.pool.waitall()
+        if isinstance(self._server, eventlet.greenthread.GreenThread):
+            self._server.wait()
 
     def stop(self):
         if isinstance(self._server, eventlet.greenthread.GreenThread):
@@ -113,7 +114,6 @@ class Server(object):
         eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
         self.pool = eventlet.GreenPool(threads)
         self.name = name
-        self._launcher = None
         self._server = None
 
     def _get_socket(self, host, port, backlog):
@@ -205,17 +205,22 @@ class Server(object):
         self._socket = self._get_socket(self._host,
                                         self._port,
                                         backlog=backlog)
+
+        self._launch(application, workers)
+
+    def _launch(self, application, workers=0):
+        service = WorkerService(self, application)
         if workers < 1:
-            # For the case where only one process is required.
-            self._server = self.pool.spawn(self._run, application,
-                                           self._socket)
+            # The API service should run in the current process.
+            self._server = service
+            service.start()
             systemd.notify_once()
         else:
+            # The API service runs in a number of child processes.
             # Minimize the cost of checking for child exit by extending the
             # wait interval past the default of 0.01s.
-            self._launcher = common_service.ProcessLauncher(wait_interval=1.0)
-            self._server = WorkerService(self, application)
-            self._launcher.launch_service(self._server, workers=workers)
+            self._server = common_service.ProcessLauncher(wait_interval=1.0)
+            self._server.launch_service(service, workers=workers)
 
     @property
     def host(self):
@@ -226,19 +231,12 @@ class Server(object):
         return self._socket.getsockname()[1] if self._socket else self._port
 
     def stop(self):
-        if self._launcher:
-            # The process launcher does not support stop or kill.
-            self._launcher.running = False
-        else:
-            self._server.kill()
+        self._server.stop()
 
     def wait(self):
         """Wait until all servers have completed running."""
         try:
-            if self._launcher:
-                self._launcher.wait()
-            else:
-                self.pool.waitall()
+            self._server.wait()
         except KeyboardInterrupt:
             pass