]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Keep reading stdout/stderr until after kill
authorJohn Schwarz <jschwarz@redhat.com>
Wed, 14 Oct 2015 12:39:33 +0000 (15:39 +0300)
committerJohn Schwarz <jschwarz@redhat.com>
Thu, 19 Nov 2015 17:14:13 +0000 (19:14 +0200)
Currently, when calling AsyncProcess.stop(), the code stops the stdout
and stderr readers and kills the process. There exists an end case (as
described in the bug report) in which after the readers have been
stopped the sub-process will generate a substantial amount of outputs to
either fd. Since the 'subprocess' module is launched with
subprocess.PIPE as stdout/stderr, and since Linux's pipes can be filled
to the point where writing new data to them will block, this may cause a
deadlock if the sub-process has a signal handler for the signal (for
example, the process is handling SIGTERM to produce a graceful exit of
the program).

Therefore, this patch proposes to only kill the readers until AFTER
wait() returned and the process truly died. Also, relying on _kill_event
had to cease since invoking its send() method caused a logical loop back
to _kill, causing eventlet errors.

A different possible solution is closing the stdout/stderr pipes. Alas,
this may raise an exception in the sub-process ("what? No stdout?!
Crash!") and defeats the 'graceful' part of the process.

Closes-Bug: #1506021
Change-Id: I506c41c634a8d656d81a8ad7963412b834bdfa5b

neutron/agent/linux/async_process.py
neutron/tests/unit/agent/linux/failing_process.py [new file with mode: 0644]
neutron/tests/unit/agent/linux/test_async_process.py

index f94cfa6a2e8eef044d0adec708ab99b6873a9fb5..9a114b7c98c8255de405d07ffd39d424482bba06 100644 (file)
@@ -76,6 +76,7 @@ class AsyncProcess(object):
             raise ValueError(_('respawn_interval must be >= 0 if provided.'))
         self.respawn_interval = respawn_interval
         self._process = None
+        self._is_running = False
         self._kill_event = None
         self._reset_queues()
         self._watchers = []
@@ -104,10 +105,10 @@ class AsyncProcess(object):
         :raises eventlet.timeout.Timeout if blocking is True and the process
                 did not start in time.
         """
-        if self._kill_event:
+        LOG.debug('Launching async process [%s].', self.cmd)
+        if self._is_running:
             raise AsyncProcessException(_('Process is already started'))
         else:
-            LOG.debug('Launching async process [%s].', self.cmd)
             self._spawn()
 
         if block:
@@ -122,7 +123,7 @@ class AsyncProcess(object):
         :raises eventlet.timeout.Timeout if blocking is True and the process
                 did not stop in time.
         """
-        if self._kill_event:
+        if self._is_running:
             LOG.debug('Halting async process [%s].', self.cmd)
             self._kill(kill_signal)
         else:
@@ -133,6 +134,7 @@ class AsyncProcess(object):
 
     def _spawn(self):
         """Spawn a process and its watchers."""
+        self._is_running = True
         self._kill_event = eventlet.event.Event()
         self._process, cmd = utils.create_process(self._cmd,
                                                   run_as_root=self.run_as_root)
@@ -154,22 +156,16 @@ class AsyncProcess(object):
                 self._process.pid,
                 run_as_root=self.run_as_root)
 
-    def _kill(self, kill_signal, respawning=False):
-        """Kill the process and the associated watcher greenthreads.
-
-        :param respawning: Optional, whether respawn will be subsequently
-               attempted.
-        """
-        # Halt the greenthreads
-        self._kill_event.send()
-
+    def _kill(self, kill_signal):
+        """Kill the process and the associated watcher greenthreads."""
         pid = self.pid
         if pid:
+            self._is_running = False
             self._kill_process(pid, kill_signal)
 
-        if not respawning:
-            # Clear the kill event to ensure the process can be
-            # explicitly started again.
+        # Halt the greenthreads if they weren't already.
+        if self._kill_event:
+            self._kill_event.send()
             self._kill_event = None
 
     def _kill_process(self, pid, kill_signal):
@@ -194,15 +190,15 @@ class AsyncProcess(object):
         """Kill the async process and respawn if necessary."""
         LOG.debug('Halting async process [%s] in response to an error.',
                   self.cmd)
