import random
import re
import time
+import uuid
import mock
from oslo_concurrency import processutils
from cinder.tests.unit import utils as testutils
from cinder import utils
from cinder.volume import configuration as conf
+from import (
+ replication as storwize_rep)
from import storwize_svc_common
from import storwize_svc_fc
from import storwize_svc_iscsi
+ @mock.patch.object(storwize_rep.StorwizeSVCReplicationGlobalMirror,
+ 'create_relationship')
+ @mock.patch.object(storwize_rep.StorwizeSVCReplicationGlobalMirror,
+ 'extend_target_volume')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'delete_relationship')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'get_relationship_info')
+ def test_storwize_svc_extend_volume_replication(self,
+ get_relationship,
+ delete_relationship,
+ extend_target_volume,
+ create_relationship):
+ fake_target = mock.Mock()
+ rep_type = 'global'
+ self.driver.replications[rep_type] = (
+ self.driver.replication_factory(rep_type, fake_target))
+ volume = self._create_volume()
+ volume['replication_status'] = 'enabled'
+ fake_target_vol = 'vol-target-id'
+ get_relationship.return_value = {'aux_vdisk_name': fake_target_vol}
+ with mock.patch.object(
+ self.driver,
+ '_get_volume_replicated_type_mirror') as mirror_type:
+ mirror_type.return_value = 'global'
+ self.driver.extend_volume(volume, '13')
+ attrs = self.driver._helpers.get_vdisk_attributes(volume['name'])
+ vol_size = int(attrs['capacity']) / units.Gi
+ self.assertAlmostEqual(vol_size, 13)
+ delete_relationship.assert_called_once_with(volume)
+ extend_target_volume.assert_called_once_with(fake_target_vol,
+ 12)
+ create_relationship.assert_called_once_with(volume,
+ fake_target_vol)
+ self.driver.delete_volume(volume)
+ def test_storwize_svc_extend_volume_replication_failover(self):
+ volume = self._create_volume()
+ volume['replication_status'] = 'failed-over'
+ with mock.patch.object(
+ self.driver,
+ '_get_volume_replicated_type_mirror') as mirror_type:
+ mirror_type.return_value = 'global'
+ self.driver.extend_volume(volume, '13')
+ attrs = self.driver._helpers.get_vdisk_attributes(volume['name'])
+ vol_size = int(attrs['capacity']) / units.Gi
+ self.assertAlmostEqual(vol_size, 13)
+ self.driver.delete_volume(volume)
def _check_loc_info(self, capabilities, expected):
host = {'host': 'foo', 'capabilities': capabilities}
vol = {'name': 'test', 'id': 1, 'size': 1}
'HOST3', 9999, 511, True)
+class StorwizeSVCReplicationMirrorTestCase(test.TestCase):
+ rep_type = 'global'
+ mirror_class = storwize_rep.StorwizeSVCReplicationGlobalMirror
+ def setUp(self):
+ super(StorwizeSVCReplicationMirrorTestCase, self).setUp()
+ self.svc_driver = storwize_svc_iscsi.StorwizeSVCISCSIDriver(
+ configuration=conf.Configuration(None))
+ extra_spec_rep_type = '<in> ' + self.rep_type
+ fake_target = {"managed_backend_name": "second_host@sv2#sv2",
+ "replication_mode": self.rep_type,
+ "target_device_id": "svc_id_target",
+ "san_ip": "",
+ "san_login": "admin",
+ "san_password": "admin",
+ "pool_name": "cinder_target"}
+ self.fake_targets = [fake_target]
+ self.driver = self.mirror_class(self.svc_driver, fake_target,
+ storwize_svc_common.StorwizeHelpers)
+ self.svc_driver.configuration.set_override('replication_device',
+ self.fake_targets)
+ self.svc_driver._replication_targets = self.fake_targets
+ self.svc_driver._replication_enabled = True
+ self.svc_driver.replications[self.rep_type] = (
+ self.svc_driver.replication_factory(self.rep_type, fake_target))
+ self.ctxt = context.get_admin_context()
+ rand_id = six.text_type(uuid.uuid4())
+ self.volume = {'name': 'volume-%s' % rand_id,
+ 'size': 10, 'id': '%s' % rand_id,
+ 'volume_type_id': None,
+ 'mdisk_grp_name': 'openstack',
+ 'replication_status': 'disabled',
+ 'replication_extended_status': None,
+ 'volume_metadata': None}
+ spec = {'replication_enabled': '<is> True',
+ 'replication_type': extra_spec_rep_type}
+ type_ref = volume_types.create(self.ctxt, "replication", spec)
+ self.replication_type = volume_types.get_volume_type(self.ctxt,
+ type_ref['id'])
+ self.volume['volume_type_id'] = self.replication_type['id']
+ self.volume['volume_type'] = self.replication_type
+ def test_storwize_do_replication_setup(self):
+ self.svc_driver.configuration.set_override('san_ip', "")
+ self.svc_driver.configuration.set_override('replication_device',
+ self.fake_targets)
+ self.svc_driver._do_replication_setup()
+ def test_storwize_do_replication_setup_unmanaged(self):
+ fake_target = {"replication_mode": self.rep_type,
+ "target_device_id": "svc_id_target",
+ "san_ip": "",
+ "san_login": "admin",
+ "san_password": "admin",
+ "pool_name": "cinder_target"}
+ fake_targets = [fake_target]
+ self.svc_driver.configuration.set_override('san_ip', "")
+ self.svc_driver.configuration.set_override('replication_device',
+ fake_targets)
+ self.assertRaises(exception.InvalidConfigurationValue,
+ self.svc_driver._do_replication_setup)
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers, 'create_vdisk')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers, 'get_vdisk_params')
+ @mock.patch.object(context, 'get_admin_context')
+ @mock.patch.object(mirror_class, 'volume_replication_setup')
+ def test_storwize_create_volume_with_mirror_replication(self,
+ rep_setup,
+ ctx,
+ get_vdisk_params,
+ create_vdisk):
+ ctx.return_value = self.ctxt
+ get_vdisk_params.return_value = {'replication': None,
+ 'qos': None}
+ self.svc_driver.create_volume(self.volume)
+ rep_setup.assert_called_once_with(self.ctxt, self.volume)
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers, 'create_copy')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers, 'get_vdisk_params')
+ @mock.patch.object(context, 'get_admin_context')
+ @mock.patch.object(mirror_class, 'volume_replication_setup')
+ def test_storwize_create_volume_from_snap_with_mirror_replication(
+ self, rep_setup, ctx, get_vdisk_params, create_copy):
+ ctx.return_value = self.ctxt
+ get_vdisk_params.return_value = {'replication': None,
+ 'qos': None}
+ snapshot = {'id': 'snapshot-id',
+ 'name': 'snapshot-name',
+ 'volume_size': 10}
+ model_update = self.svc_driver.create_volume_from_snapshot(
+ self.volume, snapshot)
+ rep_setup.assert_called_once_with(self.ctxt, self.volume)
+ self.assertEqual({'replication_status': 'enabled'}, model_update)
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers, 'create_copy')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers, 'get_vdisk_params')
+ @mock.patch.object(context, 'get_admin_context')
+ @mock.patch.object(mirror_class, 'volume_replication_setup')
+ def test_storwize_clone_volume_with_mirror_replication(
+ self, rep_setup, ctx, get_vdisk_params, create_copy):
+ ctx.return_value = self.ctxt
+ get_vdisk_params.return_value = {'replication': None,
+ 'qos': None}
+ rand_id = six.text_type(random.randint(10000, 99999))
+ target_volume = {'name': 'test_volume%s' % rand_id,
+ 'size': 10, 'id': '%s' % rand_id,
+ 'volume_type_id': None,
+ 'mdisk_grp_name': 'openstack',
+ 'replication_status': 'disabled',
+ 'replication_extended_status': None,
+ 'volume_metadata': None}
+ target_volume['volume_type_id'] = self.replication_type['id']
+ target_volume['volume_type'] = self.replication_type
+ model_update = self.svc_driver.create_cloned_volume(
+ target_volume, self.volume)
+ rep_setup.assert_called_once_with(self.ctxt, target_volume)
+ self.assertEqual({'replication_status': 'enabled'}, model_update)
+ @mock.patch.object(mirror_class, 'replication_enable')
+ @mock.patch.object(mirror_class, 'volume_replication_setup')
+ def test_storwize_replication_enable(self, rep_setup,
+ replication_enable):
+ self.svc_driver.replication_enable(self.ctxt, self.volume)
+ replication_enable.assert_called_once_with(self.ctxt, self.volume)
+ @mock.patch.object(mirror_class,
+ 'replication_disable')
+ @mock.patch.object(mirror_class,
+ 'volume_replication_setup')
+ def test_storwize_replication_disable(self, rep_setup,
+ replication_disable):
+ self.svc_driver.replication_disable(self.ctxt, self.volume)
+ replication_disable.assert_called_once_with(self.ctxt, self.volume)
+ @mock.patch.object(mirror_class,
+ 'replication_failover')
+ @mock.patch.object(mirror_class,
+ 'volume_replication_setup')
+ def test_storwize_replication_failover(self, rep_setup,
+ replication_failover):
+ fake_secondary = 'svc_id_target'
+ self.svc_driver.replication_failover(self.ctxt, self.volume,
+ fake_secondary)
+ replication_failover.assert_called_once_with(self.ctxt, self.volume,
+ fake_secondary)
+ @mock.patch.object(mirror_class,
+ 'list_replication_targets')
+ def test_storwize_list_replication_targets(self, list_targets):
+ fake_targets = [{"managed_backend_name": "second_host@sv2#sv2",
+ "type": "managed",
+ "target_device_id": "svc_id_target",
+ "pool_name": "cinder_target"}]
+ list_targets.return_value = fake_targets
+ expected_resp = {'targets': fake_targets,
+ 'volume_id': self.volume['id']}
+ targets = self.svc_driver.list_replication_targets(self.ctxt,
+ self.volume)
+ list_targets.assert_called_once_with(self.ctxt, self.volume)
+ self.assertEqual(expected_resp, targets)
+ @mock.patch.object(mirror_class,
+ '_partnership_validate_create')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'get_system_info')
+ def test_establish_target_partnership(self, get_system_info,
+ partnership_validate_create):
+ source_system_name = 'source_vol'
+ target_system_name = 'target_vol'
+ self.svc_driver.configuration.set_override('san_ip',
+ "")
+ get_system_info.side_effect = [{'system_name': source_system_name},
+ {'system_name': target_system_name}]
+ self.driver.establish_target_partnership()
+ expected_calls = [,
+ 'target_vol', ''),
+ 'source_vol', '')]
+ partnership_validate_create.assert_has_calls(expected_calls)
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'create_relationship')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'get_system_info')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'create_vdisk')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'get_vdisk_params')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'get_vdisk_attributes')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'get_relationship_info')
+ def test_replication_enable(self, get_relationship_info,
+ get_vdisk_attributes,
+ get_vdisk_params,
+ create_vdisk,
+ get_system_info,
+ create_relationship):
+ fake_system = 'fake_system'
+ fake_params = mock.Mock()
+ get_relationship_info.return_value = None
+ get_vdisk_attributes.return_value = None
+ get_vdisk_params.return_value = fake_params
+ get_system_info.return_value = {'system_name': fake_system}
+ model_update = self.driver.replication_enable(self.ctxt,
+ self.volume)
+ get_relationship_info.assert_called_once_with(self.volume)
+ get_vdisk_attributes.assert_called_once_with(self.volume['name'])
+ create_vdisk.assert_called_once_with(self.volume['name'],
+ '10', 'gb', 'cinder_target',
+ fake_params)
+ create_relationship.assert_called_once_with(self.volume['name'],
+ self.volume['name'],
+ fake_system,
+ self.driver.asyncmirror)
+ self.assertEqual({'replication_status': 'enabled'}, model_update)
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'delete_vdisk')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'delete_relationship')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'get_relationship_info')
+ def test_replication_disable(self, get_relationship_info,
+ delete_relationship,
+ delete_vdisk):
+ fake_target_vol_name = 'fake_target_vol_name'
+ get_relationship_info.return_value = {'aux_vdisk_name':
+ fake_target_vol_name}
+ model_update = self.driver.replication_disable(self.ctxt,
+ self.volume)
+ delete_relationship.assert_called_once_with(self.volume['name'])
+ delete_vdisk.assert_called_once_with(fake_target_vol_name,
+ False)
+ self.assertEqual({'replication_status': 'disabled'}, model_update)
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'delete_relationship')
+ @mock.patch.object(storwize_svc_common.StorwizeHelpers,
+ 'get_relationship_info')
+ def test_replication_failover(self, get_relationship_info,
+ delete_relationship):
+ secondary = 'svc_id_target'
+ fake_id = '546582b2-bafb-43cc-b765-bd738ab148c8'
+ expected_model_update = {'host': 'second_host@sv2#sv2',
+ '_name_id': fake_id}
+ fake_name = 'volume-' + fake_id
+ get_relationship_info.return_value = {'aux_vdisk_name':
+ fake_name}
+ model_update = self.driver.replication_failover(self.ctxt,
+ self.volume,
+ secondary)
+ delete_relationship.assert_called_once_with(self.volume['name'])
+ self.assertEqual(expected_model_update, model_update)
+ def test_list_replication_targets(self):
+ fake_targets = [{'target_device_id': 'svc_id_target'}]
+ targets = self.driver.list_replication_targets(self.ctxt,
+ self.volume)
+ self.assertEqual(fake_targets, targets)
+class StorwizeSVCReplicationMetroMirrorTestCase(
+ StorwizeSVCReplicationMirrorTestCase):
+ rep_type = 'metro'
+ mirror_class = storwize_rep.StorwizeSVCReplicationMetroMirror
+ def setUp(self):
+ super(StorwizeSVCReplicationMetroMirrorTestCase, self).setUp()
# under the License.
+import random
+import uuid
+from eventlet import greenthread
+from oslo_concurrency import processutils
from oslo_log import log as logging
+from oslo_utils import excutils
+import six
from cinder import exception
-from cinder.i18n import _, _LI
+from cinder.i18n import _, _LE, _LI
+from cinder import ssh_utils
+from cinder import utils
from cinder.volume import volume_types
LOG = logging.getLogger(__name__)
class StorwizeSVCReplicationStretchedCluster(StorwizeSVCReplication):
- """Support for Storwize/SVC stretched cluster mode replication."""
+ """Support for Storwize/SVC stretched cluster mode replication.
- def __init__(self, driver):
+ This stretched cluster mode implements volume replication in terms of
+ adding a copy to an existing volume, which changes a nonmirrored volume
+ into a mirrored volume.
+ """
+ def __init__(self, driver, replication_target=None):
super(StorwizeSVCReplicationStretchedCluster, self).__init__(driver)
+ = replication_target or {}
def create_replica(self, ctxt, volume, vol_type = None):
# if vol_type is None, use the source volume type
data = {}
data['replication'] = True
return data
+class StorwizeSVCReplicationGlobalMirror(
+ StorwizeSVCReplicationStretchedCluster):
+ """Support for Storwize/SVC global mirror mode replication.
+ Global Mirror establishes a Global Mirror relationship between
+ two volumes of equal size. The volumes in a Global Mirror relationship
+ are referred to as the master (source) volume and the auxiliary
+ (target) volume. This mode is dedicated to the asynchronous volume
+ replication.
+ """
+ asyncmirror = True
+ UUID_LEN = 36
+ def __init__(self, driver, replication_target=None, target_helpers=None):
+ super(StorwizeSVCReplicationGlobalMirror, self).__init__(
+ driver, replication_target)
+ self.sshpool = None
+ self.target_helpers = target_helpers(self._run_ssh)
+ def _partnership_validate_create(self, client, remote_name, remote_ip):
+ try:
+ partnership_info = client.get_partnership_info(
+ remote_name)
+ if not partnership_info:
+ candidate_info = client.get_partnershipcandidate_info(
+ remote_name)
+ if not candidate_info:
+ client.mkippartnership(remote_ip)
+ else:
+ client.mkfcpartnership(remote_name)
+ elif partnership_info['partnership'] == (
+ 'fully_configured_stopped'):
+ client.startpartnership(partnership_info['id'])
+ except Exception:
+ msg = (_('Unable to establish the partnership with '
+ 'the Storwize cluster %s.'), remote_name)
+ LOG.error(msg)
+ raise exception.VolumeDriverException(message=msg)
+ def establish_target_partnership(self):
+ local_system_info = self.driver._helpers.get_system_info()
+ target_system_info = self.target_helpers.get_system_info()
+ local_system_name = local_system_info['system_name']
+ target_system_name = target_system_info['system_name']
+ local_ip = self.driver.configuration.safe_get('san_ip')
+ target_ip ='san_ip')
+ self._partnership_validate_create(self.driver._helpers,
+ target_system_name, target_ip)
+ self._partnership_validate_create(self.target_helpers,
+ local_system_name, local_ip)
+ def _run_ssh(self, cmd_list, check_exit_code=True, attempts=1):
+ utils.check_ssh_injection(cmd_list)
+ # TODO(vhou): We'll have a common method in ssh_utils to take
+ # care of this _run_ssh method.
+ command = ' '. join(cmd_list)
+ if not self.sshpool:
+ self.sshpool = ssh_utils.SSHPool(
+'san_ssh_port', 22),
+'ssh_conn_timeout', 30),
+'san_private_key', ''),
+'ssh_min_pool_conn', 1),
+'ssh_max_pool_conn', 5),)
+ last_exception = None
+ try:
+ with self.sshpool.item() as ssh:
+ while attempts > 0:
+ attempts -= 1
+ try:
+ return processutils.ssh_execute(
+ ssh, command, check_exit_code=check_exit_code)
+ except Exception as e:
+ LOG.error(six.text_type(e))
+ last_exception = e
+ greenthread.sleep(random.randint(20, 500) / 100.0)
+ try:
+ raise processutils.ProcessExecutionError(
+ exit_code=last_exception.exit_code,
+ stdout=last_exception.stdout,
+ stderr=last_exception.stderr,
+ cmd=last_exception.cmd)
+ except AttributeError:
+ raise processutils.ProcessExecutionError(
+ exit_code=-1, stdout="",
+ stderr="Error running SSH command",
+ cmd=command)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Error running SSH command: %s"), command)
+ def volume_replication_setup(self, context, vref):
+ target_vol_name = vref['name']
+ try:
+ attr = self.target_helpers.get_vdisk_attributes(target_vol_name)
+ if attr:
+ # If the volume name exists in the target pool, we need
+ # to change to a different target name.
+ vol_id = six.text_type(uuid.uuid4())
+ prefix = vref['name'][0:len(vref['name']) - len(vol_id)]
+ target_vol_name = prefix + vol_id
+ opts = self.driver._get_vdisk_params(vref['volume_type_id'])
+ pool ='pool_name')
+ self.target_helpers.create_vdisk(target_vol_name,
+ six.text_type(vref['size']),
+ 'gb', pool, opts)
+ system_info = self.target_helpers.get_system_info()
+ self.driver._helpers.create_relationship(
+ vref['name'], target_vol_name, system_info.get('system_name'),
+ self.asyncmirror)
+ except Exception as e:
+ msg = (_("Unable to set up mirror mode replication for %(vol)s. "
+ "Exception: %(err)s."), {'vol': vref['id'],
+ 'err': e})
+ LOG.error(msg)
+ raise exception.VolumeDriverException(message=msg)
+ def create_relationship(self, vref, target_vol_name):
+ if not target_vol_name:
+ return
+ try:
+ system_info = self.target_helpers.get_system_info()
+ self.driver._helpers.create_relationship(
+ vref['name'], target_vol_name, system_info.get('system_name'),
+ self.asyncmirror)
+ except Exception:
+ msg = (_("Unable to create the relationship for %s."),
+ vref['name'])
+ LOG.error(msg)
+ raise exception.VolumeDriverException(message=msg)
+ def extend_target_volume(self, target_vol_name, amount):
+ if not target_vol_name:
+ return
+ self.target_helpers.extend_vdisk(target_vol_name, amount)
+ def delete_target_volume(self, vref):
+ try:
+ rel_info = self.driver._helpers.get_relationship_info(vref)
+ except Exception as e:
+ msg = (_('Fail to get remote copy information for %(volume)s '
+ 'due to %(err)s.'), {'volume': vref['id'], 'err': e})
+ LOG.error(msg)
+ raise exception.VolumeDriverException(data=msg)
+ if rel_info and rel_info.get('aux_vdisk_name', None):
+ try:
+ self.driver._helpers.delete_relationship(vref['name'])
+ self.driver._helpers.delete_vdisk(
+ rel_info['aux_vdisk_name'], False)
+ except Exception as e:
+ msg = (_('Unable to delete the target volume for '
+ 'volume %(vol)s. Exception: %(err)s.'),
+ {'vol': vref['id'], 'err': e})
+ LOG.error(msg)
+ raise exception.VolumeDriverException(message=msg)
+ # #### Implementing V2 replication methods #### #
+ def replication_enable(self, context, vref):
+ try:
+ rel_info = self.driver._helpers.get_relationship_info(vref)
+ except Exception as e:
+ msg = (_('Fail to get remote copy information for %(volume)s '
+ 'due to %(err)s'), {'volume': vref['id'], 'err': e})
+ LOG.error(msg)
+ raise exception.VolumeDriverException(message=msg)
+ if not rel_info or not rel_info.get('aux_vdisk_name', None):
+ self.volume_replication_setup(context, vref)
+ model_update = {'replication_status': 'enabled'}
+ return model_update
+ def replication_disable(self, context, vref):
+ self.delete_target_volume(vref)
+ model_update = {'replication_status': 'disabled'}
+ return model_update
+ def replication_failover(self, context, vref, secondary):
+ if not or'target_device_id') != secondary:
+ msg = _LE("A valid secondary target MUST be specified in order "
+ "to failover.")
+ LOG.error(msg)
+ # If the admin does not provide a valid secondary, the failover
+ # will fail, but it is not severe enough to throw an exception.
+ # The admin can still issue another failover request. That is
+ # why we tentatively put return None instead of raising an
+ # exception.
+ return None
+ try:
+ rel_info = self.driver._helpers.get_relationship_info(vref)
+ target_vol_name = rel_info.get('aux_vdisk_name')
+ target_vol_id = target_vol_name[-self.UUID_LEN:]
+ if rel_info:
+ self.driver._helpers.delete_relationship(vref['name'])
+ if target_vol_id == vref['id']:
+ target_vol_id = None
+ except Exception:
+ msg = (_('Unable to failover the replication for volume %s.'),
+ vref['id'])
+ LOG.error(msg)
+ raise exception.VolumeDriverException(message=msg)
+ model_update = {'host':'managed_backend_name'),
+ '_name_id': target_vol_id}
+ return model_update
+ def list_replication_targets(self, context, vref):
+ # For the mode of global mirror, there is only one replication target.
+ return [{'target_device_id':'target_device_id')}]
+class StorwizeSVCReplicationMetroMirror(
+ StorwizeSVCReplicationGlobalMirror):
+ """Support for Storwize/SVC metro mirror mode replication.
+ Metro Mirror establishes a Metro Mirror relationship between
+ two volumes of equal size. The volumes in a Metro Mirror relationship
+ are referred to as the master (source) volume and the auxiliary
+ (target) volume.
+ """
+ asyncmirror = False
+ def __init__(self, driver, replication_target=None, target_helpers=None):
+ super(StorwizeSVCReplicationMetroMirror, self).__init__(
+ driver, replication_target, target_helpers)
import math
import random
import re
+import string
import time
import unicodedata
with excutils.save_and_reraise_exception():
LOG.error(_LE('Error mapping VDisk-to-host'))
+ def mkrcrelationship(self, master, aux, system, name, asyncmirror):
+ ssh_cmd = ['svctask', 'mkrcrelationship', '-master', master,
+ '-aux', aux, '-cluster', system, '-name', name]
+ if asyncmirror:
+ ssh_cmd.append('-global')
+ return self.run_ssh_check_created(ssh_cmd)
+ def rmrcrelationship(self, relationship):
+ ssh_cmd = ['svctask', 'rmrcrelationship', relationship]
+ self.run_ssh_assert_no_output(ssh_cmd)
+ def startrcrelationship(self, rc_rel, primary=None):
+ ssh_cmd = ['svctask', 'startrcrelationship', '-force']
+ if primary:
+ ssh_cmd.extend(['-primary', primary])
+ ssh_cmd.append(rc_rel)
+ self.run_ssh_assert_no_output(ssh_cmd)
+ def stoprcrelationship(self, relationship, access=False):
+ ssh_cmd = ['svctask', 'stoprcrelationship']
+ if access:
+ ssh_cmd.append('-access')
+ ssh_cmd.append(relationship)
+ self.run_ssh_assert_no_output(ssh_cmd)
+ def lsrcrelationship(self, volume_name):
+ key_value = 'name=%s' % volume_name
+ ssh_cmd = ['svcinfo', 'lsrcrelationship', '-filtervalue',
+ key_value, '-delim', '!']
+ return self.run_ssh_info(ssh_cmd, with_header=True)
+ def lspartnership(self, system_name):
+ key_value = 'name=%s' % system_name
+ ssh_cmd = ['svcinfo', 'lspartnership', '-filtervalue',
+ key_value, '-delim', '!']
+ return self.run_ssh_info(ssh_cmd, with_header=True)
+ def lspartnershipcandidate(self):
+ ssh_cmd = ['svcinfo', 'lspartnershipcandidate', '-delim', '!']
+ return self.run_ssh_info(ssh_cmd, with_header=True)
+ def mkippartnership(self, ip_v4, bandwith):
+ ssh_cmd = ['svctask', 'mkippartnership', '-type', 'ipv4',
+ '-clusterip', ip_v4, '-linkbandwidthmbits',
+ six.text_type(bandwith)]
+ return self.run_ssh_assert_no_output(ssh_cmd)
+ def mkfcpartnership(self, system_name, bandwith):
+ ssh_cmd = ['svctask', 'mkfcpartnership', '-linkbandwidthmbits',
+ six.text_type(bandwith), system_name]
+ return self.run_ssh_assert_no_output(ssh_cmd)
+ def startpartnership(self, partnership_id):
+ ssh_cmd = ['svctask', 'chpartnership', '-start', partnership_id]
+ return self.run_ssh_assert_no_output(ssh_cmd)
def rmvdiskhostmap(self, host, vdisk):
ssh_cmd = ['svctask', 'rmvdiskhostmap', '-host', '"%s"' % host, vdisk]
return ret
+ def start_relationship(self, volume_name, primary=None):
+ vol_attrs = self.get_vdisk_attributes(volume_name)
+ if vol_attrs['RC_name']:
+ self.ssh.startrcrelationship(vol_attrs['RC_name'], primary)
+ def stop_relationship(self, volume_name):
+ vol_attrs = self.get_vdisk_attributes(volume_name)
+ if vol_attrs['RC_name']:
+ self.ssh.stoprcrelationship(vol_attrs['RC_name'], access=True)
+ def create_relationship(self, master, aux, system, asyncmirror):
+ name = 'rcrel' + ''.join(random.sample(string.digits, 10))
+ try:
+ rc_id = self.ssh.mkrcrelationship(master, aux, system, name,
+ asyncmirror)
+ except exception.VolumeBackendAPIException as e:
+ # CMMVC5959E is the code in Stowize storage, meaning that
+ # there is a relationship that already has this name on the
+ # master cluster.
+ if 'CMMVC5959E' not in e:
+ # If there is no relation between the primary and the
+ # secondary back-end storage, the exception is raised.
+ raise
+ if rc_id:
+ self.start_relationship(master)
+ def delete_relationship(self, volume_name):
+ vol_attrs = self.get_vdisk_attributes(volume_name)
+ if vol_attrs['RC_name']:
+ self.ssh.stoprcrelationship(vol_attrs['RC_name'])
+ self.ssh.rmrcrelationship(vol_attrs['RC_name'])
+ vol_attrs = self.get_vdisk_attributes(volume_name)
+ def get_relationship_info(self, volume):
+ vol_attrs = self.get_vdisk_attributes(volume['name'])
+ if not vol_attrs or not vol_attrs['RC_name']:
+"Unable to get remote copy information for "
+ "volume %s"), volume['name'])
+ return
+ relationship = self.ssh.lsrcrelationship(vol_attrs['RC_name'])
+ return relationship[0] if len(relationship) > 0 else None
+ def get_partnership_info(self, system_name):
+ partnership = self.ssh.lspartnership(system_name)
+ return partnership[0] if len(partnership) > 0 else None
+ def get_partnershipcandidate_info(self, system_name):
+ candidates = self.ssh.lspartnershipcandidate()
+ for candidate in candidates:
+ if system_name == candidate['name']:
+ return candidate
+ return None
+ def mkippartnership(self, ip_v4, bandwith=1000):
+ self.ssh.mkippartnership(ip_v4, bandwith)
+ def mkfcpartnership(self, system_name, bandwith=1000):
+ self.ssh.mkfcpartnership(system_name, bandwith)
+ def startpartnership(self, partnership_id):
+ self.ssh.startpartnership(partnership_id)
def delete_vdisk(self, vdisk, force):
"""Ensures that vdisk is not part of FC mapping and deletes it."""
LOG.debug('Enter: delete_vdisk: vdisk %s.', vdisk)
1.3.3 - Update driver to use ABC metaclasses
2.0 - Code refactor, split init file and placed shared methods for
FC and iSCSI within the StorwizeSVCCommonDriver class
+ 2.1 - Added replication V2 support to the global/metro mirror
+ mode
- VERSION = "2.0"
+ VERSION = "2.1"
+ GLOBAL = 'global'
+ METRO = 'metro'
def __init__(self, *args, **kwargs):
super(StorwizeSVCCommonDriver, self).__init__(*args, **kwargs)
'system_id': None,
'code_level': None,
+ # Since there are three replication modes supported by Storwize,
+ # this dictionary is used to map the replication types to certain
+ # replications.
+ self.replications = {}
+ # One driver can be configured with multiple replication targets
+ # to failover.
+ self._replication_targets = []
+ # This boolean is used to indicate whether this driver is configured
+ # with replication.
+ self._replication_enabled = False
+ # This list is used to save the supported replication modes.
+ self._supported_replication_types = []
# Storwize has the limitation that can not burst more than 3 new ssh
# connections within 1 second. So slow down the initialization.
+ # v2 replication setup
+ self._do_replication_setup()
def check_for_setup_error(self):
"""Ensure that the flags are set properly."""
LOG.debug('enter: check_for_setup_error')
self._helpers.add_vdisk_qos(volume['name'], opts['qos'])
model_update = None
- if opts.get('replication'):
- ctxt = context.get_admin_context()
+ ctxt = context.get_admin_context()
+ rep_type = self._get_volume_replicated_type(ctxt, volume)
+ # The replication V2 has a higher priority than the replication V1.
+ # Check if V2 is available first, then check if V1 is available.
+ if rep_type:
+ self.replications.get(rep_type).volume_replication_setup(ctxt,
+ volume)
+ model_update = {'replication_status': 'enabled'}
+ elif opts.get('replication'):
model_update = self.replication.create_replica(ctxt, volume)
return model_update
def delete_volume(self, volume):
+ ctxt = context.get_admin_context()
+ rep_mirror_type = self._get_volume_replicated_type_mirror(ctxt,
+ volume)
+ rep_status = volume.get("replication_status", None)
+ if rep_mirror_type and rep_status != "failed-over":
+ self.replications.get(rep_mirror_type).delete_target_volume(
+ volume)
self._helpers.delete_vdisk(volume['name'], False)
if volume['id'] in self._vdiskcopyops:
if opts['qos']:
self._helpers.add_vdisk_qos(volume['name'], opts['qos'])
- if 'replication' in opts and opts['replication']:
- ctxt = context.get_admin_context()
+ ctxt = context.get_admin_context()
+ rep_type = self._get_volume_replicated_type(ctxt, volume)
+ # The replication V2 has a higher priority than the replication V1.
+ # Check if V2 is available first, then check if V1 is available.
+ if rep_type and self._replication_enabled:
+ self.replications.get(rep_type).volume_replication_setup(ctxt,
+ volume)
+ return {'replication_status': 'enabled'}
+ elif opts.get('replication'):
replica_status = self.replication.create_replica(ctxt, volume)
if replica_status:
return replica_status
if opts['qos']:
self._helpers.add_vdisk_qos(tgt_volume['name'], opts['qos'])
- if 'replication' in opts and opts['replication']:
- ctxt = context.get_admin_context()
+ ctxt = context.get_admin_context()
+ rep_type = self._get_volume_replicated_type(ctxt, tgt_volume)
+ # The replication V2 has a higher priority than the replication V1.
+ # Check if V2 is available first, then check if V1 is available.
+ if rep_type and self._replication_enabled:
+ self.replications.get(rep_type).volume_replication_setup(
+ ctxt, tgt_volume)
+ return {'replication_status': 'enabled'}
+ elif opts.get('replication'):
replica_status = self.replication.create_replica(ctxt, tgt_volume)
if replica_status:
return replica_status
raise exception.VolumeDriverException(message=msg)
extend_amt = int(new_size) - volume['size']
+ ctxt = context.get_admin_context()
+ rep_mirror_type = self._get_volume_replicated_type_mirror(ctxt,
+ volume)
+ rep_status = volume.get("replication_status", None)
+ target_vol_name = None
+ if rep_mirror_type and rep_status != "failed-over":
+ try:
+ rel_info = self._helpers.get_relationship_info(volume)
+ self._helpers.delete_relationship(volume)
+ except Exception as e:
+ msg = (_('Failed to get remote copy information for '
+ '%(volume)s. Exception: %(err)s.'), {'volume':
+ volume['id'],
+ 'err': e})
+ LOG.error(msg)
+ raise exception.VolumeDriverException(message=msg)
+ if rel_info:
+ target_vol_name = rel_info.get('aux_vdisk_name')
+ self.replications.get(rep_mirror_type).extend_target_volume(
+ target_vol_name, extend_amt)
self._helpers.extend_vdisk(volume['name'], extend_amt)
+ if rep_mirror_type and rep_status != "failed-over":
+ self.replications.get(rep_mirror_type).create_relationship(
+ volume, target_vol_name)
LOG.debug('leave: extend_volume: volume %s', volume['id'])
def add_vdisk_copy(self, volume, dest_pool, vol_type):
LOG.debug("Exit: update volume copy status.")
+ # #### V2 replication methods #### #
+ def replication_enable(self, context, vref):
+ """Enable replication on a replication capable volume."""
+ rep_type = self._validate_volume_rep_type(context, vref)
+ if rep_type not in self.replications:
+ msg = _("Driver does not support re-enabling replication for a "
+ "failed over volume.")
+ LOG.error(msg)
+ raise exception.ReplicationError(volume_id=vref['id'],
+ reason=msg)
+ return self.replications.get(rep_type).replication_enable(
+ context, vref)
+ def replication_disable(self, context, vref):
+ """Disable replication on a replication capable volume."""
+ rep_type = self._validate_volume_rep_type(context, vref)
+ return self.replications[rep_type].replication_disable(
+ context, vref)
+ def replication_failover(self, context, vref, secondary):
+ """Force failover to a secondary replication target."""
+ rep_type = self._validate_volume_rep_type(context, vref)
+ return self.replications[rep_type].replication_failover(
+ context, vref, secondary)
+ def list_replication_targets(self, context, vref):
+ """Return the list of replication targets for a volume."""
+ rep_type = self._validate_volume_rep_type(context, vref)
+ # When a volume is failed over, the secondary volume driver will not
+ # have replication configured, so in this case, gracefully handle
+ # request by returning no target volumes
+ if rep_type not in self.replications:
+ targets = []
+ else:
+ targets = self.replications[rep_type].list_replication_targets(
+ context, vref)
+ return {'volume_id': vref['id'],
+ 'targets': targets}
+ def _validate_volume_rep_type(self, ctxt, volume):
+ rep_type = self._get_volume_replicated_type(ctxt, volume)
+ if not rep_type:
+ msg = (_("Volume %s is not of replicated type. "
+ "This volume needs to be of a volume type "
+ "with the extra spec replication_enabled set "
+ "to '<is> True' to support replication "
+ "actions."), volume['id'])
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+ if not self._replication_enabled:
+ msg = _("The back-end where the volume is created "
+ "does not have replication enabled.")
+ LOG.error(msg)
+ raise exception.VolumeBackendAPIException(data=msg)
+ return rep_type
+ def _get_volume_replicated_type_mirror(self, ctxt, volume):
+ rep_type = self._get_volume_replicated_type(ctxt, volume)
+ if rep_type in self.VALID_REP_TYPES:
+ return rep_type
+ else:
+ return None
+ def _get_specs_replicated_type(self, volume_type):
+ replication_type = None
+ extra_specs = volume_type.get("extra_specs", {})
+ rep_val = extra_specs.get('replication_enabled')
+ if rep_val == "<is> True":
+ replication_type = extra_specs.get('replication_type',
+ self.GLOBAL)
+ # The format for replication_type in extra spec is in
+ # "<in> global". Otherwise, the code will
+ # not reach here.
+ if replication_type != self.GLOBAL:
+ # Pick up the replication type specified in the
+ # extra spec from the format like "<in> global".
+ replication_type = replication_type.split()[1]
+ if replication_type not in self.VALID_REP_TYPES:
+ replication_type = None
+ return replication_type
+ def _get_volume_replicated_type(self, ctxt, volume):
+ replication_type = None
+ if volume.get("volume_type_id"):
+ volume_type = volume_types.get_volume_type(
+ ctxt, volume["volume_type_id"])
+ replication_type = self._get_specs_replicated_type(volume_type)
+ return replication_type
+ def _do_replication_setup(self):
+ replication_devices = self.configuration.replication_device
+ if replication_devices:
+ replication_targets = []
+ for dev in replication_devices:
+ remote_array = {}
+ remote_array['managed_backend_name'] = (
+ dev.get('managed_backend_name'))
+ if not remote_array['managed_backend_name']:
+ raise exception.InvalidConfigurationValue(
+ option='managed_backend_name',
+ value=remote_array['managed_backend_name'])
+ rep_mode = dev.get('replication_mode')
+ remote_array['replication_mode'] = rep_mode
+ remote_array['san_ip'] = (
+ dev.get('san_ip'))
+ remote_array['target_device_id'] = (
+ dev.get('target_device_id'))
+ remote_array['san_login'] = (
+ dev.get('san_login'))
+ remote_array['san_password'] = (
+ dev.get('san_password'))
+ remote_array['pool_name'] = (
+ dev.get('pool_name'))
+ replication_targets.append(remote_array)
+ # Each replication type will have a coresponding replication.
+ self.create_replication_types(replication_targets)
+ if len(self._supported_replication_types) > 0:
+ self._replication_enabled = True
+ def create_replication_types(self, replication_targets):
+ for target in replication_targets:
+ rep_type = target['replication_mode']
+ if (rep_type in self.VALID_REP_TYPES
+ and rep_type not in self.replications.keys()):
+ replication = self.replication_factory(rep_type, target)
+ try:
+ replication.establish_target_partnership()
+ except exception.VolumeDriverException:
+ msg = (_LE('The replication mode of %(type)s has not '
+ 'successfully established partnership '
+ 'with the replica Storwize target %(stor)s.'),
+ {'type': rep_type,
+ 'stor': target['target_device_id']})
+ LOG.error(msg)
+ continue
+ self.replications[rep_type] = replication
+ self._replication_targets.append(target)
+ self._supported_replication_types.append(rep_type)
+ def replication_factory(self, replication_type, rep_target):
+ """Use replication methods for the requested mode."""
+ if replication_type == self.GLOBAL:
+ return storwize_rep.StorwizeSVCReplicationGlobalMirror(
+ self, rep_target, StorwizeHelpers)
+ if replication_type == self.METRO:
+ return storwize_rep.StorwizeSVCReplicationMetroMirror(
+ self, rep_target, StorwizeHelpers)
+ def get_replication_updates(self, context):
+ # TODO(vhou): the manager does not need to do anything so far.
+ replication_updates = []
+ return replication_updates
def migrate_volume(self, ctxt, volume, host):
"""Migrate directly if source and dest are managed by same storage.
{'sys_id': self._state['system_id'],
'pool': pool})
- if self.replication:
+ if self._replication_enabled:
+ data['replication_enabled'] = self._replication_enabled
+ data['replication_type'] = self._supported_replication_types
+ data['replication_count'] = len(self._replication_targets)
+ elif self.replication:
self._stats = data
--- /dev/null
+ - Adds managed v2 replication global and metro mirror modes support to the IBM Storwize driver.