1 # Copyright 2012, VMware, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
23 from neutron.agent.linux import utils
24 from neutron.tests import base
25 from neutron.tests.common import helpers
31 class AgentUtilsExecuteTest(base.BaseTestCase):
33 super(AgentUtilsExecuteTest, self).setUp()
34 self.test_file = self.get_temp_file_path('test_execute.tmp')
35 open(self.test_file, 'w').close()
36 self.process = mock.patch('eventlet.green.subprocess.Popen').start()
37 self.process.return_value.returncode = 0
38 self.mock_popen = self.process.return_value.communicate
def test_without_helper(self):
    """Check that execute() returns the mocked stdout for a plain call.

    ``self.mock_popen`` is the patched ``communicate``; its first element
    is the value the test expects ``utils.execute`` to hand back.
    """
    expected = "%s\n" % self.test_file
    self.mock_popen.return_value = [expected, ""]
    result = utils.execute(["ls", self.test_file])
    # Expected value goes first, matching sibling tests such as
    # test_check_exit_code.
    self.assertEqual(expected, result)
def test_with_helper(self):
    """Check that execute() returns the mocked stdout with run_as_root.

    AGENT.root_helper is set to 'echo' before the call is made.
    """
    expected = "ls %s\n" % self.test_file
    self.mock_popen.return_value = [expected, ""]
    self.config(group='AGENT', root_helper='echo')
    result = utils.execute(["ls", self.test_file], run_as_root=True)
    # Expected value goes first, matching sibling tests such as
    # test_check_exit_code.
    self.assertEqual(expected, result)
def test_stderr_true(self):
    """Check that return_stderr=True yields a (stdout, stderr) tuple."""
    expected = "%s\n" % self.test_file
    self.mock_popen.return_value = [expected, ""]
    out = utils.execute(["ls", self.test_file], return_stderr=True)
    self.assertIsInstance(out, tuple)
    # Expected value goes first, matching sibling tests such as
    # test_return_str_data.
    self.assertEqual((expected, ""), out)
def test_check_exit_code(self):
    """execute() with check_exit_code=False returns the mocked stdout."""
    self.mock_popen.return_value = ["", ""]
    # Path of the temp file with its last character dropped.
    truncated_path = self.test_file[:-1]
    output = utils.execute(["ls", truncated_path], check_exit_code=False)
    self.assertEqual("", output)
def test_execute_raises(self):
    """execute() raises RuntimeError when communicate() raises it."""
    self.mock_popen.side_effect = RuntimeError
    # Path of the temp file with its last character dropped.
    truncated_path = self.test_file[:-1]
    command = ["ls", truncated_path]
    self.assertRaises(RuntimeError, utils.execute, command)
