]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
ovsdb monitor: get rid of custom _read_stdout/_read_stderr methods
authorIhar Hrachyshka <ihrachys@redhat.com>
Thu, 17 Sep 2015 12:57:43 +0000 (14:57 +0200)
committerIhar Hrachyshka <ihrachys@redhat.com>
Mon, 21 Sep 2015 15:55:38 +0000 (17:55 +0200)
Those methods do the same thing as AsyncProcess counterparts, just
with logging the received output. It's better to move the logging into
AsyncProcess and control it with __init__ arguments.

This allows us to get rid of some duplicate tests for ovsdb monitor.

Change-Id: Ic20ded27ba09afdd73e4d96c47469c2d7b4d4db5
Related-Bug: #1495937

neutron/agent/linux/async_process.py
neutron/agent/linux/ovsdb_monitor.py
neutron/tests/unit/agent/linux/test_async_process.py
neutron/tests/unit/agent/linux/test_ovsdb_monitor.py

index c31210bd45ac88dde01514d730663e56059382f4..f94cfa6a2e8eef044d0adec708ab99b6873a9fb5 100644 (file)
@@ -56,7 +56,7 @@ class AsyncProcess(object):
     """
 
     def __init__(self, cmd, run_as_root=False, respawn_interval=None,
-                 namespace=None):
+                 namespace=None, log_output=False, die_on_error=False):
         """Constructor.
 
         :param cmd: The list of command arguments to invoke.
@@ -66,9 +66,11 @@ class AsyncProcess(object):
                only be attempted if a value of 0 or greater is provided.
         :param namespace: Optional, start the command in the specified
                namespace.
