import base64
import json
-import os
-import eventlet
-from oslo_concurrency import processutils
+from os_brick.initiator import connector
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import units
BLOCK_SIZE = 8
OK_STATUS_CODE = 200
-VOLUME_NOT_FOUND_ERROR = 3
+VOLUME_NOT_FOUND_ERROR = 78
VOLUME_NOT_MAPPED_ERROR = 84
VOLUME_ALREADY_MAPPED_ERROR = 81
self.configuration.sio_storage_pools.split(',')]
self.storage_pool_name = self.configuration.sio_storage_pool_name
self.storage_pool_id = self.configuration.sio_storage_pool_id
- if (self.storage_pool_name is None and self.storage_pool_id is None):
+ if self.storage_pool_name is None and self.storage_pool_id is None:
LOG.warning(_LW("No storage pool name or id was found."))
else:
LOG.info(_LI(
"Protection domain name: %(domain_id)s."),
{'domain_id': self.protection_domain_id})
+ self.connector = connector.InitiatorConnector.factory(
+ # TODO(xyang): Change 'SCALEIO' to connector.SCALEIO after
+ # os-brick 0.4.0 is released.
+ 'SCALEIO', utils.get_root_helper(),
+ device_scan_attempts=
+ self.configuration.num_volume_device_scan_tries
+ )
+
+ self.connection_properties = {}
+ self.connection_properties['scaleIO_volname'] = None
+ self.connection_properties['hostIP'] = None
+ self.connection_properties['serverIP'] = self.server_ip
+ self.connection_properties['serverPort'] = self.server_port
+ self.connection_properties['serverUsername'] = self.server_username
+ self.connection_properties['serverPassword'] = self.server_password
+ self.connection_properties['serverToken'] = self.server_token
+ self.connection_properties['iopsLimit'] = None
+ self.connection_properties['bandwidthLimit'] = None
+
def check_for_setup_error(self):
if (not self.protection_domain_name and
not self.protection_domain_id):
def _find_bandwidth_limit(self, storage_type):
return storage_type.get(BANDWIDTH_LIMIT)
- def id_to_base64(self, id):
+ def _id_to_base64(self, id):
# Base64 encode the id to get a volume name less than 32 characters due
# to ScaleIO limitation.
name = six.text_type(id).replace("-", "")
"""Creates a scaleIO volume."""
self._check_volume_size(volume.size)
- volname = self.id_to_base64(volume.id)
+ volname = self._id_to_base64(volume.id)
storage_type = self._get_volumetype_extraspecs(volume)
storage_pool_name = self._find_storage_pool_name_from_storage_type(
LOG.info(_LI("Created volume %(volname)s, volume id %(volid)s."),
{'volname': volname, 'volid': volume.id})
+ return {'provider_id': response['id']}
+
def _check_volume_size(self, size):
if size % 8 != 0:
round_volume_capacity = (
def create_snapshot(self, snapshot):
"""Creates a scaleio snapshot."""
- volname = self.id_to_base64(snapshot.volume_id)
- snapname = self.id_to_base64(snapshot.id)
- self._snapshot_volume(volname, snapname)
+ volume_id = snapshot.volume.provider_id
+ snapname = self._id_to_base64(snapshot.id)
+ return self._snapshot_volume(volume_id, snapname)
- def _snapshot_volume(self, volname, snapname):
- vol_id = self._get_volume_id(volname)
+ def _snapshot_volume(self, vol_id, snapname):
+ LOG.info(_LI("Snapshot volume %(vol)s into snapshot %(id)s.") %
+ {'vol': vol_id, 'id': snapname})
params = {
'snapshotDefs': [{"volumeId": vol_id, "snapshotName": snapname}]}
req_vars = {'server_ip': self.server_ip,
verify=self._get_verify_cert())
r = self._check_response(r, request, False, params)
response = r.json()
- LOG.info(_LI("snapshot volume response: %s."), response)
+ LOG.info(_LI("Snapshot volume response: %s."), response)
if r.status_code != OK_STATUS_CODE and "errorCode" in response:
msg = (_("Failed creating snapshot for volume %(volname)s: "
"%(response)s.") %
- {'volname': volname,
+ {'volname': vol_id,
'response': response['message']})
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
+ return {'provider_id': response['volumeIdList'][0]}
+
def _check_response(self, response, request, is_get_request=True,
params=None):
if response.status_code == 401 or response.status_code == 403:
# becomes a new unmapped volume in the system and the user
# may manipulate it in the same manner as any other volume
# exposed by the system
- volname = self.id_to_base64(snapshot.id)
- snapname = self.id_to_base64(volume.id)
+ volume_id = snapshot.provider_id
+ snapname = self._id_to_base64(volume.id)
LOG.info(_LI(
"ScaleIO create volume from snapshot: snapshot %(snapname)s "
"to volume %(volname)s."),
- {'volname': volname,
+ {'volname': volume_id,
'snapname': snapname})
- self._snapshot_volume(volname, snapname)
-
- def _get_volume_id(self, volname):
- volname_encoded = urllib.quote(volname, '')
- volname_double_encoded = urllib.quote(volname_encoded, '')
- LOG.info(_LI("Volume name after double encoding is %s."),
- volname_double_encoded)
- req_vars = {'server_ip': self.server_ip,
- 'server_port': self.server_port,
- 'encoded': volname_double_encoded}
- request = ("https://%(server_ip)s:%(server_port)s"
- "/api/types/Volume/instances/getByName::"
- "%(encoded)s") % req_vars
- LOG.info(_LI("ScaleIO get volume id by name request: %s"), request)
- r = requests.get(
- request,
- auth=(self.server_username,
- self.server_token),
- verify=self._get_verify_cert())
- r = self._check_response(r, request)
-
- vol_id = r.json()
-
- if not vol_id:
- msg = _("Volume with name %s wasn't found.") % volname
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
- if r.status_code != OK_STATUS_CODE and "errorCode" in vol_id:
- msg = (_("Error getting volume id from name %(volname)s: %(err)s.")
- % {'volname': volname,
- 'err': vol_id['message']})
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
- LOG.info(_LI("volume id is %s."), vol_id)
- return vol_id
+ return self._snapshot_volume(volume_id, snapname)
def _get_headers(self):
return {'content-type': 'application/json'}
self._check_volume_size(new_size)
- volname = self.id_to_base64(volume.id)
-
+ vol_id = volume['provider_id']
LOG.info(_LI(
"ScaleIO extend volume: volume %(volname)s to size %(new_size)s."),
- {'volname': volname,
+ {'volname': vol_id,
'new_size': new_size})
- vol_id = self._get_volume_id(volname)
req_vars = {'server_ip': self.server_ip,
'server_port': self.server_port,
'vol_id': vol_id}
if r.status_code != OK_STATUS_CODE:
response = r.json()
msg = (_("Error extending volume %(vol)s: %(err)s.")
- % {'vol': volname,
+ % {'vol': vol_id,
'err': response['message']})
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
def create_cloned_volume(self, volume, src_vref):
"""Creates a cloned volume."""
- volname = self.id_to_base64(src_vref.id)
- snapname = self.id_to_base64(volume.id)
+ volume_id = src_vref['provider_id']
+ snapname = self._id_to_base64(volume.id)
LOG.info(_LI(
"ScaleIO create cloned volume: source volume %(src)s to target "
"volume %(tgt)s."),
- {'src': volname,
+ {'src': volume_id,
'tgt': snapname})
- self._snapshot_volume(volname, snapname)
+
+ return self._snapshot_volume(volume_id, snapname)
def delete_volume(self, volume):
"""Deletes a self.logical volume"""
- volname = self.id_to_base64(volume.id)
- self._delete_volume(volname)
-
- def _delete_volume(self, volname):
- volname_encoded = urllib.quote(volname, '')
- volname_double_encoded = urllib.quote(volname_encoded, '')
- LOG.info(_LI("Volume name after double encoding is %s."),
- volname_double_encoded)
+ volume_id = volume['provider_id']
+ self._delete_volume(volume_id)
+ def _delete_volume(self, vol_id):
verify_cert = self._get_verify_cert()
- # convert volume name to id
req_vars = {'server_ip': self.server_ip,
'server_port': self.server_port,
- 'encoded': volname_double_encoded}
- request = ("https://%(server_ip)s:%(server_port)s"
- "/api/types/Volume/instances/getByName::"
- "%(encoded)s") % req_vars
- LOG.info(_LI("ScaleIO get volume id by name request: %s."), request)
- r = requests.get(
- request,
- auth=(
- self.server_username,
- self.server_token),
- verify=verify_cert)
- r = self._check_response(r, request)
- LOG.info(_LI("Get by name response: %s."), r.text)
- vol_id = r.json()
- LOG.info(_LI("ScaleIO volume id to delete is %s."), vol_id)
-
- if r.status_code != OK_STATUS_CODE and "errorCode" in vol_id:
- msg = (_("Error getting volume id from name %(vol)s: %(err)s.")
- % {'vol': volname, 'err': vol_id['message']})
- LOG.error(msg)
-
- error_code = vol_id['errorCode']
- if (error_code == VOLUME_NOT_FOUND_ERROR):
- force_delete = self.configuration.sio_force_delete
- if force_delete:
- LOG.warning(_LW(
- "Ignoring error in delete volume %s: volume not found "
- "due to force delete settings."), volname)
- return
-
- raise exception.VolumeBackendAPIException(data=msg)
+ 'vol_id': six.text_type(vol_id)}
unmap_before_delete = (
self.configuration.sio_unmap_volume_before_deletion)
# case unmap_before_deletion is enabled.
if unmap_before_delete:
params = {'allSdcs': ''}
- req_vars = {'server_ip': self.server_ip,
- 'server_port': self.server_port,
- 'vol_id': vol_id}
request = ("https://%(server_ip)s:%(server_port)s"
"/api/instances/Volume::%(vol_id)s"
"/action/removeMappedSdc") % req_vars
auth=(
self.server_username,
self.server_token),
- verify=verify_cert)
+ verify=verify_cert
+ )
r = self._check_response(r, request, False, params)
LOG.debug("Unmap volume response: %s.", r.text)
params = {'removeMode': 'ONLY_ME'}
+ request = ("https://%(server_ip)s:%(server_port)s"
+ "/api/instances/Volume::%(vol_id)s"
+ "/action/removeVolume") % req_vars
r = requests.post(
- "https://" +
- self.server_ip +
- ":" +
- self.server_port +
- "/api/instances/Volume::" +
- six.text_type(vol_id) +
- "/action/removeVolume",
+ request,
data=json.dumps(params),
headers=self._get_headers(),
auth=(self.server_username,
self.server_token),
- verify=verify_cert)
+ verify=verify_cert
+ )
r = self._check_response(r, request, False, params)
if r.status_code != OK_STATUS_CODE:
response = r.json()
error_code = response['errorCode']
if error_code == 78:
- force_delete = self.configuration.sio_orce_delete
+ force_delete = self.configuration.sio_force_delete
if force_delete:
LOG.warning(_LW(
"Ignoring error in delete volume %s: volume not found "
def delete_snapshot(self, snapshot):
"""Deletes a ScaleIO snapshot."""
- snapname = self.id_to_base64(snapshot.id)
+ snap_id = snapshot.provider_id
LOG.info(_LI("ScaleIO delete snapshot."))
- self._delete_volume(snapname)
+ return self._delete_volume(snap_id)
def initialize_connection(self, volume, connector):
"""Initializes the connection and returns connection info.
"""
LOG.debug("Connector is %s.", connector)
- volname = self.id_to_base64(volume.id)
- properties = {}
-
- properties['scaleIO_volname'] = volname
- properties['hostIP'] = connector['ip']
- properties['serverIP'] = self.server_ip
- properties['serverPort'] = self.server_port
- properties['serverUsername'] = self.server_username
- properties['serverPassword'] = self.server_password
- properties['serverToken'] = self.server_token
+ connection_properties = dict(self.connection_properties)
+ volname = self._id_to_base64(volume.id)
+ connection_properties['scaleIO_volname'] = volname
storage_type = self._get_volumetype_extraspecs(volume)
LOG.info(_LI("Volume type is %s."), storage_type)
iops_limit = self._find_iops_limit(storage_type)
LOG.info(_LI("iops limit is: %s."), iops_limit)
bandwidth_limit = self._find_bandwidth_limit(storage_type)
LOG.info(_LI("Bandwidth limit is: %s."), bandwidth_limit)
- properties['iopsLimit'] = iops_limit
- properties['bandwidthLimit'] = bandwidth_limit
+ connection_properties['iopsLimit'] = iops_limit
+ connection_properties['bandwidthLimit'] = bandwidth_limit
return {'driver_volume_type': 'scaleio',
- 'data': properties}
+ 'data': connection_properties}
def terminate_connection(self, volume, connector, **kwargs):
LOG.debug("scaleio driver terminate connection.")
return specs
- def find_volume_path(self, volume_id):
-
- LOG.info(_LI("looking for volume %s."), volume_id)
- # Look for the volume in /dev/disk/by-id directory.
- disk_filename = ""
- tries = 0
- while not disk_filename:
- if tries > self.configuration.num_volume_device_scan_tries:
- msg = (_(
- "scaleIO volume %s not found at expected path.")
- % volume_id)
- raise exception.VolumeBackendAPIException(msg)
- by_id_path = "/dev/disk/by-id"
- if not os.path.isdir(by_id_path):
- LOG.warning(_LW(
- "scaleIO volume %(vol)s not yet found (no directory "
- "/dev/disk/by-id yet). Try number: %(tries)d."),
- {'vol': volume_id,
- 'tries': tries})
- tries = tries + 1
- eventlet.sleep(1)
- continue
- filenames = os.listdir(by_id_path)
- LOG.info(_LI(
- "Files found in path %(path)s: %(file)s."),
- {'path': by_id_path,
- 'file': filenames})
- for filename in filenames:
- if (filename.startswith("emc-vol") and
- filename.endswith(volume_id)):
- disk_filename = filename
- if not disk_filename:
- LOG.warning(_LW(
- "scaleIO volume %(vol)s not yet found. "
- "Try number: %(tries)d."),
- {'vol': volume_id,
- 'tries': tries})
- tries = tries + 1
- eventlet.sleep(1)
-
- if (tries != 0):
- LOG.info(_LI(
- "Found scaleIO device %(file)s after %(tries)d retries "),
- {'file': disk_filename,
- 'tries': tries})
- full_disk_name = by_id_path + "/" + disk_filename
- LOG.info(_LI("Full disk name is %s."), full_disk_name)
- return full_disk_name
-
- def _get_client_id(
- self, server_ip, server_username, server_password, sdc_ip):
- req_vars = {'server_ip': server_ip,
- 'server_port': self.server_port,
- 'sdc_ip': sdc_ip}
- request = ("https://%(server_ip)s:%(server_port)s"
- "/api/types/Client/instances/getByIp::"
- "%(sdc_ip)s/") % req_vars
- LOG.info(_LI("ScaleIO get client id by ip request: %s."), request)
- r = requests.get(
- request,
- auth=(
- server_username,
- self.server_token),
- verify=self._get_verify_cert())
- r = self._check_response(r, request)
-
- sdc_id = r.json()
- if not sdc_id:
- msg = _("Client with ip %s wasn't found.") % sdc_ip
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
- if r.status_code != 200 and "errorCode" in sdc_id:
- msg = (_("Error getting sdc id from ip %(ip)s: %(id)s.")
- % {'ip': sdc_ip, 'id': sdc_id['message']})
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
- LOG.info(_LI("ScaleIO sdc id is %s."), sdc_id)
- return sdc_id
+ def _sio_attach_volume(self, volume):
+ """Call connector.connect_volume() and return the path. """
+ LOG.debug("Calling os-brick to attach ScaleIO volume.")
+ connection_properties = dict(self.connection_properties)
+ connection_properties['scaleIO_volname'] = self._id_to_base64(
+ volume.id)
+ device_info = self.connector.connect_volume(connection_properties)
- def _sio_attach_volume(self, volume, sdc_ip):
- # We need to make sure we even *have* a local path
- LOG.info(_LI("ScaleIO attach volume in scaleio cinder driver."))
- volname = self.id_to_base64(volume.id)
+ return device_info['path']
- cmd = ['drv_cfg']
- cmd += ["--query_guid"]
-
- LOG.info(_LI("ScaleIO sdc query guid command: %s"), six.text_type(cmd))
-
- try:
- (out, err) = utils.execute(*cmd, run_as_root=True)
- LOG.debug("Map volume %(cmd)s: stdout=%(out)s stderr=%(err)s",
- {'cmd': cmd, 'out': out, 'err': err})
- except processutils.ProcessExecutionError as e:
- msg = _("Error querying sdc guid: %s.") % six.text_type(e.stderr)
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
-
- guid = out
- LOG.info(_LI("Current sdc guid: %s."), guid)
-
- params = {'guid': guid}
-
- volume_id = self._get_volume_id(volname)
- req_vars = {'server_ip': self.server_ip,
- 'server_port': self.server_port,
- 'volume_id': volume_id}
- request = ("https://%(server_ip)s:%(server_port)s"
- "/api/instances/Volume::%(volume_id)s"
- "/action/addMappedSdc") % req_vars
- LOG.info(_LI("Map volume request: %s."), request)
- r = requests.post(
- request,
- data=json.dumps(params),
- headers=self._get_headers(),
- auth=(
- self.server_username,
- self.server_token),
- verify=self._get_verify_cert())
- r = self._check_response(r, request, False)
-
- if r.status_code != OK_STATUS_CODE:
- response = r.json()
- error_code = response['errorCode']
- if (error_code == VOLUME_ALREADY_MAPPED_ERROR):
- LOG.warning(_LW("Ignoring error mapping volume %s: "
- "volume already mapped."), volname)
- else:
- msg = (_("Error mapping volume %(vol)s: %(err)s.")
- % {'vol': volname,
- 'err': response['message']})
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
-
- formated_id = volume_id
-
- return self.find_volume_path(formated_id)
-
- def _sio_detach_volume(self, volume, sdc_ip):
- LOG.info(_LI("ScaleIO detach volume in scaleio cinder driver."))
- volname = self.id_to_base64(volume.id)
-
- cmd = ['drv_cfg']
- cmd += ["--query_guid"]
-
- LOG.info(_LI("ScaleIO sdc query guid command: %s."), cmd)
-
- try:
- (out, err) = utils.execute(*cmd, run_as_root=True)
- LOG.debug("Unmap volume %(cmd)s: stdout=%(out)s stderr=%(err)s",
- {'cmd': cmd, 'out': out, 'err': err})
-
- except processutils.ProcessExecutionError as e:
- msg = _("Error querying sdc guid: %s.") % six.text_type(e.stderr)
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
-
- guid = out
- LOG.info(_LI("Current sdc guid: %s."), guid)
-
- params = {'guid': guid}
-
- volume_id = self._get_volume_id(volname)
- req_vars = {'server_ip': self.server_ip,
- 'server_port': self.server_port,
- 'vol_id': volume_id}
- request = ("https://%(server_ip)s:%(server_port)s"
- "/api/instances/Volume::%(vol_id)s"
- "/action/removeMappedSdc") % req_vars
- LOG.info(_LI("Unmap volume request: %s."), request)
- r = requests.post(
- request,
- data=json.dumps(params),
- headers=self._get_headers(),
- auth=(
- self.server_username,
- self.server_token),
- verify=self._get_verify_cert())
- r = self._check_response(r, request, False, params)
-
- if r.status_code != OK_STATUS_CODE:
- response = r.json()
- error_code = response['errorCode']
- if error_code == VOLUME_NOT_MAPPED_ERROR:
- LOG.warning(_LW("Ignoring error unmapping volume %s: "
- "volume not mapped."), volname)
- else:
- msg = (_("Error unmapping volume %(vol)s: %(err)s.")
- % {'vol': volname,
- 'err': response['message']})
- LOG.error(msg)
- raise exception.VolumeBackendAPIException(data=msg)
+ def _sio_detach_volume(self, volume):
+ """Call the connector.disconnect() """
+ LOG.info(_LI("Calling os-brick to detach ScaleIO volume."))
+ connection_properties = dict(self.connection_properties)
+ connection_properties['scaleIO_volname'] = self._id_to_base64(
+ volume.id)
+ self.connector.disconnect_volume(connection_properties, volume)
def copy_image_to_volume(self, context, volume, image_service, image_id):
"""Fetch the image from image_service and write it to the volume."""
{'vol': volume,
'service': six.text_type(image_service),
'id': six.text_type(image_id)})
- properties = utils.brick_get_connector_properties()
- sdc_ip = properties['ip']
- LOG.debug("SDC ip is: %s", sdc_ip)
try:
image_utils.fetch_to_raw(context,
image_service,
image_id,
- self._sio_attach_volume(volume, sdc_ip),
+ self._sio_attach_volume(volume),
BLOCK_SIZE,
size=volume['size'])
finally:
- self._sio_detach_volume(volume, sdc_ip)
+ self._sio_detach_volume(volume)
def copy_volume_to_image(self, context, volume, image_service, image_meta):
"""Copy the volume to the specified image."""
{'vol': volume,
'service': six.text_type(image_service),
'meta': six.text_type(image_meta)})
- properties = utils.brick_get_connector_properties()
- sdc_ip = properties['ip']
- LOG.debug("SDC ip is: {0}".format(sdc_ip))
try:
image_utils.upload_volume(context,
image_service,
image_meta,
- self._sio_attach_volume(volume, sdc_ip))
+ self._sio_attach_volume(volume))
finally:
- self._sio_detach_volume(volume, sdc_ip)
+ self._sio_detach_volume(volume)
def ensure_export(self, context, volume):
"""Driver entry point to get the export info for an existing volume."""