+        self._kill(signal.SIGKILL)
         if self.respawn_interval is not None and self.respawn_interval >= 0:
-            respawning = True
-        else:
-            respawning = False
-        self._kill(signal.SIGKILL, respawning=respawning)
-        if respawning:
             eventlet.sleep(self.respawn_interval)
             LOG.debug('Respawning async process [%s].', self.cmd)
-            self._spawn()
+            try:
+                self.start()
+            except AsyncProcessException:
+                # Process was already respawned by someone else...
+                pass
 
     def _watch_process(self, callback, kill_event):
         while not kill_event.ready():
@@ -217,10 +213,11 @@ class AsyncProcess(object):
             # Ensure that watching a process with lots of output does
             # not block execution of other greenthreads.
             eventlet.sleep()
-        # The kill event not being ready indicates that the loop was
+        # self._is_running being True indicates that the loop was
         # broken out of due to an error in the watched process rather
         # than the loop condition being satisfied.
-        if not kill_event.ready():
+        if self._is_running:
+            self._is_running = False
             self._handle_process_error()
 
     def _read(self, stream, queue):
diff --git a/neutron/tests/unit/agent/linux/failing_process.py b/neutron/tests/unit/agent/linux/failing_process.py
new file mode 100644 (file)
index 0000000..29547ca
--- /dev/null
@@ -0,0 +1,26 @@
+# Copyright 2015 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import os
+import sys
+
+
+def main():
+    filename = sys.argv[1]
+    if not os.path.exists(filename):
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()
index ad3dfac011de5825d99303067d1724f8f4aca288..6e72061723f0dfc156d45137ad974911f6fa37b9 100644 (file)
@@ -23,6 +23,7 @@ import testtools
 from neutron.agent.linux import async_process
 from neutron.agent.linux import utils
 from neutron.tests import base
+from neutron.tests.unit.agent.linux import failing_process
 
 
 class TestAsyncProcess(base.BaseTestCase):
@@ -43,6 +44,7 @@ class TestAsyncProcess(base.BaseTestCase):
             with mock.patch('eventlet.spawn') as mock_spawn:
                 proc._spawn()
 
