raise ValueError(_('respawn_interval must be >= 0 if provided.'))
self.respawn_interval = respawn_interval
self._process = None
+ self._is_running = False
self._kill_event = None
self._reset_queues()
self._watchers = []
:raises eventlet.timeout.Timeout if blocking is True and the process
did not start in time.
"""
- if self._kill_event:
+ LOG.debug('Launching async process [%s].', self.cmd)
+ if self._is_running:
raise AsyncProcessException(_('Process is already started'))
else:
- LOG.debug('Launching async process [%s].', self.cmd)
self._spawn()
if block:
:raises eventlet.timeout.Timeout if blocking is True and the process
did not stop in time.
"""
- if self._kill_event:
+ if self._is_running:
LOG.debug('Halting async process [%s].', self.cmd)
self._kill(kill_signal)
else:
def _spawn(self):
"""Spawn a process and its watchers."""
+ self._is_running = True
self._kill_event = eventlet.event.Event()
self._process, cmd = utils.create_process(self._cmd,
run_as_root=self.run_as_root)
self._process.pid,
run_as_root=self.run_as_root)
- def _kill(self, kill_signal, respawning=False):
- """Kill the process and the associated watcher greenthreads.
-
- :param respawning: Optional, whether respawn will be subsequently
- attempted.
- """
- # Halt the greenthreads
- self._kill_event.send()
-
+ def _kill(self, kill_signal):
+ """Kill the process and the associated watcher greenthreads."""
pid = self.pid
if pid:
+ self._is_running = False
self._kill_process(pid, kill_signal)
- if not respawning:
- # Clear the kill event to ensure the process can be
- # explicitly started again.
+ # Halt the greenthreads if they weren't already.
+ if self._kill_event:
+ self._kill_event.send()
self._kill_event = None
def _kill_process(self, pid, kill_signal):
"""Kill the async process and respawn if necessary."""
LOG.debug('Halting async process [%s] in response to an error.',
self.cmd)
+ self._kill(signal.SIGKILL)
if self.respawn_interval is not None and self.respawn_interval >= 0:
- respawning = True
- else:
- respawning = False
- self._kill(signal.SIGKILL, respawning=respawning)
- if respawning:
eventlet.sleep(self.respawn_interval)
LOG.debug('Respawning async process [%s].', self.cmd)
- self._spawn()
+ try:
+ self.start()
+ except AsyncProcessException:
+ # Process was already respawned by someone else...
+ pass
def _watch_process(self, callback, kill_event):
while not kill_event.ready():
# Ensure that watching a process with lots of output does
# not block execution of other greenthreads.
eventlet.sleep()
- # The kill event not being ready indicates that the loop was
+ # self._is_running being True indicates that the loop was
# broken out of due to an error in the watched process rather
# than the loop condition being satisfied.
- if not kill_event.ready():
+ if self._is_running:
+ self._is_running = False
self._handle_process_error()
def _read(self, stream, queue):
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
from neutron.tests import base
+from neutron.tests.unit.agent.linux import failing_process
class TestAsyncProcess(base.BaseTestCase):
with mock.patch('eventlet.spawn') as mock_spawn:
proc._spawn()
+ self.assertTrue(self.proc._is_running)
self.assertIsInstance(proc._kill_event, eventlet.event.Event)
self.assertEqual(proc._process, expected_process)
mock_spawn.assert_has_calls([
with mock.patch.object(self.proc, '_kill') as kill:
self.proc._handle_process_error()
- kill.assert_has_calls([mock.call(signal.SIGKILL, respawning=False)])
+ kill.assert_has_calls([mock.call(signal.SIGKILL)])
def test__handle_process_error_kills_without_respawn(self):
self.proc.respawn_interval = 1
with mock.patch('eventlet.sleep') as sleep:
self.proc._handle_process_error()
- kill.assert_has_calls([mock.call(signal.SIGKILL, respawning=True)])
+ kill.assert_has_calls([mock.call(signal.SIGKILL)])
sleep.assert_has_calls([mock.call(self.proc.respawn_interval)])
spawn.assert_called_once_with()
+ def test__handle_process_error_no_crash_if_started(self):
+ self.proc._is_running = True
+ with mock.patch.object(self.proc, '_kill'):
+ with mock.patch.object(self.proc, '_spawn') as mock_spawn:
+ self.proc._handle_process_error()
+ mock_spawn.assert_not_called()
+
+ def _watch_process_exception(self):
+ raise Exception('Error!')
+
def _test__watch_process(self, callback, kill_event):
+ self.proc._is_running = True
self.proc._kill_event = kill_event
# Ensure the test times out eventually if the watcher loops endlessly
with eventlet.timeout.Timeout(5):
self._test__watch_process(lambda: None, eventlet.event.Event())
def test__watch_process_exits_on_exception(self):
- def foo():
- raise Exception('Error!')
- self._test__watch_process(foo, eventlet.event.Event())
+ self._test__watch_process(self._watch_process_exception,
+ eventlet.event.Event())
+ with mock.patch.object(self.proc,
+ '_handle_process_error') as func:
+ self.proc._watch_process(self._watch_process_exception,
+ self.proc._kill_event)
+ func.assert_not_called()
def test__watch_process_exits_on_sent_kill_event(self):
kill_event = eventlet.event.Event()
self._test_read_output_queues_and_returns_result('')
def test_start_raises_exception_if_process_already_started(self):
- self.proc._kill_event = True
+ self.proc._is_running = True
with testtools.ExpectedException(async_process.AsyncProcessException):
self.proc.start()
def test_iter_stderr(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
- def _test__kill(self, respawning, pid=None):
+ def test__kill_targets_process_for_pid(self):
+ pid = 1
+
with mock.patch.object(self.proc, '_kill_event'
) as mock_kill_event,\
mock.patch.object(utils, 'get_root_helper_child_pid',
mock.patch.object(self.proc, '_kill_process'
) as mock_kill_process,\
mock.patch.object(self.proc, '_process'):
- self.proc._kill(signal.SIGKILL, respawning)
+ self.proc._kill(signal.SIGKILL)
- if respawning:
- self.assertIsNotNone(self.proc._kill_event)
- else:
- self.assertIsNone(self.proc._kill_event)
+ self.assertIsNone(self.proc._kill_event)
+ self.assertFalse(self.proc._is_running)
mock_kill_event.send.assert_called_once_with()
if pid:
mock_kill_process.assert_called_once_with(pid, signal.SIGKILL)
- def test__kill_when_respawning_does_not_clear_kill_event(self):
- self._test__kill(True)
-
- def test__kill_when_not_respawning_clears_kill_event(self):
- self._test__kill(False)
-
- def test__kill_targets_process_for_pid(self):
- self._test__kill(False, pid='1')
-
def _test__kill_process(self, pid, expected, exception_message=None,
kill_signal=signal.SIGKILL):
self.proc.run_as_root = True
self._test__kill_process('1', True, kill_signal=signal.SIGTERM)
def test_stop_calls_kill_with_provided_signal_number(self):
- self.proc._kill_event = True
+ self.proc._is_running = True
with mock.patch.object(self.proc, '_kill') as mock_kill:
self.proc.stop(kill_signal=signal.SIGTERM)
mock_kill.assert_called_once_with(signal.SIGTERM)
with mock.patch.object(proc, '_read', return_value='fakedata'),\
mock.patch.object(proc, '_process'):
self.assertIsNone(proc._read_stderr())
+
+
+class TestFailingAsyncProcess(base.BaseTestCase):
+ def setUp(self):
+ super(TestFailingAsyncProcess, self).setUp()
+ path = self.get_temp_file_path('async.tmp', self.get_new_temp_dir())
+ self.process = async_process.AsyncProcess(['python',
+ failing_process.__file__,
+ path],
+ respawn_interval=0)
+
+ def test_failing_async_process_handle_error_once(self):
+ with mock.patch.object(self.process, '_handle_process_error')\
+ as handle_error_mock:
+ self.process.start()
+ self.process._process.wait()
+ self.assertEqual(1, handle_error_mock.call_count)