from cinder.volume.drivers.huawei import huawei_conf
from cinder.volume.drivers.huawei import huawei_driver
from cinder.volume.drivers.huawei import hypermetro
+from cinder.volume.drivers.huawei import replication
from cinder.volume.drivers.huawei import rest_client
from cinder.volume.drivers.huawei import smartx
'value': '11'}],
}
+sync_replica_specs = {'replication_enabled': '<is> True',
+ 'replication_type': '<in> sync'}
+async_replica_specs = {'replication_enabled': '<is> True',
+ 'replication_type': '<in> async'}
+
+TEST_PAIR_ID = "3400a30d844d0004"
+replication_volume = {
+ 'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
+ 'size': 2,
+ 'volume_name': 'vol1',
+ 'id': '21ec7341-9256-497b-97d9-ef48edcf0635',
+ 'volume_id': '21ec7341-9256-497b-97d9-ef48edcf0635',
+ 'provider_auth': None,
+ 'project_id': 'project',
+ 'display_name': 'vol1',
+ 'display_description': 'test volume',
+ 'volume_type_id': None,
+ 'host': 'ubuntu@huawei#OpenStack_Pool',
+ 'provider_location': '11',
+ 'metadata': {'lun_wwn': '6643e8c1004c5f6723e9f454003'},
+ 'replication_status': 'disabled',
+ 'replication_driver_data':
+ '{"pair_id": "%s", "rmt_lun_id": "1"}' % TEST_PAIR_ID,
+}
+
test_snap = {'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
'size': 1,
'volume_name': 'vol1',
'description': None,
}
+test_new_replication_type = {
+ 'name': u'new_type',
+ 'qos_specs_id': None,
+ 'deleted': False,
+ 'created_at': None,
+ 'updated_at': None,
+ 'extra_specs': {
+ 'replication_enabled': '<is> True',
+ 'replication_type': '<in> sync',
+ },
+ 'is_public': True,
+ 'deleted_at': None,
+ 'id': u'530a56e1-a1a4-49f3-ab6c-779a6e5d999f',
+ 'description': None,
+}
+
hypermetro_devices = """
{
"remote_device": {
},
"data": {
"ID": "1",
- "NAME": "5mFHcBv4RkCcD+JyrWc0SA"
+ "NAME": "5mFHcBv4RkCcD+JyrWc0SA",
+ "WWN": "6643e8c1004c5f6723e9f454003",
+ "DESCRIPTION": "21ec7341-9256-497b-97d9-ef48edcf0635",
+ "HEALTHSTATUS": "1",
+ "RUNNINGSTATUS": "27",
+ "ALLOCTYPE": "1",
+ "CAPACITY": "2097152"
}
}
"""
"IOCLASSID": "11",
"NAME": "5mFHcBv4RkCcD+JyrWc0SA",
"DESCRIPTION": "21ec7341-9256-497b-97d9-ef48edcf0635",
- "RUNNINGSTATUS": "2",
+ "RUNNINGSTATUS": "10",
"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "27",
"LUNLIST": "",
"WRITECACHEPOLICY": "5",
"OWNINGCONTROLLER": "0B",
"SMARTCACHEPARTITIONID": "",
- "CACHEPARTITIONID": ""
+ "CACHEPARTITIONID": "",
+ "WWN": "6643e8c1004c5f6723e9f454003",
+ "PARENTNAME": "OpenStack_Pool"
}
}
"""
"code": 0
},
"data":{
- "PRODUCTVERSION": "V100R001C10"
+ "PRODUCTVERSION": "V100R001C10",
+ "wwn": "21003400a30d844d"
}
}
"""
FAKE_PORTS_IN_PG_RESPONSE)
+# Replication response
+FAKE_GET_REMOTEDEV_RESPONSE = """
+{
+ "data":[{
+ "ARRAYTYPE":"1",
+ "HEALTHSTATUS":"1",
+ "ID":"0",
+ "NAME":"Huawei.Storage",
+ "RUNNINGSTATUS":"1",
+ "WWN":"21003400a30d844d"
+ }],
+ "error":{
+ "code":0,
+ "description":"0"
+ }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/remote_device/GET'] = (
+ FAKE_GET_REMOTEDEV_RESPONSE)
+
+FAKE_CREATE_PAIR_RESPONSE = """
+{
+ "data":{
+ "ID":"%s"
+ },
+ "error":{
+ "code":0,
+ "description":"0"
+ }
+}
+""" % TEST_PAIR_ID
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/POST'] = (
+ FAKE_CREATE_PAIR_RESPONSE)
+
+FAKE_DELETE_PAIR_RESPONSE = """
+{
+ "data":{},
+ "error":{
+ "code":0,
+ "description":"0"
+ }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/DELETE' % TEST_PAIR_ID] = (
+ FAKE_DELETE_PAIR_RESPONSE)
+
+FAKE_SET_PAIR_ACCESS_RESPONSE = """
+{
+ "data":{},
+ "error":{
+ "code":0,
+ "description":"0"
+ }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/PUT' % TEST_PAIR_ID] = (
+ FAKE_SET_PAIR_ACCESS_RESPONSE)
+
+FAKE_GET_PAIR_NORMAL_RESPONSE = """
+{
+ "data":{
+ "REPLICATIONMODEL": "1",
+ "RUNNINGSTATUS": "1",
+ "SECRESACCESS": "2",
+ "HEALTHSTATUS": "1",
+ "ISPRIMARY": "true"
+ },
+ "error":{
+ "code":0,
+ "description":"0"
+ }
+}
+"""
+
+FAKE_GET_PAIR_SPLIT_RESPONSE = """
+{
+ "data":{
+ "REPLICATIONMODEL": "1",
+ "RUNNINGSTATUS": "26",
+ "SECRESACCESS": "2",
+ "ISPRIMARY": "true"
+ },
+ "error":{
+ "code":0,
+ "description":"0"
+ }
+}
+"""
+
+FAKE_GET_PAIR_SYNC_RESPONSE = """
+{
+ "data":{
+ "REPLICATIONMODEL": "1",
+ "RUNNINGSTATUS": "23",
+ "SECRESACCESS": "2"
+ },
+ "error":{
+ "code":0,
+ "description":"0"
+ }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/GET' % TEST_PAIR_ID] = (
+ FAKE_GET_PAIR_NORMAL_RESPONSE)
+
+FAKE_SYNC_PAIR_RESPONSE = """
+{
+ "data":{},
+ "error":{
+ "code":0,
+ "description":"0"
+ }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/sync/PUT'] = (
+ FAKE_SYNC_PAIR_RESPONSE)
+
+FAKE_SPLIT_PAIR_RESPONSE = """
+{
+ "data":{},
+ "error":{
+ "code":0,
+ "description":"0"
+ }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/split/PUT'] = (
+ FAKE_SPLIT_PAIR_RESPONSE)
+
+FAKE_SWITCH_PAIR_RESPONSE = """
+{
+ "data":{},
+ "error":{
+ "code":0,
+ "description":"0"
+ }
+}
+"""
+MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/switch/PUT'] = (
+ FAKE_SWITCH_PAIR_RESPONSE)
+
+
def Fake_sleep(time):
pass
self.conf = conf
self.protocol = protocol
+ def safe_get(self, key):
+ try:
+ return getattr(self.conf, key)
+ except Exception:
+ return
+
def update_config_value(self):
+ setattr(self.conf, 'volume_backend_name', 'huawei_storage')
setattr(self.conf, 'san_address',
['http://100.115.10.69:8082/deviceManager/rest/'])
setattr(self.conf, 'san_user', 'admin')
'TargetPortGroup': 'portgroup-test', }
setattr(self.conf, 'iscsi_info', [iscsi_info])
+ targets = [{'target_device_id': 'huawei-replica-1',
+ 'managed_backend_name': 'ubuntu@huawei2#OpenStack_Pool',
+ 'san_address':
+ 'https://100.97.10.69:8088/deviceManager/rest/',
+ 'san_user': 'admin',
+ 'san_password': 'Admin@storage1'}]
+ setattr(self.conf, 'replication_device', targets)
+
+ setattr(self.conf, 'safe_get', self.safe_get)
+
class FakeClient(rest_client.RestClient):
return volumes
+class FakeReplicaPairManager(replication.ReplicaPairManager):
+ def _init_rmt_client(self):
+ self.rmt_client = FakeClient(self.conf)
+
+
class FakeISCSIStorage(huawei_driver.HuaweiISCSIDriver):
"""Fake Huawei Storage, Rewrite some methods of HuaweiISCSIDriver."""
self.rmt_client,
self.configuration,
self.db)
+ self.replica = FakeReplicaPairManager(self.client, self.configuration)
class FakeFCStorage(huawei_driver.HuaweiFCDriver):
self.rmt_client,
self.configuration,
self.db)
+ self.replica = FakeReplicaPairManager(self.client, self.configuration)
@ddt.ddt
self.assertTrue(delete_flag)
def test_create_volume_from_snapsuccess(self):
- lun_info = self.driver.create_volume_from_snapshot(test_volume,
- test_volume)
- self.assertEqual('1', lun_info['provider_location'])
+ self.mock_object(
+ huawei_driver.HuaweiBaseDriver,
+ '_get_volume_type',
+ mock.Mock(return_value={'extra_specs': sync_replica_specs}))
+ self.mock_object(replication.ReplicaCommonDriver, 'sync')
+ model_update = self.driver.create_volume_from_snapshot(test_volume,
+ test_volume)
+ self.assertEqual('1', model_update['provider_location'])
+
+ driver_data = {'pair_id': TEST_PAIR_ID,
+ 'rmt_lun_id': '1'}
+ driver_data = replication.to_string(driver_data)
+ self.assertEqual(driver_data, model_update['replication_driver_data'])
+ self.assertEqual('enabled', model_update['replication_status'])
def test_initialize_connection_success(self):
iscsi_properties = self.driver.initialize_connection(test_volume,
def test_get_volume_status(self):
data = self.driver.get_volume_stats()
- self.assertEqual('2.0.3', data['driver_version'])
+ self.assertEqual('2.0.5', data['driver_version'])
def test_extend_volume(self):
self.driver.unmanage_snapshot(test_snapshot)
self.assertEqual(1, mock_rename.call_count)
+ def test_init_rmt_client(self):
+ self.mock_object(rest_client, 'RestClient',
+ mock.Mock(return_value=None))
+ replica = replication.ReplicaPairManager(self.driver.client,
+ self.configuration)
+ self.assertEqual(replica.rmt_pool, 'OpenStack_Pool')
+ self.assertEqual(replica.target_dev_id, 'huawei-replica-1')
+
+ @ddt.data(sync_replica_specs, async_replica_specs)
+ def test_create_replication_success(self, mock_type):
+ self.mock_object(replication.ReplicaCommonDriver, 'sync')
+ self.mock_object(
+ huawei_driver.HuaweiBaseDriver,
+ '_get_volume_type',
+ mock.Mock(return_value={'extra_specs': mock_type}))
+
+ model_update = self.driver.create_volume(replication_volume)
+ driver_data = {'pair_id': TEST_PAIR_ID,
+ 'rmt_lun_id': '1'}
+ driver_data = replication.to_string(driver_data)
+ self.assertEqual(driver_data, model_update['replication_driver_data'])
+ self.assertEqual('enabled', model_update['replication_status'])
+
+ @ddt.data(
+ [
+ rest_client.RestClient,
+ 'get_array_info',
+ mock.Mock(
+ side_effect=exception.VolumeBackendAPIException(data='err'))
+ ],
+ [
+ rest_client.RestClient,
+ 'get_remote_devices',
+ mock.Mock(
+ side_effect=exception.VolumeBackendAPIException(data='err'))
+ ],
+ [
+ rest_client.RestClient,
+ 'get_remote_devices',
+ mock.Mock(return_value={})
+ ],
+ [
+ replication.ReplicaPairManager,
+ 'wait_volume_online',
+ mock.Mock(side_effect=[
+ None,
+ exception.VolumeBackendAPIException(data='err')])
+ ],
+ [
+ rest_client.RestClient,
+ 'create_pair',
+ mock.Mock(
+ side_effect=exception.VolumeBackendAPIException(data='err'))
+ ],
+ [
+ replication.ReplicaCommonDriver,
+ 'sync',
+ mock.Mock(
+ side_effect=exception.VolumeBackendAPIException(data='err'))
+ ],
+ )
+ @ddt.unpack
+ def test_create_replication_fail(self, mock_module, mock_func, mock_value):
+ self.mock_object(
+ huawei_driver.HuaweiBaseDriver,
+ '_get_volume_type',
+ mock.Mock(return_value={'extra_specs': sync_replica_specs}))
+ self.mock_object(replication.ReplicaPairManager, '_delete_pair')
+ self.mock_object(mock_module, mock_func, mock_value)
+ self.assertRaises(
+ exception.VolumeBackendAPIException,
+ self.driver.create_volume, replication_volume)
+
+ def test_delete_replication_success(self):
+ self.mock_object(replication.ReplicaCommonDriver, 'split')
+ self.mock_object(
+ huawei_driver.HuaweiBaseDriver,
+ '_get_volume_type',
+ mock.Mock(return_value={'extra_specs': sync_replica_specs}))
+ self.driver.delete_volume(replication_volume)
+
+ self.mock_object(rest_client.RestClient, 'check_lun_exist',
+ mock.Mock(return_value=False))
+ self.driver.delete_volume(replication_volume)
+
+ def test_wait_volume_online(self):
+ replica = FakeReplicaPairManager(self.driver.client,
+ self.configuration)
+ lun_info = {'ID': '11'}
+
+ replica.wait_volume_online(self.driver.client, lun_info)
+
+ offline_status = {'RUNNINGSTATUS': '28'}
+ replica.wait_volume_online(self.driver.client, lun_info)
+
+ with mock.patch.object(rest_client.RestClient, 'get_lun_info',
+ offline_status):
+ self.assertRaises(exception.VolumeBackendAPIException,
+ replica.wait_volume_online,
+ self.driver.client,
+ lun_info)
+
+ def test_wait_second_access(self):
+ pair_id = '1'
+ access_ro = constants.REPLICA_SECOND_RO
+ access_rw = constants.REPLICA_SECOND_RW
+ op = replication.PairOp(self.driver.client)
+ common_driver = replication.ReplicaCommonDriver(self.configuration, op)
+ self.mock_object(replication.PairOp, 'get_replica_info',
+ mock.Mock(return_value={'SECRESACCESS': access_ro}))
+
+ common_driver.wait_second_access(pair_id, access_ro)
+ self.assertRaises(exception.VolumeBackendAPIException,
+ common_driver.wait_second_access, pair_id, access_rw)
+
+ def test_wait_replica_ready(self):
+ normal_status = {
+ 'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_NORMAL,
+ 'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
+ }
+ split_status = {
+ 'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_SPLIT,
+ 'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
+ }
+ sync_status = {
+ 'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_SYNC,
+ 'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
+ }
+ pair_id = '1'
+ op = replication.PairOp(self.driver.client)
+ common_driver = replication.ReplicaCommonDriver(self.configuration, op)
+ with mock.patch.object(replication.PairOp, 'get_replica_info',
+ mock.Mock(return_value=normal_status)):
+ common_driver.wait_replica_ready(pair_id)
+
+ with mock.patch.object(
+ replication.PairOp,
+ 'get_replica_info',
+ mock.Mock(side_effect=[sync_status, normal_status])):
+ common_driver.wait_replica_ready(pair_id)
+
+ with mock.patch.object(replication.PairOp, 'get_replica_info',
+ mock.Mock(return_value=split_status)):
+ self.assertRaises(exception.VolumeBackendAPIException,
+ common_driver.wait_replica_ready, pair_id)
+
+ def test_replication_enable_success(self):
+ self.mock_object(replication.ReplicaCommonDriver, 'unprotect_second')
+ self.mock_object(replication.ReplicaCommonDriver, 'split')
+ self.mock_object(replication.PairOp, 'is_primary',
+ mock.Mock(side_effect=[False, True]))
+ self.driver.replication_enable(None, replication_volume)
+
+ @ddt.data(
+ [
+ replication.AbsReplicaOp,
+ 'is_running_status',
+ mock.Mock(return_value=False)
+ ],
+ [
+ replication,
+ 'get_replication_driver_data',
+ mock.Mock(return_value={})
+ ],
+ [
+ replication.PairOp,
+ 'get_replica_info',
+ mock.Mock(return_value={})
+ ],
+ )
+ @ddt.unpack
+ def test_replication_enable_fail(self, mock_module, mock_func, mock_value):
+ self.mock_object(mock_module, mock_func, mock_value)
+ self.assertRaises(
+ exception.VolumeBackendAPIException,
+ self.driver.replication_enable, None, replication_volume)
+
+ def test_replication_disable_fail(self):
+ self.assertRaises(
+ exception.VolumeBackendAPIException,
+ self.driver.replication_disable, None, replication_volume)
+
+ def test_replication_disable_success(self):
+ self.mock_object(replication.ReplicaCommonDriver, 'split')
+ self.driver.replication_disable(None, replication_volume)
+
+ self.mock_object(replication, 'get_replication_driver_data',
+ mock.Mock(return_value={}))
+ self.driver.replication_disable(None, replication_volume)
+
+ def test_replication_failover_success(self):
+ self.mock_object(replication.ReplicaCommonDriver, 'split')
+ self.mock_object(replication.PairOp, 'is_primary',
+ mock.Mock(return_value=False))
+ model_update = self.driver.replication_failover(
+ None, replication_volume, None)
+ self.assertEqual('ubuntu@huawei2#OpenStack_Pool', model_update['host'])
+ self.assertEqual('1', model_update['provider_location'])
+ driver_data = {'pair_id': TEST_PAIR_ID,
+ 'rmt_lun_id': '11'}
+ driver_data = replication.to_string(driver_data)
+ self.assertEqual(driver_data, model_update['replication_driver_data'])
+
+ @ddt.data(
+ [
+ replication.PairOp,
+ 'is_primary',
+ mock.Mock(return_value=True)
+ ],
+ [
+ replication.PairOp,
+ 'is_primary',
+ mock.Mock(return_value=False)
+ ],
+ [
+ replication,
+ 'get_replication_driver_data',
+ mock.Mock(return_value={})
+ ],
+ [
+ replication,
+ 'get_replication_driver_data',
+ mock.Mock(return_value={'pair_id': '1'})
+ ],
+ )
+ @ddt.unpack
+ def test_replication_failover_fail(self,
+ mock_module, mock_func, mock_value):
+ self.mock_object(
+ replication.ReplicaCommonDriver,
+ 'wait_second_access',
+ mock.Mock(
+ side_effect=exception.VolumeBackendAPIException(data="error")))
+ self.mock_object(mock_module, mock_func, mock_value)
+ self.assertRaises(
+ exception.VolumeBackendAPIException,
+ self.driver.replication_failover,
+ None,
+ replication_volume, None)
+
+ def test_list_replication_targets(self):
+ info = self.driver.list_replication_targets(None, replication_volume)
+ targets = [{'target_device_id': 'huawei-replica-1'}]
+ self.assertEqual(targets, info['targets'])
+
+ self.mock_object(replication, 'get_replication_driver_data',
+ mock.Mock(return_value={}))
+ info = self.driver.list_replication_targets(None, replication_volume)
+ self.assertEqual(targets, info['targets'])
+
+ @ddt.data(constants.REPLICA_SECOND_RW, constants.REPLICA_SECOND_RO)
+ def test_replication_protect_second(self, mock_access):
+ replica_id = TEST_PAIR_ID
+ op = replication.PairOp(self.driver.client)
+ common_driver = replication.ReplicaCommonDriver(self.configuration, op)
+
+ self.mock_object(replication.ReplicaCommonDriver, 'wait_second_access')
+ self.mock_object(
+ replication.PairOp,
+ 'get_replica_info',
+ mock.Mock(return_value={'SECRESACCESS': mock_access}))
+
+ common_driver.protect_second(replica_id)
+ common_driver.unprotect_second(replica_id)
+
+ def test_replication_sync(self):
+ replica_id = TEST_PAIR_ID
+ op = replication.PairOp(self.driver.client)
+ common_driver = replication.ReplicaCommonDriver(self.configuration, op)
+ async_normal_status = {
+ 'REPLICATIONMODEL': constants.REPLICA_ASYNC_MODEL,
+ 'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_NORMAL,
+ 'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
+ }
+
+ self.mock_object(replication.ReplicaCommonDriver, 'protect_second')
+ self.mock_object(replication.PairOp, 'get_replica_info',
+ mock.Mock(return_value=async_normal_status))
+ common_driver.sync(replica_id, True)
+ common_driver.sync(replica_id, False)
+
+ def test_replication_split(self):
+ replica_id = TEST_PAIR_ID
+ op = replication.PairOp(self.driver.client)
+ common_driver = replication.ReplicaCommonDriver(self.configuration, op)
+
+ self.mock_object(replication.ReplicaCommonDriver, 'wait_expect_state')
+ self.mock_object(replication.PairOp, 'split', mock.Mock(
+ side_effect=exception.VolumeBackendAPIException(data='err')))
+ common_driver.split(replica_id)
+
+ def test_replication_base_op(self):
+ replica_id = '1'
+ op = replication.AbsReplicaOp(None)
+ op.create()
+ op.delete(replica_id)
+ op.protect_second(replica_id)
+ op.unprotect_second(replica_id)
+ op.sync(replica_id)
+ op.split(replica_id)
+ op.switch(replica_id)
+ op.is_primary({})
+ op.get_replica_info(replica_id)
+ op._is_status(None, {'key': 'volue'}, None)
+
class FCSanLookupService(object):
self.assertTrue(self.driver.client.terminateFlag)
def test_get_volume_status(self):
+ remote_device_info = {"ARRAYTYPE": "1",
+ "HEALTHSTATUS": "1",
+ "RUNNINGSTATUS": "10"}
+ self.mock_object(
+ replication.ReplicaPairManager,
+ 'get_remote_device_by_wwn',
+ mock.Mock(return_value=remote_device_info))
+ data = self.driver.get_volume_stats()
+ self.assertEqual('2.0.5', data['driver_version'])
+ self.assertTrue(data['pools'][0]['replication_enabled'])
+ self.assertListEqual(['sync', 'async'],
+ data['pools'][0]['replication_type'])
+
+ self.mock_object(
+ replication.ReplicaPairManager,
+ 'get_remote_device_by_wwn',
+ mock.Mock(return_value={}))
+ data = self.driver.get_volume_stats()
+ self.assertNotIn('replication_enabled', data['pools'][0])
+ self.mock_object(
+ replication.ReplicaPairManager,
+ 'try_get_remote_wwn',
+ mock.Mock(return_value={}))
data = self.driver.get_volume_stats()
- self.assertEqual('2.0.4', data['driver_version'])
+ self.assertEqual('2.0.5', data['driver_version'])
+ self.assertNotIn('replication_enabled', data['pools'][0])
def test_extend_volume(self):
self.driver.extend_volume(test_volume, 3)
test_new_type, None, test_host)
self.assertTrue(retype)
+ @mock.patch.object(rest_client.RestClient, 'add_lun_to_partition')
+ @mock.patch.object(
+ huawei_driver.HuaweiBaseDriver,
+ '_get_volume_type',
+ return_value={'extra_specs': sync_replica_specs})
+ def test_retype_replication_volume_success(self, mock_get_type,
+ mock_add_lun_to_partition):
+ retype = self.driver.retype(None, test_volume,
+ test_new_replication_type, None, test_host)
+ self.assertTrue(retype)
+
def test_retype_volume_cache_fail(self):
self.driver.client.cache_not_exist = True
'MAXBANDWidth', 'LATENCY', 'IOTYPE']
MAX_LUN_NUM_IN_QOS = 64
HYPERMETRO_CLASS = "cinder.volume.drivers.huawei.hypermetro.HuaweiHyperMetro"
+
+DEFAULT_REPLICA_WAIT_INTERVAL = 1
+DEFAULT_REPLICA_WAIT_TIMEOUT = 10
+
+REPLICA_SYNC_MODEL = '1'
+REPLICA_ASYNC_MODEL = '2'
+REPLICA_SPEED = '2'
+REPLICA_PERIOD = '3600'
+REPLICA_SECOND_RO = '2'
+REPLICA_SECOND_RW = '3'
+
+REPLICA_RUNNING_STATUS_KEY = 'RUNNINGSTATUS'
+REPLICA_RUNNING_STATUS_INITIAL_SYNC = '21'
+REPLICA_RUNNING_STATUS_SYNC = '23'
+REPLICA_RUNNING_STATUS_SYNCED = '24'
+REPLICA_RUNNING_STATUS_NORMAL = '1'
+REPLICA_RUNNING_STATUS_SPLIT = '26'
+REPLICA_RUNNING_STATUS_INVALID = '35'
+
+REPLICA_HEALTH_STATUS_KEY = 'HEALTHSTATUS'
+REPLICA_HEALTH_STATUS_NORMAL = '1'
+
+REPLICA_LOCAL_DATA_STATUS_KEY = 'PRIRESDATASTATUS'
+REPLICA_REMOTE_DATA_STATUS_KEY = 'SECRESDATASTATUS'
+REPLICA_DATA_SYNC_KEY = 'ISDATASYNC'
+REPLICA_DATA_STATUS_SYNCED = '1'
+REPLICA_DATA_STATUS_COMPLETE = '2'
+REPLICA_DATA_STATUS_INCOMPLETE = '3'
else:
msg = (_("Invalid lun type %s is configured.") % lun_type)
LOG.exception(msg)
-
raise exception.InvalidInput(reason=msg)
setattr(self.conf, 'lun_type', lun_type)
from cinder.volume.drivers.huawei import huawei_conf
from cinder.volume.drivers.huawei import huawei_utils
from cinder.volume.drivers.huawei import hypermetro
+from cinder.volume.drivers.huawei import replication
from cinder.volume.drivers.huawei import rest_client
from cinder.volume.drivers.huawei import smartx
from cinder.volume import utils as volume_utils
metro_san_password)
self.rmt_client.login()
+ # init replication manager
+ self.replica = replication.ReplicaPairManager(self.client,
+ self.configuration)
+
def check_for_setup_error(self):
pass
self.huawei_conf.update_config_value()
if self.metro_flag:
self.rmt_client.get_all_pools()
- return self.client.update_volume_stats()
+ stats = self.client.update_volume_stats()
+ stats = self.replica.update_replica_capability(stats)
+ return stats
def _get_volume_type(self, volume):
volume_type = None
'thin_provisioning_support': False,
'thick_provisioning_support': False,
'hypermetro': False,
+ 'replication_enabled': False,
+ 'replication_type': 'async',
}
opts_value = {
opts_associate,
specs)
opts = smartx.SmartX().get_smartx_specs_opts(opts)
+ opts = replication.get_replication_opts(opts)
LOG.debug('volume opts %(opts)s.', {'opts': opts})
return opts
if ((not scope or scope == 'capabilities')
and key in opts_capability):
words = value.split()
- if not (words and len(words) == 2 and words[0] == '<is>'):
+ if words and len(words) == 2 and words[0] in ('<is>', '<in>'):
+ opts[key] = words[1].lower()
+ elif key == 'replication_type':
LOG.error(_LE("Extra specs must be specified as "
- "capabilities:%s='<is> True' or "
- "'<is> true'."), key)
+ "replication_type='<in> sync' or "
+ "'<in> async'."))
else:
- opts[key] = words[1].lower()
+ LOG.error(_LE("Extra specs must be specified as "
+ "capabilities:%s='<is> True'."), key)
if ((scope in opts_capability)
and (key in opts_value)
'TYPE': '11',
'NAME': huawei_utils.encode_name(volume['id']),
'PARENTTYPE': '216',
- 'PARENTID': self.client.get_pool_id(volume, pool_name),
+ 'PARENTID': self.client.get_pool_id(pool_name),
'DESCRIPTION': volume['name'],
'ALLOCTYPE': opts.get('LUNType', self.configuration.lun_type),
'CAPACITY': huawei_utils.get_volume_size(volume),
model_update['metadata'] = metadata
return lun_info, model_update
- def create_volume(self, volume):
- """Create a volume."""
- volume_type = self._get_volume_type(volume)
- opts = self._get_volume_params(volume_type)
+ def _create_base_type_volume(self, opts, volume, volume_type):
+ """Create volume and add some base type.
+
+ Base type is the services won't conflict with the other service.
+ """
lun_params = self._get_lun_params(volume, opts)
lun_info, model_update = self._create_volume(volume, lun_params)
lun_id = lun_info['ID']
msg = _('Create volume error. Because %s.') % six.text_type(err)
raise exception.VolumeBackendAPIException(data=msg)
- if (opts.get('hypermetro') and opts.get('hypermetro') == 'true'):
+ return lun_params, lun_info, model_update
+
+ def _add_extend_type_to_volume(self, opts, lun_params, lun_info,
+ model_update):
+ """Add the extend type.
+
+ Extend type is the services may conflict with LUNCopy.
+ So add it after the those services.
+ """
+ lun_id = lun_info['ID']
+ if opts.get('hypermetro') == 'true':
metro = hypermetro.HuaweiHyperMetro(self.client,
self.rmt_client,
self.configuration,
self._delete_lun_with_check(lun_id)
raise
+ if opts.get('replication_enabled') == 'true':
+ replica_model = opts.get('replication_type')
+ try:
+ replica_info = self.replica.create_replica(lun_info,
+ replica_model)
+ model_update.update(replica_info)
+ except Exception as err:
+ LOG.exception(_LE('Create replication volume error.'))
+ self._delete_lun_with_check(lun_id)
+ raise
+
+ return model_update
+
+ def create_volume(self, volume):
+ """Create a volume."""
+ volume_type = self._get_volume_type(volume)
+ opts = self._get_volume_params(volume_type)
+ if (opts.get('hypermetro') == 'true'
+ and opts.get('replication_enabled') == 'true'):
+ err_msg = _("Hypermetro and Replication can not be "
+ "used in the same volume_type.")
+ LOG.error(err_msg)
+ raise exception.VolumeBackendAPIException(data=err_msg)
+
+ lun_params, lun_info, model_update = (
+ self._create_base_type_volume(opts, volume, volume_type))
+
+ model_update = self._add_extend_type_to_volume(opts, lun_params,
+ lun_info, model_update)
return model_update
def _delete_volume(self, volume):
lun_id = volume.get('provider_location')
if not lun_id or not self.client.check_lun_exist(lun_id):
LOG.warning(_LW("Can't find lun %s on the array."), lun_id)
- return False
+ return
qos_id = self.client.get_qosid_by_lunid(lun_id)
if qos_id:
self._delete_volume(volume)
raise
+ # Delete a replication volume
+ replica_data = volume.get('replication_driver_data')
+ if replica_data:
+ try:
+ self.replica.delete_replica(volume)
+ except exception.VolumeBackendAPIException as err:
+ with excutils.save_and_reraise_exception():
+ LOG.exception(_LE("Delete replication error."))
+ self._delete_volume(volume)
+
self._delete_volume(volume)
def _delete_lun_with_check(self, lun_id):
def migrate_volume(self, ctxt, volume, host, new_type=None):
"""Migrate a volume within the same array."""
+
+ # NOTE(jlc): Replication volume can't migrate. But retype
+ # can remove replication relationship first then do migrate.
+ # So don't add this judgement into _check_migration_valid().
+ volume_type = self._get_volume_type(volume)
+ opts = self._get_volume_params(volume_type)
+ if opts.get('replication_enabled') == 'true':
+ return (False, None)
+
return self._migrate_volume(volume, host, new_type)
def _check_migration_valid(self, host, volume):
We use LUNcopy to copy a new volume from snapshot.
The time needed increases as volume size does.
"""
+ volume_type = self._get_volume_type(volume)
+ opts = self._get_volume_params(volume_type)
+ if (opts.get('hypermetro') == 'true'
+ and opts.get('replication_enabled') == 'true'):
+ err_msg = _("Hypermetro and Replication can not be "
+ "used in the same volume_type.")
+ LOG.error(err_msg)
+ raise exception.VolumeBackendAPIException(data=err_msg)
+
snapshotname = huawei_utils.encode_name(snapshot['id'])
snapshot_id = snapshot.get('provider_location')
if snapshot_id is None:
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
- model_update = self.create_volume(volume)
+ lun_params, lun_info, model_update = (
+ self._create_base_type_volume(opts, volume, volume_type))
+
tgt_lun_id = model_update['provider_location']
luncopy_name = huawei_utils.encode_name(volume['id'])
LOG.info(_LI(
self._copy_volume(volume, luncopy_name,
snapshot_id, tgt_lun_id)
+ # NOTE(jlc): Actually, we just only support replication here right
+ # now, not hypermetro.
+ model_update = self._add_extend_type_to_volume(opts, lun_params,
+ lun_info, model_update)
return model_update
def create_cloned_volume(self, volume, src_vref):
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
+ volume_type = self._get_volume_type(volume)
+ opts = self._get_volume_params(volume_type)
+ if opts.get('replication_enabled') == 'true':
+ msg = (_("Can't extend replication volume, volume: %(id)s") %
+ {"id": volume['id']})
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
old_size = huawei_utils.get_volume_size(volume)
new_size = int(new_size) * units.Gi / 512
volume_name = huawei_utils.encode_name(volume['id'])
migration, change_opts, lun_id = self.determine_changes_when_retype(
volume, new_type, host)
+ model_update = {}
+ replica_enabled_change = change_opts.get('replication_enabled')
+ replica_type_change = change_opts.get('replication_type')
+ if replica_enabled_change and replica_enabled_change[0] == 'true':
+ try:
+ self.replica.delete_replica(volume)
+ model_update.update({'replication_status': 'disabled',
+ 'replication_driver_data': None})
+ except exception.VolumeBackendAPIException:
+ LOG.exception(_LE('Retype volume error. '
+ 'Delete replication failed.'))
+ return False
+
try:
if migration:
LOG.debug("Begin to migrate LUN(id: %(lun_id)s) with "
"change %(change_opts)s.",
{"lun_id": lun_id, "change_opts": change_opts})
- if self._migrate_volume(volume, host, new_type):
- return True
- else:
+ if not self._migrate_volume(volume, host, new_type):
LOG.warning(_LW("Storage-assisted migration failed during "
"retype."))
return False
else:
# Modify lun to change policy
self.modify_lun(lun_id, change_opts)
- return True
except exception.VolumeBackendAPIException:
LOG.exception(_LE('Retype volume error.'))
return False
+ if replica_enabled_change and replica_enabled_change[1] == 'true':
+ try:
+ # If replica_enabled_change is not None, the
+ # replica_type_change won't be None. See function
+ # determine_changes_when_retype.
+ lun_info = self.client.get_lun_info(lun_id)
+ replica_info = self.replica.create_replica(
+ lun_info, replica_type_change[1])
+ model_update.update(replica_info)
+ except exception.VolumeBackendAPIException:
+ LOG.exception(_LE('Retype volume error. '
+ 'Create replication failed.'))
+ return False
+
+ return (True, model_update)
+
def modify_lun(self, lun_id, change_opts):
if change_opts.get('partitionid'):
old, new = change_opts['partitionid']
migration = True
change_opts['LUNType'] = (old_opts['LUNType'], new_opts['LUNType'])
+ volume_type = self._get_volume_type(volume)
+ volume_opts = self._get_volume_params(volume_type)
+ if (volume_opts['replication_enabled'] == 'true'
+ or new_opts['replication_enabled'] == 'true'):
+ # If replication_enabled changes,
+ # then replication_type in change_opts will be set.
+ change_opts['replication_enabled'] = (
+ volume_opts['replication_enabled'],
+ new_opts['replication_enabled'])
+
+ change_opts['replication_type'] = (volume_opts['replication_type'],
+ new_opts['replication_type'])
+
change_opts = self._check_needed_changes(lun_id, old_opts, new_opts,
change_opts, new_type)
# Check other stuffs to determine whether this LUN can be imported.
self._check_lun_valid_for_manage(lun_info, external_ref)
type_id = volume.get('volume_type_id')
+ new_opts = None
if type_id:
# Handle volume type if specified.
old_opts = self.get_lun_specs(lun_id)
self.client.rename_lun(lun_id, new_name, # pylint: disable=E1121
description)
- return {'provider_location': lun_id}
+ model_update = {}
+ model_update.update({'provider_location': lun_id})
+
+ if new_opts and new_opts.get('replication_enabled'):
+ LOG.debug("Manage volume need to create replication.")
+ try:
+ lun_info = self.client.get_lun_info(lun_id)
+ replica_info = self.replica.create_replica(
+ lun_info, new_opts.get('replication_type'))
+ model_update.update(replica_info)
+ except exception.VolumeBackendAPIException:
+ with excutils.save_and_reraise_exception():
+ LOG.exception(_LE("Manage exist volume failed."))
+
+ return model_update
def _get_lun_info_by_ref(self, external_ref):
LOG.debug("Get external_ref: %s", external_ref)
{'snapshot_id': snapshot['id'],
'snapshot_name': snapshot_name})
+ def replication_enable(self, context, volume):
+ """Enable replication and do switch role when needed."""
+ self.replica.enable_replica(volume)
+
+ def replication_disable(self, context, volume):
+ """Disable replication."""
+ self.replica.disable_replica(volume)
+
+ def replication_failover(self, context, volume, secondary):
+ """Disable replication and unprotect remote LUN."""
+ return self.replica.failover_replica(volume)
+
+ def list_replication_targets(self, context, vref):
+ """Obtain volume repliction targets."""
+ return self.replica.list_replica_targets(vref)
+
+ def get_replication_updates(self, context):
+ # NOTE(jlc): The manager does not do aynthing with these updates.
+ # When that is changed, here must be modified as well.
+ return []
+
class HuaweiISCSIDriver(HuaweiBaseDriver, driver.ISCSIDriver):
"""ISCSI driver for Huawei storage arrays.
2.0.1 - Manage/unmanage volume support
2.0.2 - Refactor HuaweiISCSIDriver
2.0.3 - Manage/unmanage snapshot support
+ 2.0.5 - Replication V2 support
"""
- VERSION = "2.0.3"
+ VERSION = "2.0.5"
def __init__(self, *args, **kwargs):
super(HuaweiISCSIDriver, self).__init__(*args, **kwargs)
2.0.2 - Refactor HuaweiFCDriver
2.0.3 - Manage/unmanage snapshot support
2.0.4 - Balanced FC port selection
+ 2.0.5 - Replication V2 support
"""
- VERSION = "2.0.4"
+ VERSION = "2.0.5"
def __init__(self, *args, **kwargs):
super(HuaweiFCDriver, self).__init__(*args, **kwargs)
if not wwns_in_host and not iqns_in_host:
self.client.remove_host(host_id)
- msg = (_('Can not add FC initiator to host.'))
+ msg = _('Can not add FC initiator to host.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
--- /dev/null
+# Copyright (c) 2016 Huawei Technologies Co., Ltd.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import json
+import re
+
+from oslo_log import log as logging
+from oslo_utils import excutils
+
+from cinder import exception
+from cinder.i18n import _, _LW, _LE
+from cinder.volume.drivers.huawei import constants
+from cinder.volume.drivers.huawei import huawei_utils
+from cinder.volume.drivers.huawei import rest_client
+from cinder.volume import utils as volume_utils
+
+LOG = logging.getLogger(__name__)
+
+
+class AbsReplicaOp(object):
+ def __init__(self, client):
+ self.client = client
+
+ def create(self, **kwargs):
+ pass
+
+ def delete(self, replica_id):
+ pass
+
+ def protect_second(self, replica_id):
+ pass
+
+ def unprotect_second(self, replica_id):
+ pass
+
+ def sync(self, replica_id):
+ pass
+
+ def split(self, replica_id):
+ pass
+
+ def switch(self, replica_id):
+ pass
+
+ def is_primary(self, replica_info):
+ flag = replica_info.get('ISPRIMARY')
+ if flag and flag.lower() == 'true':
+ return True
+ return False
+
+ def get_replica_info(self, replica_id):
+ return {}
+
+ def _is_status(self, status_key, status, replica_info):
+ if type(status) in (list, tuple):
+ return replica_info.get(status_key, '') in status
+ if type(status) is str:
+ return replica_info.get(status_key, '') == status
+
+ return False
+
+ def is_running_status(self, status, replica_info):
+ return self._is_status(constants.REPLICA_RUNNING_STATUS_KEY,
+ status, replica_info)
+
+ def is_health_status(self, status, replica_info):
+ return self._is_status(constants.REPLICA_HEALTH_STATUS_KEY,
+ status, replica_info)
+
+
+class PairOp(AbsReplicaOp):
+ def create(self, local_lun_id, rmt_lun_id, rmt_dev_id,
+ rmt_dev_name, replica_model,
+ speed=constants.REPLICA_SPEED,
+ period=constants.REPLICA_PERIOD,
+ **kwargs):
+ super(PairOp, self).create(**kwargs)
+
+ params = {
+ "LOCALRESID": local_lun_id,
+ "LOCALRESTYPE": '11',
+ "REMOTEDEVICEID": rmt_dev_id,
+ "REMOTEDEVICENAME": rmt_dev_name,
+ "REMOTERESID": rmt_lun_id,
+ "REPLICATIONMODEL": replica_model,
+ # recovery policy. 1: auto, 2: manual
+ "RECOVERYPOLICY": '2',
+ "SPEED": speed,
+ }
+
+ if replica_model == constants.REPLICA_ASYNC_MODEL:
+ # Synchronize type values:
+ # 1, manual
+ # 2, timed wait when synchronization begins
+ # 3, timed wait when synchronization ends
+ params['SYNCHRONIZETYPE'] = '2'
+ params['TIMINGVAL'] = period
+
+ try:
+ pair_info = self.client.create_pair(params)
+ except Exception as err:
+ msg = _('Create replication pair failed. Error: %s.') % err
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ return pair_info
+
+ def split(self, pair_id):
+ self.client.split_pair(pair_id)
+
+ def delete(self, pair_id, force=False):
+ self.client.delete_pair(pair_id, force)
+
+ def protect_second(self, pair_id):
+ self.client.set_pair_second_access(pair_id,
+ constants.REPLICA_SECOND_RO)
+
+ def unprotect_second(self, pair_id):
+ self.client.set_pair_second_access(pair_id,
+ constants.REPLICA_SECOND_RW)
+
+ def sync(self, pair_id):
+ self.client.sync_pair(pair_id)
+
+ def switch(self, pair_id):
+ self.client.switch_pair(pair_id)
+
+ def get_replica_info(self, pair_id):
+ return self.client.get_pair_by_id(pair_id)
+
+
+class CGOp(AbsReplicaOp):
+ pass
+
+
+class ReplicaCommonDriver(object):
+ def __init__(self, conf, replica_op):
+ self.conf = conf
+ self.op = replica_op
+
+ def protect_second(self, replica_id):
+ info = self.op.get_replica_info(replica_id)
+ if info.get('SECRESACCESS') == constants.REPLICA_SECOND_RO:
+ return
+
+ self.op.protect_second(replica_id)
+ self.wait_second_access(replica_id, constants.REPLICA_SECOND_RO)
+
+ def unprotect_second(self, replica_id):
+ info = self.op.get_replica_info(replica_id)
+ if info.get('SECRESACCESS') == constants.REPLICA_SECOND_RW:
+ return
+
+ self.op.unprotect_second(replica_id)
+ self.wait_second_access(replica_id, constants.REPLICA_SECOND_RW)
+
+ def sync(self, replica_id, wait_complete=False):
+ self.protect_second(replica_id)
+
+ expect_status = (constants.REPLICA_RUNNING_STATUS_NORMAL,
+ constants.REPLICA_RUNNING_STATUS_SYNC,
+ constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
+ info = self.op.get_replica_info(replica_id)
+
+ # When running status is synchronizing or normal,
+ # it's not necessary to do synchronize again.
+ if (info.get('REPLICATIONMODEL') == constants.REPLICA_SYNC_MODEL
+ and self.op.is_running_status(expect_status, info)):
+ return
+
+ self.op.sync(replica_id)
+ self.wait_expect_state(replica_id, expect_status)
+
+ if wait_complete:
+ self.wait_replica_ready(replica_id)
+
+ def split(self, replica_id):
+ running_status = (constants.REPLICA_RUNNING_STATUS_SPLIT,
+ constants.REPLICA_RUNNING_STATUS_INVALID)
+ info = self.op.get_replica_info(replica_id)
+ if self.op.is_running_status(running_status, info):
+ return
+
+ try:
+ self.op.split(replica_id)
+ except Exception as err:
+ LOG.warning(_LW('Split replication exception: %s.'), err)
+
+ try:
+ self.wait_expect_state(replica_id, running_status)
+ except Exception as err:
+ msg = _('Split replication failed.')
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ def enable(self, replica_id, wait_sync_complete=False):
+ info = self.op.get_replica_info(replica_id)
+ if not self.op.is_primary(info):
+ self.switch(replica_id)
+ self.sync(replica_id)
+ return None
+
+ def disable(self, replica_id):
+ self.split(replica_id)
+ return None
+
+ def switch(self, replica_id):
+ self.split(replica_id)
+ self.unprotect_second(replica_id)
+ self.op.switch(replica_id)
+
+ # Wait to be primary
+ def _wait_switch_to_primary():
+ info = self.op.get_replica_info(replica_id)
+ if self.op.is_primary(info):
+ return True
+ return False
+
+ interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
+ timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
+ huawei_utils.wait_for_condition(_wait_switch_to_primary,
+ interval,
+ timeout)
+
+ def failover(self, replica_id):
+ """Failover replication.
+
+ Purpose:
+ 1. Split replication.
+ 2. Set secondary access read & write.
+ """
+ info = self.op.get_replica_info(replica_id)
+ if self.op.is_primary(info):
+ msg = _('We should not do switch over on primary array.')
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ sync_status_set = (constants.REPLICA_RUNNING_STATUS_SYNC,
+ constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
+ if self.op.is_running_status(sync_status_set, info):
+ self.wait_replica_ready(replica_id)
+
+ self.split(replica_id)
+ self.op.unprotect_second(replica_id)
+
+ def wait_replica_ready(self, replica_id, interval=None, timeout=None):
+ LOG.debug('Wait synchronize complete.')
+ running_status_normal = (constants.REPLICA_RUNNING_STATUS_NORMAL,
+ constants.REPLICA_RUNNING_STATUS_SYNCED)
+ running_status_sync = (constants.REPLICA_RUNNING_STATUS_SYNC,
+ constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
+ health_status_normal = constants.REPLICA_HEALTH_STATUS_NORMAL
+
+ def _replica_ready():
+ info = self.op.get_replica_info(replica_id)
+ if (self.op.is_running_status(running_status_normal, info)
+ and self.op.is_health_status(health_status_normal, info)):
+ return True
+
+ if not self.op.is_running_status(running_status_sync, info):
+ msg = (_('Wait synchronize failed. Running status: %s.') %
+ info.get(constants.REPLICA_RUNNING_STATUS_KEY))
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ return False
+
+ if not interval:
+ interval = constants.DEFAULT_WAIT_INTERVAL
+ if not timeout:
+ timeout = constants.DEFAULT_WAIT_TIMEOUT
+
+ huawei_utils.wait_for_condition(_replica_ready,
+ interval,
+ timeout)
+
+ def wait_second_access(self, replica_id, access_level):
+ def _check_access():
+ info = self.op.get_replica_info(replica_id)
+ if info.get('SECRESACCESS') == access_level:
+ return True
+ return False
+
+ interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
+ timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
+ huawei_utils.wait_for_condition(_check_access,
+ interval,
+ timeout)
+
+ def wait_expect_state(self, replica_id,
+ running_status, health_status=None,
+ interval=None, timeout=None):
+ def _check_state():
+ info = self.op.get_replica_info(replica_id)
+ if self.op.is_running_status(running_status, info):
+ if (not health_status
+ or self.op.is_health_status(health_status, info)):
+ return True
+ return False
+
+ if not interval:
+ interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
+ if not timeout:
+ timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
+
+ huawei_utils.wait_for_condition(_check_state, interval, timeout)
+
+
+def get_replication_driver_data(volume):
+ if volume.get('replication_driver_data'):
+ return json.loads(volume['replication_driver_data'])
+ return {}
+
+
+def to_string(dict_data):
+ if dict_data:
+ return json.dumps(dict_data)
+ return ''
+
+
+class ReplicaPairManager(object):
+ def __init__(self, local_client, conf):
+ self.local_client = local_client
+ self.conf = conf
+ self.replica_device = self.conf.safe_get('replication_device')
+ if not self.replica_device:
+ return
+
+ # managed_backed_name format: host_name@backend_name#pool_name
+ self.rmt_backend = self.replica_device[0]['managed_backend_name']
+ self.rmt_pool = volume_utils.extract_host(self.rmt_backend,
+ level='pool')
+ self.target_dev_id = self.replica_device[0]['target_device_id']
+
+ self._init_rmt_client()
+ self.local_op = PairOp(self.local_client)
+ self.local_driver = ReplicaCommonDriver(self.conf, self.local_op)
+ self.rmt_op = PairOp(self.rmt_client)
+ self.rmt_driver = ReplicaCommonDriver(self.conf, self.rmt_op)
+
+ self.try_login_remote_array()
+
+ def try_login_remote_array(self):
+ try:
+ self.rmt_client.login()
+ except Exception as err:
+ LOG.warning(_LW('Remote array login failed. Error: %s.'), err)
+
+ def try_get_remote_wwn(self):
+ try:
+ info = self.rmt_client.get_array_info()
+ return info.get('wwn')
+ except Exception as err:
+ LOG.warning(_LW('Get remote array wwn failed. Error: %s.'), err)
+ return None
+
+ def get_remote_device_by_wwn(self, wwn):
+ devices = {}
+ try:
+ devices = self.local_client.get_remote_devices()
+ except Exception as err:
+ LOG.warning(_LW('Get remote devices failed. Error: %s.'), err)
+
+ for device in devices:
+ if device.get('WWN') == wwn:
+ return device
+
+ return {}
+
+ def check_remote_available(self):
+ if not self.replica_device:
+ return False
+
+ # We get device wwn in every check time.
+ # If remote array changed, we can run normally.
+ wwn = self.try_get_remote_wwn()
+ if not wwn:
+ return False
+
+ device = self.get_remote_device_by_wwn(wwn)
+ # Check remote device is available to use.
+ # If array type is replication, 'ARRAYTYPE' == '1'.
+ # If health status is normal, 'HEALTHSTATUS' == '1'.
+ if (device and device.get('ARRAYTYPE') == '1'
+ and device.get('HEALTHSTATUS') == '1'
+ and device.get('RUNNINGSTATUS') == constants.STATUS_RUNNING):
+ return True
+
+ return False
+
+ def update_replica_capability(self, stats):
+ is_rmt_dev_available = self.check_remote_available()
+ if not is_rmt_dev_available:
+ if self.replica_device:
+ LOG.warning(_LW('Remote device is unavailable. '
+ 'Remote backend: %s.'),
+ self.rmt_backend)
+ return stats
+
+ for pool in stats['pools']:
+ pool['replication_enabled'] = True
+ pool['replication_type'] = ['sync', 'async']
+
+ return stats
+
+ def _init_rmt_client(self):
+ # Multiple addresses support.
+ rmt_addrs = self.replica_device[0]['san_address'].split(';')
+ rmt_addrs = list(set([x.strip() for x in rmt_addrs if x.strip()]))
+ rmt_user = self.replica_device[0]['san_user']
+ rmt_password = self.replica_device[0]['san_password']
+ self.rmt_client = rest_client.RestClient(self.conf,
+ rmt_addrs,
+ rmt_user,
+ rmt_password)
+
+ def get_rmt_dev_info(self):
+ wwn = self.try_get_remote_wwn()
+ if not wwn:
+ return None, None
+
+ device = self.get_remote_device_by_wwn(wwn)
+ if not device:
+ return None, None
+
+ return device.get('ID'), device.get('NAME')
+
+ def build_rmt_lun_params(self, local_lun_info):
+ params = {
+ 'TYPE': '11',
+ 'NAME': local_lun_info['NAME'],
+ 'PARENTTYPE': '216',
+ 'PARENTID': self.rmt_client.get_pool_id(self.rmt_pool),
+ 'DESCRIPTION': local_lun_info['DESCRIPTION'],
+ 'ALLOCTYPE': local_lun_info['ALLOCTYPE'],
+ 'CAPACITY': local_lun_info['CAPACITY'],
+ 'WRITEPOLICY': self.conf.lun_write_type,
+ 'MIRRORPOLICY': self.conf.lun_mirror_switch,
+ 'PREFETCHPOLICY': self.conf.lun_prefetch_type,
+ 'PREFETCHVALUE': self.conf.lun_prefetch_value,
+ 'DATATRANSFERPOLICY': self.conf.lun_policy,
+ 'READCACHEPOLICY': self.conf.lun_read_cache_policy,
+ 'WRITECACHEPOLICY': self.conf.lun_write_cache_policy,
+ }
+
+ LOG.debug('Remote lun params: %s.', params)
+ return params
+
+ def wait_volume_online(self, client, lun_info,
+ interval=None, timeout=None):
+ online_status = constants.STATUS_VOLUME_READY
+ if lun_info.get('RUNNINGSTATUS') == online_status:
+ return
+
+ lun_id = lun_info['ID']
+
+ def _wait_online():
+ info = client.get_lun_info(lun_id)
+ return info.get('RUNNINGSTATUS') == online_status
+
+ if not interval:
+ interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
+ if not timeout:
+ timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
+
+ huawei_utils.wait_for_condition(_wait_online,
+ interval,
+ timeout)
+
+ def create_rmt_lun(self, local_lun_info):
+ # Create on rmt array. If failed, raise exception.
+ lun_params = self.build_rmt_lun_params(local_lun_info)
+ lun_info = self.rmt_client.create_lun(lun_params)
+ try:
+ self.wait_volume_online(self.rmt_client, lun_info)
+ except exception.VolumeBackendAPIException:
+ with excutils.save_and_reraise_exception():
+ self.rmt_client.delete_lun(lun_info['ID'])
+
+ return lun_info
+
+ def create_replica(self, local_lun_info, replica_model):
+ """Create remote LUN and replication pair.
+
+ Purpose:
+ 1. create remote lun
+ 2. create replication pair
+ 3. enable replication pair
+ """
+ LOG.debug(('Create replication, local lun info: %(info)s, '
+ 'replication model: %(model)s.'),
+ {'info': local_lun_info, 'model': replica_model})
+
+ local_lun_id = local_lun_info['ID']
+ self.wait_volume_online(self.local_client, local_lun_info)
+
+ # step1, create remote lun
+ rmt_lun_info = self.create_rmt_lun(local_lun_info)
+ rmt_lun_id = rmt_lun_info['ID']
+
+ # step2, get remote device info
+ rmt_dev_id, rmt_dev_name = self.get_rmt_dev_info()
+ if not rmt_lun_id or not rmt_dev_name:
+ self._delete_rmt_lun(rmt_lun_id)
+ msg = _('Get remote device info failed.')
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ # step3, create replication pair
+ try:
+ pair_info = self.local_op.create(local_lun_id,
+ rmt_lun_id, rmt_dev_id,
+ rmt_dev_name, replica_model)
+ pair_id = pair_info['ID']
+ except Exception as err:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE('Create pair failed. Error: %s.'), err)
+ self._delete_rmt_lun(rmt_lun_id)
+
+ # step4, start sync manually. If replication type is sync,
+ # then wait for sync complete.
+ wait_complete = (replica_model == constants.REPLICA_SYNC_MODEL)
+ try:
+ self.local_driver.sync(pair_id, wait_complete)
+ except Exception as err:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE('Start synchronization failed. Error: %s.'), err)
+ self._delete_pair(pair_id)
+ self._delete_rmt_lun(rmt_lun_id)
+
+ model_update = {}
+ driver_data = {'pair_id': pair_id,
+ 'rmt_lun_id': rmt_lun_id}
+ model_update['replication_driver_data'] = to_string(driver_data)
+ model_update['replication_status'] = 'enabled'
+ LOG.debug('Create replication, return info: %s.', model_update)
+ return model_update
+
+ def _delete_pair(self, pair_id):
+ if (not pair_id
+ or not self.local_client.check_pair_exist(pair_id)):
+ return
+
+ self.local_driver.split(pair_id)
+ self.local_op.delete(pair_id)
+
+ def _delete_rmt_lun(self, lun_id):
+ if lun_id and self.rmt_client.check_lun_exist(lun_id):
+ self.rmt_client.delete_lun(lun_id)
+
+ def delete_replica(self, volume):
+ """Delete replication pair and remote lun.
+
+ Purpose:
+ 1. delete replication pair
+ 2. delete remote_lun
+ """
+ LOG.debug('Delete replication, volume: %s.', volume['id'])
+ info = get_replication_driver_data(volume)
+ pair_id = info.get('pair_id')
+ if pair_id:
+ self._delete_pair(pair_id)
+
+ # Delete remote_lun
+ rmt_lun_id = info.get('rmt_lun_id')
+ if rmt_lun_id:
+ self._delete_rmt_lun(rmt_lun_id)
+
+ def enable_replica(self, volume):
+ """Enable replication.
+
+ Purpose:
+ 1. If local backend's array is secondary, switch to primary
+ 2. Synchronize data
+ """
+ LOG.debug('Enable replication, volume: %s.', volume['id'])
+
+ info = get_replication_driver_data(volume)
+ pair_id = info.get('pair_id')
+ if not pair_id:
+ msg = _('No pair id in volume replication_driver_data.')
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ info = self.local_op.get_replica_info(pair_id)
+ if not info:
+ msg = _('Pair does not exist on array. Pair id: %s.') % pair_id
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ wait_sync_complete = False
+ if info.get('REPLICATIONMODEL') == constants.REPLICA_SYNC_MODEL:
+ wait_sync_complete = True
+
+ return self.local_driver.enable(pair_id, wait_sync_complete)
+
+ def disable_replica(self, volume):
+ """We consider that all abnormal states is disabled."""
+ LOG.debug('Disable replication, volume: %s.', volume['id'])
+
+ info = get_replication_driver_data(volume)
+ pair_id = info.get('pair_id')
+ if not pair_id:
+ LOG.warning(_LW('No pair id in volume replication_driver_data.'))
+ return None
+
+ return self.local_driver.disable(pair_id)
+
+ def failover_replica(self, volume):
+ """Just make the secondary available."""
+ LOG.debug('Failover replication, volume: %s.', volume['id'])
+
+ info = get_replication_driver_data(volume)
+ pair_id = info.get('pair_id')
+ if not pair_id:
+ msg = _('No pair id in volume replication_driver_data.')
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ rmt_lun_id = info.get('rmt_lun_id')
+ if not rmt_lun_id:
+ msg = _('No remote LUN id in volume replication_driver_data.')
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+
+ # Remote array must be available. So we can get the real pool info.
+ lun_info = self.rmt_client.get_lun_info(rmt_lun_id)
+ lun_wwn = lun_info.get('WWN')
+ lun_pool = lun_info.get('PARENTNAME')
+ new_backend = re.sub(r'(?<=#).*$', lun_pool, self.rmt_backend)
+
+ self.rmt_driver.failover(pair_id)
+
+ metadata = huawei_utils.get_volume_metadata(volume)
+ metadata.update({'lun_wwn': lun_wwn})
+
+ new_driver_data = {'pair_id': pair_id,
+ 'rmt_lun_id': volume['provider_location']}
+ new_driver_data = to_string(new_driver_data)
+ return {'host': new_backend,
+ 'provider_location': rmt_lun_id,
+ 'replication_driver_data': new_driver_data,
+ 'metadata': metadata}
+
+ def list_replica_targets(self, volume):
+ info = get_replication_driver_data(volume)
+ if not info:
+ LOG.warning(_LW('Replication driver data does not exist. '
+ 'Volume: %s'), volume['id'])
+
+ targets = [{'target_device_id': self.target_dev_id}]
+ return {'volume_id': volume['id'],
+ 'targets': targets}
+
+
+def get_replication_opts(opts):
+ if opts.get('replication_type') == 'sync':
+ opts['replication_type'] = constants.REPLICA_SYNC_MODEL
+ else:
+ opts['replication_type'] = constants.REPLICA_ASYNC_MODEL
+
+ return opts
return info
- def get_pool_id(self, volume, pool_name):
+ def get_pool_id(self, pool_name):
pools = self.get_all_pools()
pool_info = self.get_pool_info(pool_name, pools)
if not pool_info:
self._assert_rest_result(result, _('Add lun to cache error.'))
- def find_array_version(self):
+ def get_array_info(self):
url = "/system/"
result = self.call(url, None, "GET")
- self._assert_rest_result(result, _('Find array version error.'))
- return result['data']['PRODUCTVERSION']
+ self._assert_rest_result(result, _('Get array info error.'))
+ return result.get('data', None)
+
+ def find_array_version(self):
+ info = self.get_array_info()
+ return info.get('PRODUCTVERSION', None)
def remove_host(self, host_id):
url = "/host/%s" % host_id
for item in result.get('data', []):
wwns.append(item['WWN'])
return wwns
+
+ def get_remote_devices(self):
+ url = "/remote_device"
+ result = self.call(url, None, "GET")
+ self._assert_rest_result(result, _('Get remote devices error.'))
+ return result.get('data', [])
+
+ def create_pair(self, pair_params):
+ url = "/REPLICATIONPAIR"
+ result = self.call(url, pair_params, "POST")
+
+ msg = _('Create replication error.')
+ self._assert_rest_result(result, msg)
+ self._assert_data_in_result(result, msg)
+ return result['data']
+
+ def get_pair_by_id(self, pair_id):
+ url = "/REPLICATIONPAIR/" + pair_id
+ result = self.call(url, None, "GET")
+
+ msg = _('Get pair failed.')
+ self._assert_rest_result(result, msg)
+ return result.get('data', {})
+
+ def switch_pair(self, pair_id):
+ url = '/REPLICATIONPAIR/switch'
+ data = {"ID": pair_id,
+ "TYPE": "263"}
+ result = self.call(url, data, "PUT")
+
+ msg = _('Switch over pair error.')
+ self._assert_rest_result(result, msg)
+
+ def split_pair(self, pair_id):
+ url = '/REPLICATIONPAIR/split'
+ data = {"ID": pair_id,
+ "TYPE": "263"}
+ result = self.call(url, data, "PUT")
+
+ msg = _('Split pair error.')
+ self._assert_rest_result(result, msg)
+
+ def delete_pair(self, pair_id, force=False):
+ url = "/REPLICATIONPAIR/" + pair_id
+ data = None
+ if force:
+ data = {"ISLOCALDELETE": force}
+
+ result = self.call(url, data, "DELETE")
+
+ msg = _('delete_replication error.')
+ self._assert_rest_result(result, msg)
+
+ def sync_pair(self, pair_id):
+ url = "/REPLICATIONPAIR/sync"
+ data = {"ID": pair_id,
+ "TYPE": "263"}
+ result = self.call(url, data, "PUT")
+
+ msg = _('Sync pair error.')
+ self._assert_rest_result(result, msg)
+
+ def check_pair_exist(self, pair_id):
+ url = "/REPLICATIONPAIR/" + pair_id
+ result = self.call(url, None, "GET")
+ return result['error']['code'] == 0
+
+ def set_pair_second_access(self, pair_id, access):
+ url = "/REPLICATIONPAIR/" + pair_id
+ data = {"ID": pair_id,
+ "SECRESACCESS": access}
+ result = self.call(url, data, "PUT")
+
+ msg = _('Set pair secondary access error.')
+ self._assert_rest_result(result, msg)
--- /dev/null
+features:
+ - Added Replication V2 support for Huawei drivers.