]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Allow AsyncProcess to block on process start and stop
authorAssaf Muller <amuller@redhat.com>
Sun, 22 Feb 2015 00:27:44 +0000 (19:27 -0500)
committerAssaf Muller <amuller@redhat.com>
Fri, 27 Feb 2015 13:47:50 +0000 (08:47 -0500)
* Move utility functions from the test tree to the non-test tree
* Add tests for the newly moved functions
* Use these functions in AsyncProcess

This will allow the ip monitor in the following patch to start
and stop in a synchronous manner.

Related-Bug: #1402010
Change-Id: I03727d8acc17e561d3473b0ebecfbe49cb5523b1

neutron/agent/linux/async_process.py
neutron/agent/linux/ip_lib.py
neutron/agent/linux/utils.py
neutron/tests/functional/agent/linux/helpers.py
neutron/tests/functional/agent/linux/test_async_process.py
neutron/tests/functional/agent/linux/test_utils.py [new file with mode: 0644]
neutron/tests/functional/agent/test_l3_agent.py
neutron/tests/unit/agent/linux/test_utils.py
neutron/tests/unit/test_linux_ip_lib.py

index 3b65195c0648d284811640ac8f78aa5b55358e9b..79f80ac298f7b572fcd22bbb372369b12ed9496a 100644 (file)
@@ -16,6 +16,7 @@ import eventlet
 import eventlet.event
 import eventlet.queue
 
+from neutron.agent.linux import ip_lib
 from neutron.agent.linux import utils
 from neutron.i18n import _LE
 from neutron.openstack.common import log as logging
@@ -52,7 +53,8 @@ class AsyncProcess(object):
     ...     print line
     """
 
-    def __init__(self, cmd, run_as_root=False, respawn_interval=None):
+    def __init__(self, cmd, run_as_root=False, respawn_interval=None,
+                 namespace=None):
         """Constructor.
 
         :param cmd: The list of command arguments to invoke.
@@ -60,8 +62,11 @@ class AsyncProcess(object):
         :param respawn_interval: Optional, the interval in seconds to wait
                to respawn after unexpected process death. Respawn will
                only be attempted if a value of 0 or greater is provided.
+        :param namespace: Optional, start the command in the specified
+               namespace.
         """
-        self.cmd = cmd
+        self.cmd_without_namespace = cmd
+        self.cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
         self.run_as_root = run_as_root
         if respawn_interval is not None and respawn_interval < 0:
             raise ValueError(_('respawn_interval must be >= 0 if provided.'))
@@ -75,22 +80,45 @@ class AsyncProcess(object):
         self._stdout_lines = eventlet.queue.LightQueue()
         self._stderr_lines = eventlet.queue.LightQueue()
 
-    def start(self):
-        """Launch a process and monitor it asynchronously."""
+    def is_active(self):
+        # If using sudo rootwrap as a root_helper, we have to wait until sudo
+        # spawns rootwrap and rootwrap spawns the process.
+
+        return utils.pid_invoked_with_cmdline(
+            self.pid, self.cmd_without_namespace)
+
+    def start(self, blocking=False):
+        """Launch a process and monitor it asynchronously.
+
+        :param blocking: Block until the process has started.
+        :raises eventlet.timeout.Timeout if blocking is True and the process
+                did not start in time.
+        """
         if self._kill_event:
             raise AsyncProcessException(_('Process is already started'))
         else:
             LOG.debug('Launching async process [%s].', self.cmd)
             self._spawn()
 
-    def stop(self):
-        """Halt the process and watcher threads."""
+        if blocking:
+            utils.wait_until_true(self.is_active)
+
+    def stop(self, blocking=False):
+        """Halt the process and watcher threads.
+
+        :param blocking: Block until the process has stopped.
+        :raises eventlet.timeout.Timeout if blocking is True and the process
+                did not stop in time.
+        """
         if self._kill_event:
             LOG.debug('Halting async process [%s].', self.cmd)
             self._kill()
         else:
             raise AsyncProcessException(_('Process is not running.'))
 
+        if blocking:
+            utils.wait_until_true(lambda: not self.is_active())
+
     def _spawn(self):
         """Spawn a process and its watchers."""
         self._kill_event = eventlet.event.Event()
@@ -107,6 +135,13 @@ class AsyncProcess(object):
                                      self._kill_event)
             self._watchers.append(watcher)
 
+    @property
+    def pid(self):
+        if self._process:
+            return utils.get_root_helper_child_pid(
+                self._process.pid,
+                run_as_root=self.run_as_root)
+
     def _kill(self, respawning=False):
         """Kill the process and the associated watcher greenthreads.
 
