db0321de457930afaca6ac981fc82543358efb22
[openstack-build/neutron-build.git] / neutron / tests / unit / agent / linux / test_async_process.py
1 # Copyright 2013 Red Hat, Inc.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import signal
16
17 import eventlet.event
18 import eventlet.queue
19 import eventlet.timeout
20 import mock
21 import testtools
22
23 from neutron.agent.linux import async_process
24 from neutron.agent.linux import utils
25 from neutron.tests import base
26 from neutron.tests.unit.agent.linux import failing_process
27
28
class TestAsyncProcess(base.BaseTestCase):
    """Unit tests for AsyncProcess with its collaborators mocked out."""

    def setUp(self):
        super(TestAsyncProcess, self).setUp()
        # Each test gets its own AsyncProcess built for a dummy command.
        self.proc = async_process.AsyncProcess(['fake'])

    def test_construtor_raises_exception_for_negative_respawn_interval(self):
        """A negative respawn_interval is rejected with ValueError."""
        with testtools.ExpectedException(ValueError):
            async_process.AsyncProcess(['fake'], respawn_interval=-1)

    def test__spawn(self):
        """_spawn stores the created process and spawns two watchers."""
        expected_process = 'Foo'
        proc = self.proc
        with mock.patch.object(utils, 'create_process') as mock_create_process:
            mock_create_process.return_value = [expected_process, None]
            with mock.patch('eventlet.spawn') as mock_spawn:
                proc._spawn()

        self.assertTrue(proc._is_running)
        self.assertIsInstance(proc._kill_event, eventlet.event.Event)
        self.assertEqual(proc._process, expected_process)
        # One watcher is expected for stdout and one for stderr, both
        # sharing the same kill event.
        mock_spawn.assert_has_calls([
            mock.call(proc._watch_process,
                      proc._read_stdout,
                      proc._kill_event),
            mock.call(proc._watch_process,
                      proc._read_stderr,
                      proc._kill_event),
        ])
        self.assertEqual(len(proc._watchers), 2)

    def test__handle_process_error_kills_with_respawn(self):
        """Handling an error kills the process with SIGKILL.

        NOTE(review): despite the name, this test does not set
        respawn_interval; the "..._kills_without_respawn" test below is
        the one that does. The two names look swapped but are kept so the
        existing test ids stay stable.
        """
        with mock.patch.object(self.proc, '_kill') as kill:
            self.proc._handle_process_error()

        kill.assert_has_calls([mock.call(signal.SIGKILL)])

    def test__handle_process_error_kills_without_respawn(self):
        """With a respawn interval set: kill, sleep, then spawn again.

        NOTE(review): despite the name, this is the test that sets
        respawn_interval and checks the respawn path.
        """
        self.proc.respawn_interval = 1
        with mock.patch.object(self.proc, '_kill') as kill:
            with mock.patch.object(self.proc, '_spawn') as spawn:
                with mock.patch('eventlet.sleep') as sleep:
                    self.proc._handle_process_error()

        kill.assert_has_calls([mock.call(signal.SIGKILL)])
        sleep.assert_has_calls([mock.call(self.proc.respawn_interval)])
        spawn.assert_called_once_with()

    def test__handle_process_error_no_crash_if_started(self):
        """No respawn is attempted while the process is flagged running."""
        self.proc._is_running = True
        with mock.patch.object(self.proc, '_kill'):
            with mock.patch.object(self.proc, '_spawn') as mock_spawn:
                self.proc._handle_process_error()
                # Check ``called`` directly: on mock releases that predate
                # assert_not_called() that name is auto-created as a plain
                # attribute, so calling it would silently pass.
                self.assertFalse(mock_spawn.called)

    def _watch_process_exception(self):
        """Watcher callback that always fails."""
        raise Exception('Error!')

    def _test__watch_process(self, callback, kill_event):
        """Run _watch_process once and check the error handler was used.

        :param callback: callable handed to _watch_process as the reader.
        :param kill_event: event handed to _watch_process; when it has
                           already been sent, the error-handler assertion
                           is skipped.
        """
        self.proc._is_running = True
        self.proc._kill_event = kill_event
        # Ensure the test times out eventually if the watcher loops endlessly
        with eventlet.timeout.Timeout(5):
            with mock.patch.object(self.proc,
                                   '_handle_process_error') as func:
                self.proc._watch_process(callback, kill_event)

        if not kill_event.ready():
            func.assert_called_once_with()

    def test__watch_process_exits_on_callback_failure(self):
        """A callback returning None ends the watch loop."""
        self._test__watch_process(lambda: None, eventlet.event.Event())

    def test__watch_process_exits_on_exception(self):
        """A raising callback ends the loop; the handler runs only once."""
        self._test__watch_process(self._watch_process_exception,
                                  eventlet.event.Event())
        # A second watcher failing afterwards must not trigger the error
        # handler again (presumably because the first pass already cleared
        # the running flag -- confirm against AsyncProcess._watch_process).
        with mock.patch.object(self.proc,
                               '_handle_process_error') as func:
            self.proc._watch_process(self._watch_process_exception,
                                     self.proc._kill_event)
            # See test__handle_process_error_no_crash_if_started for why
            # ``called`` is checked instead of assert_not_called().
            self.assertFalse(func.called)

    def test__watch_process_exits_on_sent_kill_event(self):
        """An already-sent kill event stops the watcher immediately."""
        kill_event = eventlet.event.Event()
        kill_event.send()
        self._test__watch_process(None, kill_event)

    def _test_read_output_queues_and_returns_result(self, output):
        """Feed ``output`` through _read and check result and queue.

        :param output: value the mocked stream's readline() returns.
        """
        queue = eventlet.queue.LightQueue()
        mock_stream = mock.Mock()
        with mock.patch.object(mock_stream, 'readline') as mock_readline:
            mock_readline.return_value = output
            result = self.proc._read(mock_stream, queue)

        if output:
            # Non-empty output is both returned and queued.
            self.assertEqual(output, result)
            self.assertEqual(output, queue.get_nowait())
        else:
            # Empty output yields a falsy result and nothing is queued.
            self.assertFalse(result)
            self.assertTrue(queue.empty())

    def test__read_queues_and_returns_output(self):
        self._test_read_output_queues_and_returns_result('foo')

    def test__read_returns_none_for_missing_output(self):
        self._test_read_output_queues_and_returns_result('')

    def test_start_raises_exception_if_process_already_started(self):
        """start() on a process flagged as running raises."""
        self.proc._is_running = True
        with testtools.ExpectedException(async_process.AsyncProcessException):
            self.proc.start()

    def test_start_invokes__spawn(self):
        """start() delegates to _spawn exactly once, without arguments."""
        with mock.patch.object(self.proc, '_spawn') as mock_start:
            self.proc.start()

        mock_start.assert_called_once_with()

    def test__iter_queue_returns_empty_list_for_empty_queue(self):
        result = list(self.proc._iter_queue(eventlet.queue.LightQueue(),
                                            False))
        self.assertEqual([], result)

    def test__iter_queue_returns_queued_data(self):
        queue = eventlet.queue.LightQueue()
        queue.put('foo')
        result = list(self.proc._iter_queue(queue, False))
        self.assertEqual(result, ['foo'])

    def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
        """Check iter_<output_type>() forwards to _iter_queue.

        :param output_type: 'stdout' or 'stderr'; selects both the public
                            iter_* method and the _*_lines queue attribute.
        """
        expected_value = 'foo'
        with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue:
            mock_iter_queue.return_value = expected_value
            target_func = getattr(self.proc, 'iter_%s' % output_type, None)
            value = target_func()

        self.assertEqual(value, expected_value)
        queue = getattr(self.proc, '_%s_lines' % output_type, None)
        mock_iter_queue.assert_called_with(queue, False)

    def test_iter_stdout(self):
        self._test_iter_output_calls_iter_queue_on_output_queue('stdout')

    def test_iter_stderr(self):
        self._test_iter_output_calls_iter_queue_on_output_queue('stderr')

    def test__kill_targets_process_for_pid(self):
        """_kill signals the kill event and kills the looked-up pid."""
        pid = 1

        with mock.patch.object(self.proc, '_kill_event'
                               ) as mock_kill_event,\
                mock.patch.object(utils, 'get_root_helper_child_pid',
                                  return_value=pid),\
                mock.patch.object(self.proc, '_kill_process'
                                  ) as mock_kill_process,\
                mock.patch.object(self.proc, '_process'):
            self.proc._kill(signal.SIGKILL)

            # Checked while the patches are still active: leaving the
            # block restores the patched _kill_event attribute.
            self.assertIsNone(self.proc._kill_event)
            self.assertFalse(self.proc._is_running)

        mock_kill_event.send.assert_called_once_with()
        # pid is a fixed truthy value, so this is asserted unconditionally.
        mock_kill_process.assert_called_once_with(pid, signal.SIGKILL)

    def _test__kill_process(self, pid, expected, exception_message=None,
                            kill_signal=signal.SIGKILL):
        """Run _kill_process against a mocked utils.execute.

        :param pid: pid passed to _kill_process.
        :param expected: value _kill_process is expected to return.
        :param exception_message: when set, utils.execute raises a
                                  RuntimeError carrying this message.
        :param kill_signal: signal number passed to _kill_process.
        """
        self.proc.run_as_root = True
        exc = RuntimeError(exception_message) if exception_message else None
        with mock.patch.object(utils, 'execute',
                               side_effect=exc) as mock_execute:
            actual = self.proc._kill_process(pid, kill_signal)

        self.assertEqual(expected, actual)
        mock_execute.assert_called_with(['kill', '-%d' % kill_signal, pid],
                                        run_as_root=self.proc.run_as_root)

    def test__kill_process_returns_true_for_valid_pid(self):
        self._test__kill_process('1', True)

    def test__kill_process_returns_true_for_stale_pid(self):
        self._test__kill_process('1', True, 'No such process')

    def test__kill_process_returns_false_for_execute_exception(self):
        self._test__kill_process('1', False, 'Invalid')

    def test_kill_process_with_different_signal(self):
        self._test__kill_process('1', True, kill_signal=signal.SIGTERM)

    def test_stop_calls_kill_with_provided_signal_number(self):
        """stop() forwards the requested signal to _kill."""
        self.proc._is_running = True
        with mock.patch.object(self.proc, '_kill') as mock_kill:
            self.proc.stop(kill_signal=signal.SIGTERM)
        mock_kill.assert_called_once_with(signal.SIGTERM)

    def test_stop_raises_exception_if_already_started(self):
        """stop() raises on a process that was never started.

        NOTE(review): the method name says "already_started", but the
        fixture process has not been started here; the name is kept so the
        existing test id stays stable.
        """
        with testtools.ExpectedException(async_process.AsyncProcessException):
            self.proc.stop()

    def test_cmd(self):
        """The cmd attribute is the argument list joined by spaces."""
        for expected, cmd in (('ls -l file', ['ls', '-l', 'file']),
                              ('fake', ['fake'])):
            proc = async_process.AsyncProcess(cmd)
            self.assertEqual(expected, proc.cmd)
