FAKE_LIST_STORAGE_SNAPSHOTS_RESPONSE = """{ "listDatasetSnapshotsResponse" : {
"count":1 ,
"snapshot" : [{
- "name": "DS1Snap1",
- "path": "devpool1/acc1openstacktsm/DS1@DS1Snap1",
+ "name": "snap_c60890b1f23646f29e6d51e6e592cee6",
+ "path": "DS1@snap_c60890b1f23646f29e6d51e6e592cee6",
"availMem": "-",
"usedMem": "0",
"refer": "26K",
"""{ "createStorageSnapshotResponse" : {
"StorageSnapshot" : {
"id": "21d7a92a-f15e-3f5b-b981-cb30697b8028",
- "name": "DS1Snap1",
+ "name": "snap_c60890b1f23646f29e6d51e6e592cee6",
"usn": "21d7a92af15e3f5bb981cb30697b8028",
"lunusn": "12371e7c392b34b9ac43073b3c85f1d1",
"lunid": "12371e7c-392b-34b9-ac43-073b3c85f1d1",
json.loads(FAKE_UPDATE_FILE_SYSTEM_RESPONSE))
MAP_COMMAND_TO_FAKE_RESPONSE['updateQosGroup'] = (
json.loads(FAKE_UPDATE_QOS_GROUP_RESPONSE))
+MAP_COMMAND_TO_FAKE_RESPONSE['listStorageSnapshots'] = (
+ json.loads(FAKE_LIST_STORAGE_SNAPSHOTS_RESPONSE))
# This dict maps the http commands of elasticenter
# with its respective fake json responses
fake_volume_id = self._get_fake_volume_id()
snapshot = {
- 'id': 'SomeID',
+ 'id': 'c60890b1-f236-46f2-9e6d-51e6e592cee6',
'display_name': 'DS1Snap1',
'volume_id': 'SomeVol',
'volume': {
mock_api_req.side_effect = self._side_effect_api_req
# now run the test
- self.driver.create_snapshot(snapshot)
+ model_update = self.driver.create_snapshot(snapshot)
# assert that 2 api calls were invoked
self.assertEqual(2, mock_api_req.call_count)
+ self.assertEqual('DS1@snap_c60890b1f23646f29e6d51e6e592cee6',
+ model_update['provider_id'])
+
# Test - II
# reconfigure the dependencies
import httplib
import json
-import time
import urllib
+import uuid
from oslo_log import log as logging
from oslo_utils import units
self._api_request_for_cloudbyte(
'updateVolumeiSCSIService', params)
- def _get_cb_snapshot_path(self, snapshot, volume_id):
+ def _get_cb_snapshot_path(self, snapshot_name, volume_id):
"""Find CloudByte snapshot path."""
params = {"id": volume_id}
# Filter snapshot path
for snap in cb_snapshot:
- if snap['name'] == snapshot['display_name']:
+ if snap['name'] == snapshot_name:
path = snap['path']
break
return volume_id
- def _generate_clone_name(self):
- """Generates clone name when it is not provided."""
-
- clone_name = ("clone_" + time.strftime("%d%m%Y") +
- time.strftime("%H%M%S"))
- return clone_name
-
- def _generate_snapshot_name(self):
- """Generates snapshot_name when it is not provided."""
-
- snapshot_name = ("snap_" + time.strftime("%d%m%Y") +
- time.strftime("%H%M%S"))
- return snapshot_name
-
def _get_storage_info(self, tsmname):
"""Get CloudByte TSM that is associated with OpenStack backend."""
if cb_volume_id is not None:
- snapshot_name = snapshot['display_name']
- if snapshot_name is None or snapshot_name == '':
- # Generate the snapshot name
- snapshot_name = self._generate_snapshot_name()
- # Update the snapshot dict for later use
- snapshot['display_name'] = snapshot_name
+ # Set backend storage snapshot name using OpenStack snapshot id
+ snapshot_name = "snap_" + snapshot['id'].replace("-", "")
params = {
"name": snapshot_name,
self._api_request_for_cloudbyte('createStorageSnapshot', params)
# Get the snapshot path from CloudByte
- path = self._get_cb_snapshot_path(snapshot, cb_volume_id)
+ path = self._get_cb_snapshot_path(snapshot_name, cb_volume_id)
LOG.info(
_LI("Created CloudByte snapshot [%(cb_snap)s] "
# Extract necessary information from input params
parent_volume_id = cloned_volume.get('source_volid')
- # Generating name and id for snapshot
+ # Generating id for snapshot
# as this is not user entered in this particular usecase
- snapshot_name = self._generate_snapshot_name()
-
- snapshot_id = (six.text_type(parent_volume_id) + "_" +
- time.strftime("%d%m%Y") + time.strftime("%H%M%S"))
+ snapshot_id = six.text_type(uuid.uuid1())
# Prepare the params for create_snapshot
# as well as create_volume_from_snapshot method
snapshot_params = {
'id': snapshot_id,
- 'display_name': snapshot_name,
'volume_id': parent_volume_id,
'volume': src_volume,
}