]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add support for managing async processes
authorMaru Newby <marun@redhat.com>
Mon, 9 Sep 2013 08:29:54 +0000 (01:29 -0700)
committerMaru Newby <marun@redhat.com>
Mon, 14 Oct 2013 07:20:02 +0000 (07:20 +0000)
Interacting with a long-running asynchronous process requires the
use of non-blocking io.  This change adds a helper class that can
launch a long-running process and read stdout and stderr in a
non-blocking fashion via eventlet.

This functionality is intended to support monitoring ovsdb via
a long-running and root-privileged invocation of ovsdb-client.

The complexity of the system interaction in this patch suggested
the addition of a functional test that validated actual behaviour.
The test was added under the neutron/tests/functional path which
is now included in the testr search path.

Partial-Bug: #1177973

Change-Id: I9969e556acecf7a9e77d873371cc2ec2647be011

12 files changed:
.testr.conf
TESTING
neutron/agent/linux/async_process.py [new file with mode: 0644]
neutron/agent/linux/utils.py
neutron/tests/functional/__init__.py [new file with mode: 0644]
neutron/tests/functional/agent/__init__.py [new file with mode: 0644]
neutron/tests/functional/agent/linux/__init__.py [new file with mode: 0644]
neutron/tests/functional/agent/linux/test_async_process.py [new file with mode: 0644]
neutron/tests/unit/agent/__init__.py [new file with mode: 0644]
neutron/tests/unit/agent/linux/__init__.py [new file with mode: 0644]
neutron/tests/unit/agent/linux/test_async_process.py [new file with mode: 0644]
neutron/tests/unit/test_agent_linux_utils.py

index 01d160ec50c872aadb2ada6f0bfe34aa71bfd867..b63d965523cf5ab1f8f4ac44f12ad94c12e4b6c2 100644 (file)
@@ -1,4 +1,4 @@
 [DEFAULT]
-test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests/unit $LISTOPT $IDOPTION
+test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests $LISTOPT $IDOPTION
 test_id_option=--load-list $IDFILE
 test_list_option=--list
diff --git a/TESTING b/TESTING
index 8162b2fd6627ac123eeda6050384204df72dde3a..cd5ae4dab15667a9d62aff31ca65d3eb646ef589 100644 (file)
--- a/TESTING
+++ b/TESTING
@@ -8,12 +8,18 @@ Overview
     the various pieces of the neutron tree to make sure any new changes
     don't break existing functionality.
 
+    The functional tests are intended to validate actual system
+    interaction.  Mocks should be used sparingly, if at all.  Care
+    should be taken to ensure that existing system resources are not
+    modified and that resources created in tests are properly cleaned
+    up.
+
 Running tests
 
     There are two mechanisms for running tests: run_tests.sh and tox.
-    Before submitting a patch for review you should always ensure all unit
-    test pass; a tox run is triggered by the jenkins gate executed on gerrit
-    for each patch pushed for review.
+    Before submitting a patch for review you should always ensure all
+    test pass; a tox run is triggered by the jenkins gate executed on
+    gerrit for each patch pushed for review.
 
     With both mechanisms you can either run the tests in the standard
     environment or create a virtual environment to run them in.
@@ -41,18 +47,18 @@ Running individual tests
 Adding more tests
 
     Neutron has a fast growing code base and there is plenty of areas that
-    need to be covered by unit tests.
+    need to be covered by unit and functional tests.
 
-    To get a grasp of the areas where unit tests are needed, you can check
+    To get a grasp of the areas where tests are needed, you can check
     current coverage by running:
 
     $ ./run_tests.sh -c
 
 Development process
 
-    It is expected that any new changes that are proposed for merge come with
-    unit tests for that feature or code area. Ideally any bugs fixes that are
-    submitted also have unit tests to prove that they stay fixed!
-    In addition, before proposing for merge, all of the current unit tests
-    should be passing.
+    It is expected that any new changes that are proposed for merge
+    come with tests for that feature or code area. Ideally any bugs
+    fixes that are submitted also have tests to prove that they stay
+    fixed!  In addition, before proposing for merge, all of the
+    current tests should be passing.
 
