]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Huawei: Implement v2 replication (managed)
authorchenzongliang <chenzongliang@huawei.com>
Sat, 12 Dec 2015 09:11:55 +0000 (17:11 +0800)
committerneochin <jiangliucheng@huawei.com>
Mon, 8 Feb 2016 10:53:17 +0000 (18:53 +0800)
This patch implements the managed side of v2 replication in the
HuaweiDriver. Both Synchronous mode and Asynchronous mode are
supported with Huawei arrays. The volume type need to associate with
the extra spec with 'replication_enabled' equaling to '<is> True', and
'replication_type' equaling to '<in> sync' or '<in> async'. If
'replication_type' is not provided, it will be defaulted to
Asynchronous mode.

What are supported with replication:
1. create volume
2. create volume from snapshot
3. clone a volume
4. create volume from image
5. volume retype
6. volume migration

So far we support a single remote device, and 'cinder.conf' should
configure a replication local backend and a replication remote backend
as follows:

[replica]
volume_driver =
    cinder.volume.drivers.huawei.huawei_driver.HuaweiISCSIDriver
cinder_huawei_conf_file = /etc/cinder/cinder_huawei_conf.xml
volume_backend_name = replica
replication_device = target_device_id:huawei-replica-1,
    managed_backend_name:host@replica-remote#pool,
    san_address:san_url_1;san_url_2,
    san_user:admin,san_password:passwd

[replica-remote]
volume_driver =
    cinder.volume.drivers.huawei.huawei_driver.HuaweiISCSIDriver
cinder_huawei_conf_file = /etc/cinder/cinder_huawei_conf_remote.xml
volume_backend_name = replica-remote
replication_device = target_device_id:huawei-replica-2,
    managed_backend_name:host@replica#pool,
    san_address:san_url_1;san_url_2,
    san_user:admin,san_password:passwd

Change-Id: Iaa3834547f80e7687f7e1946cb10d35f9ff0c136
Implements: blueprint support-replication-for-huawei-volume-driver

cinder/tests/unit/test_huawei_drivers.py
cinder/volume/drivers/huawei/constants.py
cinder/volume/drivers/huawei/huawei_conf.py
cinder/volume/drivers/huawei/huawei_driver.py
cinder/volume/drivers/huawei/replication.py [new file with mode: 0644]
cinder/volume/drivers/huawei/rest_client.py
releasenotes/notes/support-replication-for-huawei-volume-driver-ad61e9f5eba9c422.yaml [new file with mode: 0644]

index f68f8ae4c10e1fa2cac5d5cc4b7e7f293dd4e8a5..6f2bef9fe450e5068c00947656e99c1a4a4a82a7 100644 (file)
@@ -31,6 +31,7 @@ from cinder.volume.drivers.huawei import fc_zone_helper
 from cinder.volume.drivers.huawei import huawei_conf
 from cinder.volume.drivers.huawei import huawei_driver
 from cinder.volume.drivers.huawei import hypermetro
+from cinder.volume.drivers.huawei import replication
 from cinder.volume.drivers.huawei import rest_client
 from cinder.volume.drivers.huawei import smartx
 
@@ -98,6 +99,31 @@ hyper_volume = {'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
                                      'value': '11'}],
                 }
 
+sync_replica_specs = {'replication_enabled': '<is> True',
+                      'replication_type': '<in> sync'}
+async_replica_specs = {'replication_enabled': '<is> True',
+                       'replication_type': '<in> async'}
+
+TEST_PAIR_ID = "3400a30d844d0004"
+replication_volume = {
+    'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
+    'size': 2,
+    'volume_name': 'vol1',
+    'id': '21ec7341-9256-497b-97d9-ef48edcf0635',
+    'volume_id': '21ec7341-9256-497b-97d9-ef48edcf0635',
+    'provider_auth': None,
+    'project_id': 'project',
+    'display_name': 'vol1',
+    'display_description': 'test volume',
+    'volume_type_id': None,
+    'host': 'ubuntu@huawei#OpenStack_Pool',
+    'provider_location': '11',
+    'metadata': {'lun_wwn': '6643e8c1004c5f6723e9f454003'},
+    'replication_status': 'disabled',
+    'replication_driver_data':
+        '{"pair_id": "%s", "rmt_lun_id": "1"}' % TEST_PAIR_ID,
+}
+
 test_snap = {'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
              'size': 1,
              'volume_name': 'vol1',
@@ -157,6 +183,22 @@ test_new_type = {
     'description': None,
 }
 
+test_new_replication_type = {
+    'name': u'new_type',
+    'qos_specs_id': None,
+    'deleted': False,
+    'created_at': None,
+    'updated_at': None,
+    'extra_specs': {
+        'replication_enabled': '<is> True',
+        'replication_type': '<in> sync',
+    },
+    'is_public': True,
+    'deleted_at': None,
+    'id': u'530a56e1-a1a4-49f3-ab6c-779a6e5d999f',
+    'description': None,
+}
+
 hypermetro_devices = """
 {
     "remote_device": {
@@ -279,7 +321,13 @@ FAKE_LUN_INFO_RESPONSE = """
     },
     "data": {
         "ID": "1",
-        "NAME": "5mFHcBv4RkCcD+JyrWc0SA"
+        "NAME": "5mFHcBv4RkCcD+JyrWc0SA",
+        "WWN": "6643e8c1004c5f6723e9f454003",
+        "DESCRIPTION": "21ec7341-9256-497b-97d9-ef48edcf0635",
+        "HEALTHSTATUS": "1",
+        "RUNNINGSTATUS": "27",
+        "ALLOCTYPE": "1",
+        "CAPACITY": "2097152"
     }
 }
 """
@@ -294,7 +342,7 @@ FAKE_LUN_GET_SUCCESS_RESPONSE = """
         "IOCLASSID": "11",
         "NAME": "5mFHcBv4RkCcD+JyrWc0SA",
         "DESCRIPTION": "21ec7341-9256-497b-97d9-ef48edcf0635",
-        "RUNNINGSTATUS": "2",
+        "RUNNINGSTATUS": "10",
         "HEALTHSTATUS": "1",
         "RUNNINGSTATUS": "27",
         "LUNLIST": "",
@@ -309,7 +357,9 @@ FAKE_LUN_GET_SUCCESS_RESPONSE = """
         "WRITECACHEPOLICY": "5",
         "OWNINGCONTROLLER": "0B",
         "SMARTCACHEPARTITIONID": "",
-        "CACHEPARTITIONID": ""
+        "CACHEPARTITIONID": "",
+        "WWN": "6643e8c1004c5f6723e9f454003",
+        "PARENTNAME": "OpenStack_Pool"
     }
 }
 """
@@ -938,7 +988,8 @@ FAKE_SYSTEM_VERSION_RESPONSE = """
         "code": 0
     },
     "data":{
-        "PRODUCTVERSION": "V100R001C10"
+        "PRODUCTVERSION": "V100R001C10",
+        "wwn": "21003400a30d844d"
     }
 }
 """
@@ -1603,6 +1654,148 @@ MAP_COMMAND_TO_FAKE_RESPONSE['/fc_port/associate?TYPE=213&ASSOCIATEOBJTYPE='
     FAKE_PORTS_IN_PG_RESPONSE)
 
 