236
237
class TestAsyncProcessLogging(base.BaseTestCase):
    """Checks that the log_output flag controls logging of read output."""

    def setUp(self):
        super(TestAsyncProcessLogging, self).setUp()
        # Swap the module logger for a mock so calls can be inspected.
        # NOTE(review): the patch is only started here; presumably the base
        # test case stops active patches during cleanup -- confirm.
        log_patcher = mock.patch.object(async_process, 'LOG')
        self.log_mock = log_patcher.start()

    def _invoke_reader(self, cmd, reader_name, enable):
        """Build a process and call one of its _read_* methods once.

        :param cmd: command list given to AsyncProcess.
        :param reader_name: name of the reader method to call.
        :param enable: value passed as log_output.
        """
        proc = async_process.AsyncProcess(cmd, log_output=enable)
        read_patch = mock.patch.object(proc, '_read',
                                       return_value='fakedata')
        process_patch = mock.patch.object(proc, '_process')
        with read_patch, process_patch:
            getattr(proc, reader_name)()

    def _test__read_stdout_logging(self, enable):
        """stdout data is logged via LOG.debug only when enabled."""
        self._invoke_reader(['fakecmd'], '_read_stdout', enable)
        self.assertEqual(enable, self.log_mock.debug.called)

    def _test__read_stderr_logging(self, enable):
        """stderr data is logged via LOG.error only when enabled."""
        self._invoke_reader(['fake'], '_read_stderr', enable)
        self.assertEqual(enable, self.log_mock.error.called)

    def test__read_stdout_logging_enabled(self):
        self._test__read_stdout_logging(enable=True)

    def test__read_stdout_logging_disabled(self):
        self._test__read_stdout_logging(enable=False)

    def test__read_stderr_logging_enabled(self):
        self._test__read_stderr_logging(enable=True)

    def test__read_stderr_logging_disabled(self):
        self._test__read_stderr_logging(enable=False)
