]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Migration for detached volumes with no snaps.
authorAvishay Traeger <avishay@il.ibm.com>
Tue, 18 Jun 2013 18:53:15 +0000 (21:53 +0300)
committerAvishay Traeger <avishay@il.ibm.com>
Wed, 24 Jul 2013 13:05:50 +0000 (16:05 +0300)
Implementation of volume migration for detached volumes with no
snapshots. Migration is initiated by an admin API. The scheduler
confirms that the specified destination host can accept the volume.
The source driver is given the opportunity to migrate the volume on
their own. Otherwise, a new volume is created on the destination, both
volumes are attached, the data is copied over, the volumes are
detached, the source is deleted, and the destination is renamed. In
the database, the destination volume's attributes are copied to the
source so that the volume-id remains unchanged, and the destination
volume row is deleted.

DocImpact

Implements: bp volume-migration
Change-Id: Ib6fcf27051f45e60aa3ba5f599e88c1421db753e

31 files changed:
cinder/api/contrib/admin_actions.py
cinder/db/api.py
cinder/db/sqlalchemy/api.py
cinder/db/sqlalchemy/migrate_repo/versions/014_add_name_id.py [new file with mode: 0644]
cinder/db/sqlalchemy/migrate_repo/versions/014_sqlite_downgrade.sql [new file with mode: 0644]
cinder/db/sqlalchemy/models.py
cinder/exception.py
cinder/scheduler/chance.py
cinder/scheduler/driver.py
cinder/scheduler/filter_scheduler.py
cinder/scheduler/manager.py
cinder/scheduler/rpcapi.py
cinder/tests/api/contrib/test_admin_actions.py
cinder/tests/brick/test_brick_linuxscsi.py
cinder/tests/db/test_finish_migration.py [new file with mode: 0644]
cinder/tests/db/test_name_id.py [new file with mode: 0644]
cinder/tests/policy.json
cinder/tests/scheduler/test_filter_scheduler.py
cinder/tests/scheduler/test_rpcapi.py
cinder/tests/scheduler/test_scheduler.py
cinder/tests/test_migrations.py
cinder/tests/test_volume.py
cinder/tests/test_volume_rpcapi.py
cinder/tests/utils.py
cinder/volume/api.py
cinder/volume/driver.py
cinder/volume/drivers/lvm.py
cinder/volume/manager.py
cinder/volume/rpcapi.py
etc/cinder/policy.json
etc/cinder/rootwrap.d/volume.filters

index f8491b426e047862e24d0ee6ca3a51e3ab5cd640..e919090dea6095f8c0bc5f55cc2bb795f974c4e6 100644 (file)
@@ -140,6 +140,21 @@ class VolumeAdminController(AdminController):
         self.volume_api.detach(context, volume)
         return webob.Response(status_int=202)
 
+    @wsgi.action('os-migrate_volume')
+    def _migrate_volume(self, req, id, body):
+        """Migrate a volume to the specified host."""
+        context = req.environ['cinder.context']
+        self.authorize(context, 'migrate_volume')
+        try:
+            volume = self._get(context, id)
+        except exception.NotFound:
+            raise exc.HTTPNotFound()
+        params = body['os-migrate_volume']
+        host = params['host']
+        force_host_copy = params.get('force_host_copy', False)
+        self.volume_api.migrate_volume(context, volume, host, force_host_copy)
+        return webob.Response(status_int=202)
+
 
 class SnapshotAdminController(AdminController):
     """AdminController for Snapshots."""
index 28e349f36e9897466dc836a37e0f36d335ab882a..468a8e04cb723f8226e51864d5f774b09cea1826 100644 (file)
@@ -225,6 +225,11 @@ def volume_data_get_for_project(context, project_id, volume_type_id=None,
                                             session)
 
 
+def finish_volume_migration(context, src_vol_id, dest_vol_id):
+    """Perform database updates upon completion of volume migration."""
+    return IMPL.finish_volume_migration(context, src_vol_id, dest_vol_id)
+
+
 def volume_destroy(context, volume_id):
     """Destroy the volume or raise if it does not exist."""
     return IMPL.volume_destroy(context, volume_id)
index a2e4bf814d9667dcc42acdfe5bfc431e7fb62842..8e6b4b242f2b31c7dde3cc06de4055f2b44d13e0 100644 (file)
@@ -1048,6 +1048,25 @@ def volume_data_get_for_project(context, project_id, volume_type_id=None,
                                         session)
 
 
+@require_admin_context
+def finish_volume_migration(context, src_vol_id, dest_vol_id):
+    """Copy almost all columns from dest to source, then delete dest."""
+    session = get_session()
+    with session.begin():
+        dest_volume_ref = _volume_get(context, dest_vol_id, session=session)
+        updates = {}
+        for key, value in dest_volume_ref.iteritems():
+            if key in ['id', 'status']:
+                continue
+            updates[key] = value
+        session.query(models.Volume).\
+            filter_by(id=src_vol_id).\
+            update(updates)
+        session.query(models.Volume).\
+            filter_by(id=dest_vol_id).\
+            delete()
+
+
 @require_admin_context
 def volume_destroy(context, volume_id):
     session = get_session()
diff --git a/cinder/db/sqlalchemy/migrate_repo/versions/014_add_name_id.py b/cinder/db/sqlalchemy/migrate_repo/versions/014_add_name_id.py
new file mode 100644 (file)
index 0000000..e356747
--- /dev/null
@@ -0,0 +1,37 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+from sqlalchemy import String, Column, MetaData, Table
+
+
+def upgrade(migrate_engine):
+    """Add _name_id column to volumes."""
+    meta = MetaData()
+    meta.bind = migrate_engine
+
+    volumes = Table('volumes', meta, autoload=True)
+    _name_id = Column('_name_id', String(36))
+    volumes.create_column(_name_id)
+    volumes.update().values(_name_id=None).execute()
+
+
+def downgrade(migrate_engine):
+    """Remove _name_id column from volumes."""
+    meta = MetaData()
+    meta.bind = migrate_engine
+
+    volumes = Table('volumes', meta, autoload=True)
+    _name_id = volumes.columns._name_id
+    volumes.drop_column(_name_id)
diff --git a/cinder/db/sqlalchemy/migrate_repo/versions/014_sqlite_downgrade.sql b/cinder/db/sqlalchemy/migrate_repo/versions/014_sqlite_downgrade.sql
new file mode 100644 (file)
index 0000000..0d0b665
--- /dev/null
@@ -0,0 +1,68 @@
+BEGIN TRANSACTION;
+
+CREATE TABLE volumes_v12 (
+    created_at DATETIME,
+    updated_at DATETIME,
+    deleted_at DATETIME,
+    deleted BOOLEAN,
+    id VARCHAR(36) NOT NULL,
+    ec2_id INTEGER,
+    user_id VARCHAR(255),
+    project_id VARCHAR(255),
+    snapshot_id VARCHAR(36),
+    host VARCHAR(255),
+    size INTEGER,
+    availability_zone VARCHAR(255),
+    instance_uuid VARCHAR(36),
+    attached_host VARCHAR(255),
+    mountpoint VARCHAR(255),
+    attach_time VARCHAR(255),
+    status VARCHAR(255),
+    attach_status VARCHAR(255),
+    scheduled_at DATETIME,
+    launched_at DATETIME,
+    terminated_at DATETIME,
+    display_name VARCHAR(255),
+    display_description VARCHAR(255),
+    provider_location VARCHAR(255),
+    provider_auth VARCHAR(255),
+    volume_type_id VARCHAR(36),
+    source_volid VARCHAR(36),
+    bootable BOOLEAN,
+    PRIMARY KEY (id)
+);
+
+INSERT INTO volumes_v12
+    SELECT created_at,
+        updated_at,
+        deleted_at,
+        deleted,
+        id,
+        ec2_id,
+        user_id,
+        project_id,
+        snapshot_id,
+        host,
+        size,
+        availability_zone,
+        instance_uuid,
+        attached_host,
+        mountpoint,
+        attach_time,
+        status,
+        attach_status,
+        scheduled_at,
+        launched_at,
+        terminated_at,
+        display_name,
+        display_description,
+        provider_location,
+        provider_auth,
+        volume_type_id,
+        source_volid,
+        bootable
+    FROM volumes;
+
+DROP TABLE volumes;
+ALTER TABLE volumes_v12 RENAME TO volumes;
+COMMIT;
index d665f0dd8068dc470ff64d850946c59eaf42bc67..8ae1aec219877eec3bb64d8967be4e2f3c084b33 100644 (file)
@@ -81,10 +81,19 @@ class Volume(BASE, CinderBase):
     """Represents a block storage device that can be attached to a vm."""
     __tablename__ = 'volumes'
     id = Column(String(36), primary_key=True)