@@ -116,8 +151,7 @@ class AsyncProcess(object):
         # Halt the greenthreads
         self._kill_event.send()
 
-        pid = utils.get_root_helper_child_pid(self._process.pid,
-                                              run_as_root=self.run_as_root)
+        pid = self.pid
         if pid:
             self._kill_process(pid)
 
index 29e5a4f109e5b54e7a2a2e9ebc32b54b705940cf..b186a4fa6067f2bf84713fa6b73c7049d5849e15 100644 (file)
@@ -75,10 +75,7 @@ class SubProcessBase(object):
     def _execute(cls, options, command, args, run_as_root=False,
                  namespace=None, log_fail_as_error=True):
         opt_list = ['-%s' % o for o in options]
-        if namespace:
-            ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip']
-        else:
-            ip_cmd = ['ip']
+        ip_cmd = add_namespace_to_cmd(['ip'], namespace)
         cmd = ip_cmd + opt_list + [command] + list(args)
         return utils.execute(cmd, run_as_root=run_as_root,
                              log_fail_as_error=log_fail_as_error)
@@ -689,3 +686,9 @@ def send_garp_for_proxyarp(ns_name, iface_name, address, count):
 
     if count > 0:
         eventlet.spawn_n(arping_with_temporary_address)
+
+
+def add_namespace_to_cmd(cmd, namespace=None):
+    """Add an optional namespace to the command."""
+
+    return ['ip', 'netns', 'exec', namespace] + cmd if namespace else cmd
index 47686bfa2fa47308133196858fc59f6fab2e32a4..c6f2582befd589ad2e986e349677fedaa269b955 100644 (file)
@@ -21,6 +21,7 @@ import socket
 import struct
 import tempfile
 
+import eventlet
 from eventlet.green import subprocess
 from eventlet import greenthread
 from oslo_config import cfg
@@ -213,3 +214,60 @@ def get_root_helper_child_pid(pid, run_as_root=False):
                 # Last process in the tree, return it
                 break
     return pid
+
+
+def remove_abs_path(cmd):
+    """Remove absolute path of executable in cmd
+
+    Note: New instance of list is returned
+
+    :param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two'])
+
+    """
+    if cmd and os.path.isabs(cmd[0]):
+        cmd = list(cmd)
+        cmd[0] = os.path.basename(cmd[0])
+
+    return cmd
+
+
+def get_cmdline_from_pid(pid):
+    if pid is None or not os.path.exists('/proc/%s' % pid):
+        return []
+    with open('/proc/%s/cmdline' % pid, 'r') as f:
+        return f.readline().split('\0')[:-1]
+
+
+def cmdlines_are_equal(cmd1, cmd2):
+    """Validate provided lists containing output of /proc/cmdline are equal
+
+    This function ignores absolute paths of executables in order to have
+    correct results in case one list uses absolute path and the other does not.
+    """
+    cmd1 = remove_abs_path(cmd1)
+    cmd2 = remove_abs_path(cmd2)
+    return cmd1 == cmd2
+
+
+def pid_invoked_with_cmdline(pid, expected_cmd):
+    """Validate process with given pid is running with provided parameters
+
+    """
+    cmdline = get_cmdline_from_pid(pid)
+    return cmdlines_are_equal(expected_cmd, cmdline)
+
+
+def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
+    """
+    Wait until callable predicate is evaluated as True
+
+    :param predicate: Callable deciding whether waiting should continue.
+    Best practice is to instantiate predicate with functools.partial()
+    :param timeout: Timeout in seconds how long should function wait.
+    :param sleep: Polling interval for results in seconds.
+    :param exception: Exception class for eventlet.Timeout.
+    (see doc for eventlet.Timeout for more information)
+    """
+    with eventlet.timeout.Timeout(timeout, exception):
+        while not predicate():
+            eventlet.sleep(sleep)
index 46a1ea2446177b301e70c092d806b4ba71660464..1dfd591c82f2ca73e137e14d70a2e90d72f36a6c 100644 (file)
@@ -20,8 +20,6 @@ import select
 import shlex
 import subprocess
 
