# License for the specific language governing permissions and limitations
# under the License.
-import collections
import logging
import pprint
from oslo.config import cfg
from oslo.db.sqlalchemy import test_base
from oslo.db.sqlalchemy import test_migrations
-from oslo.db.sqlalchemy import utils
import pkg_resources as pkg
import sqlalchemy
-import sqlalchemy.sql.expression as expr
-import testscenarios
from neutron.db.migration import cli as migration
from neutron.db.migration.models import head as head_models
]
SERVICE_PLUGINS = _discover_plugins("neutron.service_plugins")
-CORE_PLUGINS = _discover_plugins('neutron.core_plugins')
+CORE_PLUGIN = 'neutron.plugins.ml2.plugin.Ml2Plugin'
class _TestModelsMigrations(test_migrations.ModelsMigrationsSync):
super(_TestModelsMigrations, self).setUp()
self.cfg = self.useFixture(config.Config())
self.cfg.config(service_plugins=SERVICE_PLUGINS,
- core_plugin=self.core_plugin)
+ core_plugin=CORE_PLUGIN)
self.alembic_config = migration.get_alembic_config()
self.alembic_config.neutron_config = cfg.CONF
return super(_TestModelsMigrations, self).include_object(
object_, name, type_, reflected, compare_to)
- def compare_server_default(self, ctxt, ins_col, meta_col,
- insp_def, meta_def, rendered_meta_def):
- return self._compare_server_default(ctxt.bind, meta_col, insp_def,
- meta_def)
-
- # TODO(akamyshnikova):remove _compare_server_default methods when it
- # appears in oslo.db(version>1.0.0)
- @utils.DialectFunctionDispatcher.dispatch_for_dialect("*")
- def _compare_server_default(bind, meta_col, insp_def, meta_def):
- pass
-
- @_compare_server_default.dispatch_for('mysql')
- def _compare_server_default(bind, meta_col, insp_def, meta_def):
- if isinstance(meta_col.type, sqlalchemy.Boolean):
- if meta_def is None or insp_def is None:
- return meta_def != insp_def
- return not (
- isinstance(meta_def.arg, expr.True_) and insp_def == "'1'" or
- isinstance(meta_def.arg, expr.False_) and insp_def == "'0'"
- )
-
- if isinstance(meta_col.type, sqlalchemy.Integer):
- if meta_def is None or insp_def is None:
- return meta_def != insp_def
- return meta_def.arg == insp_def
-
- @_compare_server_default.dispatch_for('postgresql')
- def _compare_server_default(bind, meta_col, insp_def, meta_def):
- if isinstance(meta_col.type, sqlalchemy.Enum):
- if meta_def is None or insp_def is None:
- return meta_def != insp_def
- return insp_def != "'%s'::%s" % (meta_def.arg, meta_col.type.name)
- elif isinstance(meta_col.type, sqlalchemy.String):
- if meta_def is None or insp_def is None:
- return meta_def != insp_def
- return insp_def != "'%s'::character varying" % meta_def.arg
-
def test_models_sync(self):
# drop all tables after a test run
self.addCleanup(self._cleanup)
self.get_engine())
dialect = self.get_engine().dialect.name
self.check_mysql_engine(dialect, insp)
- diff2 = self.compare_foreign_keys(self.get_metadata(),
- self.get_engine())
+ diff2 = self.check_foreign_keys(self.get_metadata(),
+ self.get_engine())
result = filter(self.remove_unrelated_errors, diff1 + diff2)
if result:
self.assertEqual(0, len(noninnodb), "%s non InnoDB tables created" %
noninnodb)
- FKInfo = collections.namedtuple('FKInfo', ['constrained_columns',
- 'referred_table',
- 'referred_columns'])
-
- def compare_foreign_keys(self, metadata, bind):
- """Compare foreign keys between model and db table.
-
- Returns a list that contains information about:
- * should be a new key added or removed existing,
- * name of that key,
- * source table,
- * referred table,
- * constrained columns,
- * referred columns
-
- Output::
-
- [('drop_key',
- 'testtbl_fk_check_fkey',
- 'testtbl',
- fk_info(constrained_columns=(u'fk_check',),
- referred_table=u'table',
- referred_columns=(u'fk_check',)))]
-
- """
-
- diff = []
- insp = sqlalchemy.engine.reflection.Inspector.from_engine(bind)
- # Get all tables from db
- db_tables = insp.get_table_names()
- # Get all tables from models
- model_tables = metadata.tables
- for table in db_tables:
- if table not in model_tables:
- continue
- # Get all necessary information about key of current table from db
- fk_db = dict((self._get_fk_info_from_db(i), i['name'])
- for i in insp.get_foreign_keys(table))
- fk_db_set = set(fk_db.keys())
- # Get all necessary information about key of current table from
- # models
- fk_models = dict((self._get_fk_info_from_model(fk), fk)
- for fk in model_tables[table].foreign_keys)
- fk_models_set = set(fk_models.keys())
- for key in (fk_db_set - fk_models_set):
- diff.append(('drop_key', fk_db[key], table, key))
- LOG.info(("Detected removed foreign key %(fk)r on "
- "table %(table)r"), {'fk': fk_db[key],
- 'table': table})
- for key in (fk_models_set - fk_db_set):
- diff.append(('add_key', fk_models[key], key))
- LOG.info((
- "Detected added foreign key for column %(fk)r on table "
- "%(table)r"), {'fk': fk_models[key].column.name,
- 'table': table})
- return diff
-
- def _get_fk_info_from_db(self, fk):
- return self.FKInfo(tuple(fk['constrained_columns']),
- fk['referred_table'],
- tuple(fk['referred_columns']))
-
- def _get_fk_info_from_model(self, fk):
- return self.FKInfo((fk.parent.name,), fk.column.table.name,
- (fk.column.name,))
-
# Remove some difference that are not mistakes just specific of
# dialects, etc
def remove_unrelated_errors(self, element):
return False
return True
-load_tests = testscenarios.load_tests_apply_scenarios
-
-_scenarios = []
-for plugin in CORE_PLUGINS:
- plugin_name = plugin.split('.')[-1]
- class_name = plugin_name
- _scenarios.append((class_name, {'core_plugin': plugin}))
-
class TestModelsMigrationsMysql(_TestModelsMigrations,
test_base.MySQLOpportunisticTestCase):
- scenarios = _scenarios
+ pass
class TestModelsMigrationsPsql(_TestModelsMigrations,
test_base.PostgreSQLOpportunisticTestCase):
- scenarios = _scenarios
+ pass