]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Python 3: encode or decode i/o data of Popen.communicate()
authorfumihiko kakuma <kakuma@valinux.co.jp>
Wed, 22 Jul 2015 05:00:25 +0000 (14:00 +0900)
committerfumihiko kakuma <kakuma@valinux.co.jp>
Sun, 16 Aug 2015 05:42:48 +0000 (14:42 +0900)
In Python 3, input and output for Popen.communicate() is bytes type.
Therefore, encode input data and decode return data for Popen.communicate().

Change-Id: I70f009e3366f0eeda5790652ea14f3627b934664
Blueprint: neutron-python3
Closes-Bug: #1479159

neutron/agent/linux/utils.py
neutron/agent/windows/utils.py
neutron/tests/unit/agent/linux/test_utils.py

index 3594a1b388ab5a6abcbc25e51ae2445329a7303b..95c47a0607bc0080b6c0efd431366889250df66a 100644 (file)
@@ -108,15 +108,31 @@ def execute(cmd, process_input=None, addl_env=None,
             check_exit_code=True, return_stderr=False, log_fail_as_error=True,
             extra_ok_codes=None, run_as_root=False):
     try:
+        if (process_input is None or
+            isinstance(process_input, six.binary_type)):
+            _process_input = process_input
+        else:
+            _process_input = process_input.encode('utf-8')
         if run_as_root and cfg.CONF.AGENT.root_helper_daemon:
             returncode, _stdout, _stderr = (
                 execute_rootwrap_daemon(cmd, process_input, addl_env))
         else:
             obj, cmd = create_process(cmd, run_as_root=run_as_root,
                                       addl_env=addl_env)
-            _stdout, _stderr = obj.communicate(process_input)
+            _stdout, _stderr = obj.communicate(_process_input)
             returncode = obj.returncode
             obj.stdin.close()
+        if six.PY3:
+            if isinstance(_stdout, bytes):
+                try:
+                    _stdout = _stdout.decode(encoding='utf-8')
+                except UnicodeError:
+                    pass
+            if isinstance(_stderr, bytes):
+                try:
+                    _stderr = _stderr.decode(encoding='utf-8')
+                except UnicodeError:
+                    pass
 
         m = _("\nCommand: {cmd}\nExit code: {code}\nStdin: {stdin}\n"
               "Stdout: {stdout}\nStderr: {stderr}").format(
index 5221534a63bd27317c285cf8719b43673f909a2d..bcbccd3bcd12bc15098136f23419ed830b684961 100644 (file)
@@ -18,6 +18,7 @@ import os
 from eventlet.green import subprocess
 from eventlet import greenthread
 from oslo_log import log as logging
+import six
 
 from neutron.common import utils
 
@@ -45,12 +46,29 @@ def create_process(cmd, addl_env=None):
 
 def execute(cmd, process_input=None, addl_env=None,
             check_exit_code=True, return_stderr=False, log_fail_as_error=True,
-            extra_ok_codes=None, run_as_root=False):
+            extra_ok_codes=None, run_as_root=False, do_decode=True):
 
     try:
+        if (process_input is None or
+            isinstance(process_input, six.binary_type)):
+            _process_input = process_input
+        else:
+            _process_input = process_input.encode('utf-8')
         obj, cmd = create_process(cmd, addl_env=addl_env)
-        _stdout, _stderr = obj.communicate(process_input)
+        _stdout, _stderr = obj.communicate(_process_input)
         obj.stdin.close()
+        if six.PY3:
+            if isinstance(_stdout, bytes):
+                try:
+                    _stdout = _stdout.decode(encoding='utf-8')
+                except UnicodeError:
+                    pass
+            if isinstance(_stderr, bytes):
+                try:
+                    _stderr = _stderr.decode(encoding='utf-8')
+                except UnicodeError:
+                    pass
+
         m = _("\nCommand: %(cmd)s\nExit code: %(code)s\nStdin: %(stdin)s\n"
               "Stdout: %(stdout)s\nStderr: %(stderr)s") % \
             {'cmd': cmd,
index 9a2e89ffa355fd8745f0a9d4b410829c05fda1fc..b4db92f958d7a2aa8d732b0c84f3711108a02fee 100644 (file)
@@ -15,6 +15,7 @@
 import socket
 
 import mock
+import six
 import testtools
 
 from neutron.agent.linux import utils
@@ -107,6 +108,55 @@ class AgentUtilsExecuteTest(base.BaseTestCase):
                               ['ls'], log_fail_as_error=False)
             self.assertFalse(log.error.called)
 
+    def test_encode_process_input(self):
+        str_idata = "%s\n" % self.test_file[:-1]
+        str_odata = "%s\n" % self.test_file
+        if six.PY3:
+            bytes_idata = str_idata.encode(encoding='utf-8')
+            bytes_odata = str_odata.encode(encoding='utf-8')
+            self.mock_popen.return_value = [bytes_odata, b'']
+            result = utils.execute(['cat'], process_input=str_idata)
+            self.mock_popen.assert_called_once_with(bytes_idata)
+            self.assertEqual(str_odata, result)
+        else:
+            self.mock_popen.return_value = [str_odata, '']
+            result = utils.execute(['cat'], process_input=str_idata)
+            self.mock_popen.assert_called_once_with(str_idata)
+            self.assertEqual(str_odata, result)
+
+    def test_return_str_data(self):
+        str_data = "%s\n" % self.test_file
+        self.mock_popen.return_value = [str_data, '']
+        result = utils.execute(['ls', self.test_file], return_stderr=True)
+        self.assertEqual((str_data, ''), result)
+
+    def test_raise_unicodeerror_in_decoding_out_data(self):
+        class m_bytes(bytes):
+            def decode(self, encoding=None):
+                raise UnicodeError
+
+        err_data = 'UnicodeError'
+        bytes_err_data = b'UnicodeError'
+        out_data = "%s\n" % self.test_file
+        bytes_out_data = m_bytes(out_data.encode(encoding='utf-8'))
+        if six.PY3:
+            self.mock_popen.return_value = [bytes_out_data, bytes_err_data]
+            result = utils.execute(['ls', self.test_file],
+                                   return_stderr=True)
+            self.assertEqual((bytes_out_data, err_data), result)
+
+
+class AgentUtilsExecuteEncodeTest(base.BaseTestCase):
+    def setUp(self):
+        super(AgentUtilsExecuteEncodeTest, self).setUp()
+        self.test_file = self.get_temp_file_path('test_execute.tmp')
+        open(self.test_file, 'w').close()
+
+    def test_decode_return_data(self):
+        str_data = "%s\n" % self.test_file
+        result = utils.execute(['ls', self.test_file], return_stderr=True)
+        self.assertEqual((str_data, ''), result)
+
 
 class AgentUtilsGetInterfaceMAC(base.BaseTestCase):
     def test_get_interface_mac(self):