-import eventlet
-
 from neutron.agent.common import config
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import utils
@@ -72,63 +70,6 @@ def get_unused_port(used, start=1024, end=65535):
     return random.choice(list(candidates - used))
 
 
-def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
-    """
-    Wait until callable predicate is evaluated as True
-
-    :param predicate: Callable deciding whether waiting should continue.
-    Best practice is to instantiate predicate with functools.partial()
-    :param timeout: Timeout in seconds how long should function wait.
-    :param sleep: Polling interval for results in seconds.
-    :param exception: Exception class for eventlet.Timeout.
-    (see doc for eventlet.Timeout for more information)
-    """
-    with eventlet.timeout.Timeout(timeout, exception):
-        while not predicate():
-            eventlet.sleep(sleep)
-
-
-def remove_abs_path(cmd):
-    """Remove absolute path of executable in cmd
-
-    Note: New instance of list is returned
-
-    :param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two'])
-
-    """
-    if cmd and os.path.isabs(cmd[0]):
-        cmd = list(cmd)
-        cmd[0] = os.path.basename(cmd[0])
-
-    return cmd
-
-
-def get_cmdline_from_pid(pid):
-    if pid is None or not os.path.exists('/proc/%s' % pid):
-        return list()
-    with open('/proc/%s/cmdline' % pid, 'r') as f:
-        return f.readline().split('\0')[:-1]
-
-
-def cmdlines_are_equal(cmd1, cmd2):
-    """Validate provided lists containing output of /proc/cmdline are equal
-
-    This function ignores absolute paths of executables in order to have
-    correct results in case one list uses absolute path and the other does not.
-    """
-    cmd1 = remove_abs_path(cmd1)
-    cmd2 = remove_abs_path(cmd2)
-    return cmd1 == cmd2
-
-
-def pid_invoked_with_cmdline(pid, expected_cmd):
-    """Validate process with given pid is running with provided parameters
-
-    """
-    cmdline = get_cmdline_from_pid(pid)
-    return cmdlines_are_equal(expected_cmd, cmdline)
-
-
 class Pinger(object):
     def __init__(self, namespace, timeout=1, max_attempts=1):
         self.namespace = namespace
@@ -182,9 +123,9 @@ class RootHelperProcess(subprocess.Popen):
             poller = select.poll()
             poller.register(stream.fileno())
             poll_predicate = functools.partial(poller.poll, 1)
-            wait_until_true(poll_predicate, timeout, 0.1,
-                            RuntimeError(
-                                'No output in %.2f seconds' % timeout))
+            utils.wait_until_true(poll_predicate, timeout, 0.1,
+                                  RuntimeError(
+                                      'No output in %.2f seconds' % timeout))
         return stream.readline()
 
     def writeline(self, data):
@@ -196,10 +137,10 @@ class RootHelperProcess(subprocess.Popen):
         def child_is_running():
             child_pid = utils.get_root_helper_child_pid(
                 self.pid, run_as_root=self.run_as_root)
-            if pid_invoked_with_cmdline(child_pid, self.cmd):
+            if utils.pid_invoked_with_cmdline(child_pid, self.cmd):
                 return True
 
