review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
DB migration tests
author Ivan Kolodyazhny <e0ne@e0ne.info>
Wed, 29 Oct 2014 13:52:54 +0000 (15:52 +0200)
committer Ivan Kolodyazhny <ikolodyazhny@mirantis.com>
Mon, 15 Dec 2014 10:56:56 +0000 (12:56 +0200)
Refactored the migration tests to use OpportunisticTestCase, and removed
unused code and the ``test_migrations.conf`` file.

The main feature of this approach is that a new database with a random name
is created for each migration test. This avoids race conditions between
migration tests and reduces interference between them. After this change, the
``openstack_citest`` database is used only for the initial connection to the
database server.

The ``test_migrations.conf`` file is not required anymore: a dedicated test
database is created for each migration test, so database credentials no longer
need to be kept in a config file.
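
For reference, a minimal sketch of the oslo.db pattern this change adopts
(module and class names follow the ``cinder/tests/test_migrations.py`` diff
below; the per-backend test classes here are illustrative, not part of the
commit). A backend-independent mixin carries the migration checks, and the
opportunistic test-case classes provision a throwaway database with a random
name for each run:

    from oslo.db.sqlalchemy import test_base
    from oslo.db.sqlalchemy import test_migrations


    class MigrationsMixin(test_migrations.WalkVersionsMixin):
        # Backend-independent walk-versions checks; the real mixin lives in
        # cinder/tests/test_migrations.py (see the diff below).
        pass


    class TestMigrationsSQLite(MigrationsMixin, test_base.DbTestCase):
        pass


    class TestMigrationsMySQL(MigrationsMixin,
                              test_base.MySQLOpportunisticTestCase):
        pass


    class TestMigrationsPostgreSQL(MigrationsMixin,
                                   test_base.PostgreSQLOpportunisticTestCase):
        pass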

Implements blueprint: db-migration-tests
Related-bug: #1266595
Change-Id: I4febd485ff53936b636947c86773a23724e24c65

cinder/cmd/manage.py
cinder/db/migration.py
cinder/db/sqlalchemy/migrate_repo/versions/001_cinder_init.py
cinder/db/sqlalchemy/migrate_repo/versions/011_add_bootable_column.py
cinder/db/sqlalchemy/migration.py [deleted file]
cinder/test.py
cinder/tests/test_cmd.py
cinder/tests/test_migrations.conf [deleted file]
cinder/tests/test_migrations.py
setup.cfg
test-requirements.txt

index dcc1ec57b568a97ba775cda167ca1b75388b1aa5..05b491746a260a5a8a0f31a5552883c612fd9bf5 100755 (executable)
@@ -62,6 +62,7 @@ import warnings
 warnings.simplefilter('once', DeprecationWarning)
 
 from oslo.config import cfg
+from oslo.db.sqlalchemy import migration
 from oslo import messaging
 
 from cinder import i18n
@@ -71,7 +72,8 @@ i18n.enable_lazy()
 from cinder.common import config  # noqa
 from cinder import context
 from cinder import db
-from cinder.db import migration
+from cinder.db import migration as db_migration
+from cinder.db.sqlalchemy import api as db_api
 from cinder.i18n import _
 from cinder.openstack.common import log as logging
 from cinder.openstack.common import uuidutils
@@ -218,11 +220,13 @@ class DbCommands(object):
           help='Database version')
     def sync(self, version=None):
         """Sync the database up to the most recent version."""
-        return migration.db_sync(version)
+        return db_migration.db_sync(version)
 
     def version(self):
         """Print the current database version."""
-        print(migration.db_version())
+        print(migration.db_version(db_api.get_engine(),
+                                   db_migration.MIGRATE_REPO_PATH,
+                                   db_migration.INIT_VERSION))
 
 
 class VersionCommands(object):
index 041aeb7eb97a46c22fa58d671b3fd8d7e6efdfde..29f5b7ec9c82c0869b4944985191d49e518a9830 100644 (file)
 
 """Database setup and migration commands."""
 
-from cinder import utils
+import os
+import threading
 
+from oslo.config import cfg
+from oslo import db
+from stevedore import driver
 
-IMPL = utils.LazyPluggable('db_backend',
-                           sqlalchemy='cinder.db.sqlalchemy.migration')
+from cinder.db.sqlalchemy import api as db_api
 
+INIT_VERSION = 000
 
-def db_sync(version=None):
-    """Migrate the database to `version` or the most recent version."""
-    return IMPL.db_sync(version=version)
+_IMPL = None
+_LOCK = threading.Lock()
+
+db.options.set_defaults(cfg.CONF)
+
+MIGRATE_REPO_PATH = os.path.join(
+    os.path.abspath(os.path.dirname(__file__)),
+    'sqlalchemy',
+    'migrate_repo',
+)
 
 
-def db_version():
-    """Display the current database version."""
-    return IMPL.db_version()
+def get_backend():
+    global _IMPL
+    if _IMPL is None:
+        with _LOCK:
+            if _IMPL is None:
+                _IMPL = driver.DriverManager(
+                    "cinder.database.migration_backend",
+                    cfg.CONF.database.backend).driver
+    return _IMPL
 
 
-def db_initial_version():
-    """The starting version for the database."""
-    return IMPL.db_initial_version()
+def db_sync(version=None, init_version=INIT_VERSION, engine=None):
+    """Migrate the database to `version` or the most recent version."""
+
+    if engine is None:
+        engine = db_api.get_engine()
+    return get_backend().db_sync(engine=engine,
+                                 abs_path=MIGRATE_REPO_PATH,
+                                 version=version,
+                                 init_version=init_version)
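
A hedged usage sketch of the refactored API above (the entry-point name
``cinder.database.migration_backend`` comes from the diff; the ``setup.cfg``
hunk that registers the backend is listed among the changed files but not
shown here):

    from cinder.db import migration as db_migration

    # Upgrade the default engine to the latest schema version, as
    # "cinder-manage db sync" does after this change:
    db_migration.db_sync()

    # Or migrate to an explicit version:
    db_migration.db_sync(version=11)
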
index 5cbcbd5226d56da3bd82fb67d6bf94999880e8e2..5df0424b4ddcec638350bdf313d642620d4e62b0 100644 (file)
@@ -23,10 +23,7 @@ from cinder.openstack.common import log as logging
 LOG = logging.getLogger(__name__)
 
 
-def upgrade(migrate_engine):
-    meta = MetaData()
-    meta.bind = migrate_engine
-
+def define_tables(meta):
     migrations = Table(
         'migrations', meta,
         Column('created_at', DateTime),
@@ -217,21 +214,27 @@ def upgrade(migrate_engine):
                nullable=True),
         mysql_engine='InnoDB'
     )
+    return [sm_flavors,
+            sm_backend_config,
+            snapshots,
+            volume_types,
+            volumes,
+            iscsi_targets,
+            migrations,
+            quotas,
+            services,
+            sm_volume,
+            volume_metadata,
+            volume_type_extra_specs]
+
+
+def upgrade(migrate_engine):
+    meta = MetaData()
+    meta.bind = migrate_engine
 
     # create all tables
     # Take care on create order for those with FK dependencies
-    tables = [sm_flavors,
-              sm_backend_config,
-              snapshots,
-              volume_types,
-              volumes,
-              iscsi_targets,
-              migrations,
-              quotas,
-              services,
-              sm_volume,
-              volume_metadata,
-              volume_type_extra_specs]
+    tables = define_tables(meta)
 
     for table in tables:
         try:
@@ -268,4 +271,10 @@ def upgrade(migrate_engine):
 
 
 def downgrade(migrate_engine):
-    LOG.exception(_('Downgrade from initial Cinder install is unsupported.'))
+    meta = MetaData()
+    meta.bind = migrate_engine
+    tables = define_tables(meta)
+    tables.reverse()
+    for table in tables:
+        LOG.info("dropping table %(table)s" % {'table': table})
+        table.drop()
index 112764e5812963635fa031153f4bc73ce4952513..b6adb3bf5424ab718ec75710f2b6b9a462bde0e3 100644 (file)
@@ -40,5 +40,4 @@ def downgrade(migrate_engine):
 
     volumes = Table('volumes', meta, autoload=True)
     bootable = volumes.columns.bootable
-    #bootable = Column('bootable', Boolean)
     volumes.drop_column(bootable)
diff --git a/cinder/db/sqlalchemy/migration.py b/cinder/db/sqlalchemy/migration.py
deleted file mode 100644 (file)
index f6aa7c5..0000000
+++ /dev/null
@@ -1,86 +0,0 @@
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-
-import os
-
-from migrate import exceptions as versioning_exceptions
-from migrate.versioning import api as versioning_api
-from migrate.versioning.repository import Repository
-import sqlalchemy
-
-from cinder.db.sqlalchemy.api import get_engine
-from cinder import exception
-from cinder.i18n import _
-
-INIT_VERSION = 000
-_REPOSITORY = None
-
-
-def db_sync(version=None):
-    if version is not None:
-        try:
-            version = int(version)
-        except ValueError:
-            raise exception.Error(_("version should be an integer"))
-
-    current_version = db_version()
-    repository = _find_migrate_repo()
-    if version is None or version > current_version:
-        return versioning_api.upgrade(get_engine(), repository, version)
-    else:
-        return versioning_api.downgrade(get_engine(), repository,
-                                        version)
-
-
-def db_version():
-    repository = _find_migrate_repo()
-    try:
-        return versioning_api.db_version(get_engine(), repository)
-    except versioning_exceptions.DatabaseNotControlledError:
-        # If we aren't version controlled we may already have the database
-        # in the state from before we started version control, check for that
-        # and set up version_control appropriately
-        meta = sqlalchemy.MetaData()
-        engine = get_engine()
-        meta.reflect(bind=engine)
-        tables = meta.tables
-        if len(tables) == 0:
-            db_version_control(INIT_VERSION)
-            return versioning_api.db_version(get_engine(), repository)
-        else:
-            raise exception.Error(_("Upgrade DB using Essex release first."))
-
-
-def db_initial_version():
-    return INIT_VERSION
-
-
-def db_version_control(version=None):
-    repository = _find_migrate_repo()
-    versioning_api.version_control(get_engine(), repository, version)
-    return version
-
-
-def _find_migrate_repo():
-    """Get the path for the migrate repository."""
-    global _REPOSITORY
-    path = os.path.join(os.path.abspath(os.path.dirname(__file__)),
-                        'migrate_repo')
-    assert os.path.exists(path)
-    if _REPOSITORY is None:
-        _REPOSITORY = Repository(path)
-    return _REPOSITORY
index 121a80ca3b13dbcf08b66a5ac660d98c9e2197da..38896cbf9683e7bcd796267ff78444c6380d6890 100644 (file)
@@ -76,13 +76,6 @@ class Database(fixtures.Fixture):
         self.engine = db_api.get_engine()
         self.engine.dispose()
         conn = self.engine.connect()
-        if sql_connection == "sqlite://":
-            if db_migrate.db_version() > db_migrate.db_initial_version():
-                return
-        else:
-            testdb = os.path.join(CONF.state_path, sqlite_db)
-            if os.path.exists(testdb):
-                return
         db_migrate.db_sync()
 #        self.post_migrations()
         if sql_connection == "sqlite://":
@@ -91,6 +84,7 @@ class Database(fixtures.Fixture):
             self.engine.dispose()
         else:
             cleandb = os.path.join(CONF.state_path, sqlite_clean_db)
+            testdb = os.path.join(CONF.state_path, sqlite_db)
             shutil.copyfile(testdb, cleandb)
 
     def setUp(self):
index 1bf9ae34c60880c73e075e4685b1f0422ffb53dc..63ab732a783fc4a3cc4163bb03de3ac26797f399 100755 (executable)
@@ -337,11 +337,11 @@ class TestCinderManageCmd(test.TestCase):
         db_cmds.sync(version=version)
         db_sync.assert_called_once_with(version)
 
-    @mock.patch('cinder.db.migration.db_version')
+    @mock.patch('oslo.db.sqlalchemy.migration.db_version')
     def test_db_commands_version(self, db_version):
         db_cmds = cinder_manage.DbCommands()
         db_cmds.version()
-        db_version.assert_called_once_with()
+        self.assertEqual(1, db_version.call_count)
 
     @mock.patch('cinder.version.version_string')
     def test_versions_commands_list(self, version_string):
diff --git a/cinder/tests/test_migrations.conf b/cinder/tests/test_migrations.conf
deleted file mode 100644 (file)
index 774f149..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-[DEFAULT]
-# Set up any number of migration data stores you want, one
-# The "name" used in the test is the config variable key.
-#sqlite=sqlite:///test_migrations.db
-sqlite=sqlite://
-#mysql=mysql://root:@localhost/test_migrations
-#postgresql=postgresql://user:pass@localhost/test_migrations
-[walk_style]
-snake_walk=yes
index 11890439f99655db9de7c48b298085c83bdc8242..0880915bd38e59cd2591f127221e187633c0fec5 100644 (file)
@@ -1,6 +1,3 @@
-# Copyright 2010-2011 OpenStack Foundation
-# All Rights Reserved.
-#
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
 #    a copy of the License at
@@ -22,366 +19,50 @@ properly both upgrading and downgrading, and that no data loss occurs
 if possible.
 """
 
-import ConfigParser
 import os
-import subprocess
 import uuid
 
+from migrate.versioning import api as migration_api
 from migrate.versioning import repository
-import six.moves.urllib.parse as urlparse
+from oslo.db.sqlalchemy import test_base
+from oslo.db.sqlalchemy import test_migrations
+from oslo.db.sqlalchemy import utils as db_utils
 import sqlalchemy
-import testtools
 
-import cinder.db.migration as migration
+from cinder.db import migration
 import cinder.db.sqlalchemy.migrate_repo
-from cinder.db.sqlalchemy.migration import versioning_api as migration_api
-from cinder import test
-
-
-def _get_connect_string(backend,
-                        user="openstack_citest",
-                        passwd="openstack_citest",
-                        database="openstack_citest"):
-    """Return connect string.
-
-    Try to get a connection with a very specific set of values, if we get
-    these then we'll run the tests, otherwise they are skipped.
-    """
-    if backend == "postgres":
-        backend = "postgresql+psycopg2"
-
-    return ("%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" %
-            {'backend': backend, 'user': user, 'passwd': passwd,
-             'database': database})
-
-
-def _is_mysql_avail(**kwargs):
-    return _is_backend_avail('mysql', **kwargs)
-
-
-def _is_backend_avail(backend,
-                      user="openstack_citest",
-                      passwd="openstack_citest",
-                      database="openstack_citest"):
-    try:
-        if backend == "mysql":
-            connect_uri = _get_connect_string("mysql", user=user,
-                                              passwd=passwd, database=database)
-        elif backend == "postgres":
-            connect_uri = _get_connect_string("postgres", user=user,
-                                              passwd=passwd, database=database)
-        engine = sqlalchemy.create_engine(connect_uri)
-        connection = engine.connect()
-    except Exception:
-        # intentionally catch all to handle exceptions even if we don't
-        # have any backend code loaded.
-        return False
-    else:
-        connection.close()
-        engine.dispose()
-        return True
-
-
-def _have_mysql():
-    present = os.environ.get('NOVA_TEST_MYSQL_PRESENT')
-    if present is None:
-        return _is_backend_avail('mysql')
-    return present.lower() in ('', 'true')
-
-
-def get_table(engine, name):
-    """Returns an sqlalchemy table dynamically from db.
-
-    Needed because the models don't work for us in migrations
-    as models will be far out of sync with the current data.
-    """
-    metadata = sqlalchemy.schema.MetaData()
-    metadata.bind = engine
-    return sqlalchemy.Table(name, metadata, autoload=True)
-
-
-class TestMigrations(test.TestCase):
-    """Test sqlalchemy-migrate migrations."""
-
-    DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
-                                       'test_migrations.conf')
-    # Test machines can set the CINDER_TEST_MIGRATIONS_CONF variable
-    # to override the location of the config file for migration testing
-    CONFIG_FILE_PATH = os.environ.get('CINDER_TEST_MIGRATIONS_CONF',
-                                      DEFAULT_CONFIG_FILE)
-    MIGRATE_FILE = cinder.db.sqlalchemy.migrate_repo.__file__
-    REPOSITORY = repository.Repository(
-        os.path.abspath(os.path.dirname(MIGRATE_FILE)))
-
-    def setUp(self):
-        super(TestMigrations, self).setUp()
-
-        self.snake_walk = False
-        self.test_databases = {}
-
-        # Load test databases from the config file. Only do this
-        # once. No need to re-run this on each test...
-        if not self.test_databases:
-            if os.path.exists(TestMigrations.CONFIG_FILE_PATH):
-                cp = ConfigParser.RawConfigParser()
-                try:
-                    cp.read(TestMigrations.CONFIG_FILE_PATH)
-                    defaults = cp.defaults()
-                    for key, value in defaults.items():
-                        self.test_databases[key] = value
-                    self.snake_walk = cp.getboolean('walk_style', 'snake_walk')
-                except ConfigParser.ParsingError as e:
-                    self.fail("Failed to read test_migrations.conf config "
-                              "file. Got error: %s" % e)
-            else:
-                self.fail("Failed to find test_migrations.conf config "
-                          "file.")
-
-        self.engines = {}
-        for key, value in self.test_databases.items():
-            self.engines[key] = sqlalchemy.create_engine(value)
-
-        # Set-up a dict of types for those column types that
-        # are not uniform for all databases.
-        self.bool_type = {}
-        self.time_type = {}
-        for (key, engine) in self.engines.items():
-            self.bool_type[engine.name] = sqlalchemy.types.BOOLEAN
-            self.time_type[engine.name] = sqlalchemy.types.DATETIME
-            if engine.name == 'mysql':
-                self.bool_type[engine.name] = sqlalchemy.dialects.mysql.TINYINT
-            if engine.name == 'postgresql':
-                self.time_type[engine.name] = sqlalchemy.types.TIMESTAMP
-
-        # We start each test case with a completely blank slate.
-        self._reset_databases()
-
-        # We destroy the test data store between each test case,
-        # and recreate it, which ensures that we have no side-effects
-        # from the tests
-        self.addCleanup(self._reset_databases)
-
-    def _reset_databases(self):
-        def execute_cmd(cmd=None):
-            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
-                                    stderr=subprocess.STDOUT, shell=True)
-            proc.communicate()[0]
-            self.assertEqual(0, proc.returncode)
-
-        for key, engine in self.engines.items():
-            conn_string = self.test_databases[key]
-            conn_pieces = urlparse.urlparse(conn_string)
-            engine.dispose()
-            if conn_string.startswith('sqlite'):
-                # We can just delete the SQLite database, which is
-                # the easiest and cleanest solution
-                db_path = conn_pieces.path.strip('/')
-                if os.path.exists(db_path):
-                    os.unlink(db_path)
-                # No need to recreate the SQLite DB. SQLite will
-                # create it for us if it's not there...
-            elif conn_string.startswith('mysql'):
-                # We can execute the MySQL client to destroy and re-create
-                # the MYSQL database, which is easier and less error-prone
-                # than using SQLAlchemy to do this via MetaData...trust me.
-                database = conn_pieces.path.strip('/')
-                loc_pieces = conn_pieces.netloc.split('@')
-                host = loc_pieces[1]
-                auth_pieces = loc_pieces[0].split(':')
-                user = auth_pieces[0]
-                password = ""
-                if len(auth_pieces) > 1:
-                    if auth_pieces[1].strip():
-                        password = "-p\"%s\"" % auth_pieces[1]
-                sql = ("drop database if exists %(database)s; create database "
-                       "%(database)s;") % {'database': database}
-                cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s "
-                       "-e \"%(sql)s\"") % {'user': user, 'password': password,
-                                            'host': host, 'sql': sql}
-                execute_cmd(cmd)
-            elif conn_string.startswith('postgresql'):
-                database = conn_pieces.path.strip('/')
-                loc_pieces = conn_pieces.netloc.split('@')
-                host = loc_pieces[1]
-
-                auth_pieces = loc_pieces[0].split(':')
-                user = auth_pieces[0]
-                password = ""
-                if len(auth_pieces) > 1:
-                    password = auth_pieces[1].strip()
-                # note(krtaylor): File creation problems with tests in
-                # venv using .pgpass authentication, changed to
-                # PGPASSWORD environment variable which is no longer
-                # planned to be deprecated
-                os.environ['PGPASSWORD'] = password
-                os.environ['PGUSER'] = user
-                # note(boris-42): We must create and drop database, we can't
-                # drop database which we have connected to, so for such
-                # operations there is a special database template1.
-                sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
-                          " '%(sql)s' -d template1")
-                sql = ("drop database if exists %(database)s;") % {'database':
-                                                                   database}
-                droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
-                execute_cmd(droptable)
-                sql = ("create database %(database)s;") % {'database':
-                                                           database}
-                createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
-                execute_cmd(createtable)
-                os.unsetenv('PGPASSWORD')
-                os.unsetenv('PGUSER')
-
-    def test_walk_versions(self):
-        """Test walk versions.
-
-        Walks all version scripts for each tested database, ensuring
-        that there are no errors in the version scripts for each engine
-        """
-        for _key, engine in self.engines.items():
-            self._walk_versions(engine, self.snake_walk)
 
-    def test_mysql_connect_fail(self):
-        """Test for mysql connection failure.
 
-        Test that we can trigger a mysql connection failure and we fail
-        gracefully to ensure we don't break people without mysql
-        """
-        if _is_mysql_avail(user="openstack_cifail"):
-            self.fail("Shouldn't have connected")
-
-    @testtools.skipUnless(_have_mysql(), "mysql not available")
-    def test_mysql_innodb(self):
-        """Test that table creation on mysql only builds InnoDB tables."""
-        # add this to the global lists to make reset work with it, it's removed
-        # automatically in tearDown so no need to clean it up here.
-        connect_string = _get_connect_string('mysql')
-        engine = sqlalchemy.create_engine(connect_string)
-        self.engines["mysqlcitest"] = engine
-        self.test_databases["mysqlcitest"] = connect_string
+class MigrationsMixin(test_migrations.WalkVersionsMixin):
+    """Test sqlalchemy-migrate migrations."""
 
-        # build a fully populated mysql database with all the tables
-        self._reset_databases()
-        self._walk_versions(engine, False, False)
+    BOOL_TYPE = sqlalchemy.types.BOOLEAN
+    TIME_TYPE = sqlalchemy.types.DATETIME
 
-        uri = _get_connect_string('mysql', database="information_schema")
-        connection = sqlalchemy.create_engine(uri).connect()
+    @property
+    def INIT_VERSION(self):
+        return migration.INIT_VERSION
 
-        # sanity check
-        total = connection.execute("SELECT count(*) "
-                                   "from information_schema.TABLES "
-                                   "where TABLE_SCHEMA='openstack_citest'")
-        self.assertGreater(total.scalar(), 0,
-                           msg="No tables found. Wrong schema?")
+    @property
+    def REPOSITORY(self):
+        migrate_file = cinder.db.sqlalchemy.migrate_repo.__file__
+        return repository.Repository(
+            os.path.abspath(os.path.dirname(migrate_file)))
 
-        noninnodb = connection.execute("SELECT count(*) "
-                                       "from information_schema.TABLES "
-                                       "where TABLE_SCHEMA='openstack_citest' "
-                                       "and ENGINE!='InnoDB' "
-                                       "and TABLE_NAME!='migrate_version'")
-        count = noninnodb.scalar()
-        self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
+    @property
+    def migration_api(self):
+        return migration_api
 
-    def test_postgresql_connect_fail(self):
-        """Test connection failure on PostgrSQL.
+    @property
+    def migrate_engine(self):
+        return self.engine
 
-        Test that we can trigger a postgres connection failure and we fail
-        gracefully to ensure we don't break people without postgres.
-        """
-        if _is_backend_avail('postgres', user="openstack_cifail"):
-            self.fail("Shouldn't have connected")
+    def get_table_ref(self, engine, name, metadata):
+        metadata.bind = engine
+        return sqlalchemy.Table(name, metadata, autoload=True)
 
-    @testtools.skipUnless(_is_backend_avail('postgres'),
-                          "postgresql not available")
-    def test_postgresql_opportunistically(self):
-        # add this to the global lists to make reset work with it, it's removed
-        # automatically in tearDown so no need to clean it up here.
-        connect_string = _get_connect_string("postgres")
-        engine = sqlalchemy.create_engine(connect_string)
-        self.engines["postgresqlcitest"] = engine
-        self.test_databases["postgresqlcitest"] = connect_string
-
-        # build a fully populated postgresql database with all the tables
-        self._reset_databases()
-        self._walk_versions(engine, False, False)
-
-    def _walk_versions(self, engine=None, snake_walk=False, downgrade=True):
-        # Determine latest version script from the repo, then
-        # upgrade from 1 through to the latest, with no data
-        # in the databases. This just checks that the schema itself
-        # upgrades successfully.
-
-        # Place the database under version control
-        migration_api.version_control(engine,
-                                      TestMigrations.REPOSITORY,
-                                      migration.db_initial_version())
-        self.assertEqual(migration.db_initial_version(),
-                         migration_api.db_version(engine,
-                                                  TestMigrations.REPOSITORY))
-
-        migration_api.upgrade(engine, TestMigrations.REPOSITORY,
-                              migration.db_initial_version() + 1)
-
-        for version in xrange(migration.db_initial_version() + 2,
-                              TestMigrations.REPOSITORY.latest + 1):
-            # upgrade -> downgrade -> upgrade
-            self._migrate_up(engine, version, with_data=True)
-            if snake_walk:
-                self._migrate_down(engine, version - 1)
-                self._migrate_up(engine, version)
-
-        if downgrade:
-            # Now walk it back down to 0 from the latest, testing
-            # the downgrade paths.
-            for version in reversed(
-                xrange(migration.db_initial_version() + 1,
-                       TestMigrations.REPOSITORY.latest)):
-                # downgrade -> upgrade -> downgrade
-                self._migrate_down(engine, version)
-                if snake_walk:
-                    self._migrate_up(engine, version + 1)
-                    self._migrate_down(engine, version)
-
-    def _migrate_down(self, engine, version):
-        migration_api.downgrade(engine,
-                                TestMigrations.REPOSITORY,
-                                version)
-        self.assertEqual(version,
-                         migration_api.db_version(engine,
-                                                  TestMigrations.REPOSITORY))
-
-    def _migrate_up(self, engine, version, with_data=False):
-        """Migrate up to a new version of the db.
-
-        We allow for data insertion and post checks at every
-        migration version with special _prerun_### and
-        _check_### functions in the main test.
-        """
-        # NOTE(sdague): try block is here because it's impossible to debug
-        # where a failed data migration happens otherwise
-        try:
-            if with_data:
-                data = None
-                prerun = getattr(self, "_prerun_%3.3d" % version, None)
-                if prerun:
-                    data = prerun(engine)
-
-            migration_api.upgrade(engine,
-                                  TestMigrations.REPOSITORY,
-                                  version)
-            self.assertEqual(
-                version,
-                migration_api.db_version(engine,
-                                         TestMigrations.REPOSITORY))
-
-            if with_data:
-                check = getattr(self, "_check_%3.3d" % version, None)
-                if check:
-                    check(engine, data)
-        except Exception:
-            raise
-
-    # migration 004 - change volume types to UUID
-    def _prerun_004(self, engine):
+    def _pre_upgrade_004(self, engine):
+        """Change volume types to UUID """
         data = {
             'volumes': [{'id': str(uuid.uuid4()), 'host': 'test1',
                          'volume_type_id': 1},
@@ -408,17 +89,17 @@ class TestMigrations(test.TestCase):
                                          },
                                         ]}
 
-        volume_types = get_table(engine, 'volume_types')
+        volume_types = db_utils.get_table(engine, 'volume_types')
         for vtype in data['volume_types']:
             r = volume_types.insert().values(vtype).execute()
             vtype['id'] = r.inserted_primary_key[0]
 
-        volume_type_es = get_table(engine, 'volume_type_extra_specs')
+        volume_type_es = db_utils.get_table(engine, 'volume_type_extra_specs')
         for vtes in data['volume_type_extra_specs']:
             r = volume_type_es.insert().values(vtes).execute()
             vtes['id'] = r.inserted_primary_key[0]
 
-        volumes = get_table(engine, 'volumes')
+        volumes = db_utils.get_table(engine, 'volumes')
         for vol in data['volumes']:
             r = volumes.insert().values(vol).execute()
             vol['id'] = r.inserted_primary_key[0]
@@ -426,7 +107,7 @@ class TestMigrations(test.TestCase):
         return data
 
     def _check_004(self, engine, data):
-        volumes = get_table(engine, 'volumes')
+        volumes = db_utils.get_table(engine, 'volumes')
         v1 = volumes.select(volumes.c.id ==
                             data['volumes'][0]['id']
                             ).execute().first()
@@ -437,7 +118,7 @@ class TestMigrations(test.TestCase):
                             data['volumes'][2]['id']
                             ).execute().first()
 
-        volume_types = get_table(engine, 'volume_types')
+        volume_types = db_utils.get_table(engine, 'volume_types')
         vt1 = volume_types.select(volume_types.c.name ==
                                   data['volume_types'][0]['name']
                                   ).execute().first()
@@ -448,7 +129,7 @@ class TestMigrations(test.TestCase):
                                   data['volume_types'][2]['name']
                                   ).execute().first()
 
-        vtes = get_table(engine, 'volume_type_extra_specs')
+        vtes = db_utils.get_table(engine, 'volume_type_extra_specs')
         vtes1 = vtes.select(vtes.c.key ==
                             data['volume_type_extra_specs'][0]['key']
                             ).execute().first()
@@ -467,884 +148,573 @@ class TestMigrations(test.TestCase):
         self.assertEqual(vtes2['volume_type_id'], vt1['id'])
         self.assertEqual(vtes3['volume_type_id'], vt2['id'])
 
-    def test_migration_005(self):
+    def _check_005(self, engine, data):
         """Test that adding source_volid column works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 4)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 5)
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertIsInstance(volumes.c.source_volid.type,
-                                  sqlalchemy.types.VARCHAR)
-
-    def _metadatas(self, upgrade_to, downgrade_to=None):
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine,
-                                  TestMigrations.REPOSITORY,
-                                  upgrade_to)
-
-            if downgrade_to is not None:
-                migration_api.downgrade(
-                    engine, TestMigrations.REPOSITORY, downgrade_to)
-
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-            yield metadata
-
-    def metadatas_upgraded_to(self, revision):
-        return self._metadatas(revision)
-
-    def metadatas_downgraded_from(self, revision):
-        return self._metadatas(revision, revision - 1)
-
-    def test_upgrade_006_adds_provider_location(self):
-        for metadata in self.metadatas_upgraded_to(6):
-            snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
-            self.assertIsInstance(snapshots.c.provider_location.type,
-                                  sqlalchemy.types.VARCHAR)
-
-    def test_downgrade_006_removes_provider_location(self):
-        for metadata in self.metadatas_downgraded_from(6):
-            snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
-
-            self.assertNotIn('provider_location', snapshots.c)
-
-    def test_upgrade_007_adds_fk(self):
-        for metadata in self.metadatas_upgraded_to(7):
-            snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
-            volumes = sqlalchemy.Table('volumes', metadata, autoload=True)
-
-            fkey, = snapshots.c.volume_id.foreign_keys
-
-            self.assertEqual(volumes.c.id, fkey.column)
-
-    def test_downgrade_007_removes_fk(self):
-        for metadata in self.metadatas_downgraded_from(7):
-            snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
-
-            self.assertEqual(0, len(snapshots.c.volume_id.foreign_keys))
-
-    def test_migration_008(self):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertIsInstance(volumes.c.source_volid.type,
+                              sqlalchemy.types.VARCHAR)
+
+    def _check_006(self, engine, data):
+        snapshots = db_utils.get_table(engine, 'snapshots')
+        self.assertIsInstance(snapshots.c.provider_location.type,
+                              sqlalchemy.types.VARCHAR)
+
+    def _post_downgrade_006(self, engine):
+        snapshots = db_utils.get_table(engine, 'snapshots')
+        self.assertNotIn('provider_location', snapshots.c)
+
+    def _check_007(self, engine, data):
+        snapshots = db_utils.get_table(engine, 'snapshots')
+        fkey, = snapshots.c.volume_id.foreign_keys
+
+        self.assertIsNotNone(fkey)
+
+    def _post_downgrade_007(self, engine):
+        snapshots = db_utils.get_table(engine, 'snapshots')
+
+        self.assertEqual(0, len(snapshots.c.volume_id.foreign_keys))
+
+    def _pre_upgrade_008(self, engine):
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  "backups"))
+
+    def _check_008(self, engine, data):
         """Test that adding and removing the backups table works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 7)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 8)
-
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     "backups"))
-            backups = sqlalchemy.Table('backups',
-                                       metadata,
-                                       autoload=True)
-
-            self.assertIsInstance(backups.c.created_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(backups.c.updated_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(backups.c.deleted_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(backups.c.deleted.type,
-                                  self.bool_type[engine.name])
-            self.assertIsInstance(backups.c.id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.volume_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.user_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.project_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.host.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.availability_zone.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.display_name.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.display_description.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.container.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.status.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.fail_reason.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.service_metadata.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.service.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(backups.c.size.type,
-                                  sqlalchemy.types.INTEGER)
-            self.assertIsInstance(backups.c.object_count.type,
-                                  sqlalchemy.types.INTEGER)
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 7)
-
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      "backups"))
-
-    def test_migration_009(self):
+
+        self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                 "backups"))
+        backups = db_utils.get_table(engine, 'backups')
+
+        self.assertIsInstance(backups.c.created_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(backups.c.updated_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(backups.c.deleted_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(backups.c.deleted.type,
+                              self.BOOL_TYPE)
+        self.assertIsInstance(backups.c.id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.volume_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.user_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.project_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.host.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.availability_zone.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.display_name.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.display_description.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.container.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.status.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.fail_reason.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.service_metadata.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.service.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(backups.c.size.type,
+                              sqlalchemy.types.INTEGER)
+        self.assertIsInstance(backups.c.object_count.type,
+                              sqlalchemy.types.INTEGER)
+
+    def _check_009(self, engine, data):
         """Test adding snapshot_metadata table works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 8)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 9)
-
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     "snapshot_metadata"))
-            snapshot_metadata = sqlalchemy.Table('snapshot_metadata',
-                                                 metadata,
-                                                 autoload=True)
-
-            self.assertIsInstance(snapshot_metadata.c.created_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(snapshot_metadata.c.updated_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(snapshot_metadata.c.deleted_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(snapshot_metadata.c.deleted.type,
-                                  self.bool_type[engine.name])
-            self.assertIsInstance(snapshot_metadata.c.deleted.type,
-                                  self.bool_type[engine.name])
-            self.assertIsInstance(snapshot_metadata.c.id.type,
-                                  sqlalchemy.types.INTEGER)
-            self.assertIsInstance(snapshot_metadata.c.snapshot_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(snapshot_metadata.c.key.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(snapshot_metadata.c.value.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 8)
-
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      "snapshot_metadata"))
-
-    def test_migration_010(self):
+        self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                 "snapshot_metadata"))
+        snapshot_metadata = db_utils.get_table(engine, 'snapshot_metadata')
+
+        self.assertIsInstance(snapshot_metadata.c.created_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(snapshot_metadata.c.updated_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(snapshot_metadata.c.deleted_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(snapshot_metadata.c.deleted.type,
+                              self.BOOL_TYPE)
+        self.assertIsInstance(snapshot_metadata.c.deleted.type,
+                              self.BOOL_TYPE)
+        self.assertIsInstance(snapshot_metadata.c.id.type,
+                              sqlalchemy.types.INTEGER)
+        self.assertIsInstance(snapshot_metadata.c.snapshot_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(snapshot_metadata.c.key.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(snapshot_metadata.c.value.type,
+                              sqlalchemy.types.VARCHAR)
+
+    def _post_downgrade_008(self, engine):
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  "snapshot_metadata"))
+
+    def _check_010(self, engine, data):
         """Test adding transfers table works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 9)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 10)
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     "transfers"))
-            transfers = sqlalchemy.Table('transfers',
-                                         metadata,
-                                         autoload=True)
-
-            self.assertIsInstance(transfers.c.created_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(transfers.c.updated_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(transfers.c.deleted_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(transfers.c.deleted.type,
-                                  self.bool_type[engine.name])
-            self.assertIsInstance(transfers.c.id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(transfers.c.volume_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(transfers.c.display_name.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(transfers.c.salt.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(transfers.c.crypt_hash.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(transfers.c.expires_at.type,
-                                  self.time_type[engine.name])
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 9)
-
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      "transfers"))
-
-    def test_migration_011(self):
+        self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                 "transfers"))
+        transfers = db_utils.get_table(engine, 'transfers')
+
+        self.assertIsInstance(transfers.c.created_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(transfers.c.updated_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(transfers.c.deleted_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(transfers.c.deleted.type,
+                              self.BOOL_TYPE)
+        self.assertIsInstance(transfers.c.id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(transfers.c.volume_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(transfers.c.display_name.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(transfers.c.salt.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(transfers.c.crypt_hash.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(transfers.c.expires_at.type,
+                              self.TIME_TYPE)
+
+    def _post_downgrade_010(self, engine):
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  "transfers"))
+
+    def _check_011(self, engine, data):
         """Test adding transfers table works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 10)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            volumes_v10 = sqlalchemy.Table('volumes',
-                                           metadata,
-                                           autoload=True)
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 11)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     "volumes"))
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-
-            # Make sure we didn't miss any columns in the upgrade
-            for column in volumes_v10.c:
-                self.assertTrue(volumes.c.__contains__(column.name))
-
-            self.assertIsInstance(volumes.c.bootable.type,
-                                  self.bool_type[engine.name])
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 10)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertNotIn('bootable', volumes.c)
-
-            # Make sure we put all the columns back
-            for column in volumes_v10.c:
-                self.assertTrue(volumes.c.__contains__(column.name))
-
-    def test_migration_012(self):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertIn('bootable', volumes.c)
+        self.assertIsInstance(volumes.c.bootable.type,
+                              self.BOOL_TYPE)
+
+    def _post_downgrade_011(self, engine):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertNotIn('bootable', volumes.c)
+
+    def _check_012(self, engine, data):
         """Test that adding attached_host column works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 11)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 12)
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertIsInstance(volumes.c.attached_host.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 11)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertNotIn('attached_host', volumes.c)
-
-    def test_migration_013(self):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertIsInstance(volumes.c.attached_host.type,
+                              sqlalchemy.types.VARCHAR)
+
+    def _post_downgrade_012(self, engine):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertNotIn('attached_host', volumes.c)
+
+    def _check_013(self, engine, data):
         """Test that adding provider_geometry column works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 12)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 13)
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertIsInstance(volumes.c.provider_geometry.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 12)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertNotIn('provider_geometry', volumes.c)
-
-    def test_migration_014(self):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertIsInstance(volumes.c.provider_geometry.type,
+                              sqlalchemy.types.VARCHAR)
+
+    def _post_downgrade_013(self, engine):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertNotIn('provider_geometry', volumes.c)
+
+    def _check_014(self, engine, data):
         """Test that adding _name_id column works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 13)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 14)
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertIsInstance(volumes.c._name_id.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 13)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertNotIn('_name_id', volumes.c)
-
-    def test_migration_015(self):
-        """Test removing migrations table works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 15)
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertIsInstance(volumes.c._name_id.type,
+                              sqlalchemy.types.VARCHAR)
 
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      "migrations"))
+    def _post_downgrade_014(self, engine):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertNotIn('_name_id', volumes.c)
 
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 14)
+    def _check_015(self, engine, data):
+        """Test removing migrations table works correctly."""
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  "migrations"))
 
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     "migrations"))
+    def _post_downgrade_015(self, engine):
+        self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                 "migrations"))
 
-    def test_migration_016(self):
+    def _check_016(self, engine, data):
         """Test that dropping xen storage manager tables works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 15)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 16)
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      'sm_flavors'))
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      'sm_backend_config'))
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      'sm_volume'))
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 15)
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     'sm_flavors'))
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     'sm_backend_config'))
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     'sm_volume'))
-
-    def test_migration_017(self):
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  'sm_flavors'))
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  'sm_backend_config'))
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  'sm_volume'))
+
+    def _post_downgrade_016(self, engine):
+        self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                 'sm_flavors'))
+        self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                 'sm_backend_config'))
+        self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                 'sm_volume'))
+
+    def _check_017(self, engine, data):
         """Test that added encryption information works correctly."""
-
-        # upgrade schema
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 16)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 17)
-
-            # encryption key UUID
-            volumes = sqlalchemy.Table('volumes', metadata, autoload=True)
-            self.assertIn('encryption_key_id', volumes.c)
-            self.assertIsInstance(volumes.c.encryption_key_id.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
-            self.assertIn('encryption_key_id', snapshots.c)
-            self.assertIsInstance(snapshots.c.encryption_key_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIn('volume_type_id', snapshots.c)
-            self.assertIsInstance(snapshots.c.volume_type_id.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            # encryption types table
-            encryption = sqlalchemy.Table('encryption',
-                                          metadata,
-                                          autoload=True)
-            self.assertIsInstance(encryption.c.volume_type_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(encryption.c.cipher.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(encryption.c.key_size.type,
-                                  sqlalchemy.types.INTEGER)
-            self.assertIsInstance(encryption.c.provider.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            # downgrade schema
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 16)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            volumes = sqlalchemy.Table('volumes', metadata, autoload=True)
-            self.assertNotIn('encryption_key_id', volumes.c)
-
-            snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
-            self.assertNotIn('encryption_key_id', snapshots.c)
-
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      'encryption'))
-
-    def test_migration_018(self):
+        # encryption key UUID
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertIn('encryption_key_id', volumes.c)
+        self.assertIsInstance(volumes.c.encryption_key_id.type,
+                              sqlalchemy.types.VARCHAR)
+
+        snapshots = db_utils.get_table(engine, 'snapshots')
+        self.assertIn('encryption_key_id', snapshots.c)
+        self.assertIsInstance(snapshots.c.encryption_key_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIn('volume_type_id', snapshots.c)
+        self.assertIsInstance(snapshots.c.volume_type_id.type,
+                              sqlalchemy.types.VARCHAR)
+
+        # encryption types table
+        encryption = db_utils.get_table(engine, 'encryption')
+        self.assertIsInstance(encryption.c.volume_type_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(encryption.c.cipher.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(encryption.c.key_size.type,
+                              sqlalchemy.types.INTEGER)
+        self.assertIsInstance(encryption.c.provider.type,
+                              sqlalchemy.types.VARCHAR)
+
+    def _post_downgrade_017(self, engine):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertNotIn('encryption_key_id', volumes.c)
+
+        snapshots = db_utils.get_table(engine, 'snapshots')
+        self.assertNotIn('encryption_key_id', snapshots.c)
+
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  'encryption'))
+
+    def _check_018(self, engine, data):
         """Test that added qos_specs table works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 17)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 18)
-            self.assertTrue(engine.dialect.has_table(
-                engine.connect(), "quality_of_service_specs"))
-            qos_specs = sqlalchemy.Table('quality_of_service_specs',
-                                         metadata,
-                                         autoload=True)
-            self.assertIsInstance(qos_specs.c.created_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(qos_specs.c.updated_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(qos_specs.c.deleted_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(qos_specs.c.deleted.type,
-                                  self.bool_type[engine.name])
-            self.assertIsInstance(qos_specs.c.id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(qos_specs.c.specs_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(qos_specs.c.key.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(qos_specs.c.value.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 17)
-
-            self.assertFalse(engine.dialect.has_table(
-                engine.connect(), "quality_of_service_specs"))
-
-    def test_migration_019(self):
+        self.assertTrue(engine.dialect.has_table(
+            engine.connect(), "quality_of_service_specs"))
+        qos_specs = db_utils.get_table(engine, 'quality_of_service_specs')
+        self.assertIsInstance(qos_specs.c.created_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(qos_specs.c.updated_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(qos_specs.c.deleted_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(qos_specs.c.deleted.type,
+                              self.BOOL_TYPE)
+        self.assertIsInstance(qos_specs.c.id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(qos_specs.c.specs_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(qos_specs.c.key.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(qos_specs.c.value.type,
+                              sqlalchemy.types.VARCHAR)
+
+    def _post_downgrade_018(self, engine):
+        self.assertFalse(engine.dialect.has_table(
+            engine.connect(), "quality_of_service_specs"))
+
+    def _check_019(self, engine, data):
         """Test that adding migration_status column works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 18)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 19)
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertIsInstance(volumes.c.migration_status.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 18)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertNotIn('migration_status', volumes.c)
-
-    def test_migration_020(self):
-        """Test adding volume_admin_metadata table works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 19)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 20)
-
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     "volume_admin_metadata"))
-            volume_admin_metadata = sqlalchemy.Table('volume_admin_metadata',
-                                                     metadata,
-                                                     autoload=True)
-
-            self.assertIsInstance(volume_admin_metadata.c.created_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(volume_admin_metadata.c.updated_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(volume_admin_metadata.c.deleted_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(volume_admin_metadata.c.deleted.type,
-                                  self.bool_type[engine.name])
-            self.assertIsInstance(volume_admin_metadata.c.id.type,
-                                  sqlalchemy.types.INTEGER)
-            self.assertIsInstance(volume_admin_metadata.c.volume_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(volume_admin_metadata.c.key.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(volume_admin_metadata.c.value.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 19)
-
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      "volume_admin_metadata"))
-
-    def test_migration_021(self):
-        """Test adding default data for quota classes works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 20)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 21)
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertIsInstance(volumes.c.migration_status.type,
+                              sqlalchemy.types.VARCHAR)
 
-            quota_class_metadata = sqlalchemy.Table('quota_classes',
-                                                    metadata,
-                                                    autoload=True)
+    def _post_downgrade_019(self, engine):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertNotIn('migration_status', volumes.c)
 
-            num_defaults = quota_class_metadata.count().\
-                where(quota_class_metadata.c.class_name == 'default').\
-                execute().scalar()
-
-            self.assertEqual(3, num_defaults)
+    def _check_020(self, engine, data):
+        """Test adding volume_admin_metadata table works correctly."""
+        self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                 "volume_admin_metadata"))
+        volume_admin_metadata = db_utils.get_table(engine,
+                                                   'volume_admin_metadata')
+
+        self.assertIsInstance(volume_admin_metadata.c.created_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(volume_admin_metadata.c.updated_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(volume_admin_metadata.c.deleted_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(volume_admin_metadata.c.deleted.type,
+                              self.BOOL_TYPE)
+        self.assertIsInstance(volume_admin_metadata.c.id.type,
+                              sqlalchemy.types.INTEGER)
+        self.assertIsInstance(volume_admin_metadata.c.volume_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(volume_admin_metadata.c.key.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(volume_admin_metadata.c.value.type,
+                              sqlalchemy.types.VARCHAR)
+
+    def _post_downgrade_020(self, engine):
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  "volume_admin_metadata"))
+
+    def _verify_quota_defaults(self, engine):
+        quota_class_metadata = db_utils.get_table(engine, 'quota_classes')
+
+        num_defaults = quota_class_metadata.count().\
+            where(quota_class_metadata.c.class_name == 'default').\
+            execute().scalar()
+
+        self.assertEqual(3, num_defaults)
+
+    def _check_021(self, engine, data):
+        """Test adding default data for quota classes works correctly."""
+        self._verify_quota_defaults(engine)
 
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 20)
+    def _post_downgrade_021(self, engine):
+        # Defaults should not be deleted during downgrade
+        self._verify_quota_defaults(engine)
 
-            # Defaults should not be deleted during downgrade
-            num_defaults = quota_class_metadata.count().\
-                where(quota_class_metadata.c.class_name == 'default').\
-                execute().scalar()
+    def _check_022(self, engine, data):
+        """Test that adding disabled_reason column works correctly."""
+        services = db_utils.get_table(engine, 'services')
+        self.assertIsInstance(services.c.disabled_reason.type,
+                              sqlalchemy.types.VARCHAR)
 
-            self.assertEqual(3, num_defaults)
+    def _post_downgrade_022(self, engine):
+        services = db_utils.get_table(engine, 'services')
+        self.assertNotIn('disabled_reason', services.c)
 
-    def test_migration_022(self):
-        """Test that adding disabled_reason column works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 21)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 22)
-            services = sqlalchemy.Table('services',
-                                        metadata,
-                                        autoload=True)
-            self.assertIsInstance(services.c.disabled_reason.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 21)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            services = sqlalchemy.Table('services',
-                                        metadata,
-                                        autoload=True)
-            self.assertNotIn('disabled_reason', services.c)
-
-    def test_migration_023(self):
+    def _check_023(self, engine, data):
         """Test that adding reservations index works correctly."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 22)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 23)
-            reservations = sqlalchemy.Table('reservations',
-                                            metadata,
-                                            autoload=True)
-            index_columns = []
-            for idx in reservations.indexes:
-                if idx.name == 'reservations_deleted_expire_idx':
-                    index_columns = idx.columns.keys()
-                    break
-
-            self.assertEqual(sorted(['deleted', 'expire']),
-                             sorted(index_columns))
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 22)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            reservations = sqlalchemy.Table('reservations',
-                                            metadata,
-                                            autoload=True)
-            index_names = [idx.name for idx in reservations.indexes]
-            self.assertNotIn('reservations_deleted_expire_idx', index_names)
-
-    def test_migration_024(self):
+        reservations = db_utils.get_table(engine, 'reservations')
+        index_columns = []
+        for idx in reservations.indexes:
+            if idx.name == 'reservations_deleted_expire_idx':
+                index_columns = idx.columns.keys()
+                break
+
+        self.assertEqual(sorted(['deleted', 'expire']),
+                         sorted(index_columns))
+
+    def _post_downgrade_023(self, engine):
+        reservations = db_utils.get_table(engine, 'reservations')
+        index_names = [idx.name for idx in reservations.indexes]
+        self.assertNotIn('reservations_deleted_expire_idx', index_names)
+
+    def _check_024(self, engine, data):
         """Test adding replication columns to volume table."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 23)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 24)
-
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertIsInstance(volumes.c.replication_status.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(volumes.c.replication_extended_status.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(volumes.c.replication_driver_data.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 23)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertNotIn('replication_status', volumes.c)
-            self.assertNotIn('replication_extended_status', volumes.c)
-            self.assertNotIn('replication_driver_data', volumes.c)
-
-    def test_migration_025(self):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertIsInstance(volumes.c.replication_status.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(volumes.c.replication_extended_status.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(volumes.c.replication_driver_data.type,
+                              sqlalchemy.types.VARCHAR)
+
+    def _post_downgrade_024(self, engine):
+        volumes = db_utils.get_table(engine, 'volumes')
+        self.assertNotIn('replication_status', volumes.c)
+        self.assertNotIn('replication_extended_status', volumes.c)
+        self.assertNotIn('replication_driver_data', volumes.c)
+
+    def _check_025(self, engine, data):
         """Test adding table and columns for consistencygroups."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 24)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            # Upgrade
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 25)
-
-            # Test consistencygroup_id is in Table volumes
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertIsInstance(volumes.c.consistencygroup_id.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            # Test cgsnapshot_id is in Table snapshots
-            snapshots = sqlalchemy.Table('snapshots',
-                                         metadata,
-                                         autoload=True)
-            self.assertIsInstance(snapshots.c.cgsnapshot_id.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            # Test Table consistencygroups exists
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     "consistencygroups"))
-            consistencygroups = sqlalchemy.Table('consistencygroups',
-                                                 metadata,
-                                                 autoload=True)
-
-            self.assertIsInstance(consistencygroups.c.created_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(consistencygroups.c.updated_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(consistencygroups.c.deleted_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(consistencygroups.c.deleted.type,
-                                  self.bool_type[engine.name])
-            self.assertIsInstance(consistencygroups.c.id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(consistencygroups.c.user_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(consistencygroups.c.project_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(consistencygroups.c.host.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(consistencygroups.c.availability_zone.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(consistencygroups.c.name.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(consistencygroups.c.description.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(consistencygroups.c.volume_type_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(consistencygroups.c.status.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            # Test Table cgsnapshots exists
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     "cgsnapshots"))
-            cgsnapshots = sqlalchemy.Table('cgsnapshots',
-                                           metadata,
-                                           autoload=True)
-
-            self.assertIsInstance(cgsnapshots.c.created_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(cgsnapshots.c.updated_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(cgsnapshots.c.deleted_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(cgsnapshots.c.deleted.type,
-                                  self.bool_type[engine.name])
-            self.assertIsInstance(cgsnapshots.c.id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(cgsnapshots.c.user_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(cgsnapshots.c.project_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(cgsnapshots.c.consistencygroup_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(cgsnapshots.c.name.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(cgsnapshots.c.description.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(cgsnapshots.c.status.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            # Verify foreign keys are created
-            fkey, = volumes.c.consistencygroup_id.foreign_keys
-            self.assertEqual(consistencygroups.c.id, fkey.column)
-            self.assertEqual(1, len(volumes.foreign_keys))
-
-            fkey, = snapshots.c.cgsnapshot_id.foreign_keys
-            self.assertEqual(cgsnapshots.c.id, fkey.column)
-            fkey, = snapshots.c.volume_id.foreign_keys
-            self.assertEqual(volumes.c.id, fkey.column)
-            # 2 foreign keys in Table snapshots
-            self.assertEqual(2, len(snapshots.foreign_keys))
-
-            # Downgrade
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 24)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            # Test consistencygroup_id is not in Table volumes
-            volumes = sqlalchemy.Table('volumes',
-                                       metadata,
-                                       autoload=True)
-            self.assertNotIn('consistencygroup_id', volumes.c)
-
-            # Test cgsnapshot_id is not in Table snapshots
-            snapshots = sqlalchemy.Table('snapshots',
-                                         metadata,
-                                         autoload=True)
-            self.assertNotIn('cgsnapshot_id', snapshots.c)
-
-            # Verify foreign keys are removed
-            self.assertEqual(0, len(volumes.foreign_keys))
-            self.assertEqual(1, len(snapshots.foreign_keys))
-            # volume_id foreign key is still in Table snapshots
-            fkey, = snapshots.c.volume_id.foreign_keys
-            self.assertEqual(volumes.c.id, fkey.column)
-
-            # Test Table cgsnapshots doesn't exist any more
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      "cgsnapshots"))
-
-            # Test Table consistencygroups doesn't exist any more
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      "consistencygroups"))
-
-    def test_migration_026(self):
+        # Test consistencygroup_id is in Table volumes
+        metadata = sqlalchemy.MetaData()
+        volumes = self.get_table_ref(engine, 'volumes', metadata)
+        self.assertIsInstance(volumes.c.consistencygroup_id.type,
+                              sqlalchemy.types.VARCHAR)
+
+        # Test cgsnapshot_id is in Table snapshots
+        snapshots = self.get_table_ref(engine, 'snapshots', metadata)
+        self.assertIsInstance(snapshots.c.cgsnapshot_id.type,
+                              sqlalchemy.types.VARCHAR)
+
+        # Test Table consistencygroups exists
+        self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                 "consistencygroups"))
+        consistencygroups = self.get_table_ref(engine,
+                                               'consistencygroups',
+                                               metadata)
+        self.assertIsInstance(consistencygroups.c.created_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(consistencygroups.c.updated_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(consistencygroups.c.deleted_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(consistencygroups.c.deleted.type,
+                              self.BOOL_TYPE)
+        self.assertIsInstance(consistencygroups.c.id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(consistencygroups.c.user_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(consistencygroups.c.project_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(consistencygroups.c.host.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(consistencygroups.c.availability_zone.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(consistencygroups.c.name.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(consistencygroups.c.description.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(consistencygroups.c.volume_type_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(consistencygroups.c.status.type,
+                              sqlalchemy.types.VARCHAR)
+
+        # Test Table cgsnapshots exists
+        self.assertTrue(engine.dialect.has_table(engine.connect(),
+                                                 "cgsnapshots"))
+        cgsnapshots = self.get_table_ref(engine,
+                                         'cgsnapshots',
+                                         metadata)
+
+        self.assertIsInstance(cgsnapshots.c.created_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(cgsnapshots.c.updated_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(cgsnapshots.c.deleted_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(cgsnapshots.c.deleted.type,
+                              self.BOOL_TYPE)
+        self.assertIsInstance(cgsnapshots.c.id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(cgsnapshots.c.user_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(cgsnapshots.c.project_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(cgsnapshots.c.consistencygroup_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(cgsnapshots.c.name.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(cgsnapshots.c.description.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(cgsnapshots.c.status.type,
+                              sqlalchemy.types.VARCHAR)
+
+        # Verify foreign keys are created
+        fkey, = volumes.c.consistencygroup_id.foreign_keys
+        self.assertEqual(consistencygroups.c.id, fkey.column)
+        self.assertEqual(1, len(volumes.foreign_keys))
+
+        fkey, = snapshots.c.cgsnapshot_id.foreign_keys
+        self.assertEqual(cgsnapshots.c.id, fkey.column)
+        fkey, = snapshots.c.volume_id.foreign_keys
+        self.assertEqual(volumes.c.id, fkey.column)
+        # 2 foreign keys in Table snapshots
+        self.assertEqual(2, len(snapshots.foreign_keys))
+
+    def _post_downgrade_025(self, engine):
+        metadata = sqlalchemy.MetaData()
+        # Test consistencygroup_id is not in Table volumes
+        volumes = self.get_table_ref(engine, 'volumes', metadata)
+        self.assertNotIn('consistencygroup_id', volumes.c)
+
+        # Test cgsnapshot_id is not in Table snapshots
+        snapshots = self.get_table_ref(engine, 'snapshots', metadata)
+        self.assertNotIn('cgsnapshot_id', snapshots.c)
+
+        # Verify foreign keys are removed
+        self.assertEqual(0, len(volumes.foreign_keys))
+        self.assertEqual(1, len(snapshots.foreign_keys))
+        # volume_id foreign key is still in Table snapshots
+        fkey, = snapshots.c.volume_id.foreign_keys
+        self.assertEqual(volumes.c.id, fkey.column)
+
+        # Test Table cgsnapshots doesn't exist any more
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  "cgsnapshots"))
+
+        # Test Table consistencygroups doesn't exist any more
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  "consistencygroups"))
+
+    def _pre_upgrade_026(self, engine):
         """Test adding default data for consistencygroups quota class."""
-        for (_key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 25)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
+        quota_class_metadata = db_utils.get_table(engine, 'quota_classes')
 
-            quota_class_metadata = sqlalchemy.Table('quota_classes',
-                                                    metadata,
-                                                    autoload=True)
+        num_defaults = quota_class_metadata.count().\
+            where(quota_class_metadata.c.class_name == 'default').\
+            execute().scalar()
 
-            num_defaults = quota_class_metadata.count().\
-                where(quota_class_metadata.c.class_name == 'default').\
-                execute().scalar()
+        self.assertEqual(3, num_defaults)
 
-            self.assertEqual(3, num_defaults)
+    def _check_026(self, engine, data):
+        quota_class_metadata = db_utils.get_table(engine, 'quota_classes')
+        num_defaults = quota_class_metadata.count().\
+            where(quota_class_metadata.c.class_name == 'default').\
+            execute().scalar()
 
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 26)
+        self.assertEqual(4, num_defaults)
 
-            num_defaults = quota_class_metadata.count().\
-                where(quota_class_metadata.c.class_name == 'default').\
-                execute().scalar()
+    def _post_downgrade_026(self, engine):
+        # Defaults should not be deleted during downgrade
+        quota_class_metadata = db_utils.get_table(engine, 'quota_classes')
+        num_defaults = quota_class_metadata.count().\
+            where(quota_class_metadata.c.class_name == 'default').\
+            execute().scalar()
 
-            self.assertEqual(4, num_defaults)
+        self.assertEqual(4, num_defaults)
 
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 25)
+    def _check_032(self, engine, data):
+        """Test adding volume_type_projects table works correctly."""
+        volume_type_projects = db_utils.get_table(engine,
+                                                  'volume_type_projects')
+        self.assertIsInstance(volume_type_projects.c.created_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(volume_type_projects.c.updated_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(volume_type_projects.c.deleted_at.type,
+                              self.TIME_TYPE)
+        self.assertIsInstance(volume_type_projects.c.deleted.type,
+                              self.BOOL_TYPE)
+        self.assertIsInstance(volume_type_projects.c.id.type,
+                              sqlalchemy.types.INTEGER)
+        self.assertIsInstance(volume_type_projects.c.volume_type_id.type,
+                              sqlalchemy.types.VARCHAR)
+        self.assertIsInstance(volume_type_projects.c.project_id.type,
+                              sqlalchemy.types.VARCHAR)
+
+        volume_types = db_utils.get_table(engine, 'volume_types')
+        self.assertIsInstance(volume_types.c.is_public.type,
+                              self.BOOL_TYPE)
+
+    def _post_downgrade_032(self, engine):
+        self.assertFalse(engine.dialect.has_table(engine.connect(),
+                                                  "volume_type_projects"))
+        volume_types = db_utils.get_table(engine, 'volume_types')
+        self.assertNotIn('is_public', volume_types.c)
 
-            # Defaults should not be deleted during downgrade
-            num_defaults = quota_class_metadata.count().\
-                where(quota_class_metadata.c.class_name == 'default').\
-                execute().scalar()
+    def test_walk_versions(self):
+        self.walk_versions(True, False)
 
-            self.assertEqual(4, num_defaults)
 
-    def test_migration_032(self):
-        """Test adding volume_type_projects table works correctly."""
-        for (key, engine) in self.engines.items():
-            migration_api.version_control(engine,
-                                          TestMigrations.REPOSITORY,
-                                          migration.db_initial_version())
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 31)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 32)
-
-            self.assertTrue(engine.dialect.has_table(engine.connect(),
-                                                     "volume_type_projects"))
-
-            volume_type_projects = sqlalchemy.Table('volume_type_projects',
-                                                    metadata,
-                                                    autoload=True)
-            self.assertIsInstance(volume_type_projects.c.created_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(volume_type_projects.c.updated_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(volume_type_projects.c.deleted_at.type,
-                                  self.time_type[engine.name])
-            self.assertIsInstance(volume_type_projects.c.deleted.type,
-                                  self.bool_type[engine.name])
-            self.assertIsInstance(volume_type_projects.c.id.type,
-                                  sqlalchemy.types.INTEGER)
-            self.assertIsInstance(volume_type_projects.c.volume_type_id.type,
-                                  sqlalchemy.types.VARCHAR)
-            self.assertIsInstance(volume_type_projects.c.project_id.type,
-                                  sqlalchemy.types.VARCHAR)
-
-            volume_types = sqlalchemy.Table('volume_types',
-                                            metadata,
-                                            autoload=True)
-            self.assertIsInstance(volume_types.c.is_public.type,
-                                  self.bool_type[engine.name])
-
-            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 31)
-            metadata = sqlalchemy.schema.MetaData()
-            metadata.bind = engine
-
-            self.assertFalse(engine.dialect.has_table(engine.connect(),
-                                                      "volume_type_projects"))
-            volume_types = sqlalchemy.Table('volume_types',
-                                            metadata,
-                                            autoload=True)
-            self.assertNotIn('is_public', volume_types.c)
+class TestSqliteMigrations(test_base.DbTestCase,
+                           MigrationsMixin):
+    pass
+
+
+class TestMysqlMigrations(test_base.MySQLOpportunisticTestCase,
+                          MigrationsMixin):
+
+    BOOL_TYPE = sqlalchemy.dialects.mysql.TINYINT
+
+    def test_mysql_innodb(self):
+        """Test that table creation on mysql only builds InnoDB tables."""
+        # Sanity check: run every migration against the opportunistic test
+        # database before inspecting the resulting tables.
+        migration.db_sync(engine=self.migrate_engine)
+
+        total = self.migrate_engine.execute(
+            "SELECT count(*) "
+            "from information_schema.TABLES "
+            "where TABLE_SCHEMA='{0}'".format(
+                self.migrate_engine.url.database))
+        self.assertGreater(total.scalar(), 0,
+                           msg="No tables found. Wrong schema?")
+
+        noninnodb = self.migrate_engine.execute(
+            "SELECT count(*) "
+            "from information_schema.TABLES "
+            "where TABLE_SCHEMA='{0}' "
+            "and ENGINE!='InnoDB' "
+            "and TABLE_NAME!='migrate_version'".format(
+                self.migrate_engine.url.database))
+        count = noninnodb.scalar()
+        self.assertEqual(0, count, "%d non-InnoDB tables created" % count)
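
The per-version methods above follow a hook naming convention instead of the old explicit test loops: for migration NNN the mixin may define ``_pre_upgrade_NNN``, ``_check_NNN`` and ``_post_downgrade_NNN``, and the walk-versions machinery calls whichever of them exist. The sketch below illustrates that dispatch using sqlalchemy-migrate's versioning API; it is a simplified illustration of the pattern, not the oslo.db implementation itself:

    from migrate.versioning import api as migration_api


    def migrate_up(test_case, engine, repository, version, with_data=False):
        """Upgrade to ``version``, running the optional per-version hooks."""
        data = None
        if with_data:
            pre = getattr(test_case, '_pre_upgrade_%03d' % version, None)
            if pre:
                data = pre(engine)            # e.g. _pre_upgrade_026 above
        migration_api.upgrade(engine, repository, version)
        if with_data:
            check = getattr(test_case, '_check_%03d' % version, None)
            if check:
                check(engine, data)           # e.g. _check_018 above


    def migrate_down(test_case, engine, repository, version):
        # Roll back migration ``version`` (downgrade to version - 1), then
        # run its optional check, e.g. _post_downgrade_020 above.
        migration_api.downgrade(engine, repository, version - 1)
        post = getattr(test_case, '_post_downgrade_%03d' % version, None)
        if post:
            post(engine)
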
index d26905bff5bdef49f63ff1e5dc58ac3da8da4542..bf120a66fd41793b7977c1096d46b5d16310be24 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -57,6 +57,9 @@ oslo.messaging.notify.drivers =
     cinder.openstack.common.notifier.rpc_notifier = oslo.messaging.notify._impl_messaging:MessagingDriver
     cinder.openstack.common.notifier.test_notifier = oslo.messaging.notify._impl_test:TestDriver
 
+cinder.database.migration_backend =
+    sqlalchemy = oslo.db.sqlalchemy.migration
+
 [build_sphinx]
 all_files = 1
 build-dir = doc/build
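
The new ``cinder.database.migration_backend`` entry point declared above is how the migration backend is discovered at runtime through stevedore. A minimal sketch of that lookup, assuming a ``get_backend`` helper name; the real code presumably also reads the backend name from the ``database.backend`` option rather than hard-coding a default:

    from stevedore import driver


    def get_backend(name='sqlalchemy'):
        # DriverManager imports whatever module is registered under ``name``
        # in the 'cinder.database.migration_backend' namespace from setup.cfg;
        # for this patch that is oslo.db.sqlalchemy.migration.
        return driver.DriverManager('cinder.database.migration_backend',
                                    name).driver
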
index 6881a08c2d5fc9a9718674498372055e00bcda5c..2815ec8fa0820f13d2f1ee7bcdbf5e590d2c5258 100644 (file)
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -11,6 +11,7 @@ mock>=1.0
 mox>=0.5.3
 MySQL-python
 psycopg2
+oslotest>=1.2.0  # Apache-2.0
 sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3
 python-subunit>=0.0.18
 testtools>=0.9.36,!=1.2.0
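
``oslotest`` is added because the oslo.db test base classes used in the refactored test module (``test_base.DbTestCase``, ``test_base.MySQLOpportunisticTestCase``) build on it. A minimal sketch of the opportunistic pattern, assuming a reachable MySQL server with the standard ``openstack_citest`` credentials; attribute names follow the oslo.db base classes and the example is illustrative only:

    from oslo.db.sqlalchemy import test_base


    class ExampleOpportunisticTest(test_base.MySQLOpportunisticTestCase):
        # Skipped automatically when MySQL is not available; each test gets a
        # freshly created database, so parallel runs do not interfere.
        def test_engine_is_usable(self):
            self.assertEqual(1, self.engine.execute("SELECT 1").scalar())
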