+    _name_id = Column(String(36))  # Don't access/modify this directly!
+
+    @property
+    def name_id(self):
+        return self.id if not self._name_id else self._name_id
+
+    @name_id.setter
+    def name_id(self, value):
+        self._name_id = value
 
     @property
     def name(self):
-        return CONF.volume_name_template % self.id
+        return CONF.volume_name_template % self.name_id
 
     ec2_id = Column(Integer)
     user_id = Column(String(255))
index da35a30b21216bbc7997304ba9200057c8139483..777ec12838d5b729f66ba3045925b732295f2a63 100644 (file)
@@ -188,6 +188,10 @@ class InvalidContentType(Invalid):
     message = _("Invalid content type %(content_type)s.")
 
 
+class InvalidHost(Invalid):
+    message = _("Invalid host") + ": %(reason)s"
+
+
 # Cannot be templated as the error syntax varies.
 # msg needs to be constructed when raised.
 class InvalidParameterValue(Invalid):
@@ -594,3 +598,11 @@ class SwiftConnectionFailed(CinderException):
 
 class TransferNotFound(NotFound):
     message = _("Transfer %(transfer_id)s could not be found.")
+
+
+class VolumeMigrationFailed(CinderException):
+    message = _("Volume migration failed") + ": %(reason)s"
+
+
+class ProtocolNotSupported(CinderException):
+    message = _("Connect to volume via protocol %(protocol)s not supported.")
index 8bb0eb72aaa0044d77ef3f4465c56a5841fb3c35..0b9a4dac67d6ed8a608c0196c38fa29ce357cd14 100644 (file)
@@ -39,12 +39,14 @@ class ChanceScheduler(driver.Scheduler):
         """Filter a list of hosts based on request_spec."""
 
         filter_properties = kwargs.get('filter_properties', {})
+        if not filter_properties:
+            filter_properties = {}
         ignore_hosts = filter_properties.get('ignore_hosts', [])
         hosts = [host for host in hosts if host not in ignore_hosts]
         return hosts
 
-    def _schedule(self, context, topic, request_spec, **kwargs):
-        """Picks a host that is up at random."""
+    def _get_weighted_candidates(self, context, topic, request_spec, **kwargs):
+        """Returns a list of the available hosts."""
 
         elevated = context.elevated()
         hosts = self.hosts_up(elevated, topic)
@@ -52,11 +54,15 @@ class ChanceScheduler(driver.Scheduler):
             msg = _("Is the appropriate service running?")
             raise exception.NoValidHost(reason=msg)
 
-        hosts = self._filter_hosts(request_spec, hosts, **kwargs)
+        return self._filter_hosts(request_spec, hosts, **kwargs)
+
+    def _schedule(self, context, topic, request_spec, **kwargs):
+        """Picks a host that is up at random."""
+        hosts = self._get_weighted_candidates(context, topic,
+                                              request_spec, **kwargs)
         if not hosts:
             msg = _("Could not find another host")
             raise exception.NoValidHost(reason=msg)
-
         return hosts[int(random.random() * len(hosts))]
 
     def schedule_create_volume(self, context, request_spec, filter_properties):
@@ -71,3 +77,24 @@ class ChanceScheduler(driver.Scheduler):
         updated_volume = driver.volume_update_db(context, volume_id, host)
         self.volume_rpcapi.create_volume(context, updated_volume, host,
                                          snapshot_id, image_id)
+
+    def host_passes_filters(self, context, host, request_spec,
+                            filter_properties):
+        """Check if the specified host passes the filters."""
+        weighed_hosts = self._get_weighted_candidates(
+            context,
+            CONF.volume_topic,
+            request_spec,
+            filter_properties=filter_properties)
+
+        for weighed_host in weighed_hosts:
+            if weighed_host == host:
+                elevated = context.elevated()
+                host_states = self.host_manager.get_all_host_states(elevated)
+                for host_state in host_states:
+                    if host_state.host == host:
+                        return host_state
+
+        msg = (_('cannot place volume %(id)s on %(host)s')
+               % {'id': request_spec['volume_id'], 'host': host})
+        raise exception.NoValidHost(reason=msg)
index 9e970a7282b8104261e0ce75e0d130068cb07851..13114c6aabecacc0d680eafdaa4dff9eb67114cf 100644 (file)
@@ -84,6 +84,10 @@ class Scheduler(object):
                 for service in services
                 if utils.service_is_up(service)]
 
+    def host_passes_filters(self, context, volume_id, host, filter_properties):
+        """Check if the specified host passes the filters."""
+        raise NotImplementedError(_("Must implement host_passes_filters"))
+
     def schedule(self, context, topic, method, *_args, **_kwargs):
         """Must override schedule method for scheduler to work."""
         raise NotImplementedError(_("Must implement a fallback schedule"))
index bdeb6178d1e44fa8bc7bf03769ce8e2842a7edf1..83a135ef7ec26c9c089b0cce02b41bf299e3ce8b 100644 (file)
@@ -85,6 +85,20 @@ class FilterScheduler(driver.Scheduler):
                                          snapshot_id=snapshot_id,
                                          image_id=image_id)
 
