From bdcf8e60794c8aae9fc4aeaff200adf986381306 Mon Sep 17 00:00:00 2001 From: Ihar Hrachyshka Date: Thu, 17 Sep 2015 14:57:43 +0200 Subject: [PATCH] ovsdb monitor: get rid of custom _read_stdout/_read_stderr methods Those methods do the same thing as AsyncProcess counterparts, just with logging the received output. It's better to move the logging into AsyncProcess and control it with __init__ arguments. This allows us to get rid of some duplicate tests for ovsdb monitor. Change-Id: Ic20ded27ba09afdd73e4d96c47469c2d7b4d4db5 Related-Bug: #1495937 --- neutron/agent/linux/async_process.py | 36 +++++++++++-- neutron/agent/linux/ovsdb_monitor.py | 19 ++----- .../unit/agent/linux/test_async_process.py | 50 ++++++++++++++++++- .../unit/agent/linux/test_ovsdb_monitor.py | 42 ++++++---------- 4 files changed, 99 insertions(+), 48 deletions(-) diff --git a/neutron/agent/linux/async_process.py b/neutron/agent/linux/async_process.py index c31210bd4..f94cfa6a2 100644 --- a/neutron/agent/linux/async_process.py +++ b/neutron/agent/linux/async_process.py @@ -56,7 +56,7 @@ class AsyncProcess(object): """ def __init__(self, cmd, run_as_root=False, respawn_interval=None, - namespace=None): + namespace=None, log_output=False, die_on_error=False): """Constructor. :param cmd: The list of command arguments to invoke. @@ -66,9 +66,11 @@ class AsyncProcess(object): only be attempted if a value of 0 or greater is provided. :param namespace: Optional, start the command in the specified namespace. + :param log_output: Optional, also log received output. + :param die_on_error: Optional, kills the process on stderr output. """ self.cmd_without_namespace = cmd - self.cmd = ip_lib.add_namespace_to_cmd(cmd, namespace) + self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace) self.run_as_root = run_as_root if respawn_interval is not None and respawn_interval < 0: raise ValueError(_('respawn_interval must be >= 0 if provided.')) @@ -77,6 +79,12 @@ class AsyncProcess(object): self._kill_event = None self._reset_queues() self._watchers = [] + self.log_output = log_output + self.die_on_error = die_on_error + + @property + def cmd(self): + return ' '.join(self._cmd) def _reset_queues(self): self._stdout_lines = eventlet.queue.LightQueue() @@ -126,7 +134,7 @@ class AsyncProcess(object): def _spawn(self): """Spawn a process and its watchers.""" self._kill_event = eventlet.event.Event() - self._process, cmd = utils.create_process(self.cmd, + self._process, cmd = utils.create_process(self._cmd, run_as_root=self.run_as_root) self._watchers = [] for reader in (self._read_stdout, self._read_stderr): @@ -223,10 +231,28 @@ class AsyncProcess(object): return data def _read_stdout(self): - return self._read(self._process.stdout, self._stdout_lines) + data = self._read(self._process.stdout, self._stdout_lines) + if self.log_output: + LOG.debug('Output received from [%(cmd)s]: %(data)s', + {'cmd': self.cmd, + 'data': data}) + return data def _read_stderr(self): - return self._read(self._process.stderr, self._stderr_lines) + data = self._read(self._process.stderr, self._stderr_lines) + if self.log_output: + LOG.error(_LE('Error received from [%(cmd)s]: %(err)s'), + {'cmd': self.cmd, + 'err': data}) + if self.die_on_error: + LOG.error(_LE("Process [%(cmd)s] dies due to the error: %(err)s"), + {'cmd': self.cmd, + 'err': data}) + # the callback caller will use None to indicate the need to bail + # out of the thread + return None + + return data def _iter_queue(self, queue, block): while True: diff --git a/neutron/agent/linux/ovsdb_monitor.py b/neutron/agent/linux/ovsdb_monitor.py index 8dafa7073..feed5c3ff 100644 --- a/neutron/agent/linux/ovsdb_monitor.py +++ b/neutron/agent/linux/ovsdb_monitor.py @@ -40,22 +40,9 @@ class OvsdbMonitor(async_process.AsyncProcess): if format: cmd.append('--format=%s' % format) super(OvsdbMonitor, self).__init__(cmd, run_as_root=True, - respawn_interval=respawn_interval) - - def _read_stdout(self): - data = self._process.stdout.readline() - if not data: - return - self._stdout_lines.put(data) - LOG.debug('Output received from ovsdb monitor: %s', data) - return data - - def _read_stderr(self): - data = super(OvsdbMonitor, self)._read_stderr() - if data: - LOG.error(_LE('Error received from ovsdb monitor: %s'), data) - # Do not return value to ensure that stderr output will - # stop the monitor. + respawn_interval=respawn_interval, + log_output=True, + die_on_error=True) class SimpleInterfaceMonitor(OvsdbMonitor): diff --git a/neutron/tests/unit/agent/linux/test_async_process.py b/neutron/tests/unit/agent/linux/test_async_process.py index 5bf909759..ad3dfac01 100644 --- a/neutron/tests/unit/agent/linux/test_async_process.py +++ b/neutron/tests/unit/agent/linux/test_async_process.py @@ -84,7 +84,7 @@ class TestAsyncProcess(base.BaseTestCase): func.assert_called_once_with() def test__watch_process_exits_on_callback_failure(self): - self._test__watch_process(lambda: False, eventlet.event.Event()) + self._test__watch_process(lambda: None, eventlet.event.Event()) def test__watch_process_exits_on_exception(self): def foo(): @@ -219,3 +219,51 @@ class TestAsyncProcess(base.BaseTestCase): def test_stop_raises_exception_if_already_started(self): with testtools.ExpectedException(async_process.AsyncProcessException): self.proc.stop() + + def test_cmd(self): + for expected, cmd in (('ls -l file', ['ls', '-l', 'file']), + ('fake', ['fake'])): + proc = async_process.AsyncProcess(cmd) + self.assertEqual(expected, proc.cmd) + + +class TestAsyncProcessLogging(base.BaseTestCase): + + def setUp(self): + super(TestAsyncProcessLogging, self).setUp() + self.log_mock = mock.patch.object(async_process, 'LOG').start() + + def _test__read_stdout_logging(self, enable): + proc = async_process.AsyncProcess(['fakecmd'], log_output=enable) + with mock.patch.object(proc, '_read', return_value='fakedata'),\ + mock.patch.object(proc, '_process'): + proc._read_stdout() + self.assertEqual(enable, self.log_mock.debug.called) + + def _test__read_stderr_logging(self, enable): + proc = async_process.AsyncProcess(['fake'], log_output=enable) + with mock.patch.object(proc, '_read', return_value='fakedata'),\ + mock.patch.object(proc, '_process'): + proc._read_stderr() + self.assertEqual(enable, self.log_mock.error.called) + + def test__read_stdout_logging_enabled(self): + self._test__read_stdout_logging(enable=True) + + def test__read_stdout_logging_disabled(self): + self._test__read_stdout_logging(enable=False) + + def test__read_stderr_logging_enabled(self): + self._test__read_stderr_logging(enable=True) + + def test__read_stderr_logging_disabled(self): + self._test__read_stderr_logging(enable=False) + + +class TestAsyncProcessDieOnError(base.BaseTestCase): + + def test__read_stderr_returns_none_on_error(self): + proc = async_process.AsyncProcess(['fakecmd'], die_on_error=True) + with mock.patch.object(proc, '_read', return_value='fakedata'),\ + mock.patch.object(proc, '_process'): + self.assertIsNone(proc._read_stderr()) diff --git a/neutron/tests/unit/agent/linux/test_ovsdb_monitor.py b/neutron/tests/unit/agent/linux/test_ovsdb_monitor.py index ec1fb52d3..dc41c96e0 100644 --- a/neutron/tests/unit/agent/linux/test_ovsdb_monitor.py +++ b/neutron/tests/unit/agent/linux/test_ovsdb_monitor.py @@ -21,33 +21,23 @@ from neutron.tests import base class TestOvsdbMonitor(base.BaseTestCase): - def setUp(self): - super(TestOvsdbMonitor, self).setUp() - self.monitor = ovsdb_monitor.OvsdbMonitor('Interface') - - def read_output_queues_and_returns_result(self, output_type, output): - with mock.patch.object(self.monitor, '_process') as mock_process: - with mock.patch.object(mock_process, output_type) as mock_file: - with mock.patch.object(mock_file, 'readline') as mock_readline: - mock_readline.return_value = output - func = getattr(self.monitor, - '_read_%s' % output_type, - None) - return func() - - def test__read_stdout_returns_none_for_empty_read(self): - result = self.read_output_queues_and_returns_result('stdout', '') - self.assertIsNone(result) - - def test__read_stdout_queues_normal_output_to_stdout_queue(self): - output = 'foo' - result = self.read_output_queues_and_returns_result('stdout', output) - self.assertEqual(result, output) - self.assertEqual(self.monitor._stdout_lines.get_nowait(), output) + def test___init__(self): + ovsdb_monitor.OvsdbMonitor('Interface') - def test__read_stderr_returns_none(self): - result = self.read_output_queues_and_returns_result('stderr', '') - self.assertIsNone(result) + def test___init___with_columns(self): + columns = ['col1', 'col2'] + with mock.patch( + 'neutron.agent.linux.async_process.AsyncProcess.__init__') as init: + ovsdb_monitor.OvsdbMonitor('Interface', columns=columns) + cmd = init.call_args_list[0][0][0] + self.assertEqual('col1,col2', cmd[-1]) + + def test___init___with_format(self): + with mock.patch( + 'neutron.agent.linux.async_process.AsyncProcess.__init__') as init: + ovsdb_monitor.OvsdbMonitor('Interface', format='blob') + cmd = init.call_args_list[0][0][0] + self.assertEqual('--format=blob', cmd[-1]) class TestSimpleInterfaceMonitor(base.BaseTestCase): -- 2.45.2