return mapper
-class AdminActionsTest(test.TestCase):
+class BaseAdminTest(test.TestCase):
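+    """Base class for admin action tests: admin context plus volume helper."""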
+ def setUp(self):
+ super(BaseAdminTest, self).setUp()
+ self.volume_api = volume_api.API()
+ # admin context
+ self.ctx = context.RequestContext('admin', 'fake', True)
+
+ def _create_volume(self, context, updates=None):
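+        # sensible defaults for a test volume; callers override via 'updates'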
+ db_volume = {'status': 'available',
+ 'host': 'test',
+ 'availability_zone': 'fake_zone',
+ 'attach_status': 'detached'}
+ if updates:
+ db_volume.update(updates)
+
+ volume = objects.Volume(context=context, **db_volume)
+ volume.create()
+ return volume
+
+
+class AdminActionsTest(BaseAdminTest):
def setUp(self):
super(AdminActionsTest, self).setUp()
group='oslo_concurrency')
self.flags(rpc_backend='cinder.openstack.common.rpc.impl_fake')
- self.volume_api = volume_api.API()
cast_as_call.mock_cast_as_call(self.volume_api.volume_rpcapi.client)
cast_as_call.mock_cast_as_call(self.volume_api.scheduler_rpcapi.client)
resp = req.get_response(app())
return resp
- def _create_volume(self, context, updates=None):
- db_volume = {'status': 'available',
- 'host': 'test',
- 'availability_zone': 'fake_zone',
- 'attach_status': 'detached'}
- if updates:
- db_volume.update(updates)
-
- volume = objects.Volume(context=context, **db_volume)
- volume.create()
- return volume
-
def test_valid_updates(self):
vac = admin_actions.VolumeAdminController()
vac.validate_update({'migration_status': 'starting'})
def test_reset_attach_status(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'attach_status': 'detached'})
+ volume = db.volume_create(self.ctx, {'attach_status': 'detached'})
- resp = self._issue_volume_reset(ctx,
+ resp = self._issue_volume_reset(self.ctx,
volume,
{'attach_status': 'attached'})
self.assertEqual(202, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('attached', volume['attach_status'])
def test_reset_attach_invalid_status(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'attach_status': 'detached'})
+ volume = db.volume_create(self.ctx, {'attach_status': 'detached'})
- resp = self._issue_volume_reset(ctx,
+ resp = self._issue_volume_reset(self.ctx,
volume,
{'attach_status': 'bogus-status'})
self.assertEqual(400, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('detached', volume['attach_status'])
def test_reset_migration_invalid_status(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'migration_status': None})
+ volume = db.volume_create(self.ctx, {'migration_status': None})
- resp = self._issue_volume_reset(ctx,
+ resp = self._issue_volume_reset(self.ctx,
volume,
{'migration_status': 'bogus-status'})
self.assertEqual(400, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertIsNone(volume['migration_status'])
def test_reset_migration_status(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'migration_status': None})
+ volume = db.volume_create(self.ctx, {'migration_status': None})
- resp = self._issue_volume_reset(ctx,
+ resp = self._issue_volume_reset(self.ctx,
volume,
{'migration_status': 'migrating'})
self.assertEqual(202, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('migrating', volume['migration_status'])
def test_reset_status_as_admin(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available'})
+ volume = db.volume_create(self.ctx, {'status': 'available'})
- resp = self._issue_volume_reset(ctx,
+ resp = self._issue_volume_reset(self.ctx,
volume,
{'status': 'error'})
self.assertEqual(202, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('error', volume['status'])
def test_reset_status_as_non_admin(self):
ctx = context.RequestContext('fake', 'fake')
- volume = db.volume_create(context.get_admin_context(),
+ volume = db.volume_create(self.ctx,
{'status': 'error', 'size': 1})
resp = self._issue_volume_reset(ctx,
# request is not authorized
self.assertEqual(403, resp.status_int)
- volume = db.volume_get(context.get_admin_context(), volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
# status is still 'error'
self.assertEqual('error', volume['status'])
def test_backup_reset_status_as_admin(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available',
- 'user_id': 'user',
- 'project_id': 'project'})
- backup = db.backup_create(ctx, {'status': 'available',
- 'size': 1,
- 'volume_id': volume['id'],
- 'user_id': 'user',
- 'project_id': 'project'})
-
- resp = self._issue_backup_reset(ctx,
+ volume = db.volume_create(self.ctx, {'status': 'available',
+ 'user_id': 'user',
+ 'project_id': 'project'})
+ backup = db.backup_create(self.ctx, {'status': 'available',
+ 'size': 1,
+ 'volume_id': volume['id'],
+ 'user_id': 'user',
+ 'project_id': 'project'})
+
+ resp = self._issue_backup_reset(self.ctx,
backup,
{'status': 'error'})
self.assertEqual(403, resp.status_int)
def test_backup_reset_status(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
- backup = db.backup_create(ctx, {'status': 'available',
- 'volume_id': volume['id'],
- 'user_id': 'user',
- 'project_id': 'project'})
-
- resp = self._issue_backup_reset(ctx,
+ volume = db.volume_create(self.ctx,
+ {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1})
+ backup = db.backup_create(self.ctx, {'status': 'available',
+ 'volume_id': volume['id'],
+ 'user_id': 'user',
+ 'project_id': 'project'})
+
+ resp = self._issue_backup_reset(self.ctx,
backup,
{'status': 'error'})
self.assertEqual(202, resp.status_int)
def test_invalid_status_for_backup(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
- backup = db.backup_create(ctx, {'status': 'available',
- 'volume_id': volume['id']})
- resp = self._issue_backup_reset(ctx,
+ volume = db.volume_create(self.ctx,
+ {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1})
+ backup = db.backup_create(self.ctx, {'status': 'available',
+ 'volume_id': volume['id']})
+ resp = self._issue_backup_reset(self.ctx,
backup,
{'status': 'restoring'})
self.assertEqual(400, resp.status_int)
def test_backup_reset_status_with_invalid_backup(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
- backup = db.backup_create(ctx, {'status': 'available',
- 'volume_id': volume['id'],
- 'user_id': 'user',
- 'project_id': 'project'})
+ volume = db.volume_create(self.ctx,
+ {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1})
+ backup = db.backup_create(self.ctx, {'status': 'available',
+ 'volume_id': volume['id'],
+ 'user_id': 'user',
+ 'project_id': 'project'})
backup['id'] = 'fake_id'
- resp = self._issue_backup_reset(ctx,
+ resp = self._issue_backup_reset(self.ctx,
backup,
{'status': 'error'})
self.assertEqual(404, resp.status_int)
def test_malformed_reset_status_body(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available', 'size': 1})
+ volume = db.volume_create(self.ctx, {'status': 'available', 'size': 1})
- resp = self._issue_volume_reset(ctx,
+ resp = self._issue_volume_reset(self.ctx,
volume,
{'x-status': 'bad'})
self.assertEqual(400, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('available', volume['status'])
def test_invalid_status_for_volume(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available', 'size': 1})
- resp = self._issue_volume_reset(ctx,
+ volume = db.volume_create(self.ctx, {'status': 'available', 'size': 1})
+ resp = self._issue_volume_reset(self.ctx,
volume,
{'status': 'invalid'})
self.assertEqual(400, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('available', volume['status'])
def test_reset_status_for_missing_volume(self):
- ctx = context.RequestContext('admin', 'fake', True)
req = webob.Request.blank('/v2/fake/volumes/%s/action' %
'missing-volume-id')
req.method = 'POST'
req.headers['content-type'] = 'application/json'
body = {'os-reset_status': {'status': 'available'}}
req.body = jsonutils.dump_as_bytes(body)
- req.environ['cinder.context'] = ctx
+ req.environ['cinder.context'] = self.ctx
resp = req.get_response(app())
self.assertEqual(404, resp.status_int)
- self.assertRaises(exception.NotFound, db.volume_get, ctx,
+ self.assertRaises(exception.NotFound, db.volume_get, self.ctx,
'missing-volume-id')
def test_reset_attached_status(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1,
- 'attach_status': 'attached'})
+ volume = db.volume_create(self.ctx,
+ {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1,
+ 'attach_status': 'attached'})
- resp = self._issue_volume_reset(ctx,
+ resp = self._issue_volume_reset(self.ctx,
volume,
{'status': 'available',
'attach_status': 'detached'})
self.assertEqual(202, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('detached', volume['attach_status'])
self.assertEqual('available', volume['status'])
def test_invalid_reset_attached_status(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1,
- 'attach_status': 'detached'})
- resp = self._issue_volume_reset(ctx,
+ volume = db.volume_create(self.ctx,
+ {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1,
+ 'attach_status': 'detached'})
+ resp = self._issue_volume_reset(self.ctx,
volume,
{'status': 'available',
'attach_status': 'invalid'})
self.assertEqual(400, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('available', volume['status'])
self.assertEqual('detached', volume['attach_status'])
def test_snapshot_reset_status(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1,
- 'availability_zone': 'test',
- 'attach_status': 'detached'})
+ volume = db.volume_create(self.ctx,
+ {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1,
+ 'availability_zone': 'test',
+ 'attach_status': 'detached'})
kwargs = {
'volume_id': volume['id'],
'cgsnapshot_id': None,
- 'user_id': ctx.user_id,
- 'project_id': ctx.project_id,
+ 'user_id': self.ctx.user_id,
+ 'project_id': self.ctx.project_id,
'status': 'error_deleting',
'progress': '0%',
'volume_size': volume['size'],
'metadata': {}
}
- snapshot = objects.Snapshot(context=ctx, **kwargs)
+ snapshot = objects.Snapshot(context=self.ctx, **kwargs)
snapshot.create()
self.addCleanup(snapshot.destroy)
- resp = self._issue_snapshot_reset(ctx, snapshot, {'status': 'error'})
+ resp = self._issue_snapshot_reset(self.ctx, snapshot,
+ {'status': 'error'})
self.assertEqual(202, resp.status_int)
- snapshot = objects.Snapshot.get_by_id(ctx, snapshot['id'])
+ snapshot = objects.Snapshot.get_by_id(self.ctx, snapshot['id'])
self.assertEqual('error', snapshot.status)
def test_invalid_status_for_snapshot(self):
- ctx = context.RequestContext('admin', 'fake', True)
- volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': '', 'size': 1})
- snapshot = objects.Snapshot(ctx, status='available',
+ volume = db.volume_create(self.ctx,
+ {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1})
+ snapshot = objects.Snapshot(self.ctx, status='available',
volume_id=volume['id'])
snapshot.create()
self.addCleanup(snapshot.destroy)
- resp = self._issue_snapshot_reset(ctx, snapshot,
+ resp = self._issue_snapshot_reset(self.ctx, snapshot,
{'status': 'attaching'})
self.assertEqual(400, resp.status_int)
self.assertEqual('available', snapshot.status)
def test_force_delete(self):
- # admin context
- ctx = context.RequestContext('admin', 'fake', True)
        # current status is available
- volume = self._create_volume(ctx, {'size': 1, 'host': None})
+ volume = self._create_volume(self.ctx, {'size': 1, 'host': None})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.body = jsonutils.dump_as_bytes({'os-force_delete': {}})
# attach admin context to request
- req.environ['cinder.context'] = ctx
+ req.environ['cinder.context'] = self.ctx
resp = req.get_response(app())
# request is accepted
self.assertEqual(202, resp.status_int)
# volume is deleted
- self.assertRaises(exception.NotFound, objects.Volume.get_by_id, ctx,
- volume.id)
+ self.assertRaises(exception.NotFound, objects.Volume.get_by_id,
+ self.ctx, volume.id)
@mock.patch.object(volume_api.API, 'delete_snapshot', return_value=True)
@mock.patch('cinder.objects.Snapshot.get_by_id')
    @mock.patch.object(db, 'snapshot_get')
    @mock.patch.object(db, 'volume_get')
def test_force_delete_snapshot(self, volume_get, snapshot_get, get_by_id,
delete_snapshot):
- ctx = context.RequestContext('admin', 'fake', True)
volume = stubs.stub_volume(1)
snapshot = stubs.stub_snapshot(1)
- snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot)
+ snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot)
volume_get.return_value = volume
snapshot_get.return_value = snapshot
get_by_id.return_value = snapshot_obj
req.headers['content-type'] = 'application/json'
req.body = jsonutils.dump_as_bytes({'os-force_delete': {}})
# attach admin context to request
- req.environ['cinder.context'] = ctx
+ req.environ['cinder.context'] = self.ctx
resp = req.get_response(app())
self.assertEqual(202, resp.status_int)
- def test_force_detach_instance_attached_volume(self):
+ def _migrate_volume_prep(self):
+        # create the volume's current host and the destination host
+ db.service_create(self.ctx,
+ {'host': 'test',
+ 'topic': CONF.volume_topic,
+ 'created_at': timeutils.utcnow()})
+ db.service_create(self.ctx,
+ {'host': 'test2',
+ 'topic': CONF.volume_topic,
+ 'created_at': timeutils.utcnow()})
+ # current status is available
+ volume = self._create_volume(self.ctx)
+ return volume
+
+ def _migrate_volume_exec(self, ctx, volume, host, expected_status,
+ force_host_copy=False):
+ # build request to migrate to host
+ req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
+ req.method = 'POST'
+ req.headers['content-type'] = 'application/json'
+ body = {'os-migrate_volume': {'host': host,
+ 'force_host_copy': force_host_copy}}
+ req.body = jsonutils.dump_as_bytes(body)
+ req.environ['cinder.context'] = ctx
+ resp = req.get_response(app())
+ # verify status
+ self.assertEqual(expected_status, resp.status_int)
+ volume = db.volume_get(self.ctx, volume['id'])
+ return volume
+
+ def test_migrate_volume_success(self):
+ expected_status = 202
+ host = 'test2'
+ volume = self._migrate_volume_prep()
+ volume = self._migrate_volume_exec(self.ctx, volume, host,
+ expected_status)
+ self.assertEqual('starting', volume['migration_status'])
+
+ def test_migrate_volume_fail_replication(self):
+ expected_status = 400
+ host = 'test2'
+ volume = self._migrate_volume_prep()
+ # current status is available
+ volume = self._create_volume(self.ctx,
+ {'provider_location': '',
+ 'attach_status': '',
+ 'replication_status': 'active'})
+ volume = self._migrate_volume_exec(self.ctx, volume, host,
+ expected_status)
+
+ def test_migrate_volume_as_non_admin(self):
+ expected_status = 403
+ host = 'test2'
+ ctx = context.RequestContext('fake', 'fake')
+ volume = self._migrate_volume_prep()
+ self._migrate_volume_exec(ctx, volume, host, expected_status)
+
+ def test_migrate_volume_without_host_parameter(self):
+ expected_status = 400
+ host = 'test3'
+ volume = self._migrate_volume_prep()
+ # build request to migrate without host
+ req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
+ req.method = 'POST'
+ req.headers['content-type'] = 'application/json'
+ body = {'os-migrate_volume': {'host': host,
+ 'force_host_copy': False}}
+ req.body = jsonutils.dump_as_bytes(body)
+ req.environ['cinder.context'] = self.ctx
+ resp = req.get_response(app())
+ # verify status
+ self.assertEqual(expected_status, resp.status_int)
+
+ def test_migrate_volume_host_no_exist(self):
+ expected_status = 400
+ host = 'test3'
+ volume = self._migrate_volume_prep()
+ self._migrate_volume_exec(self.ctx, volume, host, expected_status)
+
+ def test_migrate_volume_same_host(self):
+ expected_status = 400
+ host = 'test'
+ volume = self._migrate_volume_prep()
+ self._migrate_volume_exec(self.ctx, volume, host, expected_status)
+
+ def test_migrate_volume_migrating(self):
+ expected_status = 400
+ host = 'test2'
+ volume = self._migrate_volume_prep()
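+        # mark the volume as already migrating; a second migration must fail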
+ model_update = {'migration_status': 'migrating'}
+ volume = db.volume_update(self.ctx, volume['id'], model_update)
+ self._migrate_volume_exec(self.ctx, volume, host, expected_status)
+
+ def test_migrate_volume_with_snap(self):
+ expected_status = 400
+ host = 'test2'
+ volume = self._migrate_volume_prep()
+ snap = objects.Snapshot(self.ctx, volume_id=volume['id'])
+ snap.create()
+ self.addCleanup(snap.destroy)
+ self._migrate_volume_exec(self.ctx, volume, host, expected_status)
+
+ def test_migrate_volume_bad_force_host_copy(self):
+ expected_status = 400
+ host = 'test2'
+ volume = self._migrate_volume_prep()
+ self._migrate_volume_exec(self.ctx, volume, host, expected_status,
+ force_host_copy='foo')
+
+ def _migrate_volume_comp_exec(self, ctx, volume, new_volume, error,
+ expected_status, expected_id, no_body=False):
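+        # no_body=True nests the payload under an empty key to simulate a
+        # malformed os-migrate_volume_completion request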
+ req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
+ req.method = 'POST'
+ req.headers['content-type'] = 'application/json'
+ body = {'new_volume': new_volume['id'], 'error': error}
+ if no_body:
+ body = {'': body}
+ else:
+ body = {'os-migrate_volume_completion': body}
+ req.body = jsonutils.dump_as_bytes(body)
+ req.environ['cinder.context'] = ctx
+ resp = req.get_response(app())
+ resp_dict = resp.json
+ # verify status
+ self.assertEqual(expected_status, resp.status_int)
+ if expected_id:
+ self.assertEqual(expected_id, resp_dict['save_volume_id'])
+ else:
+ self.assertNotIn('save_volume_id', resp_dict)
+
+ def test_migrate_volume_comp_as_non_admin(self):
+ volume = db.volume_create(self.ctx, {'id': 'fake1'})
+ new_volume = db.volume_create(self.ctx, {'id': 'fake2'})
+ expected_status = 403
+ expected_id = None
+ ctx = context.RequestContext('fake', 'fake')
+ self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
+ expected_status, expected_id)
+
+ def test_migrate_volume_comp_no_mig_status(self):
+ volume1 = self._create_volume(self.ctx, {'migration_status': 'foo'})
+ volume2 = self._create_volume(self.ctx, {'migration_status': None})
+
+ expected_status = 400
+ expected_id = None
+ self._migrate_volume_comp_exec(self.ctx, volume1, volume2, False,
+ expected_status, expected_id)
+ self._migrate_volume_comp_exec(self.ctx, volume2, volume1, False,
+ expected_status, expected_id)
+
+ def test_migrate_volume_comp_bad_mig_status(self):
+ volume1 = self._create_volume(self.ctx,
+ {'migration_status': 'migrating'})
+ volume2 = self._create_volume(self.ctx,
+ {'migration_status': 'target:foo'})
+ expected_status = 400
+ expected_id = None
+ self._migrate_volume_comp_exec(self.ctx, volume1, volume2, False,
+ expected_status, expected_id)
+
+ def test_migrate_volume_comp_no_action(self):
+ volume = db.volume_create(self.ctx, {'id': 'fake1'})
+ new_volume = db.volume_create(self.ctx, {'id': 'fake2'})
+ expected_status = 400
+ expected_id = None
+ ctx = context.RequestContext('fake', 'fake')
+ self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
+ expected_status, expected_id, True)
+
+ def test_migrate_volume_comp_from_nova(self):
+ volume = self._create_volume(self.ctx, {'status': 'in-use',
+ 'migration_status': None,
+ 'attach_status': 'attached'})
+ new_volume = self._create_volume(self.ctx,
+ {'migration_status': None,
+ 'attach_status': 'detached'})
+ expected_status = 200
+ expected_id = new_volume.id
+ self._migrate_volume_comp_exec(self.ctx, volume, new_volume, False,
+ expected_status, expected_id)
+
+ def test_backup_reset_valid_updates(self):
+ vac = admin_actions.BackupAdminController()
+ vac.validate_update({'status': 'available'})
+ vac.validate_update({'status': 'error'})
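+        # in-progress states are not valid targets for a backup status reset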
+ self.assertRaises(exc.HTTPBadRequest,
+ vac.validate_update,
+ {'status': 'restoring'})
+ self.assertRaises(exc.HTTPBadRequest,
+ vac.validate_update,
+ {'status': 'creating'})
+
+ @mock.patch('cinder.db.service_get_all_by_topic')
+ @mock.patch('cinder.backup.api.API._check_support_to_force_delete')
+ def _force_delete_backup_util(self, test_status, mock_check_support,
+ _mock_service_get_all_by_topic):
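+        # mock arguments are passed bottom-up: the innermost patch
+        # (_check_support_to_force_delete) arrives first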
+ _mock_service_get_all_by_topic.return_value = [
+ {'availability_zone': "az1", 'host': 'testhost',
+ 'disabled': 0, 'updated_at': timeutils.utcnow()}]
+        mock_check_support.return_value = True
+        # the backup's initial status comes from the test_status argument
+        id = test_backups.BackupsAPITestCase._create_backup(status=test_status)
+ req = webob.Request.blank('/v2/fake/backups/%s/action' % id)
+ req.method = 'POST'
+ req.headers['Content-Type'] = 'application/json'
+ req.body = jsonutils.dump_as_bytes({'os-force_delete': {}})
+ req.environ['cinder.context'] = self.ctx
+ res = req.get_response(app())
+
+ self.assertEqual(202, res.status_int)
+ self.assertEqual('deleting',
+ test_backups.BackupsAPITestCase.
+ _get_backup_attrib(id, 'status'))
+ db.backup_destroy(self.ctx, id)
+
+ def test_delete_backup_force_when_creating(self):
+ self._force_delete_backup_util('creating')
+
+ def test_delete_backup_force_when_deleting(self):
+ self._force_delete_backup_util('deleting')
+
+ def test_delete_backup_force_when_restoring(self):
+ self._force_delete_backup_util('restoring')
+
+ def test_delete_backup_force_when_available(self):
+ self._force_delete_backup_util('available')
+
+ def test_delete_backup_force_when_error(self):
+ self._force_delete_backup_util('error')
+
+ def test_delete_backup_force_when_error_deleting(self):
+ self._force_delete_backup_util('error_deleting')
+
+ @mock.patch('cinder.backup.rpcapi.BackupAPI.check_support_to_force_delete',
+ return_value=False)
+ def test_delete_backup_force_when_not_supported(self, mock_check_support):
-        # admin context
-        ctx = context.RequestContext('admin', 'fake', True)
+        self.override_config('backup_driver', 'cinder.backup.drivers.ceph')
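+        # the capability check is mocked to return False, so the API
+        # should reject the force delete with 405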
+ id = test_backups.BackupsAPITestCase._create_backup()
+ req = webob.Request.blank('/v2/fake/backups/%s/action' % id)
+ req.method = 'POST'
+ req.headers['Content-Type'] = 'application/json'
+ req.body = jsonutils.dump_as_bytes({'os-force_delete': {}})
+ req.environ['cinder.context'] = self.ctx
+ res = req.get_response(app())
+ self.assertEqual(405, res.status_int)
+
+
+class AdminActionsAttachDetachTest(BaseAdminTest):
+ def setUp(self):
+ super(AdminActionsAttachDetachTest, self).setUp()
+ # start service to handle rpc messages for attach requests
+ self.svc = self.start_service('volume', host='test')
+
+ def tearDown(self):
+ self.svc.stop()
+ super(AdminActionsAttachDetachTest, self).tearDown()
+
+ def test_force_detach_instance_attached_volume(self):
# current status is available
- volume = self._create_volume(ctx, {'provider_location': '',
- 'size': 1})
+ volume = self._create_volume(self.ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
- # start service to handle rpc messages for attach requests
- svc = self.start_service('volume', host='test')
- self.addCleanup(svc.stop)
- self.volume_api.reserve_volume(ctx, volume)
+
+ self.volume_api.reserve_volume(self.ctx, volume)
mountpoint = '/dev/vbd'
- attachment = self.volume_api.attach(ctx, volume, stubs.FAKE_UUID,
+ attachment = self.volume_api.attach(self.ctx, volume, stubs.FAKE_UUID,
None, mountpoint, 'rw')
# volume is attached
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('in-use', volume['status'])
self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid'])
self.assertEqual(mountpoint, attachment['mountpoint'])
self.assertEqual('False', admin_metadata[0]['value'])
self.assertEqual('attached_mode', admin_metadata[1]['key'])
self.assertEqual('rw', admin_metadata[1]['value'])
- conn_info = self.volume_api.initialize_connection(ctx,
+ conn_info = self.volume_api.initialize_connection(self.ctx,
volume,
connector)
self.assertEqual('rw', conn_info['data']['access_mode'])
'connector': connector}}
req.body = jsonutils.dump_as_bytes(body)
# attach admin context to request
- req.environ['cinder.context'] = ctx
+ req.environ['cinder.context'] = self.ctx
# make request
resp = req.get_response(app())
# request is accepted
self.assertEqual(202, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertRaises(exception.VolumeAttachmentNotFound,
db.volume_attachment_get,
- ctx, attachment['id'])
+ self.ctx, attachment['id'])
# status changed to 'available'
self.assertEqual('available', volume['status'])
self.assertEqual('False', admin_metadata[0]['value'])
def test_force_detach_host_attached_volume(self):
- # admin context
- ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = self._create_volume(ctx, {'provider_location': '',
- 'size': 1})
+ volume = self._create_volume(self.ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
- # start service to handle rpc messages for attach requests
- svc = self.start_service('volume', host='test')
- self.addCleanup(svc.stop)
- self.volume_api.initialize_connection(ctx, volume, connector)
+
+ self.volume_api.initialize_connection(self.ctx, volume, connector)
mountpoint = '/dev/vbd'
host_name = 'fake-host'
- attachment = self.volume_api.attach(ctx, volume, None, host_name,
+ attachment = self.volume_api.attach(self.ctx, volume, None, host_name,
mountpoint, 'ro')
# volume is attached
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('in-use', volume['status'])
self.assertIsNone(attachment['instance_uuid'])
self.assertEqual(host_name, attachment['attached_host'])
self.assertEqual('False', admin_metadata[0]['value'])
self.assertEqual('attached_mode', admin_metadata[1]['key'])
self.assertEqual('ro', admin_metadata[1]['value'])
- conn_info = self.volume_api.initialize_connection(ctx,
+ conn_info = self.volume_api.initialize_connection(self.ctx,
volume, connector)
self.assertEqual('ro', conn_info['data']['access_mode'])
# build request to force detach
'connector': connector}}
req.body = jsonutils.dump_as_bytes(body)
# attach admin context to request
- req.environ['cinder.context'] = ctx
+ req.environ['cinder.context'] = self.ctx
# make request
resp = req.get_response(app())
# request is accepted
self.assertEqual(202, resp.status_int)
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertRaises(exception.VolumeAttachmentNotFound,
db.volume_attachment_get,
- ctx, attachment['id'])
+ self.ctx, attachment['id'])
# status changed to 'available'
self.assertEqual('available', volume['status'])
admin_metadata = volume['volume_admin_metadata']
self.assertEqual('False', admin_metadata[0]['value'])
def test_volume_force_detach_raises_remote_error(self):
- # admin context
- ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = self._create_volume(ctx, {'provider_location': '',
- 'size': 1})
+ volume = self._create_volume(self.ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
- # start service to handle rpc messages for attach requests
- svc = self.start_service('volume', host='test')
- self.addCleanup(svc.stop)
- self.volume_api.reserve_volume(ctx, volume)
+
+ self.volume_api.reserve_volume(self.ctx, volume)
mountpoint = '/dev/vbd'
- attachment = self.volume_api.attach(ctx, volume, stubs.FAKE_UUID,
+ attachment = self.volume_api.attach(self.ctx, volume, stubs.FAKE_UUID,
None, mountpoint, 'rw')
# volume is attached
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('in-use', volume['status'])
self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid'])
self.assertEqual(mountpoint, attachment['mountpoint'])
self.assertEqual('False', admin_metadata[0]['value'])
self.assertEqual('attached_mode', admin_metadata[1]['key'])
self.assertEqual('rw', admin_metadata[1]['value'])
- conn_info = self.volume_api.initialize_connection(ctx,
+ conn_info = self.volume_api.initialize_connection(self.ctx,
volume,
connector)
self.assertEqual('rw', conn_info['data']['access_mode'])
body = {'os-force_detach': {'attachment_id': 'fake'}}
req.body = jsonutils.dump_as_bytes(body)
# attach admin context to request
- req.environ['cinder.context'] = ctx
+ req.environ['cinder.context'] = self.ctx
# make request
resp = req.get_response(app())
self.assertEqual(400, resp.status_int)
body = {'os-force_detach': {'attachment_id': 'fake'}}
req.body = jsonutils.dump_as_bytes(body)
# attach admin context to request
- req.environ['cinder.context'] = ctx
+ req.environ['cinder.context'] = self.ctx
# make request
self.assertRaises(messaging.RemoteError,
req.get_response,
req.body = jsonutils.dump_as_bytes(body)
# attach admin context to request
- req.environ['cinder.context'] = ctx
+ req.environ['cinder.context'] = self.ctx
# make request
self.assertRaises(messaging.RemoteError,
req.get_response,
def test_volume_force_detach_raises_db_error(self):
        # In case of a DB error, a 500 error code is returned to the user
- # admin context
- ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = self._create_volume(ctx, {'provider_location': '',
- 'size': 1})
+ volume = self._create_volume(self.ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
- # start service to handle rpc messages for attach requests
- svc = self.start_service('volume', host='test')
- self.addCleanup(svc.stop)
- self.volume_api.reserve_volume(ctx, volume)
+
+ self.volume_api.reserve_volume(self.ctx, volume)
mountpoint = '/dev/vbd'
- attachment = self.volume_api.attach(ctx, volume, stubs.FAKE_UUID,
+ attachment = self.volume_api.attach(self.ctx, volume, stubs.FAKE_UUID,
None, mountpoint, 'rw')
# volume is attached
- volume = db.volume_get(ctx, volume['id'])
+ volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('in-use', volume['status'])
self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid'])
self.assertEqual(mountpoint, attachment['mountpoint'])
self.assertEqual('False', admin_metadata[0]['value'])
self.assertEqual('attached_mode', admin_metadata[1]['key'])
self.assertEqual('rw', admin_metadata[1]['value'])
- conn_info = self.volume_api.initialize_connection(ctx,
+ conn_info = self.volume_api.initialize_connection(self.ctx,
volume,
connector)
self.assertEqual('rw', conn_info['data']['access_mode'])
# build request to force detach
- volume_remote_error = \
- messaging.RemoteError(exc_type='DBError')
+ volume_remote_error = messaging.RemoteError(exc_type='DBError')
with mock.patch.object(volume_api.API, 'detach',
side_effect=volume_remote_error):
req = webob.Request.blank('/v2/fake/volumes/%s/action' %
'connector': connector}}
req.body = jsonutils.dump_as_bytes(body)
# attach admin context to request
- req.environ['cinder.context'] = ctx
+ req.environ['cinder.context'] = self.ctx
# make request
self.assertRaises(messaging.RemoteError,
req.get_response,
def test_attach_in_used_volume_by_instance(self):
"""Test that attaching to an in-use volume fails."""
- # admin context
- ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = self._create_volume(ctx, {'provider_location': '',
- 'size': 1})
+ volume = self._create_volume(self.ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
- # start service to handle rpc messages for attach requests
- svc = self.start_service('volume', host='test')
- self.addCleanup(svc.stop)
- self.volume_api.reserve_volume(ctx, volume)
- conn_info = self.volume_api.initialize_connection(ctx,
+ self.volume_api.reserve_volume(self.ctx, volume)
+ conn_info = self.volume_api.initialize_connection(self.ctx,
volume, connector)
- self.volume_api.attach(ctx, volume, fakes.get_fake_uuid(), None,
+ self.volume_api.attach(self.ctx, volume, fakes.get_fake_uuid(), None,
'/dev/vbd0', 'rw')
self.assertEqual('rw', conn_info['data']['access_mode'])
self.assertRaises(exception.InvalidVolume,
self.volume_api.attach,
- ctx,
+ self.ctx,
volume,
fakes.get_fake_uuid(),
None,
def test_attach_in_used_volume_by_host(self):
"""Test that attaching to an in-use volume fails."""
- # admin context
- ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = self._create_volume(ctx, {'provider_location': '',
- 'size': 1})
+ volume = self._create_volume(self.ctx, {'provider_location': '',
+ 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
- # start service to handle rpc messages for attach requests
- svc = self.start_service('volume', host='test')
- self.addCleanup(svc.stop)
- self.volume_api.reserve_volume(ctx, volume)
- self.volume_api.initialize_connection(ctx, volume, connector)
- self.volume_api.attach(ctx, volume, None, 'fake_host1',
+
+ self.volume_api.reserve_volume(self.ctx, volume)
+ self.volume_api.initialize_connection(self.ctx, volume, connector)
+ self.volume_api.attach(self.ctx, volume, None, 'fake_host1',
'/dev/vbd0', 'rw')
- conn_info = self.volume_api.initialize_connection(ctx,
+ conn_info = self.volume_api.initialize_connection(self.ctx,
volume, connector)
conn_info['data']['access_mode'] = 'rw'
self.assertRaises(exception.InvalidVolume,
self.volume_api.attach,
- ctx,
+ self.ctx,
volume,
None,
'fake_host2',
def test_invalid_iscsi_connector(self):
"""Test connector without the initiator (required by iscsi driver)."""
- # admin context
- ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = self._create_volume(ctx, {'provider_location': '',
- 'size': 1})
+ volume = self._create_volume(self.ctx, {'provider_location': '',
+ 'size': 1})
connector = {}
- # start service to handle rpc messages for attach requests
- svc = self.start_service('volume', host='test')
- self.addCleanup(svc.stop)
+
self.assertRaises(exception.InvalidInput,
self.volume_api.initialize_connection,
- ctx, volume, connector)
+ self.ctx, volume, connector)
def test_attach_attaching_volume_with_different_instance(self):
"""Test that attaching volume reserved for another instance fails."""
- ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = self._create_volume(ctx, {'provider_location': '',
- 'size': 1})
- # start service to handle rpc messages for attach requests
- svc = self.start_service('volume', host='test')
- self.addCleanup(svc.stop)
- self.volume_api.reserve_volume(ctx, volume)
+ volume = self._create_volume(self.ctx, {'provider_location': '',
+ 'size': 1})
+
+ self.volume_api.reserve_volume(self.ctx, volume)
values = {'volume_id': volume['id'],
'attach_status': 'attaching',
'attach_time': timeutils.utcnow(),
'instance_uuid': 'abc123',
}
- db.volume_attach(ctx, values)
- db.volume_admin_metadata_update(ctx, volume['id'],
+ db.volume_attach(self.ctx, values)
+ db.volume_admin_metadata_update(self.ctx, volume['id'],
{"attached_mode": 'rw'}, False)
mountpoint = '/dev/vbd'
- attachment = self.volume_api.attach(ctx, volume,
+ attachment = self.volume_api.attach(self.ctx, volume,
stubs.FAKE_UUID, None,
mountpoint, 'rw')
def test_attach_attaching_volume_with_different_mode(self):
"""Test that attaching volume reserved for another mode fails."""
- # admin context
- ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = self._create_volume(ctx, {'provider_location': '',
- 'size': 1})
- # start service to handle rpc messages for attach requests
- svc = self.start_service('volume', host='test')
- self.addCleanup(svc.stop)
+ volume = self._create_volume(self.ctx, {'provider_location': '',
+ 'size': 1})
+
values = {'status': 'attaching',
'instance_uuid': fakes.get_fake_uuid()}
- db.volume_update(ctx, volume['id'], values)
- db.volume_admin_metadata_update(ctx, volume['id'],
+ db.volume_update(self.ctx, volume['id'], values)
+ db.volume_admin_metadata_update(self.ctx, volume['id'],
{"attached_mode": 'rw'}, False)
mountpoint = '/dev/vbd'
self.assertRaises(exception.InvalidVolume,
self.volume_api.attach,
- ctx,
+ self.ctx,
volume,
values['instance_uuid'],
None,
mountpoint,
'ro')
-
- def _migrate_volume_prep(self):
- admin_ctx = context.get_admin_context()
- # create volume's current host and the destination host
- db.service_create(admin_ctx,
- {'host': 'test',
- 'topic': CONF.volume_topic,
- 'created_at': timeutils.utcnow()})
- db.service_create(admin_ctx,
- {'host': 'test2',
- 'topic': CONF.volume_topic,
- 'created_at': timeutils.utcnow()})
- # current status is available
- volume = self._create_volume(admin_ctx)
- return volume
-
- def _migrate_volume_exec(self, ctx, volume, host, expected_status,
- force_host_copy=False):
- admin_ctx = context.get_admin_context()
- # build request to migrate to host
- req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume.id)
- req.method = 'POST'
- req.headers['content-type'] = 'application/json'
- body = {'os-migrate_volume': {'host': host,
- 'force_host_copy': force_host_copy}}
- req.body = jsonutils.dump_as_bytes(body)
- req.environ['cinder.context'] = ctx
- resp = req.get_response(app())
- # verify status
- self.assertEqual(expected_status, resp.status_int)
- volume = objects.Volume.get_by_id(admin_ctx, volume.id)
- return volume
-
- def test_migrate_volume_success(self):
- expected_status = 202
- host = 'test2'
- ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_prep()
- volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
- self.assertEqual('starting', volume['migration_status'])
-
- def test_migrate_volume_fail_replication(self):
- expected_status = 400
- host = 'test2'
- ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_prep()
- # current status is available
- volume = self._create_volume(ctx, {'provider_location': '',
- 'attach_status': '',
- 'replication_status': 'active'})
- volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
-
- def test_migrate_volume_as_non_admin(self):
- expected_status = 403
- host = 'test2'
- ctx = context.RequestContext('fake', 'fake')
- volume = self._migrate_volume_prep()
- self._migrate_volume_exec(ctx, volume, host, expected_status)
-
- def test_migrate_volume_without_host_parameter(self):
- expected_status = 400
- host = 'test3'
- ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_prep()
- # build request to migrate without host
- req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
- req.method = 'POST'
- req.headers['content-type'] = 'application/json'
- body = {'os-migrate_volume': {'host': host,
- 'force_host_copy': False}}
- req.body = jsonutils.dump_as_bytes(body)
- req.environ['cinder.context'] = ctx
- resp = req.get_response(app())
- # verify status
- self.assertEqual(expected_status, resp.status_int)
-
- def test_migrate_volume_host_no_exist(self):
- expected_status = 400
- host = 'test3'
- ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_prep()
- self._migrate_volume_exec(ctx, volume, host, expected_status)
-
- def test_migrate_volume_same_host(self):
- expected_status = 400
- host = 'test'
- ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_prep()
- self._migrate_volume_exec(ctx, volume, host, expected_status)
-
- def test_migrate_volume_migrating(self):
- expected_status = 400
- host = 'test2'
- ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_prep()
- model_update = {'migration_status': 'migrating'}
- volume = db.volume_update(ctx, volume['id'], model_update)
- self._migrate_volume_exec(ctx, volume, host, expected_status)
-
- def test_migrate_volume_with_snap(self):
- expected_status = 400
- host = 'test2'
- ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_prep()
- snap = objects.Snapshot(ctx, volume_id=volume['id'])
- snap.create()
- self.addCleanup(snap.destroy)
- self._migrate_volume_exec(ctx, volume, host, expected_status)
-
- def test_migrate_volume_bad_force_host_copy(self):
- expected_status = 400
- host = 'test2'
- ctx = context.RequestContext('admin', 'fake', True)
- volume = self._migrate_volume_prep()
- self._migrate_volume_exec(ctx, volume, host, expected_status,
- force_host_copy='foo')
-
- def _migrate_volume_comp_exec(self, ctx, volume, new_volume, error,
- expected_status, expected_id, no_body=False):
- req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
- req.method = 'POST'
- req.headers['content-type'] = 'application/json'
- body = {'new_volume': new_volume['id'], 'error': error}
- if no_body:
- body = {'': body}
- else:
- body = {'os-migrate_volume_completion': body}
- req.body = jsonutils.dump_as_bytes(body)
- req.environ['cinder.context'] = ctx
- resp = req.get_response(app())
- resp_dict = resp.json
- # verify status
- self.assertEqual(expected_status, resp.status_int)
- if expected_id:
- self.assertEqual(expected_id, resp_dict['save_volume_id'])
- else:
- self.assertNotIn('save_volume_id', resp_dict)
-
- def test_migrate_volume_comp_as_non_admin(self):
- admin_ctx = context.get_admin_context()
- volume = db.volume_create(admin_ctx, {'id': 'fake1'})
- new_volume = db.volume_create(admin_ctx, {'id': 'fake2'})
- expected_status = 403
- expected_id = None
- ctx = context.RequestContext('fake', 'fake')
- self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
- expected_status, expected_id)
-
- def test_migrate_volume_comp_no_mig_status(self):
- admin_ctx = context.get_admin_context()
- volume1 = self._create_volume(admin_ctx, {'migration_status': 'foo'})
- volume2 = self._create_volume(admin_ctx, {'migration_status': None})
-
- expected_status = 400
- expected_id = None
- ctx = context.RequestContext('admin', 'fake', True)
- self._migrate_volume_comp_exec(ctx, volume1, volume2, False,
- expected_status, expected_id)
- self._migrate_volume_comp_exec(ctx, volume2, volume1, False,
- expected_status, expected_id)
-
- def test_migrate_volume_comp_bad_mig_status(self):
- admin_ctx = context.get_admin_context()
- volume1 = self._create_volume(admin_ctx,
- {'migration_status': 'migrating'})
- volume2 = self._create_volume(admin_ctx,
- {'migration_status': 'target:foo'})
- expected_status = 400
- expected_id = None
- ctx = context.RequestContext('admin', 'fake', True)
- self._migrate_volume_comp_exec(ctx, volume1, volume2, False,
- expected_status, expected_id)
-
- def test_migrate_volume_comp_no_action(self):
- admin_ctx = context.get_admin_context()
- volume = db.volume_create(admin_ctx, {'id': 'fake1'})
- new_volume = db.volume_create(admin_ctx, {'id': 'fake2'})
- expected_status = 400
- expected_id = None
- ctx = context.RequestContext('fake', 'fake')
- self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
- expected_status, expected_id, True)
-
- def test_migrate_volume_comp_from_nova(self):
- admin_ctx = context.get_admin_context()
- volume = self._create_volume(admin_ctx, {'status': 'in-use',
- 'migration_status': None,
- 'attach_status': 'attached'})
- new_volume = self._create_volume(admin_ctx,
- {'migration_status': None,
- 'attach_status': 'detached'})
- expected_status = 200
- expected_id = new_volume.id
- ctx = context.RequestContext('admin', 'fake', True)
- self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
- expected_status, expected_id)
-
- def test_backup_reset_valid_updates(self):
- vac = admin_actions.BackupAdminController()
- vac.validate_update({'status': 'available'})
- vac.validate_update({'status': 'error'})
- self.assertRaises(exc.HTTPBadRequest,
- vac.validate_update,
- {'status': 'restoring'})
- self.assertRaises(exc.HTTPBadRequest,
- vac.validate_update,
- {'status': 'creating'})
-
- @mock.patch('cinder.db.service_get_all_by_topic')
- @mock.patch('cinder.backup.api.API._check_support_to_force_delete')
- def _force_delete_backup_util(self, test_status, mock_check_support,
- _mock_service_get_all_by_topic):
- _mock_service_get_all_by_topic.return_value = [
- {'availability_zone': "az1", 'host': 'testhost',
- 'disabled': 0, 'updated_at': timeutils.utcnow()}]
- # admin context
- ctx = context.RequestContext('admin', 'fake', True)
- mock_check_support.return_value = True
- # current status is dependent on argument: test_status.
- id = test_backups.BackupsAPITestCase._create_backup(status=test_status)
- req = webob.Request.blank('/v2/fake/backups/%s/action' % id)
- req.method = 'POST'
- req.headers['Content-Type'] = 'application/json'
- req.body = jsonutils.dump_as_bytes({'os-force_delete': {}})
- req.environ['cinder.context'] = ctx
- res = req.get_response(app())
-
- self.assertEqual(202, res.status_int)
- self.assertEqual('deleting',
- test_backups.BackupsAPITestCase.
- _get_backup_attrib(id, 'status'))
- db.backup_destroy(context.get_admin_context(), id)
-
- def test_delete_backup_force_when_creating(self):
- self._force_delete_backup_util('creating')
-
- def test_delete_backup_force_when_deleting(self):
- self._force_delete_backup_util('deleting')
-
- def test_delete_backup_force_when_restoring(self):
- self._force_delete_backup_util('restoring')
-
- def test_delete_backup_force_when_available(self):
- self._force_delete_backup_util('available')
-
- def test_delete_backup_force_when_error(self):
- self._force_delete_backup_util('error')
-
- def test_delete_backup_force_when_error_deleting(self):
- self._force_delete_backup_util('error_deleting')
-
- @mock.patch('cinder.backup.rpcapi.BackupAPI.check_support_to_force_delete',
- return_value=False)
- def test_delete_backup_force_when_not_supported(self, mock_check_support):
- # admin context
- ctx = context.RequestContext('admin', 'fake', True)
- self.override_config('backup_driver', 'cinder.backup.drivers.ceph')
- id = test_backups.BackupsAPITestCase._create_backup()
- req = webob.Request.blank('/v2/fake/backups/%s/action' % id)
- req.method = 'POST'
- req.headers['Content-Type'] = 'application/json'
- req.body = jsonutils.dump_as_bytes({'os-force_delete': {}})
- req.environ['cinder.context'] = ctx
- res = req.get_response(app())
- self.assertEqual(405, res.status_int)