]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Refactor cinder/tests/test_volume.py
authorJulia Varlamova <jvarlamova@mirantis.com>
Mon, 12 Aug 2013 08:49:25 +0000 (12:49 +0400)
committerJulia Varlamova <jvarlamova@mirantis.com>
Thu, 22 Aug 2013 07:31:24 +0000 (11:31 +0400)
Refactor test_volume.py to remove code duplication.
- remove iterant definition of the 'expected' dict from
'test_create_delete_volume' and 'test_create_delete_snapshot'
method;
- move 'test_copy_volume_to_image_status_available',
'test_copy_volume_to_image_status_use' and
'test_copy_volume_to_image_exception' methods to new class
 CopyVolumeToImageTestCase;
- move 'test_volume_get_active_by_window' and
'test_snapshot_get_active_by_window' to new class
GetActiveByWindowTestCase;
- create base class BaseVolumeTestCase for TestCase classes.
bp cinder-tests-improvement

Change-Id: I5d3644375d321e613418ea8e544557d33401ff77

cinder/tests/test_volume.py

index d7b3665e1dfb03092d427779245962f6e4172984..fd465b4958b669050bcf535e3759c2e02541ed27 100644 (file)
@@ -51,6 +51,7 @@ from cinder.tests import conf_fixture
 from cinder.tests.image import fake as fake_image
 from cinder.tests.keymgr import fake as fake_keymgr
 from cinder.tests import utils as tests_utils
+from cinder import units
 from cinder import utils
 import cinder.volume
 from cinder.volume import configuration as conf
@@ -69,11 +70,20 @@ fake_opt = [
 ]
 
 
-class VolumeTestCase(test.TestCase):
-    """Test Case for volumes."""
+class FakeImageService:
+    def __init__(self, db_driver=None, image_service=None):
+        pass
+
+    def show(self, context, image_id):
+        return {'size': 2 * units.GiB,
+                'disk_format': 'raw',
+                'container_format': 'bare'}
+
 
+class BaseVolumeTestCase(test.TestCase):
+    """Test Case for volumes."""
     def setUp(self):
-        super(VolumeTestCase, self).setUp()
+        super(BaseVolumeTestCase, self).setUp()
         vol_tmpdir = tempfile.mkdtemp()
         self.flags(connection_type='fake',
                    volumes_dir=vol_tmpdir,
@@ -94,7 +104,7 @@ class VolumeTestCase(test.TestCase):
         except OSError:
             pass
         notifier_api._reset_drivers()
-        super(VolumeTestCase, self).tearDown()
+        super(BaseVolumeTestCase, self).tearDown()
 
     def fake_get_target(obj, iqn):
         return 1
@@ -127,6 +137,8 @@ class VolumeTestCase(test.TestCase):
             vol['metadata'] = metadata
         return db.volume_create(context.get_admin_context(), vol)
 
+
+class VolumeTestCase(BaseVolumeTestCase):
     def test_init_host_clears_downloads(self):
         """Test that init_host will unwedge a volume stuck in downloading."""
         volume = self._create_volume(status='downloading')
@@ -176,19 +188,7 @@ class VolumeTestCase(test.TestCase):
         self.assertDictMatch(msg['payload'], expected)
         msg = test_notifier.NOTIFICATIONS[1]
         self.assertEqual(msg['event_type'], 'volume.create.end')
-        expected = {
-            'status': 'available',
-            'display_name': None,
-            'availability_zone': 'nova',
-            'tenant_id': 'fake',
-            'created_at': 'DONTCARE',
-            'volume_id': volume_id,
-            'volume_type': None,
-            'snapshot_id': None,
-            'user_id': 'fake',
-            'launched_at': 'DONTCARE',
-            'size': 0,
-        }
+        expected['status'] = 'available'
         self.assertDictMatch(msg['payload'], expected)
         self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
                          volume_id).id)
@@ -200,35 +200,9 @@ class VolumeTestCase(test.TestCase):
         self.assertEquals(len(test_notifier.NOTIFICATIONS), 4)
         msg = test_notifier.NOTIFICATIONS[2]
         self.assertEqual(msg['event_type'], 'volume.delete.start')
