self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
return rsrc
+ def _mock_create_volume(self, fv, stack_name):
+ clients.OpenStackClients.cinder().MultipleTimes().AndReturn(
+ self.cinder_fc)
+ vol_name = utils.PhysName(stack_name, 'DataVolume')
+ self.cinder_fc.volumes.create(
+ size=u'1', availability_zone='nova',
+ display_description=vol_name,
+ display_name=vol_name).AndReturn(fv)
+
+ def _mock_create_server_volume_script(self, fva):
+ clients.OpenStackClients.nova().MultipleTimes().AndReturn(self.fc)
+ self.fc.volumes.create_server_volume(
+ device=u'/dev/vdc', server_id=u'WikiDatabase',
+ volume_id=u'vol-123').AndReturn(fva)
+ self.cinder_fc.volumes.get('vol-123').AndReturn(fva)
+
def test_volume(self):
fv = FakeVolume('creating', 'available')
stack_name = 'test_volume_stack'
self.m.VerifyAll()
+ def test_volume_detach_with_latency(self):
+ fv = FakeVolume('creating', 'available')
+ fva = FakeVolume('attaching', 'in-use')
+ stack_name = 'test_volume_attach_stack'
+
+ self._mock_create_volume(fv, stack_name)
+
+ self._mock_create_server_volume_script(fva)
+
+ # delete script
+ volume_detach_cycle = 'in-use', 'detaching', 'available'
+ fva = FakeLatencyVolume(life_cycle=volume_detach_cycle)
+ self.fc.volumes.delete_server_volume(
+ 'WikiDatabase', 'vol-123').MultipleTimes().AndReturn(None)
+ self.cinder_fc.volumes.get('vol-123').AndReturn(fva)
+
+ self.m.ReplayAll()
+
+ t = template_format.parse(volume_template)
+ t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
+ stack = parse_stack(t, stack_name=stack_name)
+
+ scheduler.TaskRunner(stack['DataVolume'].create)()
+ self.assertEqual(fv.status, 'available')
+ rsrc = self.create_attachment(t, stack, 'MountPoint')
+
+ self.assertEqual(rsrc.delete(), None)
+
+ self.m.VerifyAll()
+
+ def test_volume_detach_with_error(self):
+ fv = FakeVolume('creating', 'available')
+ fva = FakeVolume('attaching', 'in-use')
+ stack_name = 'test_volume_attach_stack'
+
+ self._mock_create_volume(fv, stack_name)
+
+ self._mock_create_server_volume_script(fva)
+
+ # delete script
+ fva = FakeVolume('in-use', 'error')
+ self.fc.volumes.delete_server_volume('WikiDatabase',
+ 'vol-123').AndReturn(None)
+ self.cinder_fc.volumes.get('vol-123').AndReturn(fva)
+
+ self.m.ReplayAll()
+
+ t = template_format.parse(volume_template)
+ t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
+ stack = parse_stack(t, stack_name=stack_name)
+
+ scheduler.TaskRunner(stack['DataVolume'].create)()
+ self.assertEqual(fv.status, 'available')
+ rsrc = self.create_attachment(t, stack, 'MountPoint')
+ detach_task = scheduler.TaskRunner(rsrc.delete)
+
+ self.assertRaises(exception.ResourceFailure, detach_task)
+
+ self.m.VerifyAll()
+
@skipIf(volume_backups is None, 'unable to import volume_backups')
def test_snapshot(self):
stack_name = 'test_volume_stack'
pass
+class FakeLatencyVolume(object):
+ status = 'attaching'
+ id = 'vol-123'
+
+ def __init__(self, life_cycle=('creating', 'available'), **attrs):
+ if not isinstance(life_cycle, tuple):
+ raise exception.Error('life_cycle need to be a tuple.')
+ if not len(life_cycle):
+ raise exception.Error('life_cycle should not be an empty tuple.')
+ self.life_cycle = iter(life_cycle)
+ self.status = next(self.life_cycle)
+ for key, value in attrs.iteritems():
+ setattr(self, key, value)
+
+ def get(self):
+ self.status = next(self.life_cycle)
+
+ def update(self, **kw):
+ pass
+
+
class FakeBackup(FakeVolume):
status = 'creating'
id = 'backup-123'