common = self.driver._login()
unm_matcher = common._get_3par_unm_name(self.volume['id'])
+ ums_matcher = common._get_3par_ums_name(self.volume['id'])
existing_ref = {'source-name': unm_matcher}
result = common._get_existing_volume_ref_name(existing_ref)
result = common._get_existing_volume_ref_name(existing_ref)
self.assertEqual(unm_matcher, result)
+ existing_ref = {'source-id': self.volume['id']}
+ result = common._get_existing_volume_ref_name(existing_ref, True)
+ self.assertEqual(ums_matcher, result)
+
existing_ref = {'bad-key': 'foo'}
self.assertRaises(
exception.ManageExistingInvalidReference,
expected +
self.standard_logout)
+ def test_manage_existing_snapshot(self):
+ mock_client = self.setup_driver()
+
+ new_comment = Comment({
+ "display_name": "snap",
+ "volume_name": "volume-007dbfce-7579-40bc-8f90-a20b3902283e",
+ "volume_id": "007dbfce-7579-40bc-8f90-a20b3902283e",
+ "description": "",
+ })
+ snapshot = {
+ 'display_name': None,
+ 'id': '007dbfce-7579-40bc-8f90-a20b3902283e',
+ 'volume_id': self.VOLUME_ID,
+ }
+
+ mock_client.getVolume.return_value = {
+ "comment": "{'display_name': 'snap'}",
+ 'copyOf': self.VOLUME_NAME_3PAR,
+ }
+
+ with mock.patch.object(hpecommon.HPE3PARCommon,
+ '_create_client') as mock_create_client:
+ mock_create_client.return_value = mock_client
+ common = self.driver._login()
+
+ oss_matcher = common._get_3par_snap_name(snapshot['id'])
+ ums_matcher = common._get_3par_ums_name(snapshot['id'])
+ existing_ref = {'source-name': ums_matcher}
+ expected_obj = {'display_name': 'snap'}
+
+ obj = self.driver.manage_existing_snapshot(snapshot, existing_ref)
+
+ expected = [
+ mock.call.getVolume(existing_ref['source-name']),
+ mock.call.modifyVolume(existing_ref['source-name'],
+ {'newName': oss_matcher,
+ 'comment': new_comment}),
+ ]
+
+ mock_client.assert_has_calls(
+ self.standard_login +
+ expected +
+ self.standard_logout)
+ self.assertEqual(expected_obj, obj)
+
+ def test_manage_existing_snapshot_invalid_parent(self):
+ mock_client = self.setup_driver()
+
+ snapshot = {
+ 'display_name': None,
+ 'id': '007dbfce-7579-40bc-8f90-a20b3902283e',
+ 'volume_id': self.VOLUME_ID,
+ }
+
+ mock_client.getVolume.return_value = {
+ "comment": "{'display_name': 'snap'}",
+ 'copyOf': 'fake-invalid',
+ }
+
+ with mock.patch.object(hpecommon.HPE3PARCommon,
+ '_create_client') as mock_create_client:
+ mock_create_client.return_value = mock_client
+ common = self.driver._login()
+
+ ums_matcher = common._get_3par_ums_name(snapshot['id'])
+ existing_ref = {'source-name': ums_matcher}
+
+ self.assertRaises(exception.InvalidInput,
+ self.driver.manage_existing_snapshot,
+ snapshot=snapshot,
+ existing_ref=existing_ref)
+
+ expected = [
+ mock.call.getVolume(existing_ref['source-name']),
+ ]
+
+ mock_client.assert_has_calls(
+ self.standard_login +
+ expected +
+ self.standard_logout)
+
def test_manage_existing_get_size(self):
mock_client = self.setup_driver()
mock_client.getVolume.return_value = {'sizeMiB': 2048}
expected +
self.standard_logout)
+ def test_manage_existing_snapshot_get_size(self):
+ mock_client = self.setup_driver()
+ mock_client.getVolume.return_value = {'sizeMiB': 2048}
+
+ with mock.patch.object(hpecommon.HPE3PARCommon,
+ '_create_client') as mock_create_client:
+ mock_create_client.return_value = mock_client
+ common = self.driver._login()
+
+ ums_matcher = common._get_3par_ums_name(self.snapshot['id'])
+ snapshot = {}
+ existing_ref = {'source-name': ums_matcher}
+
+ size = self.driver.manage_existing_snapshot_get_size(snapshot,
+ existing_ref)
+
+ expected_size = 2
+ expected = [mock.call.getVolume(existing_ref['source-name'])]
+
+ mock_client.assert_has_calls(
+ self.standard_login +
+ expected +
+ self.standard_logout)
+ self.assertEqual(expected_size, size)
+
+ def test_manage_existing_snapshot_get_size_invalid_reference(self):
+ mock_client = self.setup_driver()
+
+ with mock.patch.object(hpecommon.HPE3PARCommon,
+ '_create_client') as mock_create_client:
+ mock_create_client.return_value = mock_client
+
+ snapshot = {}
+ existing_ref = {'source-name': self.SNAPSHOT_3PAR_NAME}
+
+ self.assertRaises(exception.ManageExistingInvalidReference,
+ self.driver.manage_existing_snapshot_get_size,
+ snapshot=snapshot,
+ existing_ref=existing_ref)
+
+ mock_client.assert_has_calls(
+ self.standard_login +
+ self.standard_logout)
+
+ existing_ref = {}
+
+ self.assertRaises(exception.ManageExistingInvalidReference,
+ self.driver.manage_existing_snapshot_get_size,
+ snapshot=snapshot,
+ existing_ref=existing_ref)
+
+ mock_client.assert_has_calls(
+ self.standard_login +
+ self.standard_logout)
+
+ def test_manage_existing_snapshot_get_size_invalid_input(self):
+ mock_client = self.setup_driver()
+ mock_client.getVolume.side_effect = hpeexceptions.HTTPNotFound('fake')
+
+ with mock.patch.object(hpecommon.HPE3PARCommon,
+ '_create_client') as mock_create_client:
+ mock_create_client.return_value = mock_client
+ common = self.driver._login()
+
+ ums_matcher = common._get_3par_ums_name(self.snapshot['id'])
+ snapshot = {}
+ existing_ref = {'source-name': ums_matcher}
+
+ self.assertRaises(exception.InvalidInput,
+ self.driver.manage_existing_snapshot_get_size,
+ snapshot=snapshot,
+ existing_ref=existing_ref)
+
+ expected = [mock.call.getVolume(existing_ref['source-name'])]
+
+ mock_client.assert_has_calls(
+ self.standard_login +
+ expected +
+ self.standard_logout)
+
def test_unmanage(self):
mock_client = self.setup_driver()
with mock.patch.object(hpecommon.HPE3PARCommon,
expected +
self.standard_logout)
+ def test_unmanage_snapshot(self):
+ mock_client = self.setup_driver()
+ with mock.patch.object(hpecommon.HPE3PARCommon,
+ '_create_client') as mock_create_client:
+ mock_create_client.return_value = mock_client
+ common = self.driver._login()
+ self.driver.unmanage_snapshot(self.snapshot)
+
+ oss_matcher = common._get_3par_snap_name(self.snapshot['id'])
+ ums_matcher = common._get_3par_ums_name(self.snapshot['id'])
+
+ expected = [
+ mock.call.modifyVolume(oss_matcher, {'newName': ums_matcher})
+ ]
+
+ mock_client.assert_has_calls(
+ self.standard_login +
+ expected +
+ self.standard_logout)
+
def test__safe_hostname(self):
long_hostname = "abc123abc123abc123abc123abc123abc123"
fixed_hostname = "abc123abc123abc123abc123abc123a"
3.0.3 - Remove db access for consistency groups
3.0.4 - Adds v2 managed replication support
3.0.5 - Adds v2 unmanaged replication support
+ 3.0.6 - Adding manage/unmanage snapshot support
"""
- VERSION = "3.0.5"
+ VERSION = "3.0.6"
stats = {}
# any model updates from retype.
return updates
+ def manage_existing_snapshot(self, snapshot, existing_ref):
+ """Manage an existing 3PAR snapshot.
+
+ existing_ref is a dictionary of the form:
+ {'source-name': <name of the snapshot>}
+ """
+ target_snap_name = self._get_existing_volume_ref_name(existing_ref,
+ is_snapshot=True)
+
+ # Check for the existence of the snapshot.
+ try:
+ snap = self.client.getVolume(target_snap_name)
+ except hpeexceptions.HTTPNotFound:
+ err = (_("Snapshot '%s' doesn't exist on array.") %
+ target_snap_name)
+ LOG.error(err)
+ raise exception.InvalidInput(reason=err)
+
+ # Make sure the snapshot is being associated with the correct volume.
+ parent_vol_name = self._get_3par_vol_name(snapshot['volume_id'])
+ if parent_vol_name != snap['copyOf']:
+ err = (_("The provided snapshot '%s' is not a snapshot of "
+ "the provided volume.") % target_snap_name)
+ LOG.error(err)
+ raise exception.InvalidInput(reason=err)
+
+ new_comment = {}
+
+ # Use the display name from the existing snapshot if no new name
+ # was chosen by the user.
+ if snapshot['display_name']:
+ display_name = snapshot['display_name']
+ new_comment['display_name'] = snapshot['display_name']
+ elif 'comment' in snap:
+ display_name = self._get_3par_vol_comment_value(snap['comment'],
+ 'display_name')
+ if display_name:
+ new_comment['display_name'] = display_name
+ else:
+ display_name = None
+
+ # Generate the new snapshot information based on the new ID.
+ new_snap_name = self._get_3par_snap_name(snapshot['id'])
+ new_comment['volume_id'] = snapshot['id']
+ new_comment['volume_name'] = 'volume-' + snapshot['id']
+ if snapshot.get('display_description', None):
+ new_comment['description'] = snapshot['display_description']
+ else:
+ new_comment['description'] = ""
+
+ new_vals = {'newName': new_snap_name,
+ 'comment': json.dumps(new_comment)}
+
+ # Update the existing snapshot with the new name and comments.
+ self.client.modifyVolume(target_snap_name, new_vals)
+
+ LOG.info(_LI("Snapshot '%(ref)s' renamed to '%(new)s'."),
+ {'ref': existing_ref['source-name'], 'new': new_snap_name})
+
+ updates = {'display_name': display_name}
+
+ LOG.info(_LI("Snapshot %(disp)s '%(new)s' is now being managed."),
+ {'disp': display_name, 'new': new_snap_name})
+
+ # Return display name to update the name displayed in the GUI.
+ return updates
+
def manage_existing_get_size(self, volume, existing_ref):
"""Return size of volume to be managed by manage_existing.
return int(math.ceil(float(vol['sizeMiB']) / units.Ki))
+ def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
+ """Return size of snapshot to be managed by manage_existing_snapshot.
+
+ existing_ref is a dictionary of the form:
+ {'source-name': <name of the snapshot>}
+ """
+ target_snap_name = self._get_existing_volume_ref_name(existing_ref,
+ is_snapshot=True)
+
+ # Make sure the reference is not in use.
+ if re.match('osv-*|oss-*|vvs-*|unm-*', target_snap_name):
+ reason = _("Reference must be for an unmanaged snapshot.")
+ raise exception.ManageExistingInvalidReference(
+ existing_ref=target_snap_name,
+ reason=reason)
+
+ # Check for the existence of the snapshot.
+ try:
+ snap = self.client.getVolume(target_snap_name)
+ except hpeexceptions.HTTPNotFound:
+ err = (_("Snapshot '%s' doesn't exist on array.") %
+ target_snap_name)
+ LOG.error(err)
+ raise exception.InvalidInput(reason=err)
+
+ return int(math.ceil(float(snap['sizeMiB']) / units.Ki))
+
def unmanage(self, volume):
"""Removes the specified volume from Cinder management."""
# Rename the volume's name to unm-* format so that it can be
'vol': vol_name,
'new': new_vol_name})
- def _get_existing_volume_ref_name(self, existing_ref):
+ def unmanage_snapshot(self, snapshot):
+ """Removes the specified snapshot from Cinder management."""
+ # Rename the snapshots's name to ums-* format so that it can be
+ # easily found later.
+ snap_name = self._get_3par_snap_name(snapshot['id'])
+ new_snap_name = self._get_3par_ums_name(snapshot['id'])
+ self.client.modifyVolume(snap_name, {'newName': new_snap_name})
+
+ LOG.info(_LI("Snapshot %(disp)s '%(vol)s' is no longer managed. "
+ "Snapshot renamed to '%(new)s'."),
+ {'disp': snapshot['display_name'],
+ 'vol': snap_name,
+ 'new': new_snap_name})
+
+ def _get_existing_volume_ref_name(self, existing_ref, is_snapshot=False):
"""Returns the volume name of an existing reference.
Checks if an existing volume reference has a source-name or
if 'source-name' in existing_ref:
vol_name = existing_ref['source-name']
elif 'source-id' in existing_ref:
- vol_name = self._get_3par_unm_name(existing_ref['source-id'])
+ if is_snapshot:
+ vol_name = self._get_3par_ums_name(existing_ref['source-id'])
+ else:
+ vol_name = self._get_3par_unm_name(existing_ref['source-id'])
else:
reason = _("Reference must contain source-name or source-id.")
raise exception.ManageExistingInvalidReference(
snapshot_name = self._encode_name(snapshot_id)
return "oss-%s" % snapshot_name
+ def _get_3par_ums_name(self, snapshot_id):
+ ums_name = self._encode_name(snapshot_id)
+ return "ums-%s" % ums_name
+
def _get_3par_vvs_name(self, volume_id):
vvs_name = self._encode_name(volume_id)
return "vvs-%s" % vvs_name
driver.ManageableVD,
driver.ExtendVD,
driver.SnapshotVD,
+ driver.ManageableSnapshotsVD,
driver.MigrateVD,
driver.ConsistencyGroupVD,
driver.BaseVD):
3.0.1 - Remove db access for consistency groups
3.0.2 - Adds v2 managed replication support
3.0.3 - Adds v2 unmanaged replication support
+ 3.0.4 - Adding manage/unmanage snapshot support
"""
- VERSION = "3.0.3"
+ VERSION = "3.0.4"
def __init__(self, *args, **kwargs):
super(HPE3PARFCDriver, self).__init__(*args, **kwargs)
finally:
self._logout(common)
+ def manage_existing_snapshot(self, snapshot, existing_ref):
+ common = self._login()
+ try:
+ return common.manage_existing_snapshot(snapshot, existing_ref)
+ finally:
+ self._logout(common)
+
def manage_existing_get_size(self, volume, existing_ref):
common = self._login(volume)
try:
finally:
self._logout(common)
+ def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
+ common = self._login()
+ try:
+ return common.manage_existing_snapshot_get_size(snapshot,
+ existing_ref)
+ finally:
+ self._logout(common)
+
def unmanage(self, volume):
common = self._login(volume)
try:
finally:
self._logout(common)
+ def unmanage_snapshot(self, snapshot):
+ common = self._login()
+ try:
+ common.unmanage_snapshot(snapshot)
+ finally:
+ self._logout(common)
+
def attach_volume(self, context, volume, instance_uuid, host_name,
mountpoint):
common = self._login(volume)
driver.ManageableVD,
driver.ExtendVD,
driver.SnapshotVD,
+ driver.ManageableSnapshotsVD,
driver.MigrateVD,
driver.ConsistencyGroupVD,
driver.BaseVD):
3.0.3 - Fix multipath dictionary key error. bug #1522062
3.0.4 - Adds v2 managed replication support
3.0.5 - Adds v2 unmanaged replication support
+ 3.0.6 - Adding manage/unmanage snapshot support
"""
- VERSION = "3.0.5"
+ VERSION = "3.0.6"
def __init__(self, *args, **kwargs):
super(HPE3PARISCSIDriver, self).__init__(*args, **kwargs)
finally:
self._logout(common)
+ def manage_existing_snapshot(self, snapshot, existing_ref):
+ common = self._login()
+ try:
+ return common.manage_existing_snapshot(snapshot, existing_ref)
+ finally:
+ self._logout(common)
+
def manage_existing_get_size(self, volume, existing_ref):
common = self._login(volume)
try:
finally:
self._logout(common)
+ def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
+ common = self._login()
+ try:
+ return common.manage_existing_snapshot_get_size(snapshot,
+ existing_ref)
+ finally:
+ self._logout(common)
+
def unmanage(self, volume):
common = self._login(volume)
try:
finally:
self._logout(common)
+ def unmanage_snapshot(self, snapshot):
+ common = self._login()
+ try:
+ common.unmanage_snapshot(snapshot)
+ finally:
+ self._logout(common)
+
def attach_volume(self, context, volume, instance_uuid, host_name,
mountpoint):
common = self._login(volume)
--- /dev/null
+---
+features:
+ - Added snapshot manage/unmanage support to the HPE 3PAR driver.