]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Constant defined for sqlAlchemy VARCHAR & INTEGER
authorSheel Rana <ranasheel2000@gmail.com>
Wed, 6 Jan 2016 09:17:19 +0000 (14:47 +0530)
committerSheel Rana <ranasheel2000@gmail.com>
Wed, 6 Jan 2016 09:19:39 +0000 (14:49 +0530)
sqlalchemy.types.VARCHAR and sqlalchemy.types.INTEGER are defined
and used as VARCHAR_TYPE and INTEGER_TYPE respectively in
cinder/tests/unit/test_migrations.py.

Change-Id: I3ed83f270843e93d3d1f730d6eaf2320e8269743
Closes-Bug: #1528989

cinder/tests/unit/test_migrations.py

index 2d2122a39a9ca86d0d9b7dbcfd35492f1dbd4bd1..ac692a00bd1c194b7c217eb97d6362006e00eefc 100644 (file)
@@ -38,6 +38,8 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
 
     BOOL_TYPE = sqlalchemy.types.BOOLEAN
     TIME_TYPE = sqlalchemy.types.DATETIME
+    INTEGER_TYPE = sqlalchemy.types.INTEGER
+    VARCHAR_TYPE = sqlalchemy.types.VARCHAR
 
     @property
     def INIT_VERSION(self):
