Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / tests / unit / agent / linux / test_utils.py
1 # Copyright 2012, VMware, Inc.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import socket
16
17 import mock
18 import six
19 import testtools
20
21 import oslo_i18n
22
23 from neutron.agent.linux import utils
24 from neutron.tests import base
25 from neutron.tests.common import helpers
26
27
28 _marker = object()
29
30
31 class AgentUtilsExecuteTest(base.BaseTestCase):
32     def setUp(self):
33         super(AgentUtilsExecuteTest, self).setUp()
34         self.test_file = self.get_temp_file_path('test_execute.tmp')
35         open(self.test_file, 'w').close()
36         self.process = mock.patch('eventlet.green.subprocess.Popen').start()
37         self.process.return_value.returncode = 0
38         self.mock_popen = self.process.return_value.communicate
39
40     def test_without_helper(self):
41         expected = "%s\n" % self.test_file
42         self.mock_popen.return_value = [expected, ""]
43         result = utils.execute(["ls", self.test_file])
44         self.assertEqual(result, expected)
45
46     def test_with_helper(self):
47         expected = "ls %s\n" % self.test_file
48         self.mock_popen.return_value = [expected, ""]
49         self.config(group='AGENT', root_helper='echo')
50         result = utils.execute(["ls", self.test_file], run_as_root=True)
51         self.assertEqual(result, expected)
52
53     def test_stderr_true(self):
54         expected = "%s\n" % self.test_file
55         self.mock_popen.return_value = [expected, ""]
56         out = utils.execute(["ls", self.test_file], return_stderr=True)
57         self.assertIsInstance(out, tuple)
58         self.assertEqual(out, (expected, ""))
59
60     def test_check_exit_code(self):
61         self.mock_popen.return_value = ["", ""]
62         stdout = utils.execute(["ls", self.test_file[:-1]],
63                                check_exit_code=False)
64         self.assertEqual("", stdout)
65
66     def test_execute_raises(self):
67         self.mock_popen.side_effect = RuntimeError
68         self.assertRaises(RuntimeError, utils.execute,
69                           ["ls", self.test_file[:-1]])
70
71     def test_process_input(self):
72         expected = "%s\n" % self.test_file[:-1]
73         self.mock_popen.return_value = [expected, ""]
74         result = utils.execute(["cat"], process_input="%s\n" %
75                                self.test_file[:-1])
76         self.assertEqual(result, expected)
77
78     def test_with_addl_env(self):
79         expected = "%s\n" % self.test_file
80         self.mock_popen.return_value = [expected, ""]
81         result = utils.execute(["ls", self.test_file],
82                                addl_env={'foo': 'bar'})
83         self.assertEqual(result, expected)
84
85     def test_return_code_log_error_raise_runtime(self):
86         self.mock_popen.return_value = ('', '')
87         self.process.return_value.returncode = 1
88         with mock.patch.object(utils, 'LOG') as log:
89             self.assertRaises(RuntimeError, utils.execute,
90                               ['ls'])
91             self.assertTrue(log.error.called)
92
93     def test_return_code_log_error_no_raise_runtime(self):
94         self.mock_popen.return_value = ('', '')
95         self.process.return_value.returncode = 1
96         with mock.patch.object(utils, 'LOG') as log:
97             utils.execute(['ls'], check_exit_code=False)
98             self.assertTrue(log.error.called)
99
100     def test_return_code_log_debug(self):
101         self.mock_popen.return_value = ('', '')
102         with mock.patch.object(utils, 'LOG') as log:
103             utils.execute(['ls'])
104             self.assertTrue(log.debug.called)
105
106     def test_return_code_log_error_change_locale(self):
107         ja_output = 'std_out in Japanese'
108         ja_error = 'std_err in Japanese'
109         ja_message_out = oslo_i18n._message.Message(ja_output)
110         ja_message_err = oslo_i18n._message.Message(ja_error)
111         ja_translate_out = oslo_i18n._translate.translate(ja_message_out, 'ja')
112         ja_translate_err = oslo_i18n._translate.translate(ja_message_err, 'ja')
113         self.mock_popen.return_value = (ja_translate_out, ja_translate_err)
114         self.process.return_value.returncode = 1
115
116         with mock.patch.object(utils, 'LOG') as log:
117             utils.execute(['ls'], check_exit_code=False)
118             self.assertIn(ja_translate_out, str(log.error.call_args_list))
119             self.assertIn(ja_translate_err, str(log.error.call_args_list))
120
121     def test_return_code_raise_runtime_do_not_log_fail_as_error(self):
122         self.mock_popen.return_value = ('', '')
123         self.process.return_value.returncode = 1
124         with mock.patch.object(utils, 'LOG') as log:
125             self.assertRaises(RuntimeError, utils.execute,
126                               ['ls'], log_fail_as_error=False)
127             self.assertFalse(log.error.called)
128
129     def test_encode_process_input(self):
130         str_idata = "%s\n" % self.test_file[:-1]
131         str_odata = "%s\n" % self.test_file
132         if six.PY3:
133             bytes_idata = str_idata.encode(encoding='utf-8')
134             bytes_odata = str_odata.encode(encoding='utf-8')
135             self.mock_popen.return_value = [bytes_odata, b'']
136             result = utils.execute(['cat'], process_input=str_idata)
137             self.mock_popen.assert_called_once_with(bytes_idata)
138             self.assertEqual(str_odata, result)
139         else:
140             self.mock_popen.return_value = [str_odata, '']
141             result = utils.execute(['cat'], process_input=str_idata)
142             self.mock_popen.assert_called_once_with(str_idata)
143             self.assertEqual(str_odata, result)
144
145     def test_return_str_data(self):
146         str_data = "%s\n" % self.test_file
147         self.mock_popen.return_value = [str_data, '']
148         result = utils.execute(['ls', self.test_file], return_stderr=True)
149         self.assertEqual((str_data, ''), result)
150
151     @helpers.requires_py3
152     def test_surrogateescape_in_decoding_out_data(self):
153         bytes_err_data = b'\xed\xa0\xbd'
154         err_data = bytes_err_data.decode('utf-8', 'surrogateescape')
155         out_data = "%s\n" % self.test_file
156         bytes_out_data = out_data.encode(encoding='utf-8')
157         self.mock_popen.return_value = [bytes_out_data, bytes_err_data]
158         result = utils.execute(['ls', self.test_file], return_stderr=True)
159         self.assertEqual((out_data, err_data), result)
160
161
162 class AgentUtilsExecuteEncodeTest(base.BaseTestCase):
163     def setUp(self):
164         super(AgentUtilsExecuteEncodeTest, self).setUp()
165         self.test_file = self.get_temp_file_path('test_execute.tmp')
166         open(self.test_file, 'w').close()
167
168     def test_decode_return_data(self):
169         str_data = "%s\n" % self.test_file
170         result = utils.execute(['ls', self.test_file], return_stderr=True)
171         self.assertEqual((str_data, ''), result)
172
173
174 class AgentUtilsGetInterfaceMAC(base.BaseTestCase):
175     def test_get_interface_mac(self):
176         expect_val = '01:02:03:04:05:06'
177         with mock.patch('fcntl.ioctl') as ioctl:
178             ioctl.return_value = ''.join(['\x00' * 18,
179                                           '\x01\x02\x03\x04\x05\x06',
180                                           '\x00' * 232])
181             actual_val = utils.get_interface_mac('eth0')
182         self.assertEqual(actual_val, expect_val)
183
184
185 class AgentUtilsReplaceFile(base.BaseTestCase):
186     def _test_replace_file_helper(self, explicit_perms=None):
187         # make file to replace
188         with mock.patch('tempfile.NamedTemporaryFile') as ntf:
189             ntf.return_value.name = '/baz'
190             with mock.patch('os.chmod') as chmod:
191                 with mock.patch('os.rename') as rename:
192                     if explicit_perms is None:
193                         expected_perms = 0o644
194                         utils.replace_file('/foo', 'bar')
195                     else:
196                         expected_perms = explicit_perms
197                         utils.replace_file('/foo', 'bar', explicit_perms)
198
199                     expected = [mock.call('w+', dir='/', delete=False),
200                                 mock.call().write('bar'),
201                                 mock.call().close()]
202
203                     ntf.assert_has_calls(expected)
204                     chmod.assert_called_once_with('/baz', expected_perms)
205                     rename.assert_called_once_with('/baz', '/foo')
206
207     def test_replace_file_with_default_perms(self):
208         self._test_replace_file_helper()
209
210     def test_replace_file_with_0o600_perms(self):
211         self._test_replace_file_helper(0o600)
212
213
214 class TestFindChildPids(base.BaseTestCase):
215
216     def test_returns_empty_list_for_exit_code_1(self):
217         with mock.patch.object(utils, 'execute',
218                                side_effect=RuntimeError('Exit code: 1')):
219             self.assertEqual([], utils.find_child_pids(-1))
220
221     def test_returns_empty_list_for_no_output(self):
222         with mock.patch.object(utils, 'execute', return_value=''):
223             self.assertEqual([], utils.find_child_pids(-1))
224
225     def test_returns_list_of_child_process_ids_for_good_ouput(self):
226         with mock.patch.object(utils, 'execute', return_value=' 123 \n 185\n'):
227             self.assertEqual(utils.find_child_pids(-1), ['123', '185'])
228
229     def test_raises_unknown_exception(self):
230         with testtools.ExpectedException(RuntimeError):
231             with mock.patch.object(utils, 'execute',
232                                    side_effect=RuntimeError()):
233                 utils.find_child_pids(-1)
234
235
236 class TestGetRoothelperChildPid(base.BaseTestCase):
237     def _test_get_root_helper_child_pid(self, expected=_marker,
238                                         run_as_root=False, pids=None):
239         def _find_child_pids(x):
240             if not pids:
241                 return []
242             pids.pop(0)
243             return pids
244
245         mock_pid = object()
246         with mock.patch.object(utils, 'find_child_pids',
247                                side_effect=_find_child_pids):
248             actual = utils.get_root_helper_child_pid(mock_pid, run_as_root)
249         if expected is _marker:
250             expected = str(mock_pid)
251         self.assertEqual(expected, actual)
252
253     def test_returns_process_pid_not_root(self):
254         self._test_get_root_helper_child_pid()
255
256     def test_returns_child_pid_as_root(self):
257         self._test_get_root_helper_child_pid(expected='2', pids=['1', '2'],
258                                              run_as_root=True)
259
260     def test_returns_last_child_pid_as_root(self):
261         self._test_get_root_helper_child_pid(expected='3',
262                                              pids=['1', '2', '3'],
263                                              run_as_root=True)
264
265     def test_returns_none_as_root(self):
266         self._test_get_root_helper_child_pid(expected=None, run_as_root=True)
267
268
269 class TestPathUtilities(base.BaseTestCase):
270     def test_remove_abs_path(self):
271         self.assertEqual(['ping', '8.8.8.8'],
272                          utils.remove_abs_path(['/usr/bin/ping', '8.8.8.8']))
273
274     def test_cmd_matches_expected_matches_abs_path(self):
275         cmd = ['/bar/../foo']
276         self.assertTrue(utils.cmd_matches_expected(cmd, cmd))
277
278     def test_cmd_matches_expected_matches_script(self):
279         self.assertTrue(utils.cmd_matches_expected(['python', 'script'],
280                                                    ['script']))
281
282     def test_cmd_matches_expected_doesnt_match(self):
283         self.assertFalse(utils.cmd_matches_expected('foo', 'bar'))
284
285
286 class FakeUser(object):
287     def __init__(self, name):
288         self.pw_name = name
289
290
291 class FakeGroup(object):
292     def __init__(self, name):
293         self.gr_name = name
294
295
296 class TestBaseOSUtils(base.BaseTestCase):
297
298     EUID = 123
299     EUNAME = 'user'
300     EGID = 456
301     EGNAME = 'group'
302
303     @mock.patch('os.geteuid', return_value=EUID)
304     @mock.patch('pwd.getpwuid', return_value=FakeUser(EUNAME))
305     def test_is_effective_user_id(self, getpwuid, geteuid):
306         self.assertTrue(utils.is_effective_user(self.EUID))
307         geteuid.assert_called_once_with()
308         self.assertFalse(getpwuid.called)
309
310     @mock.patch('os.geteuid', return_value=EUID)
311     @mock.patch('pwd.getpwuid', return_value=FakeUser(EUNAME))
312     def test_is_effective_user_str_id(self, getpwuid, geteuid):
313         self.assertTrue(utils.is_effective_user(str(self.EUID)))
314         geteuid.assert_called_once_with()
315         self.assertFalse(getpwuid.called)
316
317     @mock.patch('os.geteuid', return_value=EUID)
318     @mock.patch('pwd.getpwuid', return_value=FakeUser(EUNAME))
319     def test_is_effective_user_name(self, getpwuid, geteuid):
320         self.assertTrue(utils.is_effective_user(self.EUNAME))
321         geteuid.assert_called_once_with()
322         getpwuid.assert_called_once_with(self.EUID)
323
324     @mock.patch('os.geteuid', return_value=EUID)
325     @mock.patch('pwd.getpwuid', return_value=FakeUser(EUNAME))
326     def test_is_not_effective_user(self, getpwuid, geteuid):
327         self.assertFalse(utils.is_effective_user('wrong'))
328         geteuid.assert_called_once_with()
329         getpwuid.assert_called_once_with(self.EUID)
330
331     @mock.patch('os.getegid', return_value=EGID)
332     @mock.patch('grp.getgrgid', return_value=FakeGroup(EGNAME))
333     def test_is_effective_group_id(self, getgrgid, getegid):
334         self.assertTrue(utils.is_effective_group(self.EGID))
335         getegid.assert_called_once_with()
336         self.assertFalse(getgrgid.called)
337
338     @mock.patch('os.getegid', return_value=EGID)
339     @mock.patch('grp.getgrgid', return_value=FakeGroup(EGNAME))
340     def test_is_effective_group_str_id(self, getgrgid, getegid):
341         self.assertTrue(utils.is_effective_group(str(self.EGID)))
342         getegid.assert_called_once_with()
343         self.assertFalse(getgrgid.called)
344
345     @mock.patch('os.getegid', return_value=EGID)
346     @mock.patch('grp.getgrgid', return_value=FakeGroup(EGNAME))
347     def test_is_effective_group_name(self, getgrgid, getegid):
348         self.assertTrue(utils.is_effective_group(self.EGNAME))
349         getegid.assert_called_once_with()
350         getgrgid.assert_called_once_with(self.EGID)
351
352     @mock.patch('os.getegid', return_value=EGID)
353     @mock.patch('grp.getgrgid', return_value=FakeGroup(EGNAME))
354     def test_is_not_effective_group(self, getgrgid, getegid):
355         self.assertFalse(utils.is_effective_group('wrong'))
356         getegid.assert_called_once_with()
357         getgrgid.assert_called_once_with(self.EGID)
358
359
360 class TestUnixDomainHttpConnection(base.BaseTestCase):
361     def test_connect(self):
362         with mock.patch.object(utils, 'cfg') as cfg:
363             cfg.CONF.metadata_proxy_socket = '/the/path'
364             with mock.patch('socket.socket') as socket_create:
365                 conn = utils.UnixDomainHTTPConnection('169.254.169.254',
366                                                       timeout=3)
367                 conn.connect()
368
369                 socket_create.assert_has_calls([
370                     mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
371                     mock.call().settimeout(3),
372                     mock.call().connect('/the/path')]
373                 )
374                 self.assertEqual(conn.timeout, 3)
375
376
377 class TestUnixDomainHttpProtocol(base.BaseTestCase):
378     def test_init_empty_client(self):
379         u = utils.UnixDomainHttpProtocol(mock.Mock(), '', mock.Mock())
380         self.assertEqual(u.client_address, ('<local>', 0))
381
382     def test_init_with_client(self):
383         u = utils.UnixDomainHttpProtocol(mock.Mock(), 'foo', mock.Mock())
384         self.assertEqual(u.client_address, 'foo')
385
386
387 class TestUnixDomainWSGIServer(base.BaseTestCase):
388     def setUp(self):
389         super(TestUnixDomainWSGIServer, self).setUp()
390         self.eventlet_p = mock.patch.object(utils, 'eventlet')
391         self.eventlet = self.eventlet_p.start()
392         self.server = utils.UnixDomainWSGIServer('test')
393
394     def test_start(self):
395         mock_app = mock.Mock()
396         with mock.patch.object(self.server, '_launch') as launcher:
397             self.server.start(mock_app, '/the/path', workers=5, backlog=128)
398             self.eventlet.assert_has_calls([
399                 mock.call.listen(
400                     '/the/path',
401                     family=socket.AF_UNIX,
402                     backlog=128
403                 )]
404             )
405             launcher.assert_called_once_with(mock_app, workers=5)
406
407     def test_run(self):
408         self.server._run('app', 'sock')
409
410         self.eventlet.wsgi.server.assert_called_once_with(
411             'sock',
412             'app',
413             protocol=utils.UnixDomainHttpProtocol,
414             log=mock.ANY,
415             max_size=self.server.num_threads
416         )