+        self.assertTrue(self.proc._is_running)
         self.assertIsInstance(proc._kill_event, eventlet.event.Event)
         self.assertEqual(proc._process, expected_process)
         mock_spawn.assert_has_calls([
@@ -59,7 +61,7 @@ class TestAsyncProcess(base.BaseTestCase):
         with mock.patch.object(self.proc, '_kill') as kill:
             self.proc._handle_process_error()
 
-        kill.assert_has_calls([mock.call(signal.SIGKILL, respawning=False)])
+        kill.assert_has_calls([mock.call(signal.SIGKILL)])
 
     def test__handle_process_error_kills_without_respawn(self):
         self.proc.respawn_interval = 1
@@ -68,11 +70,22 @@ class TestAsyncProcess(base.BaseTestCase):
                 with mock.patch('eventlet.sleep') as sleep:
                     self.proc._handle_process_error()
 
-        kill.assert_has_calls([mock.call(signal.SIGKILL, respawning=True)])
+        kill.assert_has_calls([mock.call(signal.SIGKILL)])
         sleep.assert_has_calls([mock.call(self.proc.respawn_interval)])
         spawn.assert_called_once_with()
 
+    def test__handle_process_error_no_crash_if_started(self):
+        self.proc._is_running = True
+        with mock.patch.object(self.proc, '_kill'):
+            with mock.patch.object(self.proc, '_spawn') as mock_spawn:
+                self.proc._handle_process_error()
+                mock_spawn.assert_not_called()
+
+    def _watch_process_exception(self):
+        raise Exception('Error!')
+
     def _test__watch_process(self, callback, kill_event):
+        self.proc._is_running = True
         self.proc._kill_event = kill_event
         # Ensure the test times out eventually if the watcher loops endlessly
         with eventlet.timeout.Timeout(5):
@@ -87,9 +100,13 @@ class TestAsyncProcess(base.BaseTestCase):
         self._test__watch_process(lambda: None, eventlet.event.Event())
 
     def test__watch_process_exits_on_exception(self):
-        def foo():
-            raise Exception('Error!')
-        self._test__watch_process(foo, eventlet.event.Event())
+        self._test__watch_process(self._watch_process_exception,
+                                  eventlet.event.Event())
+        with mock.patch.object(self.proc,
+                               '_handle_process_error') as func:
+            self.proc._watch_process(self._watch_process_exception,
+                                     self.proc._kill_event)
+            func.assert_not_called()
 
     def test__watch_process_exits_on_sent_kill_event(self):
         kill_event = eventlet.event.Event()
@@ -117,7 +134,7 @@ class TestAsyncProcess(base.BaseTestCase):
         self._test_read_output_queues_and_returns_result('')
 
     def test_start_raises_exception_if_process_already_started(self):
-        self.proc._kill_event = True
+        self.proc._is_running = True
         with testtools.ExpectedException(async_process.AsyncProcessException):
             self.proc.start()
 
@@ -155,7 +172,9 @@ class TestAsyncProcess(base.BaseTestCase):
     def test_iter_stderr(self):
         self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
 
-    def _test__kill(self, respawning, pid=None):
+    def test__kill_targets_process_for_pid(self):
+        pid = 1
+
         with mock.patch.object(self.proc, '_kill_event'
                                ) as mock_kill_event,\
                 mock.patch.object(utils, 'get_root_helper_child_pid',
@@ -163,26 +182,15 @@ class TestAsyncProcess(base.BaseTestCase):
                 mock.patch.object(self.proc, '_kill_process'
                                   ) as mock_kill_process,\
                 mock.patch.object(self.proc, '_process'):
-            self.proc._kill(signal.SIGKILL, respawning)
+            self.proc._kill(signal.SIGKILL)
 
-            if respawning:
-                self.assertIsNotNone(self.proc._kill_event)
-            else:
-                self.assertIsNone(self.proc._kill_event)
+            self.assertIsNone(self.proc._kill_event)
+            self.assertFalse(self.proc._is_running)
 
         mock_kill_event.send.assert_called_once_with()
         if pid:
             mock_kill_process.assert_called_once_with(pid, signal.SIGKILL)
 
-    def test__kill_when_respawning_does_not_clear_kill_event(self):
-        self._test__kill(True)
-
-    def test__kill_when_not_respawning_clears_kill_event(self):
-        self._test__kill(False)
-
-    def test__kill_targets_process_for_pid(self):
-        self._test__kill(False, pid='1')
-
     def _test__kill_process(self, pid, expected, exception_message=None,
                             kill_signal=signal.SIGKILL):
         self.proc.run_as_root = True
@@ -211,7 +219,7 @@ class TestAsyncProcess(base.BaseTestCase):
         self._test__kill_process('1', True, kill_signal=signal.SIGTERM)
 
     def test_stop_calls_kill_with_provided_signal_number(self):
-        self.proc._kill_event = True
+        self.proc._is_running = True
         with mock.patch.object(self.proc, '_kill') as mock_kill:
             self.proc.stop(kill_signal=signal.SIGTERM)
         mock_kill.assert_called_once_with(signal.SIGTERM)
@@ -267,3 +275,20 @@ class TestAsyncProcessDieOnError(base.BaseTestCase):
         with mock.patch.object(proc, '_read', return_value='fakedata'),\
                 mock.patch.object(proc, '_process'):
             self.assertIsNone(proc._read_stderr())
+
+
+class TestFailingAsyncProcess(base.BaseTestCase):
+    def setUp(self):
+        super(TestFailingAsyncProcess, self).setUp()
+        path = self.get_temp_file_path('async.tmp', self.get_new_temp_dir())
+        self.process = async_process.AsyncProcess(['python',
+                                                   failing_process.__file__,
+                                                   path],
+                                                  respawn_interval=0)
+
+    def test_failing_async_process_handle_error_once(self):
+        with mock.patch.object(self.process, '_handle_process_error')\
+                as handle_error_mock:
+            self.process.start()
+            self.process._process.wait()
+            self.assertEqual(1, handle_error_mock.call_count)