]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add tests for consistency groups DB migration
authorXing Yang <xing.yang@emc.com>
Thu, 18 Sep 2014 20:50:58 +0000 (16:50 -0400)
committerXing Yang <xing.yang@emc.com>
Thu, 18 Sep 2014 21:18:52 +0000 (17:18 -0400)
The consistency group patch added 2 new DB migration versions:
025 and 026. However, there were no unit tests to support them in
cinder/tests/test_migrations.py. This patch added missing tests.

Change-Id: I670825980fddd4b6dc400e5a17e1003ad47ace1a
Closes-Bug: #1370756

cinder/tests/test_migrations.py

index 0068186a65deff7688192a7184fa04aa27d72e4d..8ed94e4873451ff580fbe7e29f78f91f9e17ed23 100644 (file)
@@ -1126,3 +1126,156 @@ class TestMigrations(test.TestCase):
             self.assertNotIn('replication_status', volumes.c)
             self.assertNotIn('replication_extended_status', volumes.c)
             self.assertNotIn('replication_driver_data', volumes.c)
+
+    def test_migration_025(self):
+        """Test adding table and columns for consistencygroups."""
+        for (key, engine) in self.engines.items():
+            migration_api.version_control(engine,
+                                          TestMigrations.REPOSITORY,
+                                          migration.db_initial_version())
+            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 24)
+            metadata = sqlalchemy.schema.MetaData()
+            metadata.bind = engine
+
+            # Upgrade
+            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 25)
+
+            # Test consistencygroup_id is in Table volumes
+            volumes = sqlalchemy.Table('volumes',
+                                       metadata,
+                                       autoload=True)
+            self.assertIsInstance(volumes.c.consistencygroup_id.type,
+                                  sqlalchemy.types.VARCHAR)
+
+            # Test cgsnapshot_id is in Table snapshots
+            snapshots = sqlalchemy.Table('snapshots',
+                                         metadata,
+                                         autoload=True)
+            self.assertIsInstance(snapshots.c.cgsnapshot_id.type,
+                                  sqlalchemy.types.VARCHAR)
+
+            # Test Table consistencygroups exists
+            self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                     "consistencygroups"))
+            consistencygroups = sqlalchemy.Table('consistencygroups',
+                                                 metadata,
+                                                 autoload=True)
+
+            self.assertIsInstance(consistencygroups.c.created_at.type,
+                                  self.time_type[engine.name])
+            self.assertIsInstance(consistencygroups.c.updated_at.type,
+                                  self.time_type[engine.name])
+            self.assertIsInstance(consistencygroups.c.deleted_at.type,
+                                  self.time_type[engine.name])
+            self.assertIsInstance(consistencygroups.c.deleted.type,
+                                  self.bool_type[engine.name])
+            self.assertIsInstance(consistencygroups.c.id.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(consistencygroups.c.user_id.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(consistencygroups.c.project_id.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(consistencygroups.c.host.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(consistencygroups.c.availability_zone.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(consistencygroups.c.name.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(consistencygroups.c.description.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(consistencygroups.c.volume_type_id.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(consistencygroups.c.status.type,
+                                  sqlalchemy.types.VARCHAR)
+
+            # Test Table cgsnapshots exists
+            self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                     "cgsnapshots"))
+            cgsnapshots = sqlalchemy.Table('cgsnapshots',
+                                           metadata,
+                                           autoload=True)
+
+            self.assertIsInstance(cgsnapshots.c.created_at.type,
+                                  self.time_type[engine.name])
+            self.assertIsInstance(cgsnapshots.c.updated_at.type,
+                                  self.time_type[engine.name])
+            self.assertIsInstance(cgsnapshots.c.deleted_at.type,
+                                  self.time_type[engine.name])
+            self.assertIsInstance(cgsnapshots.c.deleted.type,
+                                  self.bool_type[engine.name])
+            self.assertIsInstance(cgsnapshots.c.id.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(cgsnapshots.c.user_id.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(cgsnapshots.c.project_id.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(cgsnapshots.c.consistencygroup_id.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(cgsnapshots.c.name.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(cgsnapshots.c.description.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(cgsnapshots.c.status.type,
+                                  sqlalchemy.types.VARCHAR)
+
+            # Downgrade
+            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 24)
+            metadata = sqlalchemy.schema.MetaData()
+            metadata.bind = engine
+
+            # Test consistencygroup_id is not in Table volumes
+            volumes = sqlalchemy.Table('volumes',
+                                       metadata,
+                                       autoload=True)
+            self.assertNotIn('consistencygroup_id', volumes.c)
+
+            # Test cgsnapshot_id is not in Table snapshots
+            snapshots = sqlalchemy.Table('snapshots',
+                                         metadata,
+                                         autoload=True)
+            self.assertNotIn('cgsnapshot_id', snapshots.c)
+
+            # Test Table cgsnapshots doesn't exist any more
+            self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                      "cgsnapshots"))
+
+            # Test Table consistencygroups doesn't exist any more
+            self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                      "consistencygroups"))
+
+    def test_migration_026(self):
+        """Test adding default data for consistencygroups quota class."""
+        for (key, engine) in self.engines.items():
+            migration_api.version_control(engine,
+                                          TestMigrations.REPOSITORY,
+                                          migration.db_initial_version())
+            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 25)
+            metadata = sqlalchemy.schema.MetaData()
+            metadata.bind = engine
+
+            quota_class_metadata = sqlalchemy.Table('quota_classes',
+                                                    metadata,
+                                                    autoload=True)
+
+            num_defaults = quota_class_metadata.count().\
+                where(quota_class_metadata.c.class_name == 'default').\
+                execute().scalar()
+
+            self.assertEqual(3, num_defaults)
+
+            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 26)
+
+            num_defaults = quota_class_metadata.count().\
+                where(quota_class_metadata.c.class_name == 'default').\
+                execute().scalar()
+
+            self.assertEqual(4, num_defaults)
+
+            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 25)
+
+            # Defaults should not be deleted during downgrade
+            num_defaults = quota_class_metadata.count().\
+                where(quota_class_metadata.c.class_name == 'default').\
+                execute().scalar()
+
+            self.assertEqual(4, num_defaults)