from cinder import exception
from cinder import test
+from cinder.tests.unit import utils
from cinder.volume import configuration as conf
from cinder.volume.drivers.huawei import constants
from cinder.volume.drivers.huawei import fc_zone_helper
from cinder.volume.drivers.huawei import huawei_conf
from cinder.volume.drivers.huawei import huawei_driver
+from cinder.volume.drivers.huawei import huawei_utils
from cinder.volume.drivers.huawei import hypermetro
from cinder.volume.drivers.huawei import replication
from cinder.volume.drivers.huawei import rest_client
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
self.mock_object(replication.PairOp, 'get_replica_info',
mock.Mock(return_value={'SECRESACCESS': access_ro}))
+ self.mock_object(huawei_utils.time, 'time', mock.Mock(
+ side_effect = utils.generate_timeout_series(
+ constants.DEFAULT_REPLICA_WAIT_TIMEOUT)))
common_driver.wait_second_access(pair_id, access_ro)
self.assertRaises(exception.VolumeBackendAPIException,
common_driver.wait_second_access, pair_id, access_rw)
+ @mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall',
+ new=utils.ZeroIntervalLoopingCall)
def test_wait_replica_ready(self):
normal_status = {
'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_NORMAL,
@ddt.unpack
def test_replication_enable_fail(self, mock_module, mock_func, mock_value):
self.mock_object(mock_module, mock_func, mock_value)
+ self.mock_object(huawei_utils.time, 'time', mock.Mock(
+ side_effect = utils.generate_timeout_series(
+ constants.DEFAULT_REPLICA_WAIT_TIMEOUT)))
+
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.replication_enable, None, replication_volume)
def test_replication_disable_fail(self):
+ self.mock_object(huawei_utils.time, 'time', mock.Mock(
+ side_effect = utils.generate_timeout_series(
+ constants.DEFAULT_REPLICA_WAIT_TIMEOUT)))
+
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.replication_disable, None, replication_volume)
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data="error")))
self.mock_object(mock_module, mock_func, mock_value)
+ self.mock_object(huawei_utils.time, 'time', mock.Mock(
+ side_effect = utils.generate_timeout_series(
+ constants.DEFAULT_REPLICA_WAIT_TIMEOUT)))
+
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.replication_failover,