-        wait_until_true(
+        utils.wait_until_true(
             child_is_running,
             timeout,
             exception=RuntimeError("Process %s hasn't been spawned "
index 20ef5fc9be4f289b80291f5371c9b68fd91ff6ae..afb9d354a6fc4c62656a5813ad9329d5b8c925a7 100644 (file)
@@ -17,14 +17,13 @@ import eventlet
 from six import moves
 
 from neutron.agent.linux import async_process
-from neutron.agent.linux import utils
 from neutron.tests import base
 
 
-class TestAsyncProcess(base.BaseTestCase):
+class AsyncProcessTestFramework(base.BaseTestCase):
 
     def setUp(self):
-        super(TestAsyncProcess, self).setUp()
+        super(AsyncProcessTestFramework, self).setUp()
         self.test_file_path = self.get_temp_file_path('test_async_process.tmp')
         self.data = [str(x) for x in moves.xrange(4)]
         with file(self.test_file_path, 'w') as f:
@@ -39,12 +38,21 @@ class TestAsyncProcess(base.BaseTestCase):
                 output += new_output
             eventlet.sleep(0.01)
 
+
+class TestAsyncProcess(AsyncProcessTestFramework):
+    def _safe_stop(self, proc):
+        try:
+            proc.stop()
+        except async_process.AsyncProcessException:
+            pass
+
     def test_stopping_async_process_lifecycle(self):
         proc = async_process.AsyncProcess(['tail', '-f',
                                            self.test_file_path])
-        proc.start()
+        self.addCleanup(self._safe_stop, proc)
+        proc.start(blocking=True)
         self._check_stdout(proc)
-        proc.stop()
+        proc.stop(blocking=True)
 
         # Ensure that the process and greenthreads have stopped
         proc._process.wait()
@@ -56,12 +64,10 @@ class TestAsyncProcess(base.BaseTestCase):
         proc = async_process.AsyncProcess(['tail', '-f',
                                            self.test_file_path],
                                           respawn_interval=0)
+        self.addCleanup(self._safe_stop, proc)
         proc.start()
 
         # Ensure that the same output is read twice
         self._check_stdout(proc)
-        pid = utils.get_root_helper_child_pid(proc._process.pid,
-                                              proc.run_as_root)
-        proc._kill_process(pid)
+        proc._kill_process(proc.pid)
         self._check_stdout(proc)
-        proc.stop()
diff --git a/neutron/tests/functional/agent/linux/test_utils.py b/neutron/tests/functional/agent/linux/test_utils.py
new file mode 100644 (file)
index 0000000..a9e8671
--- /dev/null
@@ -0,0 +1,40 @@
+# Copyright 2015 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import eventlet
+import testtools
+
+from neutron.agent.linux import async_process
+from neutron.agent.linux import utils
+from neutron.tests.functional.agent.linux import test_async_process
+
+
+class TestPIDHelpers(test_async_process.AsyncProcessTestFramework):
+    def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self):
+        cmd = ['tail', '-f', self.test_file_path]
+        proc = async_process.AsyncProcess(cmd)
+        proc.start(blocking=True)
+        self.addCleanup(proc.stop)
+
+        pid = proc.pid
+        self.assertEqual(cmd, utils.get_cmdline_from_pid(pid))
+        self.assertTrue(utils.pid_invoked_with_cmdline(pid, cmd))
+        self.assertEqual([], utils.get_cmdline_from_pid(-1))
+
+    def test_wait_until_true_predicate_succeeds(self):
+        utils.wait_until_true(lambda: True)
+
+    def test_wait_until_true_predicate_fails(self):
+        with testtools.ExpectedException(eventlet.timeout.Timeout):
+            utils.wait_until_true(lambda: False, 2)
index 5217d3c2a385b3b9ab9526730ee0ec7ba31f30a4..4b4ebd2d8a68544b32a2a2529eced3b9f36e2586 100755 (executable)
@@ -31,6 +31,7 @@ from neutron.agent.linux import dhcp
 from neutron.agent.linux import external_process
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import ovs_lib
+from neutron.agent.linux import utils
 from neutron.agent.metadata import agent as metadata_agent
 from neutron.common import config as common_config
 from neutron.common import constants as l3_constants
@@ -395,7 +396,7 @@ class L3AgentTestCase(L3AgentTestFramework):
             port = router.get_ex_gw_port()
             interface_name = self.agent.get_external_device_name(port['id'])
             self._assert_no_ip_addresses_on_interface(router, interface_name)
-            helpers.wait_until_true(lambda: router.ha_state == 'master')
+            utils.wait_until_true(lambda: router.ha_state == 'master')
 
             # Keepalived notifies of a state transition when it starts,
             # not when it ends. Thus, we have to wait until keepalived finishes
@@ -407,7 +408,7 @@ class L3AgentTestCase(L3AgentTestFramework):
                 device,
                 self.agent.get_internal_device_name,
                 router.ns_name)
-            helpers.wait_until_true(device_exists)
+            utils.wait_until_true(device_exists)
 
         self.assertTrue(self._namespace_exists(router.ns_name))
         self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
@@ -478,7 +479,7 @@ class L3AgentTestCase(L3AgentTestFramework):
         restarted_agent = l3_test_agent.TestL3NATAgent(self.agent.host,
                                                        self.agent.conf)
         self._create_router(restarted_agent, router1.router)
-        helpers.wait_until_true(lambda: self._floating_ips_configured(router1))
+        utils.wait_until_true(lambda: self._floating_ips_configured(router1))
 
 
 class L3HATestFramework(L3AgentTestFramework):
@@ -506,16 +507,16 @@ class L3HATestFramework(L3AgentTestFramework):
 
         router2 = self.manage_router(self.failover_agent, router_info_2)
 
-        helpers.wait_until_true(lambda: router1.ha_state == 'master')
-        helpers.wait_until_true(lambda: router2.ha_state == 'backup')
+        utils.wait_until_true(lambda: router1.ha_state == 'master')
+        utils.wait_until_true(lambda: router2.ha_state == 'backup')
 
         device_name = router1.get_ha_device_name(
             router1.router[l3_constants.HA_INTERFACE_KEY]['id'])
         ha_device = ip_lib.IPDevice(device_name, namespace=router1.ns_name)
         ha_device.link.set_down()
 
-        helpers.wait_until_true(lambda: router2.ha_state == 'master')
-        helpers.wait_until_true(lambda: router1.ha_state == 'fault')
+        utils.wait_until_true(lambda: router2.ha_state == 'master')
+        utils.wait_until_true(lambda: router1.ha_state == 'fault')
 
 
 class MetadataFakeProxyHandler(object):
index ed3d2c9bc32c5909c75b45b1b86b04416001e240..79166c558dd2a8e93ef6a3bfb955b3c85439c76a 100644 (file)
@@ -187,3 +187,19 @@ class TestGetRoothelperChildPid(base.BaseTestCase):
 
     def test_returns_none_as_root(self):
         self._test_get_root_helper_child_pid(expected=None, run_as_root=True)
+
+
+class TestPathUtilities(base.BaseTestCase):
+    def test_remove_abs_path(self):
+        self.assertEqual(['ping', '8.8.8.8'],
+                         utils.remove_abs_path(['/usr/bin/ping', '8.8.8.8']))
+
+    def test_cmdlines_are_equal(self):
+        self.assertTrue(utils.cmdlines_are_equal(
+            ['ping', '8.8.8.8'],
+            ['/usr/bin/ping', '8.8.8.8']))
+
+    def test_cmdlines_are_equal_different_commands(self):
+        self.assertFalse(utils.cmdlines_are_equal(
+            ['ping', '8.8.8.8'],
+            ['/usr/bin/ping6', '8.8.8.8']))
index 2fdfd1adac016b23452d1b379a2a62581150a974..f20306075bebd930a6444a3435c8927e41b8e126 100644 (file)
@@ -1002,3 +1002,14 @@ class TestArpPing(TestIPCmdBase):
 
         # If this was called then check_added_address probably had a assert
         self.assertFalse(device.addr.add.called)
+
+
+class TestAddNamespaceToCmd(base.BaseTestCase):
+    def test_add_namespace_to_cmd_with_namespace(self):
+        cmd = ['ping', '8.8.8.8']
+        self.assertEqual(['ip', 'netns', 'exec', 'tmp'] + cmd,
+                         ip_lib.add_namespace_to_cmd(cmd, 'tmp'))
+
+    def test_add_namespace_to_cmd_without_namespace(self):
+        cmd = ['ping', '8.8.8.8']
+        self.assertEqual(cmd, ip_lib.add_namespace_to_cmd(cmd, None))