"""
def __init__(self, cmd, run_as_root=False, respawn_interval=None,
- namespace=None):
+ namespace=None, log_output=False, die_on_error=False):
"""Constructor.
:param cmd: The list of command arguments to invoke.
only be attempted if a value of 0 or greater is provided.
:param namespace: Optional, start the command in the specified
namespace.
+ :param log_output: Optional, also log received output.
+ :param die_on_error: Optional, kills the process on stderr output.
"""
self.cmd_without_namespace = cmd
- self.cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
+ self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
self.run_as_root = run_as_root
if respawn_interval is not None and respawn_interval < 0:
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
self._kill_event = None
self._reset_queues()
self._watchers = []
+ self.log_output = log_output
+ self.die_on_error = die_on_error
+
+ @property
+ def cmd(self):
+ return ' '.join(self._cmd)
def _reset_queues(self):
self._stdout_lines = eventlet.queue.LightQueue()
def _spawn(self):
"""Spawn a process and its watchers."""
self._kill_event = eventlet.event.Event()
- self._process, cmd = utils.create_process(self.cmd,
+ self._process, cmd = utils.create_process(self._cmd,
run_as_root=self.run_as_root)
self._watchers = []
for reader in (self._read_stdout, self._read_stderr):
return data
def _read_stdout(self):
- return self._read(self._process.stdout, self._stdout_lines)
+ data = self._read(self._process.stdout, self._stdout_lines)
+ if self.log_output:
+ LOG.debug('Output received from [%(cmd)s]: %(data)s',
+ {'cmd': self.cmd,
+ 'data': data})
+ return data
def _read_stderr(self):
- return self._read(self._process.stderr, self._stderr_lines)
+ data = self._read(self._process.stderr, self._stderr_lines)
+ if self.log_output:
+ LOG.error(_LE('Error received from [%(cmd)s]: %(err)s'),
+ {'cmd': self.cmd,
+ 'err': data})
+ if self.die_on_error:
+ LOG.error(_LE("Process [%(cmd)s] dies due to the error: %(err)s"),
+ {'cmd': self.cmd,
+ 'err': data})
+ # the callback caller will use None to indicate the need to bail
+ # out of the thread
+ return None
+
+ return data
def _iter_queue(self, queue, block):
while True:
if format:
cmd.append('--format=%s' % format)
super(OvsdbMonitor, self).__init__(cmd, run_as_root=True,
- respawn_interval=respawn_interval)
-
- def _read_stdout(self):
- data = self._process.stdout.readline()
- if not data:
- return
- self._stdout_lines.put(data)
- LOG.debug('Output received from ovsdb monitor: %s', data)
- return data
-
- def _read_stderr(self):
- data = super(OvsdbMonitor, self)._read_stderr()
- if data:
- LOG.error(_LE('Error received from ovsdb monitor: %s'), data)
- # Do not return value to ensure that stderr output will
- # stop the monitor.
+ respawn_interval=respawn_interval,
+ log_output=True,
+ die_on_error=True)
class SimpleInterfaceMonitor(OvsdbMonitor):
func.assert_called_once_with()
def test__watch_process_exits_on_callback_failure(self):
- self._test__watch_process(lambda: False, eventlet.event.Event())
+ self._test__watch_process(lambda: None, eventlet.event.Event())
def test__watch_process_exits_on_exception(self):
def foo():
def test_stop_raises_exception_if_already_started(self):
with testtools.ExpectedException(async_process.AsyncProcessException):
self.proc.stop()
+
+ def test_cmd(self):
+ for expected, cmd in (('ls -l file', ['ls', '-l', 'file']),
+ ('fake', ['fake'])):
+ proc = async_process.AsyncProcess(cmd)
+ self.assertEqual(expected, proc.cmd)
+
+
+class TestAsyncProcessLogging(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestAsyncProcessLogging, self).setUp()
+ self.log_mock = mock.patch.object(async_process, 'LOG').start()
+
+ def _test__read_stdout_logging(self, enable):
+ proc = async_process.AsyncProcess(['fakecmd'], log_output=enable)
+ with mock.patch.object(proc, '_read', return_value='fakedata'),\
+ mock.patch.object(proc, '_process'):
+ proc._read_stdout()
+ self.assertEqual(enable, self.log_mock.debug.called)
+
+ def _test__read_stderr_logging(self, enable):
+ proc = async_process.AsyncProcess(['fake'], log_output=enable)
+ with mock.patch.object(proc, '_read', return_value='fakedata'),\
+ mock.patch.object(proc, '_process'):
+ proc._read_stderr()
+ self.assertEqual(enable, self.log_mock.error.called)
+
+ def test__read_stdout_logging_enabled(self):
+ self._test__read_stdout_logging(enable=True)
+
+ def test__read_stdout_logging_disabled(self):
+ self._test__read_stdout_logging(enable=False)
+
+ def test__read_stderr_logging_enabled(self):
+ self._test__read_stderr_logging(enable=True)
+
+ def test__read_stderr_logging_disabled(self):
+ self._test__read_stderr_logging(enable=False)
+
+
+class TestAsyncProcessDieOnError(base.BaseTestCase):
+
+ def test__read_stderr_returns_none_on_error(self):
+ proc = async_process.AsyncProcess(['fakecmd'], die_on_error=True)
+ with mock.patch.object(proc, '_read', return_value='fakedata'),\
+ mock.patch.object(proc, '_process'):
+ self.assertIsNone(proc._read_stderr())
class TestOvsdbMonitor(base.BaseTestCase):
- def setUp(self):
- super(TestOvsdbMonitor, self).setUp()
- self.monitor = ovsdb_monitor.OvsdbMonitor('Interface')
-
- def read_output_queues_and_returns_result(self, output_type, output):
- with mock.patch.object(self.monitor, '_process') as mock_process:
- with mock.patch.object(mock_process, output_type) as mock_file:
- with mock.patch.object(mock_file, 'readline') as mock_readline:
- mock_readline.return_value = output
- func = getattr(self.monitor,
- '_read_%s' % output_type,
- None)
- return func()
-
- def test__read_stdout_returns_none_for_empty_read(self):
- result = self.read_output_queues_and_returns_result('stdout', '')
- self.assertIsNone(result)
-
- def test__read_stdout_queues_normal_output_to_stdout_queue(self):
- output = 'foo'
- result = self.read_output_queues_and_returns_result('stdout', output)
- self.assertEqual(result, output)
- self.assertEqual(self.monitor._stdout_lines.get_nowait(), output)
+ def test___init__(self):
+ ovsdb_monitor.OvsdbMonitor('Interface')
- def test__read_stderr_returns_none(self):
- result = self.read_output_queues_and_returns_result('stderr', '')
- self.assertIsNone(result)
+ def test___init___with_columns(self):
+ columns = ['col1', 'col2']
+ with mock.patch(
+ 'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
+ ovsdb_monitor.OvsdbMonitor('Interface', columns=columns)
+ cmd = init.call_args_list[0][0][0]
+ self.assertEqual('col1,col2', cmd[-1])
+
+ def test___init___with_format(self):
+ with mock.patch(
+ 'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
+ ovsdb_monitor.OvsdbMonitor('Interface', format='blob')
+ cmd = init.call_args_list[0][0][0]
+ self.assertEqual('--format=blob', cmd[-1])
class TestSimpleInterfaceMonitor(base.BaseTestCase):