import eventlet.event
import eventlet.queue
+from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.i18n import _LE
from neutron.openstack.common import log as logging
... print line
"""
- def __init__(self, cmd, run_as_root=False, respawn_interval=None):
+ def __init__(self, cmd, run_as_root=False, respawn_interval=None,
+ namespace=None):
"""Constructor.
:param cmd: The list of command arguments to invoke.
:param respawn_interval: Optional, the interval in seconds to wait
to respawn after unexpected process death. Respawn will
only be attempted if a value of 0 or greater is provided.
+ :param namespace: Optional, start the command in the specified
+ namespace.
"""
- self.cmd = cmd
+ self.cmd_without_namespace = cmd
+ self.cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
self.run_as_root = run_as_root
if respawn_interval is not None and respawn_interval < 0:
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
self._stdout_lines = eventlet.queue.LightQueue()
self._stderr_lines = eventlet.queue.LightQueue()
- def start(self):
- """Launch a process and monitor it asynchronously."""
+ def is_active(self):
+ # If using sudo rootwrap as a root_helper, we have to wait until sudo
+ # spawns rootwrap and rootwrap spawns the process.
+
+ return utils.pid_invoked_with_cmdline(
+ self.pid, self.cmd_without_namespace)
+
+ def start(self, blocking=False):
+ """Launch a process and monitor it asynchronously.
+
+ :param blocking: Block until the process has started.
+ :raises eventlet.timeout.Timeout if blocking is True and the process
+ did not start in time.
+ """
if self._kill_event:
raise AsyncProcessException(_('Process is already started'))
else:
LOG.debug('Launching async process [%s].', self.cmd)
self._spawn()
- def stop(self):
- """Halt the process and watcher threads."""
+ if blocking:
+ utils.wait_until_true(self.is_active)
+
+ def stop(self, blocking=False):
+ """Halt the process and watcher threads.
+
+ :param blocking: Block until the process has stopped.
+ :raises eventlet.timeout.Timeout if blocking is True and the process
+ did not stop in time.
+ """
if self._kill_event:
LOG.debug('Halting async process [%s].', self.cmd)
self._kill()
else:
raise AsyncProcessException(_('Process is not running.'))
+ if blocking:
+ utils.wait_until_true(lambda: not self.is_active())
+
def _spawn(self):
"""Spawn a process and its watchers."""
self._kill_event = eventlet.event.Event()
self._kill_event)
self._watchers.append(watcher)
+ @property
+ def pid(self):
+ if self._process:
+ return utils.get_root_helper_child_pid(
+ self._process.pid,
+ run_as_root=self.run_as_root)
+
def _kill(self, respawning=False):
"""Kill the process and the associated watcher greenthreads.
# Halt the greenthreads
self._kill_event.send()
- pid = utils.get_root_helper_child_pid(self._process.pid,
- run_as_root=self.run_as_root)
+ pid = self.pid
if pid:
self._kill_process(pid)
def _execute(cls, options, command, args, run_as_root=False,
namespace=None, log_fail_as_error=True):
opt_list = ['-%s' % o for o in options]
- if namespace:
- ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip']
- else:
- ip_cmd = ['ip']
+ ip_cmd = add_namespace_to_cmd(['ip'], namespace)
cmd = ip_cmd + opt_list + [command] + list(args)
return utils.execute(cmd, run_as_root=run_as_root,
log_fail_as_error=log_fail_as_error)
if count > 0:
eventlet.spawn_n(arping_with_temporary_address)
+
+
+def add_namespace_to_cmd(cmd, namespace=None):
+ """Add an optional namespace to the command."""
+
+ return ['ip', 'netns', 'exec', namespace] + cmd if namespace else cmd
import struct
import tempfile
+import eventlet
from eventlet.green import subprocess
from eventlet import greenthread
from oslo_config import cfg
# Last process in the tree, return it
break
return pid
+
+
+def remove_abs_path(cmd):
+ """Remove absolute path of executable in cmd
+
+ Note: New instance of list is returned
+
+ :param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two'])
+
+ """
+ if cmd and os.path.isabs(cmd[0]):
+ cmd = list(cmd)
+ cmd[0] = os.path.basename(cmd[0])
+
+ return cmd
+
+
+def get_cmdline_from_pid(pid):
+ if pid is None or not os.path.exists('/proc/%s' % pid):
+ return []
+ with open('/proc/%s/cmdline' % pid, 'r') as f:
+ return f.readline().split('\0')[:-1]
+
+
+def cmdlines_are_equal(cmd1, cmd2):
+ """Validate provided lists containing output of /proc/cmdline are equal
+
+ This function ignores absolute paths of executables in order to have
+ correct results in case one list uses absolute path and the other does not.
+ """
+ cmd1 = remove_abs_path(cmd1)
+ cmd2 = remove_abs_path(cmd2)
+ return cmd1 == cmd2
+
+
+def pid_invoked_with_cmdline(pid, expected_cmd):
+ """Validate process with given pid is running with provided parameters
+
+ """
+ cmdline = get_cmdline_from_pid(pid)
+ return cmdlines_are_equal(expected_cmd, cmdline)
+
+
+def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
+ """
+ Wait until callable predicate is evaluated as True
+
+ :param predicate: Callable deciding whether waiting should continue.
+ Best practice is to instantiate predicate with functools.partial()
+ :param timeout: Timeout in seconds how long should function wait.
+ :param sleep: Polling interval for results in seconds.
+ :param exception: Exception class for eventlet.Timeout.
+ (see doc for eventlet.Timeout for more information)
+ """
+ with eventlet.timeout.Timeout(timeout, exception):
+ while not predicate():
+ eventlet.sleep(sleep)
import shlex
import subprocess
-import eventlet
-
from neutron.agent.common import config
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
return random.choice(list(candidates - used))
-def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
- """
- Wait until callable predicate is evaluated as True
-
- :param predicate: Callable deciding whether waiting should continue.
- Best practice is to instantiate predicate with functools.partial()
- :param timeout: Timeout in seconds how long should function wait.
- :param sleep: Polling interval for results in seconds.
- :param exception: Exception class for eventlet.Timeout.
- (see doc for eventlet.Timeout for more information)
- """
- with eventlet.timeout.Timeout(timeout, exception):
- while not predicate():
- eventlet.sleep(sleep)
-
-
-def remove_abs_path(cmd):
- """Remove absolute path of executable in cmd
-
- Note: New instance of list is returned
-
- :param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two'])
-
- """
- if cmd and os.path.isabs(cmd[0]):
- cmd = list(cmd)
- cmd[0] = os.path.basename(cmd[0])
-
- return cmd
-
-
-def get_cmdline_from_pid(pid):
- if pid is None or not os.path.exists('/proc/%s' % pid):
- return list()
- with open('/proc/%s/cmdline' % pid, 'r') as f:
- return f.readline().split('\0')[:-1]
-
-
-def cmdlines_are_equal(cmd1, cmd2):
- """Validate provided lists containing output of /proc/cmdline are equal
-
- This function ignores absolute paths of executables in order to have
- correct results in case one list uses absolute path and the other does not.
- """
- cmd1 = remove_abs_path(cmd1)
- cmd2 = remove_abs_path(cmd2)
- return cmd1 == cmd2
-
-
-def pid_invoked_with_cmdline(pid, expected_cmd):
- """Validate process with given pid is running with provided parameters
-
- """
- cmdline = get_cmdline_from_pid(pid)
- return cmdlines_are_equal(expected_cmd, cmdline)
-
-
class Pinger(object):
def __init__(self, namespace, timeout=1, max_attempts=1):
self.namespace = namespace
poller = select.poll()
poller.register(stream.fileno())
poll_predicate = functools.partial(poller.poll, 1)
- wait_until_true(poll_predicate, timeout, 0.1,
- RuntimeError(
- 'No output in %.2f seconds' % timeout))
+ utils.wait_until_true(poll_predicate, timeout, 0.1,
+ RuntimeError(
+ 'No output in %.2f seconds' % timeout))
return stream.readline()
def writeline(self, data):
def child_is_running():
child_pid = utils.get_root_helper_child_pid(
self.pid, run_as_root=self.run_as_root)
- if pid_invoked_with_cmdline(child_pid, self.cmd):
+ if utils.pid_invoked_with_cmdline(child_pid, self.cmd):
return True
- wait_until_true(
+ utils.wait_until_true(
child_is_running,
timeout,
exception=RuntimeError("Process %s hasn't been spawned "
from six import moves
from neutron.agent.linux import async_process
-from neutron.agent.linux import utils
from neutron.tests import base
-class TestAsyncProcess(base.BaseTestCase):
+class AsyncProcessTestFramework(base.BaseTestCase):
def setUp(self):
- super(TestAsyncProcess, self).setUp()
+ super(AsyncProcessTestFramework, self).setUp()
self.test_file_path = self.get_temp_file_path('test_async_process.tmp')
self.data = [str(x) for x in moves.xrange(4)]
with file(self.test_file_path, 'w') as f:
output += new_output
eventlet.sleep(0.01)
+
+class TestAsyncProcess(AsyncProcessTestFramework):
+ def _safe_stop(self, proc):
+ try:
+ proc.stop()
+ except async_process.AsyncProcessException:
+ pass
+
def test_stopping_async_process_lifecycle(self):
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path])
- proc.start()
+ self.addCleanup(self._safe_stop, proc)
+ proc.start(blocking=True)
self._check_stdout(proc)
- proc.stop()
+ proc.stop(blocking=True)
# Ensure that the process and greenthreads have stopped
proc._process.wait()
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path],
respawn_interval=0)
+ self.addCleanup(self._safe_stop, proc)
proc.start()
# Ensure that the same output is read twice
self._check_stdout(proc)
- pid = utils.get_root_helper_child_pid(proc._process.pid,
- proc.run_as_root)
- proc._kill_process(pid)
+ proc._kill_process(proc.pid)
self._check_stdout(proc)
- proc.stop()
--- /dev/null
+# Copyright 2015 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+import testtools
+
+from neutron.agent.linux import async_process
+from neutron.agent.linux import utils
+from neutron.tests.functional.agent.linux import test_async_process
+
+
+class TestPIDHelpers(test_async_process.AsyncProcessTestFramework):
+ def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self):
+ cmd = ['tail', '-f', self.test_file_path]
+ proc = async_process.AsyncProcess(cmd)
+ proc.start(blocking=True)
+ self.addCleanup(proc.stop)
+
+ pid = proc.pid
+ self.assertEqual(cmd, utils.get_cmdline_from_pid(pid))
+ self.assertTrue(utils.pid_invoked_with_cmdline(pid, cmd))
+ self.assertEqual([], utils.get_cmdline_from_pid(-1))
+
+ def test_wait_until_true_predicate_succeeds(self):
+ utils.wait_until_true(lambda: True)
+
+ def test_wait_until_true_predicate_fails(self):
+ with testtools.ExpectedException(eventlet.timeout.Timeout):
+ utils.wait_until_true(lambda: False, 2)
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
+from neutron.agent.linux import utils
from neutron.agent.metadata import agent as metadata_agent
from neutron.common import config as common_config
from neutron.common import constants as l3_constants
port = router.get_ex_gw_port()
interface_name = self.agent.get_external_device_name(port['id'])
self._assert_no_ip_addresses_on_interface(router, interface_name)
- helpers.wait_until_true(lambda: router.ha_state == 'master')
+ utils.wait_until_true(lambda: router.ha_state == 'master')
# Keepalived notifies of a state transition when it starts,
# not when it ends. Thus, we have to wait until keepalived finishes
device,
self.agent.get_internal_device_name,
router.ns_name)
- helpers.wait_until_true(device_exists)
+ utils.wait_until_true(device_exists)
self.assertTrue(self._namespace_exists(router.ns_name))
self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
restarted_agent = l3_test_agent.TestL3NATAgent(self.agent.host,
self.agent.conf)
self._create_router(restarted_agent, router1.router)
- helpers.wait_until_true(lambda: self._floating_ips_configured(router1))
+ utils.wait_until_true(lambda: self._floating_ips_configured(router1))
class L3HATestFramework(L3AgentTestFramework):
router2 = self.manage_router(self.failover_agent, router_info_2)
- helpers.wait_until_true(lambda: router1.ha_state == 'master')
- helpers.wait_until_true(lambda: router2.ha_state == 'backup')
+ utils.wait_until_true(lambda: router1.ha_state == 'master')
+ utils.wait_until_true(lambda: router2.ha_state == 'backup')
device_name = router1.get_ha_device_name(
router1.router[l3_constants.HA_INTERFACE_KEY]['id'])
ha_device = ip_lib.IPDevice(device_name, namespace=router1.ns_name)
ha_device.link.set_down()
- helpers.wait_until_true(lambda: router2.ha_state == 'master')
- helpers.wait_until_true(lambda: router1.ha_state == 'fault')
+ utils.wait_until_true(lambda: router2.ha_state == 'master')
+ utils.wait_until_true(lambda: router1.ha_state == 'fault')
class MetadataFakeProxyHandler(object):
def test_returns_none_as_root(self):
self._test_get_root_helper_child_pid(expected=None, run_as_root=True)
+
+
+class TestPathUtilities(base.BaseTestCase):
+ def test_remove_abs_path(self):
+ self.assertEqual(['ping', '8.8.8.8'],
+ utils.remove_abs_path(['/usr/bin/ping', '8.8.8.8']))
+
+ def test_cmdlines_are_equal(self):
+ self.assertTrue(utils.cmdlines_are_equal(
+ ['ping', '8.8.8.8'],
+ ['/usr/bin/ping', '8.8.8.8']))
+
+ def test_cmdlines_are_equal_different_commands(self):
+ self.assertFalse(utils.cmdlines_are_equal(
+ ['ping', '8.8.8.8'],
+ ['/usr/bin/ping6', '8.8.8.8']))
# If this was called then check_added_address probably had a assert
self.assertFalse(device.addr.add.called)
+
+
+class TestAddNamespaceToCmd(base.BaseTestCase):
+ def test_add_namespace_to_cmd_with_namespace(self):
+ cmd = ['ping', '8.8.8.8']
+ self.assertEqual(['ip', 'netns', 'exec', 'tmp'] + cmd,
+ ip_lib.add_namespace_to_cmd(cmd, 'tmp'))
+
+ def test_add_namespace_to_cmd_without_namespace(self):
+ cmd = ['ping', '8.8.8.8']
+ self.assertEqual(cmd, ip_lib.add_namespace_to_cmd(cmd, None))