import ConfigParser
import os
import urlparse
+import uuid
from migrate.versioning import repository
import sqlalchemy
# We destroy the test data store between each test case,
# and recreate it, which ensures that we have no side-effects
# from the tests
- self._reset_databases()
+ # self._reset_databases()
# remove these from the list so they aren't used in the migration tests
if "mysqlcitest" in self.engines:
try:
if with_data:
data = None
- prerun = getattr(self, "_prerun_%d" % version, None)
+ prerun = getattr(self, "_prerun_%3.3d" % version, None)
if prerun:
data = prerun(engine)
TestMigrations.REPOSITORY))
if with_data:
- check = getattr(self, "_check_%d" % version, None)
+ check = getattr(self, "_check_%3.3d" % version, None)
if check:
check(engine, data)
except Exception:
(version, engine))
raise
- def test_migration_004(self):
- """Test that volume_type_id migration works correctly."""
- for (key, engine) in self.engines.items():
- migration_api.version_control(engine,
- TestMigrations.REPOSITORY,
- migration.INIT_VERSION)
- migration_api.upgrade(engine, TestMigrations.REPOSITORY, 3)
- metadata = sqlalchemy.schema.MetaData()
- metadata.bind = engine
-
- migration_api.upgrade(engine, TestMigrations.REPOSITORY, 4)
- volumes = sqlalchemy.Table('volumes',
- metadata,
- autoload=True)
- volume_types = sqlalchemy.Table('volume_types',
- metadata,
- autoload=True)
- extra_specs = sqlalchemy.Table('volume_type_extra_specs',
- metadata,
- autoload=True)
-
- self.assertTrue(isinstance(volumes.c.volume_type_id.type,
- sqlalchemy.types.VARCHAR))
- self.assertTrue(isinstance(volume_types.c.id.type,
- sqlalchemy.types.VARCHAR))
- self.assertTrue(isinstance(extra_specs.c.volume_type_id.type,
- sqlalchemy.types.VARCHAR))
-
- self.assertTrue(extra_specs.c.volume_type_id.foreign_keys)
+ # migration 004 - change volume types to UUID
+ def _prerun_004(self, engine):
+ data = {
+ 'volumes': [{'id': str(uuid.uuid4()), 'host': 'test1',
+ 'volume_type_id': 1},
+ {'id': str(uuid.uuid4()), 'host': 'test2',
+ 'volume_type_id': 1},
+ {'id': str(uuid.uuid4()), 'host': 'test3',
+ 'volume_type_id': 3},
+ ],
+ 'volume_types': [{'name': 'vtype1'},
+ {'name': 'vtype2'},
+ {'name': 'vtype3'},
+ ],
+ 'volume_type_extra_specs': [{'volume_type_id': 1,
+ 'key': 'v1',
+ 'value': 'hotep',
+ },
+ {'volume_type_id': 1,
+ 'key': 'v2',
+ 'value': 'bending rodrigez',
+ },
+ {'volume_type_id': 2,
+ 'key': 'v3',
+ 'value': 'bending rodrigez',
+ },
+ ]}
+
+ volume_types = get_table(engine, 'volume_types')
+ for vtype in data['volume_types']:
+ r = volume_types.insert().values(vtype).execute()
+ vtype['id'] = r.inserted_primary_key[0]
+
+ volume_type_es = get_table(engine, 'volume_type_extra_specs')
+ for vtes in data['volume_type_extra_specs']:
+ r = volume_type_es.insert().values(vtes).execute()
+ vtes['id'] = r.inserted_primary_key[0]
+
+ volumes = get_table(engine, 'volumes')
+ for vol in data['volumes']:
+ r = volumes.insert().values(vol).execute()
+ vol['id'] = r.inserted_primary_key[0]
+
+ return data
+
+ def _check_004(self, engine, data):
+ volumes = get_table(engine, 'volumes')
+ v1 = volumes.select(volumes.c.id ==
+ data['volumes'][0]['id']
+ ).execute().first()
+ v2 = volumes.select(volumes.c.id ==
+ data['volumes'][1]['id']
+ ).execute().first()
+ v3 = volumes.select(volumes.c.id ==
+ data['volumes'][2]['id']
+ ).execute().first()
+
+ volume_types = get_table(engine, 'volume_types')
+ vt1 = volume_types.select(volume_types.c.name ==
+ data['volume_types'][0]['name']
+ ).execute().first()
+ vt2 = volume_types.select(volume_types.c.name ==
+ data['volume_types'][1]['name']
+ ).execute().first()
+ vt3 = volume_types.select(volume_types.c.name ==
+ data['volume_types'][2]['name']
+ ).execute().first()
+
+ vtes = get_table(engine, 'volume_type_extra_specs')
+ vtes1 = vtes.select(vtes.c.key ==
+ data['volume_type_extra_specs'][0]['key']
+ ).execute().first()
+ vtes2 = vtes.select(vtes.c.key ==
+ data['volume_type_extra_specs'][1]['key']
+ ).execute().first()
+ vtes3 = vtes.select(vtes.c.key ==
+ data['volume_type_extra_specs'][2]['key']
+ ).execute().first()
+
+ self.assertEqual(v1['volume_type_id'], vt1['id'])
+ self.assertEqual(v2['volume_type_id'], vt1['id'])
+ self.assertEqual(v3['volume_type_id'], vt3['id'])
+
+ self.assertEqual(vtes1['volume_type_id'], vt1['id'])
+ self.assertEqual(vtes2['volume_type_id'], vt1['id'])
+ self.assertEqual(vtes3['volume_type_id'], vt2['id'])
def test_migration_005(self):
"""Test that adding source_volid column works correctly."""