]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Assisted volume migration for Oracle ZFSSA drivers
authorkedar-vidvans <kedar.vidvans@oracle.com>
Fri, 7 Aug 2015 21:23:46 +0000 (17:23 -0400)
committerkedar-vidvans <kedar.vidvans@oracle.com>
Mon, 24 Aug 2015 20:03:31 +0000 (16:03 -0400)
The Oracle ZFSSA drivers will migrate the volume directly from the
source host to the destination host if both hosts are configured
with Oracle ZFSSA. This type of volume migration will bypass the
cinder host and will directly transfer the volume to the destination
backend.

The ZFSSA iSCSI driver will use zfssa's remote replication service
to move the the volume to the destination zfssa.

Implements: blueprint oracle-zfssa-volume-migration
Change-Id: I4de8108e3c6c0969c83e87eac139f04b4fe8c7ac

cinder/tests/unit/test_zfssa.py
cinder/volume/drivers/zfssa/zfssaiscsi.py
cinder/volume/drivers/zfssa/zfssanfs.py
cinder/volume/drivers/zfssa/zfssarest.py

index f519c5ca9d3f6fa483764165ad80fe433722f539..b3aed1a9dc75d3cb526c0dcb26ad9ac65ef236b1 100644 (file)
@@ -19,6 +19,7 @@ import mock
 from oslo_utils import units
 import six
 
+from cinder import context
 from cinder import exception
 from cinder import test
 from cinder.tests.unit import fake_utils
@@ -104,6 +105,171 @@ class TestZFSSAISCSIDriver(test.TestCase):
         self.configuration.zfssa_rest_timeout = 60
         self.configuration.volume_backend_name = 'fake_zfssa'
         self.configuration.safe_get = self.fake_safe_get