269
270
class TestAsyncProcessDieOnError(base.BaseTestCase):
    """Covers _read_stderr when the process is built with die_on_error."""

    def test__read_stderr_returns_none_on_error(self):
        """With die_on_error set, _read_stderr returns None for stderr data."""
        proc = async_process.AsyncProcess(['fakecmd'], die_on_error=True)
        read_patch = mock.patch.object(proc, '_read',
                                       return_value='fakedata')
        process_patch = mock.patch.object(proc, '_process')
        with read_patch, process_patch:
            outcome = proc._read_stderr()
        self.assertIsNone(outcome)
278
279
class TestFailingAsyncProcess(base.BaseTestCase):
    """Runs the failing_process helper script under AsyncProcess."""

    def setUp(self):
        super(TestFailingAsyncProcess, self).setUp()
        temp_dir = self.get_new_temp_dir()
        path = self.get_temp_file_path('async.tmp', temp_dir)
        # The helper script is handed the temp file path as its argument.
        command = ['python', failing_process.__file__, path]
        self.process = async_process.AsyncProcess(command,
                                                  respawn_interval=0)

    def test_failing_async_process_handle_error_once(self):
        """The error handler is invoked exactly once for the failed run."""
        handler_patch = mock.patch.object(self.process,
                                          '_handle_process_error')
        with handler_patch as handle_error_mock:
            self.process.start()
            self.process._process.wait()
            # Block until every watcher has finished before counting calls
            for watcher in self.process._watchers:
                watcher.wait()
            self.assertEqual(1, handle_error_mock.call_count)