self.service_pid = None
self.workers = None
self.temp_file = self.get_temp_file_path("test_server.tmp")
- self.health_checker = None
+ self.health_checker = self._check_active
self.pipein, self.pipeout = os.pipe()
self.addCleanup(self._destroy_workers)
self.service_pid = pid
- if self.workers > 0:
+ # If number of workers is 1 it is assumed that we run
+ # a service in the current process.
+ if self.workers > 1:
# Wait at most 10 seconds to spawn workers
condition = lambda: self.workers == len(self._get_workers())
except psutil.NoSuchProcess:
return None
- if self.workers > 0:
+ if self.workers > 1:
return [proc.pid for proc in psutil.process_iter()
if safe_ppid(proc) == self.service_pid]
else:
return [proc.pid for proc in psutil.process_iter()
if proc.pid == self.service_pid]
+ def _check_active(self):
+ """Dummy service activity check."""
+ time.sleep(5)
+ return True
+
def _fake_start(self):
with open(self.temp_file, 'a') as f:
f.write(FAKE_START_MSG)
- def _test_restart_service_on_sighup(self, service, workers=0):
+ def _test_restart_service_on_sighup(self, service, workers=1):
"""Test that a service correctly (re)starts on receiving SIGHUP.
1. Start a service with a given number of workers.
except socket.error:
return False
- def _run_wsgi(self, workers=0):
+ def _run_wsgi(self, workers=1):
"""Start WSGI server with a test application."""
# Mock start method to check that children are started again on
self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
self.plugin = self._plugin_patcher.start()
self.plugin.return_value.rpc_workers_supported = True
- self.health_checker = self._check_active
-
- def _check_active(self):
- time.sleep(5)
- return True
- def _serve_rpc(self, workers=0):
+ def _serve_rpc(self, workers=1):
"""Start RPC server with a given number of workers."""
# Mock start method to check that children are started again on
self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
self.plugin = self._plugin_patcher.start()
- def _start_plugin(self, workers=0):
+ def _start_plugin(self, workers=1):
with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
gp.return_value = self.plugin
launchers = service.start_plugin_workers()