+    def host_passes_filters(self, context, host, request_spec,
+                            filter_properties):
+        """Check if the specified host passes the filters."""
+        weighed_hosts = self._get_weighted_candidates(context, request_spec,
+                                                      filter_properties)
+        for weighed_host in weighed_hosts:
+            host_state = weighed_host.obj
+            if host_state.host == host:
+                return host_state
+
+        msg = (_('cannot place volume %(id)s on %(host)s')
+               % {'id': request_spec['volume_id'], 'host': host})
+        raise exception.NoValidHost(reason=msg)
+
     def _post_select_populate_filter_properties(self, filter_properties,
                                                 host_state):
         """Add additional information to the filter properties after a host has
@@ -165,7 +179,8 @@ class FilterScheduler(driver.Scheduler):
                     }
             raise exception.NoValidHost(reason=msg)
 
-    def _schedule(self, context, request_spec, filter_properties=None):
+    def _get_weighted_candidates(self, context, request_spec,
+                                 filter_properties=None):
         """Returns a list of hosts that meet the required specs,
         ordered by their fitness.
         """
@@ -214,7 +229,15 @@ class FilterScheduler(driver.Scheduler):
         # host for the job.
         weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
                                                             filter_properties)
+        return weighed_hosts
+
+    def _schedule(self, context, request_spec, filter_properties=None):
+        weighed_hosts = self._get_weighted_candidates(context, request_spec,
+                                                      filter_properties)
+        if not weighed_hosts:
+            return None
         best_host = weighed_hosts[0]
         LOG.debug(_("Choosing %s") % best_host)
+        volume_properties = request_spec['volume_properties']
         best_host.obj.consume_from_volume(volume_properties)
         return best_host
index 4bef56526502822e64c61fc9b4ae1eb5d0be81be..8d4c3c8337c63bcace71e888a974372b6083f7e2 100644 (file)
@@ -48,7 +48,7 @@ LOG = logging.getLogger(__name__)
 class SchedulerManager(manager.Manager):
     """Chooses a host to create volumes."""
 
-    RPC_API_VERSION = '1.2'
+    RPC_API_VERSION = '1.3'
 
     def __init__(self, scheduler_driver=None, service_name=None,
                  *args, **kwargs):
@@ -139,3 +139,28 @@ class SchedulerManager(manager.Manager):
 
     def request_service_capabilities(self, context):
         volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
+
+    def _migrate_volume_set_error(self, context, ex, request_spec):
+        volume_state = {'volume_state': {'status': 'error_migrating'}}
+        self._set_volume_state_and_notify('migrate_volume_to_host',
+                                          volume_state,
+                                          context, ex, request_spec)
+
+    def migrate_volume_to_host(self, context, topic, volume_id, host,
+                               force_host_copy, request_spec,
+                               filter_properties=None):
+        """Ensure that the host exists and can accept the volume."""
+        try:
+            tgt_host = self.driver.host_passes_filters(context, host,
+                                                       request_spec,
+                                                       filter_properties)
+        except exception.NoValidHost as ex:
+                self._migrate_volume_set_error(context, ex, request_spec)
+        except Exception as ex:
+            with excutils.save_and_reraise_exception():
+                self._migrate_volume_set_error(context, ex, request_spec)
+        else:
+            volume_ref = db.volume_get(context, volume_id)
+            volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
+                                                     tgt_host,
+                                                     force_host_copy)
index 4d10c1f269eb5edc57ef9f2a6d3ba2744c329f57..60fe5a67a9c2120aae0c52a248b0277faaa115a4 100644 (file)
@@ -36,6 +36,7 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
         1.1 - Add create_volume() method
         1.2 - Add request_spec, filter_properties arguments
               to create_volume()
+        1.3 - Add migrate_volume_to_host() method
     '''
 
     RPC_API_VERSION = '1.0'
@@ -59,6 +60,20 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
             filter_properties=filter_properties),
             version='1.2')
 
+    def migrate_volume_to_host(self, ctxt, topic, volume_id, host,
+                               force_host_copy=False, request_spec=None,
+                               filter_properties=None):
+        request_spec_p = jsonutils.to_primitive(request_spec)
+        return self.cast(ctxt, self.make_msg(
+            'migrate_volume_to_host',
+            topic=topic,
+            volume_id=volume_id,
+            host=host,
+            force_host_copy=force_host_copy,
+            request_spec=request_spec_p,
+            filter_properties=filter_properties),
+            version='1.3')
+
     def update_service_capabilities(self, ctxt,
                                     service_name, host,
                                     capabilities):
index 80e53d7d3639d5cd2dc1efc7696471fe65b00d8c..a30ec3b9fb7c723a5f310d84ad935dcea688ea45 100644 (file)
@@ -2,15 +2,20 @@ import shutil
 import tempfile
 import webob
 
+from oslo.config import cfg
+
 from cinder import context
 from cinder import db
 from cinder import exception
 from cinder.openstack.common import jsonutils
+from cinder.openstack.common import timeutils
 from cinder import test
 from cinder.tests.api import fakes
 from cinder.tests.api.v2 import stubs
 from cinder.volume import api as volume_api
 
+CONF = cfg.CONF
+
 
 def app():
     # no auth, just let environ['cinder.context'] pass through
@@ -437,3 +442,82 @@ class AdminActionsTest(test.TestCase):
                           mountpoint)
         # cleanup
         svc.stop()
+
+    def _migrate_volume_prep(self):
+        admin_ctx = context.get_admin_context()
+        # create volume's current host and the destination host
+        db.service_create(admin_ctx,
+                          {'host': 'test',
+                           'topic': CONF.volume_topic,
+                           'created_at': timeutils.utcnow()})
+        db.service_create(admin_ctx,
+                          {'host': 'test2',
+                           'topic': CONF.volume_topic,
+                           'created_at': timeutils.utcnow()})
+        # current status is available
+        volume = db.volume_create(admin_ctx,
+                                  {'status': 'available',
+                                   'host': 'test',
+                                   'provider_location': '',
+                                   'attach_status': ''})
+        return volume
+
+    def _migrate_volume_exec(self, ctx, volume, host, expected_status):
+        admin_ctx = context.get_admin_context()
+        # build request to migrate to host
+        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
+        req.method = 'POST'
+        req.headers['content-type'] = 'application/json'
+        req.body = jsonutils.dumps({'os-migrate_volume': {'host': host}})
+        req.environ['cinder.context'] = ctx
+        resp = req.get_response(app())
+        # verify status
+        self.assertEquals(resp.status_int, expected_status)
+        volume = db.volume_get(admin_ctx, volume['id'])
+        return volume
+
+    def test_migrate_volume_success(self):
+        expected_status = 202
+        host = 'test2'
+        ctx = context.RequestContext('admin', 'fake', True)
+        volume = self._migrate_volume_prep()
+        volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
+        self.assertEquals(volume['status'], 'migrating')
+
+    def test_migrate_volume_as_non_admin(self):
+        expected_status = 403
+        host = 'test2'
+        ctx = context.RequestContext('fake', 'fake')
+        volume = self._migrate_volume_prep()
+        self._migrate_volume_exec(ctx, volume, host, expected_status)
+
+    def test_migrate_volume_host_no_exist(self):
+        expected_status = 400
+        host = 'test3'
+        ctx = context.RequestContext('admin', 'fake', True)
+        volume = self._migrate_volume_prep()
+        self._migrate_volume_exec(ctx, volume, host, expected_status)
+
+    def test_migrate_volume_same_host(self):
+        expected_status = 400
+        host = 'test'
+        ctx = context.RequestContext('admin', 'fake', True)
+        volume = self._migrate_volume_prep()
+        self._migrate_volume_exec(ctx, volume, host, expected_status)
+
+    def test_migrate_volume_in_use(self):
+        expected_status = 400
+        host = 'test2'
+        ctx = context.RequestContext('admin', 'fake', True)
+        volume = self._migrate_volume_prep()
+        model_update = {'status': 'in-use'}
+        volume = db.volume_update(ctx, volume['id'], model_update)
+        self._migrate_volume_exec(ctx, volume, host, expected_status)
+
+    def test_migrate_volume_with_snap(self):
+        expected_status = 400
+        host = 'test2'
+        ctx = context.RequestContext('admin', 'fake', True)
+        volume = self._migrate_volume_prep()
+        db.snapshot_create(ctx, {'volume_id': volume['id']})
+        self._migrate_volume_exec(ctx, volume, host, expected_status)
index 29be9bb98b9cb73538327430d948d7645451cd36..c1bc072e5b8dde305ea75b09bb57957fd5ec237d 100644 (file)
@@ -20,6 +20,7 @@ import string
 from cinder.brick.initiator import linuxscsi
 from cinder.openstack.common import log as logging
 from cinder import test
+from cinder import utils
 
 LOG = logging.getLogger(__name__)
 
diff --git a/cinder/tests/db/test_finish_migration.py b/cinder/tests/db/test_finish_migration.py
new file mode 100644 (file)
index 0000000..87ade42
--- /dev/null
@@ -0,0 +1,49 @@
+# Copyright 2013 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Tests for finish_volume_migration."""
+
+
+from cinder import context
+from cinder import db
+from cinder import exception
+from cinder import test
+from cinder.tests import utils as testutils
+
+
+class FinishVolumeMigrationTestCase(test.TestCase):
+    """Test cases for finish_volume_migration."""
+
+    def setUp(self):
+        super(FinishVolumeMigrationTestCase, self).setUp()
+
+    def tearDown(self):
+        super(FinishVolumeMigrationTestCase, self).tearDown()
+
+    def test_finish_volume_migration(self):
+        ctxt = context.RequestContext(user_id='user_id',
+                                      project_id='project_id',
+                                      is_admin=True)
+        src_volume = testutils.create_volume(ctxt, host='src',
+                                             status='migrating')
+        dest_volume = testutils.create_volume(ctxt, host='dest',
+                                              status='migration_target')
+        db.finish_volume_migration(ctxt, src_volume['id'],
+                                   dest_volume['id'])
+
+        self.assertRaises(exception.VolumeNotFound, db.volume_get, ctxt,
+                          dest_volume['id'])
+        src_volume = db.volume_get(ctxt, src_volume['id'])
+        self.assertEqual(src_volume['host'], 'dest')
+        self.assertEqual(src_volume['status'], 'migrating')
diff --git a/cinder/tests/db/test_name_id.py b/cinder/tests/db/test_name_id.py
new file mode 100644 (file)
index 0000000..cdd206c
--- /dev/null
@@ -0,0 +1,52 @@
+# Copyright 2013 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Tests for volume name_id."""
+
+from oslo.config import cfg
+
+from cinder import context
+from cinder import db
+from cinder import test
+from cinder.tests import utils as testutils
+
+
+CONF = cfg.CONF
+
+
+class NameIDsTestCase(test.TestCase):
+    """Test cases for naming volumes with name_id."""
+
+    def setUp(self):
+        super(NameIDsTestCase, self).setUp()
+        self.ctxt = context.RequestContext(user_id='user_id',
+                                           project_id='project_id')
+
+    def tearDown(self):
+        super(NameIDsTestCase, self).tearDown()
+
+    def test_name_id_same(self):
+        """New volume should have same 'id' and 'name_id'."""
+        vol_ref = testutils.create_volume(self.ctxt, size=1)
+        self.assertEqual(vol_ref['name_id'], vol_ref['id'])
+        expected_name = CONF.volume_name_template % vol_ref['id']
+        self.assertEqual(vol_ref['name'], expected_name)
+
+    def test_name_id_diff(self):
+        """Change name ID to mimic volume after migration."""
+        vol_ref = testutils.create_volume(self.ctxt, size=1)
+        db.volume_update(self.ctxt, vol_ref['id'], {'name_id': 'fake'})
+        vol_ref = db.volume_get(self.ctxt, vol_ref['id'])
+        expected_name = CONF.volume_name_template % 'fake'
+        self.assertEqual(vol_ref['name'], expected_name)
index da0920fadea20f63917d58469d2f4558d344e033..55f4776db5d2a0bbd4d261fe9aba09dcac743300 100644 (file)
@@ -32,6 +32,7 @@
     "volume_extension:volume_admin_actions:force_delete": [["rule:admin_api"]],
     "volume_extension:snapshot_admin_actions:force_delete": [["rule:admin_api"]],
     "volume_extension:volume_admin_actions:force_detach": [["rule:admin_api"]],
