HEAD_FILENAME = 'HEAD'
HEADS_FILENAME = 'HEADS'
CURRENT_RELEASE = "liberty"
-RELEASES = (CURRENT_RELEASE,)
-MIGRATION_BRANCHES = ('expand', 'contract')
+
+EXPAND_BRANCH = 'expand'
+CONTRACT_BRANCH = 'contract'
+MIGRATION_BRANCHES = (EXPAND_BRANCH, CONTRACT_BRANCH)
MIGRATION_ENTRYPOINTS = 'neutron.db.alembic_migrations'
migration_entrypoints = {
sql=CONF.command.sql)
-def _get_branch_label(branch, release=None):
- '''Get the latest branch label corresponding to release cycle.'''
- return '%s_%s' % (release or CURRENT_RELEASE, branch)
-
-
def _get_branch_head(branch):
'''Get the latest @head specification for a branch.'''
- return '%s@head' % _get_branch_label(branch)
+ return '%s@head' % branch
def do_revision(config, cmd):
for branch in MIGRATION_BRANCHES:
version_path = _get_version_branch_path(config, branch)
addn_kwargs['version_path'] = version_path
+ addn_kwargs['head'] = _get_branch_head(branch)
if not os.path.exists(version_path):
# Bootstrap initial directory structure
utils.ensure_dir(version_path)
- # Each new release stream of migrations is detached from
- # previous migration chains
- addn_kwargs['head'] = 'base'
# Mark the very first revision in the new branch with its label
- addn_kwargs['branch_label'] = _get_branch_label(branch)
- # TODO(ihrachyshka): ideally, we would also add depends_on here
- # to refer to the head of the previous release stream. But
- # alembic API does not support it yet.
- else:
- addn_kwargs['head'] = _get_branch_head(branch)
+ addn_kwargs['branch_label'] = branch
do_alembic_command(config, cmd, **addn_kwargs)
else:
update_heads_file(config)
+def _get_release_labels(labels):
+ result = set()
+ for label in labels:
+ result.add('%s_%s' % (CURRENT_RELEASE, label))
+ return result
+
+
def _compare_labels(revision, expected_labels):
- # validate that the script has the only label that corresponds to path
+ # validate that the script has expected labels only
bad_labels = revision.branch_labels - expected_labels
if bad_labels:
+ # NOTE(ihrachyshka): this hack is temporary to accomodate those
+ # projects that already initialized their branches with liberty_*
+ # labels. Let's notify them about the deprecation for now and drop it
+ # later.
+ bad_labels_with_release = (revision.branch_labels -
+ _get_release_labels(expected_labels))
+ if not bad_labels_with_release:
+ alembic_util.warn(
+ _('Release aware branch labels (%s) are deprecated. '
+ 'Please switch to expand@ and contract@ '
+ 'labels.') % bad_labels)
+ return
+
script_name = os.path.basename(revision.path)
alembic_util.err(
_('Unexpected label for script %(script_name)s: %(labels)s') %
)
-def _validate_single_revision_labels(script_dir, revision,
- release=None, branch=None):
- if branch is not None:
- branch_label = _get_branch_label(branch, release=release)
- expected_labels = set([branch_label])
- else:
- expected_labels = set()
+def _validate_single_revision_labels(script_dir, revision, label=None):
+ expected_labels = set()
+ if label is not None:
+ expected_labels.add(label)
_compare_labels(revision, expected_labels)
def _validate_revision(script_dir, revision):
for branch in MIGRATION_BRANCHES:
- for release in RELEASES:
- marker = os.path.join(release, branch)
- if marker in revision.path:
- _validate_single_revision_labels(
- script_dir, revision, release=release, branch=branch)
- return
+ if branch in revision.path:
+ _validate_single_revision_labels(
+ script_dir, revision, label=branch)
+ return
# validate script from branchless part of migration rules
_validate_single_revision_labels(script_dir, revision)
self.assertRaises(
SystemExit, cli._get_subproject_base, 'not-installed')
- def test__get_branch_label_current(self):
- self.assertEqual('%s_fakebranch' % cli.CURRENT_RELEASE,
- cli._get_branch_label('fakebranch'))
-
- def test__get_branch_label_other_release(self):
- self.assertEqual('fakerelease_fakebranch',
- cli._get_branch_label('fakebranch',
- release='fakerelease'))
-
def test__compare_labels_ok(self):
labels = {'label1', 'label2'}
fake_revision = FakeRevision(labels)
script_dir = mock.Mock()
script_dir.get_revision.return_value = fake_down_revision
cli._validate_single_revision_labels(script_dir, fake_revision,
- branch=None)
+ label=None)
expected_labels = set()
compare_mock.assert_has_calls(
script_dir = mock.Mock()
script_dir.get_revision.return_value = fake_down_revision
cli._validate_single_revision_labels(
- script_dir, fake_revision,
- release='fakerelease', branch='fakebranch')
+ script_dir, fake_revision, label='fakebranch')
- expected_labels = {'fakerelease_fakebranch'}
+ expected_labels = {'fakebranch'}
compare_mock.assert_has_calls(
[mock.call(revision, expected_labels)
for revision in (fake_revision, fake_down_revision)]
def test__validate_revision_validates_branches(self, validate_mock):
script_dir = mock.Mock()
fake_revision = FakeRevision()
- release = cli.RELEASES[0]
branch = cli.MIGRATION_BRANCHES[0]
- fake_revision.path = os.path.join('/fake/path', release, branch)
+ fake_revision.path = os.path.join('/fake/path', branch)
cli._validate_revision(script_dir, fake_revision)
validate_mock.assert_called_with(
- script_dir, fake_revision, release=release, branch=branch)
+ script_dir, fake_revision, label=branch)
@mock.patch.object(cli, '_validate_single_revision_labels')
def test__validate_revision_validates_branchless_migrations(