check_exit_code=True, return_stderr=False, log_fail_as_error=True,
extra_ok_codes=None, run_as_root=False):
try:
+ if (process_input is None or
+ isinstance(process_input, six.binary_type)):
+ _process_input = process_input
+ else:
+ _process_input = process_input.encode('utf-8')
if run_as_root and cfg.CONF.AGENT.root_helper_daemon:
returncode, _stdout, _stderr = (
execute_rootwrap_daemon(cmd, process_input, addl_env))
else:
obj, cmd = create_process(cmd, run_as_root=run_as_root,
addl_env=addl_env)
- _stdout, _stderr = obj.communicate(process_input)
+ _stdout, _stderr = obj.communicate(_process_input)
returncode = obj.returncode
obj.stdin.close()
+ if six.PY3:
+ if isinstance(_stdout, bytes):
+ try:
+ _stdout = _stdout.decode(encoding='utf-8')
+ except UnicodeError:
+ pass
+ if isinstance(_stderr, bytes):
+ try:
+ _stderr = _stderr.decode(encoding='utf-8')
+ except UnicodeError:
+ pass
m = _("\nCommand: {cmd}\nExit code: {code}\nStdin: {stdin}\n"
"Stdout: {stdout}\nStderr: {stderr}").format(
from eventlet.green import subprocess
from eventlet import greenthread
from oslo_log import log as logging
+import six
from neutron.common import utils
def execute(cmd, process_input=None, addl_env=None,
check_exit_code=True, return_stderr=False, log_fail_as_error=True,
- extra_ok_codes=None, run_as_root=False):
+ extra_ok_codes=None, run_as_root=False, do_decode=True):
try:
+ if (process_input is None or
+ isinstance(process_input, six.binary_type)):
+ _process_input = process_input
+ else:
+ _process_input = process_input.encode('utf-8')
obj, cmd = create_process(cmd, addl_env=addl_env)
- _stdout, _stderr = obj.communicate(process_input)
+ _stdout, _stderr = obj.communicate(_process_input)
obj.stdin.close()
+ if six.PY3:
+ if isinstance(_stdout, bytes):
+ try:
+ _stdout = _stdout.decode(encoding='utf-8')
+ except UnicodeError:
+ pass
+ if isinstance(_stderr, bytes):
+ try:
+ _stderr = _stderr.decode(encoding='utf-8')
+ except UnicodeError:
+ pass
+
m = _("\nCommand: %(cmd)s\nExit code: %(code)s\nStdin: %(stdin)s\n"
"Stdout: %(stdout)s\nStderr: %(stderr)s") % \
{'cmd': cmd,
import socket
import mock
+import six
import testtools
from neutron.agent.linux import utils
['ls'], log_fail_as_error=False)
self.assertFalse(log.error.called)
+ def test_encode_process_input(self):
+ str_idata = "%s\n" % self.test_file[:-1]
+ str_odata = "%s\n" % self.test_file
+ if six.PY3:
+ bytes_idata = str_idata.encode(encoding='utf-8')
+ bytes_odata = str_odata.encode(encoding='utf-8')
+ self.mock_popen.return_value = [bytes_odata, b'']
+ result = utils.execute(['cat'], process_input=str_idata)
+ self.mock_popen.assert_called_once_with(bytes_idata)
+ self.assertEqual(str_odata, result)
+ else:
+ self.mock_popen.return_value = [str_odata, '']
+ result = utils.execute(['cat'], process_input=str_idata)
+ self.mock_popen.assert_called_once_with(str_idata)
+ self.assertEqual(str_odata, result)
+
+ def test_return_str_data(self):
+ str_data = "%s\n" % self.test_file
+ self.mock_popen.return_value = [str_data, '']
+ result = utils.execute(['ls', self.test_file], return_stderr=True)
+ self.assertEqual((str_data, ''), result)
+
+ def test_raise_unicodeerror_in_decoding_out_data(self):
+ class m_bytes(bytes):
+ def decode(self, encoding=None):
+ raise UnicodeError
+
+ err_data = 'UnicodeError'
+ bytes_err_data = b'UnicodeError'
+ out_data = "%s\n" % self.test_file
+ bytes_out_data = m_bytes(out_data.encode(encoding='utf-8'))
+ if six.PY3:
+ self.mock_popen.return_value = [bytes_out_data, bytes_err_data]
+ result = utils.execute(['ls', self.test_file],
+ return_stderr=True)
+ self.assertEqual((bytes_out_data, err_data), result)
+
+
+class AgentUtilsExecuteEncodeTest(base.BaseTestCase):
+ def setUp(self):
+ super(AgentUtilsExecuteEncodeTest, self).setUp()
+ self.test_file = self.get_temp_file_path('test_execute.tmp')
+ open(self.test_file, 'w').close()
+
+ def test_decode_return_data(self):
+ str_data = "%s\n" % self.test_file
+ result = utils.execute(['ls', self.test_file], return_stderr=True)
+ self.assertEqual((str_data, ''), result)
+
class AgentUtilsGetInterfaceMAC(base.BaseTestCase):
def test_get_interface_mac(self):