+        self.configuration.zfssa_replication_ip = '1.1.1.1'
+
+    def _util_migrate_volume_exceptions(self):
+        self.drv.zfssa.get_lun.return_value = (
+            {'targetgroup': 'test-target-grp1'})
+        self.drv.zfssa.get_asn.return_value = (
+            '9a2b5a0f-e3af-6d14-9578-8825f229dc89')
+        self.drv.tgt_zfssa.get_asn.return_value = (
+            '9a2b5a0f-e3af-6d14-9578-8825f229dc89')
+        targets = {'targets': [{'hostname': '2.2.2.2',
+                                'address': '2.2.2.2:216',
+                                'label': '2.2.2.2',
+                                'asn':
+                                '9a2b5a0f-e3af-6d14-9578-8825f229dc89'}]}
+
+        self.drv.zfssa.get_replication_targets.return_value = targets
+        self.drv.zfssa.edit_inherit_replication_flag.return_value = {}
+        self.drv.zfssa.create_replication_action.return_value = 'action-123'
+        self.drv.zfssa.send_repl_update.return_value = True
+
+    def test_migrate_volume(self):
+        self._util_migrate_volume_exceptions()
+
+        volume = self.test_vol
+        volume.update({'host': 'fake_host',
+                       'status': 'available',
+                       'name': 'vol-1',
+                       'source_volid': self.test_vol['id']})
+
+        loc_info = '2.2.2.2:fake_auth:pool2:project2:test-target-grp1:2.2.2.2'
+
+        host = {'host': 'stack@zfssa_iscsi#fake_zfssa',
+                'capabilities': {'vendor_name': 'Oracle',
+                                 'storage_protocol': 'iSCSI',
+                                 'location_info': loc_info}}
+        ctxt = context.get_admin_context()
+
+        # Test the normal case
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((True, None), result)
+
+        # Test when volume status is not available
+        volume['status'] = 'in-use'
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+        volume['status'] = 'available'
+
+        # Test when vendor is not Oracle
+        host['capabilities']['vendor_name'] = 'elcarO'
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+        host['capabilities']['vendor_name'] = 'Oracle'
+
+        # Test when storage protocol is not iSCSI
+        host['capabilities']['storage_protocol'] = 'not_iSCSI'
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+        host['capabilities']['storage_protocol'] = 'iSCSI'
+
+        # Test when location_info is incorrect
+        host['capabilities']['location_info'] = ''
+        self.assertEqual((False, None), result)
+        host['capabilities']['location_info'] = loc_info
+
+        # Test if replication ip and replication target's address dont match
+        invalid_loc_info = (
+            '2.2.2.2:fake_auth:pool2:project2:test-target-grp1:9.9.9.9')
+        host['capabilities']['location_info'] = invalid_loc_info
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+        host['capabilities']['location_info'] = loc_info
+
+        # Test if no targets are returned
+        self.drv.zfssa.get_replication_targets.return_value = {'targets': []}
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+
+    def test_migrate_volume_uninherit_exception(self):
+        self._util_migrate_volume_exceptions()
+
+        volume = self.test_vol
+        volume.update({'host': 'fake_host',
+                       'status': 'available',
+                       'name': 'vol-1',
+                       'source_volid': self.test_vol['id']})
+
+        loc_info = '2.2.2.2:fake_auth:pool2:project2:test-target-grp1:2.2.2.2'
+
+        host = {'host': 'stack@zfssa_iscsi#fake_zfssa',
+                'capabilities': {'vendor_name': 'Oracle',
+                                 'storage_protocol': 'iSCSI',
+                                 'location_info': loc_info}}
+        ctxt = context.get_admin_context()
+
+        self.drv.zfssa.edit_inherit_replication_flag.side_effect = (
+            exception.VolumeBackendAPIException(data='uniherit ex'))
+        self.assertRaises(exception.VolumeBackendAPIException,
+                          self.drv.migrate_volume, ctxt, volume, host)
+
+    def test_migrate_volume_create_action_exception(self):
+        self._util_migrate_volume_exceptions()
+
+        volume = self.test_vol
+        volume.update({'host': 'fake_host',
+                       'status': 'available',
+                       'name': 'vol-1',
+                       'source_volid': self.test_vol['id']})
+
+        loc_info = '2.2.2.2:fake_auth:pool2:project2:test-target-grp1:2.2.2.2'
+
+        host = {'host': 'stack@zfssa_iscsi#fake_zfssa',
+                'capabilities': {'vendor_name': 'Oracle',
+                                 'storage_protocol': 'iSCSI',
+                                 'location_info': loc_info}}
+        ctxt = context.get_admin_context()
+
+        self.drv.zfssa.create_replication_action.side_effect = (
+            exception.VolumeBackendAPIException(data=
+                                                'failed to create action'))
+        self.assertRaises(exception.VolumeBackendAPIException,
+                          self.drv.migrate_volume, ctxt, volume, host)
+
+    def test_migrate_volume_send_update_exception(self):
+        self._util_migrate_volume_exceptions()
+
+        volume = self.test_vol
+        volume.update({'host': 'fake_host',
+                       'status': 'available',
+                       'name': 'vol-1',
+                       'source_volid': self.test_vol['id']})
+
+        loc_info = '2.2.2.2:fake_auth:pool2:project2:test-target-grp1:2.2.2.2'
+
+        host = {'host': 'stack@zfssa_iscsi#fake_zfssa',
+                'capabilities': {'vendor_name': 'Oracle',
+                                 'storage_protocol': 'iSCSI',
+                                 'location_info': loc_info}}
+        ctxt = context.get_admin_context()
+
+        self.drv.zfssa.send_repl_update.side_effect = (
+            exception.VolumeBackendAPIException(data='failed to send update'))
+        self.assertRaises(exception.VolumeBackendAPIException,
+                          self.drv.migrate_volume, ctxt, volume, host)
+
+    def test_migrate_volume_sever_repl_exception(self):
+        self._util_migrate_volume_exceptions()
+
+        volume = self.test_vol
+        volume.update({'host': 'fake_host',
+                       'status': 'available',
+                       'name': 'vol-1',
+                       'source_volid': self.test_vol['id']})
+
+        loc_info = '2.2.2.2:fake_auth:pool2:project2:test-target-grp1:2.2.2.2'
+
+        host = {'host': 'stack@zfssa_iscsi#fake_zfssa',
+                'capabilities': {'vendor_name': 'Oracle',
+                                 'storage_protocol': 'iSCSI',
+                                 'location_info': loc_info}}
+        ctxt = context.get_admin_context()
+        self.drv.tgt_zfssa.sever_replication.side_effect = (
+            exception.VolumeBackendAPIException(data=
+                                                'failed to sever replication'))
+        self.assertRaises(exception.VolumeBackendAPIException,
+                          self.drv.migrate_volume, ctxt, volume, host)
 
     def test_create_delete_volume(self):
         self.drv.zfssa.get_lun.return_value = {'guid':
@@ -249,7 +415,8 @@ class TestZFSSANFSDriver(test.TestCase):
 
     test_vol = {
         'name': 'test-vol',
-        'size': 1
+        'size': 1,
+        'id': '1'
     }
 
     test_snap = {
@@ -292,6 +459,66 @@ class TestZFSSANFSDriver(test.TestCase):
         self.configuration.nfs_oversub_ratio = 1
         self.configuration.nfs_used_ratio = 1
 
+    def test_migrate_volume(self):
+        self.drv.zfssa.get_asn.return_value = (
+            '9a2b5a0f-e3af-6d14-9578-8825f229dc89')
+        volume = self.test_vol
+        volume.update({'host': 'fake_host',
+                       'status': 'available',
+                       'name': 'vol-1',
+                       'source_volid': self.test_vol['id']})
+
+        loc_info = '9a2b5a0f-e3af-6d14-9578-8825f229dc89:nfs_share'
+
+        host = {'host': 'stack@zfssa_nfs#fake_zfssa',
+                'capabilities': {'vendor_name': 'Oracle',
+                                 'storage_protocol': 'nfs',
+                                 'location_info': loc_info}}
+        ctxt = context.get_admin_context()
+
+        # Test Normal case
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((True, None), result)
+
+        # Test when volume status is not available
+        volume['status'] = 'in-use'
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+        volume['status'] = 'available'
+
+        # Test when Vendor is not Oracle
+        host['capabilities']['vendor_name'] = 'elcarO'
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+        host['capabilities']['vendor_name'] = 'Oracle'
+
+        # Test when storage protocol is not iSCSI
+        host['capabilities']['storage_protocol'] = 'not_nfs'
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+        host['capabilities']['storage_protocol'] = 'nfs'
+
+        # Test for exceptions
+        host['capabilities']['location_info'] = ''
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+        host['capabilities']['location_info'] = loc_info
+
+        # Test case when source and target asn dont match
+        invalid_loc_info = (
+            'fake_asn*https://2.2.2.2:/shares/export/nfs_share*nfs_share')
+        host['capabilities']['location_info'] = invalid_loc_info
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+
+        # Test case when source and target shares names are different
+        invalid_loc_info = (
+            '9a2b5a0f-e3af-6d14-9578-8825f229dc89*' +
+            'https://tgt:/shares/export/nfs_share*nfs_share_1')
+        host['capabilities']['location_info'] = invalid_loc_info
+        result = self.drv.migrate_volume(ctxt, volume, host)
+        self.assertEqual((False, None), result)
+
     def test_create_delete_snapshot(self):
         lcfg = self.configuration
         self.drv.create_snapshot(self.test_snap)
index 933daac9dc118bfeb98f18d4c220fa9f5c9effd1..5b9044296c4750c8658cef990fb0d19ce9981c62 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -28,6 +28,10 @@ from cinder.volume.drivers.san import san
 from cinder.volume.drivers.zfssa import zfssarest
 from cinder.volume import volume_types
 
+import taskflow.engines
+from taskflow.patterns import linear_flow as lf
+from taskflow import task
+
 CONF = cfg.CONF
 LOG = log.getLogger(__name__)
 
@@ -69,7 +73,10 @@ ZFSSA_OPTS = [
     cfg.StrOpt('zfssa_target_interfaces',
                help='Network interfaces of iSCSI targets. (comma separated)'),
     cfg.IntOpt('zfssa_rest_timeout',
-               help='REST connection timeout. (seconds)')
+               help='REST connection timeout. (seconds)'),
+    cfg.StrOpt('zfssa_replication_ip', default='',
+               help='IP address used for replication data. (maybe the same as '
+                    'data ip)')
 
 ]
 
@@ -88,9 +95,13 @@ def factory_zfssa():
 
 
 class ZFSSAISCSIDriver(driver.ISCSIDriver):
-    """ZFSSA Cinder iSCSI volume driver."""
+    """ZFSSA Cinder iSCSI volume driver.
+
+    Version history:
+    1.0.1: Backend enabled volume migration.
+    """
 
-    VERSION = '1.0.0'
+    VERSION = '1.0.1'
     protocol = 'iSCSI'
 
     def __init__(self, *args, **kwargs):
@@ -98,6 +109,7 @@ class ZFSSAISCSIDriver(driver.ISCSIDriver):
         self.configuration.append_config_values(ZFSSA_OPTS)
         self.configuration.append_config_values(san.san_opts)
         self.zfssa = None
+        self.tgt_zfssa = None
         self._stats = None
         self.tgtiqn = None
 
@@ -113,11 +125,13 @@ class ZFSSAISCSIDriver(driver.ISCSIDriver):
         lcfg = self.configuration
         LOG.info(_LI('Connecting to host: %s.'), lcfg.san_ip)
         self.zfssa = factory_zfssa()
+        self.tgt_zfssa = factory_zfssa()
         self.zfssa.set_host(lcfg.san_ip, timeout=lcfg.zfssa_rest_timeout)
         auth_str = base64.encodestring('%s:%s' %
                                        (lcfg.san_login,
                                         lcfg.san_password))[:-1]
         self.zfssa.login(auth_str)
+
         self.zfssa.create_project(lcfg.zfssa_pool, lcfg.zfssa_project,
                                   compression=lcfg.zfssa_lun_compression,
                                   logbias=lcfg.zfssa_lun_logbias)
@@ -351,6 +365,20 @@ class ZFSSAISCSIDriver(driver.ISCSIDriver):
         if avail is None or total is None:
             return
 
+        host = lcfg.san_ip
+        pool = lcfg.zfssa_pool
+        project = lcfg.zfssa_project
+        auth_str = base64.encodestring('%s:%s' %
+                                       (lcfg.san_login,
+                                        lcfg.san_password))[:-1]
+        zfssa_tgt_group = lcfg.zfssa_target_group
+        repl_ip = lcfg.zfssa_replication_ip
+
+        data['location_info'] = "%s:%s:%s:%s:%s:%s" % (host, auth_str, pool,
+                                                       project,
+                                                       zfssa_tgt_group,
+                                                       repl_ip)
+
         data['total_capacity_gb'] = int(total) / units.Gi
         data['free_capacity_gb'] = int(avail) / units.Gi
         data['reserved_percentage'] = 0
@@ -486,3 +514,234 @@ class ZFSSAISCSIDriver(driver.ISCSIDriver):
                 result.update({prop: val})
 
         return result
+
+    def migrate_volume(self, ctxt, volume, host):
+        LOG.debug('Attempting ZFSSA enabled volume migration. volume: %(id)s, '
+                  'host: %(host)s, status=%(status)s.',
+                  {'id': volume['id'],
+                   'host': host,
+                   'status': volume['status']})
+
+        lcfg = self.configuration
+        default_ret = (False, None)
+
+        if volume['status'] != "available":
+            LOG.debug('Only available volumes can be migrated using backend '
+                      'assisted migration. Defaulting to generic migration.')
+            return default_ret
+
+        if (host['capabilities']['vendor_name'] != 'Oracle' or
+                host['capabilities']['storage_protocol'] != self.protocol):
+            LOG.debug('Source and destination drivers need to be Oracle iSCSI '
+                      'to use backend assisted migration. Defaulting to '
+                      'generic migration.')
+            return default_ret
+
+        if 'location_info' not in host['capabilities']:
+            LOG.debug('Could not find location_info in capabilities reported '
+                      'by the destination driver. Defaulting to generic '
+                      'migration.')
+            return default_ret
+
+        loc_info = host['capabilities']['location_info']
+
+        try:
+            (tgt_host, auth_str, tgt_pool, tgt_project, tgt_tgtgroup,
+             tgt_repl_ip) = loc_info.split(':')
+        except ValueError:
+            LOG.error(_LE("Location info needed for backend enabled volume "
+                          "migration not in correct format: %s. Continuing "
+                          "with generic volume migration."), loc_info)
+            return default_ret
+
+        if tgt_repl_ip == '':
+            msg = _LE("zfssa_replication_ip not set in cinder.conf. "
+                      "zfssa_replication_ip is needed for backend enabled "
+                      "volume migration. Continuing with generic volume "
+                      "migration.")
+            LOG.error(msg)
+            return default_ret
+
+        src_pool = lcfg.zfssa_pool
+        src_project = lcfg.zfssa_project
+
+        try:
+            LOG.info(_LI('Connecting to target host: %s for backend enabled '
+                         'migration.'), tgt_host)
+            self.tgt_zfssa.set_host(tgt_host)
+            self.tgt_zfssa.login(auth_str)
+
+            # Verify that the replication service is online
+            try:
+                self.zfssa.verify_service('replication')
+                self.tgt_zfssa.verify_service('replication')
+            except exception.VolumeBackendAPIException:
+                return default_ret
+
+            # ensure that a target group by the same name exists on the target
+            # system also, if not, use default migration.
+            lun = self.zfssa.get_lun(src_pool, src_project, volume['name'])
+
+            if lun['targetgroup'] != tgt_tgtgroup:
+                return default_ret
+
+            tgt_asn = self.tgt_zfssa.get_asn()
+            src_asn = self.zfssa.get_asn()
+
+            # verify on the source system that the destination has been
+            # registered as a replication target
+            tgts = self.zfssa.get_replication_targets()
+            targets = []
+            for target in tgts['targets']:
+                if target['asn'] == tgt_asn:
+                    targets.append(target)
+
+            if targets == []:
+                LOG.debug('Target host: %(host)s for volume migration '
+                          'not configured as a replication target '
+                          'for volume: %(vol)s.',
+                          {'host': tgt_repl_ip,
+                           'vol': volume['name']})
+                return default_ret
+
+            # Multiple ips from the same appliance may be configured
+            # as different targets
+            for target in targets:
+                if target['address'] == tgt_repl_ip + ':216':
+                    break
+
+            if target['address'] != tgt_repl_ip + ':216':
+                LOG.debug('Target with replication ip: %s not configured on '
+                          'the source appliance for backend enabled volume '
+                          'migration. Proceeding with default migration.',
+                          tgt_repl_ip)
+                return default_ret
+
+            flow = lf.Flow('zfssa_volume_migration').add(
+                MigrateVolumeInit(),
+                MigrateVolumeCreateAction(provides='action_id'),
+                MigrateVolumeSendReplUpdate(),
+                MigrateVolumeSeverRepl(),
+                MigrateVolumeMoveVol(),
+                MigrateVolumeCleanUp()
+            )
+            taskflow.engines.run(flow,
+                                 store={'driver': self,
+                                        'tgt_zfssa': self.tgt_zfssa,
+                                        'tgt_pool': tgt_pool,
+                                        'tgt_project': tgt_project,
+                                        'volume': volume, 'tgt_asn': tgt_asn,
+                                        'src_zfssa': self.zfssa,
+                                        'src_asn': src_asn,
+                                        'src_pool': src_pool,
+                                        'src_project': src_project,
+                                        'target': target})
+
+            return(True, None)
+
+        except Exception:
+            LOG.error(_LE("Error migrating volume: %s"), volume['name'])
+            raise
+
+    def update_migrated_volume(self, ctxt, volume, new_volume,
+                               original_volume_status):
+        """Return model update for migrated volume.
+
+        :param volume: The original volume that was migrated to this backend
+        :param new_volume: The migration volume object that was created on
+                           this backend as part of the migration process
+        :param original_volume_status: The status of the original volume
+        :return model_update to update DB with any needed changes
+        """
+
+        lcfg = self.configuration
+        original_name = CONF.volume_name_template % volume['id']
+        current_name = CONF.volume_name_template % new_volume['id']
+
+        LOG.debug('Renaming migrated volume: %(cur)s to %(org)s',
+                  {'cur': current_name,
+                   'org': original_name})
+        self.zfssa.set_lun_props(lcfg.zfssa_pool, lcfg.zfssa_project,
+                                 current_name, name=original_name)
+        return {'_name_id': None}
+
+
+class MigrateVolumeInit(task.Task):
+    def execute(self, src_zfssa, volume, src_pool, src_project):
+        LOG.debug('Setting inherit flag on source backend to False.')
+        src_zfssa.edit_inherit_replication_flag(src_pool, src_project,
+                                                volume['name'], set=False)
+
+    def revert(self, src_zfssa, volume, src_pool, src_project, **kwargs):
+        LOG.debug('Rollback: Setting inherit flag on source appliance to '
+                  'True.')
+        src_zfssa.edit_inherit_replication_flag(src_pool, src_project,
+                                                volume['name'], set=True)
+
+
+class MigrateVolumeCreateAction(task.Task):
+    def execute(self, src_zfssa, volume, src_pool, src_project, target,
+                tgt_pool):
+        LOG.debug('Creating replication action on source appliance.')
+        action_id = src_zfssa.create_replication_action(src_pool,
+                                                        src_project,
+                                                        target['label'],
+                                                        tgt_pool,
+                                                        volume['name'])
+
+        self._action_id = action_id
+        return action_id
+
+    def revert(self, src_zfssa, **kwargs):
+        if hasattr(self, '_action_id'):
+            LOG.debug('Rollback: deleting replication action on source '
+                      'appliance.')
+            src_zfssa.delete_replication_action(self._action_id)
+
+
+class MigrateVolumeSendReplUpdate(task.Task):
+    def execute(self, src_zfssa, action_id):
+        LOG.debug('Sending replication update from source appliance.')
+        src_zfssa.send_repl_update(action_id)
+        LOG.debug('Deleting replication action on source appliance.')
+        src_zfssa.delete_replication_action(action_id)
+        self._action_deleted = True
+
+
+class MigrateVolumeSeverRepl(task.Task):
+    def execute(self, tgt_zfssa, src_asn, action_id, driver):
+        source = tgt_zfssa.get_replication_source(src_asn)
+        if not source:
+            err = (_('Source with host ip/name: %s not found on the '
+                     'target appliance for backend enabled volume '
+                     'migration, procedding with default migration.'),
+                   driver.configuration.san_ip)
+            LOG.error(err)
+            raise exception.VolumeBackendAPIException(data=err)
+        LOG.debug('Severing replication package on destination appliance.')
+        tgt_zfssa.sever_replication(action_id, source['name'],
+                                    project=action_id)
+
+
+class MigrateVolumeMoveVol(task.Task):
+    def execute(self, tgt_zfssa, tgt_pool, tgt_project, action_id, volume):
+        LOG.debug('Moving LUN to destination project on destination '
+                  'appliance.')
+        tgt_zfssa.move_volume(tgt_pool, action_id, volume['name'], tgt_project)
+        LOG.debug('Deleting temporary project on destination appliance.')
+        tgt_zfssa.delete_project(tgt_pool, action_id)
+        self._project_deleted = True
+
+    def revert(self, tgt_zfssa, tgt_pool, tgt_project, action_id, volume,
+               **kwargs):
+        if not hasattr(self, '_project_deleted'):
+            LOG.debug('Rollback: deleting temporary project on destination '
+                      'appliance.')
+            tgt_zfssa.delete_project(tgt_pool, action_id)
+
+
+class MigrateVolumeCleanUp(task.Task):
+    def execute(self, driver, volume, tgt_zfssa):
+        LOG.debug('Finally, delete source volume on source appliance.')
+        driver.delete_volume(volume)
+        tgt_zfssa.logout()
index 9571735dcd04fe4703545f3da09b29aad08c470f..55ce98fed96b5b13836a6bf901199d404eebaff0 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain
@@ -64,7 +64,13 @@ def factory_zfssa():
 
 
 class ZFSSANFSDriver(nfs.NfsDriver):
-    VERSION = '1.0.0'
+    """ZFSSA Cinder NFS volume driver.
+
+    Version history:
+    1.0.1: Backend enabled volume migration.
+    """
+
+    VERSION = '1.0.1'
     volume_backend_name = 'ZFSSA_NFS'
     protocol = driver_prefix = driver_volume_type = 'nfs'
 
@@ -280,12 +286,16 @@ class ZFSSANFSDriver(nfs.NfsDriver):
         """Get volume stats from zfssa"""
         self._ensure_shares_mounted()
         data = {}
+        lcfg = self.configuration
         backend_name = self.configuration.safe_get('volume_backend_name')
         data['volume_backend_name'] = backend_name or self.__class__.__name__
         data['vendor_name'] = 'Oracle'
         data['driver_version'] = self.VERSION
         data['storage_protocol'] = self.protocol
 
+        asn = self.zfssa.get_asn()
+        data['location_info'] = '%s:%s' % (asn, lcfg.zfssa_nfs_share)
+
         free, used = self._get_share_capacity_info()
         capacity = float(free) + float(used)
         ratio_used = used / capacity
@@ -301,3 +311,73 @@ class ZFSSANFSDriver(nfs.NfsDriver):
         data['free_capacity_gb'] = float(free) / units.Gi
 
         self._stats = data
+
+    def migrate_volume(self, ctxt, volume, host):
+        LOG.debug('Attempting ZFSSA enabled volume migration. volume: %(id)s, '
+                  'host: %(host)s, status=%(status)s',
+                  {'id': volume['id'],
+                   'host': host,
+                   'status': volume['status']})
+
+        lcfg = self.configuration
+        default_ret = (False, None)
+
+        if volume['status'] != "available":
+            LOG.debug('Only available volumes can be migrated using backend '
+                      'assisted migration. Defaulting to generic migration.')
+            return default_ret
+
+        if (host['capabilities']['vendor_name'] != 'Oracle' or
+                host['capabilities']['storage_protocol'] != self.protocol):
+            LOG.debug('Source and destination drivers need to be Oracle iSCSI '
+                      'to use backend assisted migration. Defaulting to '
+                      'generic migration.')
+            return default_ret
+
+        if 'location_info' not in host['capabilities']:
+            LOG.debug('Could not find location_info in capabilities reported '
+                      'by the destination driver. Defaulting to generic '
+                      'migration.')
+            return default_ret
+
+        loc_info = host['capabilities']['location_info']
+
+        try:
+            (tgt_asn, tgt_share) = loc_info.split(':')
+        except ValueError:
+            LOG.error(_LE("Location info needed for backend enabled volume "
+                          "migration not in correct format: %s. Continuing "
+                          "with generic volume migration."), loc_info)
+            return default_ret
+
+        src_asn = self.zfssa.get_asn()
+
+        if tgt_asn == src_asn and lcfg.zfssa_nfs_share == tgt_share:
+            LOG.info(_LI('Source and destination ZFSSA shares are the same. '
+                         'Do nothing. volume: %s'), volume['name'])
+            return (True, None)
+
+        return (False, None)
+
+    def update_migrated_volume(self, ctxt, volume, new_volume,
+                               original_volume_status):
+        """Return model update for migrated volume.
+
+        :param volume: The original volume that was migrated to this backend
+        :param new_volume: The migration volume object that was created on
+                           this backend as part of the migration process
+        :param original_volume_status: The status of the original volume
+        :return model_update to update DB with any needed changes
+        """
+
+        original_name = CONF.volume_name_template % volume['id']
+        current_name = CONF.volume_name_template % new_volume['id']
+
+        LOG.debug('Renaming migrated volume: %(cur)s to %(org)s.',
+                  {'cur': current_name,
+                   'org': original_name})
+        self.zfssa.create_volume_from_snapshot_file(src_file=current_name,
+                                                    dst_file=original_name,
+                                                    method='MOVE')
+        provider_location = new_volume['provider_location']
+        return {'_name_id': None, 'provider_location': provider_location}
index 1c7dd4a005b29cc137b37f68f26c27b155df1738..5fac3de0796990c330072817dffb8cc550e8b656 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -17,6 +17,7 @@ ZFS Storage Appliance Proxy
 import json
 
 from oslo_log import log
+from oslo_service import loopingcall
 
 from cinder import exception
 from cinder.i18n import _, _LE
@@ -71,6 +72,286 @@ class ZFSSAApi(object):
         if self.rclient and not self.rclient.islogin():
             self.rclient.login(auth_str)
 
+    def logout(self):
+        self.rclient.logout()
+
+    def verify_service(self, service, status='online'):
+        """Checks whether a service is online or not"""
+        svc = '/api/service/v1/services/' + service
+        ret = self.rclient.get(svc)
+
+        if ret.status != restclient.Status.OK:
+            exception_msg = (_('Error Verifying '
+                               'Service: %(service)s '
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s.')
+                             % {'service': service,
+                                'ret.status': ret.status,
+                                'ret.data': ret.data})
+
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+        data = json.loads(ret.data)['service']
+
+        if data['<status>'] != status:
+            exception_msg = (_('%(service)s Service is not %(status)s '
+                               'on storage appliance: %(host)s')
+                             % {'service': service,
+                                'status': status,
+                                'host': self.host})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+    def get_asn(self):
+        """Returns appliance asn."""
+        svc = '/api/system/v1/version'
+        ret = self.rclient.get(svc)
+        if ret.status != restclient.Status.OK:
+            exception_msg = (_('Error getting appliance version details. '
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s .')
+                             % {'ret.status': ret.status,
+                                'ret.data': ret.data})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+        val = json.loads(ret.data)
+        return val['version']['asn']
+
+    def get_replication_targets(self):
+        """Returns all replication targets configured on the appliance."""
+        svc = '/api/storage/v1/replication/targets'
+        ret = self.rclient.get(svc)
+        if ret.status != restclient.Status.OK:
+            exception_msg = (_('Error getting replication target details. '
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s .')
+                             % {'ret.status': ret.status,
+                                'ret.data': ret.data})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+        val = json.loads(ret.data)
+        return val
+
+    def edit_inherit_replication_flag(self, pool, project, volume, set=True):
+        """Edit the inherit replication flag for volume."""
+        svc = ('/api/storage/v1/pools/%(pool)s/projects/%(project)s'
+               '/filesystems/%(volume)s/replication'
+               % {'pool': pool,
+                  'project': project,
+                  'volume': volume})
+        arg = {'inherited': set}
+        ret = self.rclient.put(svc, arg)
+
+        if ret.status != restclient.Status.ACCEPTED:
+            exception_msg = (_('Error setting replication inheritance '
+                               'to %(set)s '
+                               'for volume: %(vol)s '
+                               'project %(project)s '
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s .')
+                             % {'set': set,
+                                'project': project,
+                                'vol': volume,
+                                'ret.status': ret.status,
+                                'ret.data': ret.data})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+    def create_replication_action(self, host_pool, host_project, tgt_name,
+                                  tgt_pool, volume):
+        """Create a replication action."""
+        arg = {'pool': host_pool,
+               'project': host_project,
+               'target_pool': tgt_pool,
+               'target': tgt_name}
+
+        if volume is not None:
+            arg.update({'share': volume})
+
+        svc = '/api/storage/v1/replication/actions'
+        ret = self.rclient.post(svc, arg)
+        if ret.status != restclient.Status.CREATED:
+            exception_msg = (_('Error Creating replication action on: '
+                               'pool: %(pool)s '
+                               'Project: %(proj)s '
+                               'volume: %(vol)s '
+                               'for target: %(tgt)s and pool: %(tgt_pool)s'
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s .')
+                             % {'pool': host_pool,
+                                'proj': host_project,
+                                'vol': volume,
+                                'tgt': tgt_name,
+                                'tgt_pool': tgt_pool,
+                                'ret.status': ret.status,
+                                'ret.data': ret.data})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+        val = json.loads(ret.data)
+        return val['action']['id']
+
+    def delete_replication_action(self, action_id):
+        """Delete a replication action."""
+        svc = '/api/storage/v1/replication/actions/%s' % action_id
+        ret = self.rclient.delete(svc)
+        if ret.status != restclient.Status.NO_CONTENT:
+            exception_msg = (_('Error Deleting '
+                               'replication action: %(id)s '
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s.')
+                             % {'id': action_id,
+                                'ret.status': ret.status,
+                                'ret.data': ret.data})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+    def send_repl_update(self, action_id):
+        """Send replication update
+
+           Send replication update to the target appliance and then wait for
+           it to complete.
+        """
+
+        svc = '/api/storage/v1/replication/actions/%s/sendupdate' % action_id
+        ret = self.rclient.put(svc)
+        if ret.status != restclient.Status.ACCEPTED:
+            exception_msg = (_('Error sending replication update '
+                               'for action id: %(id)s . '
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s .')
+                             % {'id': action_id,
+                                'ret.status': ret.status,
+                                'ret.data': ret.data})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+        def _loop_func():
+            svc = '/api/storage/v1/replication/actions/%s' % action_id
+            ret = self.rclient.get(svc)
+            if ret.status != restclient.Status.OK:
+                exception_msg = (_('Error getting replication action: %(id)s. '
+                                   'Return code: %(ret.status)d '
+                                   'Message: %(ret.data)s .')
+                                 % {'id': action_id,
+                                    'ret.status': ret.status,
+                                    'ret.data': ret.data})
+                LOG.error(exception_msg)
+                raise exception.VolumeBackendAPIException(data=exception_msg)
+
+            val = json.loads(ret.data)
+            if val['action']['last_result'] == 'success':
+                raise loopingcall.LoopingCallDone()
+            elif (val['action']['last_result'] == '<unknown>' and
+                    val['action']['state'] == 'sending'):
+                pass
+            else:
+                exception_msg = (_('Error sending replication update. '
+                                   'Returned error: %(err)s. '
+                                   'Action: %(id)s.')
+                                 % {'err': val['action']['last_result'],
+                                    'id': action_id})
+                LOG.error(exception_msg)
+                raise exception.VolumeBackendAPIException(data=exception_msg)
+
+        timer = loopingcall.FixedIntervalLoopingCall(_loop_func)
+        timer.start(interval=5).wait()
+
+    def get_replication_source(self, asn):
+        """Return the replication source json which has a matching asn."""
+        svc = "/api/storage/v1/replication/sources"
+        ret = self.rclient.get(svc)
+        if ret.status != restclient.Status.OK:
+            exception_msg = (_('Error getting replication source details. '
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s .')
+                             % {'ret.status': ret.status,
+                                'ret.data': ret.data})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+        val = json.loads(ret.data)
+
+        for source in val['sources']:
+            if source['asn'] == asn:
+                return source
+        return None
+
+    def sever_replication(self, package, src_name, project=None):
+        """Sever Replication at the destination.
+
+           This method will sever the package and move the volume to a project,
+           if project name is not passed in then the package name is selected
+           as the project name
+        """
+
+        svc = ('/api/storage/v1/replication/sources/%(src)s/packages/%(pkg)s'
+               '/sever' % {'src': src_name, 'pkg': package})
+
+        if not project:
+            project = package
+
+        arg = {'projname': project}
+        ret = self.rclient.put(svc, arg)
+
+        if ret.status != restclient.Status.ACCEPTED:
+            exception_msg = (_('Error severing the package: %(package)s '
+                               'from source: %(src)s '
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s .')
+                             % {'package': package,
+                                'src': src_name,
+                                'ret.status': ret.status,
+                                'ret.data': ret.data})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+    def move_volume(self, pool, project, volume, tgt_project):
+        """Move a LUN from one project to another within the same pool."""
+        svc = ('/api/storage/v1/pools/%(pool)s/projects/%(project)s'
+               '/filesystems/%(volume)s' % {'pool': pool,
+                                            'project': project,
+                                            'volume': volume})
+
+        arg = {'project': tgt_project}
+
+        ret = self.rclient.put(svc, arg)
+        if ret.status != restclient.Status.ACCEPTED:
+            exception_msg = (_('Error moving volume: %(vol)s '
+                               'from source project: %(src)s '
+                               'to target project: %(tgt)s '
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s .')
+                             % {'vol': volume,
+                                'src': project,
+                                'tgt': tgt_project,
+                                'ret.status': ret.status,
+                                'ret.data': ret.data})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
+    def delete_project(self, pool, project):
+        """Delete a project."""
+        svc = ('/api/storage/v1/pools/%(pool)s/projects/%(project)s' %
+               {'pool': pool,
+                'project': project})
+        ret = self.rclient.delete(svc)
+        if ret.status != restclient.Status.NO_CONTENT:
+            exception_msg = (_('Error Deleting '
+                               'project: %(project)s '
+                               'on pool: %(pool)s '
+                               'Return code: %(ret.status)d '
+                               'Message: %(ret.data)s.')
+                             % {'project': project,
+                                'pool': pool,
+                                'ret.status': ret.status,
+                                'ret.data': ret.data})
+            LOG.error(exception_msg)
+            raise exception.VolumeBackendAPIException(data=exception_msg)
+
     def get_pool_stats(self, pool):
         """Get pool stats.
 
@@ -448,7 +729,8 @@ class ZFSSAApi(object):
             'number': val['lun']['assignednumber'],
             'initiatorgroup': val['lun']['initiatorgroup'],
             'size': val['lun']['volsize'],
-            'nodestroy': val['lun']['nodestroy']
+            'nodestroy': val['lun']['nodestroy'],
+            'targetgroup': val['lun']['targetgroup']
         }
         if 'origin' in val['lun']:
             ret.update({'origin': val['lun']['origin']})
@@ -780,34 +1062,6 @@ class ZFSSANfsApi(ZFSSAApi):
         self._change_service_state(service, state='disable')
         self.verify_service(service, status='offline')
 
-    def verify_service(self, service, status='online'):
-        """Checks whether a service is online or not"""
-        svc = self.services_path + service
-        ret = self.rclient.get(svc)
-
-        if ret.status != restclient.Status.OK:
-            exception_msg = (_('Error Verifying '
-                               'Service: %(service)s '
-                               'Return code: %(ret.status)d '
-                               'Message: %(ret.data)s.')
-                             % {'service': service,
-                                'ret.status': ret.status,
-                                'ret.data': ret.data})
-
-            LOG.error(exception_msg)
-            raise exception.VolumeBackendAPIException(data=exception_msg)
-
-        data = json.loads(ret.data)['service']
-
-        if data['<status>'] != status:
-            exception_msg = (_('%(service)s Service is not %(status)s '
-                               'on storage appliance: %(host)s')
-                             % {'service': service,
-                                'status': status,
-                                'host': self.host})
-            LOG.error(exception_msg)
-            raise exception.VolumeBackendAPIException(data=exception_msg)
-
     def modify_service(self, service, edit_args=None):
         """Edit service properties"""
         if edit_args is None: