from sqlalchemy import event
from neutron.db.migration.alembic_migrations import external
+from neutron.db.migration import autogen
from neutron.db.migration.models import head # noqa
from neutron.db import model_base
context.configure(
connection=connection,
target_metadata=target_metadata,
- include_object=include_object
+ include_object=include_object,
+ process_revision_directives=autogen.process_revision_directives
)
try:
--- /dev/null
+# Copyright (c) 2015 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from alembic.operations import ops
+from alembic.util import Dispatcher
+from alembic.util import rev_id as new_rev_id
+
+from neutron.db.migration import cli
+
+_ec_dispatcher = Dispatcher()
+
+
+def process_revision_directives(context, revision, directives):
+ if cli._use_separate_migration_branches(context.config):
+ directives[:] = [
+ directive for directive in _assign_directives(context, directives)
+ ]
+
+
+def _assign_directives(context, directives, phase=None):
+ for directive in directives:
+ decider = _ec_dispatcher.dispatch(directive)
+ if phase is None:
+ phases = cli.MIGRATION_BRANCHES
+ else:
+ phases = (phase,)
+ for phase in phases:
+ decided = decider(context, directive, phase)
+ if decided:
+ yield decided
+
+
+@_ec_dispatcher.dispatch_for(ops.MigrationScript)
+def _migration_script_ops(context, directive, phase):
+ """Generate a new ops.MigrationScript() for a given phase.
+
+ E.g. given an ops.MigrationScript() directive from a vanilla autogenerate
+ and an expand/contract phase name, produce a new ops.MigrationScript()
+ which contains only those sub-directives appropriate to "expand" or
+ "contract". Also ensure that the branch directory exists and that
+ the correct branch labels/depends_on/head revision are set up.
+
+ """
+ version_path = cli._get_version_branch_path(context.config, phase)
+ autogen_kwargs = {}
+ cli._check_bootstrap_new_branch(phase, version_path, autogen_kwargs)
+
+ op = ops.MigrationScript(
+ new_rev_id(),
+ ops.UpgradeOps(ops=[
+ d for d in _assign_directives(
+ context, directive.upgrade_ops.ops, phase)
+ ]),
+ ops.DowngradeOps(ops=[]),
+ message=directive.message,
+ **autogen_kwargs
+ )
+
+ if not op.upgrade_ops.is_empty():
+ return op
+
+
+@_ec_dispatcher.dispatch_for(ops.AddConstraintOp)
+@_ec_dispatcher.dispatch_for(ops.CreateIndexOp)
+@_ec_dispatcher.dispatch_for(ops.CreateTableOp)
+@_ec_dispatcher.dispatch_for(ops.AddColumnOp)
+def _expands(context, directive, phase):
+ if phase == 'expand':
+ return directive
+ else:
+ return None
+
+
+@_ec_dispatcher.dispatch_for(ops.DropConstraintOp)
+@_ec_dispatcher.dispatch_for(ops.DropIndexOp)
+@_ec_dispatcher.dispatch_for(ops.DropTableOp)
+@_ec_dispatcher.dispatch_for(ops.DropColumnOp)
+def _contracts(context, directive, phase):
+ if phase == 'contract':
+ return directive
+ else:
+ return None
+
+
+@_ec_dispatcher.dispatch_for(ops.AlterColumnOp)
+def _alter_column(context, directive, phase):
+ is_expand = phase == 'expand'
+
+ if is_expand and (
+ directive.modify_nullable is True
+ ):
+ return directive
+ elif not is_expand and (
+ directive.modify_nullable is False
+ ):
+ return directive
+ else:
+ raise NotImplementedError(
+ "Don't know if operation is an expand or "
+ "contract at the moment: %s" % directive)
+
+
+@_ec_dispatcher.dispatch_for(ops.ModifyTableOps)
+def _modify_table_ops(context, directive, phase):
+ op = ops.ModifyTableOps(
+ directive.table_name,
+ ops=[
+ d for d in _assign_directives(context, directive.ops, phase)
+ ],
+ schema=directive.schema)
+ if not op.is_empty():
+ return op
return '%s@head' % branch
-def do_revision(config, cmd):
- '''Generate new revision files, one per branch.'''
- addn_kwargs = {
- 'message': CONF.command.message,
- 'autogenerate': CONF.command.autogenerate,
- 'sql': CONF.command.sql,
- }
-
- if _use_separate_migration_branches(config):
- for branch in MIGRATION_BRANCHES:
- version_path = _get_version_branch_path(config, branch)
- addn_kwargs['version_path'] = version_path
- addn_kwargs['head'] = _get_branch_head(branch)
+def _check_bootstrap_new_branch(branch, version_path, addn_kwargs):
+ addn_kwargs['version_path'] = version_path
+ addn_kwargs['head'] = _get_branch_head(branch)
+ if not os.path.exists(version_path):
+ # Bootstrap initial directory structure
+ utils.ensure_dir(version_path)
+ addn_kwargs['branch_label'] = branch
- if not os.path.exists(version_path):
- # Bootstrap initial directory structure
- utils.ensure_dir(version_path)
- # Mark the very first revision in the new branch with its label
- addn_kwargs['branch_label'] = branch
- do_alembic_command(config, cmd, **addn_kwargs)
- else:
- do_alembic_command(config, cmd, **addn_kwargs)
+def do_revision(config, cmd):
+ '''Generate new revision files, one per branch.'''
+ do_alembic_command(config, cmd,
+ message=CONF.command.message,
+ autogenerate=CONF.command.autogenerate,
+ sql=CONF.command.sql)
update_heads_file(config)
import copy
import os
import sys
+import textwrap
+from alembic.autogenerate import api as alembic_ag_api
from alembic import config as alembic_config
+from alembic.operations import ops as alembic_ops
import fixtures
import mock
import pkg_resources
+import sqlalchemy as sa
from neutron.db import migration
+from neutron.db.migration import autogen
from neutron.db.migration import cli
from neutron.tests import base
from neutron.tests.unit import testlib_api
return_value=separate_branches):
if separate_branches:
mock.patch('os.path.exists').start()
- expected_kwargs = [
- {'message': 'message', 'sql': False, 'autogenerate': True,
- 'version_path':
- cli._get_version_branch_path(config, branch),
- 'head': cli._get_branch_head(branch)}
- for config in self.configs
- for branch in cli.MIGRATION_BRANCHES]
- else:
- expected_kwargs = [{
- 'message': 'message', 'sql': False, 'autogenerate': True,
- }]
+ expected_kwargs = [{
+ 'message': 'message', 'sql': False, 'autogenerate': True,
+ }]
self._main_test_helper(
['prog', 'revision', '--autogenerate', '-m', 'message'],
'revision',
[mock.call(mock.ANY, revision) for revision in revisions]
)
+ @mock.patch.object(cli, '_use_separate_migration_branches')
+ @mock.patch.object(cli, '_get_version_branch_path')
+ def test_autogen_process_directives(
+ self,
+ get_version_branch_path,
+ use_separate_migration_branches):
+
+ use_separate_migration_branches.return_value = True
+ get_version_branch_path.side_effect = lambda cfg, branch: (
+ "/foo/expand" if branch == 'expand' else "/foo/contract")
+
+ migration_script = alembic_ops.MigrationScript(
+ 'eced083f5df',
+ # these directives will be split into separate
+ # expand/contract scripts
+ alembic_ops.UpgradeOps(
+ ops=[
+ alembic_ops.CreateTableOp(
+ 'organization',
+ [
+ sa.Column('id', sa.Integer(), primary_key=True),
+ sa.Column('name', sa.String(50), nullable=False)
+ ]
+ ),
+ alembic_ops.ModifyTableOps(
+ 'user',
+ ops=[
+ alembic_ops.AddColumnOp(
+ 'user',
+ sa.Column('organization_id', sa.Integer())
+ ),
+ alembic_ops.CreateForeignKeyOp(
+ 'org_fk', 'user', 'organization',
+ ['organization_id'], ['id']
+ ),
+ alembic_ops.DropConstraintOp(
+ 'user', 'uq_user_org'
+ ),
+ alembic_ops.DropColumnOp(
+ 'user', 'organization_name'
+ )
+ ]
+ )
+ ]
+ ),
+ # these will be discarded
+ alembic_ops.DowngradeOps(
+ ops=[
+ alembic_ops.AddColumnOp(
+ 'user', sa.Column(
+ 'organization_name', sa.String(50), nullable=True)
+ ),
+ alembic_ops.CreateUniqueConstraintOp(
+ 'uq_user_org', 'user',
+ ['user_name', 'organization_name']
+ ),
+ alembic_ops.ModifyTableOps(
+ 'user',
+ ops=[
+ alembic_ops.DropConstraintOp('org_fk', 'user'),
+ alembic_ops.DropColumnOp('user', 'organization_id')
+ ]
+ ),
+ alembic_ops.DropTableOp('organization')
+ ]
+ ),
+ message='create the organization table and '
+ 'replace user.organization_name'
+ )
+
+ directives = [migration_script]
+ autogen.process_revision_directives(
+ mock.Mock(), mock.Mock(), directives
+ )
+
+ expand = directives[0]
+ contract = directives[1]
+ self.assertEqual("/foo/expand", expand.version_path)
+ self.assertEqual("/foo/contract", contract.version_path)
+ self.assertTrue(expand.downgrade_ops.is_empty())
+ self.assertTrue(contract.downgrade_ops.is_empty())
+
+ self.assertEqual(
+ textwrap.dedent("""\
+ ### commands auto generated by Alembic - please adjust! ###
+ op.create_table('organization',
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('name', sa.String(length=50), nullable=False),
+ sa.PrimaryKeyConstraint('id')
+ )
+ op.add_column('user', """
+ """sa.Column('organization_id', sa.Integer(), nullable=True))
+ op.create_foreign_key('org_fk', 'user', """
+ """'organization', ['organization_id'], ['id'])
+ ### end Alembic commands ###"""),
+ alembic_ag_api.render_python_code(expand.upgrade_ops)
+ )
+ self.assertEqual(
+ textwrap.dedent("""\
+ ### commands auto generated by Alembic - please adjust! ###
+ op.drop_constraint('user', 'uq_user_org', type_=None)
+ op.drop_column('user', 'organization_name')
+ ### end Alembic commands ###"""),
+ alembic_ag_api.render_python_code(contract.upgrade_ops)
+ )
+
class TestSafetyChecks(base.BaseTestCase):