from cinder import context
from cinder import exception
from cinder.openstack.common import excutils
+from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common import processutils
from cinder import test
+from cinder.tests import utils as testutils
from cinder import units
from cinder import utils
from cinder.volume import configuration as conf
LOG = logging.getLogger(__name__)
-class StorwizeSVCFakeDB:
- def __init__(self):
- self.volume = None
-
- def volume_get(self, ctxt, vol_id):
- return self.volume
-
- def volume_set(self, vol):
- self.volume = vol
-
-
class StorwizeSVCManagementSimulator:
def __init__(self, pool_name):
self._flags = {'storwize_svc_volpool_name': pool_name}
self._connector = utils.brick_get_connector_properties()
self._reset_flags()
- self.driver.db = StorwizeSVCFakeDB()
+ self.ctxt = context.get_admin_context()
+ db_driver = self.driver.configuration.db_driver
+ self.db = importutils.import_module(db_driver)
+ self.driver.db = self.db
self.driver.do_setup(None)
self.driver.check_for_setup_error()
self.sleeppatch = mock.patch('eventlet.greenthread.sleep')
'volume_type_id': None,
'mdisk_grp_name': 'openstack'}
+ def _create_volume(self, **kwargs):
+ vol = testutils.create_volume(self.ctxt, **kwargs)
+ self.driver.create_volume(vol)
+ return vol
+
+ def _delete_volume(self, volume):
+ self.driver.delete_volume(volume)
+ self.db.volume_destroy(self.ctxt, volume['id'])
+
def _create_test_vol(self, opts):
- ctxt = context.get_admin_context()
+ ctxt = testutils.get_test_admin_context()
type_ref = volume_types.create(ctxt, 'testtype', opts)
volume = self._generate_vol_info(None, None)
type_id = type_ref['id']
return attrs
def test_storwize_svc_snapshots(self):
- vol1 = self._generate_vol_info(None, None)
- self.driver.create_volume(vol1)
- self.driver.db.volume_set(vol1)
+ vol1 = self._create_volume()
snap1 = self._generate_vol_info(vol1['name'], vol1['id'])
# Test timeout and volume cleanup
self.driver.delete_snapshot(snap1)
def test_storwize_svc_create_volfromsnap_clone(self):
- vol1 = self._generate_vol_info(None, None)
- self.driver.create_volume(vol1)
- self.driver.db.volume_set(vol1)
+ vol1 = self._create_volume()
snap1 = self._generate_vol_info(vol1['name'], vol1['id'])
self.driver.create_snapshot(snap1)
vol2 = self._generate_vol_info(None, None)
def test_storwize_svc_delete_volume_snapshots(self):
# Create a volume with two snapshots
- master = self._generate_vol_info(None, None)
- self.driver.create_volume(master)
- self.driver.db.volume_set(master)
+ master = self._create_volume()
# Fail creating a snapshot - will force delete the snapshot
if self.USESIM and False:
self.assertAlmostEqual(stats['free_capacity_gb'], 3287.5)
def test_storwize_svc_extend_volume(self):
- volume = self._generate_vol_info(None, None)
- self.driver.db.volume_set(volume)
- self.driver.create_volume(volume)
+ volume = self._create_volume()
self.driver.extend_volume(volume, '13')
attrs = self.driver._helpers.get_vdisk_attributes(volume['name'])
vol_size = int(attrs['capacity']) / units.GiB
cap = {'location_info': 'StorwizeSVCDriver:foo:bar'}
self._check_loc_info(cap, {'moved': False, 'model_update': None})
- def test_storwize_svc_migrate_same_extent_size(self):
+ def test_storwize_svc_volume_migrate(self):
-        # Make sure we don't call migrate_volume_vdiskcopy
+        # Migration adds a vdisk copy in the destination pool; the original
+        # copy is removed later by the periodic copy-ops task.
- with mock.patch.object(self.driver._helpers,
- 'migrate_volume_vdiskcopy') as migr_vdiskcopy:
- migr_vdiskcopy.side_effect = KeyError
- self.driver.do_setup(None)
- loc = ('StorwizeSVCDriver:' + self.driver._state['system_id'] +
- ':openstack2')
- cap = {'location_info': loc, 'extent_size': '256'}
- host = {'host': 'foo', 'capabilities': cap}
- ctxt = context.get_admin_context()
- volume = self._generate_vol_info(None, None)
- volume['volume_type_id'] = None
- self.driver.create_volume(volume)
- self.driver.migrate_volume(ctxt, volume, host)
- self.driver.delete_volume(volume)
-
- def test_storwize_svc_migrate_diff_extent_size(self):
self.driver.do_setup(None)
loc = ('StorwizeSVCDriver:' + self.driver._state['system_id'] +
- ':openstack3')
- cap = {'location_info': loc, 'extent_size': '128'}
+ ':openstack2')
+ cap = {'location_info': loc, 'extent_size': '256'}
host = {'host': 'foo', 'capabilities': cap}
ctxt = context.get_admin_context()
- volume = self._generate_vol_info(None, None)
+ volume = self._create_volume()
volume['volume_type_id'] = None
- self.driver.create_volume(volume)
- self.assertNotEqual(cap['extent_size'],
- self.driver._state['extent_size'])
self.driver.migrate_volume(ctxt, volume, host)
- attrs = self.driver._helpers.get_vdisk_attributes(volume['name'])
- self.assertIn('openstack3', attrs['mdisk_grp_name'])
- self.driver.delete_volume(volume)
+ self._delete_volume(volume)
def test_storwize_svc_retype_no_copy(self):
self.driver.do_setup(None)
def test_set_storage_code_level_success(self):
res = self.driver._helpers.get_system_info()
- self.assertEqual((7, 2, 0, 0), res['code_level'],
- 'Get code level error')
+ if self.USESIM:
+ self.assertEqual((7, 2, 0, 0), res['code_level'],
+ 'Get code level error')
+
+ def test_storwize_vdisk_copy_ops(self):
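+        # Add a vdisk copy for a volume and check that the operation is
+        # recorded in the volume's admin metadata; then run the periodic
+        # check, remove the operation and verify the metadata is cleared.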
+ ctxt = testutils.get_test_admin_context()
+ volume = self._create_volume()
+ driver = self.driver
+ dest_pool = self.driver.configuration.storwize_svc_volpool_name
+ new_ops = driver._helpers.add_vdisk_copy(volume['name'], dest_pool,
+ None, self.driver._state,
+ self.driver.configuration)
+ self.driver._add_vdisk_copy_op(ctxt, volume, new_ops)
+ admin_metadata = self.db.volume_admin_metadata_get(ctxt, volume['id'])
+        self.assertEqual(':'.join(new_ops),
+ admin_metadata['vdiskcopyops'],
+ 'Storwize driver add vdisk copy error.')
+ self.driver._check_volume_copy_ops()
+ self.driver._rm_vdisk_copy_op(ctxt, volume, new_ops[0], new_ops[1])
+ admin_metadata = self.db.volume_admin_metadata_get(ctxt, volume['id'])
+        self.assertIsNone(admin_metadata.get('vdiskcopyops', None),
+                          'Storwize driver delete vdisk copy error')
+ self._delete_volume(volume)
class CLIResponseTestCase(test.TestCase):
def __init__(self, execute=utils.execute, *args, **kwargs):
# NOTE(vish): db is set by Manager
self.db = kwargs.get('db')
+ self.host = kwargs.get('host')
self.configuration = kwargs.get('configuration', None)
if self.configuration:
self.configuration.append_config_values(volume_opts)
from cinder import exception
from cinder.openstack.common import excutils
from cinder.openstack.common import log as logging
+from cinder.openstack.common import loopingcall
from cinder import units
from cinder import utils
from cinder.volume.drivers.ibm.storwize_svc import helpers as storwize_helpers
1.2.3 - Fix Fibre Channel connectivity: bug #1279758 (add delim to
lsfabric, clear unused data from connections, ensure matching
            WWPNs by comparing lower case)
+ 1.2.4 - Fix bug #1278035 (async migration/retype)
"""
- VERSION = "1.2.3"
+ VERSION = "1.2.4"
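+    # Seconds between periodic checks of in-progress vdisk copy operations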
+ VDISKCOPYOPS_INTERVAL = 600
def __init__(self, *args, **kwargs):
super(StorwizeSVCDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(storwize_svc_opts)
self._helpers = storwize_helpers.StorwizeHelpers(self._run_ssh)
+ self._vdiskcopyops = {}
+ self._vdiskcopyops_loop = None
self._state = {'storage_nodes': {},
'enabled_protocols': set(),
'compression_enabled': False,
'available_iogrps': [],
'system_name': None,
'system_id': None,
- 'extent_size': None,
'code_level': None,
}
# Validate that the pool exists
pool = self.configuration.storwize_svc_volpool_name
try:
- attributes = self._helpers.get_pool_attrs(pool)
+ self._helpers.get_pool_attrs(pool)
except exception.VolumeBackendAPIException:
msg = _('Failed getting details for pool %s') % pool
raise exception.InvalidInput(reason=msg)
- self._state['extent_size'] = attributes['extent_size']
# Check if compression is supported
self._state['compression_enabled'] = \
msg = _('do_setup: No configured nodes.')
LOG.error(msg)
raise exception.VolumeDriverException(message=msg)
+
+ # Build the list of in-progress vdisk copy operations
+ if ctxt is None:
+ admin_context = context.get_admin_context()
+ else:
+ admin_context = ctxt.elevated()
+ volumes = self.db.volume_get_all_by_host(admin_context, self.host)
+
+ for volume in volumes:
+ metadata = self.db.volume_admin_metadata_get(admin_context,
+ volume['id'])
+ curr_ops = metadata.get('vdiskcopyops', None)
+ if curr_ops:
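+                # Stored as 'orig_id:new_id;orig_id:new_id;...'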
+ ops = [tuple(x.split(':')) for x in curr_ops.split(';')]
+ self._vdiskcopyops[volume['id']] = ops
+
+ # if vdiskcopy exists in database, start the looping call
+ if len(self._vdiskcopyops) >= 1:
+ self._vdiskcopyops_loop = loopingcall.LoopingCall(
+ self._check_volume_copy_ops)
+ self._vdiskcopyops_loop.start(interval=self.VDISKCOPYOPS_INTERVAL)
+
LOG.debug(_('leave: do_setup'))
def check_for_setup_error(self):
if self._state['system_id'] is None:
exception_msg = (_('Unable to determine system id'))
raise exception.VolumeBackendAPIException(data=exception_msg)
- if self._state['extent_size'] is None:
- exception_msg = (_('Unable to determine pool extent size'))
- raise exception.VolumeBackendAPIException(data=exception_msg)
required_flags = ['san_ip', 'san_ssh_port', 'san_login',
'storwize_svc_volpool_name']
'conn': str(connector)})
vol_opts = self._get_vdisk_params(volume['volume_type_id'])
- host_name = connector['host']
volume_name = volume['name']
# Delete irrelevant connection information that later could result
def create_snapshot(self, snapshot):
ctxt = context.get_admin_context()
- source_vol = self.db.volume_get(ctxt, snapshot['volume_id'])
+ try:
+ source_vol = self.db.volume_get(ctxt, snapshot['volume_id'])
+ except Exception:
+ msg = (_('create_snapshot: get source volume failed.'))
+ LOG.error(msg)
+ raise exception.VolumeDriverException(message=msg)
opts = self._get_vdisk_params(source_vol['volume_type_id'])
self._helpers.create_copy(snapshot['volume_name'], snapshot['name'],
snapshot['volume_id'], self.configuration,
self._helpers.extend_vdisk(volume['name'], extend_amt)
LOG.debug(_('leave: extend_volume: volume %s') % volume['id'])
+ def _add_vdisk_copy_op(self, ctxt, volume, new_op):
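+        # Track the in-progress (orig_copy_id, new_copy_id) pair both in the
+        # volume's admin metadata (so it survives a driver restart) and in
+        # the in-memory map used by the periodic check.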
+ metadata = self.db.volume_admin_metadata_get(ctxt.elevated(),
+ volume['id'])
+ curr_ops = metadata.get('vdiskcopyops', None)
+ if curr_ops:
+ curr_ops_list = [tuple(x.split(':')) for x in curr_ops.split(';')]
+            curr_ops_list.append(new_op)
+            new_ops_list = curr_ops_list
+ else:
+ new_ops_list = [new_op]
+ new_ops_str = ';'.join([':'.join(x) for x in new_ops_list])
+ self.db.volume_admin_metadata_update(ctxt.elevated(), volume['id'],
+ {'vdiskcopyops': new_ops_str},
+ False)
+ if volume['id'] in self._vdiskcopyops:
+ self._vdiskcopyops[volume['id']].append(new_op)
+ else:
+ self._vdiskcopyops[volume['id']] = [new_op]
+
+ # We added the first copy operation, so start the looping call
+ if len(self._vdiskcopyops) == 1:
+ self._vdiskcopyops_loop = loopingcall.LoopingCall(
+ self._check_volume_copy_ops)
+ self._vdiskcopyops_loop.start(interval=self.VDISKCOPYOPS_INTERVAL)
+
+ def _rm_vdisk_copy_op(self, ctxt, volume, orig_copy_id, new_copy_id):
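+        # Remove the copy operation from the in-memory map (stopping the
+        # looping call if nothing is left), then from the admin metadata.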
+ try:
+ self._vdiskcopyops[volume['id']].remove((orig_copy_id,
+ new_copy_id))
+ if not len(self._vdiskcopyops[volume['id']]):
+ del self._vdiskcopyops[volume['id']]
+ if not len(self._vdiskcopyops):
+ self._vdiskcopyops_loop.stop()
+ self._vdiskcopyops_loop = None
+        except KeyError:
+ msg = (_('_rm_vdisk_copy_op: Volume %s does not have any '
+ 'registered vdisk copy operations.') % volume['id'])
+ LOG.error(msg)
+ return
+ except ValueError:
+ msg = (_('_rm_vdisk_copy_op: Volume %(vol)s does not have the '
+ 'specified vdisk copy operation: orig=%(orig)s '
+ 'new=%(new)s.')
+ % {'vol': volume['id'], 'orig': orig_copy_id,
+ 'new': new_copy_id})
+ LOG.error(msg)
+ return
+
+ metadata = self.db.volume_admin_metadata_get(ctxt.elevated(),
+ volume['id'])
+ curr_ops = metadata.get('vdiskcopyops', None)
+ if not curr_ops:
+ msg = (_('_rm_vdisk_copy_op: Volume metadata %s does not have any '
+ 'registered vdisk copy operations.') % volume['id'])
+ LOG.error(msg)
+ return
+ curr_ops_list = [tuple(x.split(':')) for x in curr_ops.split(';')]
+ try:
+ curr_ops_list.remove((orig_copy_id, new_copy_id))
+ except ValueError:
+ msg = (_('_rm_vdisk_copy_op: Volume %(vol)s metadata does not '
+ 'have the specified vdisk copy operation: orig=%(orig)s '
+ 'new=%(new)s.')
+ % {'vol': volume['id'], 'orig': orig_copy_id,
+ 'new': new_copy_id})
+ LOG.error(msg)
+ return
+
+ if len(curr_ops_list):
+ new_ops_str = ';'.join([':'.join(x) for x in curr_ops_list])
+ self.db.volume_admin_metadata_update(ctxt.elevated(), volume['id'],
+ {'vdiskcopyops': new_ops_str},
+ False)
+ else:
+ self.db.volume_admin_metadata_delete(ctxt.elevated(), volume['id'],
+ 'vdiskcopyops')
+
+ def _check_volume_copy_ops(self):
+ LOG.debug(_("enter: update volume copy status"))
+ ctxt = context.get_admin_context()
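+        # For each tracked operation, check whether the new copy has finished
+        # syncing; once it has, delete the original copy and stop tracking it.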
+ copy_items = self._vdiskcopyops.items()
+ for vol_id, copy_ops in copy_items:
+ volume = self.db.volume_get(ctxt, vol_id)
+            # Iterate over a copy because _rm_vdisk_copy_op removes entries
+            # from this list as operations complete.
+            for copy_op in copy_ops[:]:
+ try:
+ synced = self._helpers.is_vdisk_copy_synced(volume['name'],
+ copy_op[1])
+ except Exception:
+ msg = (_('_check_volume_copy_ops: Volume %(vol)s does not '
+ 'have the specified vdisk copy operation: '
+ 'orig=%(orig)s new=%(new)s.')
+ % {'vol': volume['id'], 'orig': copy_op[0],
+ 'new': copy_op[1]})
+ LOG.info(msg)
+ else:
+ if synced:
+ self._helpers.rm_vdisk_copy(volume['name'], copy_op[0])
+ self._rm_vdisk_copy_op(ctxt, volume, copy_op[0],
+ copy_op[1])
+ LOG.debug(_("exit: update volume copy status"))
+
def migrate_volume(self, ctxt, volume, host):
"""Migrate directly if source and dest are managed by same storage.
- The method uses the migratevdisk method, which returns almost
- immediately, if the source and target pools have the same extent_size.
- Otherwise, it uses addvdiskcopy and rmvdiskcopy, which require waiting
- for the copy operation to complete.
+ We create a new vdisk copy in the desired pool, and add the original
+ vdisk copy to the admin_metadata of the volume to be deleted. The
+ deletion will occur using a periodic task once the new copy is synced.
:param ctxt: Context
:param volume: A dictionary describing the volume to migrate
if dest_pool is None:
return false_ret
- if 'extent_size' not in host['capabilities']:
- return false_ret
- if host['capabilities']['extent_size'] == self._state['extent_size']:
- # If source and dest pools have the same extent size, migratevdisk
- self._helpers.migrate_vdisk(volume['name'], dest_pool)
+ ctxt = context.get_admin_context()
+ if volume['volume_type_id'] is not None:
+ volume_type_id = volume['volume_type_id']
+ vol_type = volume_types.get_volume_type(ctxt, volume_type_id)
else:
- # If source and dest pool extent size differ, add/delete vdisk copy
- ctxt = context.get_admin_context()
- if volume['volume_type_id'] is not None:
- volume_type_id = volume['volume_type_id']
- vol_type = volume_types.get_volume_type(ctxt, volume_type_id)
- else:
- vol_type = None
- self._helpers.migrate_volume_vdiskcopy(volume['name'], dest_pool,
- vol_type,
- self._state,
- self.configuration)
+ vol_type = None
+ new_op = self._helpers.add_vdisk_copy(volume['name'], dest_pool,
+ vol_type, self._state,
+ self.configuration)
+ self._add_vdisk_copy_op(ctxt, volume, new_op)
LOG.debug(_('leave: migrate_volume: id=%(id)s, host=%(host)s') %
{'id': volume['id'], 'host': host['host']})
return (True, None)
if dest_pool is None:
return False
- self._helpers.migrate_volume_vdiskcopy(volume['name'], dest_pool,
- new_type,
- self._state,
- self.configuration)
+ new_op = self._helpers.add_vdisk_copy(volume['name'], dest_pool,
+ new_type, self._state,
+ self.configuration)
+ self._add_vdisk_copy_op(ctxt, volume, new_op)
else:
self._helpers.change_vdisk_options(volume['name'], vdisk_changes,
new_opts, self._state)
units.GiB)
data['easytier_support'] = attributes['easy_tier'] in ['on', 'auto']
data['compression_support'] = self._state['compression_enabled']
- data['extent_size'] = self._state['extent_size']
data['location_info'] = ('StorwizeSVCDriver:%(sys_id)s:%(pool)s' %
{'sys_id': self._state['system_id'],
'pool': pool})
def extend_vdisk(self, vdisk, amount):
self.ssh.expandvdisksize(vdisk, amount)
- def migrate_volume_vdiskcopy(self, vdisk, dest_pool, volume_type,
- state, config):
- """Migrate a volume using addvdiskcopy and rmvdiskcopy.
-
- This will add a vdisk copy with the given volume type in the given
- pool, wait until it syncs, and delete the original copy.
- """
+ def add_vdisk_copy(self, vdisk, dest_pool, volume_type, state, config):
+        """Add a vdisk copy in the given pool.
+
+        Returns a (orig_copy_id, new_copy_id) tuple.
+        """
this_pool = config.storwize_svc_volpool_name
resp = self.ssh.lsvdiskcopy(vdisk)
orig_copy_id = None
orig_copy_id = copy_id
if orig_copy_id is None:
- msg = (_('migrate_volume started without a vdisk copy in the '
+ msg = (_('add_vdisk_copy started without a vdisk copy in the '
'expected pool.'))
LOG.error(msg)
raise exception.VolumeDriverException(message=msg)
volume_type=volume_type)
params = self._get_vdisk_create_params(opts)
new_copy_id = self.ssh.addvdiskcopy(vdisk, dest_pool, params)
+ return (orig_copy_id, new_copy_id)
- sync = False
- while not sync:
- sync = self.ssh.lsvdiskcopy(vdisk, copy_id=new_copy_id)[0]['sync']
- if sync == 'yes':
- sync = True
- else:
- greenthread.sleep(10)
-
- self.ssh.rmvdiskcopy(vdisk, orig_copy_id)
+ def is_vdisk_copy_synced(self, vdisk, copy_id):
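+        """Return True if the given vdisk copy has finished synchronizing."""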
+        sync = self.ssh.lsvdiskcopy(vdisk, copy_id=copy_id)[0]['sync']
+        return sync == 'yes'
- def migrate_vdisk(self, vdisk, dest_pool):
- self.ssh.migratevdisk(vdisk, dest_pool)
+ def rm_vdisk_copy(self, vdisk, copy_id):
+ self.ssh.rmvdiskcopy(vdisk, copy_id)
@staticmethod
def can_migrate_to_host(host, state):
'-unit', 'gb', vdisk])
self.run_ssh_assert_no_output(ssh_cmd)
- def migratevdisk(self, vdisk, dest_pool):
- ssh_cmd = ['svctask', 'migratevdisk', '-mdiskgrp', dest_pool,
- '-vdisk', vdisk]
- self.run_ssh_assert_no_output(ssh_cmd)
-
def mkfcmap(self, source, target, full_copy):
ssh_cmd = ['svctask', 'mkfcmap', '-source', source, '-target',
target, '-autodelete']
self.driver = importutils.import_object(
volume_driver,
configuration=self.configuration,
- db=self.db)
+ db=self.db,
+ host=self.host)
def _add_to_threadpool(self, func, *args, **kwargs):
self._tp.spawn_n(func, *args, **kwargs)