diff --git a/neutron/agent/linux/async_process.py b/neutron/agent/linux/async_process.py
new file mode 100644 (file)
index 0000000..aa41c9c
--- /dev/null
@@ -0,0 +1,214 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import eventlet
+import eventlet.event
+import eventlet.queue
+import eventlet.timeout
+
+from neutron.agent.linux import utils
+from neutron.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class AsyncProcessException(Exception):
+    pass
+
+
+class AsyncProcess(object):
+    """Manages an asynchronous process.
+
+    This class spawns a new process via subprocess and uses
+    greenthreads to read stderr and stdout asynchronously into queues
+    that can be read via repeatedly calling iter_stdout() and
+    iter_stderr().
+
+    If respawn_interval is non-zero, any error in communicating with
+    the managed process will result in the process and greenthreads
+    being cleaned up and the process restarted after the specified
+    interval.
+
+    Example usage:
+
+    >>> import time
+    >>> proc = AsyncProcess(['ping'])
+    >>> proc.start()
+    >>> time.sleep(5)
+    >>> proc.stop()
+    >>> for line in proc.iter_stdout():
+    ...     print line
+    """
+
+    def __init__(self, cmd, root_helper=None, respawn_interval=None):
+        """Constructor.
+
+        :param cmd: The list of command arguments to invoke.
+        :param root_helper: Optional, utility to use when running shell cmds.
+        :param respawn_interval: Optional, the interval in seconds to wait
+               to respawn after unexpected process death. Respawn will
+               only be attempted if a value of 0 or greater is provided.
+        """
+        self.cmd = cmd
+        self.root_helper = root_helper
+        if respawn_interval is not None and respawn_interval < 0:
+            raise ValueError(_('respawn_interval must be >= 0 if provided.'))
+        self.respawn_interval = respawn_interval
+        self._process = None
+        self._kill_event = None
+        self._stdout_lines = eventlet.queue.LightQueue()
+        self._stderr_lines = eventlet.queue.LightQueue()
+        self._watchers = []
+
+    def start(self):
+        """Launch a process and monitor it asynchronously."""
+        if self._kill_event:
+            raise AsyncProcessException(_('Process is already started'))
+        else:
+            LOG.debug(_('Launching async process [%s].'), self.cmd)
+            self._spawn()
+
+    def stop(self):
+        """Halt the process and watcher threads."""
+        if self._kill_event:
+            LOG.debug(_('Halting async process [%s].'), self.cmd)
+            self._kill()
+        else:
+            raise AsyncProcessException(_('Process is not running.'))
+
+    def _spawn(self):
+        """Spawn a process and its watchers."""
+        self._kill_event = eventlet.event.Event()
+        self._process, cmd = utils.create_process(self.cmd,
+                                                  root_helper=self.root_helper)
+        self._watchers = []
+        for reader in (self._read_stdout, self._read_stderr):
+            # Pass the stop event directly to the greenthread to
+            # ensure that assignment of a new event to the instance
+            # attribute does not prevent the greenthread from using
+            # the original event.
+            watcher = eventlet.spawn(self._watch_process,
+                                     reader,
+                                     self._kill_event)
+            self._watchers.append(watcher)
+
+    def _kill(self, respawning=False):
+        """Kill the process and the associated watcher greenthreads.
+
+        :param respawning: Optional, whether respawn will be subsequently
+               attempted.
+        """
+        # Halt the greenthreads
+        self._kill_event.send()
+
+        pid = self._get_pid_to_kill()
+        if pid:
+            self._kill_process(pid)
+
+        if not respawning:
+            # Clear the kill event to ensure the process can be
+            # explicitly started again.
+            self._kill_event = None
+
+    def _get_pid_to_kill(self):
+        pid = self._process.pid
+        # If root helper was used, two processes will be created:
+        #
+        #  - a root helper process (e.g. sudo myscript)
+        #  - a child process (e.g. myscript)
+        #
+        # Killing the root helper process will leave the child process
+        # as a zombie, so the only way to ensure that both die is to
+        # target the child process directly.
+        if self.root_helper:
+            pids = utils.find_child_pids(pid)
+            if pids:
+                # The root helper will only ever launch a single child.
+                pid = pids[0]
+            else:
+                # Process is already dead.
+                pid = None
+        return pid
+
+    def _kill_process(self, pid):
+        try:
+            # A process started by a root helper will be running as
+            # root and need to be killed via the same helper.
+            utils.execute(['kill', '-9', pid], root_helper=self.root_helper)
+        except Exception as ex:
+            stale_pid = (isinstance(ex, RuntimeError) and
+                         'No such process' in str(ex))
+            if not stale_pid:
+                LOG.exception(_('An error occurred while killing [%s].'),
+                              self.cmd)
+                return False
+        return True
+
+    def _handle_process_error(self):
+        """Kill the async process and respawn if necessary."""
+        LOG.debug(_('Halting async process [%s] in response to an error.'),
+                  self.cmd)
+        respawning = self.respawn_interval >= 0
+        self._kill(respawning=respawning)
+        if respawning:
+            eventlet.sleep(self.respawn_interval)
+            LOG.debug(_('Respawning async process [%s].'), self.cmd)
+            self._spawn()
+
+    def _watch_process(self, callback, kill_event):
+        while not kill_event.ready():
+            try:
+                if not callback():
+                    break
+            except Exception:
+                LOG.exception(_('An error occured while communicating '
+                                'with async process [%s].'), self.cmd)
+                break
+            # Ensure that watching a process with lots of output does
+            # not block execution of other greenthreads.
+            eventlet.sleep()
+        # The kill event not being ready indicates that the loop was
+        # broken out of due to an error in the watched process rather
+        # than the loop condition being satisfied.
+        if not kill_event.ready():
+            self._handle_process_error()
+
+    def _read(self, stream, queue):
+        data = stream.readline()
+        if data:
+            data = data.strip()
+            queue.put(data)
+            return data
+
+    def _read_stdout(self):
+        return self._read(self._process.stdout, self._stdout_lines)
+
+    def _read_stderr(self):
+        return self._read(self._process.stderr, self._stderr_lines)
+
+    def _iter_queue(self, queue):
+        while True:
+            try:
+                yield queue.get_nowait()
+            except eventlet.queue.Empty:
+                break
+
+    def iter_stdout(self):
+        return self._iter_queue(self._stdout_lines)
+
+    def iter_stderr(self):
+        return self._iter_queue(self._stderr_lines)
index 6e0aae41efb57536c0874cd24359eeacdd79453e..f292906fd90c9fac93acbcb2f4cf497affc3663c 100644 (file)
@@ -34,8 +34,12 @@ from neutron.openstack.common import log as logging
 LOG = logging.getLogger(__name__)
 
 