+    "volume_extension:volume_admin_actions:migrate_volume": [["rule:admin_api"]],
     "volume_extension:volume_actions:upload_image": [],
     "volume_extension:types_manage": [],
     "volume_extension:types_extra_specs": [],
index 0a15511eb90ee868e18c04b0ed6972dfb8edc215..c747c62a6977a00f0fd3d2a1837584962de963c2 100644 (file)
@@ -227,3 +227,50 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
                          filter_properties['retry']['hosts'][0])
 
         self.assertEqual(1024, host_state.total_capacity_gb)
+
+    def _host_passes_filters_setup(self):
+        self.next_weight = 1.0
+
+        def _fake_weigh_objects(_self, functions, hosts, options):
+            self.next_weight += 2.0
+            host_state = hosts[0]
+            return [weights.WeighedHost(host_state, self.next_weight)]
+
+        sched = fakes.FakeFilterScheduler()
+        sched.host_manager = fakes.FakeHostManager()
+        fake_context = context.RequestContext('user', 'project',
+                                              is_admin=True)
+
+        self.stubs.Set(sched.host_manager, 'get_filtered_hosts',
+                       fake_get_filtered_hosts)
+        self.stubs.Set(weights.HostWeightHandler,
+                       'get_weighed_objects', _fake_weigh_objects)
+        fakes.mox_host_manager_db_calls(self.mox, fake_context)
+
+        self.mox.ReplayAll()
+        return (sched, fake_context)
+
+    @testtools.skipIf(not test_utils.is_cinder_installed(),
+                      'Test requires Cinder installed (try setup.py develop')
+    def test_host_passes_filters_happy_day(self):
+        """Do a successful pass through of with host_passes_filters()."""
+        sched, ctx = self._host_passes_filters_setup()
+        request_spec = {'volume_id': 1,
+                        'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1}}
+        ret_host = sched.host_passes_filters(ctx, 'host1', request_spec, {})
+        self.assertEqual(ret_host.host, 'host1')
+
+    @testtools.skipIf(not test_utils.is_cinder_installed(),
+                      'Test requires Cinder installed (try setup.py develop')
+    def test_host_passes_filters_no_capacity(self):
+        """Fail the host due to insufficient capacity."""
+        sched, ctx = self._host_passes_filters_setup()
+        request_spec = {'volume_id': 1,
+                        'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1024}}
+        self.assertRaises(exception.NoValidHost,
+                          sched.host_passes_filters,
+                          ctx, 'host1', request_spec, {})
index ce2509c0138a4120c93c434cc89122c82fe04f53..9d53ffa61af345efa4aa681ec0a597d9915945d1 100644 (file)
@@ -81,3 +81,14 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  request_spec='fake_request_spec',
                                  filter_properties='filter_properties',
                                  version='1.2')
+
+    def test_migrate_volume_to_host(self):
+        self._test_scheduler_api('migrate_volume_to_host',
+                                 rpc_method='cast',
+                                 topic='topic',
+                                 volume_id='volume_id',
+                                 host='host',
+                                 force_host_copy=True,
+                                 request_spec='fake_request_spec',
+                                 filter_properties='filter_properties',
+                                 version='1.3')
index cda27f197ff45453c7169c8a2e603b53654f716b..4802ce8085b1212f58661032469b58b71d6c6105 100644 (file)
@@ -83,7 +83,7 @@ class SchedulerManagerTestCase(test.TestCase):
             capabilities=capabilities)
 
     def test_create_volume_exception_puts_volume_in_error_state(self):
