if not os.path.exists(self.images_dir):
os.mkdir(self.images_dir)
self.image_id = '70a599e0-31e7-49b7-b260-868f441e862b'
- self.image_path = self.images_dir + "/" + self.image_id
- utils.execute('truncate', '-s', 1024, self.image_path)
self.driver = GPFSDriver(configuration=conf.Configuration(None))
self.driver.set_execute(self._execute_wrapper)
self._fake_is_gpfs_parent)
self.stubs.Set(GPFSDriver, '_delete_gpfs_file',
self._fake_delete_gpfs_file)
+ self.stubs.Set(GPFSDriver, '_create_sparse_file',
+ self._fake_create_sparse_file)
+ self.stubs.Set(GPFSDriver, '_create_regular_file',
+ self._fake_create_regular_file)
+ self.stubs.Set(GPFSDriver, '_get_available_capacity',
+ self._fake_get_available_capacity)
self.stubs.Set(image_utils, 'qemu_img_info',
self._fake_qemu_qcow2_image_info)
self.stubs.Set(image_utils, 'convert_image',
self._fake_convert_image)
self.context = context.get_admin_context()
-
CONF.gpfs_images_dir = self.images_dir
def tearDown(self):
try:
- os.remove(self.image_path)
os.rmdir(self.images_dir)
os.rmdir(self.volumes_path)
except OSError:
self.assertTrue(os.path.exists(self.volumes_path))
self.volume.create_volume(self.context, volume_id)
path = self.volumes_path + '/' + vol['name']
- self.assertTrue(os.path.exists(self.volumes_path))
self.assertTrue(os.path.exists(path))
self.volume.delete_volume(self.context, volume_id)
self.assertFalse(os.path.exists(path))
- self.assertTrue(os.path.exists(self.volumes_path))
def test_create_delete_volume_sparse_backing_file(self):
"""create and delete vol with default sparse creation method"""
self.assertTrue(os.path.exists(self.volumes_path))
self.volume.create_volume(self.context, volume_id)
path = self.volumes_path + '/' + vol['name']
- self.assertTrue(os.path.exists(self.volumes_path))
self.assertTrue(os.path.exists(path))
self.volume.delete_volume(self.context, volume_id)
self.assertFalse(os.path.exists(path))
- self.assertTrue(os.path.exists(self.volumes_path))
def _create_snapshot(self, volume_id, size='0'):
"""Create a snapshot object."""
self.assertRaises(exception.VolumeBackendAPIException,
self.driver.check_for_setup_error)
+ def _fake_create_file(self, path, modebits='666'):
+ open(path, 'w').close()
+ utils.execute('chmod', modebits, path)
+
def _fake_gpfs_snap(self, src, dest=None, modebits='644'):
if dest is None:
dest = src
- utils.execute('truncate', '-s', 0, dest)
- utils.execute('chmod', '644', dest)
+ self._fake_create_file(dest, '644')
def _fake_gpfs_copy(self, src, dest):
- utils.execute('truncate', '-s', 0, dest)
- utils.execute('chmod', '666', dest)
+ self._fake_create_file(dest)
+
+ def _fake_create_sparse_file(self, path, size):
+ self._fake_create_file(path)
+
+ def _fake_create_regular_file(self, path, size):
+ self._fake_create_file(path)
def _fake_gpfs_redirect(self, src):
return True
def _fake_is_gpfs_parent(self, gpfs_file):
return False
+ def _fake_get_available_capacity(self, path):
+ fake_avail = 80 * 1024 * 1024 * 1024
+ fake_size = 2 * fake_avail
+ return fake_avail, fake_size
+
def _fake_gpfs_get_state_active(self):
active_txt = ('mmgetstate::HEADER:version:reserved:reserved:'
'nodeName:nodeNumber:state:quorum:nodesUp:totalNodes:'