self.volume_api.detach(context, volume)
return webob.Response(status_int=202)
+ @wsgi.action('os-migrate_volume')
+ def _migrate_volume(self, req, id, body):
+ """Migrate a volume to the specified host."""
+ context = req.environ['cinder.context']
+ self.authorize(context, 'migrate_volume')
+ try:
+ volume = self._get(context, id)
+ except exception.NotFound:
+ raise exc.HTTPNotFound()
+ params = body['os-migrate_volume']
+ host = params['host']
+ force_host_copy = params.get('force_host_copy', False)
+ self.volume_api.migrate_volume(context, volume, host, force_host_copy)
+ return webob.Response(status_int=202)
+
class SnapshotAdminController(AdminController):
"""AdminController for Snapshots."""
session)
+def finish_volume_migration(context, src_vol_id, dest_vol_id):
+ """Perform database updates upon completion of volume migration."""
+ return IMPL.finish_volume_migration(context, src_vol_id, dest_vol_id)
+
+
def volume_destroy(context, volume_id):
"""Destroy the volume or raise if it does not exist."""
return IMPL.volume_destroy(context, volume_id)
session)
+@require_admin_context
+def finish_volume_migration(context, src_vol_id, dest_vol_id):
+ """Copy almost all columns from dest to source, then delete dest."""
+ session = get_session()
+ with session.begin():
+ dest_volume_ref = _volume_get(context, dest_vol_id, session=session)
+ updates = {}
+ for key, value in dest_volume_ref.iteritems():
+ if key in ['id', 'status']:
+ continue
+ updates[key] = value
+ session.query(models.Volume).\
+ filter_by(id=src_vol_id).\
+ update(updates)
+ session.query(models.Volume).\
+ filter_by(id=dest_vol_id).\
+ delete()
+
+
@require_admin_context
def volume_destroy(context, volume_id):
session = get_session()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from sqlalchemy import String, Column, MetaData, Table
+
+
+def upgrade(migrate_engine):
+ """Add _name_id column to volumes."""
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+ volumes = Table('volumes', meta, autoload=True)
+ _name_id = Column('_name_id', String(36))
+ volumes.create_column(_name_id)
+ volumes.update().values(_name_id=None).execute()
+
+
+def downgrade(migrate_engine):
+ """Remove _name_id column from volumes."""
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+ volumes = Table('volumes', meta, autoload=True)
+ _name_id = volumes.columns._name_id
+ volumes.drop_column(_name_id)
--- /dev/null
+BEGIN TRANSACTION;
+
+CREATE TABLE volumes_v12 (
+ created_at DATETIME,
+ updated_at DATETIME,
+ deleted_at DATETIME,
+ deleted BOOLEAN,
+ id VARCHAR(36) NOT NULL,
+ ec2_id INTEGER,
+ user_id VARCHAR(255),
+ project_id VARCHAR(255),
+ snapshot_id VARCHAR(36),
+ host VARCHAR(255),
+ size INTEGER,
+ availability_zone VARCHAR(255),
+ instance_uuid VARCHAR(36),
+ attached_host VARCHAR(255),
+ mountpoint VARCHAR(255),
+ attach_time VARCHAR(255),
+ status VARCHAR(255),
+ attach_status VARCHAR(255),
+ scheduled_at DATETIME,
+ launched_at DATETIME,
+ terminated_at DATETIME,
+ display_name VARCHAR(255),
+ display_description VARCHAR(255),
+ provider_location VARCHAR(255),
+ provider_auth VARCHAR(255),
+ volume_type_id VARCHAR(36),
+ source_volid VARCHAR(36),
+ bootable BOOLEAN,
+ PRIMARY KEY (id)
+);
+
+INSERT INTO volumes_v12
+ SELECT created_at,
+ updated_at,
+ deleted_at,
+ deleted,
+ id,
+ ec2_id,
+ user_id,
+ project_id,
+ snapshot_id,
+ host,
+ size,
+ availability_zone,
+ instance_uuid,
+ attached_host,
+ mountpoint,
+ attach_time,
+ status,
+ attach_status,
+ scheduled_at,
+ launched_at,
+ terminated_at,
+ display_name,
+ display_description,
+ provider_location,
+ provider_auth,
+ volume_type_id,
+ source_volid,
+ bootable
+ FROM volumes;
+
+DROP TABLE volumes;
+ALTER TABLE volumes_v12 RENAME TO volumes;
+COMMIT;
"""Represents a block storage device that can be attached to a vm."""
__tablename__ = 'volumes'
id = Column(String(36), primary_key=True)
+ _name_id = Column(String(36)) # Don't access/modify this directly!
+
+ @property
+ def name_id(self):
+ return self.id if not self._name_id else self._name_id
+
+ @name_id.setter
+ def name_id(self, value):
+ self._name_id = value
@property
def name(self):
- return CONF.volume_name_template % self.id
+ return CONF.volume_name_template % self.name_id
ec2_id = Column(Integer)
user_id = Column(String(255))
message = _("Invalid content type %(content_type)s.")
+class InvalidHost(Invalid):
+ message = _("Invalid host") + ": %(reason)s"
+
+
# Cannot be templated as the error syntax varies.
# msg needs to be constructed when raised.
class InvalidParameterValue(Invalid):
class TransferNotFound(NotFound):
message = _("Transfer %(transfer_id)s could not be found.")
+
+
+class VolumeMigrationFailed(CinderException):
+ message = _("Volume migration failed") + ": %(reason)s"
+
+
+class ProtocolNotSupported(CinderException):
+ message = _("Connect to volume via protocol %(protocol)s not supported.")
"""Filter a list of hosts based on request_spec."""
filter_properties = kwargs.get('filter_properties', {})
+ if not filter_properties:
+ filter_properties = {}
ignore_hosts = filter_properties.get('ignore_hosts', [])
hosts = [host for host in hosts if host not in ignore_hosts]
return hosts
- def _schedule(self, context, topic, request_spec, **kwargs):
- """Picks a host that is up at random."""
+ def _get_weighted_candidates(self, context, topic, request_spec, **kwargs):
+ """Returns a list of the available hosts."""
elevated = context.elevated()
hosts = self.hosts_up(elevated, topic)
msg = _("Is the appropriate service running?")
raise exception.NoValidHost(reason=msg)
- hosts = self._filter_hosts(request_spec, hosts, **kwargs)
+ return self._filter_hosts(request_spec, hosts, **kwargs)
+
+ def _schedule(self, context, topic, request_spec, **kwargs):
+ """Picks a host that is up at random."""
+ hosts = self._get_weighted_candidates(context, topic,
+ request_spec, **kwargs)
if not hosts:
msg = _("Could not find another host")
raise exception.NoValidHost(reason=msg)
-
return hosts[int(random.random() * len(hosts))]
def schedule_create_volume(self, context, request_spec, filter_properties):
updated_volume = driver.volume_update_db(context, volume_id, host)
self.volume_rpcapi.create_volume(context, updated_volume, host,
snapshot_id, image_id)
+
+ def host_passes_filters(self, context, host, request_spec,
+ filter_properties):
+ """Check if the specified host passes the filters."""
+ weighed_hosts = self._get_weighted_candidates(
+ context,
+ CONF.volume_topic,
+ request_spec,
+ filter_properties=filter_properties)
+
+ for weighed_host in weighed_hosts:
+ if weighed_host == host:
+ elevated = context.elevated()
+ host_states = self.host_manager.get_all_host_states(elevated)
+ for host_state in host_states:
+ if host_state.host == host:
+ return host_state
+
+ msg = (_('cannot place volume %(id)s on %(host)s')
+ % {'id': request_spec['volume_id'], 'host': host})
+ raise exception.NoValidHost(reason=msg)
for service in services
if utils.service_is_up(service)]
+ def host_passes_filters(self, context, volume_id, host, filter_properties):
+ """Check if the specified host passes the filters."""
+ raise NotImplementedError(_("Must implement host_passes_filters"))
+
def schedule(self, context, topic, method, *_args, **_kwargs):
"""Must override schedule method for scheduler to work."""
raise NotImplementedError(_("Must implement a fallback schedule"))
snapshot_id=snapshot_id,
image_id=image_id)
+ def host_passes_filters(self, context, host, request_spec,
+ filter_properties):
+ """Check if the specified host passes the filters."""
+ weighed_hosts = self._get_weighted_candidates(context, request_spec,
+ filter_properties)
+ for weighed_host in weighed_hosts:
+ host_state = weighed_host.obj
+ if host_state.host == host:
+ return host_state
+
+ msg = (_('cannot place volume %(id)s on %(host)s')
+ % {'id': request_spec['volume_id'], 'host': host})
+ raise exception.NoValidHost(reason=msg)
+
def _post_select_populate_filter_properties(self, filter_properties,
host_state):
"""Add additional information to the filter properties after a host has
}
raise exception.NoValidHost(reason=msg)
- def _schedule(self, context, request_spec, filter_properties=None):
+ def _get_weighted_candidates(self, context, request_spec,
+ filter_properties=None):
"""Returns a list of hosts that meet the required specs,
ordered by their fitness.
"""
# host for the job.
weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
filter_properties)
+ return weighed_hosts
+
+ def _schedule(self, context, request_spec, filter_properties=None):
+ weighed_hosts = self._get_weighted_candidates(context, request_spec,
+ filter_properties)
+ if not weighed_hosts:
+ return None
best_host = weighed_hosts[0]
LOG.debug(_("Choosing %s") % best_host)
+ volume_properties = request_spec['volume_properties']
best_host.obj.consume_from_volume(volume_properties)
return best_host
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
- RPC_API_VERSION = '1.2'
+ RPC_API_VERSION = '1.3'
def __init__(self, scheduler_driver=None, service_name=None,
*args, **kwargs):
def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
+
+ def _migrate_volume_set_error(self, context, ex, request_spec):
+ volume_state = {'volume_state': {'status': 'error_migrating'}}
+ self._set_volume_state_and_notify('migrate_volume_to_host',
+ volume_state,
+ context, ex, request_spec)
+
+ def migrate_volume_to_host(self, context, topic, volume_id, host,
+ force_host_copy, request_spec,
+ filter_properties=None):
+ """Ensure that the host exists and can accept the volume."""
+ try:
+ tgt_host = self.driver.host_passes_filters(context, host,
+ request_spec,
+ filter_properties)
+ except exception.NoValidHost as ex:
+ self._migrate_volume_set_error(context, ex, request_spec)
+ except Exception as ex:
+ with excutils.save_and_reraise_exception():
+ self._migrate_volume_set_error(context, ex, request_spec)
+ else:
+ volume_ref = db.volume_get(context, volume_id)
+ volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
+ tgt_host,
+ force_host_copy)
1.1 - Add create_volume() method
1.2 - Add request_spec, filter_properties arguments
to create_volume()
+ 1.3 - Add migrate_volume_to_host() method
'''
RPC_API_VERSION = '1.0'
filter_properties=filter_properties),
version='1.2')
+ def migrate_volume_to_host(self, ctxt, topic, volume_id, host,
+ force_host_copy=False, request_spec=None,
+ filter_properties=None):
+ request_spec_p = jsonutils.to_primitive(request_spec)
+ return self.cast(ctxt, self.make_msg(
+ 'migrate_volume_to_host',
+ topic=topic,
+ volume_id=volume_id,
+ host=host,
+ force_host_copy=force_host_copy,
+ request_spec=request_spec_p,
+ filter_properties=filter_properties),
+ version='1.3')
+
def update_service_capabilities(self, ctxt,
service_name, host,
capabilities):
import tempfile
import webob
+from oslo.config import cfg
+
from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import jsonutils
+from cinder.openstack.common import timeutils
from cinder import test
from cinder.tests.api import fakes
from cinder.tests.api.v2 import stubs
from cinder.volume import api as volume_api
+CONF = cfg.CONF
+
def app():
# no auth, just let environ['cinder.context'] pass through
mountpoint)
# cleanup
svc.stop()
+
+ def _migrate_volume_prep(self):
+ admin_ctx = context.get_admin_context()
+ # create volume's current host and the destination host
+ db.service_create(admin_ctx,
+ {'host': 'test',
+ 'topic': CONF.volume_topic,
+ 'created_at': timeutils.utcnow()})
+ db.service_create(admin_ctx,
+ {'host': 'test2',
+ 'topic': CONF.volume_topic,
+ 'created_at': timeutils.utcnow()})
+ # current status is available
+ volume = db.volume_create(admin_ctx,
+ {'status': 'available',
+ 'host': 'test',
+ 'provider_location': '',
+ 'attach_status': ''})
+ return volume
+
+ def _migrate_volume_exec(self, ctx, volume, host, expected_status):
+ admin_ctx = context.get_admin_context()
+ # build request to migrate to host
+ req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
+ req.method = 'POST'
+ req.headers['content-type'] = 'application/json'
+ req.body = jsonutils.dumps({'os-migrate_volume': {'host': host}})
+ req.environ['cinder.context'] = ctx
+ resp = req.get_response(app())
+ # verify status
+ self.assertEquals(resp.status_int, expected_status)
+ volume = db.volume_get(admin_ctx, volume['id'])
+ return volume
+
+ def test_migrate_volume_success(self):
+ expected_status = 202
+ host = 'test2'
+ ctx = context.RequestContext('admin', 'fake', True)
+ volume = self._migrate_volume_prep()
+ volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
+ self.assertEquals(volume['status'], 'migrating')
+
+ def test_migrate_volume_as_non_admin(self):
+ expected_status = 403
+ host = 'test2'
+ ctx = context.RequestContext('fake', 'fake')
+ volume = self._migrate_volume_prep()
+ self._migrate_volume_exec(ctx, volume, host, expected_status)
+
+ def test_migrate_volume_host_no_exist(self):
+ expected_status = 400
+ host = 'test3'
+ ctx = context.RequestContext('admin', 'fake', True)
+ volume = self._migrate_volume_prep()
+ self._migrate_volume_exec(ctx, volume, host, expected_status)
+
+ def test_migrate_volume_same_host(self):
+ expected_status = 400
+ host = 'test'
+ ctx = context.RequestContext('admin', 'fake', True)
+ volume = self._migrate_volume_prep()
+ self._migrate_volume_exec(ctx, volume, host, expected_status)
+
+ def test_migrate_volume_in_use(self):
+ expected_status = 400
+ host = 'test2'
+ ctx = context.RequestContext('admin', 'fake', True)
+ volume = self._migrate_volume_prep()
+ model_update = {'status': 'in-use'}
+ volume = db.volume_update(ctx, volume['id'], model_update)
+ self._migrate_volume_exec(ctx, volume, host, expected_status)
+
+ def test_migrate_volume_with_snap(self):
+ expected_status = 400
+ host = 'test2'
+ ctx = context.RequestContext('admin', 'fake', True)
+ volume = self._migrate_volume_prep()
+ db.snapshot_create(ctx, {'volume_id': volume['id']})
+ self._migrate_volume_exec(ctx, volume, host, expected_status)
from cinder.brick.initiator import linuxscsi
from cinder.openstack.common import log as logging
from cinder import test
+from cinder import utils
LOG = logging.getLogger(__name__)
--- /dev/null
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Tests for finish_volume_migration."""
+
+
+from cinder import context
+from cinder import db
+from cinder import exception
+from cinder import test
+from cinder.tests import utils as testutils
+
+
+class FinishVolumeMigrationTestCase(test.TestCase):
+ """Test cases for finish_volume_migration."""
+
+ def setUp(self):
+ super(FinishVolumeMigrationTestCase, self).setUp()
+
+ def tearDown(self):
+ super(FinishVolumeMigrationTestCase, self).tearDown()
+
+ def test_finish_volume_migration(self):
+ ctxt = context.RequestContext(user_id='user_id',
+ project_id='project_id',
+ is_admin=True)
+ src_volume = testutils.create_volume(ctxt, host='src',
+ status='migrating')
+ dest_volume = testutils.create_volume(ctxt, host='dest',
+ status='migration_target')
+ db.finish_volume_migration(ctxt, src_volume['id'],
+ dest_volume['id'])
+
+ self.assertRaises(exception.VolumeNotFound, db.volume_get, ctxt,
+ dest_volume['id'])
+ src_volume = db.volume_get(ctxt, src_volume['id'])
+ self.assertEqual(src_volume['host'], 'dest')
+ self.assertEqual(src_volume['status'], 'migrating')
--- /dev/null
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Tests for volume name_id."""
+
+from oslo.config import cfg
+
+from cinder import context
+from cinder import db
+from cinder import test
+from cinder.tests import utils as testutils
+
+
+CONF = cfg.CONF
+
+
+class NameIDsTestCase(test.TestCase):
+ """Test cases for naming volumes with name_id."""
+
+ def setUp(self):
+ super(NameIDsTestCase, self).setUp()
+ self.ctxt = context.RequestContext(user_id='user_id',
+ project_id='project_id')
+
+ def tearDown(self):
+ super(NameIDsTestCase, self).tearDown()
+
+ def test_name_id_same(self):
+ """New volume should have same 'id' and 'name_id'."""
+ vol_ref = testutils.create_volume(self.ctxt, size=1)
+ self.assertEqual(vol_ref['name_id'], vol_ref['id'])
+ expected_name = CONF.volume_name_template % vol_ref['id']
+ self.assertEqual(vol_ref['name'], expected_name)
+
+ def test_name_id_diff(self):
+ """Change name ID to mimic volume after migration."""
+ vol_ref = testutils.create_volume(self.ctxt, size=1)
+ db.volume_update(self.ctxt, vol_ref['id'], {'name_id': 'fake'})
+ vol_ref = db.volume_get(self.ctxt, vol_ref['id'])
+ expected_name = CONF.volume_name_template % 'fake'
+ self.assertEqual(vol_ref['name'], expected_name)
"volume_extension:volume_admin_actions:force_delete": [["rule:admin_api"]],
"volume_extension:snapshot_admin_actions:force_delete": [["rule:admin_api"]],
"volume_extension:volume_admin_actions:force_detach": [["rule:admin_api"]],
+ "volume_extension:volume_admin_actions:migrate_volume": [["rule:admin_api"]],
"volume_extension:volume_actions:upload_image": [],
"volume_extension:types_manage": [],
"volume_extension:types_extra_specs": [],
filter_properties['retry']['hosts'][0])
self.assertEqual(1024, host_state.total_capacity_gb)
+
+ def _host_passes_filters_setup(self):
+ self.next_weight = 1.0
+
+ def _fake_weigh_objects(_self, functions, hosts, options):
+ self.next_weight += 2.0
+ host_state = hosts[0]
+ return [weights.WeighedHost(host_state, self.next_weight)]
+
+ sched = fakes.FakeFilterScheduler()
+ sched.host_manager = fakes.FakeHostManager()
+ fake_context = context.RequestContext('user', 'project',
+ is_admin=True)
+
+ self.stubs.Set(sched.host_manager, 'get_filtered_hosts',
+ fake_get_filtered_hosts)
+ self.stubs.Set(weights.HostWeightHandler,
+ 'get_weighed_objects', _fake_weigh_objects)
+ fakes.mox_host_manager_db_calls(self.mox, fake_context)
+
+ self.mox.ReplayAll()
+ return (sched, fake_context)
+
+ @testtools.skipIf(not test_utils.is_cinder_installed(),
+ 'Test requires Cinder installed (try setup.py develop')
+ def test_host_passes_filters_happy_day(self):
+ """Do a successful pass through of with host_passes_filters()."""
+ sched, ctx = self._host_passes_filters_setup()
+ request_spec = {'volume_id': 1,
+ 'volume_type': {'name': 'LVM_iSCSI'},
+ 'volume_properties': {'project_id': 1,
+ 'size': 1}}
+ ret_host = sched.host_passes_filters(ctx, 'host1', request_spec, {})
+ self.assertEqual(ret_host.host, 'host1')
+
+ @testtools.skipIf(not test_utils.is_cinder_installed(),
+ 'Test requires Cinder installed (try setup.py develop')
+ def test_host_passes_filters_no_capacity(self):
+ """Fail the host due to insufficient capacity."""
+ sched, ctx = self._host_passes_filters_setup()
+ request_spec = {'volume_id': 1,
+ 'volume_type': {'name': 'LVM_iSCSI'},
+ 'volume_properties': {'project_id': 1,
+ 'size': 1024}}
+ self.assertRaises(exception.NoValidHost,
+ sched.host_passes_filters,
+ ctx, 'host1', request_spec, {})
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.2')
+
+ def test_migrate_volume_to_host(self):
+ self._test_scheduler_api('migrate_volume_to_host',
+ rpc_method='cast',
+ topic='topic',
+ volume_id='volume_id',
+ host='host',
+ force_host_copy=True,
+ request_spec='fake_request_spec',
+ filter_properties='filter_properties',
+ version='1.3')
capabilities=capabilities)
def test_create_volume_exception_puts_volume_in_error_state(self):
- """Test that a NoValideHost exception for create_volume.
+ """Test NoValidHost exception behavior for create_volume.
Puts the volume in 'error' state and eats the exception.
"""
request_spec=request_spec,
filter_properties={})
+ def test_migrate_volume_exception_puts_volume_in_error_state(self):
+ """Test NoValidHost exception behavior for migrate_volume_to_host.
+
+ Puts the volume in 'error_migrating' state and eats the exception.
+ """
+ fake_volume_id = 1
+ self._mox_schedule_method_helper('host_passes_filters')
+ self.mox.StubOutWithMock(db, 'volume_update')
+
+ topic = 'fake_topic'
+ volume_id = fake_volume_id
+ request_spec = {'volume_id': fake_volume_id}
+
+ self.manager.driver.host_passes_filters(
+ self.context, 'host',
+ request_spec, {}).AndRaise(exception.NoValidHost(reason=""))
+ db.volume_update(self.context, fake_volume_id,
+ {'status': 'error_migrating'})
+
+ self.mox.ReplayAll()
+ self.manager.migrate_volume_to_host(self.context, topic, volume_id,
+ 'host', True,
+ request_spec=request_spec,
+ filter_properties={})
+
def _mox_schedule_method_helper(self, method_name):
# Make sure the method exists that we're going to test call
def stub_method(*args, **kwargs):
metadata,
autoload=True)
self.assertTrue('provider_geometry' not in volumes.c)
+
+ def test_migration_014(self):
+ """Test that adding _name_id column works correctly."""
+ for (key, engine) in self.engines.items():
+ migration_api.version_control(engine,
+ TestMigrations.REPOSITORY,
+ migration.INIT_VERSION)
+ migration_api.upgrade(engine, TestMigrations.REPOSITORY, 13)
+ metadata = sqlalchemy.schema.MetaData()
+ metadata.bind = engine
+
+ migration_api.upgrade(engine, TestMigrations.REPOSITORY, 14)
+ volumes = sqlalchemy.Table('volumes',
+ metadata,
+ autoload=True)
+ self.assertTrue(isinstance(volumes.c._name_id.type,
+ sqlalchemy.types.VARCHAR))
+
+ migration_api.downgrade(engine, TestMigrations.REPOSITORY, 13)
+ metadata = sqlalchemy.schema.MetaData()
+ metadata.bind = engine
+
+ volumes = sqlalchemy.Table('volumes',
+ metadata,
+ autoload=True)
+ self.assertTrue('_name_id' not in volumes.c)
import os
import re
import shutil
+import socket
import tempfile
import mox
from oslo.config import cfg
+from cinder.brick.initiator import connector as brick_conn
from cinder.brick.iscsi import iscsi
from cinder import context
from cinder import db
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume.drivers import lvm
+from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils
self.assertEqual(expected, azs)
+ def test_migrate_volume_driver(self):
+ """Test volume migration done by driver."""
+ # stub out driver and rpc functions
+ self.stubs.Set(self.volume.driver, 'migrate_volume',
+ lambda x, y, z: (True, {'user_id': 'foo'}))
+
+ volume = self._create_volume(status='migrating')
+ host_obj = {'host': 'newhost', 'capabilities': {}}
+ self.volume.migrate_volume(self.context, volume['id'],
+ host_obj, False)
+
+ # check volume properties
+ volume = db.volume_get(context.get_admin_context(), volume['id'])
+ self.assertEquals(volume['host'], 'newhost')
+ self.assertEquals(volume['status'], 'available')
+
+ def test_migrate_volume_generic(self):
+ """Test the generic offline volume migration."""
+ def fake_migr(vol, host):
+ raise Exception('should not be called')
+
+ def fake_delete_volume_rpc(self, ctxt, vol_id):
+ raise Exception('should not be called')
+
+ def fake_create_volume(self, ctxt, volume, host, req_spec, filters):
+ db.volume_update(ctxt, volume['id'],
+ {'status': 'migration_target'})
+
+ def fake_rename_volume(self, ctxt, volume, new_name_id):
+ db.volume_update(ctxt, volume['id'], {'name_id': new_name_id})
+
+ self.stubs.Set(self.volume.driver, 'migrate_volume', fake_migr)
+ self.stubs.Set(volume_rpcapi.VolumeAPI, 'create_volume',
+ fake_create_volume)
+ self.stubs.Set(self.volume.driver, 'copy_volume_data',
+ lambda x, y, z, remote='dest': True)
+ self.stubs.Set(volume_rpcapi.VolumeAPI, 'delete_volume',
+ fake_delete_volume_rpc)
+ self.stubs.Set(volume_rpcapi.VolumeAPI, 'rename_volume',
+ fake_rename_volume)
+
+ volume = self._create_volume(status='migrating')
+ host_obj = {'host': 'newhost', 'capabilities': {}}
+ self.volume.migrate_volume(self.context, volume['id'],
+ host_obj, True)
+ volume = db.volume_get(context.get_admin_context(), volume['id'])
+ self.assertEquals(volume['host'], 'newhost')
+ self.assertEquals(volume['status'], 'available')
+
+ def test_rename_volume(self):
+ self.stubs.Set(self.volume.driver, 'rename_volume',
+ lambda x, y: None)
+ volume = self._create_volume()
+ self.volume.rename_volume(self.context, volume['id'], 'new_id')
+ volume = db.volume_get(context.get_admin_context(), volume['id'])
+ self.assertEquals(volume['name_id'], 'new_id')
+
class DriverTestCase(test.TestCase):
"""Base Test class for Drivers."""
self.volume.delete_volume(self.context, volume_id)
-class VolumeDriverTestCase(DriverTestCase):
+class LVMISCSIVolumeDriverTestCase(DriverTestCase):
"""Test case for VolumeDriver"""
- driver_name = "cinder.volume.drivers.lvm.LVMVolumeDriver"
+ driver_name = "cinder.volume.drivers.lvm.LVMISCSIDriver"
def test_delete_busy_volume(self):
"""Test deleting a busy volume."""
self.output = 'x'
self.volume.driver.delete_volume({'name': 'test1', 'size': 1024})
+ def test_lvm_migrate_volume_no_loc_info(self):
+ host = {'capabilities': {}}
+ vol = {'name': 'test', 'id': 1, 'size': 1}
+ moved, model_update = self.volume.driver.migrate_volume(self.context,
+ vol, host)
+ self.assertEqual(moved, False)
+ self.assertEqual(model_update, None)
+
+ def test_lvm_migrate_volume_bad_loc_info(self):
+ capabilities = {'location_info': 'foo'}
+ host = {'capabilities': capabilities}
+ vol = {'name': 'test', 'id': 1, 'size': 1}
+ moved, model_update = self.volume.driver.migrate_volume(self.context,
+ vol, host)
+ self.assertEqual(moved, False)
+ self.assertEqual(model_update, None)
+
+ def test_lvm_migrate_volume_diff_driver(self):
+ capabilities = {'location_info': 'FooDriver:foo:bar'}
+ host = {'capabilities': capabilities}
+ vol = {'name': 'test', 'id': 1, 'size': 1}
+ moved, model_update = self.volume.driver.migrate_volume(self.context,
+ vol, host)
+ self.assertEqual(moved, False)
+ self.assertEqual(model_update, None)
+
+ def test_lvm_migrate_volume_diff_host(self):
+ capabilities = {'location_info': 'LVMVolumeDriver:foo:bar'}
+ host = {'capabilities': capabilities}
+ vol = {'name': 'test', 'id': 1, 'size': 1}
+ moved, model_update = self.volume.driver.migrate_volume(self.context,
+ vol, host)
+ self.assertEqual(moved, False)
+ self.assertEqual(model_update, None)
+
+ def test_lvm_migrate_volume_proceed(self):
+ hostname = socket.gethostname()
+ capabilities = {'location_info': 'LVMVolumeDriver:%s:bar' % hostname}
+ host = {'capabilities': capabilities}
+ vol = {'name': 'test', 'id': 1, 'size': 1}
+ self.stubs.Set(self.volume.driver, 'remove_export',
+ lambda x, y: None)
+ self.stubs.Set(self.volume.driver, '_create_volume',
+ lambda x, y, z: None)
+ self.stubs.Set(volutils, 'copy_volume',
+ lambda x, y, z, sync=False, execute='foo': None)
+ self.stubs.Set(self.volume.driver, '_delete_volume',
+ lambda x: None)
+ self.stubs.Set(self.volume.driver, '_create_export',
+ lambda x, y, vg='vg': None)
+ moved, model_update = self.volume.driver.migrate_volume(self.context,
+ vol, host)
+ self.assertEqual(moved, True)
+ self.assertEqual(model_update, None)
+
class LVMVolumeDriverTestCase(DriverTestCase):
"""Test case for VolumeDriver"""
expected_msg['args']['snapshot_id'] = snapshot['id']
if 'host' in expected_msg['args']:
del expected_msg['args']['host']
+ if 'dest_host' in expected_msg['args']:
+ dest_host = expected_msg['args']['dest_host']
+ dest_host_dict = {'host': dest_host.host,
+ 'capabilities': dest_host.capabilities}
+ del expected_msg['args']['dest_host']
+ expected_msg['args']['host'] = dest_host_dict
expected_msg['version'] = expected_version
volume=self.fake_volume,
new_size=1,
version='1.6')
+
+ def test_migrate_volume(self):
+ class FakeHost(object):
+ def __init__(self):
+ self.host = 'host'
+ self.capabilities = {}
+ dest_host = FakeHost()
+ self._test_volume_api('migrate_volume',
+ rpc_method='cast',
+ volume=self.fake_volume,
+ dest_host=dest_host,
+ force_host_copy=True,
+ version='1.8')
+
+ def test_rename_volume(self):
+ self._test_volume_api('rename_volume',
+ rpc_method='call',
+ volume=self.fake_volume,
+ new_name_id='new_id',
+ version='1.8')
import os
-import cinder.context
+from cinder import context
+from cinder import db
def get_test_admin_context():
- return cinder.context.get_admin_context()
+ return context.get_admin_context()
def is_cinder_installed():
return True
else:
return False
+
+
+def create_volume(ctxt,
+ host='test_host',
+ display_name='test_volume',
+ display_description='this is a test volume',
+ status='available',
+ size=1):
+ """Create a volume object in the DB."""
+ vol = {}
+ vol['size'] = size
+ vol['host'] = host
+ vol['user_id'] = ctxt.user_id
+ vol['project_id'] = ctxt.project_id
+ vol['status'] = status
+ vol['display_name'] = display_name
+ vol['display_description'] = display_description
+ vol['attach_status'] = 'detached'
+ return db.volume_create(ctxt, vol)
from cinder import quota
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import units
+from cinder import utils
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import volume_types
# Volume is still attached, need to detach first
raise exception.VolumeAttached(volume_id=volume_id)
+ if volume['attach_status'] == "migrating":
+ # Volume is migrating, wait until done
+ msg = _("Volume cannot be deleted while migrating")
+ raise exception.InvalidVolume(reason=msg)
+
snapshots = self.db.snapshot_get_all_for_volume(context, volume_id)
if len(snapshots):
msg = _("Volume still has %d dependent snapshots") % len(snapshots)
marker, limit,
sort_key, sort_dir)
+ # Non-admin shouldn't see temporary target of a volume migration
+ if not context.is_admin:
+ filters['no_migration_targets'] = True
+
if filters:
LOG.debug(_("Searching by: %s") % str(filters))
return False
return True
+ def _check_migration_target(volume, searchdict):
+ if not volume['status'].startswith('migration_target'):
+ return True
+ return False
+
# search_option to filter_name mapping.
- filter_mapping = {'metadata': _check_metadata_match}
+ filter_mapping = {'metadata': _check_metadata_match,
+ 'no_migration_targets': _check_migration_target}
result = []
not_found = object()
self.update(context, volume, {'status': 'extending'})
self.volume_rpcapi.extend_volume(context, volume, new_size)
+ def migrate_volume(self, context, volume, host, force_host_copy):
+ """Migrate the volume to the specified host."""
+
+ # We only handle "available" volumes for now
+ if volume['status'] != "available":
+ msg = _("status must be available")
+ LOG.error(msg)
+ raise exception.InvalidVolume(reason=msg)
+
+ # We only handle volumes without snapshots for now
+ snaps = self.db.snapshot_get_all_for_volume(context, volume['id'])
+ if snaps:
+ msg = _("volume must not have snapshots")
+ LOG.error(msg)
+ raise exception.InvalidVolume(reason=msg)
+
+ # Make sure the host is in the list of available hosts
+ elevated = context.elevated()
+ topic = CONF.volume_topic
+ services = self.db.service_get_all_by_topic(elevated, topic)
+ found = False
+ for service in services:
+ if utils.service_is_up(service) and service['host'] == host:
+ found = True
+ if not found:
+ msg = (_('No available service named %s') % host)
+ LOG.error(msg)
+ raise exception.InvalidHost(reason=msg)
+
+ # Make sure the destination host is different than the current one
+ if host == volume['host']:
+ msg = _('Destination host must be different than current host')
+ LOG.error(msg)
+ raise exception.InvalidHost(reason=msg)
+
+ self.update(context, volume, {'status': 'migrating'})
+
+ # Call the scheduler to ensure that the host exists and that it can
+ # accept the volume
+ volume_type = {}
+ if volume['volume_type_id']:
+ volume_types.get_volume_type(context, volume['volume_type_id'])
+ request_spec = {'volume_properties': volume,
+ 'volume_type': volume_type,
+ 'volume_id': volume['id']}
+ self.scheduler_rpcapi.migrate_volume_to_host(context,
+ CONF.volume_topic,
+ volume['id'],
+ host,
+ force_host_copy,
+ request_spec)
+
class HostAPI(base.Base):
def __init__(self):
from cinder.brick.initiator import connector as initiator
from cinder import exception
from cinder.image import image_utils
+from cinder.openstack.common import excutils
from cinder.openstack.common import log as logging
from cinder import utils
+from cinder.volume import rpcapi as volume_rpcapi
+from cinder.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
"""Fail if connector doesn't contain all the data needed by driver"""
pass
+ def _copy_volume_data_cleanup(self, context, volume, properties,
+ attach_info, remote, force=False):
+ self._detach_volume(attach_info)
+ if remote:
+ rpcapi = volume_rpcapi.VolumeAPI()
+ rpcapi.terminate_connection(context, volume, properties,
+ force=force)
+ else:
+ self.terminate_connection(volume, properties, force=False)
+
+ def copy_volume_data(self, context, src_vol, dest_vol, remote=None):
+ """Copy data from src_vol to dest_vol."""
+ LOG.debug(_('copy_data_between_volumes %(src)s -> %(dest)s.')
+ % {'src': src_vol['name'], 'dest': dest_vol['name']})
+
+ properties = initiator.get_connector_properties()
+ dest_remote = True if remote in ['dest', 'both'] else False
+ dest_orig_status = dest_vol['status']
+ try:
+ dest_attach_info = self._attach_volume(context,
+ dest_vol,
+ properties,
+ remote=dest_remote)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ msg = _("Failed to attach volume %(vol)s")
+ LOG.error(msg % {'vol': dest_vol['id']})
+ self.db.volume_update(context, dest_vol['id'],
+ {'status': dest_orig_status})
+
+ src_remote = True if remote in ['src', 'both'] else False
+ src_orig_status = src_vol['status']
+ try:
+ src_attach_info = self._attach_volume(context,
+ src_vol,
+ properties,
+ remote=src_remote)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ msg = _("Failed to attach volume %(vol)s")
+ LOG.error(msg % {'vol': src_vol['id']})
+ self.db.volume_update(context, src_vol['id'],
+ {'status': src_orig_status})
+ self._copy_volume_data_cleanup(context, dest_vol, properties,
+ dest_attach_info, dest_remote,
+ force=True)
+
+ try:
+ volume_utils.copy_volume(src_attach_info['device']['path'],
+ dest_attach_info['device']['path'],
+ src_vol['size'])
+ copy_error = False
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ msg = _("Failed to copy volume %(src)s to %(dest)d")
+ LOG.error(msg % {'src': src_vol['id'], 'dest': dest_vol['id']})
+ copy_error = True
+ finally:
+ self._copy_volume_data_cleanup(context, dest_vol, properties,
+ dest_attach_info, dest_remote,
+ force=copy_error)
+ self._copy_volume_data_cleanup(context, src_vol, properties,
+ src_attach_info, src_remote,
+ force=copy_error)
+
def copy_image_to_volume(self, context, volume, image_service, image_id):
"""Fetch the image from image_service and write it to the volume."""
LOG.debug(_('copy_image_to_volume %s.') % volume['name'])
properties = initiator.get_connector_properties()
- connection, device, connector = self._attach_volume(context, volume,
- properties)
+ attach_info = self._attach_volume(context, volume, properties)
try:
image_utils.fetch_to_raw(context,
image_service,
image_id,
- device['path'])
+ attach_info['device']['path'])
finally:
- self._detach_volume(connection, device, connector)
+ self._detach_volume(attach_info)
self.terminate_connection(volume, properties)
def copy_volume_to_image(self, context, volume, image_service, image_meta):
LOG.debug(_('copy_volume_to_image %s.') % volume['name'])
properties = initiator.get_connector_properties()
- connection, device, connector = self._attach_volume(context, volume,
- properties)
+ attach_info = self._attach_volume(context, volume, properties)
try:
image_utils.upload_volume(context,
image_service,
image_meta,
- device['path'])
+ attach_info['device']['path'])
finally:
- self._detach_volume(connection, device, connector)
+ self._detach_volume(attach_info)
self.terminate_connection(volume, properties)
- def _attach_volume(self, context, volume, properties):
+ def _attach_volume(self, context, volume, properties, remote=False):
"""Attach the volume."""
- host_device = None
- conn = self.initialize_connection(volume, properties)
+ if remote:
+ rpcapi = volume_rpcapi.VolumeAPI()
+ conn = rpcapi.initialize_connection(context, volume, properties)
+ else:
+ conn = self.initialize_connection(volume, properties)
# Use Brick's code to do attach/detach
use_multipath = self.configuration.use_multipath_for_image_xfer
"via the path "
"%(path)s.") %
{'path': host_device}))
- return conn, device, connector
+ return {'conn': conn, 'device': device, 'connector': connector}
- def _detach_volume(self, connection, device, connector):
+ def _detach_volume(self, attach_info):
"""Disconnect the volume from the host."""
- protocol = connection['driver_volume_type']
# Use Brick's code to do attach/detach
- connector.disconnect_volume(connection['data'], device)
+ connector = attach_info['connector']
+ connector.disconnect_volume(attach_info['conn']['data'],
+ attach_info['device'])
def clone_image(self, volume, image_location):
"""Create a volume efficiently from an existing image.
msg = _("Extend volume not implemented")
raise NotImplementedError(msg)
+ def migrate_volume(self, context, volume, host):
+ """Migrate the volume to the specified host.
+
+ Returns a boolean indicating whether the migration occurred, as well as
+ model_update.
+ """
+ return (False, None)
+
+ def rename_volume(self, volume, orig_name):
+ """Rename the volume according to the volume object.
+
+ The original name is passed for reference, and the function can return
+ model_update.
+ """
+ return None
+
class ISCSIDriver(VolumeDriver):
"""Executes commands relating to ISCSI volumes.
import math
import os
import re
+import socket
from oslo.config import cfg
def __init__(self, *args, **kwargs):
super(LVMVolumeDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(volume_opts)
+ self.hostname = socket.gethostname()
def check_for_setup_error(self):
"""Returns an error if prerequisites aren't met"""
% self.configuration.volume_group)
raise exception.VolumeBackendAPIException(data=exception_message)
- def _create_volume(self, volume_name, sizestr):
-
+ def _create_volume(self, volume_name, sizestr, vg=None):
+ if vg is None:
+ vg = self.configuration.volume_group
no_retry_list = ['Insufficient free extents',
'One or more specified logical volume(s) not found']
- cmd = ['lvcreate', '-L', sizestr, '-n', volume_name,
- self.configuration.volume_group]
+ cmd = ['lvcreate', '-L', sizestr, '-n', volume_name, vg]
if self.configuration.lvm_mirrors:
cmd += ['-m', self.configuration.lvm_mirrors, '--nosync']
terras = int(sizestr[:-1]) / 1024.0
# it's quite slow.
self._delete_volume(snapshot)
- def local_path(self, volume):
+ def local_path(self, volume, vg=None):
+ if vg is None:
+ vg = self.configuration.volume_group
# NOTE(vish): stops deprecation warning
- escaped_group = self.configuration.volume_group.replace('-', '--')
+ escaped_group = vg.replace('-', '--')
escaped_name = self._escape_snapshot(volume['name']).replace('-', '--')
return "/dev/mapper/%s-%s" % (escaped_group, escaped_name)
self.db.iscsi_target_create_safe(context, target)
def create_export(self, context, volume):
+ return self._create_export(context, volume)
+
+ def _create_export(self, context, volume, vg=None):
"""Creates an export for a logical volume."""
+ if vg is None:
+ vg = self.configuration.volume_group
iscsi_name = "%s%s" % (self.configuration.iscsi_target_prefix,
volume['name'])
- volume_path = "/dev/%s/%s" % (self.configuration.volume_group,
- volume['name'])
+ volume_path = "/dev/%s/%s" % (vg, volume['name'])
model_update = {}
# TODO(jdg): In the future move all of the dependent stuff into the
self.tgtadm.remove_iscsi_target(iscsi_target, 0, volume['id'])
+ def migrate_volume(self, ctxt, volume, host):
+ """Optimize the migration if the destination is on the same server.
+
+ If the specified host is another back-end on the same server, and
+ the volume is not attached, we can do the migration locally without
+ going through iSCSI.
+ """
+ false_ret = (False, None)
+ if 'location_info' not in host['capabilities']:
+ return false_ret
+ info = host['capabilities']['location_info']
+ try:
+ (dest_type, dest_hostname, dest_vg) = info.split(':')
+ except ValueError:
+ return false_ret
+ if (dest_type != 'LVMVolumeDriver' or dest_hostname != self.hostname):
+ return false_ret
+
+ self.remove_export(ctxt, volume)
+ self._create_volume(volume['name'],
+ self._sizestr(volume['size']),
+ dest_vg)
+ volutils.copy_volume(self.local_path(volume),
+ self.local_path(volume, vg=dest_vg),
+ volume['size'],
+ execute=self._execute)
+ self._delete_volume(volume)
+ model_update = self._create_export(ctxt, volume, vg=dest_vg)
+
+ return (True, model_update)
+
+ def rename_volume(self, volume, orig_name):
+ self._execute('lvrename', self.configuration.volume_group,
+ orig_name, volume['name'],
+ run_as_root=True)
+
def get_volume_stats(self, refresh=False):
"""Get volume status.
data['free_capacity_gb'] = 0
data['reserved_percentage'] = self.configuration.reserved_percentage
data['QoS_support'] = False
+ data['location_info'] = ('LVMVolumeDriver:%(hostname)s:%(vg)s' %
+ {'hostname': self.hostname,
+ 'vg': self.configuration.volume_group})
try:
out, err = self._execute('vgs', '--noheadings', '--nosuffix',
data['QoS_support'] = False
data['total_capacity_gb'] = 'infinite'
data['free_capacity_gb'] = 'infinite'
+ data['location_info'] = ('LVMVolumeDriver:%(hostname)s:%(vg)s' %
+ {'hostname': self.hostname,
+ 'vg': self.configuration.volume_group})
self._stats = data
import sys
+import time
import traceback
from oslo.config import cfg
+from cinder.brick.initiator import connector as initiator
from cinder import context
from cinder import exception
from cinder.image import glance
from cinder import quota
from cinder import utils
from cinder.volume.configuration import Configuration
+from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
cfg.StrOpt('volume_driver',
default='cinder.volume.drivers.lvm.LVMISCSIDriver',
help='Driver to use for volume creation'),
+ cfg.IntOpt('migration_create_volume_timeout_secs',
+ default=300,
+ help='Timeout for creating the volume to migrate to '
+ 'when performing volume migration (seconds)'),
]
CONF = cfg.CONF
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
- RPC_API_VERSION = '1.7'
+ RPC_API_VERSION = '1.8'
def __init__(self, volume_driver=None, service_name=None,
*args, **kwargs):
# before passing it to the driver.
volume_ref['host'] = self.host
- status = 'available'
+ if volume_ref['status'] == 'migration_target_creating':
+ status = 'migration_target'
+ else:
+ status = 'available'
model_update = False
image_meta = None
cloned = False
volume_ref['id'],
{'status': 'error_deleting'})
+ # If deleting the source volume in a migration, we want to skip quotas
+ # and other database updates.
+ if volume_ref['status'] == 'migrating':
+ return True
+
# Get reservations
try:
reserve_opts = {'volumes': -1, 'gigabytes': -volume_ref['size']}
volume_ref = self.db.volume_get(context, volume_id)
self.driver.accept_transfer(volume_ref)
+ def _migrate_volume_generic(self, ctxt, volume, host):
+ rpcapi = volume_rpcapi.VolumeAPI()
+
+ # Create new volume on remote host
+ new_vol_values = {}
+ for k, v in volume.iteritems():
+ new_vol_values[k] = v
+ del new_vol_values['id']
+ new_vol_values['host'] = host['host']
+ new_vol_values['status'] = 'migration_target_creating'
+ new_volume = self.db.volume_create(ctxt, new_vol_values)
+ rpcapi.create_volume(ctxt, new_volume, host['host'],
+ None, None)
+
+ # Wait for new_volume to become ready
+ starttime = time.time()
+ deadline = starttime + CONF.migration_create_volume_timeout_secs
+ new_volume = self.db.volume_get(ctxt, new_volume['id'])
+ tries = 0
+ while new_volume['status'] != 'migration_target':
+ tries = tries + 1
+ now = time.time()
+ if new_volume['status'] == 'error':
+ msg = _("failed to create new_volume on destination host")
+ raise exception.VolumeMigrationFailed(reason=msg)
+ elif now > deadline:
+ msg = _("timeout creating new_volume on destination host")
+ raise exception.VolumeMigrationFailed(reason=msg)
+ else:
+ time.sleep(tries ** 2)
+ new_volume = self.db.volume_get(ctxt, new_volume['id'])
+
+ # Copy the source volume to the destination volume
+ try:
+ self.driver.copy_volume_data(ctxt, volume, new_volume,
+ remote='dest')
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ msg = _("Failed to copy volume %(vol1)s to %(vol2)s")
+ LOG.error(msg % {'vol1': volume['id'],
+ 'vol2': new_volume['id']})
+ rpcapi.delete_volume(ctxt, volume)
+
+ # Delete the source volume (if it fails, don't fail the migration)
+ try:
+ self.delete_volume(ctxt, volume['id'])
+ except Exception as ex:
+ msg = _("Failed to delete migration source vol %(vol)s: %(err)s")
+ LOG.error(msg % {'vol': volume['id'], 'err': ex})
+
+ # Rename the destination volume to the name of the source volume.
+ # We rename rather than create the destination with the same as the
+ # source because: (a) some backends require unique names between pools
+ # in addition to within pools, and (b) we want to enable migration
+ # within one pool (for example, changing a volume's type by creating a
+ # new volume and copying the data over)
+ try:
+ rpcapi.rename_volume(ctxt, new_volume, volume['id'])
+ except Exception:
+ msg = _("Failed to rename migration destination volume "
+ "%(vol)s to %(name)s")
+ LOG.error(msg % {'vol': new_volume['id'], 'name': volume['name']})
+
+ self.db.finish_volume_migration(ctxt, volume['id'], new_volume['id'])
+
+ def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False):
+ """Migrate the volume to the specified host (called on source host)."""
+ volume_ref = self.db.volume_get(ctxt, volume_id)
+ model_update = None
+ moved = False
+ if not force_host_copy:
+ try:
+ LOG.debug(_("volume %s: calling driver migrate_volume"),
+ volume_ref['name'])
+ moved, model_update = self.driver.migrate_volume(ctxt,
+ volume_ref,
+ host)
+ if moved:
+ updates = {'host': host['host']}
+ if model_update:
+ updates.update(model_update)
+ volume_ref = self.db.volume_update(ctxt,
+ volume_ref['id'],
+ updates)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ updates = {'status': 'error_migrating'}
+ model_update = self.driver.create_export(ctxt, volume_ref)
+ if model_update:
+ updates.update(model_update)
+ self.db.volume_update(ctxt, volume_ref['id'], updates)
+ if not moved:
+ try:
+ self._migrate_volume_generic(ctxt, volume_ref, host)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ updates = {'status': 'error_migrating'}
+ model_update = self.driver.create_export(ctxt, volume_ref)
+ if model_update:
+ updates.update(model_update)
+ self.db.volume_update(ctxt, volume_ref['id'], updates)
+ self.db.volume_update(ctxt, volume_ref['id'],
+ {'status': 'available'})
+
+ def rename_volume(self, ctxt, volume_id, new_name_id):
+ volume_ref = self.db.volume_get(ctxt, volume_id)
+ orig_name = volume_ref['name']
+ self.driver.remove_export(ctxt, volume_ref)
+ self.db.volume_update(ctxt, volume_id, {'name_id': new_name_id})
+ volume_ref = self.db.volume_get(ctxt, volume_id)
+ model_update = self.driver.rename_volume(volume_ref, orig_name)
+ if model_update:
+ self.db.volume_update(ctxt, volume_ref['id'], model_update)
+ model_update = self.driver.create_export(ctxt, volume_ref)
+ if model_update:
+ self.db.volume_update(ctxt, volume_ref['id'], model_update)
+
@periodic_task.periodic_task
def _report_driver_status(self, context):
LOG.info(_("Updating volume status"))
Client side of the volume RPC API.
"""
-
from oslo.config import cfg
from cinder.openstack.common import rpc
1.6 - Add extend_volume.
1.7 - Adds host_name parameter to attach_volume()
to allow attaching to host rather than instance.
+ 1.8 - Add migrate_volume, rename_volume.
'''
BASE_RPC_API_VERSION = '1.0'
new_size=new_size),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']),
version='1.6')
+
+ def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
+ host_p = {'host': dest_host.host,
+ 'capabilities': dest_host.capabilities}
+ self.cast(ctxt,
+ self.make_msg('migrate_volume',
+ volume_id=volume['id'],
+ host=host_p,
+ force_host_copy=force_host_copy),
+ topic=rpc.queue_get_for(ctxt, self.topic, volume['host']),
+ version='1.8')
+
+ def rename_volume(self, ctxt, volume, new_name_id):
+ self.call(ctxt,
+ self.make_msg('rename_volume',
+ volume_id=volume['id'],
+ new_name_id=new_name_id),
+ topic=rpc.queue_get_for(ctxt, self.topic, volume['host']),
+ version='1.8')
"volume_extension:snapshot_admin_actions:reset_status": [["rule:admin_api"]],
"volume_extension:volume_admin_actions:force_delete": [["rule:admin_api"]],
"volume_extension:snapshot_admin_actions:force_delete": [["rule:admin_api"]],
+ "volume_extension:volume_admin_actions:migrate_volume": [["rule:admin_api"]],
"volume_extension:volume_host_attribute": [["rule:admin_api"]],
"volume_extension:volume_tenant_attribute": [["rule:admin_api"]],
# cinder/volume/driver.py: 'lvdisplay', '--noheading', '-C', '-o', 'Attr',..
lvdisplay: CommandFilter, lvdisplay, root
+# cinder/volume/driver.py: 'lvrename', '%(vg)s', '%(orig)s' '(new)s'...
+lvrename: CommandFilter, lvrename, root
+
# cinder/volume/driver.py: 'iscsiadm', '-m', 'discovery', '-t',...
# cinder/volume/driver.py: 'iscsiadm', '-m', 'node', '-T', ...
iscsiadm: CommandFilter, iscsiadm, root