-        """Test that a NoValideHost exception for create_volume.
+        """Test NoValidHost exception behavior for create_volume.
 
         Puts the volume in 'error' state and eats the exception.
         """
@@ -105,6 +105,31 @@ class SchedulerManagerTestCase(test.TestCase):
                                    request_spec=request_spec,
                                    filter_properties={})
 
+    def test_migrate_volume_exception_puts_volume_in_error_state(self):
+        """Test NoValidHost exception behavior for migrate_volume_to_host.
+
+        Puts the volume in 'error_migrating' state and eats the exception.
+        """
+        fake_volume_id = 1
+        self._mox_schedule_method_helper('host_passes_filters')
+        self.mox.StubOutWithMock(db, 'volume_update')
+
+        topic = 'fake_topic'
+        volume_id = fake_volume_id
+        request_spec = {'volume_id': fake_volume_id}
+
+        self.manager.driver.host_passes_filters(
+            self.context, 'host',
+            request_spec, {}).AndRaise(exception.NoValidHost(reason=""))
+        db.volume_update(self.context, fake_volume_id,
+                         {'status': 'error_migrating'})
+
+        self.mox.ReplayAll()
+        self.manager.migrate_volume_to_host(self.context, topic, volume_id,
+                                            'host', True,
+                                            request_spec=request_spec,
+                                            filter_properties={})
+
     def _mox_schedule_method_helper(self, method_name):
         # Make sure the method exists that we're going to test call
         def stub_method(*args, **kwargs):
index b33d309f02fc10b1032f3182f1c7425bd089cc91..3076b1c5b45f1cd21a64d7783317299d757cd47a 100644 (file)
@@ -773,3 +773,29 @@ class TestMigrations(test.TestCase):
                                        metadata,
                                        autoload=True)
             self.assertTrue('provider_geometry' not in volumes.c)
+
+    def test_migration_014(self):
+        """Test that adding _name_id column works correctly."""
+        for (key, engine) in self.engines.items():
+            migration_api.version_control(engine,
+                                          TestMigrations.REPOSITORY,
+                                          migration.INIT_VERSION)
+            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 13)
+            metadata = sqlalchemy.schema.MetaData()
+            metadata.bind = engine
+
+            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 14)
+            volumes = sqlalchemy.Table('volumes',
+                                       metadata,
+                                       autoload=True)
+            self.assertTrue(isinstance(volumes.c._name_id.type,
+                                       sqlalchemy.types.VARCHAR))
+
+            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 13)
+            metadata = sqlalchemy.schema.MetaData()
+            metadata.bind = engine
+
+            volumes = sqlalchemy.Table('volumes',
+                                       metadata,
+                                       autoload=True)
+            self.assertTrue('_name_id' not in volumes.c)
index 41c389b6e6e0a769257da4a801cbca85e61b6936..b8e6a9a6eb5cfc7365323d3f4534f76ca0e0340f 100644 (file)
@@ -24,11 +24,13 @@ import datetime
 import os
 import re
 import shutil
+import socket
 import tempfile
 
 import mox
 from oslo.config import cfg
 
+from cinder.brick.initiator import connector as brick_conn
 from cinder.brick.iscsi import iscsi
 from cinder import context
 from cinder import db
@@ -46,6 +48,7 @@ from cinder.tests.image import fake as fake_image
 from cinder.volume import configuration as conf
 from cinder.volume import driver
 from cinder.volume.drivers import lvm
+from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volutils
 
 
@@ -1467,6 +1470,63 @@ class VolumeTestCase(test.TestCase):
 
         self.assertEqual(expected, azs)
 
+    def test_migrate_volume_driver(self):
+        """Test volume migration done by driver."""
+        # stub out driver and rpc functions
+        self.stubs.Set(self.volume.driver, 'migrate_volume',
+                       lambda x, y, z: (True, {'user_id': 'foo'}))
+
+        volume = self._create_volume(status='migrating')
+        host_obj = {'host': 'newhost', 'capabilities': {}}
+        self.volume.migrate_volume(self.context, volume['id'],
+                                   host_obj, False)
+
+        # check volume properties
+        volume = db.volume_get(context.get_admin_context(), volume['id'])
+        self.assertEquals(volume['host'], 'newhost')
+        self.assertEquals(volume['status'], 'available')
+
+    def test_migrate_volume_generic(self):
+        """Test the generic offline volume migration."""
+        def fake_migr(vol, host):
+            raise Exception('should not be called')
+
+        def fake_delete_volume_rpc(self, ctxt, vol_id):
+            raise Exception('should not be called')
+
+        def fake_create_volume(self, ctxt, volume, host, req_spec, filters):
+            db.volume_update(ctxt, volume['id'],
+                             {'status': 'migration_target'})
+
+        def fake_rename_volume(self, ctxt, volume, new_name_id):
+            db.volume_update(ctxt, volume['id'], {'name_id': new_name_id})
+
+        self.stubs.Set(self.volume.driver, 'migrate_volume', fake_migr)
+        self.stubs.Set(volume_rpcapi.VolumeAPI, 'create_volume',
+                       fake_create_volume)
+        self.stubs.Set(self.volume.driver, 'copy_volume_data',
+                       lambda x, y, z, remote='dest': True)
+        self.stubs.Set(volume_rpcapi.VolumeAPI, 'delete_volume',
+                       fake_delete_volume_rpc)
+        self.stubs.Set(volume_rpcapi.VolumeAPI, 'rename_volume',
+                       fake_rename_volume)
+
+        volume = self._create_volume(status='migrating')
+        host_obj = {'host': 'newhost', 'capabilities': {}}
+        self.volume.migrate_volume(self.context, volume['id'],
+                                   host_obj, True)
+        volume = db.volume_get(context.get_admin_context(), volume['id'])
+        self.assertEquals(volume['host'], 'newhost')
+        self.assertEquals(volume['status'], 'available')
+
+    def test_rename_volume(self):
+        self.stubs.Set(self.volume.driver, 'rename_volume',
+                       lambda x, y: None)
+        volume = self._create_volume()
+        self.volume.rename_volume(self.context, volume['id'], 'new_id')
+        volume = db.volume_get(context.get_admin_context(), volume['id'])
+        self.assertEquals(volume['name_id'], 'new_id')
+
 
 class DriverTestCase(test.TestCase):
     """Base Test class for Drivers."""
@@ -1508,9 +1568,9 @@ class DriverTestCase(test.TestCase):
             self.volume.delete_volume(self.context, volume_id)
 
 
-class VolumeDriverTestCase(DriverTestCase):
+class LVMISCSIVolumeDriverTestCase(DriverTestCase):
     """Test case for VolumeDriver"""
-    driver_name = "cinder.volume.drivers.lvm.LVMVolumeDriver"
+    driver_name = "cinder.volume.drivers.lvm.LVMISCSIDriver"
 
     def test_delete_busy_volume(self):
         """Test deleting a busy volume."""
@@ -1529,6 +1589,61 @@ class VolumeDriverTestCase(DriverTestCase):
         self.output = 'x'
         self.volume.driver.delete_volume({'name': 'test1', 'size': 1024})
 
+    def test_lvm_migrate_volume_no_loc_info(self):
+        host = {'capabilities': {}}
+        vol = {'name': 'test', 'id': 1, 'size': 1}
+        moved, model_update = self.volume.driver.migrate_volume(self.context,
+                                                                vol, host)
+        self.assertEqual(moved, False)
+        self.assertEqual(model_update, None)
+
+    def test_lvm_migrate_volume_bad_loc_info(self):
+        capabilities = {'location_info': 'foo'}
+        host = {'capabilities': capabilities}
+        vol = {'name': 'test', 'id': 1, 'size': 1}
+        moved, model_update = self.volume.driver.migrate_volume(self.context,
+                                                                vol, host)
+        self.assertEqual(moved, False)
+        self.assertEqual(model_update, None)
+
+    def test_lvm_migrate_volume_diff_driver(self):
+        capabilities = {'location_info': 'FooDriver:foo:bar'}
+        host = {'capabilities': capabilities}
+        vol = {'name': 'test', 'id': 1, 'size': 1}
+        moved, model_update = self.volume.driver.migrate_volume(self.context,
+                                                                vol, host)
+        self.assertEqual(moved, False)
+        self.assertEqual(model_update, None)
+
+    def test_lvm_migrate_volume_diff_host(self):
+        capabilities = {'location_info': 'LVMVolumeDriver:foo:bar'}
+        host = {'capabilities': capabilities}
+        vol = {'name': 'test', 'id': 1, 'size': 1}
+        moved, model_update = self.volume.driver.migrate_volume(self.context,
+                                                                vol, host)
+        self.assertEqual(moved, False)
+        self.assertEqual(model_update, None)
+
+    def test_lvm_migrate_volume_proceed(self):
+        hostname = socket.gethostname()
+        capabilities = {'location_info': 'LVMVolumeDriver:%s:bar' % hostname}
+        host = {'capabilities': capabilities}
+        vol = {'name': 'test', 'id': 1, 'size': 1}
+        self.stubs.Set(self.volume.driver, 'remove_export',
+                       lambda x, y: None)
+        self.stubs.Set(self.volume.driver, '_create_volume',
+                       lambda x, y, z: None)
+        self.stubs.Set(volutils, 'copy_volume',
+                       lambda x, y, z, sync=False, execute='foo': None)
+        self.stubs.Set(self.volume.driver, '_delete_volume',
+                       lambda x: None)
+        self.stubs.Set(self.volume.driver, '_create_export',
+                       lambda x, y, vg='vg': None)
+        moved, model_update = self.volume.driver.migrate_volume(self.context,
+                                                                vol, host)
+        self.assertEqual(moved, True)
+        self.assertEqual(model_update, None)
+
 
 class LVMVolumeDriverTestCase(DriverTestCase):
     """Test case for VolumeDriver"""
index 17871e340d486a6fd51939414c8b1a2be8174c98..2239396739adc17d12c8db5ea07219fb025fb513 100644 (file)
@@ -81,6 +81,12 @@ class VolumeRpcAPITestCase(test.TestCase):
             expected_msg['args']['snapshot_id'] = snapshot['id']
         if 'host' in expected_msg['args']:
             del expected_msg['args']['host']
+        if 'dest_host' in expected_msg['args']:
+            dest_host = expected_msg['args']['dest_host']
+            dest_host_dict = {'host': dest_host.host,
+                              'capabilities': dest_host.capabilities}
+            del expected_msg['args']['dest_host']
+            expected_msg['args']['host'] = dest_host_dict
 
         expected_msg['version'] = expected_version
 
@@ -195,3 +201,23 @@ class VolumeRpcAPITestCase(test.TestCase):
                               volume=self.fake_volume,
                               new_size=1,
                               version='1.6')
+
+    def test_migrate_volume(self):
+        class FakeHost(object):
+            def __init__(self):
+                self.host = 'host'
+                self.capabilities = {}
+        dest_host = FakeHost()
+        self._test_volume_api('migrate_volume',
+                              rpc_method='cast',
+                              volume=self.fake_volume,
+                              dest_host=dest_host,
+                              force_host_copy=True,
+                              version='1.8')
+
+    def test_rename_volume(self):
+        self._test_volume_api('rename_volume',
+                              rpc_method='call',
+                              volume=self.fake_volume,
+                              new_name_id='new_id',
+                              version='1.8')
index 042134a5c85218ddd77e90a96e1e41a6dd668c17..ccf38cba548711425497b80b1df37026ccb3550c 100644 (file)
 
 import os
 
-import cinder.context
+from cinder import context
+from cinder import db
 
 
 def get_test_admin_context():
-    return cinder.context.get_admin_context()
+    return context.get_admin_context()
 
 
 def is_cinder_installed():
@@ -30,3 +31,22 @@ def is_cinder_installed():
         return True
     else:
         return False
+
+
+def create_volume(ctxt,
+                  host='test_host',
+                  display_name='test_volume',
+                  display_description='this is a test volume',
+                  status='available',
+                  size=1):
+    """Create a volume object in the DB."""
+    vol = {}
+    vol['size'] = size
+    vol['host'] = host
+    vol['user_id'] = ctxt.user_id
+    vol['project_id'] = ctxt.project_id
+    vol['status'] = status
+    vol['display_name'] = display_name
+    vol['display_description'] = display_description
+    vol['attach_status'] = 'detached'
+    return db.volume_create(ctxt, vol)
index c00573113959e2a9bf0098a321bd3ce1b4c37f24..43d4a99e14f8cd759be1284d7da46375ce150a57 100644 (file)
@@ -36,6 +36,7 @@ import cinder.policy
 from cinder import quota
 from cinder.scheduler import rpcapi as scheduler_rpcapi
 from cinder import units
+from cinder import utils
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import volume_types
 
@@ -370,6 +371,11 @@ class API(base.Base):
             # Volume is still attached, need to detach first
             raise exception.VolumeAttached(volume_id=volume_id)
 
+        if volume['attach_status'] == "migrating":
+            # Volume is migrating, wait until done
+            msg = _("Volume cannot be deleted while migrating")
+            raise exception.InvalidVolume(reason=msg)
+
         snapshots = self.db.snapshot_get_all_for_volume(context, volume_id)
         if len(snapshots):
             msg = _("Volume still has %d dependent snapshots") % len(snapshots)
@@ -416,6 +422,10 @@ class API(base.Base):
                                                         marker, limit,
                                                         sort_key, sort_dir)
 
+        # Non-admin shouldn't see temporary target of a volume migration
+        if not context.is_admin:
+            filters['no_migration_targets'] = True
+
         if filters:
             LOG.debug(_("Searching by: %s") % str(filters))
 
@@ -430,8 +440,14 @@ class API(base.Base):
                         return False
                 return True
 
+            def _check_migration_target(volume, searchdict):
+                if not volume['status'].startswith('migration_target'):
+                    return True
+                return False
+
             # search_option to filter_name mapping.
-            filter_mapping = {'metadata': _check_metadata_match}
+            filter_mapping = {'metadata': _check_metadata_match,
+                              'no_migration_targets': _check_migration_target}
 
             result = []
             not_found = object()
@@ -815,6 +831,58 @@ class API(base.Base):
         self.update(context, volume, {'status': 'extending'})
         self.volume_rpcapi.extend_volume(context, volume, new_size)
 
+    def migrate_volume(self, context, volume, host, force_host_copy):
+        """Migrate the volume to the specified host."""
+
+        # We only handle "available" volumes for now
+        if volume['status'] != "available":
+            msg = _("status must be available")
+            LOG.error(msg)
+            raise exception.InvalidVolume(reason=msg)
+
+        # We only handle volumes without snapshots for now
+        snaps = self.db.snapshot_get_all_for_volume(context, volume['id'])
+        if snaps:
+            msg = _("volume must not have snapshots")
+            LOG.error(msg)
+            raise exception.InvalidVolume(reason=msg)
+
+        # Make sure the host is in the list of available hosts
+        elevated = context.elevated()
+        topic = CONF.volume_topic
+        services = self.db.service_get_all_by_topic(elevated, topic)
+        found = False
+        for service in services:
+            if utils.service_is_up(service) and service['host'] == host:
+                found = True
+        if not found:
+            msg = (_('No available service named %s') % host)
+            LOG.error(msg)
+            raise exception.InvalidHost(reason=msg)
+
+        # Make sure the destination host is different than the current one
+        if host == volume['host']:
+            msg = _('Destination host must be different than current host')
+            LOG.error(msg)
+            raise exception.InvalidHost(reason=msg)
+
+        self.update(context, volume, {'status': 'migrating'})
+
+        # Call the scheduler to ensure that the host exists and that it can
+        # accept the volume
+        volume_type = {}
+        if volume['volume_type_id']:
+            volume_types.get_volume_type(context, volume['volume_type_id'])
+        request_spec = {'volume_properties': volume,
+                        'volume_type': volume_type,
+                        'volume_id': volume['id']}
+        self.scheduler_rpcapi.migrate_volume_to_host(context,
+                                                     CONF.volume_topic,
+                                                     volume['id'],
+                                                     host,
+                                                     force_host_copy,
+                                                     request_spec)
+
 
 class HostAPI(base.Base):
     def __init__(self):
index bb088573da049554ad09197c9f5dd3402121ed94..97886e1ed367de15b2869473815db761e55a2f5c 100644 (file)
@@ -29,8 +29,11 @@ from oslo.config import cfg
 from cinder.brick.initiator import connector as initiator
 from cinder import exception
 from cinder.image import image_utils
+from cinder.openstack.common import excutils
 from cinder.openstack.common import log as logging
 from cinder import utils
+from cinder.volume import rpcapi as volume_rpcapi
+from cinder.volume import utils as volume_utils
 
 LOG = logging.getLogger(__name__)
 
@@ -190,21 +193,85 @@ class VolumeDriver(object):
         """Fail if connector doesn't contain all the data needed by driver"""
         pass
 