+# Replication response
+FAKE_GET_REMOTEDEV_RESPONSE = """
+{
+    "data":[{
+        "ARRAYTYPE":"1",
+        "HEALTHSTATUS":"1",
+        "ID":"0",
+        "NAME":"Huawei.Storage",
+        "RUNNINGSTATUS":"1",
+        "WWN":"21003400a30d844d"
+    }],
+    "error":{
+        "code":0,
+        "description":"0"
+    }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/remote_device/GET'] = (
+    FAKE_GET_REMOTEDEV_RESPONSE)
+
+FAKE_CREATE_PAIR_RESPONSE = """
+{
+    "data":{
+        "ID":"%s"
+    },
+    "error":{
+        "code":0,
+        "description":"0"
+    }
+}
+""" % TEST_PAIR_ID
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/POST'] = (
+    FAKE_CREATE_PAIR_RESPONSE)
+
+FAKE_DELETE_PAIR_RESPONSE = """
+{
+    "data":{},
+    "error":{
+        "code":0,
+        "description":"0"
+    }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/DELETE' % TEST_PAIR_ID] = (
+    FAKE_DELETE_PAIR_RESPONSE)
+
+FAKE_SET_PAIR_ACCESS_RESPONSE = """
+{
+    "data":{},
+    "error":{
+        "code":0,
+        "description":"0"
+    }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/PUT' % TEST_PAIR_ID] = (
+    FAKE_SET_PAIR_ACCESS_RESPONSE)
+
+FAKE_GET_PAIR_NORMAL_RESPONSE = """
+{
+    "data":{
+        "REPLICATIONMODEL": "1",
+        "RUNNINGSTATUS": "1",
+        "SECRESACCESS": "2",
+        "HEALTHSTATUS": "1",
+        "ISPRIMARY": "true"
+    },
+    "error":{
+        "code":0,
+        "description":"0"
+    }
+}
+"""
+
+FAKE_GET_PAIR_SPLIT_RESPONSE = """
+{
+    "data":{
+        "REPLICATIONMODEL": "1",
+        "RUNNINGSTATUS": "26",
+        "SECRESACCESS": "2",
+        "ISPRIMARY": "true"
+    },
+    "error":{
+        "code":0,
+        "description":"0"
+    }
+}
+"""
+
+FAKE_GET_PAIR_SYNC_RESPONSE = """
+{
+    "data":{
+        "REPLICATIONMODEL": "1",
+        "RUNNINGSTATUS": "23",
+        "SECRESACCESS": "2"
+    },
+    "error":{
+        "code":0,
+        "description":"0"
+    }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/GET' % TEST_PAIR_ID] = (
+    FAKE_GET_PAIR_NORMAL_RESPONSE)
+
+FAKE_SYNC_PAIR_RESPONSE = """
+{
+    "data":{},
+    "error":{
+        "code":0,
+        "description":"0"
+    }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/sync/PUT'] = (
+    FAKE_SYNC_PAIR_RESPONSE)
+
+FAKE_SPLIT_PAIR_RESPONSE = """
+{
+    "data":{},
+    "error":{
+        "code":0,
+        "description":"0"
+    }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/split/PUT'] = (
+    FAKE_SPLIT_PAIR_RESPONSE)
+
+FAKE_SWITCH_PAIR_RESPONSE = """
+{
+    "data":{},
+    "error":{
+        "code":0,
+        "description":"0"
+    }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/switch/PUT'] = (
+    FAKE_SWITCH_PAIR_RESPONSE)
+
+
 def Fake_sleep(time):
     pass
 
@@ -1612,7 +1805,14 @@ class FakeHuaweiConf(object):
         self.conf = conf
         self.protocol = protocol
 
+    def safe_get(self, key):
+        try:
+            return getattr(self.conf, key)
+        except Exception:
+            return
+
     def update_config_value(self):
+        setattr(self.conf, 'volume_backend_name', 'huawei_storage')
         setattr(self.conf, 'san_address',
                 ['http://100.115.10.69:8082/deviceManager/rest/'])
         setattr(self.conf, 'san_user', 'admin')
@@ -1646,6 +1846,16 @@ class FakeHuaweiConf(object):
                       'TargetPortGroup': 'portgroup-test', }
         setattr(self.conf, 'iscsi_info', [iscsi_info])
 
+        targets = [{'target_device_id': 'huawei-replica-1',
+                    'managed_backend_name': 'ubuntu@huawei2#OpenStack_Pool',
+                    'san_address':
+                        'https://100.97.10.69:8088/deviceManager/rest/',
+                    'san_user': 'admin',
+                    'san_password': 'Admin@storage1'}]
+        setattr(self.conf, 'replication_device', targets)
+
+        setattr(self.conf, 'safe_get', self.safe_get)
+
 
 class FakeClient(rest_client.RestClient):
 
@@ -1717,6 +1927,11 @@ class FakeDB(object):
         return volumes
 
 
+class FakeReplicaPairManager(replication.ReplicaPairManager):
+    def _init_rmt_client(self):
+        self.rmt_client = FakeClient(self.conf)
+
+
 class FakeISCSIStorage(huawei_driver.HuaweiISCSIDriver):
     """Fake Huawei Storage, Rewrite some methods of HuaweiISCSIDriver."""
 
@@ -1734,6 +1949,7 @@ class FakeISCSIStorage(huawei_driver.HuaweiISCSIDriver):
                                                  self.rmt_client,
                                                  self.configuration,
                                                  self.db)
+        self.replica = FakeReplicaPairManager(self.client, self.configuration)
 
 
 class FakeFCStorage(huawei_driver.HuaweiFCDriver):
@@ -1754,6 +1970,7 @@ class FakeFCStorage(huawei_driver.HuaweiFCDriver):
                                                  self.rmt_client,
                                                  self.configuration,
                                                  self.db)
+        self.replica = FakeReplicaPairManager(self.client, self.configuration)
 
 
 @ddt.ddt
@@ -1836,9 +2053,20 @@ class HuaweiISCSIDriverTestCase(test.TestCase):
         self.assertTrue(delete_flag)
 
     def test_create_volume_from_snapsuccess(self):
-        lun_info = self.driver.create_volume_from_snapshot(test_volume,
-                                                           test_volume)
-        self.assertEqual('1', lun_info['provider_location'])
+        self.mock_object(
+            huawei_driver.HuaweiBaseDriver,
+            '_get_volume_type',
+            mock.Mock(return_value={'extra_specs': sync_replica_specs}))
+        self.mock_object(replication.ReplicaCommonDriver, 'sync')
+        model_update = self.driver.create_volume_from_snapshot(test_volume,
+                                                               test_volume)
+        self.assertEqual('1', model_update['provider_location'])
+
+        driver_data = {'pair_id': TEST_PAIR_ID,
+                       'rmt_lun_id': '1'}
+        driver_data = replication.to_string(driver_data)
+        self.assertEqual(driver_data, model_update['replication_driver_data'])
+        self.assertEqual('enabled', model_update['replication_status'])
 
     def test_initialize_connection_success(self):
         iscsi_properties = self.driver.initialize_connection(test_volume,
@@ -1850,7 +2078,7 @@ class HuaweiISCSIDriverTestCase(test.TestCase):
 
     def test_get_volume_status(self):
         data = self.driver.get_volume_stats()
-        self.assertEqual('2.0.3', data['driver_version'])
+        self.assertEqual('2.0.5', data['driver_version'])
 
     def test_extend_volume(self):
 
@@ -2501,6 +2729,311 @@ class HuaweiISCSIDriverTestCase(test.TestCase):
             self.driver.unmanage_snapshot(test_snapshot)
             self.assertEqual(1, mock_rename.call_count)
 
+    def test_init_rmt_client(self):
+        self.mock_object(rest_client, 'RestClient',
+                         mock.Mock(return_value=None))
+        replica = replication.ReplicaPairManager(self.driver.client,
+                                                 self.configuration)
+        self.assertEqual(replica.rmt_pool, 'OpenStack_Pool')
+        self.assertEqual(replica.target_dev_id, 'huawei-replica-1')
+
+    @ddt.data(sync_replica_specs, async_replica_specs)
+    def test_create_replication_success(self, mock_type):
+        self.mock_object(replication.ReplicaCommonDriver, 'sync')
+        self.mock_object(
+            huawei_driver.HuaweiBaseDriver,
+            '_get_volume_type',
+            mock.Mock(return_value={'extra_specs': mock_type}))
+
+        model_update = self.driver.create_volume(replication_volume)
+        driver_data = {'pair_id': TEST_PAIR_ID,
+                       'rmt_lun_id': '1'}
+        driver_data = replication.to_string(driver_data)
+        self.assertEqual(driver_data, model_update['replication_driver_data'])
+        self.assertEqual('enabled', model_update['replication_status'])
+
+    @ddt.data(
+        [
+            rest_client.RestClient,
+            'get_array_info',
+            mock.Mock(
+                side_effect=exception.VolumeBackendAPIException(data='err'))
+        ],
+        [
+            rest_client.RestClient,
+            'get_remote_devices',
+            mock.Mock(
+                side_effect=exception.VolumeBackendAPIException(data='err'))
+        ],
+        [
+            rest_client.RestClient,
+            'get_remote_devices',
+            mock.Mock(return_value={})
+        ],
+        [
+            replication.ReplicaPairManager,
+            'wait_volume_online',
+            mock.Mock(side_effect=[
+                None,
+                exception.VolumeBackendAPIException(data='err')])
+        ],
+        [
+            rest_client.RestClient,
+            'create_pair',
+            mock.Mock(
+                side_effect=exception.VolumeBackendAPIException(data='err'))
+        ],
+        [
+            replication.ReplicaCommonDriver,
+            'sync',
+            mock.Mock(
+                side_effect=exception.VolumeBackendAPIException(data='err'))
+        ],
+    )
+    @ddt.unpack
+    def test_create_replication_fail(self, mock_module, mock_func, mock_value):
+        self.mock_object(
+            huawei_driver.HuaweiBaseDriver,
+            '_get_volume_type',
+            mock.Mock(return_value={'extra_specs': sync_replica_specs}))
+        self.mock_object(replication.ReplicaPairManager, '_delete_pair')
+        self.mock_object(mock_module, mock_func, mock_value)
+        self.assertRaises(
+            exception.VolumeBackendAPIException,
+            self.driver.create_volume, replication_volume)
+
+    def test_delete_replication_success(self):
+        self.mock_object(replication.ReplicaCommonDriver, 'split')
+        self.mock_object(
+            huawei_driver.HuaweiBaseDriver,
+            '_get_volume_type',
+            mock.Mock(return_value={'extra_specs': sync_replica_specs}))
+        self.driver.delete_volume(replication_volume)
+
+        self.mock_object(rest_client.RestClient, 'check_lun_exist',
+                         mock.Mock(return_value=False))
+        self.driver.delete_volume(replication_volume)
+
+    def test_wait_volume_online(self):
+        replica = FakeReplicaPairManager(self.driver.client,
+                                         self.configuration)
+        lun_info = {'ID': '11'}
+
+        replica.wait_volume_online(self.driver.client, lun_info)
+
+        offline_status = {'RUNNINGSTATUS': '28'}
+        replica.wait_volume_online(self.driver.client, lun_info)
+
+        with mock.patch.object(rest_client.RestClient, 'get_lun_info',
+                               offline_status):
+            self.assertRaises(exception.VolumeBackendAPIException,
+                              replica.wait_volume_online,
+                              self.driver.client,
+                              lun_info)
+
+    def test_wait_second_access(self):
+        pair_id = '1'
+        access_ro = constants.REPLICA_SECOND_RO
+        access_rw = constants.REPLICA_SECOND_RW
+        op = replication.PairOp(self.driver.client)
+        common_driver = replication.ReplicaCommonDriver(self.configuration, op)
+        self.mock_object(replication.PairOp, 'get_replica_info',
+                         mock.Mock(return_value={'SECRESACCESS': access_ro}))
+
+        common_driver.wait_second_access(pair_id, access_ro)
+        self.assertRaises(exception.VolumeBackendAPIException,
+                          common_driver.wait_second_access, pair_id, access_rw)
+
+    def test_wait_replica_ready(self):
+        normal_status = {
+            'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_NORMAL,
+            'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
+        }
+        split_status = {
+            'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_SPLIT,
+            'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
+        }
+        sync_status = {
+            'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_SYNC,
+            'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
+        }
+        pair_id = '1'
+        op = replication.PairOp(self.driver.client)
+        common_driver = replication.ReplicaCommonDriver(self.configuration, op)
+        with mock.patch.object(replication.PairOp, 'get_replica_info',
+                               mock.Mock(return_value=normal_status)):
+            common_driver.wait_replica_ready(pair_id)
+
+        with mock.patch.object(
+                replication.PairOp,
+                'get_replica_info',
+                mock.Mock(side_effect=[sync_status, normal_status])):
+            common_driver.wait_replica_ready(pair_id)
+
+        with mock.patch.object(replication.PairOp, 'get_replica_info',
+                               mock.Mock(return_value=split_status)):
+            self.assertRaises(exception.VolumeBackendAPIException,
+                              common_driver.wait_replica_ready, pair_id)
+
+    def test_replication_enable_success(self):
+        self.mock_object(replication.ReplicaCommonDriver, 'unprotect_second')
+        self.mock_object(replication.ReplicaCommonDriver, 'split')
+        self.mock_object(replication.PairOp, 'is_primary',
+                         mock.Mock(side_effect=[False, True]))
+        self.driver.replication_enable(None, replication_volume)
+
+    @ddt.data(
+        [
+            replication.AbsReplicaOp,
+            'is_running_status',
+            mock.Mock(return_value=False)
+        ],
+        [
+            replication,
+            'get_replication_driver_data',
+            mock.Mock(return_value={})
+        ],
+        [
+            replication.PairOp,
+            'get_replica_info',
+            mock.Mock(return_value={})
+        ],
+    )
+    @ddt.unpack
+    def test_replication_enable_fail(self, mock_module, mock_func, mock_value):
+        self.mock_object(mock_module, mock_func, mock_value)
+        self.assertRaises(
+            exception.VolumeBackendAPIException,
+            self.driver.replication_enable, None, replication_volume)
+
+    def test_replication_disable_fail(self):
+        self.assertRaises(
+            exception.VolumeBackendAPIException,
+            self.driver.replication_disable, None, replication_volume)
+
+    def test_replication_disable_success(self):
+        self.mock_object(replication.ReplicaCommonDriver, 'split')
+        self.driver.replication_disable(None, replication_volume)
+
+        self.mock_object(replication, 'get_replication_driver_data',
+                         mock.Mock(return_value={}))
+        self.driver.replication_disable(None, replication_volume)
+
+    def test_replication_failover_success(self):
+        self.mock_object(replication.ReplicaCommonDriver, 'split')
+        self.mock_object(replication.PairOp, 'is_primary',
+                         mock.Mock(return_value=False))
+        model_update = self.driver.replication_failover(
+            None, replication_volume, None)
+        self.assertEqual('ubuntu@huawei2#OpenStack_Pool', model_update['host'])
+        self.assertEqual('1', model_update['provider_location'])
+        driver_data = {'pair_id': TEST_PAIR_ID,
+                       'rmt_lun_id': '11'}
+        driver_data = replication.to_string(driver_data)
+        self.assertEqual(driver_data, model_update['replication_driver_data'])
+
+    @ddt.data(
+        [
+            replication.PairOp,
+            'is_primary',
+            mock.Mock(return_value=True)
+        ],
+        [
+            replication.PairOp,
+            'is_primary',
+            mock.Mock(return_value=False)
+        ],
+        [
+            replication,
+            'get_replication_driver_data',
+            mock.Mock(return_value={})
+        ],
+        [
+            replication,
+            'get_replication_driver_data',
+            mock.Mock(return_value={'pair_id': '1'})
+        ],
+    )
+    @ddt.unpack
+    def test_replication_failover_fail(self,
+                                       mock_module, mock_func, mock_value):
+        self.mock_object(
+            replication.ReplicaCommonDriver,
+            'wait_second_access',
+            mock.Mock(
+                side_effect=exception.VolumeBackendAPIException(data="error")))
+        self.mock_object(mock_module, mock_func, mock_value)
+        self.assertRaises(
+            exception.VolumeBackendAPIException,
+            self.driver.replication_failover,
+            None,
+            replication_volume, None)
+
+    def test_list_replication_targets(self):
+        info = self.driver.list_replication_targets(None, replication_volume)
+        targets = [{'target_device_id': 'huawei-replica-1'}]
+        self.assertEqual(targets, info['targets'])
+
+        self.mock_object(replication, 'get_replication_driver_data',
+                         mock.Mock(return_value={}))
+        info = self.driver.list_replication_targets(None, replication_volume)
+        self.assertEqual(targets, info['targets'])
+
+    @ddt.data(constants.REPLICA_SECOND_RW, constants.REPLICA_SECOND_RO)
+    def test_replication_protect_second(self, mock_access):
+        replica_id = TEST_PAIR_ID
+        op = replication.PairOp(self.driver.client)
+        common_driver = replication.ReplicaCommonDriver(self.configuration, op)
+
+        self.mock_object(replication.ReplicaCommonDriver, 'wait_second_access')
+        self.mock_object(
+            replication.PairOp,
+            'get_replica_info',
+            mock.Mock(return_value={'SECRESACCESS': mock_access}))
+
+        common_driver.protect_second(replica_id)
+        common_driver.unprotect_second(replica_id)
+
+    def test_replication_sync(self):
+        replica_id = TEST_PAIR_ID
+        op = replication.PairOp(self.driver.client)
+        common_driver = replication.ReplicaCommonDriver(self.configuration, op)
+        async_normal_status = {
+            'REPLICATIONMODEL': constants.REPLICA_ASYNC_MODEL,
+            'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_NORMAL,
+            'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
+        }
+
+        self.mock_object(replication.ReplicaCommonDriver, 'protect_second')
+        self.mock_object(replication.PairOp, 'get_replica_info',
+                         mock.Mock(return_value=async_normal_status))
+        common_driver.sync(replica_id, True)
+        common_driver.sync(replica_id, False)
+
+    def test_replication_split(self):
+        replica_id = TEST_PAIR_ID
+        op = replication.PairOp(self.driver.client)
+        common_driver = replication.ReplicaCommonDriver(self.configuration, op)
+
+        self.mock_object(replication.ReplicaCommonDriver, 'wait_expect_state')
+        self.mock_object(replication.PairOp, 'split', mock.Mock(
+            side_effect=exception.VolumeBackendAPIException(data='err')))
+        common_driver.split(replica_id)
+
+    def test_replication_base_op(self):
+        replica_id = '1'
+        op = replication.AbsReplicaOp(None)
+        op.create()
+        op.delete(replica_id)
+        op.protect_second(replica_id)
+        op.unprotect_second(replica_id)
+        op.sync(replica_id)
+        op.split(replica_id)
+        op.switch(replica_id)
+        op.is_primary({})
+        op.get_replica_info(replica_id)
+        op._is_status(None, {'key': 'volue'}, None)
+
 
 class FCSanLookupService(object):
 
@@ -2565,9 +3098,33 @@ class HuaweiFCDriverTestCase(test.TestCase):
         self.assertTrue(self.driver.client.terminateFlag)
 
     def test_get_volume_status(self):
+        remote_device_info = {"ARRAYTYPE": "1",
+                              "HEALTHSTATUS": "1",
+                              "RUNNINGSTATUS": "10"}
+        self.mock_object(
+            replication.ReplicaPairManager,
+            'get_remote_device_by_wwn',
+            mock.Mock(return_value=remote_device_info))
+        data = self.driver.get_volume_stats()
+        self.assertEqual('2.0.5', data['driver_version'])
+        self.assertTrue(data['pools'][0]['replication_enabled'])
+        self.assertListEqual(['sync', 'async'],
+                             data['pools'][0]['replication_type'])
+
+        self.mock_object(
+            replication.ReplicaPairManager,
+            'get_remote_device_by_wwn',
+            mock.Mock(return_value={}))
+        data = self.driver.get_volume_stats()
+        self.assertNotIn('replication_enabled', data['pools'][0])
 
+        self.mock_object(
+            replication.ReplicaPairManager,
+            'try_get_remote_wwn',
+            mock.Mock(return_value={}))
         data = self.driver.get_volume_stats()
-        self.assertEqual('2.0.4', data['driver_version'])
+        self.assertEqual('2.0.5', data['driver_version'])
+        self.assertNotIn('replication_enabled', data['pools'][0])
 
     def test_extend_volume(self):
         self.driver.extend_volume(test_volume, 3)
@@ -2755,6 +3312,17 @@ class HuaweiFCDriverTestCase(test.TestCase):
                                     test_new_type, None, test_host)
         self.assertTrue(retype)
 
+    @mock.patch.object(rest_client.RestClient, 'add_lun_to_partition')
+    @mock.patch.object(
+        huawei_driver.HuaweiBaseDriver,
+        '_get_volume_type',
+        return_value={'extra_specs': sync_replica_specs})
+    def test_retype_replication_volume_success(self, mock_get_type,
+                                               mock_add_lun_to_partition):
+        retype = self.driver.retype(None, test_volume,
+                                    test_new_replication_type, None, test_host)
+        self.assertTrue(retype)
+
     def test_retype_volume_cache_fail(self):
         self.driver.client.cache_not_exist = True
 
index acd5b3de859fe934d0751913ba9d82f04bd5c680..44e76ab8cf5cc2ef7e57dbc30cc3ea9c1f18d200 100644 (file)
@@ -75,3 +75,31 @@ QOS_KEYS = ['MAXIOPS', 'MINIOPS', 'MINBANDWidth',
             'MAXBANDWidth', 'LATENCY', 'IOTYPE']
 MAX_LUN_NUM_IN_QOS = 64
 HYPERMETRO_CLASS = "cinder.volume.drivers.huawei.hypermetro.HuaweiHyperMetro"
+
+DEFAULT_REPLICA_WAIT_INTERVAL = 1
+DEFAULT_REPLICA_WAIT_TIMEOUT = 10
+
+REPLICA_SYNC_MODEL = '1'
+REPLICA_ASYNC_MODEL = '2'
+REPLICA_SPEED = '2'
+REPLICA_PERIOD = '3600'
+REPLICA_SECOND_RO = '2'
+REPLICA_SECOND_RW = '3'
+
+REPLICA_RUNNING_STATUS_KEY = 'RUNNINGSTATUS'
+REPLICA_RUNNING_STATUS_INITIAL_SYNC = '21'
+REPLICA_RUNNING_STATUS_SYNC = '23'
+REPLICA_RUNNING_STATUS_SYNCED = '24'
+REPLICA_RUNNING_STATUS_NORMAL = '1'
+REPLICA_RUNNING_STATUS_SPLIT = '26'
+REPLICA_RUNNING_STATUS_INVALID = '35'
+
+REPLICA_HEALTH_STATUS_KEY = 'HEALTHSTATUS'
+REPLICA_HEALTH_STATUS_NORMAL = '1'
+
+REPLICA_LOCAL_DATA_STATUS_KEY = 'PRIRESDATASTATUS'
+REPLICA_REMOTE_DATA_STATUS_KEY = 'SECRESDATASTATUS'
+REPLICA_DATA_SYNC_KEY = 'ISDATASYNC'
+REPLICA_DATA_STATUS_SYNCED = '1'
+REPLICA_DATA_STATUS_COMPLETE = '2'
+REPLICA_DATA_STATUS_INCOMPLETE = '3'
index 06a8ef4990fd339e570f3bb4942571f2dc33d876..f4781dee5ce4dae292f6378a151719561373c542 100644 (file)
@@ -151,7 +151,6 @@ class HuaweiConf(object):
             else:
                 msg = (_("Invalid lun type %s is configured.") % lun_type)
                 LOG.exception(msg)
-
                 raise exception.InvalidInput(reason=msg)
 
         setattr(self.conf, 'lun_type', lun_type)
index 3431db1001af26ce2b4198f6fa338608b99ab59d..06eca13d79a83f0b55d09d0d83c3b025905aae03 100644 (file)
@@ -33,6 +33,7 @@ from cinder.volume.drivers.huawei import fc_zone_helper
 from cinder.volume.drivers.huawei import huawei_conf
 from cinder.volume.drivers.huawei import huawei_utils
 from cinder.volume.drivers.huawei import hypermetro
+from cinder.volume.drivers.huawei import replication
 from cinder.volume.drivers.huawei import rest_client
 from cinder.volume.drivers.huawei import smartx
 from cinder.volume import utils as volume_utils
@@ -90,6 +91,10 @@ class HuaweiBaseDriver(driver.VolumeDriver):
                                                      metro_san_password)
             self.rmt_client.login()
 
+        # init replication manager
+        self.replica = replication.ReplicaPairManager(self.client,
+                                                      self.configuration)
+
     def check_for_setup_error(self):
         pass
 
@@ -98,7 +103,9 @@ class HuaweiBaseDriver(driver.VolumeDriver):
         self.huawei_conf.update_config_value()
         if self.metro_flag:
             self.rmt_client.get_all_pools()
-        return self.client.update_volume_stats()
+        stats = self.client.update_volume_stats()
+        stats = self.replica.update_replica_capability(stats)
+        return stats
 
     def _get_volume_type(self, volume):
         volume_type = None
@@ -127,6 +134,8 @@ class HuaweiBaseDriver(driver.VolumeDriver):
             'thin_provisioning_support': False,
             'thick_provisioning_support': False,
             'hypermetro': False,
+            'replication_enabled': False,
+            'replication_type': 'async',
         }
 
         opts_value = {
@@ -146,6 +155,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
                                          opts_associate,
                                          specs)
         opts = smartx.SmartX().get_smartx_specs_opts(opts)
+        opts = replication.get_replication_opts(opts)
         LOG.debug('volume opts %(opts)s.', {'opts': opts})
         return opts
 
@@ -172,12 +182,15 @@ class HuaweiBaseDriver(driver.VolumeDriver):
             if ((not scope or scope == 'capabilities')
                     and key in opts_capability):
                 words = value.split()
-                if not (words and len(words) == 2 and words[0] == '<is>'):
+                if words and len(words) == 2 and words[0] in ('<is>', '<in>'):
+                    opts[key] = words[1].lower()
+                elif key == 'replication_type':
                     LOG.error(_LE("Extra specs must be specified as "
-                                  "capabilities:%s='<is> True' or "
-                                  "'<is> true'."), key)
+                                  "replication_type='<in> sync' or "
+                                  "'<in> async'."))
                 else:
-                    opts[key] = words[1].lower()
+                    LOG.error(_LE("Extra specs must be specified as "
+                                  "capabilities:%s='<is> True'."), key)
 
             if ((scope in opts_capability)
                     and (key in opts_value)
@@ -193,7 +206,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
             'TYPE': '11',
             'NAME': huawei_utils.encode_name(volume['id']),
             'PARENTTYPE': '216',
-            'PARENTID': self.client.get_pool_id(volume, pool_name),
+            'PARENTID': self.client.get_pool_id(pool_name),
             'DESCRIPTION': volume['name'],
             'ALLOCTYPE': opts.get('LUNType', self.configuration.lun_type),
             'CAPACITY': huawei_utils.get_volume_size(volume),
@@ -220,10 +233,11 @@ class HuaweiBaseDriver(driver.VolumeDriver):
         model_update['metadata'] = metadata
         return lun_info, model_update
 
-    def create_volume(self, volume):
-        """Create a volume."""
-        volume_type = self._get_volume_type(volume)
-        opts = self._get_volume_params(volume_type)
+    def _create_base_type_volume(self, opts, volume, volume_type):
+        """Create volume and add some base type.
+
+        Base type is the services won't conflict with the other service.
+        """
         lun_params = self._get_lun_params(volume, opts)
         lun_info, model_update = self._create_volume(volume, lun_params)
         lun_id = lun_info['ID']
@@ -244,7 +258,17 @@ class HuaweiBaseDriver(driver.VolumeDriver):
             msg = _('Create volume error. Because %s.') % six.text_type(err)
             raise exception.VolumeBackendAPIException(data=msg)
 
-        if (opts.get('hypermetro') and opts.get('hypermetro') == 'true'):
+        return lun_params, lun_info, model_update
+
+    def _add_extend_type_to_volume(self, opts, lun_params, lun_info,
+                                   model_update):
+        """Add the extend type.
+
+        Extend type is the services may conflict with LUNCopy.
+        So add it after the those services.
+        """
+        lun_id = lun_info['ID']
+        if opts.get('hypermetro') == 'true':
             metro = hypermetro.HuaweiHyperMetro(self.client,
                                                 self.rmt_client,
                                                 self.configuration,
@@ -257,6 +281,35 @@ class HuaweiBaseDriver(driver.VolumeDriver):
                 self._delete_lun_with_check(lun_id)
                 raise
 
+        if opts.get('replication_enabled') == 'true':
+            replica_model = opts.get('replication_type')
+            try:
+                replica_info = self.replica.create_replica(lun_info,
+                                                           replica_model)
+                model_update.update(replica_info)
+            except Exception as err:
+                LOG.exception(_LE('Create replication volume error.'))
+                self._delete_lun_with_check(lun_id)
+                raise
+
+        return model_update
+
+    def create_volume(self, volume):
+        """Create a volume."""
+        volume_type = self._get_volume_type(volume)
+        opts = self._get_volume_params(volume_type)
+        if (opts.get('hypermetro') == 'true'
+                and opts.get('replication_enabled') == 'true'):
+            err_msg = _("Hypermetro and Replication can not be "
+                        "used in the same volume_type.")
+            LOG.error(err_msg)
+            raise exception.VolumeBackendAPIException(data=err_msg)
+
+        lun_params, lun_info, model_update = (
+            self._create_base_type_volume(opts, volume, volume_type))
+
+        model_update = self._add_extend_type_to_volume(opts, lun_params,
+                                                       lun_info, model_update)
         return model_update
 
     def _delete_volume(self, volume):
@@ -281,7 +334,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
         lun_id = volume.get('provider_location')
         if not lun_id or not self.client.check_lun_exist(lun_id):
             LOG.warning(_LW("Can't find lun %s on the array."), lun_id)
-            return False
+            return
 
         qos_id = self.client.get_qosid_by_lunid(lun_id)
         if qos_id:
@@ -301,6 +354,16 @@ class HuaweiBaseDriver(driver.VolumeDriver):
                 self._delete_volume(volume)
                 raise
 
+        # Delete a replication volume
+        replica_data = volume.get('replication_driver_data')
+        if replica_data:
+            try:
+                self.replica.delete_replica(volume)
+            except exception.VolumeBackendAPIException as err:
+                with excutils.save_and_reraise_exception():
+                    LOG.exception(_LE("Delete replication error."))
+                    self._delete_volume(volume)
+
         self._delete_volume(volume)
 
     def _delete_lun_with_check(self, lun_id):
@@ -414,6 +477,15 @@ class HuaweiBaseDriver(driver.VolumeDriver):
 
     def migrate_volume(self, ctxt, volume, host, new_type=None):
         """Migrate a volume within the same array."""
+
+        # NOTE(jlc): Replication volume can't migrate. But retype
+        # can remove replication relationship first then do migrate.
+        # So don't add this judgement into _check_migration_valid().
+        volume_type = self._get_volume_type(volume)
+        opts = self._get_volume_params(volume_type)
+        if opts.get('replication_enabled') == 'true':
+            return (False, None)
+
         return self._migrate_volume(volume, host, new_type)
 
     def _check_migration_valid(self, host, volume):
@@ -516,6 +588,15 @@ class HuaweiBaseDriver(driver.VolumeDriver):
         We use LUNcopy to copy a new volume from snapshot.
         The time needed increases as volume size does.
         """
