import ast
import tempfile
-import time
import webob
from oslo.config import cfg
from cinder.tests.api.v2 import stubs
from cinder.tests import cast_as_call
from cinder.volume import api as volume_api
-from cinder.volume import utils as volutils
CONF = cfg.CONF
self.assertRaises(exception.NotFound, db.volume_get, ctx, volume['id'])
def test_force_delete_snapshot(self):
- self.stubs.Set(volutils, 'clear_volume',
- lambda a, b, volume_clear=CONF.volume_clear,
- volume_clear_size=CONF.volume_clear_size: None)
- # admin context
ctx = context.RequestContext('admin', 'fake', True)
- # current status is creating
- volume = db.volume_create(ctx, {'host': 'test', 'size': 1})
- snapshot = db.snapshot_create(ctx, {'status': 'creating',
- 'volume_size': 1,
- 'volume_id': volume['id']})
+ snapshot = stubs.stub_snapshot(1, host='foo')
+ self.stubs.Set(db, 'volume_get', lambda x, y: snapshot)
+ self.stubs.Set(db, 'snapshot_get', lambda x, y: snapshot)
+ self.stubs.Set(volume_api.API, 'delete_snapshot', lambda *x, **y: True)
path = '/v2/fake/snapshots/%s/action' % snapshot['id']
req = webob.Request.blank(path)
req.method = 'POST'
req.body = jsonutils.dumps({'os-force_delete': {}})
# attach admin context to request
req.environ['cinder.context'] = ctx
- # start service to handle rpc.cast for 'delete snapshot'
- svc = self.start_service('volume', host='test')
-
- cast_as_call.mock_cast_as_call(svc.manager.scheduler_rpcapi.client)
-
- # NOTE(flaper87): Instead fo patch `os.path.exists`
- # create a fake path for the snapshot that should
- # be deleted and let the check pass
- def local_path(volume, vg=None):
- tfile = tempfile.mkstemp(suffix='-cow', dir=self.tempdir)
- # NOTE(flaper87): Strip `-cow` since it'll be added
- # later in the happy path.
- return tfile[1].strip('-cow')
-
- self.stubs.Set(svc.manager.driver, "local_path", local_path)
- # make request
resp = req.get_response(app())
-
- # NOTE(flaper87): Since we're using a nested service
- # lets make sure we yield the control over the service
- # thread so it can process the recent calls.
- time.sleep(0.6)
-
- # Request is accepted
self.assertEqual(resp.status_int, 202)
- # snapshot is deleted
- self.assertRaises(exception.NotFound, db.snapshot_get, ctx,
- snapshot['id'])
- # cleanup
- svc.stop()
def test_force_detach_instance_attached_volume(self):
# admin context