+    def _copy_volume_data_cleanup(self, context, volume, properties,
+                                  attach_info, remote, force=False):
+        self._detach_volume(attach_info)
+        if remote:
+            rpcapi = volume_rpcapi.VolumeAPI()
+            rpcapi.terminate_connection(context, volume, properties,
+                                        force=force)
+        else:
+            self.terminate_connection(volume, properties, force=False)
+
+    def copy_volume_data(self, context, src_vol, dest_vol, remote=None):
+        """Copy data from src_vol to dest_vol."""
+        LOG.debug(_('copy_data_between_volumes %(src)s -> %(dest)s.')
+                  % {'src': src_vol['name'], 'dest': dest_vol['name']})
+
+        properties = initiator.get_connector_properties()
+        dest_remote = True if remote in ['dest', 'both'] else False
+        dest_orig_status = dest_vol['status']
+        try:
+            dest_attach_info = self._attach_volume(context,
+                                                   dest_vol,
+                                                   properties,
+                                                   remote=dest_remote)
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                msg = _("Failed to attach volume %(vol)s")
+                LOG.error(msg % {'vol': dest_vol['id']})
+                self.db.volume_update(context, dest_vol['id'],
+                                      {'status': dest_orig_status})
+
+        src_remote = True if remote in ['src', 'both'] else False
+        src_orig_status = src_vol['status']
+        try:
+            src_attach_info = self._attach_volume(context,
+                                                  src_vol,
+                                                  properties,
+                                                  remote=src_remote)
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                msg = _("Failed to attach volume %(vol)s")
+                LOG.error(msg % {'vol': src_vol['id']})
+                self.db.volume_update(context, src_vol['id'],
+                                      {'status': src_orig_status})
+                self._copy_volume_data_cleanup(context, dest_vol, properties,
+                                               dest_attach_info, dest_remote,
+                                               force=True)
+
+        try:
+            volume_utils.copy_volume(src_attach_info['device']['path'],
+                                     dest_attach_info['device']['path'],
+                                     src_vol['size'])
+            copy_error = False
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                msg = _("Failed to copy volume %(src)s to %(dest)d")
+                LOG.error(msg % {'src': src_vol['id'], 'dest': dest_vol['id']})
+                copy_error = True
+        finally:
+            self._copy_volume_data_cleanup(context, dest_vol, properties,
+                                           dest_attach_info, dest_remote,
+                                           force=copy_error)
+            self._copy_volume_data_cleanup(context, src_vol, properties,
+                                           src_attach_info, src_remote,
+                                           force=copy_error)
+
     def copy_image_to_volume(self, context, volume, image_service, image_id):
         """Fetch the image from image_service and write it to the volume."""
         LOG.debug(_('copy_image_to_volume %s.') % volume['name'])
 
         properties = initiator.get_connector_properties()
-        connection, device, connector = self._attach_volume(context, volume,
-                                                            properties)
+        attach_info = self._attach_volume(context, volume, properties)
 
         try:
             image_utils.fetch_to_raw(context,
                                      image_service,
                                      image_id,
-                                     device['path'])
+                                     attach_info['device']['path'])
         finally:
-            self._detach_volume(connection, device, connector)
+            self._detach_volume(attach_info)
             self.terminate_connection(volume, properties)
 
     def copy_volume_to_image(self, context, volume, image_service, image_meta):
@@ -212,22 +279,24 @@ class VolumeDriver(object):
         LOG.debug(_('copy_volume_to_image %s.') % volume['name'])
 
         properties = initiator.get_connector_properties()
-        connection, device, connector = self._attach_volume(context, volume,
-                                                            properties)
+        attach_info = self._attach_volume(context, volume, properties)
 
         try:
             image_utils.upload_volume(context,
                                       image_service,
                                       image_meta,
-                                      device['path'])
+                                      attach_info['device']['path'])
         finally:
-            self._detach_volume(connection, device, connector)
+            self._detach_volume(attach_info)
             self.terminate_connection(volume, properties)
 
-    def _attach_volume(self, context, volume, properties):
+    def _attach_volume(self, context, volume, properties, remote=False):
         """Attach the volume."""
-        host_device = None
-        conn = self.initialize_connection(volume, properties)
+        if remote:
+            rpcapi = volume_rpcapi.VolumeAPI()
+            conn = rpcapi.initialize_connection(context, volume, properties)
+        else:
+            conn = self.initialize_connection(volume, properties)
 
         # Use Brick's code to do attach/detach
         use_multipath = self.configuration.use_multipath_for_image_xfer
@@ -245,13 +314,14 @@ class VolumeDriver(object):
                                                         "via the path "
                                                         "%(path)s.") %
                                                       {'path': host_device}))
-        return conn, device, connector
+        return {'conn': conn, 'device': device, 'connector': connector}
 
-    def _detach_volume(self, connection, device, connector):
+    def _detach_volume(self, attach_info):
         """Disconnect the volume from the host."""