+        :param log_output: Optional, also log received output.
+        :param die_on_error: Optional, kills the process on stderr output.
         """
         self.cmd_without_namespace = cmd
-        self.cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
+        self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
         self.run_as_root = run_as_root
         if respawn_interval is not None and respawn_interval < 0:
             raise ValueError(_('respawn_interval must be >= 0 if provided.'))
@@ -77,6 +79,12 @@ class AsyncProcess(object):
         self._kill_event = None
         self._reset_queues()
         self._watchers = []
+        self.log_output = log_output
+        self.die_on_error = die_on_error
+
+    @property
+    def cmd(self):
+        return ' '.join(self._cmd)
 
     def _reset_queues(self):
         self._stdout_lines = eventlet.queue.LightQueue()
@@ -126,7 +134,7 @@ class AsyncProcess(object):
     def _spawn(self):
         """Spawn a process and its watchers."""
         self._kill_event = eventlet.event.Event()
-        self._process, cmd = utils.create_process(self.cmd,
+        self._process, cmd = utils.create_process(self._cmd,
                                                   run_as_root=self.run_as_root)
         self._watchers = []
         for reader in (self._read_stdout, self._read_stderr):
@@ -223,10 +231,28 @@ class AsyncProcess(object):
             return data
 
     def _read_stdout(self):
-        return self._read(self._process.stdout, self._stdout_lines)
+        data = self._read(self._process.stdout, self._stdout_lines)
+        if self.log_output:
+            LOG.debug('Output received from [%(cmd)s]: %(data)s',
+                      {'cmd': self.cmd,
+                       'data': data})
+        return data
 
     def _read_stderr(self):
-        return self._read(self._process.stderr, self._stderr_lines)
+        data = self._read(self._process.stderr, self._stderr_lines)
+        if self.log_output:
+            LOG.error(_LE('Error received from [%(cmd)s]: %(err)s'),
+                      {'cmd': self.cmd,
+                       'err': data})
+        if self.die_on_error:
+            LOG.error(_LE("Process [%(cmd)s] dies due to the error: %(err)s"),
+                      {'cmd': self.cmd,
+                       'err': data})
+            # the callback caller will use None to indicate the need to bail
+            # out of the thread
+            return None
+
+        return data
 
     def _iter_queue(self, queue, block):
         while True:
index 8dafa70738e027d16d2b4fa1b17ff29d094ec5f6..feed5c3ff32ef059447cef2fee79ddf897af2ac5 100644 (file)
@@ -40,22 +40,9 @@ class OvsdbMonitor(async_process.AsyncProcess):
         if format:
             cmd.append('--format=%s' % format)
         super(OvsdbMonitor, self).__init__(cmd, run_as_root=True,
-                                           respawn_interval=respawn_interval)
-
-    def _read_stdout(self):
-        data = self._process.stdout.readline()
-        if not data:
-            return
-        self._stdout_lines.put(data)
-        LOG.debug('Output received from ovsdb monitor: %s', data)
-        return data
-
-    def _read_stderr(self):
-        data = super(OvsdbMonitor, self)._read_stderr()
-        if data:
-            LOG.error(_LE('Error received from ovsdb monitor: %s'), data)
-            # Do not return value to ensure that stderr output will
-            # stop the monitor.
+                                           respawn_interval=respawn_interval,
+                                           log_output=True,
+                                           die_on_error=True)
 
 
 class SimpleInterfaceMonitor(OvsdbMonitor):
index 5bf9097592a1b1afd7f5f5c53455b21d7c59d201..ad3dfac011de5825d99303067d1724f8f4aca288 100644 (file)
@@ -84,7 +84,7 @@ class TestAsyncProcess(base.BaseTestCase):
             func.assert_called_once_with()
 
     def test__watch_process_exits_on_callback_failure(self):
-        self._test__watch_process(lambda: False, eventlet.event.Event())
+        self._test__watch_process(lambda: None, eventlet.event.Event())
 
     def test__watch_process_exits_on_exception(self):
         def foo():
@@ -219,3 +219,51 @@ class TestAsyncProcess(base.BaseTestCase):
     def test_stop_raises_exception_if_already_started(self):
         with testtools.ExpectedException(async_process.AsyncProcessException):
             self.proc.stop()
+
+    def test_cmd(self):
+        for expected, cmd in (('ls -l file', ['ls', '-l', 'file']),
+                              ('fake', ['fake'])):
+            proc = async_process.AsyncProcess(cmd)
+            self.assertEqual(expected, proc.cmd)
+
+
+class TestAsyncProcessLogging(base.BaseTestCase):
+
+    def setUp(self):
+        super(TestAsyncProcessLogging, self).setUp()
+        self.log_mock = mock.patch.object(async_process, 'LOG').start()
+
+    def _test__read_stdout_logging(self, enable):
+        proc = async_process.AsyncProcess(['fakecmd'], log_output=enable)
+        with mock.patch.object(proc, '_read', return_value='fakedata'),\
+            mock.patch.object(proc, '_process'):
+            proc._read_stdout()
+        self.assertEqual(enable, self.log_mock.debug.called)
+
+    def _test__read_stderr_logging(self, enable):
+        proc = async_process.AsyncProcess(['fake'], log_output=enable)
+        with mock.patch.object(proc, '_read', return_value='fakedata'),\
+                mock.patch.object(proc, '_process'):
+            proc._read_stderr()
+        self.assertEqual(enable, self.log_mock.error.called)
+
+    def test__read_stdout_logging_enabled(self):
+        self._test__read_stdout_logging(enable=True)
+
+    def test__read_stdout_logging_disabled(self):
+        self._test__read_stdout_logging(enable=False)
+
+    def test__read_stderr_logging_enabled(self):
+        self._test__read_stderr_logging(enable=True)
+
+    def test__read_stderr_logging_disabled(self):
+        self._test__read_stderr_logging(enable=False)
+
+
+class TestAsyncProcessDieOnError(base.BaseTestCase):
+
+    def test__read_stderr_returns_none_on_error(self):
+        proc = async_process.AsyncProcess(['fakecmd'], die_on_error=True)
+        with mock.patch.object(proc, '_read', return_value='fakedata'),\
+                mock.patch.object(proc, '_process'):
+            self.assertIsNone(proc._read_stderr())
index ec1fb52d3bf2aaff613ebbded122c72143024340..dc41c96e009f5a93d102735be1bbba015ec38163 100644 (file)
@@ -21,33 +21,23 @@ from neutron.tests import base
 
 class TestOvsdbMonitor(base.BaseTestCase):
 
-    def setUp(self):
-        super(TestOvsdbMonitor, self).setUp()
-        self.monitor = ovsdb_monitor.OvsdbMonitor('Interface')
-
-    def read_output_queues_and_returns_result(self, output_type, output):
-        with mock.patch.object(self.monitor, '_process') as mock_process:
-            with mock.patch.object(mock_process, output_type) as mock_file:
-                with mock.patch.object(mock_file, 'readline') as mock_readline:
-                    mock_readline.return_value = output
-                    func = getattr(self.monitor,
-                                   '_read_%s' % output_type,
-                                   None)
-                    return func()
-
-    def test__read_stdout_returns_none_for_empty_read(self):
-        result = self.read_output_queues_and_returns_result('stdout', '')
-        self.assertIsNone(result)
-
-    def test__read_stdout_queues_normal_output_to_stdout_queue(self):
-        output = 'foo'
-        result = self.read_output_queues_and_returns_result('stdout', output)
-        self.assertEqual(result, output)
-        self.assertEqual(self.monitor._stdout_lines.get_nowait(), output)
+    def test___init__(self):
+        ovsdb_monitor.OvsdbMonitor('Interface')
 
-    def test__read_stderr_returns_none(self):
-        result = self.read_output_queues_and_returns_result('stderr', '')
-        self.assertIsNone(result)
+    def test___init___with_columns(self):
+        columns = ['col1', 'col2']
+        with mock.patch(
+            'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
+            ovsdb_monitor.OvsdbMonitor('Interface', columns=columns)
+            cmd = init.call_args_list[0][0][0]
+            self.assertEqual('col1,col2', cmd[-1])
+
+    def test___init___with_format(self):
+        with mock.patch(
+            'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
+            ovsdb_monitor.OvsdbMonitor('Interface', format='blob')
+            cmd = init.call_args_list[0][0][0]
+            self.assertEqual('--format=blob', cmd[-1])
 
 
 class TestSimpleInterfaceMonitor(base.BaseTestCase):