From: Assaf Muller Date: Sun, 22 Feb 2015 00:27:44 +0000 (-0500) Subject: Allow AsyncProcess to block on process start and stop X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=7907d40075de7528122b0e900a5699b280a04901;p=openstack-build%2Fneutron-build.git Allow AsyncProcess to block on process start and stop * Move utility functions from the test tree to the non-test tree * Add tests for the newly moved functions * Use these functions in AsyncProcess This will allow the ip monitor in the following patch to start and stop in a synchronous manner. Related-Bug: #1402010 Change-Id: I03727d8acc17e561d3473b0ebecfbe49cb5523b1 --- diff --git a/neutron/agent/linux/async_process.py b/neutron/agent/linux/async_process.py index 3b65195c0..79f80ac29 100644 --- a/neutron/agent/linux/async_process.py +++ b/neutron/agent/linux/async_process.py @@ -16,6 +16,7 @@ import eventlet import eventlet.event import eventlet.queue +from neutron.agent.linux import ip_lib from neutron.agent.linux import utils from neutron.i18n import _LE from neutron.openstack.common import log as logging @@ -52,7 +53,8 @@ class AsyncProcess(object): ... print line """ - def __init__(self, cmd, run_as_root=False, respawn_interval=None): + def __init__(self, cmd, run_as_root=False, respawn_interval=None, + namespace=None): """Constructor. :param cmd: The list of command arguments to invoke. @@ -60,8 +62,11 @@ class AsyncProcess(object): :param respawn_interval: Optional, the interval in seconds to wait to respawn after unexpected process death. Respawn will only be attempted if a value of 0 or greater is provided. + :param namespace: Optional, start the command in the specified + namespace. """ - self.cmd = cmd + self.cmd_without_namespace = cmd + self.cmd = ip_lib.add_namespace_to_cmd(cmd, namespace) self.run_as_root = run_as_root if respawn_interval is not None and respawn_interval < 0: raise ValueError(_('respawn_interval must be >= 0 if provided.')) @@ -75,22 +80,45 @@ class AsyncProcess(object): self._stdout_lines = eventlet.queue.LightQueue() self._stderr_lines = eventlet.queue.LightQueue() - def start(self): - """Launch a process and monitor it asynchronously.""" + def is_active(self): + # If using sudo rootwrap as a root_helper, we have to wait until sudo + # spawns rootwrap and rootwrap spawns the process. + + return utils.pid_invoked_with_cmdline( + self.pid, self.cmd_without_namespace) + + def start(self, blocking=False): + """Launch a process and monitor it asynchronously. + + :param blocking: Block until the process has started. + :raises eventlet.timeout.Timeout if blocking is True and the process + did not start in time. + """ if self._kill_event: raise AsyncProcessException(_('Process is already started')) else: LOG.debug('Launching async process [%s].', self.cmd) self._spawn() - def stop(self): - """Halt the process and watcher threads.""" + if blocking: + utils.wait_until_true(self.is_active) + + def stop(self, blocking=False): + """Halt the process and watcher threads. + + :param blocking: Block until the process has stopped. + :raises eventlet.timeout.Timeout if blocking is True and the process + did not stop in time. + """ if self._kill_event: LOG.debug('Halting async process [%s].', self.cmd) self._kill() else: raise AsyncProcessException(_('Process is not running.')) + if blocking: + utils.wait_until_true(lambda: not self.is_active()) + def _spawn(self): """Spawn a process and its watchers.""" self._kill_event = eventlet.event.Event() @@ -107,6 +135,13 @@ class AsyncProcess(object): self._kill_event) self._watchers.append(watcher) + @property + def pid(self): + if self._process: + return utils.get_root_helper_child_pid( + self._process.pid, + run_as_root=self.run_as_root) + def _kill(self, respawning=False): """Kill the process and the associated watcher greenthreads. @@ -116,8 +151,7 @@ class AsyncProcess(object): # Halt the greenthreads self._kill_event.send() - pid = utils.get_root_helper_child_pid(self._process.pid, - run_as_root=self.run_as_root) + pid = self.pid if pid: self._kill_process(pid) diff --git a/neutron/agent/linux/ip_lib.py b/neutron/agent/linux/ip_lib.py index 29e5a4f10..b186a4fa6 100644 --- a/neutron/agent/linux/ip_lib.py +++ b/neutron/agent/linux/ip_lib.py @@ -75,10 +75,7 @@ class SubProcessBase(object): def _execute(cls, options, command, args, run_as_root=False, namespace=None, log_fail_as_error=True): opt_list = ['-%s' % o for o in options] - if namespace: - ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip'] - else: - ip_cmd = ['ip'] + ip_cmd = add_namespace_to_cmd(['ip'], namespace) cmd = ip_cmd + opt_list + [command] + list(args) return utils.execute(cmd, run_as_root=run_as_root, log_fail_as_error=log_fail_as_error) @@ -689,3 +686,9 @@ def send_garp_for_proxyarp(ns_name, iface_name, address, count): if count > 0: eventlet.spawn_n(arping_with_temporary_address) + + +def add_namespace_to_cmd(cmd, namespace=None): + """Add an optional namespace to the command.""" + + return ['ip', 'netns', 'exec', namespace] + cmd if namespace else cmd diff --git a/neutron/agent/linux/utils.py b/neutron/agent/linux/utils.py index 47686bfa2..c6f2582be 100644 --- a/neutron/agent/linux/utils.py +++ b/neutron/agent/linux/utils.py @@ -21,6 +21,7 @@ import socket import struct import tempfile +import eventlet from eventlet.green import subprocess from eventlet import greenthread from oslo_config import cfg @@ -213,3 +214,60 @@ def get_root_helper_child_pid(pid, run_as_root=False): # Last process in the tree, return it break return pid + + +def remove_abs_path(cmd): + """Remove absolute path of executable in cmd + + Note: New instance of list is returned + + :param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two']) + + """ + if cmd and os.path.isabs(cmd[0]): + cmd = list(cmd) + cmd[0] = os.path.basename(cmd[0]) + + return cmd + + +def get_cmdline_from_pid(pid): + if pid is None or not os.path.exists('/proc/%s' % pid): + return [] + with open('/proc/%s/cmdline' % pid, 'r') as f: + return f.readline().split('\0')[:-1] + + +def cmdlines_are_equal(cmd1, cmd2): + """Validate provided lists containing output of /proc/cmdline are equal + + This function ignores absolute paths of executables in order to have + correct results in case one list uses absolute path and the other does not. + """ + cmd1 = remove_abs_path(cmd1) + cmd2 = remove_abs_path(cmd2) + return cmd1 == cmd2 + + +def pid_invoked_with_cmdline(pid, expected_cmd): + """Validate process with given pid is running with provided parameters + + """ + cmdline = get_cmdline_from_pid(pid) + return cmdlines_are_equal(expected_cmd, cmdline) + + +def wait_until_true(predicate, timeout=60, sleep=1, exception=None): + """ + Wait until callable predicate is evaluated as True + + :param predicate: Callable deciding whether waiting should continue. + Best practice is to instantiate predicate with functools.partial() + :param timeout: Timeout in seconds how long should function wait. + :param sleep: Polling interval for results in seconds. + :param exception: Exception class for eventlet.Timeout. + (see doc for eventlet.Timeout for more information) + """ + with eventlet.timeout.Timeout(timeout, exception): + while not predicate(): + eventlet.sleep(sleep) diff --git a/neutron/tests/functional/agent/linux/helpers.py b/neutron/tests/functional/agent/linux/helpers.py index 46a1ea244..1dfd591c8 100644 --- a/neutron/tests/functional/agent/linux/helpers.py +++ b/neutron/tests/functional/agent/linux/helpers.py @@ -20,8 +20,6 @@ import select import shlex import subprocess -import eventlet - from neutron.agent.common import config from neutron.agent.linux import ip_lib from neutron.agent.linux import utils @@ -72,63 +70,6 @@ def get_unused_port(used, start=1024, end=65535): return random.choice(list(candidates - used)) -def wait_until_true(predicate, timeout=60, sleep=1, exception=None): - """ - Wait until callable predicate is evaluated as True - - :param predicate: Callable deciding whether waiting should continue. - Best practice is to instantiate predicate with functools.partial() - :param timeout: Timeout in seconds how long should function wait. - :param sleep: Polling interval for results in seconds. - :param exception: Exception class for eventlet.Timeout. - (see doc for eventlet.Timeout for more information) - """ - with eventlet.timeout.Timeout(timeout, exception): - while not predicate(): - eventlet.sleep(sleep) - - -def remove_abs_path(cmd): - """Remove absolute path of executable in cmd - - Note: New instance of list is returned - - :param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two']) - - """ - if cmd and os.path.isabs(cmd[0]): - cmd = list(cmd) - cmd[0] = os.path.basename(cmd[0]) - - return cmd - - -def get_cmdline_from_pid(pid): - if pid is None or not os.path.exists('/proc/%s' % pid): - return list() - with open('/proc/%s/cmdline' % pid, 'r') as f: - return f.readline().split('\0')[:-1] - - -def cmdlines_are_equal(cmd1, cmd2): - """Validate provided lists containing output of /proc/cmdline are equal - - This function ignores absolute paths of executables in order to have - correct results in case one list uses absolute path and the other does not. - """ - cmd1 = remove_abs_path(cmd1) - cmd2 = remove_abs_path(cmd2) - return cmd1 == cmd2 - - -def pid_invoked_with_cmdline(pid, expected_cmd): - """Validate process with given pid is running with provided parameters - - """ - cmdline = get_cmdline_from_pid(pid) - return cmdlines_are_equal(expected_cmd, cmdline) - - class Pinger(object): def __init__(self, namespace, timeout=1, max_attempts=1): self.namespace = namespace @@ -182,9 +123,9 @@ class RootHelperProcess(subprocess.Popen): poller = select.poll() poller.register(stream.fileno()) poll_predicate = functools.partial(poller.poll, 1) - wait_until_true(poll_predicate, timeout, 0.1, - RuntimeError( - 'No output in %.2f seconds' % timeout)) + utils.wait_until_true(poll_predicate, timeout, 0.1, + RuntimeError( + 'No output in %.2f seconds' % timeout)) return stream.readline() def writeline(self, data): @@ -196,10 +137,10 @@ class RootHelperProcess(subprocess.Popen): def child_is_running(): child_pid = utils.get_root_helper_child_pid( self.pid, run_as_root=self.run_as_root) - if pid_invoked_with_cmdline(child_pid, self.cmd): + if utils.pid_invoked_with_cmdline(child_pid, self.cmd): return True - wait_until_true( + utils.wait_until_true( child_is_running, timeout, exception=RuntimeError("Process %s hasn't been spawned " diff --git a/neutron/tests/functional/agent/linux/test_async_process.py b/neutron/tests/functional/agent/linux/test_async_process.py index 20ef5fc9b..afb9d354a 100644 --- a/neutron/tests/functional/agent/linux/test_async_process.py +++ b/neutron/tests/functional/agent/linux/test_async_process.py @@ -17,14 +17,13 @@ import eventlet from six import moves from neutron.agent.linux import async_process -from neutron.agent.linux import utils from neutron.tests import base -class TestAsyncProcess(base.BaseTestCase): +class AsyncProcessTestFramework(base.BaseTestCase): def setUp(self): - super(TestAsyncProcess, self).setUp() + super(AsyncProcessTestFramework, self).setUp() self.test_file_path = self.get_temp_file_path('test_async_process.tmp') self.data = [str(x) for x in moves.xrange(4)] with file(self.test_file_path, 'w') as f: @@ -39,12 +38,21 @@ class TestAsyncProcess(base.BaseTestCase): output += new_output eventlet.sleep(0.01) + +class TestAsyncProcess(AsyncProcessTestFramework): + def _safe_stop(self, proc): + try: + proc.stop() + except async_process.AsyncProcessException: + pass + def test_stopping_async_process_lifecycle(self): proc = async_process.AsyncProcess(['tail', '-f', self.test_file_path]) - proc.start() + self.addCleanup(self._safe_stop, proc) + proc.start(blocking=True) self._check_stdout(proc) - proc.stop() + proc.stop(blocking=True) # Ensure that the process and greenthreads have stopped proc._process.wait() @@ -56,12 +64,10 @@ class TestAsyncProcess(base.BaseTestCase): proc = async_process.AsyncProcess(['tail', '-f', self.test_file_path], respawn_interval=0) + self.addCleanup(self._safe_stop, proc) proc.start() # Ensure that the same output is read twice self._check_stdout(proc) - pid = utils.get_root_helper_child_pid(proc._process.pid, - proc.run_as_root) - proc._kill_process(pid) + proc._kill_process(proc.pid) self._check_stdout(proc) - proc.stop() diff --git a/neutron/tests/functional/agent/linux/test_utils.py b/neutron/tests/functional/agent/linux/test_utils.py new file mode 100644 index 000000000..a9e867181 --- /dev/null +++ b/neutron/tests/functional/agent/linux/test_utils.py @@ -0,0 +1,40 @@ +# Copyright 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +import testtools + +from neutron.agent.linux import async_process +from neutron.agent.linux import utils +from neutron.tests.functional.agent.linux import test_async_process + + +class TestPIDHelpers(test_async_process.AsyncProcessTestFramework): + def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self): + cmd = ['tail', '-f', self.test_file_path] + proc = async_process.AsyncProcess(cmd) + proc.start(blocking=True) + self.addCleanup(proc.stop) + + pid = proc.pid + self.assertEqual(cmd, utils.get_cmdline_from_pid(pid)) + self.assertTrue(utils.pid_invoked_with_cmdline(pid, cmd)) + self.assertEqual([], utils.get_cmdline_from_pid(-1)) + + def test_wait_until_true_predicate_succeeds(self): + utils.wait_until_true(lambda: True) + + def test_wait_until_true_predicate_fails(self): + with testtools.ExpectedException(eventlet.timeout.Timeout): + utils.wait_until_true(lambda: False, 2) diff --git a/neutron/tests/functional/agent/test_l3_agent.py b/neutron/tests/functional/agent/test_l3_agent.py index 5217d3c2a..4b4ebd2d8 100755 --- a/neutron/tests/functional/agent/test_l3_agent.py +++ b/neutron/tests/functional/agent/test_l3_agent.py @@ -31,6 +31,7 @@ from neutron.agent.linux import dhcp from neutron.agent.linux import external_process from neutron.agent.linux import ip_lib from neutron.agent.linux import ovs_lib +from neutron.agent.linux import utils from neutron.agent.metadata import agent as metadata_agent from neutron.common import config as common_config from neutron.common import constants as l3_constants @@ -395,7 +396,7 @@ class L3AgentTestCase(L3AgentTestFramework): port = router.get_ex_gw_port() interface_name = self.agent.get_external_device_name(port['id']) self._assert_no_ip_addresses_on_interface(router, interface_name) - helpers.wait_until_true(lambda: router.ha_state == 'master') + utils.wait_until_true(lambda: router.ha_state == 'master') # Keepalived notifies of a state transition when it starts, # not when it ends. Thus, we have to wait until keepalived finishes @@ -407,7 +408,7 @@ class L3AgentTestCase(L3AgentTestFramework): device, self.agent.get_internal_device_name, router.ns_name) - helpers.wait_until_true(device_exists) + utils.wait_until_true(device_exists) self.assertTrue(self._namespace_exists(router.ns_name)) self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router)) @@ -478,7 +479,7 @@ class L3AgentTestCase(L3AgentTestFramework): restarted_agent = l3_test_agent.TestL3NATAgent(self.agent.host, self.agent.conf) self._create_router(restarted_agent, router1.router) - helpers.wait_until_true(lambda: self._floating_ips_configured(router1)) + utils.wait_until_true(lambda: self._floating_ips_configured(router1)) class L3HATestFramework(L3AgentTestFramework): @@ -506,16 +507,16 @@ class L3HATestFramework(L3AgentTestFramework): router2 = self.manage_router(self.failover_agent, router_info_2) - helpers.wait_until_true(lambda: router1.ha_state == 'master') - helpers.wait_until_true(lambda: router2.ha_state == 'backup') + utils.wait_until_true(lambda: router1.ha_state == 'master') + utils.wait_until_true(lambda: router2.ha_state == 'backup') device_name = router1.get_ha_device_name( router1.router[l3_constants.HA_INTERFACE_KEY]['id']) ha_device = ip_lib.IPDevice(device_name, namespace=router1.ns_name) ha_device.link.set_down() - helpers.wait_until_true(lambda: router2.ha_state == 'master') - helpers.wait_until_true(lambda: router1.ha_state == 'fault') + utils.wait_until_true(lambda: router2.ha_state == 'master') + utils.wait_until_true(lambda: router1.ha_state == 'fault') class MetadataFakeProxyHandler(object): diff --git a/neutron/tests/unit/agent/linux/test_utils.py b/neutron/tests/unit/agent/linux/test_utils.py index ed3d2c9bc..79166c558 100644 --- a/neutron/tests/unit/agent/linux/test_utils.py +++ b/neutron/tests/unit/agent/linux/test_utils.py @@ -187,3 +187,19 @@ class TestGetRoothelperChildPid(base.BaseTestCase): def test_returns_none_as_root(self): self._test_get_root_helper_child_pid(expected=None, run_as_root=True) + + +class TestPathUtilities(base.BaseTestCase): + def test_remove_abs_path(self): + self.assertEqual(['ping', '8.8.8.8'], + utils.remove_abs_path(['/usr/bin/ping', '8.8.8.8'])) + + def test_cmdlines_are_equal(self): + self.assertTrue(utils.cmdlines_are_equal( + ['ping', '8.8.8.8'], + ['/usr/bin/ping', '8.8.8.8'])) + + def test_cmdlines_are_equal_different_commands(self): + self.assertFalse(utils.cmdlines_are_equal( + ['ping', '8.8.8.8'], + ['/usr/bin/ping6', '8.8.8.8'])) diff --git a/neutron/tests/unit/test_linux_ip_lib.py b/neutron/tests/unit/test_linux_ip_lib.py index 2fdfd1ada..f20306075 100644 --- a/neutron/tests/unit/test_linux_ip_lib.py +++ b/neutron/tests/unit/test_linux_ip_lib.py @@ -1002,3 +1002,14 @@ class TestArpPing(TestIPCmdBase): # If this was called then check_added_address probably had a assert self.assertFalse(device.addr.add.called) + + +class TestAddNamespaceToCmd(base.BaseTestCase): + def test_add_namespace_to_cmd_with_namespace(self): + cmd = ['ping', '8.8.8.8'] + self.assertEqual(['ip', 'netns', 'exec', 'tmp'] + cmd, + ip_lib.add_namespace_to_cmd(cmd, 'tmp')) + + def test_add_namespace_to_cmd_without_namespace(self): + cmd = ['ping', '8.8.8.8'] + self.assertEqual(cmd, ip_lib.add_namespace_to_cmd(cmd, None))