class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
- RPC_API_VERSION = '1.9'
+ RPC_API_VERSION = '1.10'
target = messaging.Target(version=RPC_API_VERSION)
force_host_copy)
def retype(self, context, topic, volume_id,
- request_spec, filter_properties=None):
+ request_spec, filter_properties=None, volume=None):
"""Schedule the modification of a volume's type.
:param context: the request context
:param volume_id: the ID of the volume to retype
:param request_spec: parameters for this retype request
:param filter_properties: parameters to filter by
+ :param volume: the volume object to retype
"""
self._wait_for_scheduler()
+ # FIXME(thangp): Remove this in v2.0 of RPC API.
+ if volume is None:
+ # For older clients, mimic the old behavior and look up the
+ # volume by its volume_id.
+ volume = objects.Volume.get_by_id(context, volume_id)
+
def _retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations):
if reservations:
self._set_volume_state_and_notify('retype', volume_state,
context, ex, request_spec, msg)
- volume_ref = db.volume_get(context, volume_id)
reservations = request_spec.get('quota_reservations')
new_type = request_spec.get('volume_type')
if new_type is None:
msg = _('New volume type not specified in request_spec.')
ex = exception.ParameterNotFound(param='volume_type')
_retype_volume_set_error(self, context, ex, request_spec,
- volume_ref, msg, reservations)
+ volume, msg, reservations)
# Default migration policy is 'never'
migration_policy = request_spec.get('migration_policy')
except exception.NoValidHost as ex:
msg = (_("Could not find a host for volume %(volume_id)s with "
"type %(type_id)s.") %
- {'type_id': new_type['id'], 'volume_id': volume_id})
+ {'type_id': new_type['id'], 'volume_id': volume.id})
_retype_volume_set_error(self, context, ex, request_spec,
- volume_ref, msg, reservations)
+ volume, msg, reservations)
except Exception as ex:
with excutils.save_and_reraise_exception():
_retype_volume_set_error(self, context, ex, request_spec,
- volume_ref, None, reservations)
+ volume, None, reservations)
else:
- volume_rpcapi.VolumeAPI().retype(context, volume_ref,
+ volume_rpcapi.VolumeAPI().retype(context, volume,
new_type['id'], tgt_host,
migration_policy, reservations)
1.7 - Add get_active_pools method
1.8 - Add sending object over RPC in create_consistencygroup method
1.9 - Adds support for sending objects over RPC in create_volume()
+ 1.10 - Adds support for sending objects over RPC in retype()
"""
RPC_API_VERSION = '1.0'
filter_properties=filter_properties)
def retype(self, ctxt, topic, volume_id,
- request_spec=None, filter_properties=None):
+ request_spec=None, filter_properties=None, volume=None):
- cctxt = self.client.prepare(version='1.4')
request_spec_p = jsonutils.to_primitive(request_spec)
- return cctxt.cast(ctxt, 'retype',
- topic=topic,
- volume_id=volume_id,
- request_spec=request_spec_p,
- filter_properties=filter_properties)
+ msg_args = {'topic': topic, 'volume_id': volume_id,
+ 'request_spec': request_spec_p,
+ 'filter_properties': filter_properties}
+ if self.client.can_send_version('1.10'):
+ version = '1.10'
+ msg_args['volume'] = volume
+ else:
+ version = '1.4'
+
+ cctxt = self.client.prepare(version=version)
+ return cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None):
filter_properties='filter_properties',
version='1.3')
- def test_retype(self):
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=True)
+ def test_retype(self, can_send_version):
self._test_scheduler_api('retype',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
+ volume='volume',
+ version='1.10')
+ can_send_version.assert_called_with('1.10')
+
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=False)
+ def test_retype_old(self, can_send_version):
+ self._test_scheduler_api('retype',
+ rpc_method='cast',
+ topic='topic',
+ volume_id='volume_id',
+ request_spec='fake_request_spec',
+ filter_properties='filter_properties',
+ volume='volume',
version='1.4')
+ can_send_version.assert_called_with('1.10')
def test_manage_existing(self):
self._test_scheduler_api('manage_existing',
request_spec, {})
@mock.patch('cinder.db.volume_update')
- @mock.patch('cinder.db.volume_get')
- def test_retype_volume_exception_returns_volume_state(self, _mock_vol_get,
- _mock_vol_update):
+ @mock.patch('cinder.db.volume_attachment_get_used_by_volume_id')
+ def test_retype_volume_exception_returns_volume_state(
+ self, _mock_vol_attachment_get, _mock_vol_update):
# Test NoValidHost exception behavior for retype.
# Puts the volume in original state and eats the exception.
volume = tests_utils.create_volume(self.context,
status='retyping',
previous_status='in-use')
instance_uuid = '12345678-1234-5678-1234-567812345678'
- volume = tests_utils.attach_volume(self.context, volume['id'],
- instance_uuid, None, '/dev/fake')
- fake_volume_id = volume.id
+ volume_attach = tests_utils.attach_volume(self.context, volume.id,
+ instance_uuid, None,
+ '/dev/fake')
+ _mock_vol_attachment_get.return_value = [volume_attach]
topic = 'fake_topic'
- request_spec = {'volume_id': fake_volume_id, 'volume_type': {'id': 3},
+ request_spec = {'volume_id': volume.id, 'volume_type': {'id': 3},
'migration_policy': 'on-demand'}
- _mock_vol_get.return_value = volume
_mock_vol_update.return_value = {'status': 'in-use'}
_mock_find_retype_host = mock.Mock(
side_effect=exception.NoValidHost(reason=""))
orig_retype = self.manager.driver.find_retype_host
self.manager.driver.find_retype_host = _mock_find_retype_host
- self.manager.retype(self.context, topic, fake_volume_id,
+ self.manager.retype(self.context, topic, volume.id,
request_spec=request_spec,
- filter_properties={})
+ filter_properties={},
+ volume=volume)
- _mock_vol_get.assert_called_once_with(self.context, fake_volume_id)
_mock_find_retype_host.assert_called_once_with(self.context,
request_spec, {},
'on-demand')
- _mock_vol_update.assert_called_once_with(self.context, fake_volume_id,
+ _mock_vol_update.assert_called_once_with(self.context, volume.id,
{'status': 'in-use'})
self.manager.driver.find_retype_host = orig_retype
False,
FAKE_METADATA_TYPE.fake_type)
+ @mock.patch('cinder.db.volume_update')
+ def test_update_with_ovo(self, volume_update):
+ """Test update volume using oslo_versionedobject."""
+ volume = tests_utils.create_volume(self.context, **self.volume_params)
+ volume_api = cinder.volume.api.API()
+ updates = {'display_name': 'foobbar'}
+ volume_api.update(self.context, volume, updates)
+ volume_update.assert_called_once_with(self.context, volume.id,
+ updates)
+ self.assertEqual('foobbar', volume.display_name)
+
def test_delete_volume_metadata_with_metatype(self):
"""Test delete volume metadata with different metadata type."""
test_meta1 = {'fake_key1': 'fake_value1', 'fake_key2': 'fake_value2'}
status='creating', host=CONF.host)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'in-use'
- volume['host'] = 'fakehost'
volume_api = cinder.volume.api.API()
host=CONF.host, status='retyping',
volume_type_id=old_vol_type['id'],
replication_status=rep_status)
- volume['previous_status'] = 'available'
+ volume.previous_status = 'available'
+ volume.save()
if snap:
- self._create_snapshot(volume['id'], size=volume['size'])
+ self._create_snapshot(volume.id, size=volume.size)
if driver or diff_equal:
host_obj = {'host': CONF.host, 'capabilities': {}}
else:
host_obj = {'host': 'newhost', 'capabilities': {}}
- reserve_opts = {'volumes': 1, 'gigabytes': volume['size']}
+ reserve_opts = {'volumes': 1, 'gigabytes': volume.size}
QUOTAS.add_volume_type_opts(self.context,
reserve_opts,
vol_type['id'])
_mig.return_value = True
if not exc:
- self.volume.retype(self.context, volume['id'],
+ self.volume.retype(self.context, volume.id,
vol_type['id'], host_obj,
migration_policy=policy,
- reservations=reservations)
+ reservations=reservations,
+ volume=volume)
else:
self.assertRaises(exc, self.volume.retype,
- self.context, volume['id'],
+ self.context, volume.id,
vol_type['id'], host_obj,
migration_policy=policy,
- reservations=reservations)
- get_volume.assert_called_once_with(self.context, volume['id'])
+ reservations=reservations,
+ volume=volume)
# get volume/quota properties
- volume = db.volume_get(elevated, volume['id'])
+ volume = objects.Volume.get_by_id(elevated, volume.id)
try:
usage = db.quota_usage_get(elevated, project_id, 'volumes_new')
volumes_in_use = usage.in_use
# check properties
if driver or diff_equal:
- self.assertEqual(vol_type['id'], volume['volume_type_id'])
- self.assertEqual('available', volume['status'])
- self.assertEqual(CONF.host, volume['host'])
+ self.assertEqual(vol_type['id'], volume.volume_type_id)
+ self.assertEqual('available', volume.status)
+ self.assertEqual(CONF.host, volume.host)
self.assertEqual(1, volumes_in_use)
elif not exc:
- self.assertEqual(old_vol_type['id'], volume['volume_type_id'])
- self.assertEqual('retyping', volume['status'])
- self.assertEqual(CONF.host, volume['host'])
+ self.assertEqual(old_vol_type['id'], volume.volume_type_id)
+ self.assertEqual('retyping', volume.status)
+ self.assertEqual(CONF.host, volume.host)
self.assertEqual(1, volumes_in_use)
else:
- self.assertEqual(old_vol_type['id'], volume['volume_type_id'])
- self.assertEqual('available', volume['status'])
- self.assertEqual(CONF.host, volume['host'])
+ self.assertEqual(old_vol_type['id'], volume.volume_type_id)
+ self.assertEqual('available', volume.status)
+ self.assertEqual(CONF.host, volume.host)
self.assertEqual(0, volumes_in_use)
def test_retype_volume_driver_success(self):
error=False,
version='1.10')
- def test_retype(self):
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=True)
+ def test_retype(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
dest_host = FakeHost()
self._test_volume_api('retype',
rpc_method='cast',
- volume=self.fake_volume,
+ volume=self.fake_volume_obj,
+ new_type_id='fake',
+ dest_host=dest_host,
+ migration_policy='never',
+ reservations=None,
+ version='1.34')
+ can_send_version.assert_called_once_with('1.34')
+
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=False)
+ def test_retype_old(self, can_send_version):
+ class FakeHost(object):
+ def __init__(self):
+ self.host = 'host'
+ self.capabilities = {}
+ dest_host = FakeHost()
+ self._test_volume_api('retype',
+ rpc_method='cast',
+ volume=self.fake_volume_obj,
new_type_id='fake',
dest_host=dest_host,
migration_policy='never',
reservations=None,
version='1.12')
+ can_send_version.assert_called_once_with('1.34')
def test_manage_existing(self):
self._test_volume_api('manage_existing',
msg = _("The volume cannot be updated during maintenance.")
raise exception.InvalidVolume(reason=msg)
- vref = self.db.volume_update(context, volume['id'], fields)
- LOG.info(_LI("Volume updated successfully."), resource=vref)
+ # NOTE(thangp): Update is called by various APIs, some of which are
+ # not yet using oslo_versionedobjects. We need to handle the case
+ # where volume is either a dict or a oslo_versionedobject.
+ if isinstance(volume, objects_base.CinderObject):
+ volume.update(fields)
+ volume.save()
+ LOG.info(_LI("Volume updated successfully."), resource=volume)
+ else:
+ vref = self.db.volume_update(context, volume['id'], fields)
+ LOG.info(_LI("Volume updated successfully."), resource=vref)
def get(self, context, volume_id, viewable_admin_meta=False):
volume = objects.Volume.get_by_id(context, volume_id)
@wrap_check_policy
def retype(self, context, volume, new_type, migration_policy=None):
"""Attempt to modify the type associated with an existing volume."""
- if volume['status'] not in ['available', 'in-use']:
+ if volume.status not in ['available', 'in-use']:
msg = _('Unable to update type due to incorrect status: '
'%(vol_status)s on volume: %(vol_id)s. Volume status '
'must be available or '
- 'in-use.') % {'vol_status': volume['status'],
- 'vol_id': volume['id']}
+ 'in-use.') % {'vol_status': volume.status,
+ 'vol_id': volume.id}
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
if self._is_volume_migrating(volume):
msg = (_("Volume %s is already part of an active migration.")
- % volume['id'])
+ % volume.id)
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
- cg_id = volume.get('consistencygroup_id', None)
- if cg_id:
+ if volume.consistencygroup_id:
msg = _("Volume must not be part of a consistency group.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
vol_type_qos_id = vol_type['qos_specs_id']
old_vol_type = None
- old_vol_type_id = volume['volume_type_id']
+ old_vol_type_id = volume.volume_type_id
old_vol_type_qos_id = None
# Error if the original and new type are the same
- if volume['volume_type_id'] == vol_type_id:
+ if volume.volume_type_id == vol_type_id:
msg = _('New volume_type same as original: %s.') % new_type
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
- if volume['volume_type_id']:
+ if volume.volume_type_id:
old_vol_type = volume_types.get_volume_type(
context, old_vol_type_id)
old_vol_type_qos_id = old_vol_type['qos_specs_id']
# We don't support changing QoS at the front-end yet for in-use volumes
# TODO(avishay): Call Nova to change QoS setting (libvirt has support
# - virDomainSetBlockIoTune() - Nova does not have support yet).
- if (volume['status'] != 'available' and
+ if (volume.status != 'available' and
old_vol_type_qos_id != vol_type_qos_id):
for qos_id in [old_vol_type_qos_id, vol_type_qos_id]:
if qos_id:
specs = qos_specs.get_qos_specs(context.elevated(), qos_id)
if specs['consumer'] != 'back-end':
msg = _('Retype cannot change front-end qos specs for '
- 'in-use volume: %s.') % volume['id']
+ 'in-use volume: %s.') % volume.id
raise exception.InvalidInput(reason=msg)
# We're checking here in so that we can report any quota issues as
vol_type_id)
self.update(context, volume, {'status': 'retyping',
- 'previous_status': volume['status']})
+ 'previous_status': volume.status})
request_spec = {'volume_properties': volume,
- 'volume_id': volume['id'],
+ 'volume_id': volume.id,
'volume_type': vol_type,
'migration_policy': migration_policy,
'quota_reservations': reservations}
- self.scheduler_rpcapi.retype(context, CONF.volume_topic, volume['id'],
+ self.scheduler_rpcapi.retype(context, CONF.volume_topic, volume.id,
request_spec=request_spec,
- filter_properties={})
+ filter_properties={}, volume=volume)
LOG.info(_LI("Retype volume request issued successfully."),
resource=volume)
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
- RPC_API_VERSION = '1.33'
+ RPC_API_VERSION = '1.34'
target = messaging.Target(version=RPC_API_VERSION)
resource=volume)
def retype(self, ctxt, volume_id, new_type_id, host,
- migration_policy='never', reservations=None):
+ migration_policy='never', reservations=None, volume=None):
- def _retype_error(context, volume_id, old_reservations,
+ def _retype_error(context, volume, old_reservations,
new_reservations, status_update):
try:
- self.db.volume_update(context, volume_id, status_update)
+ volume.update(status_update)
+ volume.save()
finally:
QUOTAS.rollback(context, old_reservations)
QUOTAS.rollback(context, new_reservations)
context = ctxt.elevated()
- volume_ref = self.db.volume_get(ctxt, volume_id)
- status_update = {'status': volume_ref['previous_status']}
- if context.project_id != volume_ref['project_id']:
- project_id = volume_ref['project_id']
+ # FIXME(thangp): Remove this in v2.0 of RPC API.
+ if volume is None:
+ # For older clients, mimic the old behavior and look up the volume
+ # by its volume_id.
+ volume = objects.Volume.get_by_id(context, volume_id)
+
+ status_update = {'status': volume.previous_status}
+ if context.project_id != volume.project_id:
+ project_id = volume.project_id
else:
project_id = context.project_id
# set the volume status to error. Should that be done
# here? Setting the volume back to it's original status
# for now.
- self.db.volume_update(context, volume_id, status_update)
+ volume.update(status_update)
+ volume.save()
# Get old reservations
try:
- reserve_opts = {'volumes': -1, 'gigabytes': -volume_ref['size']}
+ reserve_opts = {'volumes': -1, 'gigabytes': -volume.size}
QUOTAS.add_volume_type_opts(context,
reserve_opts,
- volume_ref.get('volume_type_id'))
+ volume.volume_type_id)
old_reservations = QUOTAS.reserve(context,
project_id=project_id,
**reserve_opts)
except Exception:
- self.db.volume_update(context, volume_id, status_update)
+ volume.update(status_update)
+ volume.save()
LOG.exception(_LE("Failed to update usages "
"while retyping volume."))
raise exception.CinderException(_("Failed to get old volume type"
# If volume types have the same contents, no need to do anything
retyped = False
diff, all_equal = volume_types.volume_types_diff(
- context, volume_ref.get('volume_type_id'), new_type_id)
+ context, volume.volume_type_id, new_type_id)
if all_equal:
retyped = True
try:
new_type = volume_types.get_volume_type(context, new_type_id)
ret = self.driver.retype(context,
- volume_ref,
+ volume,
new_type,
diff,
host)
retyped = ret
if retyped:
- LOG.info(_LI("Volume %s: retyped successfully"), volume_id)
+ LOG.info(_LI("Volume %s: retyped successfully"), volume.id)
except Exception:
retyped = False
LOG.exception(_LE("Volume %s: driver error when trying to "
"retype, falling back to generic "
- "mechanism."), volume_ref['id'])
+ "mechanism."), volume.id)
# We could not change the type, so we need to migrate the volume, where
# the destination volume will be of the new type
if not retyped:
if migration_policy == 'never':
- _retype_error(context, volume_id, old_reservations,
+ _retype_error(context, volume, old_reservations,
new_reservations, status_update)
msg = _("Retype requires migration but is not allowed.")
raise exception.VolumeMigrationFailed(reason=msg)
snaps = objects.SnapshotList.get_all_for_volume(context,
- volume_ref['id'])
+ volume.id)
if snaps:
- _retype_error(context, volume_id, old_reservations,
+ _retype_error(context, volume, old_reservations,
new_reservations, status_update)
msg = _("Volume must not have snapshots.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# Don't allow volume with replicas to be migrated
- rep_status = volume_ref['replication_status']
+ rep_status = volume.replication_status
if rep_status is not None and rep_status != 'disabled':
- _retype_error(context, volume_id, old_reservations,
+ _retype_error(context, volume, old_reservations,
new_reservations, status_update)
msg = _("Volume must not be replicated.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
- self.db.volume_update(context, volume_ref['id'],
- {'migration_status': 'starting'})
+ volume.migration_status = 'starting'
+ volume.save()
try:
- self.migrate_volume(context, volume_id, host,
+ self.migrate_volume(context, volume.id, host,
new_type_id=new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
- _retype_error(context, volume_id, old_reservations,
+ _retype_error(context, volume, old_reservations,
new_reservations, status_update)
else:
model_update = {'volume_type_id': new_type_id,
'status': status_update['status']}
if retype_model_update:
model_update.update(retype_model_update)
- self.db.volume_update(context, volume_id, model_update)
+ volume.update(model_update)
+ volume.save()
if old_reservations:
QUOTAS.commit(context, old_reservations, project_id=project_id)
QUOTAS.commit(context, new_reservations, project_id=project_id)
self.publish_service_capabilities(context)
LOG.info(_LI("Retype volume completed successfully."),
- resource=volume_ref)
+ resource=volume)
def manage_existing(self, ctxt, volume_id, ref=None):
try:
args. Forwarding CGSnapshot object instead of CGSnapshot_id.
1.32 - Adds support for sending objects over RPC in create_volume().
1.33 - Adds support for sending objects over RPC in delete_volume().
+ 1.34 - Adds support for sending objects over RPC in retype().
"""
BASE_RPC_API_VERSION = '1.0'
def retype(self, ctxt, volume, new_type_id, dest_host,
migration_policy='never', reservations=None):
- new_host = utils.extract_host(volume['host'])
- cctxt = self.client.prepare(server=new_host, version='1.12')
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
- cctxt.cast(ctxt, 'retype', volume_id=volume['id'],
- new_type_id=new_type_id, host=host_p,
- migration_policy=migration_policy,
- reservations=reservations)
+ msg_args = {'volume_id': volume.id, 'new_type_id': new_type_id,
+ 'host': host_p, 'migration_policy': migration_policy,
+ 'reservations': reservations}
+ if self.client.can_send_version('1.34'):
+ version = '1.34'
+ msg_args['volume'] = volume
+ else:
+ version = '1.12'
+
+ new_host = utils.extract_host(volume.host)
+ cctxt = self.client.prepare(server=new_host, version=version)
+ cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, volume, ref):
new_host = utils.extract_host(volume['host'])