"ENABLEINBANDCOMMAND":"true",
"ID":"1",
"INBANDLUNWWN":"",
- "TYPE":245
+ "TYPE":245,
+ "AVAILABLEHOSTLUNIDLIST": "[1]"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/mappingview'] = (
FAKE_GET_MAPPING_VIEW_RESPONSE)
+MAP_COMMAND_TO_FAKE_RESPONSE['/mappingview/PUT'] = (
+ FAKE_GET_MAPPING_VIEW_RESPONSE)
+
MAP_COMMAND_TO_FAKE_RESPONSE['/MAPPINGVIEW/1/GET'] = (
FAKE_GET_SPEC_MAPPING_VIEW_RESPONSE)
return json.loads(data)
-class FakeDB(object):
-
- def volume_update(self, context, volume_id, model_update):
- pass
-
- def volume_get_all_by_group(self, context, group_id):
- volumes = []
- volumes.append(test_volume)
- return volumes
-
-
class FakeReplicaPairManager(replication.ReplicaPairManager):
def _init_rmt_client(self):
self.rmt_client = FakeClient(self.conf)
def __init__(self, configuration):
self.configuration = configuration
- self.db = FakeDB()
self.huawei_conf = FakeHuaweiConf(self.configuration, 'iSCSI')
def do_setup(self):
self.rmt_client = FakeClient(configuration=self.configuration)
self.metro = hypermetro.HuaweiHyperMetro(self.client,
self.rmt_client,
- self.configuration,
- self.db)
+ self.configuration)
self.replica = FakeReplicaPairManager(self.client, self.configuration)
def __init__(self, configuration):
self.configuration = configuration
self.fcsan = None
- self.db = FakeDB()
self.huawei_conf = FakeHuaweiConf(self.configuration, 'iSCSI')
def do_setup(self):
self.rmt_client = FakeClient(configuration=self.configuration)
self.metro = hypermetro.HuaweiHyperMetro(self.client,
self.rmt_client,
- self.configuration,
- self.db)
+ self.configuration)
self.replica = FakeReplicaPairManager(self.client, self.configuration)
FakeConnector)
self.assertEqual(1, iscsi_properties['data']['target_lun'])
+ def test_hypermetro_connection_success(self):
+ self.mock_object(rest_client.RestClient, 'find_array_version',
+ mock.Mock(return_value='V300R003C00'))
+ fc_properties = self.driver.initialize_connection(hyper_volume,
+ FakeConnector)
+ self.assertEqual(1, fc_properties['data']['target_lun'])
+
def test_terminate_connection_success(self):
self.driver.client.terminateFlag = True
self.driver.terminate_connection(test_volume, FakeConnector)
self.assertTrue(self.driver.client.terminateFlag)
+ def test_terminate_connection_hypermetro_in_metadata(self):
+ self.driver.terminate_connection(hyper_volume, FakeConnector)
+
def test_get_volume_status(self):
remote_device_info = {"ARRAYTYPE": "1",
"HEALTHSTATUS": "1",
if opts.get('hypermetro') == 'true':
metro = hypermetro.HuaweiHyperMetro(self.client,
self.rmt_client,
- self.configuration,
- self.db)
+ self.configuration)
try:
metro_info = metro.create_hypermetro(lun_id, lun_params)
model_update['metadata'].update(metro_info)
if 'hypermetro_id' in metadata:
metro = hypermetro.HuaweiHyperMetro(self.client,
self.rmt_client,
- self.configuration,
- self.db)
+ self.configuration)
try:
metro.delete_hypermetro(volume)
except exception.VolumeBackendAPIException as err:
LOG.info(_LI("initialize_connection, metadata is: %s."), metadata)
if 'hypermetro_id' in metadata:
hyperm = hypermetro.HuaweiHyperMetro(self.client,
+ self.rmt_client,
self.configuration)
rmt_fc_info = hyperm.connect_volume_fc(volume, connector)
if 'hypermetro_id' in metadata:
hyperm = hypermetro.HuaweiHyperMetro(self.client,
+ self.rmt_client,
self.configuration)
hyperm.disconnect_volume_fc(volume, connector)