mock_cgroup.id = "4a2f7e3a-312a-40c5-96a8-536b8a0fe074"
mock_cgroup['status'] = "deleted"
mock_context = mock.Mock()
- self.driver.db = mock.Mock()
mock_volume = mock.MagicMock()
- expected_volumes = [mock_volume]
- self.driver.db.volume_get_all_by_group.return_value = expected_volumes
- model_update, volumes = \
- self.driver.delete_consistencygroup(mock_context, mock_cgroup, [])
+ model_update, volumes = self.driver.delete_consistencygroup(
+ mock_context, mock_cgroup, [mock_volume])
expected_name = self.driver._get_pgroup_name_from_id(mock_cgroup.id)
self.array.destroy_pgroup.assert_called_with(expected_name)
- self.assertEqual(expected_volumes, volumes)
+
+ expected_volume_updates = [{
+ 'id': mock_volume.id,
+ 'status': 'deleted'
+ }]
+ self.assertEqual(expected_volume_updates, volumes)
self.assertEqual(mock_cgroup['status'], model_update['status'])
mock_delete_volume.assert_called_with(self.driver, mock_volume)
code=400,
text="Protection group has been destroyed."
)
- self.driver.delete_consistencygroup(mock_context, mock_cgroup, [])
+ self.driver.delete_consistencygroup(mock_context,
+ mock_cgroup,
+ [mock_volume])
self.array.destroy_pgroup.assert_called_with(expected_name)
mock_delete_volume.assert_called_with(self.driver, mock_volume)
code=400,
text="Protection group does not exist"
)
- self.driver.delete_consistencygroup(mock_context, mock_cgroup, [])
+ self.driver.delete_consistencygroup(mock_context,
+ mock_cgroup,
+ [mock_volume])
self.array.destroy_pgroup.assert_called_with(expected_name)
mock_delete_volume.assert_called_with(self.driver, mock_volume)
self.driver.delete_consistencygroup,
mock_context,
mock_volume,
- [])
+ [mock_volume])
self.array.destroy_pgroup.side_effect = \
self.purestorage_module.PureHTTPError(
self.driver.delete_consistencygroup,
mock_context,
mock_volume,
- [])
+ [mock_volume])
self.array.destroy_pgroup.side_effect = None
self.assert_error_propagates(
[self.array.destroy_pgroup],
- self.driver.delete_consistencygroup, mock_context, mock_cgroup, [])
+ self.driver.delete_consistencygroup,
+ mock_context,
+ mock_cgroup,
+ [mock_volume]
+ )
def _create_mock_cg(self):
mock_group = mock.MagicMock()
remvollist=[]
)
- @mock.patch('cinder.objects.snapshot.SnapshotList.get_all_for_cgsnapshot')
- def test_create_cgsnapshot(self, mock_snap_list):
+ def test_create_cgsnapshot(self):
mock_cgsnap = mock.Mock()
mock_cgsnap.id = "4a2f7e3a-312a-40c5-96a8-536b8a0fe074"
mock_cgsnap.consistencygroup_id = \
"4a2f7e3a-312a-40c5-96a8-536b8a0fe075"
mock_context = mock.Mock()
mock_snap = mock.MagicMock()
- expected_snaps = [mock_snap]
- mock_snap_list.return_value = expected_snaps
-
- model_update, snapshots = \
- self.driver.create_cgsnapshot(mock_context, mock_cgsnap, [])
+ model_update, snapshots = self.driver.create_cgsnapshot(mock_context,
+ mock_cgsnap,
+ [mock_snap])
cg_id = mock_cgsnap.consistencygroup_id
expected_pgroup_name = self.driver._get_pgroup_name_from_id(cg_id)
expected_snap_suffix = self.driver._get_pgroup_snap_suffix(mock_cgsnap)
.assert_called_with(expected_pgroup_name,
suffix=expected_snap_suffix)
self.assertEqual({'status': 'available'}, model_update)
- self.assertEqual(expected_snaps, snapshots)
- self.assertEqual('available', mock_snap.status)
+
+ expected_snapshot_update = [{
+ 'id': mock_snap.id,
+ 'status': 'available'
+ }]
+ self.assertEqual(expected_snapshot_update, snapshots)
self.assert_error_propagates(
[self.array.create_pgroup_snapshot],
@mock.patch(BASE_DRIVER_OBJ + "._get_pgroup_snap_name",
spec=pure.PureBaseVolumeDriver._get_pgroup_snap_name)
- @mock.patch('cinder.objects.snapshot.SnapshotList.get_all_for_cgsnapshot')
- def test_delete_cgsnapshot(self, mock_snap_list, mock_get_snap_name):
+ def test_delete_cgsnapshot(self, mock_get_snap_name):
snap_name = "consisgroup-4a2f7e3a-312a-40c5-96a8-536b8a0f" \
"e074-cinder.4a2f7e3a-312a-40c5-96a8-536b8a0fe075"
mock_get_snap_name.return_value = snap_name
mock_cgsnap.status = 'deleted'
mock_context = mock.Mock()
mock_snap = mock.Mock()
- expected_snaps = [mock_snap]
- mock_snap_list.return_value = expected_snaps
- model_update, snapshots = \
- self.driver.delete_cgsnapshot(mock_context, mock_cgsnap, [])
+ model_update, snapshots = self.driver.delete_cgsnapshot(mock_context,
+ mock_cgsnap,
+ [mock_snap])
self.array.destroy_pgroup.assert_called_with(snap_name)
self.assertEqual({'status': mock_cgsnap.status}, model_update)
- self.assertEqual(expected_snaps, snapshots)
- self.assertEqual('deleted', mock_snap.status)
+
+ expected_snapshot_update = [{
+ 'id': mock_snap.id,
+ 'status': 'deleted'
+ }]
+ self.assertEqual(expected_snapshot_update, snapshots)
self.array.destroy_pgroup.side_effect = \
self.purestorage_module.PureHTTPError(
code=400,
text="Protection group snapshot has been destroyed."
)
- self.driver.delete_cgsnapshot(mock_context, mock_cgsnap, [])
+ self.driver.delete_cgsnapshot(mock_context, mock_cgsnap, [mock_snap])
self.array.destroy_pgroup.assert_called_with(snap_name)
self.array.destroy_pgroup.side_effect = \
code=400,
text="Protection group snapshot does not exist"
)
- self.driver.delete_cgsnapshot(mock_context, mock_cgsnap, [])
+ self.driver.delete_cgsnapshot(mock_context, mock_cgsnap, [mock_snap])
self.array.destroy_pgroup.assert_called_with(snap_name)
self.array.destroy_pgroup.side_effect = \
self.driver.delete_cgsnapshot,
mock_context,
mock_cgsnap,
- [])
+ [mock_snap])
self.array.destroy_pgroup.side_effect = \
self.purestorage_module.PureHTTPError(
self.driver.delete_cgsnapshot,
mock_context,
mock_cgsnap,
- [])
+ [mock_snap])
self.array.destroy_pgroup.side_effect = None
self.assert_error_propagates(
[self.array.destroy_pgroup],
- self.driver.delete_cgsnapshot, mock_context, mock_cgsnap, [])
+ self.driver.delete_cgsnapshot,
+ mock_context,
+ mock_cgsnap,
+ [mock_snap]
+ )
def test_manage_existing(self):
ref_name = 'vol1'
from cinder import context
from cinder import exception
from cinder.i18n import _, _LE, _LI, _LW
-from cinder import objects
from cinder.objects import fields
from cinder import utils
from cinder.volume import driver
LOG.warning(_LW("Unable to delete Protection Group: %s"),
err.text)
- volumes = self.db.volume_get_all_by_group(context, group.id)
-
+ volume_updates = []
for volume in volumes:
self.delete_volume(volume)
- volume.status = 'deleted'
+ volume_updates.append({
+ 'id': volume.id,
+ 'status': 'deleted'
+ })
model_update = {'status': group['status']}
- return model_update, volumes
+ return model_update, volume_updates
@log_debug_trace
def update_consistencygroup(self, context, group,
pgsnap_suffix = self._get_pgroup_snap_suffix(cgsnapshot)
self._array.create_pgroup_snapshot(pgroup_name, suffix=pgsnap_suffix)
- snapshots = objects.SnapshotList().get_all_for_cgsnapshot(
- context, cgsnapshot.id)
-
+ snapshot_updates = []
for snapshot in snapshots:
- snapshot.status = 'available'
+ snapshot_updates.append({
+ 'id': snapshot.id,
+ 'status': 'available'
+ })
model_update = {'status': 'available'}
- return model_update, snapshots
+ return model_update, snapshot_updates
def _delete_pgsnapshot(self, pgsnap_name):
try:
pgsnap_name = self._get_pgroup_snap_name(cgsnapshot)
self._delete_pgsnapshot(pgsnap_name)
- snapshots = objects.SnapshotList.get_all_for_cgsnapshot(
- context, cgsnapshot.id)
-
+ snapshot_updates = []
for snapshot in snapshots:
- snapshot.status = 'deleted'
+ snapshot_updates.append({
+ 'id': snapshot.id,
+ 'status': 'deleted',
+ })
model_update = {'status': cgsnapshot.status}
- return model_update, snapshots
+ return model_update, snapshot_updates
def _validate_manage_existing_ref(self, existing_ref, is_snap=False):
"""Ensure that an existing_ref is valid and return volume info