-        protocol = connection['driver_volume_type']
         # Use Brick's code to do attach/detach
-        connector.disconnect_volume(connection['data'], device)
+        connector = attach_info['connector']
+        connector.disconnect_volume(attach_info['conn']['data'],
+                                    attach_info['device'])
 
     def clone_image(self, volume, image_location):
         """Create a volume efficiently from an existing image.
@@ -281,6 +351,22 @@ class VolumeDriver(object):
         msg = _("Extend volume not implemented")
         raise NotImplementedError(msg)
 
+    def migrate_volume(self, context, volume, host):
+        """Migrate the volume to the specified host.
+
+        Returns a boolean indicating whether the migration occurred, as well as
+        model_update.
+        """
+        return (False, None)
+
+    def rename_volume(self, volume, orig_name):
+        """Rename the volume according to the volume object.
+
+        The original name is passed for reference, and the function can return
+        model_update.
+        """
+        return None
+
 
 class ISCSIDriver(VolumeDriver):
     """Executes commands relating to ISCSI volumes.
index 18640c077df8d6ba9f199ebc50cfb6d41a122696..2a9af298166e3cb09dffaec8c9c7c738202de546 100644 (file)
@@ -23,6 +23,7 @@ Driver for Linux servers running LVM.
 import math
 import os
 import re
+import socket
 
 from oslo.config import cfg
 
@@ -70,6 +71,7 @@ class LVMVolumeDriver(driver.VolumeDriver):
     def __init__(self, *args, **kwargs):
         super(LVMVolumeDriver, self).__init__(*args, **kwargs)
         self.configuration.append_config_values(volume_opts)
+        self.hostname = socket.gethostname()
 
     def check_for_setup_error(self):
         """Returns an error if prerequisites aren't met"""
@@ -81,13 +83,13 @@ class LVMVolumeDriver(driver.VolumeDriver):
                                  % self.configuration.volume_group)
             raise exception.VolumeBackendAPIException(data=exception_message)
 
-    def _create_volume(self, volume_name, sizestr):
-
+    def _create_volume(self, volume_name, sizestr, vg=None):
+        if vg is None:
+            vg = self.configuration.volume_group
         no_retry_list = ['Insufficient free extents',
                          'One or more specified logical volume(s) not found']
 
-        cmd = ['lvcreate', '-L', sizestr, '-n', volume_name,
-               self.configuration.volume_group]
+        cmd = ['lvcreate', '-L', sizestr, '-n', volume_name, vg]
         if self.configuration.lvm_mirrors:
             cmd += ['-m', self.configuration.lvm_mirrors, '--nosync']
             terras = int(sizestr[:-1]) / 1024.0
@@ -225,9 +227,11 @@ class LVMVolumeDriver(driver.VolumeDriver):
         # it's quite slow.
         self._delete_volume(snapshot)
 
-    def local_path(self, volume):
+    def local_path(self, volume, vg=None):
+        if vg is None:
+            vg = self.configuration.volume_group
         # NOTE(vish): stops deprecation warning
-        escaped_group = self.configuration.volume_group.replace('-', '--')
+        escaped_group = vg.replace('-', '--')
         escaped_name = self._escape_snapshot(volume['name']).replace('-', '--')
         return "/dev/mapper/%s-%s" % (escaped_group, escaped_name)
 
@@ -442,12 +446,16 @@ class LVMISCSIDriver(LVMVolumeDriver, driver.ISCSIDriver):
                 self.db.iscsi_target_create_safe(context, target)
 
     def create_export(self, context, volume):
+        return self._create_export(context, volume)
+
+    def _create_export(self, context, volume, vg=None):
         """Creates an export for a logical volume."""
+        if vg is None:
+            vg = self.configuration.volume_group
 
         iscsi_name = "%s%s" % (self.configuration.iscsi_target_prefix,
                                volume['name'])
-        volume_path = "/dev/%s/%s" % (self.configuration.volume_group,
-                                      volume['name'])
+        volume_path = "/dev/%s/%s" % (vg, volume['name'])
         model_update = {}
 
         # TODO(jdg): In the future move all of the dependent stuff into the
@@ -530,6 +538,42 @@ class LVMISCSIDriver(LVMVolumeDriver, driver.ISCSIDriver):
 
         self.tgtadm.remove_iscsi_target(iscsi_target, 0, volume['id'])
 
