1 # Copyright 2013 Red Hat, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
19 import eventlet.timeout
23 from neutron.agent.linux import async_process
24 from neutron.agent.linux import utils
25 from neutron.tests import base
26 from neutron.tests.unit.agent.linux import failing_process
class TestAsyncProcess(base.BaseTestCase):
    """Unit tests for ``async_process.AsyncProcess`` with mocked helpers."""

    def setUp(self):
        super(TestAsyncProcess, self).setUp()
        # Each test gets its own process object; it is constructed here but
        # never started unless a test does so explicitly.
        self.proc = async_process.AsyncProcess(['fake'])

    def test_construtor_raises_exception_for_negative_respawn_interval(self):
        """A negative respawn_interval is rejected with ValueError."""
        with testtools.ExpectedException(ValueError):
            async_process.AsyncProcess(['fake'], respawn_interval=-1)

    def test__spawn(self):
        """_spawn() stores the created process and spawns two watchers."""
        expected_process = 'Foo'
        proc = self.proc
        with mock.patch.object(utils, 'create_process') as mock_create_process:
            mock_create_process.return_value = [expected_process, None]
            with mock.patch('eventlet.spawn') as mock_spawn:
                proc._spawn()

        self.assertTrue(proc._is_running)
        self.assertIsInstance(proc._kill_event, eventlet.event.Event)
        self.assertEqual(proc._process, expected_process)
        # One watcher greenthread is expected per output stream.
        mock_spawn.assert_has_calls([
            mock.call(proc._watch_process,
                      proc._read_stdout,
                      proc._kill_event),
            mock.call(proc._watch_process,
                      proc._read_stderr,
                      proc._kill_event),
        ])
        self.assertEqual(len(proc._watchers), 2)

    def test__handle_process_error_kills_with_respawn(self):
        """_handle_process_error() kills the process with SIGKILL.

        NOTE(review): despite "with_respawn" in the name, this test leaves
        respawn_interval at the constructor default and asserts nothing
        about respawning.
        """
        with mock.patch.object(self.proc, '_kill') as kill:
            self.proc._handle_process_error()

        kill.assert_has_calls([mock.call(signal.SIGKILL)])

    def test__handle_process_error_kills_without_respawn(self):
        """With respawn_interval set, the process is killed then respawned.

        NOTE(review): despite "without_respawn" in the name, this test sets
        respawn_interval and expects a sleep followed by _spawn().
        """
        self.proc.respawn_interval = 1
        with mock.patch.object(self.proc, '_kill') as kill,\
                mock.patch.object(self.proc, '_spawn') as spawn,\
                mock.patch('eventlet.sleep') as sleep:
            self.proc._handle_process_error()

        kill.assert_has_calls([mock.call(signal.SIGKILL)])
        sleep.assert_has_calls([mock.call(self.proc.respawn_interval)])
        spawn.assert_called_once_with()

    def test__handle_process_error_no_crash_if_started(self):
        """No respawn is attempted when no respawn_interval was set."""
        self.proc._is_running = True
        with mock.patch.object(self.proc, '_kill'),\
                mock.patch.object(self.proc, '_spawn') as mock_spawn:
            self.proc._handle_process_error()
            # Checked via ``called`` rather than assert_not_called(): on old
            # mock releases the latter is an auto-created attribute that
            # always passes.
            self.assertFalse(mock_spawn.called)

    def _watch_process_exception(self):
        """Watcher callback that always fails."""
        raise Exception('Error!')

    def _test__watch_process(self, callback, kill_event):
        """Run _watch_process() with *callback* and check error handling.

        :param callback: callable handed to _watch_process (may be None)
        :param kill_event: event handed to _watch_process; when it has not
            been sent, _handle_process_error must have been called once
        """
        self.proc._is_running = True
        self.proc._kill_event = kill_event
        # Ensure the test times out eventually if the watcher loops endlessly
        with eventlet.timeout.Timeout(5):
            with mock.patch.object(self.proc,
                                   '_handle_process_error') as func:
                self.proc._watch_process(callback, kill_event)

        if not kill_event.ready():
            func.assert_called_once_with()

    def test__watch_process_exits_on_callback_failure(self):
        """A callback returning a falsy value ends the watch loop."""
        self._test__watch_process(lambda: None, eventlet.event.Event())

    def test__watch_process_exits_on_exception(self):
        """A raising callback ends the watch loop."""
        self._test__watch_process(self._watch_process_exception,
                                  eventlet.event.Event())
        # Second run, this time passing the process' own kill event: the
        # error handler must not be invoked.
        with mock.patch.object(self.proc,
                               '_handle_process_error') as func:
            self.proc._watch_process(self._watch_process_exception,
                                     self.proc._kill_event)
            self.assertFalse(func.called)

    def test__watch_process_exits_on_sent_kill_event(self):
        """An already-sent kill event ends the watch loop immediately."""
        kill_event = eventlet.event.Event()
        kill_event.send()
        self._test__watch_process(None, kill_event)

    def _test_read_output_queues_and_returns_result(self, output):
        """Check _read() for a stream whose readline() yields *output*.

        :param output: value returned by the mocked stream's readline();
            truthy output must be returned and queued, falsy output must
            yield a falsy result and leave the queue empty
        """
        queue = eventlet.queue.LightQueue()
        mock_stream = mock.Mock()
        with mock.patch.object(mock_stream, 'readline') as mock_readline:
            mock_readline.return_value = output
            result = self.proc._read(mock_stream, queue)

        if output:
            self.assertEqual(output, result)
            self.assertEqual(output, queue.get_nowait())
        else:
            self.assertFalse(result)
            self.assertTrue(queue.empty())

    def test__read_queues_and_returns_output(self):
        self._test_read_output_queues_and_returns_result('foo')

    def test__read_returns_none_for_missing_output(self):
        self._test_read_output_queues_and_returns_result('')

    def test_start_raises_exception_if_process_already_started(self):
        """start() on a running process raises AsyncProcessException."""
        self.proc._is_running = True
        with testtools.ExpectedException(async_process.AsyncProcessException):
            self.proc.start()

    def test_start_invokes__spawn(self):
        """start() delegates to _spawn()."""
        with mock.patch.object(self.proc, '_spawn') as mock_start:
            self.proc.start()

        mock_start.assert_called_once_with()

    def test__iter_queue_returns_empty_list_for_empty_queue(self):
        result = list(self.proc._iter_queue(eventlet.queue.LightQueue(),
                                            False))
        self.assertEqual([], result)

    def test__iter_queue_returns_queued_data(self):
        queue = eventlet.queue.LightQueue()
        queue.put('foo')
        result = list(self.proc._iter_queue(queue, False))
        self.assertEqual(result, ['foo'])

    def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
        """Check iter_<output_type>() reads from the matching line queue.

        :param output_type: 'stdout' or 'stderr'; selects both the
            ``iter_<type>`` method and the ``_<type>_lines`` queue
        """
        expected_value = 'foo'
        with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue:
            mock_iter_queue.return_value = expected_value
            target_func = getattr(self.proc, 'iter_%s' % output_type, None)
            value = target_func()

        self.assertEqual(value, expected_value)
        queue = getattr(self.proc, '_%s_lines' % output_type, None)
        mock_iter_queue.assert_called_with(queue, False)

    def test_iter_stdout(self):
        self._test_iter_output_calls_iter_queue_on_output_queue('stdout')

    def test_iter_stderr(self):
        self._test_iter_output_calls_iter_queue_on_output_queue('stderr')

    def test__kill_targets_process_for_pid(self):
        """_kill() signals the resolved child pid and resets state."""
        pid = 1

        with mock.patch.object(self.proc, '_kill_event'
                               ) as mock_kill_event,\
                mock.patch.object(utils, 'get_root_helper_child_pid',
                                  return_value=pid),\
                mock.patch.object(self.proc, '_kill_process'
                                  ) as mock_kill_process,\
                mock.patch.object(self.proc, '_process'):
            self.proc._kill(signal.SIGKILL)

            # Must be checked while the patches are active: leaving the
            # block restores the patched attributes.
            self.assertIsNone(self.proc._kill_event)
            self.assertFalse(self.proc._is_running)

        mock_kill_event.send.assert_called_once_with()
        mock_kill_process.assert_called_once_with(pid, signal.SIGKILL)

    def _test__kill_process(self, pid, expected, exception_message=None,
                            kill_signal=signal.SIGKILL):
        """Check _kill_process() against a mocked ``utils.execute``.

        :param pid: pid passed to _kill_process
        :param expected: value _kill_process is expected to return
        :param exception_message: when given, ``utils.execute`` raises a
            RuntimeError carrying this message
        :param kill_signal: signal number passed to _kill_process
        """
        self.proc.run_as_root = True
        if exception_message:
            exc = RuntimeError(exception_message)
        else:
            # side_effect=None makes the mocked execute return normally.
            exc = None
        with mock.patch.object(utils, 'execute',
                               side_effect=exc) as mock_execute:
            actual = self.proc._kill_process(pid, kill_signal)

        self.assertEqual(expected, actual)
        mock_execute.assert_called_with(['kill', '-%d' % kill_signal, pid],
                                        run_as_root=self.proc.run_as_root)

    def test__kill_process_returns_true_for_valid_pid(self):
        self._test__kill_process('1', True)

    def test__kill_process_returns_true_for_stale_pid(self):
        self._test__kill_process('1', True, 'No such process')

    def test__kill_process_returns_false_for_execute_exception(self):
        self._test__kill_process('1', False, 'Invalid')

    def test_kill_process_with_different_signal(self):
        self._test__kill_process('1', True, kill_signal=signal.SIGTERM)

    def test_stop_calls_kill_with_provided_signal_number(self):
        """stop(kill_signal=...) forwards the signal to _kill()."""
        self.proc._is_running = True
        with mock.patch.object(self.proc, '_kill') as mock_kill:
            self.proc.stop(kill_signal=signal.SIGTERM)
        mock_kill.assert_called_once_with(signal.SIGTERM)

    def test_stop_raises_exception_if_already_started(self):
        """stop() raises AsyncProcessException here.

        NOTE(review): the name says "already_started", but this test never
        starts the process before calling stop().
        """
        with testtools.ExpectedException(async_process.AsyncProcessException):
            self.proc.stop()

    def test_cmd(self):
        """The ``cmd`` attribute is the argument list joined by spaces."""
        for expected, cmd in (('ls -l file', ['ls', '-l', 'file']),
                              ('fake', ['fake'])):
            proc = async_process.AsyncProcess(cmd)
            self.assertEqual(expected, proc.cmd)
