import contextlib
+import mock
from oslo.utils import units
from oslo_concurrency import processutils
self.driver.extend_volume(fake_vol, fake_size)
self.mox.VerifyAll()
+
+ def test_create_volume_from_snapshot(self):
+ fake_name = u'volume-00000001'
+ fake_size = '10'
+ fake_vol = {'project_id': 'testprjid', 'name': fake_name,
+ 'size': fake_size,
+ 'id': 'a720b3c0-d1f0-11e1-9b23-0800200c9a66'}
+
+ ss_uuid = '00000000-0000-0000-0000-c3aa7ee01536'
+ fake_snapshot = {'volume_name': fake_name,
+ 'name': 'volume-%s' % ss_uuid,
+ 'id': ss_uuid,
+ 'size': fake_size}
+
+ with mock.patch.object(SheepdogDriver, '_try_execute') as mock_exe:
+ self.driver.create_volume_from_snapshot(fake_vol, fake_snapshot)
+ args = ['qemu-img', 'create', '-b',
+ "sheepdog:%s:%s" % (fake_snapshot['volume_name'],
+ fake_snapshot['name']),
+ "sheepdog:%s" % fake_vol['name'],
+ "%sG" % fake_vol['size']]
+ mock_exe.assert_called_once_with(*args)
self._try_execute('qemu-img', 'create', '-b',
"sheepdog:%s:%s" % (snapshot['volume_name'],
snapshot['name']),
- "sheepdog:%s" % volume['name'])
+ "sheepdog:%s" % volume['name'],
+ '%sG' % volume['size'])
def delete_volume(self, volume):
"""Delete a logical volume."""