from cinder.volume import volume_types
FAKE = "fake"
+FAKE2 = "fake2"
CANNOT_DELETE = "Can not delete"
TOO_BIG_VOLUME_SIZE = 12000
POOL_SIZE = 100
'id': 1,
'consistencygroup_id': CONSISTGROUP_ID,
'status': 'available'}
+VOLUME2 = {'size': 32,
+ 'name': FAKE2,
+ 'id': 2,
+ 'consistencygroup_id': CONSISTGROUP_ID,
+ 'status': 'available'}
MANAGED_FAKE = "managed_fake"
MANAGED_VOLUME = {'size': 16,
CONTEXT = {}
+FAKESNAPSHOT = 'fakesnapshot'
+SNAPSHOT = {'name': 'fakesnapshot',
+ 'id': 3}
+
CONSISTGROUP = {'id': CONSISTGROUP_ID, }
CG_SNAPSHOT_ID = 1
CG_SNAPSHOT = {'id': CG_SNAPSHOT_ID,
return {'status': 'deleted'}, volumes
+ def update_consistencygroup(
+ self, context, group,
+ add_volumes, remove_volumes):
+
+ model_update = {'status': 'available'}
+ return model_update, None, None
+
+ def create_consistencygroup_from_src(
+ self, context, group, volumes, cgsnapshot, snapshots,
+ source_cg=None, source_vols=None):
+
+ return None, None
+
def create_cgsnapshot(self, ctxt, cgsnapshot):
snapshots = []
for volume in self.volumes.values():
self.driver.delete_volume(VOLUME)
def test_create_volume_should_fail_if_no_pool_space_left(self):
- """Vertify that the xiv_ds8k_proxy validates volume pool space."""
+ """Verify that the xiv_ds8k_proxy validates volume pool space."""
self.driver.do_setup(None)
self.assertRaises(exception.VolumeBackendAPIException,
self.assertRaises(exception.VolumeBackendAPIException,
self.driver.delete_cgsnapshot,
ctxt, CG_SNAPSHOT)
+
+ def test_update_consistencygroup_without_volumes(self):
+ """Test update_consistencygroup when there are no volumes specified."""
+
+ self.driver.do_setup(None)
+
+ ctxt = context.get_admin_context()
+
+ # Update consistency group
+ model_update, added, removed = self.driver.update_consistencygroup(
+ ctxt, CONSISTGROUP, [], [])
+
+ self.assertEqual('available',
+ model_update['status'],
+ "Consistency Group update failed")
+ self.assertFalse(added,
+ "added volumes list is not empty")
+ self.assertFalse(removed,
+ "removed volumes list is not empty")
+
+ def test_update_consistencygroup_with_volumes(self):
+ """Test update_consistencygroup when there are volumes specified."""
+
+ self.driver.do_setup(None)
+
+ ctxt = context.get_admin_context()
+
+ # Update consistency group
+ model_update, added, removed = self.driver.update_consistencygroup(
+ ctxt, CONSISTGROUP, [VOLUME], [VOLUME2])
+
+ self.assertEqual('available',
+ model_update['status'],
+ "Consistency Group update failed")
+ self.assertFalse(added,
+ "added volumes list is not empty")
+ self.assertFalse(removed,
+ "removed volumes list is not empty")
+
+ def test_create_consistencygroup_from_src_without_volumes(self):
+ """Test create_consistencygroup_from_src with no volumes specified."""
+
+ self.driver.do_setup(None)
+
+ ctxt = context.get_admin_context()
+
+ # Create consistency group from source
+ model_update, volumes_model_update = (
+ self.driver.create_consistencygroup_from_src(
+ ctxt, CONSISTGROUP, [], CG_SNAPSHOT, []))
+
+ # model_update can be None or return available in status
+ if model_update:
+ self.assertEqual('available',
+ model_update['status'],
+ "Consistency Group create from source failed")
+ # volumes_model_update can be None or return available in status
+ if volumes_model_update:
+ self.assertFalse(volumes_model_update,
+ "volumes list is not empty")
+
+ def test_create_consistencygroup_from_src_with_volumes(self):
+ """Test create_consistencygroup_from_src with volumes specified."""
+
+ self.driver.do_setup(None)
+
+ ctxt = context.get_admin_context()
+
+ # Create consistency group from source
+ model_update, volumes_model_update = (
+ self.driver.create_consistencygroup_from_src(
+ ctxt, CONSISTGROUP, [VOLUME], CG_SNAPSHOT, [SNAPSHOT]))
+
+ # model_update can be None or return available in status
+ if model_update:
+ self.assertEqual('available',
+ model_update['status'],
+ "Consistency Group create from source failed")
+ # volumes_model_update can be None or return available in status
+ if volumes_model_update:
+ self.assertEqual('available',
+ volumes_model_update['status'],
+ "volumes list status failed")
"""Deletes a consistency group snapshot."""
return self.xiv_ds8k_proxy.delete_cgsnapshot(context, cgsnapshot)
+
+ def update_consistencygroup(self, context, group,
+ add_volumes, remove_volumes):
+ """Adds or removes volume(s) to/from an existing consistency group."""
+
+ return self.xiv_ds8k_proxy.update_consistencygroup(
+ context, group, add_volumes, remove_volumes)
+
+ def create_consistencygroup_from_src(
+ self, context, group, volumes, cgsnapshot, snapshots,
+ source_cg=None, source_vols=None):
+ """Creates a consistencygroup from source."""
+
+ return self.xiv_ds8k_proxy.create_consistencygroup_from_src(
+ context, group, volumes, cgsnapshot, snapshots,
+ source_cg, source_vols)