from alembic import script as alembic_script
from alembic import util as alembic_util
from oslo_config import cfg
+from oslo_utils import fileutils
from oslo_utils import importutils
import pkg_resources
from neutron.common import utils
-# TODO(ihrachyshka): maintain separate HEAD files per branch
HEAD_FILENAME = 'HEAD'
HEADS_FILENAME = 'HEADS'
CURRENT_RELEASE = "liberty"
def do_check_migration(config, cmd):
do_alembic_command(config, 'branches')
validate_revisions(config)
- validate_heads_file(config)
+ validate_head_file(config)
def add_alembic_subparser(sub, cmd):
message=CONF.command.message,
autogenerate=CONF.command.autogenerate,
sql=CONF.command.sql)
- update_heads_file(config)
+ update_head_file(config)
def _get_release_labels(labels):
def validate_revisions(config):
script_dir = alembic_script.ScriptDirectory.from_config(config)
- revisions = [v for v in script_dir.walk_revisions(base='base',
- head='heads')]
+ revisions = _get_revisions(script_dir)
- branchpoints = []
for revision in revisions:
- if revision.is_branch_point:
- branchpoints.append(revision)
-
_validate_revision(script_dir, revision)
+ branchpoints = _get_branch_points(script_dir)
if len(branchpoints) > 1:
branchpoints = ', '.join(p.revision for p in branchpoints)
alembic_util.err(
)
-def _get_sorted_heads(script):
- '''Get the list of heads for all branches, sorted.'''
- return sorted(script.get_heads())
+def _get_revisions(script):
+ return list(script.walk_revisions(base='base', head='heads'))
-def validate_heads_file(config):
- '''Check that HEADS file contains the latest heads for each branch.'''
+def _get_branch_points(script):
+ branchpoints = []
+ for revision in _get_revisions(script):
+ if revision.is_branch_point:
+ branchpoints.append(revision)
+ return branchpoints
+
+
+def validate_head_file(config):
+ '''Check that HEAD file contains the latest head for the branch.'''
+ if _use_separate_migration_branches(config):
+ return
+
script = alembic_script.ScriptDirectory.from_config(config)
- expected_heads = _get_sorted_heads(script)
- heads_path = _get_active_head_file_path(config)
+ expected_head = script.get_heads()
+ head_path = _get_head_file_path(config)
try:
- with open(heads_path) as file_:
- observed_heads = file_.read().split()
- if observed_heads == expected_heads:
+ with open(head_path) as file_:
+ observed_head = file_.read().split()
+ if observed_head == expected_head:
return
except IOError:
pass
alembic_util.err(
- _('HEADS file does not match migration timeline heads, expected: %s')
- % ', '.join(expected_heads))
+ _('HEAD file does not match migration timeline head, expected: %s')
+ % expected_head)
-def update_heads_file(config):
- '''Update HEADS file with the latest branch heads.'''
- script = alembic_script.ScriptDirectory.from_config(config)
- heads = _get_sorted_heads(script)
- heads_path = _get_active_head_file_path(config)
- with open(heads_path, 'w+') as f:
- f.write('\n'.join(heads))
+def update_head_file(config):
+ '''Update HEAD file with the latest branch head.'''
+ head_file = _get_head_file_path(config)
+
if _use_separate_migration_branches(config):
- old_head_file = _get_head_file_path(config)
- if os.path.exists(old_head_file):
- os.remove(old_head_file)
+ # Kill any HEAD(S) files because we don't rely on them for branch-aware
+ # chains anymore
+ files_to_remove = [head_file, _get_heads_file_path(config)]
+ for file_ in files_to_remove:
+ fileutils.delete_if_exists(file_)
+ return
+
+ script = alembic_script.ScriptDirectory.from_config(config)
+ head = script.get_heads()
+ with open(head_file, 'w+') as f:
+ f.write('\n'.join(head))
def add_command_parsers(subparsers):
def _get_heads_file_path(config):
- '''Return the path of the file that contains all latest heads, sorted.'''
+ '''
+ Return the path of the file that was once used to maintain the list of
+ latest heads.
+ '''
return os.path.join(
_get_root_versions_dir(config),
HEADS_FILENAME)
-def _get_active_head_file_path(config):
- '''Return the path of the file that contains latest head(s), depending on
- whether multiple branches are used.
- '''
- if _use_separate_migration_branches(config):
- return _get_heads_file_path(config)
- return _get_head_file_path(config)
-
-
def _get_version_branch_path(config, branch=None):
version_path = _get_root_versions_dir(config)
if branch:
def _use_separate_migration_branches(config):
'''Detect whether split migration branches should be used.'''
- return (CONF.split_branches or
- # Use HEADS file to indicate the new, split migration world
- os.path.exists(_get_heads_file_path(config)))
+ if CONF.split_branches:
+ return True
+
+ script_dir = alembic_script.ScriptDirectory.from_config(config)
+ if _get_branch_points(script_dir):
+ return True
+
+ return False
def _set_version_locations(config):
'''Make alembic see all revisions in all migration branches.'''
version_paths = [_get_version_branch_path(config)]
- if _use_separate_migration_branches(config):
- for branch in MIGRATION_BRANCHES:
- version_paths.append(_get_version_branch_path(config, branch))
+ for branch in MIGRATION_BRANCHES:
+ version_path = _get_version_branch_path(config, branch)
+ if os.path.exists(version_path):
+ version_paths.append(version_path)
config.set_main_option('version_locations', ' '.join(version_paths))
from alembic.autogenerate import api as alembic_ag_api
from alembic import config as alembic_config
from alembic.operations import ops as alembic_ops
+from alembic import script as alembic_script
import fixtures
import mock
import pkg_resources
def _main_test_helper(self, argv, func_name, exp_kwargs=[{}]):
with mock.patch.object(sys, 'argv', argv),\
mock.patch.object(cli, 'run_sanity_checks'),\
- mock.patch.object(cli, 'validate_revisions'):
+ mock.patch.object(cli, 'validate_revisions'),\
+ mock.patch.object(cli, '_use_separate_migration_branches'):
cli.main()
self.do_alembic_cmd.assert_has_calls(
self._main_test_helper(['prog', 'history'], 'history')
def test_check_migration(self):
- with mock.patch.object(cli, 'validate_heads_file') as validate:
+ with mock.patch.object(cli, 'validate_head_file') as validate:
self._main_test_helper(['prog', 'check_migration'], 'branches')
self.assertEqual(len(self.projects), validate.call_count)
def _test_database_sync_revision(self, separate_branches=True):
- with mock.patch.object(cli, 'update_heads_file') as update,\
+ with mock.patch.object(cli, 'update_head_file') as update,\
mock.patch.object(cli, '_use_separate_migration_branches',
return_value=separate_branches):
if separate_branches:
def test_downgrade_fails(self):
self.assert_command_fails(['prog', 'downgrade', '--sql', 'juno'])
- def test_upgrade_negative_relative_revision_fails(self):
+ @mock.patch.object(cli, '_use_separate_migration_branches')
+ def test_upgrade_negative_relative_revision_fails(self, use_mock):
self.assert_command_fails(['prog', 'upgrade', '-2'])
- def test_upgrade_negative_delta_fails(self):
+ @mock.patch.object(cli, '_use_separate_migration_branches')
+ def test_upgrade_negative_delta_fails(self, use_mock):
self.assert_command_fails(['prog', 'upgrade', '--delta', '-2'])
- def test_upgrade_rejects_delta_with_relative_revision(self):
+ @mock.patch.object(cli, '_use_separate_migration_branches')
+ def test_upgrade_rejects_delta_with_relative_revision(self, use_mock):
self.assert_command_fails(['prog', 'upgrade', '+2', '--delta', '3'])
- def _test_validate_heads_file_helper(self, heads, file_heads=None,
- branchless=False):
+ def _test_validate_head_file_helper(self, heads, file_heads=None,
+ branchless=False):
if file_heads is None:
file_heads = []
fake_config = self.configs[0]
mock_open.return_value.read.return_value = (
'\n'.join(file_heads))
- if all(head in file_heads for head in heads):
- cli.validate_heads_file(fake_config)
+ if not branchless or all(head in file_heads for head in heads):
+ cli.validate_head_file(fake_config)
else:
self.assertRaises(
SystemExit,
- cli.validate_heads_file,
+ cli.validate_head_file,
fake_config
)
self.assertTrue(self.mock_alembic_err.called)
mock_open.assert_called_with(
cli._get_head_file_path(fake_config))
else:
- mock_open.assert_called_with(
- cli._get_heads_file_path(fake_config))
+ self.assertFalse(mock_open.called)
- fc.assert_called_once_with(fake_config)
+ if branchless:
+ fc.assert_called_once_with(fake_config)
+ else:
+ self.assertFalse(fc.called)
- def test_validate_heads_file_multiple_heads(self):
- self._test_validate_heads_file_helper(['a', 'b'])
+ def test_validate_head_file_multiple_heads(self):
+ self._test_validate_head_file_helper(['a', 'b'])
- def test_validate_heads_file_missing_file(self):
- self._test_validate_heads_file_helper(['a'])
+ def test_validate_head_file_missing_file(self):
+ self._test_validate_head_file_helper(['a'])
- def test_validate_heads_file_wrong_contents(self):
- self._test_validate_heads_file_helper(['a'], ['b'])
+ def test_validate_head_file_wrong_contents(self):
+ self._test_validate_head_file_helper(['a'], ['b'])
- def test_validate_heads_success(self):
- self._test_validate_heads_file_helper(['a'], ['a'])
+ def test_validate_head_file_success(self):
+ self._test_validate_head_file_helper(['a'], ['a'])
@mock.patch.object(cli, '_use_separate_migration_branches',
return_value=False)
- def test_validate_heads_file_branchless_failure(self, *args):
- self._test_validate_heads_file_helper(['a'], ['b'], branchless=True)
+ def test_validate_head_file_branchless_failure(self, *args):
+ self._test_validate_head_file_helper(['a'], ['b'], branchless=True)
@mock.patch.object(cli, '_use_separate_migration_branches',
return_value=False)
- def test_validate_heads_file_branchless_success(self, *args):
- self._test_validate_heads_file_helper(['a'], ['a'], branchless=True)
-
- def test_update_heads_file_two_heads(self):
- with mock.patch('alembic.script.ScriptDirectory.from_config') as fc:
- heads = ('b', 'a')
- fc.return_value.get_heads.return_value = heads
- with mock.patch('six.moves.builtins.open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
-
- cli.update_heads_file(self.configs[0])
- mock_open.return_value.write.assert_called_once_with(
- '\n'.join(sorted(heads)))
-
- @mock.patch('os.path.exists')
- @mock.patch('os.remove')
- def test_update_heads_file_success(self, *os_mocks):
- with mock.patch('alembic.script.ScriptDirectory.from_config') as fc:
- heads = ('a', 'b')
- fc.return_value.get_heads.return_value = heads
- with mock.patch('six.moves.builtins.open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
-
- cli.update_heads_file(self.configs[0])
- mock_open.return_value.write.assert_called_once_with(
- '\n'.join(heads))
-
- old_head_file = cli._get_head_file_path(self.configs[0])
- for mock_ in os_mocks:
- mock_.assert_called_with(old_head_file)
+ def test_validate_head_file_branchless_success(self, *args):
+ self._test_validate_head_file_helper(['a'], ['a'], branchless=True)
def test_get_project_base(self):
config = alembic_config.Config()
self.assertRaises(
SystemExit, cli.validate_revisions, self.configs[0])
+ @mock.patch('alembic.script.ScriptDirectory.walk_revisions')
+ def test__get_branch_points(self, walk_mock):
+ revisions = [FakeRevision(is_branch_point=tools.get_random_boolean)
+ for i in range(50)]
+ walk_mock.return_value = revisions
+ script_dir = alembic_script.ScriptDirectory.from_config(
+ self.configs[0])
+ self.assertEqual(set(rev for rev in revisions if rev.is_branch_point),
+ set(cli._get_branch_points(script_dir)))
+
@mock.patch.object(cli, '_use_separate_migration_branches')
@mock.patch.object(cli, '_get_version_branch_path')
def test_autogen_process_directives(
alembic_ag_api.render_python_code(contract.upgrade_ops)
)
+ @mock.patch.object(cli, '_get_branch_points', return_value=[])
+ @mock.patch.object(cli.CONF, 'split_branches',
+ new_callable=mock.PropertyMock,
+ return_value=True, create=True)
+ def test__use_separate_migration_branches_enforced(self, *mocks):
+ self.assertTrue(cli._use_separate_migration_branches(self.configs[0]))
+
+ @mock.patch.object(cli, '_get_branch_points', return_value=[])
+ def test__use_separate_migration_branches_no_branch_points(self, *mocks):
+ self.assertFalse(cli._use_separate_migration_branches(self.configs[0]))
+
+ @mock.patch.object(cli, '_get_branch_points', return_value=['fake1'])
+ def test__use_separate_migration_branches_with_branch_points(self, *mocks):
+ self.assertTrue(cli._use_separate_migration_branches(self.configs[0]))
+
class TestSafetyChecks(base.BaseTestCase):