Change constructor arguments of NexentaJSONProxy.
Change-Id: I43d69485e6a98704f4cb02a4b55acccf9c05b683
import urllib2
import mox as mox_lib
-from mox import stubout
from cinder import test
from cinder import units
class TestNexentaJSONRPC(test.TestCase):
- URL = 'http://example.com/'
- URL_S = 'https://example.com/'
+ HOST = 'example.com'
+ URL = 'http://%s/' % HOST
+ URL_S = 'https://%s/' % HOST
USER = 'user'
PASSWORD = 'password'
- HEADERS = {'Authorization': 'Basic %s' % (
- base64.b64encode(':'.join((USER, PASSWORD))),),
- 'Content-Type': 'application/json'}
+ HEADERS = {
+ 'Authorization':
+ 'Basic %s' % base64.b64encode('%s:%s' % (USER, PASSWORD)),
+ 'Content-Type': 'application/json'
+ }
REQUEST = 'the request'
def setUp(self):
super(TestNexentaJSONRPC, self).setUp()
self.proxy = jsonrpc.NexentaJSONProxy(
- self.URL, self.USER, self.PASSWORD, auto=True)
+ 'http', self.HOST, 2000, '/', self.USER, self.PASSWORD, auto=True)
self.mox.StubOutWithMock(urllib2, 'Request', True)
self.mox.StubOutWithMock(urllib2, 'urlopen')
self.resp_mock = self.mox.CreateMockAnything()
def test_call(self):
urllib2.Request(
- self.URL,
+ 'http://%s:2000/' % self.HOST,
'{"object": null, "params": ["arg1", "arg2"], "method": null}',
self.HEADERS).AndReturn(self.REQUEST)
self.resp_info_mock.status = ''
def test_call_deep(self):
urllib2.Request(
- self.URL,
+ 'http://%s:2000/' % self.HOST,
'{"object": "obj1.subobj", "params": ["arg1", "arg2"],'
' "method": "meth"}',
self.HEADERS).AndReturn(self.REQUEST)
def test_call_auto(self):
urllib2.Request(
- self.URL,
+ 'http://%s:2000/' % self.HOST,
'{"object": null, "params": ["arg1", "arg2"], "method": null}',
self.HEADERS).AndReturn(self.REQUEST)
urllib2.Request(
- self.URL_S,
+ 'https://%s:2000/' % self.HOST,
'{"object": null, "params": ["arg1", "arg2"], "method": null}',
self.HEADERS).AndReturn(self.REQUEST)
self.resp_info_mock.status = 'EOF in headers'
def test_call_error(self):
urllib2.Request(
- self.URL,
+ 'http://%s:2000/' % self.HOST,
'{"object": null, "params": ["arg1", "arg2"], "method": null}',
self.HEADERS).AndReturn(self.REQUEST)
self.resp_info_mock.status = ''
def test_call_fail(self):
urllib2.Request(
- self.URL,
+ 'http://%s:2000/' % self.HOST,
'{"object": null, "params": ["arg1", "arg2"], "method": null}',
self.HEADERS).AndReturn(self.REQUEST)
self.resp_info_mock.status = 'EOF in headers'
.. automodule:: nexenta.jsonrpc
.. moduleauthor:: Yuriy Taraday <yorik.sar@gmail.com>
+.. moduleauthor:: Victor Rodionov <victor.rodionov@nexenta.com>
"""
import urllib2
class NexentaJSONProxy(object):
- def __init__(self, url, user, password, auto=False, obj=None, method=None):
- self.url = url
+
+ def __init__(self, scheme, host, port, path, user, password, auto=False,
+ obj=None, method=None):
+ self.scheme = scheme.lower()
+ self.host = host
+ self.port = port
+ self.path = path
self.user = user
self.password = password
self.auto = auto
obj, method = self.obj, name
else:
obj, method = '%s.%s' % (self.obj, self.method), name
- return NexentaJSONProxy(self.url, self.user, self.password, self.auto,
- obj, method)
+ return NexentaJSONProxy(self.scheme, self.host, self.port, self.path,
+ self.user, self.password, self.auto, obj,
+ method)
+
+ @property
+ def url(self):
+ return '%s://%s:%s%s' % (self.scheme, self.host, self.port, self.path)
+
+ def __hash__(self):
+ return self.url.__hash__()
+
+ def __repr__(self):
+ return 'NMS proxy: %s' % self.url
def __call__(self, *args):
- data = jsonutils.dumps({'object': self.obj,
- 'method': self.method,
- 'params': args})
+ data = jsonutils.dumps({
+ 'object': self.obj,
+ 'method': self.method,
+ 'params': args
+ })
auth = ('%s:%s' % (self.user, self.password)).encode('base64')[:-1]
- headers = {'Content-Type': 'application/json',
- 'Authorization': 'Basic %s' % (auth,)}
+ headers = {
+ 'Content-Type': 'application/json',
+ 'Authorization': 'Basic %s' % auth
+ }
LOG.debug(_('Sending JSON data: %s'), data)
request = urllib2.Request(self.url, data, headers)
response_obj = urllib2.urlopen(request)
if response_obj.info().status == 'EOF in headers':
- if self.auto and self.url.startswith('http://'):
- LOG.info(_('Auto switching to HTTPS connection to %s'),
- self.url)
- self.url = 'https' + self.url[4:]
- request = urllib2.Request(self.url, data, headers)
- response_obj = urllib2.urlopen(request)
- else:
+ if not self.auto or self.scheme != 'http':
LOG.error(_('No headers in server response'))
raise NexentaJSONException(_('Bad response from server'))
+ LOG.info(_('Auto switching to HTTPS connection to %s'), self.url)
+ self.scheme = 'https'
+ request = urllib2.Request(self.url, data, headers)
+ response_obj = urllib2.urlopen(request)
response_data = response_obj.read()
LOG.debug(_('Got response: %s'), response_data)
response = jsonutils.loads(response_data)
if response.get('error') is not None:
raise NexentaJSONException(response['error'].get('message', ''))
- else:
- return response.get('result')
+ return response.get('result')
allocated = utils.str2size(folder_props['used'])
return free + allocated, free, allocated
- def _get_nms_for_url(self, nms_url):
- pr = urlparse.urlparse(nms_url)
- scheme = pr.scheme
- auto = scheme == 'auto'
- if auto:
- scheme = 'http'
- user = 'admin'
- password = 'nexenta'
- if '@' not in pr.netloc:
- host_and_port = pr.netloc
- else:
- user_and_password, host_and_port = pr.netloc.split('@', 1)
- if ':' in user_and_password:
- user, password = user_and_password.split(':')
- else:
- user = user_and_password
- if ':' in host_and_port:
- host, port = host_and_port.split(':', 1)
- else:
- host, port = host_and_port, '2000'
- url = '%s://%s:%s/rest/nms/' % (scheme, host, port)
- return jsonrpc.NexentaJSONProxy(url, user, password, auto=auto)
+ def _get_nms_for_url(self, url):
+ """Returns initialized nms object for url."""
+ auto, scheme, user, password, host, port, path =\
+ utils.parse_nms_url(url)
+ return jsonrpc.NexentaJSONProxy(scheme, host, port, path, user,
+ password, auto=auto)
def _get_snapshot_volume(self, snapshot):
ctxt = context.get_admin_context()
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+"""
+:mod:`nexenta.utils` -- Nexenta-specific utils functions.
+=========================================================
+
+.. automodule:: nexenta.utils
+.. moduleauthor:: Victor Rodionov <victor.rodionov@nexenta.com>
+.. moduleauthor:: Mikhail Khodos <hodosmb@gmail.com>
+"""
import re
+import urlparse
def str2size(s, scale=1024):
value = float(groups[0])
suffix = len(groups) > 1 and groups[1].upper() or 'B'
- types = ['B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
+ types = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
for i, t in enumerate(types):
if suffix == t:
return int(value * pow(scale, i))
+
+
+def parse_nms_url(url):
+ """Parse NMS url into normalized parts like scheme, user, host and others.
+
+ Example NMS URL:
+ auto://admin:nexenta@192.168.1.1:2000/
+
+ NMS URL parts:
+ auto True if url starts with auto://, protocol will be
+ automatically switched to https if http not
+ supported;
+ scheme (auto) connection protocol (http or https);
+ user (admin) NMS user;
+ password (nexenta) NMS password;
+ host (192.168.1.1) NMS host;
+ port (2000) NMS port.
+
+ :param url: url string
+ :return: tuple (auto, scheme, user, password, host, port, path)
+ """
+ pr = urlparse.urlparse(url)
+ scheme = pr.scheme
+ auto = scheme == 'auto'
+ if auto:
+ scheme = 'http'
+ user = 'admin'
+ password = 'nexenta'
+ if '@' not in pr.netloc:
+ host_and_port = pr.netloc
+ else:
+ user_and_password, host_and_port = pr.netloc.split('@', 1)
+ if ':' in user_and_password:
+ user, password = user_and_password.split(':')
+ else:
+ user = user_and_password
+ if ':' in host_and_port:
+ host, port = host_and_port.split(':', 1)
+ else:
+ host, port = host_and_port, '2000'
+ return auto, scheme, user, password, host, port, '/rest/nms/'
auto = protocol == 'auto'
if auto:
protocol = 'http'
- url = '%s://%s:%s/rest/nms/' % (protocol,
- self.configuration.nexenta_host,
- self.configuration.nexenta_rest_port)
self.nms = jsonrpc.NexentaJSONProxy(
- url, self.configuration.nexenta_user,
+ protocol, self.configuration.nexenta_host,
+ self.configuration.nexenta_rest_port, '/rest/nms',
+ self.configuration.nexenta_user,
self.configuration.nexenta_password, auto=auto)
def check_for_setup_error(self):