+        volume_type = self._get_volume_type(volume)
+        opts = self._get_volume_params(volume_type)
+        if (opts.get('hypermetro') == 'true'
+                and opts.get('replication_enabled') == 'true'):
+            err_msg = _("Hypermetro and Replication can not be "
+                        "used in the same volume_type.")
+            LOG.error(err_msg)
+            raise exception.VolumeBackendAPIException(data=err_msg)
+
         snapshotname = huawei_utils.encode_name(snapshot['id'])
         snapshot_id = snapshot.get('provider_location')
         if snapshot_id is None:
@@ -528,7 +609,9 @@ class HuaweiBaseDriver(driver.VolumeDriver):
             LOG.error(err_msg)
             raise exception.VolumeBackendAPIException(data=err_msg)
 
-        model_update = self.create_volume(volume)
+        lun_params, lun_info, model_update = (
+            self._create_base_type_volume(opts, volume, volume_type))
+
         tgt_lun_id = model_update['provider_location']
         luncopy_name = huawei_utils.encode_name(volume['id'])
         LOG.info(_LI(
@@ -555,6 +638,10 @@ class HuaweiBaseDriver(driver.VolumeDriver):
         self._copy_volume(volume, luncopy_name,
                           snapshot_id, tgt_lun_id)
 
+        # NOTE(jlc): Actually, we just only support replication here right
+        # now, not hypermetro.
+        model_update = self._add_extend_type_to_volume(opts, lun_params,
+                                                       lun_info, model_update)
         return model_update
 
     def create_cloned_volume(self, volume, src_vref):
@@ -598,6 +685,14 @@ class HuaweiBaseDriver(driver.VolumeDriver):
             LOG.error(msg)
             raise exception.VolumeBackendAPIException(data=msg)
 
+        volume_type = self._get_volume_type(volume)
+        opts = self._get_volume_params(volume_type)
+        if opts.get('replication_enabled') == 'true':
+            msg = (_("Can't extend replication volume, volume: %(id)s") %
+                   {"id": volume['id']})
+            LOG.error(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
         old_size = huawei_utils.get_volume_size(volume)
         new_size = int(new_size) * units.Gi / 512
         volume_name = huawei_utils.encode_name(volume['id'])
@@ -670,25 +765,51 @@ class HuaweiBaseDriver(driver.VolumeDriver):
         migration, change_opts, lun_id = self.determine_changes_when_retype(
             volume, new_type, host)
 
+        model_update = {}
+        replica_enabled_change = change_opts.get('replication_enabled')
+        replica_type_change = change_opts.get('replication_type')
+        if replica_enabled_change and replica_enabled_change[0] == 'true':
+            try:
+                self.replica.delete_replica(volume)
+                model_update.update({'replication_status': 'disabled',
+                                     'replication_driver_data': None})
+            except exception.VolumeBackendAPIException:
+                LOG.exception(_LE('Retype volume error. '
+                                  'Delete replication failed.'))
+                return False
+
         try:
             if migration:
                 LOG.debug("Begin to migrate LUN(id: %(lun_id)s) with "
                           "change %(change_opts)s.",
                           {"lun_id": lun_id, "change_opts": change_opts})
-                if self._migrate_volume(volume, host, new_type):
-                    return True
-                else:
+                if not self._migrate_volume(volume, host, new_type):
                     LOG.warning(_LW("Storage-assisted migration failed during "
                                     "retype."))
                     return False
             else:
                 # Modify lun to change policy
                 self.modify_lun(lun_id, change_opts)
-                return True
         except exception.VolumeBackendAPIException:
             LOG.exception(_LE('Retype volume error.'))
             return False
 
+        if replica_enabled_change and replica_enabled_change[1] == 'true':
+            try:
+                # If replica_enabled_change is not None, the
+                # replica_type_change won't be None. See function
+                # determine_changes_when_retype.
+                lun_info = self.client.get_lun_info(lun_id)
+                replica_info = self.replica.create_replica(
+                    lun_info, replica_type_change[1])
+                model_update.update(replica_info)
+            except exception.VolumeBackendAPIException:
+                LOG.exception(_LE('Retype volume error. '
+                                  'Create replication failed.'))
+                return False
+
+        return (True, model_update)
+
     def modify_lun(self, lun_id, change_opts):
         if change_opts.get('partitionid'):
             old, new = change_opts['partitionid']
@@ -858,6 +979,19 @@ class HuaweiBaseDriver(driver.VolumeDriver):
             migration = True
             change_opts['LUNType'] = (old_opts['LUNType'], new_opts['LUNType'])
 
+        volume_type = self._get_volume_type(volume)
+        volume_opts = self._get_volume_params(volume_type)
+        if (volume_opts['replication_enabled'] == 'true'
+                or new_opts['replication_enabled'] == 'true'):
+            # If replication_enabled changes,
+            # then replication_type in change_opts will be set.
+            change_opts['replication_enabled'] = (
+                volume_opts['replication_enabled'],
+                new_opts['replication_enabled'])
+
+            change_opts['replication_type'] = (volume_opts['replication_type'],
+                                               new_opts['replication_type'])
+
         change_opts = self._check_needed_changes(lun_id, old_opts, new_opts,
                                                  change_opts, new_type)
 
@@ -1060,6 +1194,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
         # Check other stuffs to determine whether this LUN can be imported.
         self._check_lun_valid_for_manage(lun_info, external_ref)
         type_id = volume.get('volume_type_id')
+        new_opts = None
         if type_id:
             # Handle volume type if specified.
             old_opts = self.get_lun_specs(lun_id)
@@ -1087,7 +1222,21 @@ class HuaweiBaseDriver(driver.VolumeDriver):
         self.client.rename_lun(lun_id, new_name,  # pylint: disable=E1121
                                description)
 
-        return {'provider_location': lun_id}
+        model_update = {}
+        model_update.update({'provider_location': lun_id})
+
+        if new_opts and new_opts.get('replication_enabled'):
+            LOG.debug("Manage volume need to create replication.")
+            try:
+                lun_info = self.client.get_lun_info(lun_id)
+                replica_info = self.replica.create_replica(
+                    lun_info, new_opts.get('replication_type'))
+                model_update.update(replica_info)
+            except exception.VolumeBackendAPIException:
+                with excutils.save_and_reraise_exception():
+                    LOG.exception(_LE("Manage exist volume failed."))
+
+        return model_update
 
     def _get_lun_info_by_ref(self, external_ref):
         LOG.debug("Get external_ref: %s", external_ref)
@@ -1236,6 +1385,27 @@ class HuaweiBaseDriver(driver.VolumeDriver):
                         {'snapshot_id': snapshot['id'],
                          'snapshot_name': snapshot_name})
 
+    def replication_enable(self, context, volume):
+        """Enable replication and do switch role when needed."""
+        self.replica.enable_replica(volume)
+
+    def replication_disable(self, context, volume):
+        """Disable replication."""
+        self.replica.disable_replica(volume)
+
+    def replication_failover(self, context, volume, secondary):
+        """Disable replication and unprotect remote LUN."""
+        return self.replica.failover_replica(volume)
+
+    def list_replication_targets(self, context, vref):
+        """Obtain volume repliction targets."""
+        return self.replica.list_replica_targets(vref)
+
+    def get_replication_updates(self, context):
+        # NOTE(jlc): The manager does not do aynthing with these updates.
+        # When that is changed, here must be modified as well.
+        return []
+
 
 class HuaweiISCSIDriver(HuaweiBaseDriver, driver.ISCSIDriver):
     """ISCSI driver for Huawei storage arrays.
@@ -1254,9 +1424,10 @@ class HuaweiISCSIDriver(HuaweiBaseDriver, driver.ISCSIDriver):
         2.0.1 - Manage/unmanage volume support
         2.0.2 - Refactor HuaweiISCSIDriver
         2.0.3 - Manage/unmanage snapshot support
+        2.0.5 - Replication V2 support
     """
 
-    VERSION = "2.0.3"
+    VERSION = "2.0.5"
 
     def __init__(self, *args, **kwargs):
         super(HuaweiISCSIDriver, self).__init__(*args, **kwargs)
@@ -1449,9 +1620,10 @@ class HuaweiFCDriver(HuaweiBaseDriver, driver.FibreChannelDriver):
         2.0.2 - Refactor HuaweiFCDriver
         2.0.3 - Manage/unmanage snapshot support
         2.0.4 - Balanced FC port selection
+        2.0.5 - Replication V2 support
     """
 
-    VERSION = "2.0.4"
+    VERSION = "2.0.5"
 
     def __init__(self, *args, **kwargs):
         super(HuaweiFCDriver, self).__init__(*args, **kwargs)
@@ -1515,7 +1687,7 @@ class HuaweiFCDriver(HuaweiBaseDriver, driver.FibreChannelDriver):
                     if not wwns_in_host and not iqns_in_host:
                         self.client.remove_host(host_id)
 
-                    msg = (_('Can not add FC initiator to host.'))
+                    msg = _('Can not add FC initiator to host.')
                     LOG.error(msg)
                     raise exception.VolumeBackendAPIException(data=msg)
 
diff --git a/cinder/volume/drivers/huawei/replication.py b/cinder/volume/drivers/huawei/replication.py
new file mode 100644 (file)
index 0000000..a6d57fb
--- /dev/null
@@ -0,0 +1,675 @@
+# Copyright (c) 2016 Huawei Technologies Co., Ltd.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+import json
+import re
+
+from oslo_log import log as logging
+from oslo_utils import excutils
+
+from cinder import exception
+from cinder.i18n import _, _LW, _LE
+from cinder.volume.drivers.huawei import constants
+from cinder.volume.drivers.huawei import huawei_utils
+from cinder.volume.drivers.huawei import rest_client
+from cinder.volume import utils as volume_utils
+
+LOG = logging.getLogger(__name__)
+
+
+class AbsReplicaOp(object):
+    def __init__(self, client):
+        self.client = client
+
+    def create(self, **kwargs):
+        pass
+
+    def delete(self, replica_id):
+        pass
+
+    def protect_second(self, replica_id):
+        pass
+
+    def unprotect_second(self, replica_id):
+        pass
+
+    def sync(self, replica_id):
+        pass
+
+    def split(self, replica_id):
+        pass
+
+    def switch(self, replica_id):
+        pass
+
+    def is_primary(self, replica_info):
+        flag = replica_info.get('ISPRIMARY')
+        if flag and flag.lower() == 'true':
+            return True
+        return False
+
+    def get_replica_info(self, replica_id):
+        return {}
+
+    def _is_status(self, status_key, status, replica_info):
+        if type(status) in (list, tuple):
+            return replica_info.get(status_key, '') in status
+        if type(status) is str:
+            return replica_info.get(status_key, '') == status
+
+        return False
+
+    def is_running_status(self, status, replica_info):
+        return self._is_status(constants.REPLICA_RUNNING_STATUS_KEY,
+                               status, replica_info)
+
+    def is_health_status(self, status, replica_info):
+        return self._is_status(constants.REPLICA_HEALTH_STATUS_KEY,
+                               status, replica_info)
+
+
+class PairOp(AbsReplicaOp):
+    def create(self, local_lun_id, rmt_lun_id, rmt_dev_id,
+               rmt_dev_name, replica_model,
+               speed=constants.REPLICA_SPEED,
+               period=constants.REPLICA_PERIOD,
+               **kwargs):
+        super(PairOp, self).create(**kwargs)
+
+        params = {
+            "LOCALRESID": local_lun_id,
+            "LOCALRESTYPE": '11',
+            "REMOTEDEVICEID": rmt_dev_id,
+            "REMOTEDEVICENAME": rmt_dev_name,
+            "REMOTERESID": rmt_lun_id,
+            "REPLICATIONMODEL": replica_model,
+            # recovery policy. 1: auto, 2: manual
+            "RECOVERYPOLICY": '2',
+            "SPEED": speed,
+        }
+
+        if replica_model == constants.REPLICA_ASYNC_MODEL:
+            # Synchronize type values:
+            # 1, manual
+            # 2, timed wait when synchronization begins
+            # 3, timed wait when synchronization ends
+            params['SYNCHRONIZETYPE'] = '2'
+            params['TIMINGVAL'] = period
+
+        try:
+            pair_info = self.client.create_pair(params)
+        except Exception as err:
+            msg = _('Create replication pair failed. Error: %s.') % err
+            LOG.error(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+        return pair_info
+
+    def split(self, pair_id):
+        self.client.split_pair(pair_id)
+
+    def delete(self, pair_id, force=False):
+        self.client.delete_pair(pair_id, force)
+
+    def protect_second(self, pair_id):
+        self.client.set_pair_second_access(pair_id,
+                                           constants.REPLICA_SECOND_RO)
+
+    def unprotect_second(self, pair_id):
+        self.client.set_pair_second_access(pair_id,
+                                           constants.REPLICA_SECOND_RW)
+
+    def sync(self, pair_id):
+        self.client.sync_pair(pair_id)
+
+    def switch(self, pair_id):
+        self.client.switch_pair(pair_id)
+
+    def get_replica_info(self, pair_id):
+        return self.client.get_pair_by_id(pair_id)
+
+
+class CGOp(AbsReplicaOp):
+    pass
+
+
+class ReplicaCommonDriver(object):
+    def __init__(self, conf, replica_op):
+        self.conf = conf
+        self.op = replica_op
+
+    def protect_second(self, replica_id):
+        info = self.op.get_replica_info(replica_id)
+        if info.get('SECRESACCESS') == constants.REPLICA_SECOND_RO:
+            return
+
+        self.op.protect_second(replica_id)
+        self.wait_second_access(replica_id, constants.REPLICA_SECOND_RO)
+
+    def unprotect_second(self, replica_id):
+        info = self.op.get_replica_info(replica_id)
+        if info.get('SECRESACCESS') == constants.REPLICA_SECOND_RW:
+            return
+
+        self.op.unprotect_second(replica_id)
+        self.wait_second_access(replica_id, constants.REPLICA_SECOND_RW)
+
+    def sync(self, replica_id, wait_complete=False):
+        self.protect_second(replica_id)
+
+        expect_status = (constants.REPLICA_RUNNING_STATUS_NORMAL,
+                         constants.REPLICA_RUNNING_STATUS_SYNC,
+                         constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
+        info = self.op.get_replica_info(replica_id)
+
+        # When running status is synchronizing or normal,
+        # it's not necessary to do synchronize again.
+        if (info.get('REPLICATIONMODEL') == constants.REPLICA_SYNC_MODEL
+                and self.op.is_running_status(expect_status, info)):
+            return
+
+        self.op.sync(replica_id)
+        self.wait_expect_state(replica_id, expect_status)
+
+        if wait_complete:
+            self.wait_replica_ready(replica_id)
+
+    def split(self, replica_id):
+        running_status = (constants.REPLICA_RUNNING_STATUS_SPLIT,
+                          constants.REPLICA_RUNNING_STATUS_INVALID)
+        info = self.op.get_replica_info(replica_id)
+        if self.op.is_running_status(running_status, info):
+            return
+
+        try:
+            self.op.split(replica_id)
+        except Exception as err:
+            LOG.warning(_LW('Split replication exception: %s.'), err)
+
+        try:
+            self.wait_expect_state(replica_id, running_status)
+        except Exception as err:
+            msg = _('Split replication failed.')
+            LOG.error(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+    def enable(self, replica_id, wait_sync_complete=False):
+        info = self.op.get_replica_info(replica_id)
+        if not self.op.is_primary(info):
+            self.switch(replica_id)
+        self.sync(replica_id)
+        return None
+
+    def disable(self, replica_id):
+        self.split(replica_id)
+        return None
+
+    def switch(self, replica_id):
+        self.split(replica_id)
+        self.unprotect_second(replica_id)
+        self.op.switch(replica_id)
+
+        # Wait to be primary
+        def _wait_switch_to_primary():
+            info = self.op.get_replica_info(replica_id)
+            if self.op.is_primary(info):
+                return True
+            return False
+
+        interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
+        timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
+        huawei_utils.wait_for_condition(_wait_switch_to_primary,
+                                        interval,
+                                        timeout)
+
+    def failover(self, replica_id):
+        """Failover replication.
+
+        Purpose:
+            1. Split replication.
+            2. Set secondary access read & write.
+        """
+        info = self.op.get_replica_info(replica_id)
+        if self.op.is_primary(info):
+            msg = _('We should not do switch over on primary array.')
+            LOG.error(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+        sync_status_set = (constants.REPLICA_RUNNING_STATUS_SYNC,
+                           constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
+        if self.op.is_running_status(sync_status_set, info):
+            self.wait_replica_ready(replica_id)
+
+        self.split(replica_id)
+        self.op.unprotect_second(replica_id)
+
+    def wait_replica_ready(self, replica_id, interval=None, timeout=None):
+        LOG.debug('Wait synchronize complete.')
+        running_status_normal = (constants.REPLICA_RUNNING_STATUS_NORMAL,
+                                 constants.REPLICA_RUNNING_STATUS_SYNCED)
+        running_status_sync = (constants.REPLICA_RUNNING_STATUS_SYNC,
+                               constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
+        health_status_normal = constants.REPLICA_HEALTH_STATUS_NORMAL
+
+        def _replica_ready():
+            info = self.op.get_replica_info(replica_id)
+            if (self.op.is_running_status(running_status_normal, info)
+                    and self.op.is_health_status(health_status_normal, info)):
+                return True
+
+            if not self.op.is_running_status(running_status_sync, info):
+                msg = (_('Wait synchronize failed. Running status: %s.') %
+                       info.get(constants.REPLICA_RUNNING_STATUS_KEY))
+                LOG.error(msg)
+                raise exception.VolumeBackendAPIException(data=msg)
+
+            return False
+
+        if not interval:
+            interval = constants.DEFAULT_WAIT_INTERVAL
+        if not timeout:
+            timeout = constants.DEFAULT_WAIT_TIMEOUT
+
+        huawei_utils.wait_for_condition(_replica_ready,
+                                        interval,
+                                        timeout)
+
+    def wait_second_access(self, replica_id, access_level):
+        def _check_access():
+            info = self.op.get_replica_info(replica_id)
+            if info.get('SECRESACCESS') == access_level:
+                return True
+            return False
+
+        interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
+        timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
+        huawei_utils.wait_for_condition(_check_access,
+                                        interval,
+                                        timeout)
+
+    def wait_expect_state(self, replica_id,
+                          running_status, health_status=None,
+                          interval=None, timeout=None):
+        def _check_state():
+            info = self.op.get_replica_info(replica_id)
+            if self.op.is_running_status(running_status, info):
+                if (not health_status
+                        or self.op.is_health_status(health_status, info)):
+                    return True
+            return False
+
+        if not interval:
+            interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
+        if not timeout:
+            timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
+
+        huawei_utils.wait_for_condition(_check_state, interval, timeout)
+
+
+def get_replication_driver_data(volume):
+    if volume.get('replication_driver_data'):
+        return json.loads(volume['replication_driver_data'])
+    return {}
+
+
+def to_string(dict_data):
+    if dict_data:
+        return json.dumps(dict_data)
+    return ''
+
+
+class ReplicaPairManager(object):
+    def __init__(self, local_client, conf):
+        self.local_client = local_client
+        self.conf = conf
+        self.replica_device = self.conf.safe_get('replication_device')
+        if not self.replica_device:
+            return
+
+        # managed_backed_name format: host_name@backend_name#pool_name
+        self.rmt_backend = self.replica_device[0]['managed_backend_name']
+        self.rmt_pool = volume_utils.extract_host(self.rmt_backend,
+                                                  level='pool')
+        self.target_dev_id = self.replica_device[0]['target_device_id']
+
+        self._init_rmt_client()
+        self.local_op = PairOp(self.local_client)
+        self.local_driver = ReplicaCommonDriver(self.conf, self.local_op)
+        self.rmt_op = PairOp(self.rmt_client)
+        self.rmt_driver = ReplicaCommonDriver(self.conf, self.rmt_op)
+
+        self.try_login_remote_array()
+
+    def try_login_remote_array(self):
+        try:
+            self.rmt_client.login()
+        except Exception as err:
+            LOG.warning(_LW('Remote array login failed. Error: %s.'), err)
+
+    def try_get_remote_wwn(self):
+        try:
+            info = self.rmt_client.get_array_info()
+            return info.get('wwn')
+        except Exception as err:
+            LOG.warning(_LW('Get remote array wwn failed. Error: %s.'), err)
+            return None
+
+    def get_remote_device_by_wwn(self, wwn):
+        devices = {}
+        try:
+            devices = self.local_client.get_remote_devices()
+        except Exception as err:
+            LOG.warning(_LW('Get remote devices failed. Error: %s.'), err)
+
+        for device in devices:
+            if device.get('WWN') == wwn:
+                return device
+
+        return {}
+
+    def check_remote_available(self):
+        if not self.replica_device:
+            return False
+
+        # We get device wwn in every check time.
+        # If remote array changed, we can run normally.
+        wwn = self.try_get_remote_wwn()
+        if not wwn:
+            return False
+
+        device = self.get_remote_device_by_wwn(wwn)
+        # Check remote device is available to use.
+        # If array type is replication, 'ARRAYTYPE' == '1'.
+        # If health status is normal, 'HEALTHSTATUS' == '1'.
+        if (device and device.get('ARRAYTYPE') == '1'
+                and device.get('HEALTHSTATUS') == '1'
+                and device.get('RUNNINGSTATUS') == constants.STATUS_RUNNING):
+            return True
+
+        return False
+
+    def update_replica_capability(self, stats):
+        is_rmt_dev_available = self.check_remote_available()
+        if not is_rmt_dev_available:
+            if self.replica_device:
+                LOG.warning(_LW('Remote device is unavailable. '
+                                'Remote backend: %s.'),
+                            self.rmt_backend)
+            return stats
+
+        for pool in stats['pools']:
+            pool['replication_enabled'] = True
+            pool['replication_type'] = ['sync', 'async']
+
+        return stats
+
+    def _init_rmt_client(self):
+        # Multiple addresses support.
+        rmt_addrs = self.replica_device[0]['san_address'].split(';')
+        rmt_addrs = list(set([x.strip() for x in rmt_addrs if x.strip()]))
+        rmt_user = self.replica_device[0]['san_user']
+        rmt_password = self.replica_device[0]['san_password']
+        self.rmt_client = rest_client.RestClient(self.conf,
+                                                 rmt_addrs,
+                                                 rmt_user,
+                                                 rmt_password)
+
+    def get_rmt_dev_info(self):
+        wwn = self.try_get_remote_wwn()
+        if not wwn:
+            return None, None
+
+        device = self.get_remote_device_by_wwn(wwn)
+        if not device:
+            return None, None
+
+        return device.get('ID'), device.get('NAME')
+
+    def build_rmt_lun_params(self, local_lun_info):
+        params = {
+            'TYPE': '11',
+            'NAME': local_lun_info['NAME'],
+            'PARENTTYPE': '216',
+            'PARENTID': self.rmt_client.get_pool_id(self.rmt_pool),
+            'DESCRIPTION': local_lun_info['DESCRIPTION'],
+            'ALLOCTYPE': local_lun_info['ALLOCTYPE'],
+            'CAPACITY': local_lun_info['CAPACITY'],
+            'WRITEPOLICY': self.conf.lun_write_type,
+            'MIRRORPOLICY': self.conf.lun_mirror_switch,
+            'PREFETCHPOLICY': self.conf.lun_prefetch_type,
+            'PREFETCHVALUE': self.conf.lun_prefetch_value,
+            'DATATRANSFERPOLICY': self.conf.lun_policy,
+            'READCACHEPOLICY': self.conf.lun_read_cache_policy,
+            'WRITECACHEPOLICY': self.conf.lun_write_cache_policy,
+        }
+
+        LOG.debug('Remote lun params: %s.', params)
+        return params
+
+    def wait_volume_online(self, client, lun_info,
+                           interval=None, timeout=None):
+        online_status = constants.STATUS_VOLUME_READY
+        if lun_info.get('RUNNINGSTATUS') == online_status:
+            return
+
+        lun_id = lun_info['ID']
+
+        def _wait_online():
+            info = client.get_lun_info(lun_id)
+            return info.get('RUNNINGSTATUS') == online_status
+
+        if not interval:
+            interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
+        if not timeout:
+            timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
+
+        huawei_utils.wait_for_condition(_wait_online,
+                                        interval,
+                                        timeout)
+
+    def create_rmt_lun(self, local_lun_info):
+        # Create on rmt array. If failed, raise exception.
+        lun_params = self.build_rmt_lun_params(local_lun_info)
+        lun_info = self.rmt_client.create_lun(lun_params)
+        try:
+            self.wait_volume_online(self.rmt_client, lun_info)
+        except exception.VolumeBackendAPIException:
+            with excutils.save_and_reraise_exception():
+                self.rmt_client.delete_lun(lun_info['ID'])
+
+        return lun_info
+
+    def create_replica(self, local_lun_info, replica_model):
+        """Create remote LUN and replication pair.
+
+        Purpose:
+            1. create remote lun
+            2. create replication pair
+            3. enable replication pair
+        """
+        LOG.debug(('Create replication, local lun info: %(info)s, '
+                   'replication model: %(model)s.'),
+                  {'info': local_lun_info, 'model': replica_model})
+
+        local_lun_id = local_lun_info['ID']
+        self.wait_volume_online(self.local_client, local_lun_info)
+
+        # step1, create remote lun
+        rmt_lun_info = self.create_rmt_lun(local_lun_info)
+        rmt_lun_id = rmt_lun_info['ID']
+
+        # step2, get remote device info
+        rmt_dev_id, rmt_dev_name = self.get_rmt_dev_info()
+        if not rmt_lun_id or not rmt_dev_name:
+            self._delete_rmt_lun(rmt_lun_id)
+            msg = _('Get remote device info failed.')
+            LOG.error(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+        # step3, create replication pair
+        try:
+            pair_info = self.local_op.create(local_lun_id,
+                                             rmt_lun_id, rmt_dev_id,
+                                             rmt_dev_name, replica_model)
+            pair_id = pair_info['ID']
+        except Exception as err:
+            with excutils.save_and_reraise_exception():
+                LOG.error(_LE('Create pair failed. Error: %s.'), err)
+                self._delete_rmt_lun(rmt_lun_id)
+
+        # step4, start sync manually. If replication type is sync,
+        # then wait for sync complete.
+        wait_complete = (replica_model == constants.REPLICA_SYNC_MODEL)
+        try:
+            self.local_driver.sync(pair_id, wait_complete)
+        except Exception as err:
+            with excutils.save_and_reraise_exception():
+                LOG.error(_LE('Start synchronization failed. Error: %s.'), err)
+                self._delete_pair(pair_id)
+                self._delete_rmt_lun(rmt_lun_id)
+
+        model_update = {}
+        driver_data = {'pair_id': pair_id,
+                       'rmt_lun_id': rmt_lun_id}
+        model_update['replication_driver_data'] = to_string(driver_data)
+        model_update['replication_status'] = 'enabled'
+        LOG.debug('Create replication, return info: %s.', model_update)
+        return model_update
+
+    def _delete_pair(self, pair_id):
+        if (not pair_id
+                or not self.local_client.check_pair_exist(pair_id)):
+            return
+
+        self.local_driver.split(pair_id)
+        self.local_op.delete(pair_id)
+
+    def _delete_rmt_lun(self, lun_id):
+        if lun_id and self.rmt_client.check_lun_exist(lun_id):
+            self.rmt_client.delete_lun(lun_id)
+
+    def delete_replica(self, volume):
+        """Delete replication pair and remote lun.
+
+        Purpose:
+            1. delete replication pair
+            2. delete remote_lun
+        """
+        LOG.debug('Delete replication, volume: %s.', volume['id'])
+        info = get_replication_driver_data(volume)
+        pair_id = info.get('pair_id')
+        if pair_id:
+            self._delete_pair(pair_id)
+
+        # Delete remote_lun
+        rmt_lun_id = info.get('rmt_lun_id')
+        if rmt_lun_id:
+            self._delete_rmt_lun(rmt_lun_id)
+
+    def enable_replica(self, volume):
+        """Enable replication.
+
+        Purpose:
+            1. If local backend's array is secondary, switch to primary
+            2. Synchronize data
+        """
+        LOG.debug('Enable replication, volume: %s.', volume['id'])
+
+        info = get_replication_driver_data(volume)
+        pair_id = info.get('pair_id')
+        if not pair_id:
+            msg = _('No pair id in volume replication_driver_data.')
+            LOG.error(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+        info = self.local_op.get_replica_info(pair_id)
+        if not info:
+            msg = _('Pair does not exist on array. Pair id: %s.') % pair_id
+            LOG.error(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+        wait_sync_complete = False
+        if info.get('REPLICATIONMODEL') == constants.REPLICA_SYNC_MODEL:
+            wait_sync_complete = True
+
+        return self.local_driver.enable(pair_id, wait_sync_complete)
+
+    def disable_replica(self, volume):
+        """We consider that all abnormal states is disabled."""
+        LOG.debug('Disable replication, volume: %s.', volume['id'])
+
+        info = get_replication_driver_data(volume)
+        pair_id = info.get('pair_id')
+        if not pair_id:
+            LOG.warning(_LW('No pair id in volume replication_driver_data.'))
+            return None
+
+        return self.local_driver.disable(pair_id)
+
+    def failover_replica(self, volume):
+        """Just make the secondary available."""
+        LOG.debug('Failover replication, volume: %s.', volume['id'])
+
+        info = get_replication_driver_data(volume)
+        pair_id = info.get('pair_id')
+        if not pair_id:
+            msg = _('No pair id in volume replication_driver_data.')
+            LOG.error(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+        rmt_lun_id = info.get('rmt_lun_id')
+        if not rmt_lun_id:
+            msg = _('No remote LUN id in volume replication_driver_data.')
+            LOG.error(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+        # Remote array must be available. So we can get the real pool info.
+        lun_info = self.rmt_client.get_lun_info(rmt_lun_id)
+        lun_wwn = lun_info.get('WWN')
+        lun_pool = lun_info.get('PARENTNAME')
+        new_backend = re.sub(r'(?<=#).*$', lun_pool, self.rmt_backend)
+
+        self.rmt_driver.failover(pair_id)
+
+        metadata = huawei_utils.get_volume_metadata(volume)
+        metadata.update({'lun_wwn': lun_wwn})
+
+        new_driver_data = {'pair_id': pair_id,
+                           'rmt_lun_id': volume['provider_location']}
+        new_driver_data = to_string(new_driver_data)
+        return {'host': new_backend,
+                'provider_location': rmt_lun_id,
+                'replication_driver_data': new_driver_data,
+                'metadata': metadata}
+
+    def list_replica_targets(self, volume):
+        info = get_replication_driver_data(volume)
+        if not info:
+            LOG.warning(_LW('Replication driver data does not exist. '
+                            'Volume: %s'), volume['id'])
+
+        targets = [{'target_device_id': self.target_dev_id}]
+        return {'volume_id': volume['id'],
+                'targets': targets}
+
+
+def get_replication_opts(opts):
+    if opts.get('replication_type') == 'sync':
+        opts['replication_type'] = constants.REPLICA_SYNC_MODEL
+    else:
+        opts['replication_type'] = constants.REPLICA_ASYNC_MODEL
+
+    return opts
index 222fe737face2450056b9b9282970d6afd4dc404..e6c374e4ee2cc62ca2c35e9ae1afd2a8e846c69c 100644 (file)
@@ -231,7 +231,7 @@ class RestClient(object):
 
         return info
 
-    def get_pool_id(self, volume, pool_name):
+    def get_pool_id(self, pool_name):
         pools = self.get_all_pools()
         pool_info = self.get_pool_info(pool_name, pools)
         if not pool_info:
@@ -1562,11 +1562,15 @@ class RestClient(object):
 
         self._assert_rest_result(result, _('Add lun to cache error.'))
 
-    def find_array_version(self):
+    def get_array_info(self):
         url = "/system/"
         result = self.call(url, None, "GET")
-        self._assert_rest_result(result, _('Find array version error.'))
-        return result['data']['PRODUCTVERSION']
+        self._assert_rest_result(result, _('Get array info error.'))
+        return result.get('data', None)
+
+    def find_array_version(self):
+        info = self.get_array_info()
+        return info.get('PRODUCTVERSION', None)
 
     def remove_host(self, host_id):
         url = "/host/%s" % host_id
@@ -2008,3 +2012,78 @@ class RestClient(object):
         for item in result.get('data', []):
             wwns.append(item['WWN'])
         return wwns
+
+    def get_remote_devices(self):
+        url = "/remote_device"
+        result = self.call(url, None, "GET")
+        self._assert_rest_result(result, _('Get remote devices error.'))
+        return result.get('data', [])
+
+    def create_pair(self, pair_params):
+        url = "/REPLICATIONPAIR"
+        result = self.call(url, pair_params, "POST")
+
+        msg = _('Create replication error.')
+        self._assert_rest_result(result, msg)
+        self._assert_data_in_result(result, msg)
+        return result['data']
+
+    def get_pair_by_id(self, pair_id):
+        url = "/REPLICATIONPAIR/" + pair_id
+        result = self.call(url, None, "GET")
+
+        msg = _('Get pair failed.')
+        self._assert_rest_result(result, msg)
+        return result.get('data', {})
+
+    def switch_pair(self, pair_id):
+        url = '/REPLICATIONPAIR/switch'
+        data = {"ID": pair_id,
+                "TYPE": "263"}
+        result = self.call(url, data, "PUT")
+
+        msg = _('Switch over pair error.')
+        self._assert_rest_result(result, msg)
+
+    def split_pair(self, pair_id):
+        url = '/REPLICATIONPAIR/split'
+        data = {"ID": pair_id,
+                "TYPE": "263"}
+        result = self.call(url, data, "PUT")
+
+        msg = _('Split pair error.')
+        self._assert_rest_result(result, msg)
+
+    def delete_pair(self, pair_id, force=False):
+        url = "/REPLICATIONPAIR/" + pair_id
+        data = None
+        if force:
+            data = {"ISLOCALDELETE": force}
+
+        result = self.call(url, data, "DELETE")
+
+        msg = _('delete_replication error.')
+        self._assert_rest_result(result, msg)
+
+    def sync_pair(self, pair_id):
+        url = "/REPLICATIONPAIR/sync"
+        data = {"ID": pair_id,
+                "TYPE": "263"}
+        result = self.call(url, data, "PUT")
+
+        msg = _('Sync pair error.')
+        self._assert_rest_result(result, msg)
+
+    def check_pair_exist(self, pair_id):
+        url = "/REPLICATIONPAIR/" + pair_id
+        result = self.call(url, None, "GET")
+        return result['error']['code'] == 0
+
+    def set_pair_second_access(self, pair_id, access):
+        url = "/REPLICATIONPAIR/" + pair_id
+        data = {"ID": pair_id,
+                "SECRESACCESS": access}
+        result = self.call(url, data, "PUT")
+
+        msg = _('Set pair secondary access error.')
+        self._assert_rest_result(result, msg)
diff --git a/releasenotes/notes/support-replication-for-huawei-volume-driver-ad61e9f5eba9c422.yaml b/releasenotes/notes/support-replication-for-huawei-volume-driver-ad61e9f5eba9c422.yaml
new file mode 100644 (file)
index 0000000..2ff013f
--- /dev/null
@@ -0,0 +1,2 @@
+features:
+  - Added Replication V2 support for Huawei drivers.