-def execute(cmd, root_helper=None, process_input=None, addl_env=None,
-            check_exit_code=True, return_stderr=False):
+def create_process(cmd, root_helper=None, addl_env=None):
+    """Create a process object for the given command.
+
+    The return value will be a tuple of the process object and the
+    list of command arguments used to create it.
+    """
     if root_helper:
         cmd = shlex.split(root_helper) + cmd
     cmd = map(str, cmd)
@@ -44,12 +48,21 @@ def execute(cmd, root_helper=None, process_input=None, addl_env=None,
     env = os.environ.copy()
     if addl_env:
         env.update(addl_env)
+
+    obj = utils.subprocess_popen(cmd, shell=False,
+                                 stdin=subprocess.PIPE,
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.PIPE,
+                                 env=env)
+
+    return obj, cmd
+
+
+def execute(cmd, root_helper=None, process_input=None, addl_env=None,
+            check_exit_code=True, return_stderr=False):
     try:
-        obj = utils.subprocess_popen(cmd, shell=False,
-                                     stdin=subprocess.PIPE,
-                                     stdout=subprocess.PIPE,
-                                     stderr=subprocess.PIPE,
-                                     env=env)
+        obj, cmd = create_process(cmd, root_helper=root_helper,
+                                  addl_env=addl_env)
         _stdout, _stderr = (process_input and
                             obj.communicate(process_input) or
                             obj.communicate())
@@ -95,3 +108,17 @@ def replace_file(file_name, data):
     tmp_file.close()
     os.chmod(tmp_file.name, 0o644)
     os.rename(tmp_file.name, file_name)
+
+
+def find_child_pids(pid):
+    """Retrieve a list of the pids of child processes of the given pid."""
+    try:
+        raw_pids = execute(['ps', '--ppid', pid, '-o', 'pid='])
+    except RuntimeError as e:
+        # Exception has already been logged by execute
+        no_children_found = 'Exit code: 1' in str(e)
+        if no_children_found:
+            return []
+        # Unexpected errors are the responsibility of the caller
+        raise
+    return [x.strip() for x in raw_pids.split('\n') if x.strip()]
diff --git a/neutron/tests/functional/__init__.py b/neutron/tests/functional/__init__.py
new file mode 100644 (file)
index 0000000..ac4d6cb
--- /dev/null
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/neutron/tests/functional/agent/__init__.py b/neutron/tests/functional/agent/__init__.py
new file mode 100644 (file)
index 0000000..ac4d6cb
--- /dev/null
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/neutron/tests/functional/agent/linux/__init__.py b/neutron/tests/functional/agent/linux/__init__.py
new file mode 100644 (file)
index 0000000..ac4d6cb
--- /dev/null
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/neutron/tests/functional/agent/linux/test_async_process.py b/neutron/tests/functional/agent/linux/test_async_process.py
new file mode 100644 (file)
index 0000000..f7fa330
--- /dev/null
@@ -0,0 +1,79 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import contextlib
+
+import eventlet
+import eventlet.timeout
+import fixtures
+
+from neutron.agent.linux import async_process
+from neutron.tests import base
+
+
+class TestAsyncProcess(base.BaseTestCase):
+
+    def setUp(self):
+        super(TestAsyncProcess, self).setUp()
+        self.test_file_path = self.useFixture(
+            fixtures.TempDir()).join("test_async_process.tmp")
+        self.data = [str(x) for x in xrange(4)]
+        with file(self.test_file_path, 'w') as f:
+            f.writelines('%s\n' % item for item in self.data)
+
+    def _check_stdout(self, proc):
+        # Ensure that all the output from the file is read
+        output = []
+        while output != self.data:
+            new_output = list(proc.iter_stdout())
+            if new_output:
+                output += new_output
+            eventlet.sleep(0.01)
+
+    @contextlib.contextmanager
+    def assert_max_execution_time(self, max_execution_time=5):
+        with eventlet.timeout.Timeout(max_execution_time, False):
+            yield
+            return
+        self.fail('Execution of this test timed out')
+
+    def test_stopping_async_process_lifecycle(self):
+        with self.assert_max_execution_time():
+            proc = async_process.AsyncProcess(['tail', '-f',
+                                               self.test_file_path])
+            proc.start()
+            self._check_stdout(proc)
+            proc.stop()
+
+            # Ensure that the process and greenthreads have stopped
+            proc._process.wait()
+            self.assertEqual(proc._process.returncode, -9)
+            for watcher in proc._watchers:
+                watcher.wait()
+
+    def test_async_process_respawns(self):
+        with self.assert_max_execution_time():
+            proc = async_process.AsyncProcess(['tail', '-f',
+                                               self.test_file_path],
+                                              respawn_interval=0)
+            proc.start()
+
+            # Ensure that the same output is read twice
+            self._check_stdout(proc)
+            pid = proc._get_pid_to_kill()
+            proc._kill_process(pid)
+            self._check_stdout(proc)
+            proc.stop()
diff --git a/neutron/tests/unit/agent/__init__.py b/neutron/tests/unit/agent/__init__.py
new file mode 100644 (file)
index 0000000..ac4d6cb
--- /dev/null
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/neutron/tests/unit/agent/linux/__init__.py b/neutron/tests/unit/agent/linux/__init__.py
new file mode 100644 (file)
index 0000000..ac4d6cb
--- /dev/null
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/neutron/tests/unit/agent/linux/test_async_process.py b/neutron/tests/unit/agent/linux/test_async_process.py
new file mode 100644 (file)
index 0000000..d57a301
--- /dev/null
@@ -0,0 +1,239 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import eventlet.event
+import eventlet.queue
+import eventlet.timeout
+import mock
+import testtools
+
+from neutron.agent.linux import async_process
+from neutron.agent.linux import utils
+from neutron.tests import base
+
+
+_marker = ()
+
+
+class TestAsyncProcess(base.BaseTestCase):
+
+    def setUp(self):
+        super(TestAsyncProcess, self).setUp()
+        self.proc = async_process.AsyncProcess(['fake'])
+
+    def test_construtor_raises_exception_for_negative_respawn_interval(self):
+        with testtools.ExpectedException(ValueError):
+            async_process.AsyncProcess(['fake'], respawn_interval=-1)
+
+    def test__spawn(self):
+        expected_process = 'Foo'
+        proc = self.proc
+        with mock.patch.object(utils, 'create_process') as mock_create_process:
+            mock_create_process.return_value = [expected_process, None]
+            with mock.patch('eventlet.spawn') as mock_spawn:
+                proc._spawn()
+
+        self.assertIsInstance(proc._kill_event, eventlet.event.Event)
+        self.assertEqual(proc._process, expected_process)
+        mock_spawn.assert_has_calls([
+            mock.call(proc._watch_process,
+                      proc._read_stdout,
+                      proc._kill_event),
+            mock.call(proc._watch_process,
+                      proc._read_stderr,
+                      proc._kill_event),
+        ])
+        self.assertEqual(len(proc._watchers), 2)
+
+    def test__handle_process_error_kills_with_respawn(self):
+        with mock.patch.object(self.proc, '_kill') as kill:
+            self.proc._handle_process_error()
+
+        kill.assert_has_calls(mock.call(respawning=False))
+
+    def test__handle_process_error_kills_without_respawn(self):
+        self.proc.respawn_interval = 1
+        with mock.patch.object(self.proc, '_kill') as kill:
+            with mock.patch.object(self.proc, '_spawn') as spawn:
+                with mock.patch('eventlet.sleep') as sleep:
+                    self.proc._handle_process_error()
+
+        kill.assert_has_calls(mock.call(respawning=True))
+        sleep.assert_has_calls(mock.call(self.proc.respawn_interval))
+        spawn.assert_called_once()
+
+    def _test__watch_process(self, callback, kill_event):
+        self.proc._kill_event = kill_event
+        # Ensure the test times out eventually if the watcher loops endlessly
+        with eventlet.timeout.Timeout(5):
+            with mock.patch.object(self.proc,
+                                   '_handle_process_error') as func:
+                self.proc._watch_process(callback, kill_event)
+
+        if not kill_event.ready():
+            func.assert_called_once()
+
+    def test__watch_process_exits_on_callback_failure(self):
+        self._test__watch_process(lambda: False, eventlet.event.Event())
+
+    def test__watch_process_exits_on_exception(self):
+        def foo():
+            raise Exception('Error!')
+        self._test__watch_process(foo, eventlet.event.Event())
+
+    def test__watch_process_exits_on_sent_kill_event(self):
+        kill_event = eventlet.event.Event()
+        kill_event.send()
+        self._test__watch_process(None, kill_event)
+
+    def _test_read_output_queues_and_returns_result(self, output):
+        queue = eventlet.queue.LightQueue()
+        mock_stream = mock.Mock()
+        with mock.patch.object(mock_stream, 'readline') as mock_readline:
+            mock_readline.return_value = output
+            result = self.proc._read(mock_stream, queue)
+
+        if output:
+            self.assertEqual(output, result)
+            self.assertEqual(output, queue.get_nowait())
+        else:
+            self.assertFalse(result)
+            self.assertTrue(queue.empty())
+
+    def test__read_queues_and_returns_output(self):
+        self._test_read_output_queues_and_returns_result('foo')
+
+    def test__read_returns_none_for_missing_output(self):
+        self._test_read_output_queues_and_returns_result('')
+
+    def test_start_raises_exception_if_process_already_started(self):
+        self.proc._kill_event = True
+        with testtools.ExpectedException(async_process.AsyncProcessException):
+            self.proc.start()
+
+    def test_start_invokes__spawn(self):
+        with mock.patch.object(self.proc, '_spawn') as mock_start:
+            self.proc.start()
+
+        mock_start.assert_called_once()
+
+    def test__iter_queue_returns_empty_list_for_empty_queue(self):
+        result = list(self.proc._iter_queue(eventlet.queue.LightQueue()))
+        self.assertEqual(result, [])
+
+    def test__iter_queue_returns_queued_data(self):
+        queue = eventlet.queue.LightQueue()
+        queue.put('foo')
+        result = list(self.proc._iter_queue(queue))
+        self.assertEqual(result, ['foo'])
+
+    def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
+        expected_value = 'foo'
+        with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue:
+            mock_iter_queue.return_value = expected_value
+            target_func = getattr(self.proc, 'iter_%s' % output_type, None)
+            value = target_func()
+
+        self.assertEqual(value, expected_value)
+        queue = getattr(self.proc, '_%s_lines' % output_type, None)
+        mock_iter_queue.assert_called_with(queue)
+
+    def test_iter_stdout(self):
+        self._test_iter_output_calls_iter_queue_on_output_queue('stdout')
+
+    def test_iter_stderr(self):
+        self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
+
+    def _test__kill(self, respawning, pid=None):
+        with mock.patch.object(self.proc, '_kill_event') as mock_kill_event:
+            with mock.patch.object(self.proc, '_get_pid_to_kill',
+                                   return_value=pid):
+                with mock.patch.object(self.proc,
+                                       '_kill_process') as mock_kill_process:
+                    self.proc._kill(respawning)
+
+            if respawning:
+                self.assertIsNotNone(self.proc._kill_event)
+            else:
+                self.assertIsNone(self.proc._kill_event)
+
+        mock_kill_event.send.assert_called_once()
+        if pid:
+            mock_kill_process.assert_called_once(pid)
+
+    def test__kill_when_respawning_does_not_clear_kill_event(self):
+        self._test__kill(True)
+
+    def test__kill_when_not_respawning_clears_kill_event(self):
+        self._test__kill(False)
+
+    def test__kill_targets_process_for_pid(self):
+        self._test__kill(False, pid='1')
+
+    def _test__get_pid_to_kill(self, expected=_marker,
+                               root_helper=None, pids=None):
+        if root_helper:
+            self.proc.root_helper = root_helper
+        with mock.patch.object(self.proc, '_process') as mock_process:
+            with mock.patch.object(mock_process, 'pid') as mock_pid:
+                with mock.patch.object(utils, 'find_child_pids',
+                                       return_value=pids):
+                    actual = self.proc._get_pid_to_kill()
+        if expected is _marker:
+            expected = mock_pid
+        self.assertEqual(expected, actual)
+
+    def test__get_pid_to_kill_returns_process_pid_without_root_helper(self):
+        self._test__get_pid_to_kill()
+
+    def test__get_pid_to_kill_returns_child_pid_with_root_helper(self):
+        self._test__get_pid_to_kill(expected='1', pids=['1'], root_helper='a')
+
+    def test__get_pid_to_kill_returns_none_with_root_helper(self):
+        self._test__get_pid_to_kill(expected=None, root_helper='a')
+
+    def _test__kill_process(self, pid, expected, exception_message=None):
+        self.proc.root_helper = 'foo'
+        if exception_message:
+            exc = RuntimeError(exception_message)
+        else:
+            exc = None
+        with mock.patch.object(utils, 'execute',
+                               side_effect=exc) as mock_execute:
+            actual = self.proc._kill_process(pid)
+
+        self.assertEqual(expected, actual)
+        mock_execute.assert_called_with(['kill', '-9', pid],
+                                        root_helper=self.proc.root_helper)
+
+    def test__kill_process_returns_true_for_valid_pid(self):
+        self._test__kill_process('1', True)
+
+    def test__kill_process_returns_true_for_stale_pid(self):
+        self._test__kill_process('1', True, 'No such process')
+
+    def test__kill_process_returns_false_for_execute_exception(self):
+        self._test__kill_process('1', False, 'Invalid')
+
+    def test_stop_calls_kill(self):
+        self.proc._kill_event = True
+        with mock.patch.object(self.proc, '_kill') as mock_kill:
+            self.proc.stop()
+        mock_kill.called_once()
+
+    def test_stop_raises_exception_if_already_started(self):
+        with testtools.ExpectedException(async_process.AsyncProcessException):
+            self.proc.stop()
index cccbf2024f622f4e0872dea9061ecb91a10a7490..6b7fbbfd00d9263984d9541408fae869f1a64130 100644 (file)
@@ -17,6 +17,7 @@
 
 import fixtures
 import mock
+import testtools
 
 from neutron.agent.linux import utils
 from neutron.tests import base
@@ -106,3 +107,25 @@ class AgentUtilsReplaceFile(base.BaseTestCase):
                     ntf.assert_has_calls(expected)
                     chmod.assert_called_once_with('/baz', 0o644)
                     rename.assert_called_once_with('/baz', '/foo')
+
+
+class TestFindChildPids(base.BaseTestCase):
+
+    def test_returns_empty_list_for_exit_code_1(self):
+        with mock.patch.object(utils, 'execute',
+                               side_effect=RuntimeError('Exit code: 1')):
+            self.assertEqual(utils.find_child_pids(-1), [])
+
+    def test_returns_empty_list_for_no_output(self):
+        with mock.patch.object(utils, 'execute', return_value=''):
+            self.assertEqual(utils.find_child_pids(-1), [])
+
+    def test_returns_list_of_child_process_ids_for_good_ouput(self):
+        with mock.patch.object(utils, 'execute', return_value=' 123 \n 185\n'):
+            self.assertEqual(utils.find_child_pids(-1), ['123', '185'])
+
+    def test_raises_unknown_exception(self):
+        with testtools.ExpectedException(RuntimeError):
+            with mock.patch.object(utils, 'execute',
+                                   side_effect=RuntimeError()):
+                utils.find_child_pids(-1)