[DEFAULT]
-test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests/unit $LISTOPT $IDOPTION
+test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list
the various pieces of the neutron tree to make sure any new changes
don't break existing functionality.
+ The functional tests are intended to validate actual system
+ interaction. Mocks should be used sparingly, if at all. Care
+ should be taken to ensure that existing system resources are not
+ modified and that resources created in tests are properly cleaned
+ up.
+
Running tests
There are two mechanisms for running tests: run_tests.sh and tox.
- Before submitting a patch for review you should always ensure all unit
- test pass; a tox run is triggered by the jenkins gate executed on gerrit
- for each patch pushed for review.
+ Before submitting a patch for review you should always ensure all
+ test pass; a tox run is triggered by the jenkins gate executed on
+ gerrit for each patch pushed for review.
With both mechanisms you can either run the tests in the standard
environment or create a virtual environment to run them in.
Adding more tests
Neutron has a fast growing code base and there is plenty of areas that
- need to be covered by unit tests.
+ need to be covered by unit and functional tests.
- To get a grasp of the areas where unit tests are needed, you can check
+ To get a grasp of the areas where tests are needed, you can check
current coverage by running:
$ ./run_tests.sh -c
Development process
- It is expected that any new changes that are proposed for merge come with
- unit tests for that feature or code area. Ideally any bugs fixes that are
- submitted also have unit tests to prove that they stay fixed!
- In addition, before proposing for merge, all of the current unit tests
- should be passing.
+ It is expected that any new changes that are proposed for merge
+ come with tests for that feature or code area. Ideally any bugs
+ fixes that are submitted also have tests to prove that they stay
+ fixed! In addition, before proposing for merge, all of the
+ current tests should be passing.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+import eventlet.event
+import eventlet.queue
+import eventlet.timeout
+
+from neutron.agent.linux import utils
+from neutron.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class AsyncProcessException(Exception):
+ pass
+
+
+class AsyncProcess(object):
+ """Manages an asynchronous process.
+
+ This class spawns a new process via subprocess and uses
+ greenthreads to read stderr and stdout asynchronously into queues
+ that can be read via repeatedly calling iter_stdout() and
+ iter_stderr().
+
+ If respawn_interval is non-zero, any error in communicating with
+ the managed process will result in the process and greenthreads
+ being cleaned up and the process restarted after the specified
+ interval.
+
+ Example usage:
+
+ >>> import time
+ >>> proc = AsyncProcess(['ping'])
+ >>> proc.start()
+ >>> time.sleep(5)
+ >>> proc.stop()
+ >>> for line in proc.iter_stdout():
+ ... print line
+ """
+
+ def __init__(self, cmd, root_helper=None, respawn_interval=None):
+ """Constructor.
+
+ :param cmd: The list of command arguments to invoke.
+ :param root_helper: Optional, utility to use when running shell cmds.
+ :param respawn_interval: Optional, the interval in seconds to wait
+ to respawn after unexpected process death. Respawn will
+ only be attempted if a value of 0 or greater is provided.
+ """
+ self.cmd = cmd
+ self.root_helper = root_helper
+ if respawn_interval is not None and respawn_interval < 0:
+ raise ValueError(_('respawn_interval must be >= 0 if provided.'))
+ self.respawn_interval = respawn_interval
+ self._process = None
+ self._kill_event = None
+ self._stdout_lines = eventlet.queue.LightQueue()
+ self._stderr_lines = eventlet.queue.LightQueue()
+ self._watchers = []
+
+ def start(self):
+ """Launch a process and monitor it asynchronously."""
+ if self._kill_event:
+ raise AsyncProcessException(_('Process is already started'))
+ else:
+ LOG.debug(_('Launching async process [%s].'), self.cmd)
+ self._spawn()
+
+ def stop(self):
+ """Halt the process and watcher threads."""
+ if self._kill_event:
+ LOG.debug(_('Halting async process [%s].'), self.cmd)
+ self._kill()
+ else:
+ raise AsyncProcessException(_('Process is not running.'))
+
+ def _spawn(self):
+ """Spawn a process and its watchers."""
+ self._kill_event = eventlet.event.Event()
+ self._process, cmd = utils.create_process(self.cmd,
+ root_helper=self.root_helper)
+ self._watchers = []
+ for reader in (self._read_stdout, self._read_stderr):
+ # Pass the stop event directly to the greenthread to
+ # ensure that assignment of a new event to the instance
+ # attribute does not prevent the greenthread from using
+ # the original event.
+ watcher = eventlet.spawn(self._watch_process,
+ reader,
+ self._kill_event)
+ self._watchers.append(watcher)
+
+ def _kill(self, respawning=False):
+ """Kill the process and the associated watcher greenthreads.
+
+ :param respawning: Optional, whether respawn will be subsequently
+ attempted.
+ """
+ # Halt the greenthreads
+ self._kill_event.send()
+
+ pid = self._get_pid_to_kill()
+ if pid:
+ self._kill_process(pid)
+
+ if not respawning:
+ # Clear the kill event to ensure the process can be
+ # explicitly started again.
+ self._kill_event = None
+
+ def _get_pid_to_kill(self):
+ pid = self._process.pid
+ # If root helper was used, two processes will be created:
+ #
+ # - a root helper process (e.g. sudo myscript)
+ # - a child process (e.g. myscript)
+ #
+ # Killing the root helper process will leave the child process
+ # as a zombie, so the only way to ensure that both die is to
+ # target the child process directly.
+ if self.root_helper:
+ pids = utils.find_child_pids(pid)
+ if pids:
+ # The root helper will only ever launch a single child.
+ pid = pids[0]
+ else:
+ # Process is already dead.
+ pid = None
+ return pid
+
+ def _kill_process(self, pid):
+ try:
+ # A process started by a root helper will be running as
+ # root and need to be killed via the same helper.
+ utils.execute(['kill', '-9', pid], root_helper=self.root_helper)
+ except Exception as ex:
+ stale_pid = (isinstance(ex, RuntimeError) and
+ 'No such process' in str(ex))
+ if not stale_pid:
+ LOG.exception(_('An error occurred while killing [%s].'),
+ self.cmd)
+ return False
+ return True
+
+ def _handle_process_error(self):
+ """Kill the async process and respawn if necessary."""
+ LOG.debug(_('Halting async process [%s] in response to an error.'),
+ self.cmd)
+ respawning = self.respawn_interval >= 0
+ self._kill(respawning=respawning)
+ if respawning:
+ eventlet.sleep(self.respawn_interval)
+ LOG.debug(_('Respawning async process [%s].'), self.cmd)
+ self._spawn()
+
+ def _watch_process(self, callback, kill_event):
+ while not kill_event.ready():
+ try:
+ if not callback():
+ break
+ except Exception:
+ LOG.exception(_('An error occured while communicating '
+ 'with async process [%s].'), self.cmd)
+ break
+ # Ensure that watching a process with lots of output does
+ # not block execution of other greenthreads.
+ eventlet.sleep()
+ # The kill event not being ready indicates that the loop was
+ # broken out of due to an error in the watched process rather
+ # than the loop condition being satisfied.
+ if not kill_event.ready():
+ self._handle_process_error()
+
+ def _read(self, stream, queue):
+ data = stream.readline()
+ if data:
+ data = data.strip()
+ queue.put(data)
+ return data
+
+ def _read_stdout(self):
+ return self._read(self._process.stdout, self._stdout_lines)
+
+ def _read_stderr(self):
+ return self._read(self._process.stderr, self._stderr_lines)
+
+ def _iter_queue(self, queue):
+ while True:
+ try:
+ yield queue.get_nowait()
+ except eventlet.queue.Empty:
+ break
+
+ def iter_stdout(self):
+ return self._iter_queue(self._stdout_lines)
+
+ def iter_stderr(self):
+ return self._iter_queue(self._stderr_lines)
LOG = logging.getLogger(__name__)
-def execute(cmd, root_helper=None, process_input=None, addl_env=None,
- check_exit_code=True, return_stderr=False):
+def create_process(cmd, root_helper=None, addl_env=None):
+ """Create a process object for the given command.
+
+ The return value will be a tuple of the process object and the
+ list of command arguments used to create it.
+ """
if root_helper:
cmd = shlex.split(root_helper) + cmd
cmd = map(str, cmd)
env = os.environ.copy()
if addl_env:
env.update(addl_env)
+
+ obj = utils.subprocess_popen(cmd, shell=False,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=env)
+
+ return obj, cmd
+
+
+def execute(cmd, root_helper=None, process_input=None, addl_env=None,
+ check_exit_code=True, return_stderr=False):
try:
- obj = utils.subprocess_popen(cmd, shell=False,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=env)
+ obj, cmd = create_process(cmd, root_helper=root_helper,
+ addl_env=addl_env)
_stdout, _stderr = (process_input and
obj.communicate(process_input) or
obj.communicate())
tmp_file.close()
os.chmod(tmp_file.name, 0o644)
os.rename(tmp_file.name, file_name)
+
+
+def find_child_pids(pid):
+ """Retrieve a list of the pids of child processes of the given pid."""
+ try:
+ raw_pids = execute(['ps', '--ppid', pid, '-o', 'pid='])
+ except RuntimeError as e:
+ # Exception has already been logged by execute
+ no_children_found = 'Exit code: 1' in str(e)
+ if no_children_found:
+ return []
+ # Unexpected errors are the responsibility of the caller
+ raise
+ return [x.strip() for x in raw_pids.split('\n') if x.strip()]
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+
+import eventlet
+import eventlet.timeout
+import fixtures
+
+from neutron.agent.linux import async_process
+from neutron.tests import base
+
+
+class TestAsyncProcess(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestAsyncProcess, self).setUp()
+ self.test_file_path = self.useFixture(
+ fixtures.TempDir()).join("test_async_process.tmp")
+ self.data = [str(x) for x in xrange(4)]
+ with file(self.test_file_path, 'w') as f:
+ f.writelines('%s\n' % item for item in self.data)
+
+ def _check_stdout(self, proc):
+ # Ensure that all the output from the file is read
+ output = []
+ while output != self.data:
+ new_output = list(proc.iter_stdout())
+ if new_output:
+ output += new_output
+ eventlet.sleep(0.01)
+
+ @contextlib.contextmanager
+ def assert_max_execution_time(self, max_execution_time=5):
+ with eventlet.timeout.Timeout(max_execution_time, False):
+ yield
+ return
+ self.fail('Execution of this test timed out')
+
+ def test_stopping_async_process_lifecycle(self):
+ with self.assert_max_execution_time():
+ proc = async_process.AsyncProcess(['tail', '-f',
+ self.test_file_path])
+ proc.start()
+ self._check_stdout(proc)
+ proc.stop()
+
+ # Ensure that the process and greenthreads have stopped
+ proc._process.wait()
+ self.assertEqual(proc._process.returncode, -9)
+ for watcher in proc._watchers:
+ watcher.wait()
+
+ def test_async_process_respawns(self):
+ with self.assert_max_execution_time():
+ proc = async_process.AsyncProcess(['tail', '-f',
+ self.test_file_path],
+ respawn_interval=0)
+ proc.start()
+
+ # Ensure that the same output is read twice
+ self._check_stdout(proc)
+ pid = proc._get_pid_to_kill()
+ proc._kill_process(pid)
+ self._check_stdout(proc)
+ proc.stop()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet.event
+import eventlet.queue
+import eventlet.timeout
+import mock
+import testtools
+
+from neutron.agent.linux import async_process
+from neutron.agent.linux import utils
+from neutron.tests import base
+
+
+_marker = ()
+
+
+class TestAsyncProcess(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestAsyncProcess, self).setUp()
+ self.proc = async_process.AsyncProcess(['fake'])
+
+ def test_construtor_raises_exception_for_negative_respawn_interval(self):
+ with testtools.ExpectedException(ValueError):
+ async_process.AsyncProcess(['fake'], respawn_interval=-1)
+
+ def test__spawn(self):
+ expected_process = 'Foo'
+ proc = self.proc
+ with mock.patch.object(utils, 'create_process') as mock_create_process:
+ mock_create_process.return_value = [expected_process, None]
+ with mock.patch('eventlet.spawn') as mock_spawn:
+ proc._spawn()
+
+ self.assertIsInstance(proc._kill_event, eventlet.event.Event)
+ self.assertEqual(proc._process, expected_process)
+ mock_spawn.assert_has_calls([
+ mock.call(proc._watch_process,
+ proc._read_stdout,
+ proc._kill_event),
+ mock.call(proc._watch_process,
+ proc._read_stderr,
+ proc._kill_event),
+ ])
+ self.assertEqual(len(proc._watchers), 2)
+
+ def test__handle_process_error_kills_with_respawn(self):
+ with mock.patch.object(self.proc, '_kill') as kill:
+ self.proc._handle_process_error()
+
+ kill.assert_has_calls(mock.call(respawning=False))
+
+ def test__handle_process_error_kills_without_respawn(self):
+ self.proc.respawn_interval = 1
+ with mock.patch.object(self.proc, '_kill') as kill:
+ with mock.patch.object(self.proc, '_spawn') as spawn:
+ with mock.patch('eventlet.sleep') as sleep:
+ self.proc._handle_process_error()
+
+ kill.assert_has_calls(mock.call(respawning=True))
+ sleep.assert_has_calls(mock.call(self.proc.respawn_interval))
+ spawn.assert_called_once()
+
+ def _test__watch_process(self, callback, kill_event):
+ self.proc._kill_event = kill_event
+ # Ensure the test times out eventually if the watcher loops endlessly
+ with eventlet.timeout.Timeout(5):
+ with mock.patch.object(self.proc,
+ '_handle_process_error') as func:
+ self.proc._watch_process(callback, kill_event)
+
+ if not kill_event.ready():
+ func.assert_called_once()
+
+ def test__watch_process_exits_on_callback_failure(self):
+ self._test__watch_process(lambda: False, eventlet.event.Event())
+
+ def test__watch_process_exits_on_exception(self):
+ def foo():
+ raise Exception('Error!')
+ self._test__watch_process(foo, eventlet.event.Event())
+
+ def test__watch_process_exits_on_sent_kill_event(self):
+ kill_event = eventlet.event.Event()
+ kill_event.send()
+ self._test__watch_process(None, kill_event)
+
+ def _test_read_output_queues_and_returns_result(self, output):
+ queue = eventlet.queue.LightQueue()
+ mock_stream = mock.Mock()
+ with mock.patch.object(mock_stream, 'readline') as mock_readline:
+ mock_readline.return_value = output
+ result = self.proc._read(mock_stream, queue)
+
+ if output:
+ self.assertEqual(output, result)
+ self.assertEqual(output, queue.get_nowait())
+ else:
+ self.assertFalse(result)
+ self.assertTrue(queue.empty())
+
+ def test__read_queues_and_returns_output(self):
+ self._test_read_output_queues_and_returns_result('foo')
+
+ def test__read_returns_none_for_missing_output(self):
+ self._test_read_output_queues_and_returns_result('')
+
+ def test_start_raises_exception_if_process_already_started(self):
+ self.proc._kill_event = True
+ with testtools.ExpectedException(async_process.AsyncProcessException):
+ self.proc.start()
+
+ def test_start_invokes__spawn(self):
+ with mock.patch.object(self.proc, '_spawn') as mock_start:
+ self.proc.start()
+
+ mock_start.assert_called_once()
+
+ def test__iter_queue_returns_empty_list_for_empty_queue(self):
+ result = list(self.proc._iter_queue(eventlet.queue.LightQueue()))
+ self.assertEqual(result, [])
+
+ def test__iter_queue_returns_queued_data(self):
+ queue = eventlet.queue.LightQueue()
+ queue.put('foo')
+ result = list(self.proc._iter_queue(queue))
+ self.assertEqual(result, ['foo'])
+
+ def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
+ expected_value = 'foo'
+ with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue:
+ mock_iter_queue.return_value = expected_value
+ target_func = getattr(self.proc, 'iter_%s' % output_type, None)
+ value = target_func()
+
+ self.assertEqual(value, expected_value)
+ queue = getattr(self.proc, '_%s_lines' % output_type, None)
+ mock_iter_queue.assert_called_with(queue)
+
+ def test_iter_stdout(self):
+ self._test_iter_output_calls_iter_queue_on_output_queue('stdout')
+
+ def test_iter_stderr(self):
+ self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
+
+ def _test__kill(self, respawning, pid=None):
+ with mock.patch.object(self.proc, '_kill_event') as mock_kill_event:
+ with mock.patch.object(self.proc, '_get_pid_to_kill',
+ return_value=pid):
+ with mock.patch.object(self.proc,
+ '_kill_process') as mock_kill_process:
+ self.proc._kill(respawning)
+
+ if respawning:
+ self.assertIsNotNone(self.proc._kill_event)
+ else:
+ self.assertIsNone(self.proc._kill_event)
+
+ mock_kill_event.send.assert_called_once()
+ if pid:
+ mock_kill_process.assert_called_once(pid)
+
+ def test__kill_when_respawning_does_not_clear_kill_event(self):
+ self._test__kill(True)
+
+ def test__kill_when_not_respawning_clears_kill_event(self):
+ self._test__kill(False)
+
+ def test__kill_targets_process_for_pid(self):
+ self._test__kill(False, pid='1')
+
+ def _test__get_pid_to_kill(self, expected=_marker,
+ root_helper=None, pids=None):
+ if root_helper:
+ self.proc.root_helper = root_helper
+ with mock.patch.object(self.proc, '_process') as mock_process:
+ with mock.patch.object(mock_process, 'pid') as mock_pid:
+ with mock.patch.object(utils, 'find_child_pids',
+ return_value=pids):
+ actual = self.proc._get_pid_to_kill()
+ if expected is _marker:
+ expected = mock_pid
+ self.assertEqual(expected, actual)
+
+ def test__get_pid_to_kill_returns_process_pid_without_root_helper(self):
+ self._test__get_pid_to_kill()
+
+ def test__get_pid_to_kill_returns_child_pid_with_root_helper(self):
+ self._test__get_pid_to_kill(expected='1', pids=['1'], root_helper='a')
+
+ def test__get_pid_to_kill_returns_none_with_root_helper(self):
+ self._test__get_pid_to_kill(expected=None, root_helper='a')
+
+ def _test__kill_process(self, pid, expected, exception_message=None):
+ self.proc.root_helper = 'foo'
+ if exception_message:
+ exc = RuntimeError(exception_message)
+ else:
+ exc = None
+ with mock.patch.object(utils, 'execute',
+ side_effect=exc) as mock_execute:
+ actual = self.proc._kill_process(pid)
+
+ self.assertEqual(expected, actual)
+ mock_execute.assert_called_with(['kill', '-9', pid],
+ root_helper=self.proc.root_helper)
+
+ def test__kill_process_returns_true_for_valid_pid(self):
+ self._test__kill_process('1', True)
+
+ def test__kill_process_returns_true_for_stale_pid(self):
+ self._test__kill_process('1', True, 'No such process')
+
+ def test__kill_process_returns_false_for_execute_exception(self):
+ self._test__kill_process('1', False, 'Invalid')
+
+ def test_stop_calls_kill(self):
+ self.proc._kill_event = True
+ with mock.patch.object(self.proc, '_kill') as mock_kill:
+ self.proc.stop()
+ mock_kill.called_once()
+
+ def test_stop_raises_exception_if_already_started(self):
+ with testtools.ExpectedException(async_process.AsyncProcessException):
+ self.proc.stop()
import fixtures
import mock
+import testtools
from neutron.agent.linux import utils
from neutron.tests import base
ntf.assert_has_calls(expected)
chmod.assert_called_once_with('/baz', 0o644)
rename.assert_called_once_with('/baz', '/foo')
+
+
+class TestFindChildPids(base.BaseTestCase):
+
+ def test_returns_empty_list_for_exit_code_1(self):
+ with mock.patch.object(utils, 'execute',
+ side_effect=RuntimeError('Exit code: 1')):
+ self.assertEqual(utils.find_child_pids(-1), [])
+
+ def test_returns_empty_list_for_no_output(self):
+ with mock.patch.object(utils, 'execute', return_value=''):
+ self.assertEqual(utils.find_child_pids(-1), [])
+
+ def test_returns_list_of_child_process_ids_for_good_ouput(self):
+ with mock.patch.object(utils, 'execute', return_value=' 123 \n 185\n'):
+ self.assertEqual(utils.find_child_pids(-1), ['123', '185'])
+
+ def test_raises_unknown_exception(self):
+ with testtools.ExpectedException(RuntimeError):
+ with mock.patch.object(utils, 'execute',
+ side_effect=RuntimeError()):
+ utils.find_child_pids(-1)