71 def test_process_input(self):
72 expected = "%s\n" % self.test_file[:-1]
73 self.mock_popen.return_value = [expected, ""]
74 result = utils.execute(["cat"], process_input="%s\n" %
76 self.assertEqual(result, expected)
def test_with_addl_env(self):
    """Check that execute() returns the mocked stdout when addl_env is set."""
    expected = "%s\n" % self.test_file
    self.mock_popen.return_value = [expected, ""]
    result = utils.execute(["ls", self.test_file],
                           addl_env={'foo': 'bar'})
    # Expected value goes first, matching sibling tests such as
    # test_check_exit_code.
    self.assertEqual(expected, result)
85 def test_return_code_log_error_raise_runtime(self):
86 self.mock_popen.return_value = ('', '')
87 self.process.return_value.returncode = 1
88 with mock.patch.object(utils, 'LOG') as log:
89 self.assertRaises(RuntimeError, utils.execute,
91 self.assertTrue(log.error.called)
def test_return_code_log_error_no_raise_runtime(self):
    """Return code 1 with check_exit_code=False logs an error, no raise."""
    self.process.return_value.returncode = 1
    self.mock_popen.return_value = ('', '')
    with mock.patch.object(utils, 'LOG') as mock_log:
        utils.execute(['ls'], check_exit_code=False)
    self.assertTrue(mock_log.error.called)
def test_return_code_log_debug(self):
    """execute() emits a debug log entry for the default mocked process."""
    self.mock_popen.return_value = ('', '')
    with mock.patch.object(utils, 'LOG') as mock_log:
        utils.execute(['ls'])
    self.assertTrue(mock_log.debug.called)
def test_return_code_log_error_change_locale(self):
    """Translated stdout/stderr both show up in the logged error calls."""
    make_message = oslo_i18n._message.Message
    translate = oslo_i18n._translate.translate
    ja_translate_out = translate(make_message('std_out in Japanese'), 'ja')
    ja_translate_err = translate(make_message('std_err in Japanese'), 'ja')
    self.mock_popen.return_value = (ja_translate_out, ja_translate_err)
    self.process.return_value.returncode = 1

    with mock.patch.object(utils, 'LOG') as mock_log:
        utils.execute(['ls'], check_exit_code=False)
    # Both translated strings must appear in the recorded error calls.
    logged_calls = str(mock_log.error.call_args_list)
    for translated in (ja_translate_out, ja_translate_err):
        self.assertIn(translated, logged_calls)
def test_return_code_raise_runtime_do_not_log_fail_as_error(self):
    """With log_fail_as_error=False a failure raises but logs no error."""
    self.process.return_value.returncode = 1
    self.mock_popen.return_value = ('', '')
    with mock.patch.object(utils, 'LOG') as mock_log:
        self.assertRaises(RuntimeError, utils.execute,
                          ['ls'], log_fail_as_error=False)
    self.assertFalse(mock_log.error.called)
129 def test_encode_process_input(self):
130 str_idata = "%s\n" % self.test_file[:-1]
131 str_odata = "%s\n" % self.test_file
133 bytes_idata = str_idata.encode(encoding='utf-8')
134 bytes_odata = str_odata.encode(encoding='utf-8')
135 self.mock_popen.return_value = [bytes_odata, b'']
136 result = utils.execute(['cat'], process_input=str_idata)
137 self.mock_popen.assert_called_once_with(bytes_idata)
138 self.assertEqual(str_odata, result)
140 self.mock_popen.return_value = [str_odata, '']
141 result = utils.execute(['cat'], process_input=str_idata)
142 self.mock_popen.assert_called_once_with(str_idata)
143 self.assertEqual(str_odata, result)
def test_return_str_data(self):
    """str output from communicate() comes back as a (stdout, '') tuple."""
    stdout_text = "%s\n" % self.test_file
    self.mock_popen.return_value = [stdout_text, '']
    command = ['ls', self.test_file]
    result = utils.execute(command, return_stderr=True)
    self.assertEqual((stdout_text, ''), result)
@helpers.requires_py3
def test_surrogateescape_in_decoding_out_data(self):
    """Undecodable stderr bytes are returned surrogate-escaped."""
    raw_err = b'\xed\xa0\xbd'
    out_text = "%s\n" % self.test_file
    raw_out = out_text.encode(encoding='utf-8')
    self.mock_popen.return_value = [raw_out, raw_err]
    result = utils.execute(['ls', self.test_file], return_stderr=True)
    # The invalid UTF-8 sequence is expected back as surrogate escapes.
    expected = (out_text, raw_err.decode('utf-8', 'surrogateescape'))
    self.assertEqual(expected, result)
162 class AgentUtilsExecuteEncodeTest(base.BaseTestCase):
164 super(AgentUtilsExecuteEncodeTest, self).setUp()
165 self.test_file = self.get_temp_file_path('test_execute.tmp')
166 open(self.test_file, 'w').close()
def test_decode_return_data(self):
    """execute() returns (stdout, stderr) as text for an ``ls`` call.

    NOTE(review): no Popen patch is visible in this class, so this
    presumably runs the real command -- confirm against setUp.
    """
    expected_stdout = "%s\n" % self.test_file
    command = ['ls', self.test_file]
    result = utils.execute(command, return_stderr=True)
    self.assertEqual((expected_stdout, ''), result)
174 class AgentUtilsGetInterfaceMAC(base.BaseTestCase):
175 def test_get_interface_mac(self):
176 expect_val = '01:02:03:04:05:06'
177 with mock.patch('fcntl.ioctl') as ioctl:
178 ioctl.return_value = ''.join(['\x00' * 18,
179 '\x01\x02\x03\x04\x05\x06',
181 actual_val = utils.get_interface_mac('eth0')
182 self.assertEqual(actual_val, expect_val)
185 class AgentUtilsReplaceFile(base.BaseTestCase):
186 def _test_replace_file_helper(self, explicit_perms=None):
187 # make file to replace
188 with mock.patch('tempfile.NamedTemporaryFile') as ntf:
189 ntf.return_value.name = '/baz'
190 with mock.patch('os.chmod') as chmod:
191 with mock.patch('os.rename') as rename:
192 if explicit_perms is None:
193 expected_perms = 0o644
194 utils.replace_file('/foo', 'bar')
196 expected_perms = explicit_perms
197 utils.replace_file('/foo', 'bar', explicit_perms)
199 expected = [mock.call('w+', dir='/', delete=False),
200 mock.call().write('bar'),
203 ntf.assert_has_calls(expected)
204 chmod.assert_called_once_with('/baz', expected_perms)
205 rename.assert_called_once_with('/baz', '/foo')
def test_replace_file_with_default_perms(self):
    """Run the replace_file helper without explicit permissions."""
    # None is the helper's own default for explicit_perms.
    self._test_replace_file_helper(explicit_perms=None)
def test_replace_file_with_0o600_perms(self):
    """Run the replace_file helper with explicit permissions 0o600."""
    self._test_replace_file_helper(explicit_perms=0o600)
class TestFindChildPids(base.BaseTestCase):
    """Tests for utils.find_child_pids with utils.execute patched out."""

    def test_returns_empty_list_for_exit_code_1(self):
        """RuntimeError('Exit code: 1') from execute yields an empty list."""
        with mock.patch.object(utils, 'execute',
                               side_effect=RuntimeError('Exit code: 1')):
            self.assertEqual([], utils.find_child_pids(-1))

    def test_returns_empty_list_for_no_output(self):
        """Empty output from execute yields an empty list."""
        with mock.patch.object(utils, 'execute', return_value=''):
            self.assertEqual([], utils.find_child_pids(-1))

    def test_returns_list_of_child_process_ids_for_good_ouput(self):
        """Whitespace-padded output lines come back as stripped strings."""
        with mock.patch.object(utils, 'execute', return_value=' 123 \n 185\n'):
            # Expected value first, like the other tests in this class.
            self.assertEqual(['123', '185'], utils.find_child_pids(-1))

    def test_raises_unknown_exception(self):
        """A RuntimeError carrying no message is not swallowed."""
        with testtools.ExpectedException(RuntimeError):
            with mock.patch.object(utils, 'execute',
                                   side_effect=RuntimeError()):
                utils.find_child_pids(-1)
236 class TestGetRoothelperChildPid(base.BaseTestCase):
237 def _test_get_root_helper_child_pid(self, expected=_marker,
238 run_as_root=False, pids=None):
239 def _find_child_pids(x):
246 with mock.patch.object(utils, 'find_child_pids',
247 side_effect=_find_child_pids):
248 actual = utils.get_root_helper_child_pid(mock_pid, run_as_root)
249 if expected is _marker:
250 expected = str(mock_pid)
251 self.assertEqual(expected, actual)
def test_returns_process_pid_not_root(self):
    """Run the shared helper with its defaults (run_as_root=False)."""
    self._test_get_root_helper_child_pid(run_as_root=False)
256 def test_returns_child_pid_as_root(self):
257 self._test_get_root_helper_child_pid(expected='2', pids=['1', '2'],
260 def test_returns_last_child_pid_as_root(self):
261 self._test_get_root_helper_child_pid(expected='3',
262 pids=['1', '2', '3'],
def test_returns_none_as_root(self):
    """As root with no pids supplied, the expected result is None."""
    self._test_get_root_helper_child_pid(run_as_root=True, expected=None)
269 class TestPathUtilities(base.BaseTestCase):
def test_remove_abs_path(self):
    """remove_abs_path() turns '/usr/bin/ping' into 'ping'."""
    command = ['/usr/bin/ping', '8.8.8.8']
    stripped = utils.remove_abs_path(command)
    self.assertEqual(['ping', '8.8.8.8'], stripped)
def test_cmd_matches_expected_matches_abs_path(self):
    """A command list compared against itself is reported as a match."""
    command = ['/bar/../foo']
    matched = utils.cmd_matches_expected(command, command)
    self.assertTrue(matched)
278 def test_cmd_matches_expected_matches_script(self):
279 self.assertTrue(utils.cmd_matches_expected(['python', 'script'],
def test_cmd_matches_expected_doesnt_match(self):
    """'foo' compared against 'bar' is reported as a mismatch."""
    matched = utils.cmd_matches_expected('foo', 'bar')
    self.assertFalse(matched)
286 class FakeUser(object):
287 def __init__(self, name):
291 class FakeGroup(object):
292 def __init__(self, name):
296 class TestBaseOSUtils(base.BaseTestCase):
@mock.patch('os.geteuid', return_value=EUID)
@mock.patch('pwd.getpwuid', return_value=FakeUser(EUNAME))
def test_is_effective_user_id(self, getpwuid, geteuid):
    """self.EUID matches the mocked euid without a pwd lookup."""
    matched = utils.is_effective_user(self.EUID)
    self.assertTrue(matched)
    geteuid.assert_called_once_with()
    # The id comparison alone must settle it; no name lookup expected.
    self.assertFalse(getpwuid.called)
@mock.patch('os.geteuid', return_value=EUID)
@mock.patch('pwd.getpwuid', return_value=FakeUser(EUNAME))
def test_is_effective_user_str_id(self, getpwuid, geteuid):
    """str(self.EUID) matches the mocked euid without a pwd lookup."""
    euid_text = str(self.EUID)
    matched = utils.is_effective_user(euid_text)
    self.assertTrue(matched)
    geteuid.assert_called_once_with()
    # The id comparison alone must settle it; no name lookup expected.
    self.assertFalse(getpwuid.called)
@mock.patch('os.geteuid', return_value=EUID)
@mock.patch('pwd.getpwuid', return_value=FakeUser(EUNAME))
def test_is_effective_user_name(self, getpwuid, geteuid):
    """self.EUNAME matches via a pwd.getpwuid(self.EUID) lookup."""
    matched = utils.is_effective_user(self.EUNAME)
    self.assertTrue(matched)
    geteuid.assert_called_once_with()
    getpwuid.assert_called_once_with(self.EUID)
@mock.patch('os.geteuid', return_value=EUID)
@mock.patch('pwd.getpwuid', return_value=FakeUser(EUNAME))
def test_is_not_effective_user(self, getpwuid, geteuid):
    """'wrong' does not match; pwd.getpwuid is still consulted once."""
    matched = utils.is_effective_user('wrong')
    self.assertFalse(matched)
    geteuid.assert_called_once_with()
    getpwuid.assert_called_once_with(self.EUID)
@mock.patch('os.getegid', return_value=EGID)
@mock.patch('grp.getgrgid', return_value=FakeGroup(EGNAME))
def test_is_effective_group_id(self, getgrgid, getegid):
    """self.EGID matches the mocked egid without a grp lookup."""
    matched = utils.is_effective_group(self.EGID)
    self.assertTrue(matched)
    getegid.assert_called_once_with()
    # The id comparison alone must settle it; no name lookup expected.
    self.assertFalse(getgrgid.called)
@mock.patch('os.getegid', return_value=EGID)
@mock.patch('grp.getgrgid', return_value=FakeGroup(EGNAME))
def test_is_effective_group_str_id(self, getgrgid, getegid):
    """str(self.EGID) matches the mocked egid without a grp lookup."""
    egid_text = str(self.EGID)
    matched = utils.is_effective_group(egid_text)
    self.assertTrue(matched)
    getegid.assert_called_once_with()
    # The id comparison alone must settle it; no name lookup expected.
    self.assertFalse(getgrgid.called)
@mock.patch('os.getegid', return_value=EGID)
@mock.patch('grp.getgrgid', return_value=FakeGroup(EGNAME))
def test_is_effective_group_name(self, getgrgid, getegid):
    """self.EGNAME matches via a grp.getgrgid(self.EGID) lookup."""
    matched = utils.is_effective_group(self.EGNAME)
    self.assertTrue(matched)
    getegid.assert_called_once_with()
    getgrgid.assert_called_once_with(self.EGID)
@mock.patch('os.getegid', return_value=EGID)
@mock.patch('grp.getgrgid', return_value=FakeGroup(EGNAME))
def test_is_not_effective_group(self, getgrgid, getegid):
    """'wrong' does not match; grp.getgrgid is still consulted once."""
    matched = utils.is_effective_group('wrong')
    self.assertFalse(matched)
    getegid.assert_called_once_with()
    getgrgid.assert_called_once_with(self.EGID)
360 class TestUnixDomainHttpConnection(base.BaseTestCase):
361 def test_connect(self):
362 with mock.patch.object(utils, 'cfg') as cfg:
363 cfg.CONF.metadata_proxy_socket = '/the/path'
364 with mock.patch('socket.socket') as socket_create:
365 conn = utils.UnixDomainHTTPConnection('169.254.169.254',
369 socket_create.assert_has_calls([
370 mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
371 mock.call().settimeout(3),
372 mock.call().connect('/the/path')]
374 self.assertEqual(conn.timeout, 3)
class TestUnixDomainHttpProtocol(base.BaseTestCase):
    """Tests for the client_address set by utils.UnixDomainHttpProtocol."""

    def test_init_empty_client(self):
        """An empty client address is replaced by ('<local>', 0)."""
        u = utils.UnixDomainHttpProtocol(mock.Mock(), '', mock.Mock())
        # Expected value first, so a failure labels the two sides correctly.
        self.assertEqual(('<local>', 0), u.client_address)

    def test_init_with_client(self):
        """A non-empty client address is kept as given."""
        u = utils.UnixDomainHttpProtocol(mock.Mock(), 'foo', mock.Mock())
        # Expected value first, so a failure labels the two sides correctly.
        self.assertEqual('foo', u.client_address)
387 class TestUnixDomainWSGIServer(base.BaseTestCase):
389 super(TestUnixDomainWSGIServer, self).setUp()
390 self.eventlet_p = mock.patch.object(utils, 'eventlet')
391 self.eventlet = self.eventlet_p.start()
392 self.server = utils.UnixDomainWSGIServer('test')
394 def test_start(self):
395 mock_app = mock.Mock()
396 with mock.patch.object(self.server, '_launch') as launcher:
397 self.server.start(mock_app, '/the/path', workers=5, backlog=128)
398 self.eventlet.assert_has_calls([
401 family=socket.AF_UNIX,
405 launcher.assert_called_once_with(mock_app, workers=5)
408 self.server._run('app', 'sock')
410 self.eventlet.wsgi.server.assert_called_once_with(
413 protocol=utils.UnixDomainHttpProtocol,
415 max_size=self.server.num_threads