class TestAsyncProcessLogging(base.BaseTestCase):
    """Tests that the ``log_output`` flag controls logging of read output."""

    def setUp(self):
        super(TestAsyncProcessLogging, self).setUp()
        # NOTE(review): the patch is started but never stopped here;
        # presumably base.BaseTestCase stops active patches on cleanup --
        # confirm.
        self.log_mock = mock.patch.object(async_process, 'LOG').start()

    def _test__read_stdout_logging(self, enable):
        """Check LOG.debug is called by _read_stdout() iff *enable*.

        :param enable: value passed as ``log_output`` to AsyncProcess
        """
        proc = async_process.AsyncProcess(['fakecmd'], log_output=enable)
        with mock.patch.object(proc, '_read', return_value='fakedata'),\
                mock.patch.object(proc, '_process'):
            proc._read_stdout()
        self.assertEqual(enable, self.log_mock.debug.called)

    def _test__read_stderr_logging(self, enable):
        """Check LOG.error is called by _read_stderr() iff *enable*.

        :param enable: value passed as ``log_output`` to AsyncProcess
        """
        proc = async_process.AsyncProcess(['fake'], log_output=enable)
        with mock.patch.object(proc, '_read', return_value='fakedata'),\
                mock.patch.object(proc, '_process'):
            proc._read_stderr()
        self.assertEqual(enable, self.log_mock.error.called)

    def test__read_stdout_logging_enabled(self):
        self._test__read_stdout_logging(enable=True)

    def test__read_stdout_logging_disabled(self):
        self._test__read_stdout_logging(enable=False)

    def test__read_stderr_logging_enabled(self):
        self._test__read_stderr_logging(enable=True)

    def test__read_stderr_logging_disabled(self):
        self._test__read_stderr_logging(enable=False)
