import json
import os
import socket
-import StringIO
import eventlet
from oslo.config import cfg
+import six
from cinder.backup.driver import BackupDriver
from cinder import exception
metadata['created_at'] = str(backup['created_at'])
metadata['objects'] = object_list
metadata_json = json.dumps(metadata, sort_keys=True, indent=2)
- reader = StringIO.StringIO(metadata_json)
+ reader = six.StringIO(metadata_json)
etag = self.conn.put_object(container, filename, reader,
content_length=reader.len)
md5 = hashlib.md5(metadata_json).hexdigest()
LOG.debug(_('not compressing data'))
obj[object_name]['compression'] = 'none'
- reader = StringIO.StringIO(data)
+ reader = six.StringIO(data)
LOG.debug(_('About to put_object'))
try:
etag = self.conn.put_object(container, object_name, reader,
# License for the specific language governing permissions and limitations
# under the License.
-import StringIO
-
from oslo.config import cfg
+import six
import webob
from cinder.api.middleware import sizelimit
def test_limiting_reader(self):
BYTES = 1024
bytes_read = 0
- data = StringIO.StringIO("*" * BYTES)
+ data = six.StringIO("*" * BYTES)
for chunk in sizelimit.LimitingReader(data, BYTES):
bytes_read += len(chunk)
self.assertEqual(bytes_read, BYTES)
bytes_read = 0
- data = StringIO.StringIO("*" * BYTES)
+ data = six.StringIO("*" * BYTES)
reader = sizelimit.LimitingReader(data, BYTES)
byte = reader.read(1)
while len(byte) != 0:
def _consume_all_iter():
bytes_read = 0
- data = StringIO.StringIO("*" * BYTES)
+ data = six.StringIO("*" * BYTES)
for chunk in sizelimit.LimitingReader(data, BYTES - 1):
bytes_read += len(chunk)
def _consume_all_read():
bytes_read = 0
- data = StringIO.StringIO("*" * BYTES)
+ data = six.StringIO("*" * BYTES)
reader = sizelimit.LimitingReader(data, BYTES - 1)
byte = reader.read(1)
while len(byte) != 0:
"""
import httplib
-import StringIO
from xml.dom import minidom
from lxml import etree
+import six
import webob
from cinder.api.v1 import limits
def __init__(self, response_string):
"""Initialize new `FakeHttplibSocket`."""
- self._buffer = StringIO.StringIO(response_string)
+ self._buffer = six.StringIO(response_string)
def makefile(self, _mode, _other):
"""Returns the socket's internal buffer."""
"""
import httplib
-import StringIO
from lxml import etree
+import six
import webob
from xml.dom import minidom
def __init__(self, response_string):
"""Initialize new `FakeHttplibSocket`."""
- self._buffer = StringIO.StringIO(response_string)
+ self._buffer = six.StringIO(response_string)
def makefile(self, _mode, _other):
"""Returns the socket's internal buffer."""
"""
import datetime
-import StringIO
+
+import six
from cinder.openstack.common import jsonutils
from cinder.scheduler import scheduler_options
def _get_file_handle(self, filename):
self.file_was_loaded = True
- return StringIO.StringIO(self._file_data)
+ return six.StringIO(self._file_data)
def _get_time_now(self):
return self._time_now
"""Unit tests for the API endpoint."""
import httplib
-import StringIO
+import six
import webob
"""A fake socket implementation for httplib.HTTPResponse, trivial."""
def __init__(self, response_string):
self.response_string = response_string
- self._buffer = StringIO.StringIO(response_string)
+ self._buffer = six.StringIO(response_string)
def makefile(self, _mode, _other):
"""Returns the socket's internal buffer."""
import BaseHTTPServer
import httplib
from lxml import etree
-import StringIO
+
+import six
from cinder import exception
from cinder.openstack.common import log as logging
class FakeHttplibSocket(object):
"""A fake socket implementation for httplib.HTTPResponse."""
def __init__(self, value):
- self._rbuffer = StringIO.StringIO(value)
- self._wbuffer = StringIO.StringIO('')
+ self._rbuffer = six.StringIO(value)
+ self._wbuffer = six.StringIO('')
oldclose = self._wbuffer.close
def newclose():
import httplib
from lxml import etree
from mox import IgnoreArg
-import StringIO
+
+import six
from cinder import exception
from cinder import test
class FakeHttplibSocket(object):
"""A fake socket implementation for httplib.HTTPResponse."""
def __init__(self, value):
- self._rbuffer = StringIO.StringIO(value)
- self._wbuffer = StringIO.StringIO('')
+ self._rbuffer = six.StringIO(value)
+ self._wbuffer = six.StringIO('')
oldclose = self._wbuffer.close
def newclose():
"""Test of Policy Engine For Cinder."""
import os.path
-import StringIO
import urllib2
from oslo.config import cfg
+import six
from cinder import context
from cinder import exception
def test_enforce_http_true(self):
def fakeurlopen(url, post_data):
- return StringIO.StringIO("True")
+ return six.StringIO("True")
self.stubs.Set(urllib2, 'urlopen', fakeurlopen)
action = "example:get_http"
target = {}
def test_enforce_http_false(self):
def fakeurlopen(url, post_data):
- return StringIO.StringIO("False")
+ return six.StringIO("False")
self.stubs.Set(urllib2, 'urlopen', fakeurlopen)
action = "example:get_http"
target = {}
import hashlib
import os
import socket
-import StringIO
import tempfile
import uuid
import mox
from oslo.config import cfg
import paramiko
+import six
import cinder
from cinder.brick.initiator import connector
def test_hash_file(self):
data = 'Mary had a little lamb, its fleece as white as snow'
- flo = StringIO.StringIO(data)
+ flo = six.StringIO(data)
h1 = utils.hash_file(flo)
h2 = hashlib.sha1(data).hexdigest()
self.assertEqual(h1, h2)
import contextlib
-import StringIO
import mock
import mox
+import six
from cinder.db import api as db_api
from cinder import exception
def test_stripped_first_line_of(self):
mock_context_manager = mock.Mock()
mock_context_manager.__enter__ = mock.Mock(
- return_value=StringIO.StringIO(' blah \n second line \n'))
+ return_value=six.StringIO(' blah \n second line \n'))
mock_context_manager.__exit__ = mock.Mock(return_value=False)
mock_open = mock.Mock(return_value=mock_context_manager)