-        expected = {
-            'status': 'available',
-            'display_name': None,
-            'availability_zone': 'nova',
-            'tenant_id': 'fake',
-            'created_at': 'DONTCARE',
-            'volume_id': volume_id,
-            'volume_type': None,
-            'snapshot_id': None,
-            'user_id': 'fake',
-            'launched_at': 'DONTCARE',
-            'size': 0,
-        }
         self.assertDictMatch(msg['payload'], expected)
         msg = test_notifier.NOTIFICATIONS[3]
         self.assertEqual(msg['event_type'], 'volume.delete.end')
-        expected = {
-            'status': 'available',
-            'display_name': None,
-            'availability_zone': 'nova',
-            'tenant_id': 'fake',
-            'created_at': 'DONTCARE',
-            'volume_id': volume_id,
-            'volume_type': None,
-            'snapshot_id': None,
-            'user_id': 'fake',
-            'launched_at': 'DONTCARE',
-            'size': 0,
-        }
         self.assertDictMatch(msg['payload'], expected)
         self.assertRaises(exception.NotFound,
                           db.volume_get,
@@ -661,51 +635,16 @@ class VolumeTestCase(test.TestCase):
         self.assertDictMatch(msg['payload'], expected)
         msg = test_notifier.NOTIFICATIONS[3]
         self.assertEquals(msg['event_type'], 'snapshot.create.end')
-        expected = {
-            'created_at': 'DONTCARE',
-            'deleted': '',
-            'display_name': None,
-            'snapshot_id': snapshot_id,
-            'status': 'creating',
-            'tenant_id': 'fake',
-            'user_id': 'fake',
-            'volume_id': volume['id'],
-            'volume_size': 0,
-            'availability_zone': 'nova'
-        }
         self.assertDictMatch(msg['payload'], expected)
 
         self.volume.delete_snapshot(self.context, snapshot_id)
         self.assertEquals(len(test_notifier.NOTIFICATIONS), 6)
         msg = test_notifier.NOTIFICATIONS[4]
         self.assertEquals(msg['event_type'], 'snapshot.delete.start')
-        expected = {
-            'created_at': 'DONTCARE',
-            'deleted': '',
-            'display_name': None,
-            'snapshot_id': snapshot_id,
-            'status': 'available',
-            'tenant_id': 'fake',
-            'user_id': 'fake',
-            'volume_id': volume['id'],
-            'volume_size': 0,
-            'availability_zone': 'nova'
-        }
+        expected['status'] = 'available'
         self.assertDictMatch(msg['payload'], expected)
         msg = test_notifier.NOTIFICATIONS[5]
         self.assertEquals(msg['event_type'], 'snapshot.delete.end')
-        expected = {
-            'created_at': 'DONTCARE',
-            'deleted': '',
-            'display_name': None,
-            'snapshot_id': snapshot_id,
-            'status': 'available',
-            'tenant_id': 'fake',
-            'user_id': 'fake',
-            'volume_id': volume['id'],
-            'volume_size': 0,
-            'availability_zone': 'nova'
-        }
         self.assertDictMatch(msg['payload'], expected)
 
         snap = db.snapshot_get(context.get_admin_context(read_deleted='yes'),
@@ -992,137 +931,14 @@ class VolumeTestCase(test.TestCase):
         db.volume_destroy(self.context, volume_id)
         os.unlink(dst_path)
 
-    def test_copy_volume_to_image_status_available(self):
-        dst_fd, dst_path = tempfile.mkstemp()
-        os.close(dst_fd)
-
-        def fake_local_path(volume):
-            return dst_path
-
-        self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
-
-        image_meta = {
-            'id': '70a599e0-31e7-49b7-b260-868f441e862b',
-            'container_format': 'bare',
-            'disk_format': 'raw'}
-
-        # creating volume testdata
-        volume_id = 1
-        db.volume_create(self.context,
-                         {'id': volume_id,
-                          'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
-                          'display_description': 'Test Desc',
-                          'size': 20,
-                          'status': 'uploading',
-                          'instance_uuid': None,
-                          'host': 'dummy'})
-
-        try:
-            # start test
-            self.volume.copy_volume_to_image(self.context,
-                                             volume_id,
-                                             image_meta)
-
-            volume = db.volume_get(self.context, volume_id)
-            self.assertEqual(volume['status'], 'available')
-        finally:
-            # cleanup
-            db.volume_destroy(self.context, volume_id)
-            os.unlink(dst_path)
-
-    def test_copy_volume_to_image_status_use(self):
-        dst_fd, dst_path = tempfile.mkstemp()
-        os.close(dst_fd)
-
-        def fake_local_path(volume):
-            return dst_path
-
-        self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
-
-        image_meta = {
-            'id': 'a440c04b-79fa-479c-bed1-0b816eaec379',
-            'container_format': 'bare',
-            'disk_format': 'raw'}
-        # creating volume testdata
-        volume_id = 1
-        db.volume_create(
-            self.context,
-            {'id': volume_id,
-             'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
-             'display_description': 'Test Desc',
-             'size': 20,
-             'status': 'uploading',
-             'instance_uuid': 'b21f957d-a72f-4b93-b5a5-45b1161abb02',
-             'host': 'dummy'})
-
-        try:
-            # start test
-            self.volume.copy_volume_to_image(self.context,
-                                             volume_id,
-                                             image_meta)
-
-            volume = db.volume_get(self.context, volume_id)
-            self.assertEqual(volume['status'], 'in-use')
-        finally:
-            # cleanup
-            db.volume_destroy(self.context, volume_id)
-            os.unlink(dst_path)
-
-    def test_copy_volume_to_image_exception(self):
-        dst_fd, dst_path = tempfile.mkstemp()
-        os.close(dst_fd)
-
-        def fake_local_path(volume):
-            return dst_path
-
-        self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
-
-        image_meta = {
-            'id': 'aaaaaaaa-0000-0000-0000-000000000000',
-            'container_format': 'bare',
-            'disk_format': 'raw'}
-        # creating volume testdata
-        volume_id = 1
-        db.volume_create(self.context,
-                         {'id': volume_id,
-                          'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
-                          'display_description': 'Test Desc',
-                          'size': 20,
-                          'status': 'in-use',
-                          'host': 'dummy'})
-
-        try:
-            # start test
-            self.assertRaises(exception.ImageNotFound,
-                              self.volume.copy_volume_to_image,
-                              self.context,
-                              volume_id,
-                              image_meta)
-
-            volume = db.volume_get(self.context, volume_id)
-            self.assertEqual(volume['status'], 'available')
-        finally:
-            # cleanup
-            db.volume_destroy(self.context, volume_id)
-            os.unlink(dst_path)
-
     def test_create_volume_from_exact_sized_image(self):
         """Verify that an image which is exactly the same size as the
         volume, will work correctly.
         """
-        class _FakeImageService:
-            def __init__(self, db_driver=None, image_service=None):
-                pass
-
-            def show(self, context, image_id):
-                return {'size': 2 * 1024 * 1024 * 1024,
-                        'disk_format': 'raw',
-                        'container_format': 'bare'}
-
         try:
             volume_id = None
             volume_api = cinder.volume.api.API(
-                image_service=_FakeImageService())
+                image_service=FakeImageService())
             volume = volume_api.create(self.context, 2, 'name', 'description',
                                        image_id=1)
             volume_id = volume['id']
@@ -1134,16 +950,14 @@ class VolumeTestCase(test.TestCase):
 
     def test_create_volume_from_oversized_image(self):
         """Verify that an image which is too big will fail correctly."""
-        class _FakeImageService:
-            def __init__(self, db_driver=None, image_service=None):
-                pass
-
+        class _ModifiedFakeImageService(FakeImageService):
             def show(self, context, image_id):
-                return {'size': 2 * 1024 * 1024 * 1024 + 1,
+                return {'size': 2 * units.GiB + 1,
                         'disk_format': 'raw',
                         'container_format': 'bare'}
 
-        volume_api = cinder.volume.api.API(image_service=_FakeImageService())
+        volume_api = cinder.volume.api.API(image_service=
+                                           _ModifiedFakeImageService())
 
         self.assertRaises(exception.InvalidInput,
                           volume_api.create,
@@ -1152,17 +966,15 @@ class VolumeTestCase(test.TestCase):
 
     def test_create_volume_with_mindisk_error(self):
         """Verify volumes smaller than image minDisk will cause an error."""
-        class _FakeImageService:
-            def __init__(self, db_driver=None, image_service=None):
-                pass
-
+        class _ModifiedFakeImageService(FakeImageService):
             def show(self, context, image_id):
-                return {'size': 2 * 1024 * 1024 * 1024,
+                return {'size': 2 * units.GiB,
                         'disk_format': 'raw',
                         'container_format': 'bare',
                         'min_disk': 5}
 
-        volume_api = cinder.volume.api.API(image_service=_FakeImageService())
+        volume_api = cinder.volume.api.API(image_service=
+                                           _ModifiedFakeImageService())
 
         self.assertRaises(exception.InvalidInput,
                           volume_api.create,
@@ -1257,150 +1069,6 @@ class VolumeTestCase(test.TestCase):
         snap = db.snapshot_get(context.get_admin_context(), snapshot['id'])
         self.assertEquals(snap['display_name'], 'test update name')
 
-    def test_volume_get_active_by_window(self):
-        ctx = context.get_admin_context(read_deleted="yes")
-
-        # Find all all volumes valid within a timeframe window.
-
-        # Not in window
-        db.volume_create(
-            ctx,
-            {
-                'id': 1,
-                'host': 'devstack',
-                'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
-                'deleted': True, 'status': 'deleted',
-                'deleted_at': datetime.datetime(1, 2, 1, 1, 1, 1),
-            }
-        )
-
-        # In - deleted in window
-        db.volume_create(
-            ctx,
-            {
-                'id': 2,
-                'host': 'devstack',
-                'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
-                'deleted': True, 'status': 'deleted',
-                'deleted_at': datetime.datetime(1, 3, 10, 1, 1, 1),
-            }
-        )
-
-        # In - deleted after window
-        db.volume_create(
-            ctx,
-            {
-                'id': 3,
-                'host': 'devstack',
-                'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
-                'deleted': True, 'status': 'deleted',
-                'deleted_at': datetime.datetime(1, 5, 1, 1, 1, 1),
-            }
-        )
-
-        # In - created in window
-        db.volume_create(
-            self.context,
-            {
-                'id': 4,
-                'host': 'devstack',
-                'created_at': datetime.datetime(1, 3, 10, 1, 1, 1),
-            }
-        )
-
-        # Not of window.
-        db.volume_create(
-            self.context,
-            {
-                'id': 5,
-                'host': 'devstack',
-                'created_at': datetime.datetime(1, 5, 1, 1, 1, 1),
-            }
-        )
-
-        volumes = db.volume_get_active_by_window(
-            self.context,
-            datetime.datetime(1, 3, 1, 1, 1, 1),
-            datetime.datetime(1, 4, 1, 1, 1, 1))
-        self.assertEqual(len(volumes), 3)
-        self.assertEqual(volumes[0].id, u'2')
-        self.assertEqual(volumes[1].id, u'3')
-        self.assertEqual(volumes[2].id, u'4')
-
-    def test_snapshot_get_active_by_window(self):
-        ctx = context.get_admin_context(read_deleted="yes")
-
-        # Find all all snapshots valid within a timeframe window.
-        # Not in window
-        db.snapshot_create(
-            ctx,
-            {
-                'id': 1,
-                'host': 'devstack',
-                'volume_id': 1,
-                'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
-                'deleted': True, 'status': 'deleted',
-                'deleted_at': datetime.datetime(1, 2, 1, 1, 1, 1),
-            }
-        )
-
-        # In - deleted in window
-        db.snapshot_create(
-            ctx,
-            {
-                'id': 2,
-                'host': 'devstack',
-                'volume_id': 1,
-                'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
-                'deleted': True, 'status': 'deleted',
-                'deleted_at': datetime.datetime(1, 3, 10, 1, 1, 1),
-            }
-        )
-
-        # In - deleted after window
-        db.snapshot_create(
-            ctx,
-            {
-                'id': 3,
-                'host': 'devstack',
-                'volume_id': 1,
-                'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
-                'deleted': True, 'status': 'deleted',
-                'deleted_at': datetime.datetime(1, 5, 1, 1, 1, 1),
-            }
-        )
-
-        # In - created in window
-        db.snapshot_create(
-            self.context,
-            {
-                'id': 4,
-                'host': 'devstack',
-                'volume_id': 1,
-                'created_at': datetime.datetime(1, 3, 10, 1, 1, 1),
-            }
-        )
-
-        # Not of window.
-        db.snapshot_create(
-            self.context,
-            {
-                'id': 5,
-                'host': 'devstack',
-                'volume_id': 1,
-                'created_at': datetime.datetime(1, 5, 1, 1, 1, 1),
-            }
-        )
-
-        snapshots = db.snapshot_get_active_by_window(
-            self.context,
-            datetime.datetime(1, 3, 1, 1, 1, 1),
-            datetime.datetime(1, 4, 1, 1, 1, 1))
-        self.assertEqual(len(snapshots), 3)
-        self.assertEqual(snapshots[0].id, u'2')
-        self.assertEqual(snapshots[1].id, u'3')
-        self.assertEqual(snapshots[2].id, u'4')
-
     def test_extend_volume(self):
         """Test volume can be extended at API level."""
         # create a volume and assign to host
@@ -1698,6 +1366,176 @@ class VolumeTestCase(test.TestCase):
         self.assertEquals(volume['name_id'], 'new_id')
 
 
+class CopyVolumeToImageTestCase(BaseVolumeTestCase):
+    def fake_local_path(self, volume):
+        return self.dst_path
+
+    def setUp(self):
+        super(CopyVolumeToImageTestCase, self).setUp()
+        self.dst_fd, self.dst_path = tempfile.mkstemp()
+        os.close(self.dst_fd)
+        self.stubs.Set(self.volume.driver, 'local_path', self.fake_local_path)
+        self.image_meta = {
+            'id': '70a599e0-31e7-49b7-b260-868f441e862b',
+            'container_format': 'bare',
+            'disk_format': 'raw'
+        }
+        self.volume_id = 1
+        self.volume_attrs = {
+            'id': self.volume_id,
+            'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
+            'display_description': 'Test Desc',
+            'size': 20,
+            'status': 'uploading',
+            'host': 'dummy'
+        }
+
+    def tearDown(self):
+        db.volume_destroy(self.context, self.volume_id)
+        os.unlink(self.dst_path)
+        super(CopyVolumeToImageTestCase, self).tearDown()
+
+    def test_copy_volume_to_image_status_available(self):
+        # creating volume testdata
+        self.volume_attrs['instance_uuid'] = None
+        db.volume_create(self.context, self.volume_attrs)
+
+        # start test
+        self.volume.copy_volume_to_image(self.context,
+                                         self.volume_id,
+                                         self.image_meta)
+
+        volume = db.volume_get(self.context, self.volume_id)
+        self.assertEqual(volume['status'], 'available')
+
+    def test_copy_volume_to_image_status_use(self):
+        self.image_meta['id'] = 'a440c04b-79fa-479c-bed1-0b816eaec379'
+        # creating volume testdata
+        self.volume_attrs['instance_uuid'] = 'b21f957d-a72f-4b93-b5a5-' \
+                                             '45b1161abb02'
+        db.volume_create(self.context, self.volume_attrs)
+
+        # start test
+        self.volume.copy_volume_to_image(self.context,
+                                         self.volume_id,
+                                         self.image_meta)
+
+        volume = db.volume_get(self.context, self.volume_id)
+        self.assertEqual(volume['status'], 'in-use')
+
+    def test_copy_volume_to_image_exception(self):
+        self.image_meta['id'] = 'aaaaaaaa-0000-0000-0000-000000000000'
+        # creating volume testdata
+        self.volume_attrs['status'] = 'in-use'
+        db.volume_create(self.context, self.volume_attrs)
+
+        # start test
+        self.assertRaises(exception.ImageNotFound,
+                          self.volume.copy_volume_to_image,
+                          self.context,
+                          self.volume_id,
+                          self.image_meta)
+
+        volume = db.volume_get(self.context, self.volume_id)
+        self.assertEqual(volume['status'], 'available')
+
+
+class GetActiveByWindowTestCase(BaseVolumeTestCase):
+    def setUp(self):
+        super(GetActiveByWindowTestCase, self).setUp()
+        self.ctx = context.get_admin_context(read_deleted="yes")
+        self.db_attrs = [
+            {
+                'id': 1,
+                'host': 'devstack',
+                'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
+                'deleted': True, 'status': 'deleted',
+                'deleted_at': datetime.datetime(1, 2, 1, 1, 1, 1),
+            },
+
+            {
+                'id': 2,
+                'host': 'devstack',
+                'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
+                'deleted': True, 'status': 'deleted',
+                'deleted_at': datetime.datetime(1, 3, 10, 1, 1, 1),
+            },
+            {
+                'id': 3,
+                'host': 'devstack',
+                'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
+                'deleted': True, 'status': 'deleted',
+                'deleted_at': datetime.datetime(1, 5, 1, 1, 1, 1),
+            },
+            {
+                'id': 4,
+                'host': 'devstack',
+                'created_at': datetime.datetime(1, 3, 10, 1, 1, 1),
+            },
+            {
+                'id': 5,
+                'host': 'devstack',
+                'created_at': datetime.datetime(1, 5, 1, 1, 1, 1),
+            }
+        ]
+
+    def test_volume_get_active_by_window(self):
+        # Find all all volumes valid within a timeframe window.
+
+        # Not in window
+        db.volume_create(self.ctx, self.db_attrs[0])
+
+        # In - deleted in window
+        db.volume_create(self.ctx, self.db_attrs[1])
+
+        # In - deleted after window
+        db.volume_create(self.ctx, self.db_attrs[2])
+
+        # In - created in window
+        db.volume_create(self.context, self.db_attrs[3])
+
+        # Not of window.
+        db.volume_create(self.context, self.db_attrs[4])
+
+        volumes = db.volume_get_active_by_window(
+            self.context,
+            datetime.datetime(1, 3, 1, 1, 1, 1),
+            datetime.datetime(1, 4, 1, 1, 1, 1))
+        self.assertEqual(len(volumes), 3)
+        self.assertEqual(volumes[0].id, u'2')
+        self.assertEqual(volumes[1].id, u'3')
+        self.assertEqual(volumes[2].id, u'4')
+
+    def test_snapshot_get_active_by_window(self):
+        # Find all all snapshots valid within a timeframe window.
+        vol = db.volume_create(self.context, {'id': 1})
+        for i in range(5):
+            self.db_attrs[i]['volume_id'] = 1
+
+        # Not in window
+        db.snapshot_create(self.ctx, self.db_attrs[0])
+
+        # In - deleted in window
+        db.snapshot_create(self.ctx, self.db_attrs[1])
+
+        # In - deleted after window
+        db.snapshot_create(self.ctx, self.db_attrs[2])
+
+        # In - created in window
+        db.snapshot_create(self.context, self.db_attrs[3])
+        # Not of window.
+        db.snapshot_create(self.context, self.db_attrs[4])
+
+        snapshots = db.snapshot_get_active_by_window(
+            self.context,
+            datetime.datetime(1, 3, 1, 1, 1, 1),
+            datetime.datetime(1, 4, 1, 1, 1, 1))
+        self.assertEqual(len(snapshots), 3)
+        self.assertEqual(snapshots[0].id, u'2')
+        self.assertEqual(snapshots[1].id, u'3')
+        self.assertEqual(snapshots[2].id, u'4')
+
+
 class DriverTestCase(test.TestCase):
     """Base Test class for Drivers."""
     driver_name = "cinder.volume.driver.FakeBaseDriver"