self.assertNotIn('replication_status', volumes.c)
self.assertNotIn('replication_extended_status', volumes.c)
self.assertNotIn('replication_driver_data', volumes.c)
+
+ def test_migration_025(self):
+ """Test adding table and columns for consistencygroups."""
+ for (key, engine) in self.engines.items():
+ migration_api.version_control(engine,
+ TestMigrations.REPOSITORY,
+ migration.db_initial_version())
+ migration_api.upgrade(engine, TestMigrations.REPOSITORY, 24)
+ metadata = sqlalchemy.schema.MetaData()
+ metadata.bind = engine
+
+ # Upgrade
+ migration_api.upgrade(engine, TestMigrations.REPOSITORY, 25)
+
+ # Test consistencygroup_id is in Table volumes
+ volumes = sqlalchemy.Table('volumes',
+ metadata,
+ autoload=True)
+ self.assertIsInstance(volumes.c.consistencygroup_id.type,
+ sqlalchemy.types.VARCHAR)
+
+ # Test cgsnapshot_id is in Table snapshots
+ snapshots = sqlalchemy.Table('snapshots',
+ metadata,
+ autoload=True)
+ self.assertIsInstance(snapshots.c.cgsnapshot_id.type,
+ sqlalchemy.types.VARCHAR)
+
+ # Test Table consistencygroups exists
+ self.assertTrue(engine.dialect.has_table(engine.connect(),
+ "consistencygroups"))
+ consistencygroups = sqlalchemy.Table('consistencygroups',
+ metadata,
+ autoload=True)
+
+ self.assertIsInstance(consistencygroups.c.created_at.type,
+ self.time_type[engine.name])
+ self.assertIsInstance(consistencygroups.c.updated_at.type,
+ self.time_type[engine.name])
+ self.assertIsInstance(consistencygroups.c.deleted_at.type,
+ self.time_type[engine.name])
+ self.assertIsInstance(consistencygroups.c.deleted.type,
+ self.bool_type[engine.name])
+ self.assertIsInstance(consistencygroups.c.id.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(consistencygroups.c.user_id.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(consistencygroups.c.project_id.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(consistencygroups.c.host.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(consistencygroups.c.availability_zone.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(consistencygroups.c.name.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(consistencygroups.c.description.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(consistencygroups.c.volume_type_id.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(consistencygroups.c.status.type,
+ sqlalchemy.types.VARCHAR)
+
+ # Test Table cgsnapshots exists
+ self.assertTrue(engine.dialect.has_table(engine.connect(),
+ "cgsnapshots"))
+ cgsnapshots = sqlalchemy.Table('cgsnapshots',
+ metadata,
+ autoload=True)
+
+ self.assertIsInstance(cgsnapshots.c.created_at.type,
+ self.time_type[engine.name])
+ self.assertIsInstance(cgsnapshots.c.updated_at.type,
+ self.time_type[engine.name])
+ self.assertIsInstance(cgsnapshots.c.deleted_at.type,
+ self.time_type[engine.name])
+ self.assertIsInstance(cgsnapshots.c.deleted.type,
+ self.bool_type[engine.name])
+ self.assertIsInstance(cgsnapshots.c.id.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(cgsnapshots.c.user_id.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(cgsnapshots.c.project_id.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(cgsnapshots.c.consistencygroup_id.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(cgsnapshots.c.name.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(cgsnapshots.c.description.type,
+ sqlalchemy.types.VARCHAR)
+ self.assertIsInstance(cgsnapshots.c.status.type,
+ sqlalchemy.types.VARCHAR)
+
+ # Downgrade
+ migration_api.downgrade(engine, TestMigrations.REPOSITORY, 24)
+ metadata = sqlalchemy.schema.MetaData()
+ metadata.bind = engine
+
+ # Test consistencygroup_id is not in Table volumes
+ volumes = sqlalchemy.Table('volumes',
+ metadata,
+ autoload=True)
+ self.assertNotIn('consistencygroup_id', volumes.c)
+
+ # Test cgsnapshot_id is not in Table snapshots
+ snapshots = sqlalchemy.Table('snapshots',
+ metadata,
+ autoload=True)
+ self.assertNotIn('cgsnapshot_id', snapshots.c)
+
+ # Test Table cgsnapshots doesn't exist any more
+ self.assertFalse(engine.dialect.has_table(engine.connect(),
+ "cgsnapshots"))
+
+ # Test Table consistencygroups doesn't exist any more
+ self.assertFalse(engine.dialect.has_table(engine.connect(),
+ "consistencygroups"))
+
+ def test_migration_026(self):
+ """Test adding default data for consistencygroups quota class."""
+ for (key, engine) in self.engines.items():
+ migration_api.version_control(engine,
+ TestMigrations.REPOSITORY,
+ migration.db_initial_version())
+ migration_api.upgrade(engine, TestMigrations.REPOSITORY, 25)
+ metadata = sqlalchemy.schema.MetaData()
+ metadata.bind = engine
+
+ quota_class_metadata = sqlalchemy.Table('quota_classes',
+ metadata,
+ autoload=True)
+
+ num_defaults = quota_class_metadata.count().\
+ where(quota_class_metadata.c.class_name == 'default').\
+ execute().scalar()
+
+ self.assertEqual(3, num_defaults)
+
+ migration_api.upgrade(engine, TestMigrations.REPOSITORY, 26)
+
+ num_defaults = quota_class_metadata.count().\
+ where(quota_class_metadata.c.class_name == 'default').\
+ execute().scalar()
+
+ self.assertEqual(4, num_defaults)
+
+ migration_api.downgrade(engine, TestMigrations.REPOSITORY, 25)
+
+ # Defaults should not be deleted during downgrade
+ num_defaults = quota_class_metadata.count().\
+ where(quota_class_metadata.c.class_name == 'default').\
+ execute().scalar()
+
+ self.assertEqual(4, num_defaults)