+    def migrate_volume(self, ctxt, volume, host):
+        """Optimize the migration if the destination is on the same server.
+
+        If the specified host is another back-end on the same server, and
+        the volume is not attached, we can do the migration locally without
+        going through iSCSI.
+        """
+        false_ret = (False, None)
+        if 'location_info' not in host['capabilities']:
+            return false_ret
+        info = host['capabilities']['location_info']
+        try:
+            (dest_type, dest_hostname, dest_vg) = info.split(':')
+        except ValueError:
+            return false_ret
+        if (dest_type != 'LVMVolumeDriver' or dest_hostname != self.hostname):
+            return false_ret
+
+        self.remove_export(ctxt, volume)
+        self._create_volume(volume['name'],
+                            self._sizestr(volume['size']),
+                            dest_vg)
+        volutils.copy_volume(self.local_path(volume),
+                             self.local_path(volume, vg=dest_vg),
+                             volume['size'],
+                             execute=self._execute)
+        self._delete_volume(volume)
+        model_update = self._create_export(ctxt, volume, vg=dest_vg)
+
+        return (True, model_update)
+
+    def rename_volume(self, volume, orig_name):
+        self._execute('lvrename', self.configuration.volume_group,
+                      orig_name, volume['name'],
+                      run_as_root=True)
+
     def get_volume_stats(self, refresh=False):
         """Get volume status.
 
@@ -559,6 +603,9 @@ class LVMISCSIDriver(LVMVolumeDriver, driver.ISCSIDriver):
         data['free_capacity_gb'] = 0
         data['reserved_percentage'] = self.configuration.reserved_percentage
         data['QoS_support'] = False
+        data['location_info'] = ('LVMVolumeDriver:%(hostname)s:%(vg)s' %
+                                 {'hostname': self.hostname,
+                                 'vg': self.configuration.volume_group})
 
         try:
             out, err = self._execute('vgs', '--noheadings', '--nosuffix',
@@ -682,4 +729,7 @@ class ThinLVMVolumeDriver(LVMISCSIDriver):
         data['QoS_support'] = False
         data['total_capacity_gb'] = 'infinite'
         data['free_capacity_gb'] = 'infinite'
+        data['location_info'] = ('LVMVolumeDriver:%(hostname)s:%(vg)s' %
+                                 {'hostname': self.hostname,
+                                 'vg': self.configuration.volume_group})
         self._stats = data
index 6b0fb3e2225383441871279ca61cf7e96a2c77c2..2f9fb3f3a0c39ce09dba7bc7c58004bead711171 100644 (file)
@@ -39,10 +39,12 @@ intact.
 
 
 import sys
+import time
 import traceback
 
 from oslo.config import cfg
 
+from cinder.brick.initiator import connector as initiator
 from cinder import context
 from cinder import exception
 from cinder.image import glance
@@ -56,6 +58,7 @@ from cinder.openstack.common import uuidutils
 from cinder import quota
 from cinder import utils
 from cinder.volume.configuration import Configuration
+from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volume_utils
 
 LOG = logging.getLogger(__name__)
@@ -66,6 +69,10 @@ volume_manager_opts = [
     cfg.StrOpt('volume_driver',
                default='cinder.volume.drivers.lvm.LVMISCSIDriver',
                help='Driver to use for volume creation'),
+    cfg.IntOpt('migration_create_volume_timeout_secs',
+               default=300,
+               help='Timeout for creating the volume to migrate to '
+                    'when performing volume migration (seconds)'),
 ]
 
 CONF = cfg.CONF
@@ -104,7 +111,7 @@ MAPPING = {
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
 
-    RPC_API_VERSION = '1.7'
+    RPC_API_VERSION = '1.8'
 
     def __init__(self, volume_driver=None, service_name=None,
                  *args, **kwargs):
@@ -226,7 +233,10 @@ class VolumeManager(manager.SchedulerDependentManager):
         #             before passing it to the driver.
         volume_ref['host'] = self.host
 
-        status = 'available'
+        if volume_ref['status'] == 'migration_target_creating':
+            status = 'migration_target'
+        else:
+            status = 'available'
         model_update = False
         image_meta = None
         cloned = False
@@ -478,6 +488,11 @@ class VolumeManager(manager.SchedulerDependentManager):
                                       volume_ref['id'],
                                       {'status': 'error_deleting'})
 
+        # If deleting the source volume in a migration, we want to skip quotas
+        # and other database updates.
+        if volume_ref['status'] == 'migrating':
+            return True
+
         # Get reservations
         try:
             reserve_opts = {'volumes': -1, 'gigabytes': -volume_ref['size']}
@@ -767,6 +782,123 @@ class VolumeManager(manager.SchedulerDependentManager):
         volume_ref = self.db.volume_get(context, volume_id)
         self.driver.accept_transfer(volume_ref)
 
+    def _migrate_volume_generic(self, ctxt, volume, host):
+        rpcapi = volume_rpcapi.VolumeAPI()
+
+        # Create new volume on remote host
+        new_vol_values = {}
+        for k, v in volume.iteritems():
+            new_vol_values[k] = v
+        del new_vol_values['id']
+        new_vol_values['host'] = host['host']
+        new_vol_values['status'] = 'migration_target_creating'
+        new_volume = self.db.volume_create(ctxt, new_vol_values)
+        rpcapi.create_volume(ctxt, new_volume, host['host'],
+                             None, None)
+
+        # Wait for new_volume to become ready
+        starttime = time.time()
+        deadline = starttime + CONF.migration_create_volume_timeout_secs
+        new_volume = self.db.volume_get(ctxt, new_volume['id'])
+        tries = 0
+        while new_volume['status'] != 'migration_target':
+            tries = tries + 1
+            now = time.time()
+            if new_volume['status'] == 'error':
+                msg = _("failed to create new_volume on destination host")
+                raise exception.VolumeMigrationFailed(reason=msg)
+            elif now > deadline:
+                msg = _("timeout creating new_volume on destination host")
+                raise exception.VolumeMigrationFailed(reason=msg)
+            else:
+                time.sleep(tries ** 2)
+            new_volume = self.db.volume_get(ctxt, new_volume['id'])
+
+        # Copy the source volume to the destination volume
+        try:
+            self.driver.copy_volume_data(ctxt, volume, new_volume,
+                                         remote='dest')
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                msg = _("Failed to copy volume %(vol1)s to %(vol2)s")
+                LOG.error(msg % {'vol1': volume['id'],
+                                 'vol2': new_volume['id']})
+                rpcapi.delete_volume(ctxt, volume)
+
+        # Delete the source volume (if it fails, don't fail the migration)
+        try:
+            self.delete_volume(ctxt, volume['id'])
+        except Exception as ex:
+            msg = _("Failed to delete migration source vol %(vol)s: %(err)s")
+            LOG.error(msg % {'vol': volume['id'], 'err': ex})
+
+        # Rename the destination volume to the name of the source volume.
+        # We rename rather than create the destination with the same as the
+        # source because: (a) some backends require unique names between pools
+        # in addition to within pools, and (b) we want to enable migration
+        # within one pool (for example, changing a volume's type by creating a
+        # new volume and copying the data over)
+        try:
+            rpcapi.rename_volume(ctxt, new_volume, volume['id'])
+        except Exception:
+            msg = _("Failed to rename migration destination volume "
+                    "%(vol)s to %(name)s")
+            LOG.error(msg % {'vol': new_volume['id'], 'name': volume['name']})
+
+        self.db.finish_volume_migration(ctxt, volume['id'], new_volume['id'])
+
+    def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False):
+        """Migrate the volume to the specified host (called on source host)."""
+        volume_ref = self.db.volume_get(ctxt, volume_id)
+        model_update = None
+        moved = False
+        if not force_host_copy:
+            try:
+                LOG.debug(_("volume %s: calling driver migrate_volume"),
+                          volume_ref['name'])
+                moved, model_update = self.driver.migrate_volume(ctxt,
+                                                                 volume_ref,
+                                                                 host)
+                if moved:
+                    updates = {'host': host['host']}
+                    if model_update:
+                        updates.update(model_update)
+                    volume_ref = self.db.volume_update(ctxt,
+                                                       volume_ref['id'],
+                                                       updates)
+            except Exception:
+                with excutils.save_and_reraise_exception():
+                    updates = {'status': 'error_migrating'}
+                    model_update = self.driver.create_export(ctxt, volume_ref)
+                    if model_update:
+                        updates.update(model_update)
+                    self.db.volume_update(ctxt, volume_ref['id'], updates)
+        if not moved:
+            try:
+                self._migrate_volume_generic(ctxt, volume_ref, host)
+            except Exception:
+                with excutils.save_and_reraise_exception():
+                    updates = {'status': 'error_migrating'}
+                    model_update = self.driver.create_export(ctxt, volume_ref)
+                    if model_update:
+                        updates.update(model_update)
+                    self.db.volume_update(ctxt, volume_ref['id'], updates)
+        self.db.volume_update(ctxt, volume_ref['id'],
+                              {'status': 'available'})
+
+    def rename_volume(self, ctxt, volume_id, new_name_id):
+        volume_ref = self.db.volume_get(ctxt, volume_id)
+        orig_name = volume_ref['name']
+        self.driver.remove_export(ctxt, volume_ref)
+        self.db.volume_update(ctxt, volume_id, {'name_id': new_name_id})
+        volume_ref = self.db.volume_get(ctxt, volume_id)
+        model_update = self.driver.rename_volume(volume_ref, orig_name)
+        if model_update:
+            self.db.volume_update(ctxt, volume_ref['id'], model_update)
+        model_update = self.driver.create_export(ctxt, volume_ref)
+        if model_update:
+            self.db.volume_update(ctxt, volume_ref['id'], model_update)
+
     @periodic_task.periodic_task
     def _report_driver_status(self, context):
         LOG.info(_("Updating volume status"))
index c72446fe4fadff720678c914bd4446dcc13c95ac..b428f79340b4ddc11d7df5e16148c21a31ab6724 100644 (file)
@@ -18,7 +18,6 @@
 Client side of the volume RPC API.
 """
 
-
 from oslo.config import cfg
 
 from cinder.openstack.common import rpc
@@ -43,6 +42,7 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
         1.6 - Add extend_volume.
         1.7 - Adds host_name parameter to attach_volume()
               to allow attaching to host rather than instance.
+        1.8 - Add migrate_volume, rename_volume.
     '''
 
     BASE_RPC_API_VERSION = '1.0'
@@ -151,3 +151,22 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
                                 new_size=new_size),
                   topic=rpc.queue_get_for(ctxt, self.topic, volume['host']),
                   version='1.6')
+
+    def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
+        host_p = {'host': dest_host.host,
+                  'capabilities': dest_host.capabilities}
+        self.cast(ctxt,
+                  self.make_msg('migrate_volume',
+                                volume_id=volume['id'],
+                                host=host_p,
+                                force_host_copy=force_host_copy),
+                  topic=rpc.queue_get_for(ctxt, self.topic, volume['host']),
+                  version='1.8')
+
+    def rename_volume(self, ctxt, volume, new_name_id):
+        self.call(ctxt,
+                  self.make_msg('rename_volume',
+                                volume_id=volume['id'],
+                                new_name_id=new_name_id),
+                  topic=rpc.queue_get_for(ctxt, self.topic, volume['host']),
+                  version='1.8')
index da120983d6190c15c1de9d70b68f47e57408e70e..c62d83138a4b871fd3336e5d396bb2bdb25ed622 100644 (file)
@@ -25,6 +25,7 @@
     "volume_extension:snapshot_admin_actions:reset_status": [["rule:admin_api"]],
     "volume_extension:volume_admin_actions:force_delete": [["rule:admin_api"]],
     "volume_extension:snapshot_admin_actions:force_delete": [["rule:admin_api"]],
+    "volume_extension:volume_admin_actions:migrate_volume": [["rule:admin_api"]],
 
     "volume_extension:volume_host_attribute": [["rule:admin_api"]],
     "volume_extension:volume_tenant_attribute": [["rule:admin_api"]],
index 1b90415c164366876c0eddcf1fd8c52486470a25..53c7a8bfac21997d13998d62a2faf6b63967b48e 100644 (file)
@@ -24,6 +24,9 @@ lvremove: CommandFilter, lvremove, root
 # cinder/volume/driver.py: 'lvdisplay', '--noheading', '-C', '-o', 'Attr',..
 lvdisplay: CommandFilter, lvdisplay, root
 
+# cinder/volume/driver.py: 'lvrename', '%(vg)s', '%(orig)s' '(new)s'...
+lvrename: CommandFilter, lvrename, root
+
 # cinder/volume/driver.py: 'iscsiadm', '-m', 'discovery', '-t',...
 # cinder/volume/driver.py: 'iscsiadm', '-m', 'node', '-T', ...
 iscsiadm: CommandFilter, iscsiadm, root