import collections
import copy
-import httplib
import math
import re
import time
from oslo_serialization import jsonutils
from oslo_utils import importutils
+from six.moves import http_client
import webob.dec
import webob.exc
body = jsonutils.dumps({"verb": verb, "path": path})
headers = {"Content-Type": "application/json"}
- conn = httplib.HTTPConnection(self.limiter_address)
+ conn = http_client.HTTPConnection(self.limiter_address)
if username:
conn.request("POST", "/%s" % (username), body, headers)
import collections
import copy
-import httplib
import math
import re
import time
from oslo_serialization import jsonutils
from oslo_utils import importutils
+from six.moves import http_client
import webob.dec
import webob.exc
body = jsonutils.dumps({"verb": verb, "path": path})
headers = {"Content-Type": "application/json"}
- conn = httplib.HTTPConnection(self.limiter_address)
+ conn = http_client.HTTPConnection(self.limiter_address)
if username:
conn.request("POST", "/%s" % (username), body, headers)
"""
import eventlet
+from six.moves import builtins
eventlet.monkey_patch()
# See http://code.google.com/p/python-nose/issues/detail?id=373
# The code below enables nosetests to work with i18n _() blocks
-import __builtin__
-setattr(__builtin__, '_', lambda x: x)
+setattr(builtins, '_', lambda x: x)
Tests dealing with HTTP rate-limiting.
"""
-import httplib
from xml.dom import minidom
from lxml import etree
from oslo_serialization import jsonutils
import six
+from six.moves import http_client
import webob
from cinder.api.v1 import limits
class FakeHttplibSocket(object):
- """Fake `httplib.HTTPResponse` replacement."""
+ """Fake `http_client.HTTPResponse` replacement."""
def __init__(self, response_string):
"""Initialize new `FakeHttplibSocket`."""
class FakeHttplibConnection(object):
- """Fake `httplib.HTTPConnection`."""
+ """Fake `http_client.HTTPConnection`."""
def __init__(self, app, host):
"""Initialize `FakeHttplibConnection`."""
Requests made via this connection actually get translated and
routed into our WSGI app, we then wait for the response and turn
- it back into an `httplib.HTTPResponse`.
+ it back into an `http_client.HTTPResponse`.
"""
if not headers:
headers = {}
resp = str(req.get_response(self.app))
resp = "HTTP/1.0 %s" % resp
sock = FakeHttplibSocket(resp)
- self.http_response = httplib.HTTPResponse(sock)
+ self.http_response = http_client.HTTPResponse(sock)
self.http_response.begin()
def getresponse(self):
After calling this method, when any code calls
- httplib.HTTPConnection(host)
+ http_client.HTTPConnection(host)
the connection object will be a fake. Its requests will be sent directly
to the given WSGI app rather than through a socket.
else:
return self.wrapped(connection_host, *args, **kwargs)
- oldHTTPConnection = httplib.HTTPConnection
- httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
+ oldHTTPConnection = http_client.HTTPConnection
+ new_http_connection = HTTPConnectionDecorator(http_client.HTTPConnection)
+ http_client.HTTPConnection = new_http_connection
return oldHTTPConnection
"""setUp for test suite.
Do some nifty HTTP/WSGI magic which allows for WSGI to be called
- directly by something like the `httplib` library.
+ directly by something like the `http_client` library.
"""
super(WsgiLimiterProxyTest, self).setUp()
self.app = limits.WsgiLimiter(TEST_LIMITS)
def _restore(self, oldHTTPConnection):
# restore original HTTPConnection object
- httplib.HTTPConnection = oldHTTPConnection
+ http_client.HTTPConnection = oldHTTPConnection
def test_200(self):
"""Successful request test."""
Tests dealing with HTTP rate-limiting.
"""
-import httplib
from xml.dom import minidom
from lxml import etree
from oslo_serialization import jsonutils
import six
+from six.moves import http_client
import webob
from cinder.api.v2 import limits
class FakeHttplibSocket(object):
- """Fake `httplib.HTTPResponse` replacement."""
+ """Fake `http_client.HTTPResponse` replacement."""
def __init__(self, response_string):
"""Initialize new `FakeHttplibSocket`."""
class FakeHttplibConnection(object):
- """Fake `httplib.HTTPConnection`."""
+ """Fake `http_client.HTTPConnection`."""
def __init__(self, app, host):
"""Initialize `FakeHttplibConnection`."""
Requests made via this connection actually get translated and
routed into our WSGI app, we then wait for the response and turn
- it back into an `httplib.HTTPResponse`.
+ it back into an `http_client.HTTPResponse`.
"""
if not headers:
headers = {}
resp = str(req.get_response(self.app))
resp = "HTTP/1.0 %s" % resp
sock = FakeHttplibSocket(resp)
- self.http_response = httplib.HTTPResponse(sock)
+ self.http_response = http_client.HTTPResponse(sock)
self.http_response.begin()
def getresponse(self):
After calling this method, when any code calls
- httplib.HTTPConnection(host)
+ http_client.HTTPConnection(host)
the connection object will be a fake. Its requests will be sent directly
to the given WSGI app rather than through a socket.
else:
return self.wrapped(connection_host, *args, **kwargs)
- oldHTTPConnection = httplib.HTTPConnection
- httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
+ oldHTTPConnection = http_client.HTTPConnection
+ new_http_connection = HTTPConnectionDecorator(http_client.HTTPConnection)
+ http_client.HTTPConnection = new_http_connection
return oldHTTPConnection
"""setUp() for WsgiLimiterProxyTest.
Do some nifty HTTP/WSGI magic which allows for WSGI to be called
- directly by something like the `httplib` library.
+ directly by something like the `http_client` library.
"""
super(WsgiLimiterProxyTest, self).setUp()
self.app = limits.WsgiLimiter(TEST_LIMITS)
def _restore(self, oldHTTPConnection):
# restore original HTTPConnection object
- httplib.HTTPConnection = oldHTTPConnection
+ http_client.HTTPConnection = oldHTTPConnection
def test_200(self):
"""Successful request test."""
Tests for Backup NFS driver.
"""
-import __builtin__
import bz2
import exceptions
import filecmp
from os_brick.remotefs import remotefs as remotefs_brick
from oslo_config import cfg
from oslo_log import log as logging
+from six.moves import builtins
from cinder.backup.drivers import nfs
from cinder import context
self.assertEqual([], result)
def test_get_object_writer(self):
- self.mock_object(__builtin__, 'open', mock.mock_open())
+ self.mock_object(builtins, 'open', mock.mock_open())
self.mock_object(os, 'chmod')
self.driver.get_object_writer(FAKE_CONTAINER, FAKE_OBJECT_NAME)
os.chmod.assert_called_once_with(FAKE_OBJECT_PATH, 0o660)
- __builtin__.open.assert_called_once_with(FAKE_OBJECT_PATH, 'w')
+ builtins.open.assert_called_once_with(FAKE_OBJECT_PATH, 'w')
def test_get_object_reader(self):
- self.mock_object(__builtin__, 'open', mock.mock_open())
+ self.mock_object(builtins, 'open', mock.mock_open())
self.driver.get_object_reader(FAKE_CONTAINER, FAKE_OBJECT_NAME)
- __builtin__.open.assert_called_once_with(FAKE_OBJECT_PATH, 'r')
+ builtins.open.assert_called_once_with(FAKE_OBJECT_PATH, 'r')
def test_delete_object(self):
self.mock_object(os, 'remove')
# License for the specific language governing permissions and limitations
# under the License.
-import httplib
import json
import os
import socket
import zlib
from oslo_log import log as logging
+from six.moves import http_client
from swiftclient import client as swift
LOG = logging.getLogger(__name__)
LOG.debug("fake head_container(%s)" % container)
if container == 'missing_container':
raise swift.ClientException('fake exception',
- http_status=httplib.NOT_FOUND)
+ http_status=http_client.NOT_FOUND)
elif container == 'unauthorized_container':
raise swift.ClientException('fake exception',
- http_status=httplib.UNAUTHORIZED)
+ http_status=http_client.UNAUTHORIZED)
elif container == 'socket_error_on_head':
raise socket.error(111, 'ECONNREFUSED')
pass
# under the License.
import hashlib
-import httplib
import os
import socket
import tempfile
from oslo_log import log as logging
+from six.moves import http_client
from swiftclient import client as swift
from cinder.openstack.common import fileutils
LOG.debug("fake head_container(%s)", container)
if container == 'missing_container':
raise swift.ClientException('fake exception',
- http_status=httplib.NOT_FOUND)
+ http_status=http_client.NOT_FOUND)
elif container == 'unauthorized_container':
raise swift.ClientException('fake exception',
- http_status=httplib.UNAUTHORIZED)
+ http_status=http_client.UNAUTHORIZED)
elif container == 'socket_error_on_head':
raise socket.error(111, 'ECONNREFUSED')
"""Unit tests for the API endpoint."""
-import httplib
-
import six
+from six.moves import http_client
import webob
class FakeHttplibSocket(object):
- """A fake socket implementation for httplib.HTTPResponse, trivial."""
+ """A fake socket implementation for http_client.HTTPResponse, trivial."""
def __init__(self, response_string):
self.response_string = response_string
self._buffer = six.StringIO(response_string)
class FakeHttplibConnection(object):
- """A fake httplib.HTTPConnection for boto.
+ """A fake http_client.HTTPConnection for boto.
requests made via this connection actually get translated and routed into
our WSGI app, we then wait for the response and turn it back into
- the httplib.HTTPResponse that boto expects.
+ the http_client.HTTPResponse that boto expects.
"""
def __init__(self, app, host, is_secure=False):
self.app = app
# guess that's a function the web server usually provides.
resp = "HTTP/1.0 %s" % resp
self.sock = FakeHttplibSocket(resp)
- self.http_response = httplib.HTTPResponse(self.sock)
+ self.http_response = http_client.HTTPResponse(self.sock)
# NOTE(vish): boto is accessing private variables for some reason
self._HTTPConnection__response = self.http_response
self.http_response.begin()
"""
-import BaseHTTPServer
-import httplib
-
from lxml import etree
import mock
from oslo_log import log as logging
import six
+from six.moves import BaseHTTPServer
+from six.moves import http_client
from cinder import exception
from cinder import test
class FakeHttplibSocket(object):
- """A fake socket implementation for httplib.HTTPResponse."""
+ """A fake socket implementation for http_client.HTTPResponse."""
def __init__(self, value):
self._rbuffer = six.StringIO(value)
self._wbuffer = six.StringIO('')
class FakeDirectCmodeHTTPConnection(object):
- """A fake httplib.HTTPConnection for netapp tests
+ """A fake http_client.HTTPConnection for netapp tests
Requests made via this connection actually get translated and routed into
the fake direct handler above, we then turn the response into
- the httplib.HTTPResponse that the caller expects.
+ the http_client.HTTPResponse that the caller expects.
"""
def __init__(self, host, timeout=None):
self.host = host
self.app = FakeDirectCMODEServerHandler(sock, '127.0.0.1:80', None)
self.sock = FakeHttplibSocket(sock.result)
- self.http_response = httplib.HTTPResponse(self.sock)
+ self.http_response = http_client.HTTPResponse(self.sock)
def set_debuglevel(self, level):
pass
self.mock_object(utils, 'OpenStackInfo')
configuration = self._set_config(create_configuration())
driver = common.NetAppDriver(configuration=configuration)
- self.stubs.Set(httplib, 'HTTPConnection',
+ self.stubs.Set(http_client, 'HTTPConnection',
FakeDirectCmodeHTTPConnection)
driver.do_setup(context='')
self.driver = driver
class FakeDirect7modeHTTPConnection(object):
- """A fake httplib.HTTPConnection for netapp tests
+ """A fake http_client.HTTPConnection for netapp tests
Requests made via this connection actually get translated and routed into
the fake direct handler above, we then turn the response into
- the httplib.HTTPResponse that the caller expects.
+ the http_client.HTTPResponse that the caller expects.
"""
def __init__(self, host, timeout=None):
self.host = host
self.app = FakeDirect7MODEServerHandler(sock, '127.0.0.1:80', None)
self.sock = FakeHttplibSocket(sock.result)
- self.http_response = httplib.HTTPResponse(self.sock)
+ self.http_response = http_client.HTTPResponse(self.sock)
def set_debuglevel(self, level):
pass
self.mock_object(utils, 'OpenStackInfo')
configuration = self._set_config(create_configuration())
driver = common.NetAppDriver(configuration=configuration)
- self.stubs.Set(httplib, 'HTTPConnection',
+ self.stubs.Set(http_client, 'HTTPConnection',
FakeDirect7modeHTTPConnection)
driver.do_setup(context='')
driver.root_volume_name = 'root'
self.mock_object(utils, 'OpenStackInfo')
configuration = self._set_config(create_configuration())
driver = common.NetAppDriver(configuration=configuration)
- self.stubs.Set(httplib, 'HTTPConnection',
+ self.stubs.Set(http_client, 'HTTPConnection',
FakeDirect7modeHTTPConnection)
driver.do_setup(context='')
self.driver = driver
# under the License.
"""Unit tests for the NetApp-specific ssc module."""
-import BaseHTTPServer
import copy
-import httplib
-
from lxml import etree
from mox3 import mox
import six
+from six.moves import BaseHTTPServer
+from six.moves import http_client
from cinder import exception
from cinder import test
class FakeHttplibSocket(object):
- """A fake socket implementation for httplib.HTTPResponse."""
+ """A fake socket implementation for http_client.HTTPResponse."""
def __init__(self, value):
self._rbuffer = six.StringIO(value)
self._wbuffer = six.StringIO('')
class FakeDirectCmodeHTTPConnection(object):
- """A fake httplib.HTTPConnection for netapp tests.
+ """A fake http_client.HTTPConnection for netapp tests.
Requests made via this connection actually get translated and routed into
the fake direct handler above, we then turn the response into
- the httplib.HTTPResponse that the caller expects.
+ the http_client.HTTPResponse that the caller expects.
"""
def __init__(self, host, timeout=None):
self.host = host
self.app = FakeDirectCMODEServerHandler(sock, '127.0.0.1:80', None)
self.sock = FakeHttplibSocket(sock.result)
- self.http_response = httplib.HTTPResponse(self.sock)
+ self.http_response = http_client.HTTPResponse(self.sock)
def set_debuglevel(self, level):
pass
def setUp(self):
super(SscUtilsTestCase, self).setUp()
- self.stubs.Set(httplib, 'HTTPConnection',
+ self.stubs.Set(http_client, 'HTTPConnection',
FakeDirectCmodeHTTPConnection)
def test_cl_vols_ssc_all(self):
import copy
import errno
-import httplib
import re
import mock
from oslo_utils import units
+from six.moves import http_client
from cinder import exception
from cinder import test
'GET',
'/%s/%s/' % (DPLCOMMON.DPL_VER_V1, DPLCOMMON.DPL_OBJ_SYSTEM),
None,
- [httplib.OK, httplib.ACCEPTED])
+ [http_client.OK, http_client.ACCEPTED])
def test_createvdev(self):
self.dplcmd.create_vdev(DATA_IN_VOLUME['id'],
'/%s/%s/%s/' % (DPLCOMMON.DPL_VER_V1, DPLCOMMON.DPL_OBJ_VOLUME,
DATA_IN_VOLUME['id']),
params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED, http_client.CREATED])
def test_extendvdev(self):
self.dplcmd.extend_vdev(DATA_IN_VOLUME['id'],
'/%s/%s/%s/' % (DPLCOMMON.DPL_VER_V1, DPLCOMMON.DPL_OBJ_VOLUME,
DATA_IN_VOLUME['id']),
params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED, http_client.CREATED])
def test_deletevdev(self):
self.dplcmd.delete_vdev(DATA_IN_VOLUME['id'], True)
'/%s/%s/%s/' % (DPLCOMMON.DPL_VER_V1, DPLCOMMON.DPL_OBJ_VOLUME,
DATA_IN_VOLUME['id']),
params,
- [httplib.OK, httplib.ACCEPTED, httplib.NOT_FOUND,
- httplib.NO_CONTENT])
+ [http_client.OK, http_client.ACCEPTED, http_client.NOT_FOUND,
+ http_client.NO_CONTENT])
def test_createvdevfromsnapshot(self):
self.dplcmd.create_vdev_from_snapshot(
'/%s/%s/%s/' % (DPLCOMMON.DPL_VER_V1, DPLCOMMON.DPL_OBJ_VOLUME,
DATA_IN_VOLUME['id']),
params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED, http_client.CREATED])
def test_getpool(self):
self.dplcmd.get_pool(POOLUUID)
'/%s/%s/%s/' % (DPLCOMMON.DPL_VER_V1, DPLCOMMON.DPL_OBJ_POOL,
POOLUUID),
None,
- [httplib.OK, httplib.ACCEPTED])
+ [http_client.OK, http_client.ACCEPTED])
def test_clonevdev(self):
self.dplcmd.clone_vdev(
'/%s/%s/%s/' % (DPLCOMMON.DPL_VER_V1, DPLCOMMON.DPL_OBJ_VOLUME,
DATA_IN_VOLUME1['id']),
params,
- [httplib.OK, httplib.CREATED, httplib.ACCEPTED])
+ [http_client.OK, http_client.CREATED, http_client.ACCEPTED])
def test_createvdevsnapshot(self):
self.dplcmd.create_vdev_snapshot(
'/%s/%s/%s/' % (DPLCOMMON.DPL_VER_V1, DPLCOMMON.DPL_OBJ_VOLUME,
DATA_IN_VOLUME['id']),
params,
- [httplib.OK, httplib.CREATED, httplib.ACCEPTED])
+ [http_client.OK, http_client.CREATED, http_client.ACCEPTED])
def test_getvdev(self):
self.dplcmd.get_vdev(DATA_IN_VOLUME['id'])
'/%s/%s/%s/' % (DPLCOMMON.DPL_VER_V1, DPLCOMMON.DPL_OBJ_VOLUME,
DATA_IN_VOLUME['id']),
None,
- [httplib.OK, httplib.ACCEPTED, httplib.NOT_FOUND])
+ [http_client.OK, http_client.ACCEPTED, http_client.NOT_FOUND])
def test_getvdevstatus(self):
self.dplcmd.get_vdev_status(DATA_IN_VOLUME['id'], '123456')
DATA_IN_VOLUME['id'],
'123456'),
None,
- [httplib.OK, httplib.NOT_FOUND])
+ [http_client.OK, http_client.NOT_FOUND])
def test_getpoolstatus(self):
self.dplcmd.get_pool_status(POOLUUID, '123456')
POOLUUID,
'123456'),
None,
- [httplib.OK, httplib.NOT_FOUND])
+ [http_client.OK, http_client.NOT_FOUND])
def test_assignvdev(self):
self.dplcmd.assign_vdev(
DPLCOMMON.DPL_OBJ_VOLUME,
DATA_IN_VOLUME['id']),
params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED, http_client.CREATED])
def test_unassignvdev(self):
self.dplcmd.unassign_vdev(DATA_IN_VOLUME['id'],
DPLCOMMON.DPL_OBJ_VOLUME,
DATA_IN_VOLUME['id']),
params,
- [httplib.OK, httplib.ACCEPTED,
- httplib.NO_CONTENT, httplib.NOT_FOUND])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.NO_CONTENT, http_client.NOT_FOUND])
def test_deletevdevsnapshot(self):
self.dplcmd.delete_vdev_snapshot(DATA_IN_VOLUME['id'],
DPLCOMMON.DPL_OBJ_SNAPSHOT,
DATA_IN_SNAPSHOT['id']),
None,
- [httplib.OK, httplib.ACCEPTED, httplib.NO_CONTENT,
- httplib.NOT_FOUND])
+ [http_client.OK, http_client.ACCEPTED, http_client.NO_CONTENT,
+ http_client.NOT_FOUND])
def test_listvdevsnapshots(self):
self.dplcmd.list_vdev_snapshots(DATA_IN_VOLUME['id'])
DATA_IN_VOLUME['id'],
DPLCOMMON.DPL_OBJ_SNAPSHOT),
None,
- [httplib.OK])
+ [http_client.OK])
class TestProphetStorDPLDriver(test.TestCase):
# License for the specific language governing permissions and limitations
# under the License.
-import httplib
import json
import uuid
from oslo_log import log as logging
from oslo_utils import units
import six
+from six.moves import http_client
from six.moves import urllib
from cinder import exception
res_details = {}
try:
# Prepare the connection
- connection = httplib.HTTPSConnection(host)
+ connection = http_client.HTTPSConnection(host)
# Make the connection
connection.request('GET', url)
# Extract the response as the connection was successful
error_details = res_obj['error']
http_status = res_obj['http_status']
- except httplib.HTTPException as ex:
+ except http_client.HTTPException as ex:
msg = (_("Error executing CloudByte API [%(cmd)s], "
"Error: %(err)s.") %
{'cmd': cmd, 'err': ex})
# under the License.
import base64
-import httplib
import os
import socket
import ssl
import OpenSSL
from oslo_log import log as logging
import six
+from six.moves import http_client
from six.moves import urllib
from cinder.i18n import _, _LI
class OpenSSLConnectionDelegator(object):
"""An OpenSSL.SSL.Connection delegator.
- Supplies an additional 'makefile' method which httplib requires
+ Supplies an additional 'makefile' method which http_client requires
and is not present in OpenSSL.SSL.Connection.
Note: Since it is not possible to inherit from OpenSSL.SSL.Connection
a delegator must be used.
return socket._fileobject(self.connection, *args, **kwargs)
-class HTTPSConnection(httplib.HTTPSConnection):
+class HTTPSConnection(http_client.HTTPSConnection):
def __init__(self, host, port=None, key_file=None, cert_file=None,
strict=None, ca_certs=None, no_verification=False):
if not pywbemAvailable:
else:
excp_lst = ()
try:
- httplib.HTTPSConnection.__init__(self, host, port,
- key_file=key_file,
- cert_file=cert_file)
+ http_client.HTTPSConnection.__init__(self, host, port,
+ key_file=key_file,
+ cert_file=cert_file)
self.key_file = None if key_file is None else key_file
self.cert_file = None if cert_file is None else cert_file
"""Send request over HTTP.
Send XML data over HTTP to the specified url. Return the
- response in XML. Uses Python's build-in httplib. x509 may be a
+ response in XML. Uses Python's build-in http_client. x509 may be a
dictionary containing the location of the SSL certificate and key
files.
"""
if response.status != 200:
raise pywbem.cim_http.Error('HTTP error')
- except httplib.BadStatusLine as arg:
+ except http_client.BadStatusLine as arg:
msg = (_("Bad Status line returned: %(arg)s.")
% {'arg': arg})
raise pywbem.cim_http.Error(msg)
"""Common class for Huawei 18000 storage drivers."""
import base64
-import cookielib
import json
import socket
import time
from oslo_utils import excutils
from oslo_utils import units
import six
+from six.moves import http_cookiejar
from six.moves import urllib
from cinder import context
def __init__(self, configuration):
self.configuration = configuration
- self.cookie = cookielib.CookieJar()
+ self.cookie = http_cookiejar.CookieJar()
self.url = None
self.productversion = None
self.headers = {"Connection": "keep-alive",
import base64
import errno
-import httplib
import json
import random
import time
from oslo_log import log as logging
from oslo_utils import units
import six
+from six.moves import http_client
from cinder import exception
from cinder.i18n import _, _LI, _LW, _LE
retcode = errno.EINVAL
for i in range(CONNECTION_RETRY):
try:
- connection = httplib.HTTPSConnection(self.ip,
- self.port,
- timeout=60)
+ connection = http_client.HTTPSConnection(self.ip,
+ self.port,
+ timeout=60)
if connection:
retcode = 0
break
while (connection and retry):
try:
connection.request(method, url, payload, header)
- except httplib.CannotSendRequest as e:
+ except http_client.CannotSendRequest as e:
connection.close()
time.sleep(1)
- connection = httplib.HTTPSConnection(self.ip,
- self.port,
- timeout=60)
+ connection = http_client.HTTPSConnection(self.ip,
+ self.port,
+ timeout=60)
retry -= 1
if connection:
if retry == 0:
if retcode == 0:
try:
response = connection.getresponse()
- if response.status == httplib.SERVICE_UNAVAILABLE:
+ if response.status == http_client.SERVICE_UNAVAILABLE:
LOG.error(_LE('The Flexvisor service is unavailable.'))
time.sleep(1)
retry -= 1
else:
retcode = 0
break
- except httplib.ResponseNotReady as e:
+ except http_client.ResponseNotReady as e:
time.sleep(1)
retry -= 1
retcode = errno.EFAULT
break
if (retcode == 0 and response.status in expected_status
- and response.status == httplib.NOT_FOUND):
+ and response.status == http_client.NOT_FOUND):
retcode = errno.ENODATA
elif retcode == 0 and response.status not in expected_status:
LOG.error(_LE('%(method)s %(url)s unexpected response status: '
'%(response)s (expects: %(expects)s).'),
{'method': method,
'url': url,
- 'response': httplib.responses[response.status],
+ 'response': http_client.responses[response.status],
'expects': expected_status})
- if response.status == httplib.UNAUTHORIZED:
+ if response.status == http_client.UNAUTHORIZED:
raise exception.NotAuthorized
retcode = errno.EACCES
else:
retcode = errno.EIO
- elif retcode == 0 and response.status is httplib.NOT_FOUND:
+ elif retcode == 0 and response.status is http_client.NOT_FOUND:
retcode = errno.ENODATA
- elif retcode == 0 and response.status is httplib.ACCEPTED:
+ elif retcode == 0 and response.status is http_client.ACCEPTED:
retcode = errno.EAGAIN
try:
data = response.read()
e)
retcode = errno.ENOEXEC
elif (retcode == 0 and
- response.status in [httplib.OK, httplib.CREATED] and
- httplib.NO_CONTENT not in expected_status):
+ response.status in [http_client.OK, http_client.CREATED] and
+ http_client.NO_CONTENT not in expected_status):
try:
data = response.read()
data = json.loads(data)
def get_server_info(self):
method = 'GET'
url = ('/%s/%s/' % (DPL_VER_V1, DPL_OBJ_SYSTEM))
- return self._execute(method, url, None, [httplib.OK, httplib.ACCEPTED])
+ return self._execute(method, url, None,
+ [http_client.OK, http_client.ACCEPTED])
def create_vdev(self, volumeID, volumeName, volumeDesc, poolID, volumeSize,
fthinprovision=True, maximum_snapshot=MAXSNAPSHOTS,
params['metadata'] = metadata
return self._execute(method,
url, params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.CREATED])
def extend_vdev(self, volumeID, volumeName, volumeDesc, volumeSize,
maximum_snapshot=MAXSNAPSHOTS, snapshot_quota=None):
params['metadata'] = metadata
return self._execute(method,
url, params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.CREATED])
def delete_vdev(self, volumeID, force=True):
method = 'DELETE'
params['metadata'] = metadata
return self._execute(method,
url, params,
- [httplib.OK, httplib.ACCEPTED, httplib.NOT_FOUND,
- httplib.NO_CONTENT])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.NOT_FOUND, http_client.NO_CONTENT])
def create_vdev_from_snapshot(self, vdevID, vdevDisplayName, vdevDesc,
snapshotID, poolID, fthinprovision=True,
params['copy'] = self._gen_snapshot_url(vdevID, snapshotID)
return self._execute(method,
url, params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.CREATED])
def spawn_vdev_from_snapshot(self, new_vol_id, src_vol_id,
vol_display_name, description, snap_id):
params['copy'] = self._gen_snapshot_url(src_vol_id, snap_id)
return self._execute(method, url, params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.CREATED])
def get_pools(self):
method = 'GET'
url = '/%s/%s/' % (DPL_VER_V1, DPL_OBJ_POOL)
- return self._execute(method, url, None, [httplib.OK])
+ return self._execute(method, url, None, [http_client.OK])
def get_pool(self, poolid):
method = 'GET'
url = '/%s/%s/%s/' % (DPL_VER_V1, DPL_OBJ_POOL, poolid)
- return self._execute(method, url, None, [httplib.OK, httplib.ACCEPTED])
+ return self._execute(method, url, None,
+ [http_client.OK, http_client.ACCEPTED])
def clone_vdev(self, SourceVolumeID, NewVolumeID, poolID, volumeName,
volumeDesc, volumeSize, fthinprovision=True,
return self._execute(method,
url, params,
- [httplib.OK, httplib.CREATED, httplib.ACCEPTED])
+ [http_client.OK, http_client.CREATED,
+ http_client.ACCEPTED])
def create_vdev_snapshot(self, vdevid, snapshotid, snapshotname='',
snapshotdes='', isgroup=False):
return self._execute(method,
url, params,
- [httplib.OK, httplib.CREATED, httplib.ACCEPTED])
+ [http_client.OK, http_client.CREATED,
+ http_client.ACCEPTED])
def get_vdev(self, vdevid):
method = 'GET'
return self._execute(method,
url, None,
- [httplib.OK, httplib.ACCEPTED, httplib.NOT_FOUND])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.NOT_FOUND])
def get_vdev_status(self, vdevid, eventid):
method = 'GET'
return self._execute(method,
url, None,
- [httplib.OK, httplib.NOT_FOUND])
+ [http_client.OK, http_client.NOT_FOUND])
def get_pool_status(self, poolid, eventid):
method = 'GET'
return self._execute(method,
url, None,
- [httplib.OK, httplib.NOT_FOUND])
+ [http_client.OK, http_client.NOT_FOUND])
def assign_vdev(self, vdevid, iqn, lunname, portal, lunid=0):
method = 'PUT'
return self._execute(method,
url, params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.CREATED])
def assign_vdev_fc(self, vdevid, targetwwpn, initiatorwwpn, lunname,
lunid=-1):
return self._execute(method,
url, params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.CREATED])
def unassign_vdev(self, vdevid, initiatorIqn, targetIqn=''):
method = 'PUT'
return self._execute(method,
url, params,
- [httplib.OK, httplib.ACCEPTED,
- httplib.NO_CONTENT, httplib.NOT_FOUND])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.NO_CONTENT, http_client.NOT_FOUND])
def unassign_vdev_fc(self, vdevid, targetwwpn, initiatorwwpns):
method = 'PUT'
return self._execute(method,
url, params,
- [httplib.OK, httplib.ACCEPTED,
- httplib.NO_CONTENT, httplib.NOT_FOUND])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.NO_CONTENT, http_client.NOT_FOUND])
def delete_vdev_snapshot(self, objID, snapshotID, isGroup=False):
method = 'DELETE'
return self._execute(method,
url, None,
- [httplib.OK, httplib.ACCEPTED, httplib.NO_CONTENT,
- httplib.NOT_FOUND])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.NO_CONTENT, http_client.NOT_FOUND])
def rollback_vdev(self, vdevid, snapshotid):
method = 'PUT'
return self._execute(method,
url, params,
- [httplib.OK, httplib.ACCEPTED])
+ [http_client.OK, http_client.ACCEPTED])
def list_vdev_snapshots(self, vdevid, isGroup=False):
method = 'GET'
return self._execute(method,
url, None,
- [httplib.OK])
+ [http_client.OK])
def query_vdev_snapshot(self, vdevid, snapshotID, isGroup=False):
method = 'GET'
return self._execute(method,
url, None,
- [httplib.OK])
+ [http_client.OK])
def create_target(self, targetID, protocol, displayName, targetAddress,
description=''):
metadata['display_name'] = displayName
metadata['display_description'] = description
metadata['address'] = targetAddress
- return self._execute(method, url, params, [httplib.OK])
+ return self._execute(method, url, params, [http_client.OK])
def get_target(self, targetID):
method = 'GET'
url = '/%s/%s/%s/' % (DPL_VER_V1, DPL_OBJ_EXPORT, targetID)
- return self._execute(method, url, None, [httplib.OK])
+ return self._execute(method, url, None, [http_client.OK])
def delete_target(self, targetID):
method = 'DELETE'
url = '/%s/%s/%s/' % (DPL_VER_V1, DPL_OBJ_EXPORT, targetID)
return self._execute(method,
url, None,
- [httplib.OK, httplib.ACCEPTED, httplib.NOT_FOUND])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.NOT_FOUND])
def get_target_list(self, type='target'):
# type = target/initiator
url = '/%s/%s/' % (DPL_VER_V1, DPL_OBJ_EXPORT)
else:
url = '/%s/%s/?type=%s' % (DPL_VER_V1, DPL_OBJ_EXPORT, type)
- return self._execute(method, url, None, [httplib.OK])
+ return self._execute(method, url, None, [http_client.OK])
def get_sns_table(self, wwpn):
method = 'PUT'
params['metadata'] = {}
params['metadata']['protocol'] = 'fc'
params['metadata']['address'] = str(wwpn)
- return self._execute(method, url, params, [httplib.OK])
+ return self._execute(method, url, params, [http_client.OK])
def create_vg(self, groupID, groupName, groupDesc='', listVolume=None,
maxSnapshots=MAXSNAPSHOTS, rotationSnapshot=True):
metadata['properties'] = properties
params['metadata'] = metadata
return self._execute(method, url, params,
- [httplib.OK, httplib.ACCEPTED, httplib.CREATED])
+ [http_client.OK, http_client.ACCEPTED,
+ http_client.CREATED])
def get_vg_list(self, vgtype=None):
method = 'GET'
url = '/%s/?volume_group_type=%s' % (DPL_OBJ_VOLUMEGROUP, vgtype)
else:
url = '/%s/' % (DPL_OBJ_VOLUMEGROUP)
- return self._execute(method, url, None, [httplib.OK])
+ return self._execute(method, url, None, [http_client.OK])
def get_vg(self, groupID):
method = 'GET'
url = '/%s/%s/' % (DPL_OBJ_VOLUMEGROUP, groupID)
- return self._execute(method, url, None, [httplib.OK])
+ return self._execute(method, url, None, [http_client.OK])
def delete_vg(self, groupID, force=True):
method = 'DELETE'
metadata['force'] = force
params['metadata'] = metadata
return self._execute(method, url, params,
- [httplib.NO_CONTENT, httplib.NOT_FOUND])
+ [http_client.NO_CONTENT, http_client.NOT_FOUND])
def join_vg(self, volumeID, groupID):
method = 'PUT'
metadata['volume'].append(volumeID)
params['metadata'] = metadata
return self._execute(method, url, params,
- [httplib.OK, httplib.ACCEPTED])
+ [http_client.OK, http_client.ACCEPTED])
def leave_vg(self, volumeID, groupID):
method = 'PUT'
metadata['volume'].append(volumeID)
params['metadata'] = metadata
return self._execute(method, url, params,
- [httplib.OK, httplib.ACCEPTED])
+ [http_client.OK, http_client.ACCEPTED])
class DPLCOMMONDriver(driver.ConsistencyGroupVD, driver.ExtendVD,
ZFS Storage Appliance REST API Client Programmatic Interface
"""
-import httplib
import json
import StringIO
import time
from oslo_log import log
+from six.moves import http_client
from six.moves import urllib
from cinder.i18n import _LE, _LI
pass
#: Request return OK
- OK = httplib.OK
+ OK = http_client.OK
#: New resource created successfully
- CREATED = httplib.CREATED
+ CREATED = http_client.CREATED
#: Command accepted
- ACCEPTED = httplib.ACCEPTED
+ ACCEPTED = http_client.ACCEPTED
#: Command returned OK but no data will be returned
- NO_CONTENT = httplib.NO_CONTENT
+ NO_CONTENT = http_client.NO_CONTENT
#: Bad Request
- BAD_REQUEST = httplib.BAD_REQUEST
+ BAD_REQUEST = http_client.BAD_REQUEST
#: User is not authorized
- UNAUTHORIZED = httplib.UNAUTHORIZED
+ UNAUTHORIZED = http_client.UNAUTHORIZED
#: The request is not allowed
- FORBIDDEN = httplib.FORBIDDEN
+ FORBIDDEN = http_client.FORBIDDEN
#: The requested resource was not found
- NOT_FOUND = httplib.NOT_FOUND
+ NOT_FOUND = http_client.NOT_FOUND
#: The request is not allowed
- NOT_ALLOWED = httplib.METHOD_NOT_ALLOWED
+ NOT_ALLOWED = http_client.METHOD_NOT_ALLOWED
#: Request timed out
- TIMEOUT = httplib.REQUEST_TIMEOUT
+ TIMEOUT = http_client.REQUEST_TIMEOUT
#: Invalid request
- CONFLICT = httplib.CONFLICT
+ CONFLICT = http_client.CONFLICT
#: Service Unavailable
- BUSY = httplib.SERVICE_UNAVAILABLE
+ BUSY = http_client.SERVICE_UNAVAILABLE
class RestResult(object):
if self.error:
self.status = self.error.code
- self.data = httplib.responses[self.status]
+ self.data = http_client.responses[self.status]
LOG.debug('Response code: %s', self.status)
LOG.debug('Response data: %s', self.data)
self.code = status
self.name = name
self.msg = message
- if status in httplib.responses:
- self.msg = httplib.responses[status]
+ if status in http_client.responses:
+ self.msg = http_client.responses[status]
def __str__(self):
return "%d %s %s" % (self.code, self.name, self.msg)
try:
result = self.post("/access/v1")
del self.headers['authorization']
- if result.status == httplib.CREATED:
+ if result.status == http_client.CREATED:
self.headers['x-auth-session'] = \
result.get_header('x-auth-session')
self.do_logout = True
LOG.info(_LI('ZFSSA version: %s'),
result.get_header('x-zfssa-version'))
- elif result.status == httplib.NOT_FOUND:
+ elif result.status == http_client.NOT_FOUND:
raise RestClientError(result.status, name="ERR_RESTError",
message="REST Not Available: \
Please Upgrade")
try:
response = urllib.request.urlopen(req, timeout=self.timeout)
except urllib.error.HTTPError as err:
- if err.code == httplib.NOT_FOUND:
+ if err.code == http_client.NOT_FOUND:
LOG.debug('REST Not Found: %s', err.code)
else:
LOG.error(_LE('REST Not Available: %s'), err.code)
- if err.code == httplib.SERVICE_UNAVAILABLE and \
+ if err.code == http_client.SERVICE_UNAVAILABLE and \
retry < maxreqretries:
retry += 1
time.sleep(1)
LOG.error(_LE('Server Busy retry request: %s'), retry)
continue
- if (err.code == httplib.UNAUTHORIZED or
- err.code == httplib.INTERNAL_SERVER_ERROR) and \
+ if (err.code == http_client.UNAUTHORIZED or
+ err.code == http_client.INTERNAL_SERVER_ERROR) and \
'/access/v1' not in zfssaurl:
try:
LOG.error(_LE('Authorizing request: %(zfssaurl)s '
break
- if response and response.getcode() == httplib.SERVICE_UNAVAILABLE and \
+ if response and response.getcode() == http_client.SERVICE_UNAVAILABLE and \
retry >= maxreqretries:
raise RestClientError(response.getcode(), name="ERR_HTTPError",
message="REST Not Available: Disabled")
ZFS Storage Appliance WebDAV Client
"""
-import httplib
import time
from oslo_log import log
+from six.moves import http_client
from six.moves import urllib
from cinder import exception
'in cinder.conf.')
WebDAVHTTPErrors = {
- httplib.UNAUTHORIZED: _('User not authorized to perform WebDAV '
- 'operations.'),
- httplib.BAD_GATEWAY: bad_gateway_err,
- httplib.FORBIDDEN: _('Check access permissions for the ZFS share assigned '
- 'to this driver.'),
- httplib.NOT_FOUND: _('The source volume for this WebDAV operation not '
- 'found.'),
- httplib.INSUFFICIENT_STORAGE: _('Not enough storage space in the ZFS '
- 'share to perform this operation.')
+ http_client.UNAUTHORIZED: _('User not authorized to perform WebDAV '
+ 'operations.'),
+ http_client.BAD_GATEWAY: bad_gateway_err,
+ http_client.FORBIDDEN: _('Check access permissions for the ZFS share '
+ 'assigned to this driver.'),
+ http_client.NOT_FOUND: _('The source volume for this WebDAV operation not '
+ 'found.'),
+ http_client.INSUFFICIENT_STORAGE: _('Not enough storage space in the ZFS '
+ 'share to perform this operation.')
}
WebDAVErrors = {
def _lookup_error(self, error):
msg = ''
- if error in httplib.responses:
- msg = httplib.responses[error]
+ if error in http_client.responses:
+ msg = http_client.responses[error]
if error in WebDAVHTTPErrors:
msg = WebDAVHTTPErrors[error]
'%(method)s call.'),
{'code': err.code, 'method': method})
- if err.code == httplib.INTERNAL_SERVER_ERROR:
+ if err.code == http_client.INTERNAL_SERVER_ERROR:
LOG.error(_LE('WebDAV operation failed with error code: '
'%(code)s reason: %(reason)s Retry attempt '
'%(retry)s in progress.'),
src=src_file, dst=dst_file,
method=method)
- except httplib.BadStatusLine as err:
+ except http_client.BadStatusLine as err:
msg = self._lookup_error('BadStatusLine')
+ code = 'http_client.BadStatusLine'
raise exception.WebDAVClientError(msg=msg,
- code='httplib.BadStatusLine',
+ code=code,
src=src_file, dst=dst_file,
method=method)
[Variables]
dummy-variables-rgx=_
+
+[Typecheck]
+# Disable warnings on the HTTPSConnection classes because pylint doesn't
+# support importing from six.moves yet, see:
+# https://bitbucket.org/logilab/pylint/issue/550/
+ignored-classes=HTTPSConnection