from cinder.openstack.common import timeutils
from cinder import test
from cinder.tests.api import fakes
+from cinder.tests import utils
# needed for stubs to work
import cinder.volume
super(BackupsAPITestCase, self).setUp()
self.volume_api = cinder.volume.API()
self.backup_api = cinder.backup.API()
+ self.context = context.get_admin_context()
+ self.context.project_id = 'fake'
+ self.context.user_id = 'fake'
def tearDown(self):
super(BackupsAPITestCase, self).tearDown()
return db.backup_get(context.get_admin_context(),
backup_id)[attrib_name]
- @staticmethod
- def _create_volume(display_name='test_volume',
- display_description='this is a test volume',
- status='creating',
- availability_zone='fake_az',
- host='fake_host',
- size=1):
- """Create a volume object."""
- vol = {}
- vol['size'] = size
- vol['user_id'] = 'fake'
- vol['project_id'] = 'fake'
- vol['status'] = status
- vol['display_name'] = display_name
- vol['display_description'] = display_description
- vol['attach_status'] = 'detached'
- vol['availability_zone'] = availability_zone
- vol['host'] = host
- return db.volume_create(context.get_admin_context(), vol)['id']
-
@staticmethod
def _stub_service_get_all_by_topic(context, topic):
- return [{'availability_zone': "fake_az", 'host': 'fake_host',
+ return [{'availability_zone': "fake_az", 'host': 'test_host',
'disabled': 0, 'updated_at': timeutils.utcnow()}]
def test_show_backup(self):
- volume_id = self._create_volume(size=5)
+ volume_id = utils.create_volume(self.context, size=5,
+ status='creating')['id']
backup_id = self._create_backup(volume_id)
LOG.debug('Created backup with id %s' % backup_id)
req = webob.Request.blank('/v2/fake/backups/%s' %
db.volume_destroy(context.get_admin_context(), volume_id)
def test_show_backup_xml_content_type(self):
- volume_id = self._create_volume(size=5)
+ volume_id = utils.create_volume(self.context, size=5,
+ status='creating')['id']
backup_id = self._create_backup(volume_id)
req = webob.Request.blank('/v2/fake/backups/%s' % backup_id)
req.method = 'GET'
def test_create_backup_json(self):
self.stubs.Set(cinder.db, 'service_get_all_by_topic',
self._stub_service_get_all_by_topic)
- volume_id = self._create_volume(status='available', size=5)
+
+ volume_id = utils.create_volume(self.context, size=5)['id']
+
body = {"backup": {"display_name": "nightly001",
"display_description":
"Nightly Backup 03-Sep-2012",
def test_create_backup_xml(self):
self.stubs.Set(cinder.db, 'service_get_all_by_topic',
self._stub_service_get_all_by_topic)
- volume_size = 2
- volume_id = self._create_volume(status='available', size=volume_size)
+ volume_id = utils.create_volume(self.context, size=2)['id']
req = webob.Request.blank('/v2/fake/backups')
req.body = ('<backup display_name="backup-001" '
def test_create_backup_with_InvalidVolume(self):
# need to create the volume referenced below first
- volume_size = 5
- volume_id = self._create_volume(status='restoring', size=volume_size)
-
+ volume_id = utils.create_volume(self.context, size=5,
+ status='restoring')['id']
body = {"backup": {"display_name": "nightly001",
"display_description":
"Nightly Backup 03-Sep-2012",
self.stubs.Set(cinder.db, 'service_get_all_by_topic',
stub_empty_service_get_all_by_topic)
- volume_size = 2
- volume_id = self._create_volume(status='available', size=volume_size)
-
+ volume_id = utils.create_volume(self.context, size=2)['id']
req = webob.Request.blank('/v2/fake/backups')
body = {"backup": {"display_name": "nightly001",
"display_description":
#service az does not match the volume's az
def az_not_match(context, topic):
- return [{'availability_zone': "strange_az", 'host': 'fake_host',
+ return [{'availability_zone': "strange_az", 'host': 'test_host',
'disabled': 0, 'updated_at': timeutils.utcnow()}]
#service disabled
def disabled_service(context, topic):
- return [{'availability_zone': "fake_az", 'host': 'fake_host',
+ return [{'availability_zone': "fake_az", 'host': 'test_host',
'disabled': 1, 'updated_at': timeutils.utcnow()}]
#dead service that last reported in the 20th century
def multi_services(context, topic):
return [{'availability_zone': "fake_az", 'host': 'strange_host',
'disabled': 0, 'updated_at': timeutils.utcnow()},
- {'availability_zone': "fake_az", 'host': 'fake_host',
+ {'availability_zone': "fake_az", 'host': 'test_host',
'disabled': 0, 'updated_at': timeutils.utcnow()}]
- volume_id = self._create_volume(status='available', size=2,
- availability_zone='fake_az',
- host='fake_host')
+ volume_id = utils.create_volume(self.context, size=2)['id']
volume = self.volume_api.get(context.get_admin_context(), volume_id)
#test empty service
def test_restore_backup_volume_id_specified_json(self):
backup_id = self._create_backup(status='available')
# need to create the volume referenced below first
- volume_size = 5
- volume_id = self._create_volume(status='available', size=volume_size)
+ volume_id = utils.create_volume(self.context, size=5)['id']
body = {"restore": {"volume_id": volume_id, }}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
def test_restore_backup_volume_id_specified_xml(self):
backup_id = self._create_backup(status='available')
- volume_size = 2
- volume_id = self._create_volume(status='available', size=volume_size)
+ volume_id = utils.create_volume(self.context, size=2)['id']
req = webob.Request.blank('/v2/fake/backups/%s/restore' % backup_id)
req.body = '<restore volume_id="%s"/>' % volume_id
# intercept volume creation to ensure created volume
# has status of available
def fake_volume_api_create(cls, context, size, name, description):
- volume_id = self._create_volume(status='available', size=size)
+ volume_id = utils.create_volume(self.context, size=size)['id']
return db.volume_get(context, volume_id)
self.stubs.Set(cinder.volume.API, 'create',
backup_id = self._create_backup(status='available')
# need to create the volume referenced below first
- volume_size = 0
- volume_id = self._create_volume(status='available', size=volume_size)
-
+ volume_id = utils.create_volume(self.context, size=0)['id']
body = {"restore": {"volume_id": volume_id, }}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
backup_id)
def test_restore_backup_with_InvalidVolume(self):
backup_id = self._create_backup(status='available')
# need to create the volume referenced below first
- volume_size = 5
- volume_id = self._create_volume(status='attaching', size=volume_size)
+ volume_id = utils.create_volume(self.context, size=5,
+ status='attaching')['id']
body = {"restore": {"volume_id": volume_id, }}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
def test_restore_backup_with_InvalidBackup(self):
backup_id = self._create_backup(status='restoring')
# need to create the volume referenced below first
- volume_size = 5
- volume_id = self._create_volume(status='available', size=volume_size)
+ volume_id = utils.create_volume(self.context, size=5)['id']
body = {"restore": {"volume_id": volume_id, }}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
def test_restore_backup_with_BackupNotFound(self):
# need to create the volume referenced below first
- volume_size = 5
- volume_id = self._create_volume(status='available', size=volume_size)
+ volume_id = utils.create_volume(self.context, size=5)['id']
body = {"restore": {"volume_id": volume_id, }}
req = webob.Request.blank('/v2/fake/backups/9999/restore')
backup_id = self._create_backup(status='available')
# need to create the volume referenced below first
- volume_size = 5
- volume_id = self._create_volume(status='available', size=volume_size)
+ volume_id = utils.create_volume(self.context, size=5)['id']
body = {"restore": {"volume_id": volume_id, }}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
backup_id = self._create_backup(status='available')
# need to create the volume referenced below first
- volume_size = 5
- volume_id = self._create_volume(status='available', size=volume_size)
+ volume_id = utils.create_volume(self.context, size=5)['id']
body = {"restore": {"volume_id": volume_id, }}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
backup_id = self._create_backup(status='available', size=backup_size)
# need to create the volume referenced below first
volume_size = 5
- volume_id = self._create_volume(status='available', size=volume_size)
+ volume_id = utils.create_volume(self.context, size=volume_size)['id']
body = {"restore": {"volume_id": volume_id, }}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
def test_restore_backup_to_oversized_volume(self):
backup_id = self._create_backup(status='available', size=10)
# need to create the volume referenced below first
- volume_size = 15
- volume_id = self._create_volume(status='available', size=volume_size)
+ volume_id = utils.create_volume(self.context, size=15)['id']
body = {"restore": {"volume_id": volume_id, }}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
+from cinder.tests import utils
LOG = logging.getLogger(__name__)
def tearDown(self):
super(TransfersTableTestCase, self).tearDown()
- def _create_volume(self,
- display_name='test_volume',
- display_description='this is a test volume',
- status='available',
- size=1):
- """Create a volume object."""
- vol = {}
- vol['size'] = size
- vol['user_id'] = self.ctxt.user_id
- vol['project_id'] = self.ctxt.project_id
- vol['status'] = status
- vol['display_name'] = display_name
- vol['display_description'] = display_description
- vol['attach_status'] = 'detached'
- return db.volume_create(self.ctxt, vol)['id']
-
def _create_transfer(self, volume_id=None):
"""Create a transfer object."""
transfer = {'display_name': 'display_name',
self.assertRaises(KeyError,
self._create_transfer)
- volume_id = self._create_volume(size=1)
+ volume_id = utils.create_volume(self.ctxt)['id']
self._create_transfer(volume_id)
def test_transfer_get(self):
- volume_id1 = self._create_volume(size=1)
+ volume_id1 = utils.create_volume(self.ctxt)['id']
xfer_id1 = self._create_transfer(volume_id1)
xfer = db.transfer_get(self.ctxt, xfer_id1)
self.assertEquals(xfer.volume_id, volume_id1, "Unexpected volume_id")
def test_transfer_get_all(self):
- volume_id1 = self._create_volume(size=1)
- volume_id2 = self._create_volume(size=1)
+ volume_id1 = utils.create_volume(self.ctxt)['id']
+ volume_id2 = utils.create_volume(self.ctxt)['id']
self._create_transfer(volume_id1)
self._create_transfer(volume_id2)
"Unexpected number of transfer records")
def test_transfer_destroy(self):
- volume_id = self._create_volume(size=1)
- volume_id2 = self._create_volume(size=1)
+ volume_id = utils.create_volume(self.ctxt)['id']
+ volume_id2 = utils.create_volume(self.ctxt)['id']
xfer_id1 = self._create_transfer(volume_id)
xfer_id2 = self._create_transfer(volume_id2)
from cinder.openstack.common import log as logging
from cinder.openstack.common import processutils
from cinder import test
+from cinder.tests import utils as test_utils
from cinder import utils
from cinder.volume import configuration as conf
from cinder.volume.drivers.gpfs import GPFSDriver
self._fake_qemu_image_resize)
self.context = context.get_admin_context()
+ self.context.user_id = 'fake'
+ self.context.project_id = 'fake'
CONF.gpfs_images_dir = self.images_dir
def tearDown(self):
pass
super(GPFSDriverTestCase, self).tearDown()
- def _create_volume(self, size=0, snapshot_id=None, image_id=None,
- metadata=None):
- """Create a volume object."""
- vol = {}
- vol['size'] = size
- vol['snapshot_id'] = snapshot_id
- vol['image_id'] = image_id
- vol['user_id'] = 'fake'
- vol['project_id'] = 'fake'
- vol['availability_zone'] = CONF.storage_availability_zone
- vol['status'] = "creating"
- vol['attach_status'] = "detached"
- vol['host'] = CONF.host
- if metadata is not None:
- vol['metadata'] = metadata
- return db.volume_create(context.get_admin_context(), vol)
-
def test_create_delete_volume_full_backing_file(self):
"""create and delete vol with full creation method"""
CONF.gpfs_sparse_volumes = False
- vol = self._create_volume(size=1)
+ vol = test_utils.create_volume(self.context, host=CONF.host)
volume_id = vol['id']
self.assertTrue(os.path.exists(self.volumes_path))
self.volume.create_volume(self.context, volume_id)
def test_create_delete_volume_sparse_backing_file(self):
"""create and delete vol with default sparse creation method"""
CONF.gpfs_sparse_volumes = True
- vol = self._create_volume(size=1)
+ vol = test_utils.create_volume(self.context, host=CONF.host)
volume_id = vol['id']
self.assertTrue(os.path.exists(self.volumes_path))
self.volume.create_volume(self.context, volume_id)
'block_group_factor': '1',
'write_affinity_failure-group':
'1,1,1:2;2,1,1:2;2,0,3:4'}
- vol = self._create_volume(size=1, metadata=attributes)
+ vol = test_utils.create_volume(self.context, host=CONF.host,
+ metadata=attributes)
volume_id = vol['id']
self.assertTrue(os.path.exists(self.volumes_path))
self.volume.create_volume(self.context, volume_id)
return db.snapshot_create(context.get_admin_context(), snap)
def test_create_delete_snapshot(self):
- volume_src = self._create_volume()
+ volume_src = test_utils.create_volume(self.context, host=CONF.host)
self.volume.create_volume(self.context, volume_src['id'])
snapCount = len(db.snapshot_get_all_for_volume(self.context,
volume_src['id']))
self.assertTrue(snapCount == 0)
def test_create_volume_from_snapshot(self):
- volume_src = self._create_volume()
+ volume_src = test_utils.create_volume(self.context, host=CONF.host)
self.volume.create_volume(self.context, volume_src['id'])
snapshot = self._create_snapshot(volume_src['id'])
snapshot_id = snapshot['id']
snapshot_id)
self.assertTrue(os.path.exists(os.path.join(self.volumes_path,
snapshot['name'])))
- volume_dst = self._create_volume(0, snapshot_id)
+ volume_dst = test_utils.create_volume(self.context, host=CONF.host,
+ snapshot_id=snapshot_id)
self.volume.create_volume(self.context, volume_dst['id'], snapshot_id)
self.assertEqual(volume_dst['id'], db.volume_get(
context.get_admin_context(),
self.volume.delete_snapshot(self.context, snapshot_id)
def test_create_cloned_volume(self):
- volume_src = self._create_volume()
+ volume_src = test_utils.create_volume(self.context, host=CONF.host)
self.volume.create_volume(self.context, volume_src['id'])
- volume_dst = self._create_volume()
+ volume_dst = test_utils.create_volume(self.context, host=CONF.host)
volumepath = os.path.join(self.volumes_path, volume_dst['name'])
self.assertFalse(os.path.exists(volumepath))
self.volume.delete_volume(self.context, volume_dst['id'])
def test_create_volume_from_snapshot_method(self):
- volume_src = self._create_volume()
+ volume_src = test_utils.create_volume(self.context, host=CONF.host)
self.volume.create_volume(self.context, volume_src['id'])
snapshot = self._create_snapshot(volume_src['id'])
snapshot_id = snapshot['id']
self.volume.create_snapshot(self.context, volume_src['id'],
snapshot_id)
- volume_dst = self._create_volume()
+ volume_dst = test_utils.create_volume(self.context, host=CONF.host)
self.driver.create_volume_from_snapshot(volume_dst, snapshot)
self.assertEqual(volume_dst['id'], db.volume_get(
context.get_admin_context(),
self.stubs.Set(image_utils, 'qemu_img_info',
self._fake_qemu_raw_image_info)
- volume = self._create_volume()
+ volume = test_utils.create_volume(self.context, host=CONF.host)
volumepath = os.path.join(self.volumes_path, volume['name'])
CONF.gpfs_images_share_mode = 'copy_on_write'
self.driver.clone_image(volume,
self.stubs.Set(image_utils, 'qemu_img_info',
self._fake_qemu_raw_image_info)
- volume = self._create_volume()
+ volume = test_utils.create_volume(self.context, host=CONF.host)
volumepath = os.path.join(self.volumes_path, volume['name'])
CONF.gpfs_images_share_mode = 'copy'
self.driver.clone_image(volume,
self._fake_qemu_raw_image_info)
for share_mode in ['copy_on_write', 'copy']:
- volume = self._create_volume()
+ volume = test_utils.create_volume(self.context, host=CONF.host)
volumepath = os.path.join(self.volumes_path, volume['name'])
CONF.gpfs_images_share_mode = share_mode
CONF.gpfs_images_dir = None
self.stubs.Set(image_utils, 'qemu_img_info',
self._fake_qemu_qcow2_image_info)
- volume = self._create_volume()
+ volume = test_utils.create_volume(self.context, host=CONF.host)
CONF.gpfs_images_share_mode = 'copy'
CONF.gpfs_images_dir = self.images_dir
self.assertRaises(exception.ImageUnacceptable,
notification_driver=[test_notifier.__name__])
self.volume = importutils.import_object(CONF.volume_manager)
self.context = context.get_admin_context()
+ self.context.user_id = 'fake'
+ self.context.project_id = 'fake'
+ self.volume_params = {
+ 'status': 'creating',
+ 'host': CONF.host,
+ 'size': 0}
self.stubs.Set(iscsi.TgtAdm, '_get_target', self.fake_get_target)
self.stubs.Set(brick_lvm.LVM,
'get_all_volume_groups',
'lv_count': '2',
'uuid': 'vR1JU3-FAKE-C4A9-PQFh-Mctm-9FwA-Xwzc1m'}]
- @staticmethod
- def _create_volume(size=0, snapshot_id=None, image_id=None,
- source_volid=None, metadata=None, admin_metadata=None,
- status="creating", migration_status=None,
- availability_zone=None):
- """Create a volume object."""
- vol = {}
- vol['size'] = size
- vol['snapshot_id'] = snapshot_id
- vol['image_id'] = image_id
- vol['source_volid'] = source_volid
- vol['user_id'] = 'fake'
- vol['project_id'] = 'fake'
- vol['availability_zone'] = \
- availability_zone or CONF.storage_availability_zone
- vol['status'] = status
- vol['attach_status'] = "detached"
- vol['host'] = CONF.host
- if metadata is not None:
- vol['metadata'] = metadata
- if admin_metadata is not None:
- vol['admin_metadata'] = admin_metadata
- return db.volume_create(context.get_admin_context(), vol)
-
class VolumeTestCase(BaseVolumeTestCase):
def test_init_host_clears_downloads(self):
"""Test that init_host will unwedge a volume stuck in downloading."""
- volume = self._create_volume(status='downloading')
+ volume = tests_utils.create_volume(self.context, status='downloading',
+ size=0, host=CONF.host)
volume_id = volume['id']
self.volume.init_host()
volume = db.volume_get(context.get_admin_context(), volume_id)
self.stubs.Set(QUOTAS, "commit", fake_commit)
self.stubs.Set(QUOTAS, "rollback", fake_rollback)
- volume = self._create_volume()
+ volume = tests_utils.create_volume(
+ self.context,
+ availability_zone=CONF.storage_availability_zone,
+ **self.volume_params)
volume_id = volume['id']
self.assertIsNone(volume['encryption_key_id'])
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
self.assertEqual(msg['event_type'], 'volume.create.start')
expected = {
'status': 'creating',
- 'display_name': None,
+ 'display_name': 'test_volume',
'availability_zone': 'nova',
'tenant_id': 'fake',
'created_at': 'DONTCARE',
def test_create_delete_volume_with_metadata(self):
"""Test volume can be created with metadata and deleted."""
test_meta = {'fake_key': 'fake_value'}
- volume = self._create_volume(0, None, metadata=test_meta)
+ volume = tests_utils.create_volume(self.context, metadata=test_meta,
+ **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
result_meta = {
def test_delete_busy_volume(self):
"""Test volume survives deletion if driver reports it as busy."""
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
def test_delete_volume_in_error_extending(self):
"""Test volume can be deleted in error_extending stats."""
# create a volume
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
# delete 'error_extending' volume
def test_create_volume_from_snapshot(self):
"""Test volume can be created from a snapshot."""
- volume_src = self._create_volume()
+ volume_src = tests_utils.create_volume(self.context,
+ **self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
snapshot_id = self._create_snapshot(volume_src['id'])['id']
self.volume.create_snapshot(self.context, volume_src['id'],
snapshot_id)
- volume_dst = self._create_volume(0, snapshot_id)
+ volume_dst = tests_utils.create_volume(self.context,
+ snapshot_id=snapshot_id,
+ **self.volume_params)
self.volume.create_volume(self.context, volume_dst['id'], snapshot_id)
self.assertEqual(volume_dst['id'],
db.volume_get(
'list_availability_zones',
fake_list_availability_zones)
- volume_src = self._create_volume(availability_zone='az2')
+ volume_src = tests_utils.create_volume(self.context,
+ availability_zone='az2',
+ **self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
snapshot = self._create_snapshot(volume_src['id'])
self.volume.create_snapshot(self.context, volume_src['id'],
# volume_create
return True
try:
- volume = self._create_volume(1001)
+ volume = tests_utils.create_volume(self.context, size=1001,
+ status='creating',
+ host=CONF.host)
self.volume.create_volume(self.context, volume)
self.fail("Should have thrown TypeError")
except TypeError:
mountpoint = "/dev/sdf"
# attach volume to the instance then to detach
instance_uuid = '12345678-1234-5678-1234-567812345678'
- volume = self._create_volume(admin_metadata={'readonly': 'True'})
+ volume = tests_utils.create_volume(self.context,
+ admin_metadata={'readonly': 'True'},
+ **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
self.volume.attach_volume(self.context, volume_id, instance_uuid,
def test_run_attach_detach_volume_for_host(self):
"""Make sure volume can be attached and detached from host."""
mountpoint = "/dev/sdf"
- volume = self._create_volume(admin_metadata={'readonly': 'False'})
+ volume = tests_utils.create_volume(
+ self.context,
+ admin_metadata={'readonly': 'False'},
+ **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
self.volume.attach_volume(self.context, volume_id, None,
def test_run_attach_detach_volume_with_attach_mode(self):
instance_uuid = '12345678-1234-5678-1234-567812345678'
mountpoint = "/dev/sdf"
- volume = self._create_volume(admin_metadata={'readonly': 'True'})
+ volume = tests_utils.create_volume(self.context,
+ admin_metadata={'readonly': 'True'},
+ **self.volume_params)
volume_id = volume['id']
db.volume_update(self.context, volume_id, {'status': 'available',
'mountpoint': None,
# Not allow using 'read-write' mode attach readonly volume
instance_uuid = '12345678-1234-5678-1234-567812345678'
mountpoint = "/dev/sdf"
- volume = self._create_volume(admin_metadata={'readonly': 'True'})
+ volume = tests_utils.create_volume(self.context,
+ admin_metadata={'readonly': 'True'},
+ **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
self.assertRaises(exception.InvalidVolumeAttachMode,
# Not allow using 'read-write' mode attach readonly volume
instance_uuid = '12345678-1234-5678-1234-567812345678'
mountpoint = "/dev/sdf"
- volume = self._create_volume(admin_metadata={'readonly': 'True'})
+ volume = tests_utils.create_volume(self.context,
+ admin_metadata={'readonly': 'True'},
+ **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
volume_api = cinder.volume.api.API()
total_slots = CONF.iscsi_num_targets
for _index in xrange(total_slots):
- self._create_volume()
+ tests_utils.create_volume(self.context, **self.volume_params)
for volume_id in volume_ids:
self.volume.delete_volume(self.context, volume_id)
def test_create_delete_snapshot(self):
"""Test snapshot can be created and deleted."""
- volume = self._create_volume()
+ volume = tests_utils.create_volume(
+ self.context,
+ availability_zone=CONF.storage_availability_zone,
+ **self.volume_params)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
self.volume.create_volume(self.context, volume['id'])
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
def test_create_delete_snapshot_with_metadata(self):
"""Test snapshot can be created with metadata and deleted."""
test_meta = {'fake_key': 'fake_value'}
- volume = self._create_volume(0, None)
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
snapshot = self._create_snapshot(volume['id'], metadata=test_meta)
snapshot_id = snapshot['id']
def test_cant_delete_volume_in_use(self):
"""Test volume can't be deleted in invalid stats."""
# create a volume and assign to host
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'in-use'
volume['host'] = 'fakehost'
def test_force_delete_volume(self):
"""Test volume can be forced to delete."""
# create a volume and assign to host
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'error_deleting'
volume['host'] = 'fakehost'
def test_cant_force_delete_attached_volume(self):
"""Test volume can't be force delete in attached state"""
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'in-use'
volume['attach_status'] = 'attached'
def test_cant_delete_volume_with_snapshots(self):
"""Test volume can't be deleted with dependent snapshots."""
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
snapshot_id = self._create_snapshot(volume['id'])['id']
self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
def test_can_delete_errored_snapshot(self):
"""Test snapshot can be created and deleted."""
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
snapshot_id = self._create_snapshot(volume['id'])['id']
self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
self.stubs.Set(rpc, 'cast', fake_cast)
instance_uuid = '12345678-1234-5678-1234-567812345678'
# create volume and attach to the instance
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
db.volume_attached(self.context, volume['id'], instance_uuid,
None, '/dev/sda1')
db.volume_destroy(self.context, volume['id'])
# create volume and attach to the host
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
db.volume_attached(self.context, volume['id'], None,
'fake_host', '/dev/sda1')
None,
'default')
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
snapshot_id = self._create_snapshot(volume_id)['id']
fake_copy_image_to_volume)
image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
- volume_id = self._create_volume(status='creating')['id']
+ volume_id = tests_utils.create_volume(self.context,
+ **self.volume_params)['id']
# creating volume testdata
try:
self.volume.create_volume(self.context,
def test_begin_roll_detaching_volume(self):
"""Test begin_detaching and roll_detaching functions."""
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
volume_api = cinder.volume.api.API()
volume_api.begin_detaching(self.context, volume)
volume = db.volume_get(self.context, volume['id'])
def test_volume_api_update(self):
# create a raw vol
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
# use volume.api to update name
volume_api = cinder.volume.api.API()
update_dict = {'display_name': 'test update name'}
def test_volume_api_update_snapshot(self):
# create raw snapshot
- volume = self._create_volume()
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
snapshot = self._create_snapshot(volume['id'])
self.assertEquals(snapshot['display_name'], None)
# use volume.api to update name
def test_extend_volume(self):
"""Test volume can be extended at API level."""
# create a volume and assign to host
- volume = self._create_volume(2)
+ volume = tests_utils.create_volume(self.context, size=2,
+ status='creating', host=CONF.host)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'in-use'
volume['host'] = 'fakehost'
def fake_extend_exc(volume, new_size):
raise exception.CinderException('fake exception')
- volume = self._create_volume(2)
+ volume = tests_utils.create_volume(self.context, size=2,
+ status='creating', host=CONF.host)
self.volume.create_volume(self.context, volume['id'])
# Test quota exceeded
fake_reschedule_or_error)
self.stubs.Set(self.volume.driver, 'create_volume', fake_create_volume)
- volume_src = self._create_volume()
+ volume_src = tests_utils.create_volume(self.context,
+ **self.volume_params)
self.assertRaises(exception.CinderException,
self.volume.create_volume, ctxt, volume_src['id'])
self.stubs.Set(self.volume.driver, 'create_cloned_volume',
fake_create_cloned_volume)
- volume_src = self._create_volume()
+ volume_src = tests_utils.create_volume(self.context,
+ **self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
- volume_dst = self._create_volume(source_volid=volume_src['id'])
+ volume_dst = tests_utils.create_volume(self.context,
+ source_volid=volume_src['id'],
+ **self.volume_params)
self.volume.create_volume(self.context, volume_dst['id'],
source_volid=volume_src['id'])
self.assertEqual('available',
'list_availability_zones',
fake_list_availability_zones)
- volume_src = self._create_volume(availability_zone='az2')
+ volume_src = tests_utils.create_volume(self.context,
+ availability_zone='az2',
+ **self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
volume_src = db.volume_get(self.context, volume_src['id'])
fake_create_cloned_volume)
volume_src = self._create_volume_from_image()
self.volume.create_volume(self.context, volume_src['id'])
- volume_dst = self._create_volume(source_volid=volume_src['id'])
+ volume_dst = tests_utils.create_volume(self.context,
+ source_volid=volume_src['id'],
+ **self.volume_params)
self.volume.create_volume(self.context, volume_dst['id'],
source_volid=volume_src['id'])
self.assertEqual('available',
self.stubs.Set(self.volume.driver, 'create_cloned_volume',
fake_error_create_cloned_volume)
- volume_src = self._create_volume()
+ volume_src = tests_utils.create_volume(self.context,
+ **self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
- volume_dst = self._create_volume(0, source_volid=volume_src['id'])
+ volume_dst = tests_utils.create_volume(self.context,
+ source_volid=volume_src['id'],
+ **self.volume_params)
self.assertRaises(exception.CinderException,
self.volume.create_volume,
self.context,
self.stubs.Set(self.volume.driver, 'migrate_volume',
lambda x, y, z: (True, {'user_id': 'foo'}))
- volume = self._create_volume(status='available',
- migration_status='migrating')
+ volume = tests_utils.create_volume(self.context, size=0,
+ host=CONF.host,
+ migration_status='migrating')
host_obj = {'host': 'newhost', 'capabilities': {}}
self.volume.migrate_volume(self.context, volume['id'],
host_obj, False)
self.stubs.Set(volume_rpcapi.VolumeAPI, 'delete_volume',
fake_delete_volume_rpc)
- volume = self._create_volume(status='available')
+ volume = tests_utils.create_volume(self.context, size=0,
+ host=CONF.host)
host_obj = {'host': 'newhost', 'capabilities': {}}
self.volume.migrate_volume(self.context, volume['id'],
host_obj, True)
def test_update_volume_readonly_flag(self):
"""Test volume readonly flag can be updated at API level."""
# create a volume and assign to host
- volume = self._create_volume(admin_metadata={'readonly': 'True'})
+ volume = tests_utils.create_volume(self.context,
+ admin_metadata={'readonly': 'True'},
+ **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'in-use'
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
+from cinder.tests import utils
from cinder.transfer import api as transfer_api
super(VolumeTransferTestCase, self).setUp()
self.ctxt = context.RequestContext(user_id='user_id',
project_id='project_id')
-
- def _create_volume(self, volume_id, status='available',
- user_id=None, project_id=None):
- if user_id is None:
- user_id = self.ctxt.user_id
- if project_id is None:
- project_id = self.ctxt.project_id
- vol = {'id': volume_id,
- 'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'user_id': user_id,
- 'project_id': project_id,
- 'display_name': 'Display Name',
- 'display_description': 'Display Description',
- 'size': 1,
- 'status': status}
- volume = db.volume_create(self.ctxt, vol)
- return volume
+ self.updated_at = datetime.datetime(1, 1, 1, 1, 1, 1)
def test_transfer_volume_create_delete(self):
tx_api = transfer_api.API()
- volume = self._create_volume('1')
+ volume = utils.create_volume(self.ctxt, id='1',
+ updated_at=self.updated_at)
response = tx_api.create(self.ctxt, '1', 'Description')
volume = db.volume_get(self.ctxt, '1')
self.assertEquals('awaiting-transfer', volume['status'],
def test_transfer_invalid_volume(self):
tx_api = transfer_api.API()
- volume = self._create_volume('1', status='in-use')
+ volume = utils.create_volume(self.ctxt, id='1', status='in-use',
+ updated_at=self.updated_at)
self.assertRaises(exception.InvalidVolume,
tx_api.create,
self.ctxt, '1', 'Description')
def test_transfer_accept(self):
tx_api = transfer_api.API()
- volume = self._create_volume('1')
+ volume = utils.create_volume(self.ctxt, id='1',
+ updated_at=self.updated_at)
transfer = tx_api.create(self.ctxt, '1', 'Description')
volume = db.volume_get(self.ctxt, '1')
self.assertEquals('awaiting-transfer', volume['status'],
def test_transfer_get(self):
tx_api = transfer_api.API()
- volume = self._create_volume('1')
+ volume = utils.create_volume(self.ctxt, id='1',
+ updated_at=self.updated_at)
transfer = tx_api.create(self.ctxt, volume['id'], 'Description')
t = tx_api.get(self.ctxt, transfer['id'])
self.assertEquals(t['id'], transfer['id'], 'Unexpected transfer id')