for version in xrange(migration.INIT_VERSION + 2,
TestMigrations.REPOSITORY.latest + 1):
# upgrade -> downgrade -> upgrade
- self._migrate_up(engine, version)
+ self._migrate_up(engine, version, with_data=True)
if snake_walk:
self._migrate_down(engine, version - 1)
self._migrate_up(engine, version)
migration_api.db_version(engine,
TestMigrations.REPOSITORY))
- def _migrate_up(self, engine, version):
- migration_api.upgrade(engine,
- TestMigrations.REPOSITORY,
- version)
- self.assertEqual(version,
- migration_api.db_version(engine,
- TestMigrations.REPOSITORY))
+ def _migrate_up(self, engine, version, with_data=False):
+ """migrate up to a new version of the db.
+
+ We allow for data insertion and post checks at every
+ migration version with special _prerun_### and
+ _check_### functions in the main test.
+ """
+ # NOTE(sdague): try block is here because it's impossible to debug
+ # where a failed data migration happens otherwise
+ try:
+ if with_data:
+ data = None
+ prerun = getattr(self, "_prerun_%d" % version, None)
+ if prerun:
+ data = prerun(engine)
+
+ migration_api.upgrade(engine,
+ TestMigrations.REPOSITORY,
+ version)
+ self.assertEqual(
+ version,
+ migration_api.db_version(engine,
+ TestMigrations.REPOSITORY))
+
+ if with_data:
+ check = getattr(self, "_check_%d" % version, None)
+ if check:
+ check(engine, data)
+ except Exception:
+ LOG.error("Failed to migrate to version %s on engine %s" %
+ (version, engine))
+ raise
def test_migration_004(self):
"""Test that volume_type_id migration works correctly."""