@@ -152,12 +154,12 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         """Test that adding source_volid column works correctly."""
         volumes = db_utils.get_table(engine, 'volumes')
         self.assertIsInstance(volumes.c.source_volid.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_006(self, engine, data):
         snapshots = db_utils.get_table(engine, 'snapshots')
         self.assertIsInstance(snapshots.c.provider_location.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_007(self, engine, data):
         snapshots = db_utils.get_table(engine, 'snapshots')
@@ -185,35 +187,35 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         self.assertIsInstance(backups.c.deleted.type,
                               self.BOOL_TYPE)
         self.assertIsInstance(backups.c.id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.volume_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.user_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.project_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.host.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.availability_zone.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.display_name.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.display_description.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.container.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.status.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.fail_reason.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.service_metadata.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.service.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.size.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
         self.assertIsInstance(backups.c.object_count.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
 
     def _check_009(self, engine, data):
         """Test adding snapshot_metadata table works correctly."""
@@ -232,13 +234,13 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         self.assertIsInstance(snapshot_metadata.c.deleted.type,
                               self.BOOL_TYPE)
         self.assertIsInstance(snapshot_metadata.c.id.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
         self.assertIsInstance(snapshot_metadata.c.snapshot_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(snapshot_metadata.c.key.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(snapshot_metadata.c.value.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_010(self, engine, data):
         """Test adding transfers table works correctly."""
@@ -255,15 +257,15 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         self.assertIsInstance(transfers.c.deleted.type,
                               self.BOOL_TYPE)
         self.assertIsInstance(transfers.c.id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(transfers.c.volume_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(transfers.c.display_name.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(transfers.c.salt.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(transfers.c.crypt_hash.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(transfers.c.expires_at.type,
                               self.TIME_TYPE)
 
@@ -278,19 +280,19 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         """Test that adding attached_host column works correctly."""
         volumes = db_utils.get_table(engine, 'volumes')
         self.assertIsInstance(volumes.c.attached_host.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_013(self, engine, data):
         """Test that adding provider_geometry column works correctly."""
         volumes = db_utils.get_table(engine, 'volumes')
         self.assertIsInstance(volumes.c.provider_geometry.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_014(self, engine, data):
         """Test that adding _name_id column works correctly."""
         volumes = db_utils.get_table(engine, 'volumes')
         self.assertIsInstance(volumes.c._name_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_015(self, engine, data):
         """Test removing migrations table works correctly."""
@@ -312,26 +314,26 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         volumes = db_utils.get_table(engine, 'volumes')
         self.assertIn('encryption_key_id', volumes.c)
         self.assertIsInstance(volumes.c.encryption_key_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
         snapshots = db_utils.get_table(engine, 'snapshots')
         self.assertIn('encryption_key_id', snapshots.c)
         self.assertIsInstance(snapshots.c.encryption_key_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIn('volume_type_id', snapshots.c)
         self.assertIsInstance(snapshots.c.volume_type_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
         # encryption types table
         encryption = db_utils.get_table(engine, 'encryption')
         self.assertIsInstance(encryption.c.volume_type_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(encryption.c.cipher.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(encryption.c.key_size.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
         self.assertIsInstance(encryption.c.provider.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_018(self, engine, data):
         """Test that added qos_specs table works correctly."""
@@ -347,19 +349,19 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         self.assertIsInstance(qos_specs.c.deleted.type,
                               self.BOOL_TYPE)
         self.assertIsInstance(qos_specs.c.id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(qos_specs.c.specs_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(qos_specs.c.key.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(qos_specs.c.value.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_019(self, engine, data):
         """Test that adding migration_status column works correctly."""
         volumes = db_utils.get_table(engine, 'volumes')
         self.assertIsInstance(volumes.c.migration_status.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_020(self, engine, data):
         """Test adding volume_admin_metadata table works correctly."""
@@ -377,13 +379,13 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         self.assertIsInstance(volume_admin_metadata.c.deleted.type,
                               self.BOOL_TYPE)
         self.assertIsInstance(volume_admin_metadata.c.id.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
         self.assertIsInstance(volume_admin_metadata.c.volume_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(volume_admin_metadata.c.key.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(volume_admin_metadata.c.value.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _verify_quota_defaults(self, engine):
         quota_class_metadata = db_utils.get_table(engine, 'quota_classes')
@@ -402,7 +404,7 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         """Test that adding disabled_reason column works correctly."""
         services = db_utils.get_table(engine, 'services')
         self.assertIsInstance(services.c.disabled_reason.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_023(self, engine, data):
         """Test that adding reservations index works correctly."""
@@ -420,11 +422,11 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         """Test adding replication columns to volume table."""
         volumes = db_utils.get_table(engine, 'volumes')
         self.assertIsInstance(volumes.c.replication_status.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(volumes.c.replication_extended_status.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(volumes.c.replication_driver_data.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_025(self, engine, data):
         """Test adding table and columns for consistencygroups."""
@@ -432,12 +434,12 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         metadata = sqlalchemy.MetaData()
         volumes = self.get_table_ref(engine, 'volumes', metadata)
         self.assertIsInstance(volumes.c.consistencygroup_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
         # Test cgsnapshot_id is in Table snapshots
         snapshots = self.get_table_ref(engine, 'snapshots', metadata)
         self.assertIsInstance(snapshots.c.cgsnapshot_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
         # Test Table consistencygroups exists
         self.assertTrue(engine.dialect.has_table(engine.connect(),
@@ -454,23 +456,23 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         self.assertIsInstance(consistencygroups.c.deleted.type,
                               self.BOOL_TYPE)
         self.assertIsInstance(consistencygroups.c.id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(consistencygroups.c.user_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(consistencygroups.c.project_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(consistencygroups.c.host.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(consistencygroups.c.availability_zone.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(consistencygroups.c.name.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(consistencygroups.c.description.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(consistencygroups.c.volume_type_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(consistencygroups.c.status.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
         # Test Table cgsnapshots exists
         self.assertTrue(engine.dialect.has_table(engine.connect(),
@@ -488,19 +490,19 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         self.assertIsInstance(cgsnapshots.c.deleted.type,
                               self.BOOL_TYPE)
         self.assertIsInstance(cgsnapshots.c.id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(cgsnapshots.c.user_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(cgsnapshots.c.project_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(cgsnapshots.c.consistencygroup_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(cgsnapshots.c.name.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(cgsnapshots.c.description.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(cgsnapshots.c.status.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
         # Verify foreign keys are created
         fkey, = volumes.c.consistencygroup_id.foreign_keys
@@ -545,11 +547,11 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         self.assertIsInstance(volume_type_projects.c.deleted.type,
                               self.BOOL_TYPE)
         self.assertIsInstance(volume_type_projects.c.id.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
         self.assertIsInstance(volume_type_projects.c.volume_type_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(volume_type_projects.c.project_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
         volume_types = db_utils.get_table(engine, 'volume_types')
         self.assertIsInstance(volume_types.c.is_public.type,
@@ -559,28 +561,28 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         """Test adding encryption_id column to encryption table."""
         encryptions = db_utils.get_table(engine, 'encryption')
         self.assertIsInstance(encryptions.c.encryption_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_034(self, engine, data):
         """Test adding description columns to volume_types table."""
         volume_types = db_utils.get_table(engine, 'volume_types')
         self.assertIsInstance(volume_types.c.description.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_035(self, engine, data):
         volumes = db_utils.get_table(engine, 'volumes')
         self.assertIsInstance(volumes.c.provider_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_036(self, engine, data):
         snapshots = db_utils.get_table(engine, 'snapshots')
         self.assertIsInstance(snapshots.c.provider_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_037(self, engine, data):
         consistencygroups = db_utils.get_table(engine, 'consistencygroups')
         self.assertIsInstance(consistencygroups.c.cgsnapshot_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_038(self, engine, data):
         """Test adding and removing driver_initiator_data table."""
@@ -599,20 +601,20 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         self.assertIsInstance(private_data.c.updated_at.type,
                               self.TIME_TYPE)
         self.assertIsInstance(private_data.c.id.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
         self.assertIsInstance(private_data.c.initiator.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(private_data.c.namespace.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(private_data.c.key.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(private_data.c.value.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_039(self, engine, data):
         backups = db_utils.get_table(engine, 'backups')
         self.assertIsInstance(backups.c.parent_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_40(self, engine, data):
         volumes = db_utils.get_table(engine, 'volumes')
@@ -625,15 +627,15 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
 
         attachments = db_utils.get_table(engine, 'volume_attachment')
         self.assertIsInstance(attachments.c.attach_mode.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(attachments.c.instance_uuid.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(attachments.c.attached_host.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(attachments.c.mountpoint.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(attachments.c.attach_status.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_041(self, engine, data):
         """Test that adding modified_at column works correctly."""
@@ -644,45 +646,45 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
     def _check_048(self, engine, data):
         quotas = db_utils.get_table(engine, 'quotas')
         self.assertIsInstance(quotas.c.allocated.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
 
     def _check_049(self, engine, data):
         backups = db_utils.get_table(engine, 'backups')
         self.assertIsInstance(backups.c.temp_volume_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.temp_snapshot_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_050(self, engine, data):
         volumes = db_utils.get_table(engine, 'volumes')
         self.assertIsInstance(volumes.c.previous_status.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_051(self, engine, data):
         consistencygroups = db_utils.get_table(engine, 'consistencygroups')
         self.assertIsInstance(consistencygroups.c.source_cgid.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_052(self, engine, data):
         snapshots = db_utils.get_table(engine, 'snapshots')
         self.assertIsInstance(snapshots.c.provider_auth.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_053(self, engine, data):
         services = db_utils.get_table(engine, 'services')
         self.assertIsInstance(services.c.rpc_current_version.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(services.c.rpc_available_version.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(services.c.object_current_version.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(services.c.object_available_version.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
 
     def _check_054(self, engine, data):
         backups = db_utils.get_table(engine, 'backups')
         self.assertIsInstance(backups.c.num_dependent_backups.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
 
     def _check_055(self, engine, data):
         """Test adding image_volume_cache_entries table."""
@@ -696,24 +698,24 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         )
 
         self.assertIsInstance(private_data.c.id.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
         self.assertIsInstance(private_data.c.host.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(private_data.c.image_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(private_data.c.image_updated_at.type,
                               self.TIME_TYPE)
         self.assertIsInstance(private_data.c.volume_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(private_data.c.size.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
         self.assertIsInstance(private_data.c.last_used.type,
                               self.TIME_TYPE)
 
     def _check_061(self, engine, data):
         backups = db_utils.get_table(engine, 'backups')
         self.assertIsInstance(backups.c.snapshot_id.type,
-                              sqlalchemy.types.VARCHAR)
+                              self.VARCHAR_TYPE)
         self.assertIsInstance(backups.c.data_timestamp.type,
                               self.TIME_TYPE)
 
@@ -721,7 +723,7 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
         volume_type_projects = db_utils.get_table(engine,
                                                   'volume_type_projects')
         self.assertIsInstance(volume_type_projects.c.id.type,
-                              sqlalchemy.types.INTEGER)
+                              self.INTEGER_TYPE)
 
     def test_walk_versions(self):
         self.walk_versions(False, False)