class TestAsyncProcessDieOnError(base.BaseTestCase):
    """Tests for an AsyncProcess created with ``die_on_error=True``."""

    def test__read_stderr_returns_none_on_error(self):
        """_read_stderr() yields None even though _read() produced data."""
        dying_proc = async_process.AsyncProcess(['fakecmd'],
                                                die_on_error=True)
        # _read is stubbed to return data; _process is replaced by a mock.
        with mock.patch.object(dying_proc, '_read', return_value='fakedata'):
            with mock.patch.object(dying_proc, '_process'):
                stderr_result = dying_proc._read_stderr()
                self.assertIsNone(stderr_result)
class TestFailingAsyncProcess(base.BaseTestCase):
    """Runs the ``failing_process`` helper script under AsyncProcess."""

    def setUp(self):
        super(TestFailingAsyncProcess, self).setUp()
        path = self.get_temp_file_path('async.tmp', self.get_new_temp_dir())
        # The helper script is handed the temp file path as its argument.
        self.process = async_process.AsyncProcess(['python',
                                                   failing_process.__file__,
                                                   path],
                                                  respawn_interval=0)

    def test_failing_async_process_handle_error_once(self):
        """_handle_process_error is invoked exactly once for a failure."""
        with mock.patch.object(self.process, '_handle_process_error')\
                as handle_error_mock:
            self.process.start()
            self.process._process.wait()
            # Wait for the monitor process to complete
            for thread in self.process._watchers:
                thread.wait()
            self.assertEqual(1, handle_error_mock.call_count)