BOOL_TYPE = sqlalchemy.types.BOOLEAN
TIME_TYPE = sqlalchemy.types.DATETIME
+ INTEGER_TYPE = sqlalchemy.types.INTEGER
+ VARCHAR_TYPE = sqlalchemy.types.VARCHAR
@property
def INIT_VERSION(self):
"""Test that adding source_volid column works correctly."""
volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.source_volid.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_006(self, engine, data):
snapshots = db_utils.get_table(engine, 'snapshots')
self.assertIsInstance(snapshots.c.provider_location.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_007(self, engine, data):
snapshots = db_utils.get_table(engine, 'snapshots')
self.assertIsInstance(backups.c.deleted.type,
self.BOOL_TYPE)
self.assertIsInstance(backups.c.id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.volume_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.user_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.project_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.host.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.availability_zone.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.display_name.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.display_description.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.container.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.status.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.fail_reason.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.service_metadata.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.service.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.size.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
self.assertIsInstance(backups.c.object_count.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
def _check_009(self, engine, data):
"""Test adding snapshot_metadata table works correctly."""
self.assertIsInstance(snapshot_metadata.c.deleted.type,
self.BOOL_TYPE)
self.assertIsInstance(snapshot_metadata.c.id.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
self.assertIsInstance(snapshot_metadata.c.snapshot_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(snapshot_metadata.c.key.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(snapshot_metadata.c.value.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_010(self, engine, data):
"""Test adding transfers table works correctly."""
self.assertIsInstance(transfers.c.deleted.type,
self.BOOL_TYPE)
self.assertIsInstance(transfers.c.id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(transfers.c.volume_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(transfers.c.display_name.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(transfers.c.salt.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(transfers.c.crypt_hash.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(transfers.c.expires_at.type,
self.TIME_TYPE)
"""Test that adding attached_host column works correctly."""
volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.attached_host.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_013(self, engine, data):
"""Test that adding provider_geometry column works correctly."""
volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.provider_geometry.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_014(self, engine, data):
"""Test that adding _name_id column works correctly."""
volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c._name_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_015(self, engine, data):
"""Test removing migrations table works correctly."""
volumes = db_utils.get_table(engine, 'volumes')
self.assertIn('encryption_key_id', volumes.c)
self.assertIsInstance(volumes.c.encryption_key_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
snapshots = db_utils.get_table(engine, 'snapshots')
self.assertIn('encryption_key_id', snapshots.c)
self.assertIsInstance(snapshots.c.encryption_key_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIn('volume_type_id', snapshots.c)
self.assertIsInstance(snapshots.c.volume_type_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
# encryption types table
encryption = db_utils.get_table(engine, 'encryption')
self.assertIsInstance(encryption.c.volume_type_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(encryption.c.cipher.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(encryption.c.key_size.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
self.assertIsInstance(encryption.c.provider.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_018(self, engine, data):
"""Test that added qos_specs table works correctly."""
self.assertIsInstance(qos_specs.c.deleted.type,
self.BOOL_TYPE)
self.assertIsInstance(qos_specs.c.id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(qos_specs.c.specs_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(qos_specs.c.key.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(qos_specs.c.value.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_019(self, engine, data):
"""Test that adding migration_status column works correctly."""
volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.migration_status.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_020(self, engine, data):
"""Test adding volume_admin_metadata table works correctly."""
self.assertIsInstance(volume_admin_metadata.c.deleted.type,
self.BOOL_TYPE)
self.assertIsInstance(volume_admin_metadata.c.id.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
self.assertIsInstance(volume_admin_metadata.c.volume_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(volume_admin_metadata.c.key.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(volume_admin_metadata.c.value.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _verify_quota_defaults(self, engine):
quota_class_metadata = db_utils.get_table(engine, 'quota_classes')
"""Test that adding disabled_reason column works correctly."""
services = db_utils.get_table(engine, 'services')
self.assertIsInstance(services.c.disabled_reason.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_023(self, engine, data):
"""Test that adding reservations index works correctly."""
"""Test adding replication columns to volume table."""
volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.replication_status.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(volumes.c.replication_extended_status.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(volumes.c.replication_driver_data.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_025(self, engine, data):
"""Test adding table and columns for consistencygroups."""
metadata = sqlalchemy.MetaData()
volumes = self.get_table_ref(engine, 'volumes', metadata)
self.assertIsInstance(volumes.c.consistencygroup_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
# Test cgsnapshot_id is in Table snapshots
snapshots = self.get_table_ref(engine, 'snapshots', metadata)
self.assertIsInstance(snapshots.c.cgsnapshot_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
# Test Table consistencygroups exists
self.assertTrue(engine.dialect.has_table(engine.connect(),
self.assertIsInstance(consistencygroups.c.deleted.type,
self.BOOL_TYPE)
self.assertIsInstance(consistencygroups.c.id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.user_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.project_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.host.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.availability_zone.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.name.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.description.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.volume_type_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.status.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
# Test Table cgsnapshots exists
self.assertTrue(engine.dialect.has_table(engine.connect(),
self.assertIsInstance(cgsnapshots.c.deleted.type,
self.BOOL_TYPE)
self.assertIsInstance(cgsnapshots.c.id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.user_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.project_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.consistencygroup_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.name.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.description.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.status.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
# Verify foreign keys are created
fkey, = volumes.c.consistencygroup_id.foreign_keys
self.assertIsInstance(volume_type_projects.c.deleted.type,
self.BOOL_TYPE)
self.assertIsInstance(volume_type_projects.c.id.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
self.assertIsInstance(volume_type_projects.c.volume_type_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(volume_type_projects.c.project_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
volume_types = db_utils.get_table(engine, 'volume_types')
self.assertIsInstance(volume_types.c.is_public.type,
"""Test adding encryption_id column to encryption table."""
encryptions = db_utils.get_table(engine, 'encryption')
self.assertIsInstance(encryptions.c.encryption_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_034(self, engine, data):
"""Test adding description columns to volume_types table."""
volume_types = db_utils.get_table(engine, 'volume_types')
self.assertIsInstance(volume_types.c.description.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_035(self, engine, data):
volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.provider_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_036(self, engine, data):
snapshots = db_utils.get_table(engine, 'snapshots')
self.assertIsInstance(snapshots.c.provider_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_037(self, engine, data):
consistencygroups = db_utils.get_table(engine, 'consistencygroups')
self.assertIsInstance(consistencygroups.c.cgsnapshot_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_038(self, engine, data):
"""Test adding and removing driver_initiator_data table."""
self.assertIsInstance(private_data.c.updated_at.type,
self.TIME_TYPE)
self.assertIsInstance(private_data.c.id.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
self.assertIsInstance(private_data.c.initiator.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.namespace.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.key.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.value.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_039(self, engine, data):
backups = db_utils.get_table(engine, 'backups')
self.assertIsInstance(backups.c.parent_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_40(self, engine, data):
volumes = db_utils.get_table(engine, 'volumes')
attachments = db_utils.get_table(engine, 'volume_attachment')
self.assertIsInstance(attachments.c.attach_mode.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(attachments.c.instance_uuid.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(attachments.c.attached_host.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(attachments.c.mountpoint.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(attachments.c.attach_status.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_041(self, engine, data):
"""Test that adding modified_at column works correctly."""
def _check_048(self, engine, data):
quotas = db_utils.get_table(engine, 'quotas')
self.assertIsInstance(quotas.c.allocated.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
def _check_049(self, engine, data):
backups = db_utils.get_table(engine, 'backups')
self.assertIsInstance(backups.c.temp_volume_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.temp_snapshot_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_050(self, engine, data):
volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.previous_status.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_051(self, engine, data):
consistencygroups = db_utils.get_table(engine, 'consistencygroups')
self.assertIsInstance(consistencygroups.c.source_cgid.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_052(self, engine, data):
snapshots = db_utils.get_table(engine, 'snapshots')
self.assertIsInstance(snapshots.c.provider_auth.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_053(self, engine, data):
services = db_utils.get_table(engine, 'services')
self.assertIsInstance(services.c.rpc_current_version.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(services.c.rpc_available_version.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(services.c.object_current_version.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(services.c.object_available_version.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
def _check_054(self, engine, data):
backups = db_utils.get_table(engine, 'backups')
self.assertIsInstance(backups.c.num_dependent_backups.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
def _check_055(self, engine, data):
"""Test adding image_volume_cache_entries table."""
)
self.assertIsInstance(private_data.c.id.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
self.assertIsInstance(private_data.c.host.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.image_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.image_updated_at.type,
self.TIME_TYPE)
self.assertIsInstance(private_data.c.volume_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.size.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
self.assertIsInstance(private_data.c.last_used.type,
self.TIME_TYPE)
def _check_061(self, engine, data):
backups = db_utils.get_table(engine, 'backups')
self.assertIsInstance(backups.c.snapshot_id.type,
- sqlalchemy.types.VARCHAR)
+ self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.data_timestamp.type,
self.TIME_TYPE)
volume_type_projects = db_utils.get_table(engine,
'volume_type_projects')
self.assertIsInstance(volume_type_projects.c.id.type,
- sqlalchemy.types.INTEGER)
+ self.INTEGER_TYPE)
def test_walk_versions(self):
self.walk_versions(False, False)