from cinder.tests.image import fake as fake_image
from cinder.tests.keymgr import fake as fake_keymgr
from cinder.tests import utils as tests_utils
+from cinder import units
from cinder import utils
import cinder.volume
from cinder.volume import configuration as conf
]
-class VolumeTestCase(test.TestCase):
- """Test Case for volumes."""
+class FakeImageService:
+ def __init__(self, db_driver=None, image_service=None):
+ pass
+
+ def show(self, context, image_id):
+ return {'size': 2 * units.GiB,
+ 'disk_format': 'raw',
+ 'container_format': 'bare'}
+
+class BaseVolumeTestCase(test.TestCase):
+ """Test Case for volumes."""
def setUp(self):
- super(VolumeTestCase, self).setUp()
+ super(BaseVolumeTestCase, self).setUp()
vol_tmpdir = tempfile.mkdtemp()
self.flags(connection_type='fake',
volumes_dir=vol_tmpdir,
except OSError:
pass
notifier_api._reset_drivers()
- super(VolumeTestCase, self).tearDown()
+ super(BaseVolumeTestCase, self).tearDown()
def fake_get_target(obj, iqn):
return 1
vol['metadata'] = metadata
return db.volume_create(context.get_admin_context(), vol)
+
+class VolumeTestCase(BaseVolumeTestCase):
def test_init_host_clears_downloads(self):
"""Test that init_host will unwedge a volume stuck in downloading."""
volume = self._create_volume(status='downloading')
self.assertDictMatch(msg['payload'], expected)
msg = test_notifier.NOTIFICATIONS[1]
self.assertEqual(msg['event_type'], 'volume.create.end')
- expected = {
- 'status': 'available',
- 'display_name': None,
- 'availability_zone': 'nova',
- 'tenant_id': 'fake',
- 'created_at': 'DONTCARE',
- 'volume_id': volume_id,
- 'volume_type': None,
- 'snapshot_id': None,
- 'user_id': 'fake',
- 'launched_at': 'DONTCARE',
- 'size': 0,
- }
+ expected['status'] = 'available'
self.assertDictMatch(msg['payload'], expected)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 4)
msg = test_notifier.NOTIFICATIONS[2]
self.assertEqual(msg['event_type'], 'volume.delete.start')
- expected = {
- 'status': 'available',
- 'display_name': None,
- 'availability_zone': 'nova',
- 'tenant_id': 'fake',
- 'created_at': 'DONTCARE',
- 'volume_id': volume_id,
- 'volume_type': None,
- 'snapshot_id': None,
- 'user_id': 'fake',
- 'launched_at': 'DONTCARE',
- 'size': 0,
- }
self.assertDictMatch(msg['payload'], expected)
msg = test_notifier.NOTIFICATIONS[3]
self.assertEqual(msg['event_type'], 'volume.delete.end')
- expected = {
- 'status': 'available',
- 'display_name': None,
- 'availability_zone': 'nova',
- 'tenant_id': 'fake',
- 'created_at': 'DONTCARE',
- 'volume_id': volume_id,
- 'volume_type': None,
- 'snapshot_id': None,
- 'user_id': 'fake',
- 'launched_at': 'DONTCARE',
- 'size': 0,
- }
self.assertDictMatch(msg['payload'], expected)
self.assertRaises(exception.NotFound,
db.volume_get,
self.assertDictMatch(msg['payload'], expected)
msg = test_notifier.NOTIFICATIONS[3]
self.assertEquals(msg['event_type'], 'snapshot.create.end')
- expected = {
- 'created_at': 'DONTCARE',
- 'deleted': '',
- 'display_name': None,
- 'snapshot_id': snapshot_id,
- 'status': 'creating',
- 'tenant_id': 'fake',
- 'user_id': 'fake',
- 'volume_id': volume['id'],
- 'volume_size': 0,
- 'availability_zone': 'nova'
- }
self.assertDictMatch(msg['payload'], expected)
self.volume.delete_snapshot(self.context, snapshot_id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 6)
msg = test_notifier.NOTIFICATIONS[4]
self.assertEquals(msg['event_type'], 'snapshot.delete.start')
- expected = {
- 'created_at': 'DONTCARE',
- 'deleted': '',
- 'display_name': None,
- 'snapshot_id': snapshot_id,
- 'status': 'available',
- 'tenant_id': 'fake',
- 'user_id': 'fake',
- 'volume_id': volume['id'],
- 'volume_size': 0,
- 'availability_zone': 'nova'
- }
+ expected['status'] = 'available'
self.assertDictMatch(msg['payload'], expected)
msg = test_notifier.NOTIFICATIONS[5]
self.assertEquals(msg['event_type'], 'snapshot.delete.end')
- expected = {
- 'created_at': 'DONTCARE',
- 'deleted': '',
- 'display_name': None,
- 'snapshot_id': snapshot_id,
- 'status': 'available',
- 'tenant_id': 'fake',
- 'user_id': 'fake',
- 'volume_id': volume['id'],
- 'volume_size': 0,
- 'availability_zone': 'nova'
- }
self.assertDictMatch(msg['payload'], expected)
snap = db.snapshot_get(context.get_admin_context(read_deleted='yes'),
db.volume_destroy(self.context, volume_id)
os.unlink(dst_path)
- def test_copy_volume_to_image_status_available(self):
- dst_fd, dst_path = tempfile.mkstemp()
- os.close(dst_fd)
-
- def fake_local_path(volume):
- return dst_path
-
- self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
-
- image_meta = {
- 'id': '70a599e0-31e7-49b7-b260-868f441e862b',
- 'container_format': 'bare',
- 'disk_format': 'raw'}
-
- # creating volume testdata
- volume_id = 1
- db.volume_create(self.context,
- {'id': volume_id,
- 'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'display_description': 'Test Desc',
- 'size': 20,
- 'status': 'uploading',
- 'instance_uuid': None,
- 'host': 'dummy'})
-
- try:
- # start test
- self.volume.copy_volume_to_image(self.context,
- volume_id,
- image_meta)
-
- volume = db.volume_get(self.context, volume_id)
- self.assertEqual(volume['status'], 'available')
- finally:
- # cleanup
- db.volume_destroy(self.context, volume_id)
- os.unlink(dst_path)
-
- def test_copy_volume_to_image_status_use(self):
- dst_fd, dst_path = tempfile.mkstemp()
- os.close(dst_fd)
-
- def fake_local_path(volume):
- return dst_path
-
- self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
-
- image_meta = {
- 'id': 'a440c04b-79fa-479c-bed1-0b816eaec379',
- 'container_format': 'bare',
- 'disk_format': 'raw'}
- # creating volume testdata
- volume_id = 1
- db.volume_create(
- self.context,
- {'id': volume_id,
- 'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'display_description': 'Test Desc',
- 'size': 20,
- 'status': 'uploading',
- 'instance_uuid': 'b21f957d-a72f-4b93-b5a5-45b1161abb02',
- 'host': 'dummy'})
-
- try:
- # start test
- self.volume.copy_volume_to_image(self.context,
- volume_id,
- image_meta)
-
- volume = db.volume_get(self.context, volume_id)
- self.assertEqual(volume['status'], 'in-use')
- finally:
- # cleanup
- db.volume_destroy(self.context, volume_id)
- os.unlink(dst_path)
-
- def test_copy_volume_to_image_exception(self):
- dst_fd, dst_path = tempfile.mkstemp()
- os.close(dst_fd)
-
- def fake_local_path(volume):
- return dst_path
-
- self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
-
- image_meta = {
- 'id': 'aaaaaaaa-0000-0000-0000-000000000000',
- 'container_format': 'bare',
- 'disk_format': 'raw'}
- # creating volume testdata
- volume_id = 1
- db.volume_create(self.context,
- {'id': volume_id,
- 'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'display_description': 'Test Desc',
- 'size': 20,
- 'status': 'in-use',
- 'host': 'dummy'})
-
- try:
- # start test
- self.assertRaises(exception.ImageNotFound,
- self.volume.copy_volume_to_image,
- self.context,
- volume_id,
- image_meta)
-
- volume = db.volume_get(self.context, volume_id)
- self.assertEqual(volume['status'], 'available')
- finally:
- # cleanup
- db.volume_destroy(self.context, volume_id)
- os.unlink(dst_path)
-
def test_create_volume_from_exact_sized_image(self):
"""Verify that an image which is exactly the same size as the
volume, will work correctly.
"""
- class _FakeImageService:
- def __init__(self, db_driver=None, image_service=None):
- pass
-
- def show(self, context, image_id):
- return {'size': 2 * 1024 * 1024 * 1024,
- 'disk_format': 'raw',
- 'container_format': 'bare'}
-
try:
volume_id = None
volume_api = cinder.volume.api.API(
- image_service=_FakeImageService())
+ image_service=FakeImageService())
volume = volume_api.create(self.context, 2, 'name', 'description',
image_id=1)
volume_id = volume['id']
def test_create_volume_from_oversized_image(self):
"""Verify that an image which is too big will fail correctly."""
- class _FakeImageService:
- def __init__(self, db_driver=None, image_service=None):
- pass
-
+ class _ModifiedFakeImageService(FakeImageService):
def show(self, context, image_id):
- return {'size': 2 * 1024 * 1024 * 1024 + 1,
+ return {'size': 2 * units.GiB + 1,
'disk_format': 'raw',
'container_format': 'bare'}
- volume_api = cinder.volume.api.API(image_service=_FakeImageService())
+ volume_api = cinder.volume.api.API(image_service=
+ _ModifiedFakeImageService())
self.assertRaises(exception.InvalidInput,
volume_api.create,
def test_create_volume_with_mindisk_error(self):
"""Verify volumes smaller than image minDisk will cause an error."""
- class _FakeImageService:
- def __init__(self, db_driver=None, image_service=None):
- pass
-
+ class _ModifiedFakeImageService(FakeImageService):
def show(self, context, image_id):
- return {'size': 2 * 1024 * 1024 * 1024,
+ return {'size': 2 * units.GiB,
'disk_format': 'raw',
'container_format': 'bare',
'min_disk': 5}
- volume_api = cinder.volume.api.API(image_service=_FakeImageService())
+ volume_api = cinder.volume.api.API(image_service=
+ _ModifiedFakeImageService())
self.assertRaises(exception.InvalidInput,
volume_api.create,
snap = db.snapshot_get(context.get_admin_context(), snapshot['id'])
self.assertEquals(snap['display_name'], 'test update name')
- def test_volume_get_active_by_window(self):
- ctx = context.get_admin_context(read_deleted="yes")
-
- # Find all all volumes valid within a timeframe window.
-
- # Not in window
- db.volume_create(
- ctx,
- {
- 'id': 1,
- 'host': 'devstack',
- 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'deleted': True, 'status': 'deleted',
- 'deleted_at': datetime.datetime(1, 2, 1, 1, 1, 1),
- }
- )
-
- # In - deleted in window
- db.volume_create(
- ctx,
- {
- 'id': 2,
- 'host': 'devstack',
- 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'deleted': True, 'status': 'deleted',
- 'deleted_at': datetime.datetime(1, 3, 10, 1, 1, 1),
- }
- )
-
- # In - deleted after window
- db.volume_create(
- ctx,
- {
- 'id': 3,
- 'host': 'devstack',
- 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'deleted': True, 'status': 'deleted',
- 'deleted_at': datetime.datetime(1, 5, 1, 1, 1, 1),
- }
- )
-
- # In - created in window
- db.volume_create(
- self.context,
- {
- 'id': 4,
- 'host': 'devstack',
- 'created_at': datetime.datetime(1, 3, 10, 1, 1, 1),
- }
- )
-
- # Not of window.
- db.volume_create(
- self.context,
- {
- 'id': 5,
- 'host': 'devstack',
- 'created_at': datetime.datetime(1, 5, 1, 1, 1, 1),
- }
- )
-
- volumes = db.volume_get_active_by_window(
- self.context,
- datetime.datetime(1, 3, 1, 1, 1, 1),
- datetime.datetime(1, 4, 1, 1, 1, 1))
- self.assertEqual(len(volumes), 3)
- self.assertEqual(volumes[0].id, u'2')
- self.assertEqual(volumes[1].id, u'3')
- self.assertEqual(volumes[2].id, u'4')
-
- def test_snapshot_get_active_by_window(self):
- ctx = context.get_admin_context(read_deleted="yes")
-
- # Find all all snapshots valid within a timeframe window.
- # Not in window
- db.snapshot_create(
- ctx,
- {
- 'id': 1,
- 'host': 'devstack',
- 'volume_id': 1,
- 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'deleted': True, 'status': 'deleted',
- 'deleted_at': datetime.datetime(1, 2, 1, 1, 1, 1),
- }
- )
-
- # In - deleted in window
- db.snapshot_create(
- ctx,
- {
- 'id': 2,
- 'host': 'devstack',
- 'volume_id': 1,
- 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'deleted': True, 'status': 'deleted',
- 'deleted_at': datetime.datetime(1, 3, 10, 1, 1, 1),
- }
- )
-
- # In - deleted after window
- db.snapshot_create(
- ctx,
- {
- 'id': 3,
- 'host': 'devstack',
- 'volume_id': 1,
- 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'deleted': True, 'status': 'deleted',
- 'deleted_at': datetime.datetime(1, 5, 1, 1, 1, 1),
- }
- )
-
- # In - created in window
- db.snapshot_create(
- self.context,
- {
- 'id': 4,
- 'host': 'devstack',
- 'volume_id': 1,
- 'created_at': datetime.datetime(1, 3, 10, 1, 1, 1),
- }
- )
-
- # Not of window.
- db.snapshot_create(
- self.context,
- {
- 'id': 5,
- 'host': 'devstack',
- 'volume_id': 1,
- 'created_at': datetime.datetime(1, 5, 1, 1, 1, 1),
- }
- )
-
- snapshots = db.snapshot_get_active_by_window(
- self.context,
- datetime.datetime(1, 3, 1, 1, 1, 1),
- datetime.datetime(1, 4, 1, 1, 1, 1))
- self.assertEqual(len(snapshots), 3)
- self.assertEqual(snapshots[0].id, u'2')
- self.assertEqual(snapshots[1].id, u'3')
- self.assertEqual(snapshots[2].id, u'4')
-
def test_extend_volume(self):
"""Test volume can be extended at API level."""
# create a volume and assign to host
self.assertEquals(volume['name_id'], 'new_id')
+class CopyVolumeToImageTestCase(BaseVolumeTestCase):
+ def fake_local_path(self, volume):
+ return self.dst_path
+
+ def setUp(self):
+ super(CopyVolumeToImageTestCase, self).setUp()
+ self.dst_fd, self.dst_path = tempfile.mkstemp()
+ os.close(self.dst_fd)
+ self.stubs.Set(self.volume.driver, 'local_path', self.fake_local_path)
+ self.image_meta = {
+ 'id': '70a599e0-31e7-49b7-b260-868f441e862b',
+ 'container_format': 'bare',
+ 'disk_format': 'raw'
+ }
+ self.volume_id = 1
+ self.volume_attrs = {
+ 'id': self.volume_id,
+ 'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
+ 'display_description': 'Test Desc',
+ 'size': 20,
+ 'status': 'uploading',
+ 'host': 'dummy'
+ }
+
+ def tearDown(self):
+ db.volume_destroy(self.context, self.volume_id)
+ os.unlink(self.dst_path)
+ super(CopyVolumeToImageTestCase, self).tearDown()
+
+ def test_copy_volume_to_image_status_available(self):
+ # creating volume testdata
+ self.volume_attrs['instance_uuid'] = None
+ db.volume_create(self.context, self.volume_attrs)
+
+ # start test
+ self.volume.copy_volume_to_image(self.context,
+ self.volume_id,
+ self.image_meta)
+
+ volume = db.volume_get(self.context, self.volume_id)
+ self.assertEqual(volume['status'], 'available')
+
+ def test_copy_volume_to_image_status_use(self):
+ self.image_meta['id'] = 'a440c04b-79fa-479c-bed1-0b816eaec379'
+ # creating volume testdata
+ self.volume_attrs['instance_uuid'] = 'b21f957d-a72f-4b93-b5a5-' \
+ '45b1161abb02'
+ db.volume_create(self.context, self.volume_attrs)
+
+ # start test
+ self.volume.copy_volume_to_image(self.context,
+ self.volume_id,
+ self.image_meta)
+
+ volume = db.volume_get(self.context, self.volume_id)
+ self.assertEqual(volume['status'], 'in-use')
+
+ def test_copy_volume_to_image_exception(self):
+ self.image_meta['id'] = 'aaaaaaaa-0000-0000-0000-000000000000'
+ # creating volume testdata
+ self.volume_attrs['status'] = 'in-use'
+ db.volume_create(self.context, self.volume_attrs)
+
+ # start test
+ self.assertRaises(exception.ImageNotFound,
+ self.volume.copy_volume_to_image,
+ self.context,
+ self.volume_id,
+ self.image_meta)
+
+ volume = db.volume_get(self.context, self.volume_id)
+ self.assertEqual(volume['status'], 'available')
+
+
+class GetActiveByWindowTestCase(BaseVolumeTestCase):
+ def setUp(self):
+ super(GetActiveByWindowTestCase, self).setUp()
+ self.ctx = context.get_admin_context(read_deleted="yes")
+ self.db_attrs = [
+ {
+ 'id': 1,
+ 'host': 'devstack',
+ 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
+ 'deleted': True, 'status': 'deleted',
+ 'deleted_at': datetime.datetime(1, 2, 1, 1, 1, 1),
+ },
+
+ {
+ 'id': 2,
+ 'host': 'devstack',
+ 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
+ 'deleted': True, 'status': 'deleted',
+ 'deleted_at': datetime.datetime(1, 3, 10, 1, 1, 1),
+ },
+ {
+ 'id': 3,
+ 'host': 'devstack',
+ 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
+ 'deleted': True, 'status': 'deleted',
+ 'deleted_at': datetime.datetime(1, 5, 1, 1, 1, 1),
+ },
+ {
+ 'id': 4,
+ 'host': 'devstack',
+ 'created_at': datetime.datetime(1, 3, 10, 1, 1, 1),
+ },
+ {
+ 'id': 5,
+ 'host': 'devstack',
+ 'created_at': datetime.datetime(1, 5, 1, 1, 1, 1),
+ }
+ ]
+
+ def test_volume_get_active_by_window(self):
+ # Find all all volumes valid within a timeframe window.
+
+ # Not in window
+ db.volume_create(self.ctx, self.db_attrs[0])
+
+ # In - deleted in window
+ db.volume_create(self.ctx, self.db_attrs[1])
+
+ # In - deleted after window
+ db.volume_create(self.ctx, self.db_attrs[2])
+
+ # In - created in window
+ db.volume_create(self.context, self.db_attrs[3])
+
+ # Not of window.
+ db.volume_create(self.context, self.db_attrs[4])
+
+ volumes = db.volume_get_active_by_window(
+ self.context,
+ datetime.datetime(1, 3, 1, 1, 1, 1),
+ datetime.datetime(1, 4, 1, 1, 1, 1))
+ self.assertEqual(len(volumes), 3)
+ self.assertEqual(volumes[0].id, u'2')
+ self.assertEqual(volumes[1].id, u'3')
+ self.assertEqual(volumes[2].id, u'4')
+
+ def test_snapshot_get_active_by_window(self):
+ # Find all all snapshots valid within a timeframe window.
+ vol = db.volume_create(self.context, {'id': 1})
+ for i in range(5):
+ self.db_attrs[i]['volume_id'] = 1
+
+ # Not in window
+ db.snapshot_create(self.ctx, self.db_attrs[0])
+
+ # In - deleted in window
+ db.snapshot_create(self.ctx, self.db_attrs[1])
+
+ # In - deleted after window
+ db.snapshot_create(self.ctx, self.db_attrs[2])
+
+ # In - created in window
+ db.snapshot_create(self.context, self.db_attrs[3])
+ # Not of window.
+ db.snapshot_create(self.context, self.db_attrs[4])
+
+ snapshots = db.snapshot_get_active_by_window(
+ self.context,
+ datetime.datetime(1, 3, 1, 1, 1, 1),
+ datetime.datetime(1, 4, 1, 1, 1, 1))
+ self.assertEqual(len(snapshots), 3)
+ self.assertEqual(snapshots[0].id, u'2')
+ self.assertEqual(snapshots[1].id, u'3')
+ self.assertEqual(snapshots[2].id, u'4')
+
+
class DriverTestCase(test.TestCase):
"""Base Test class for Drivers."""
driver_name = "cinder.volume.driver.FakeBaseDriver"