self.volume_api.update_readonly_flag(context, volume, readonly_flag)
return webob.Response(status_int=202)
+ @wsgi.action('os-retype')
+ def _retype(self, req, id, body):
+ """Change type of existing volume."""
+ context = req.environ['cinder.context']
+ volume = self.volume_api.get(context, id)
+ try:
+ new_type = body['os-retype']['new_type']
+ except KeyError:
+ msg = _("New volume type must be specified.")
+ raise webob.exc.HTTPBadRequest(explanation=msg)
+ policy = body['os-retype'].get('migration_policy')
+
+ self.volume_api.retype(context, volume, new_type, policy)
+ return webob.Response(status_int=202)
+
class Volume_actions(extensions.ExtensionDescriptor):
"""Enable volume actions
msg_fmt = _("key manager error: %(reason)s")
+class VolumeRetypeFailed(CinderException):
+ message = _("Volume retype failed: %(reason)s")
+
+
# Driver specific exceptions
# Coraid
class CoraidException(VolumeDriverException):
--- /dev/null
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from cinder import exception
+from cinder.openstack.common import log as logging
+from cinder import quota
+
+
+LOG = logging.getLogger(__name__)
+QUOTAS = quota.QUOTAS
+
+
+def get_volume_type_reservation(ctxt, volume, type_id):
+ # Reserve quotas for the given volume type
+ try:
+ reserve_opts = {'volumes': 1, 'gigabytes': volume['size']}
+ QUOTAS.add_volume_type_opts(ctxt,
+ reserve_opts,
+ type_id)
+ reservations = QUOTAS.reserve(ctxt, **reserve_opts)
+ except exception.OverQuota as e:
+ overs = e.kwargs['overs']
+ usages = e.kwargs['usages']
+ quotas = e.kwargs['quotas']
+
+ def _consumed(name):
+ return (usages[name]['reserved'] + usages[name]['in_use'])
+
+ for over in overs:
+ if 'gigabytes' in over:
+ s_size = volume['size']
+ d_quota = quotas[over]
+ d_consumed = _consumed(over)
+ msg = _("Quota exceeded for %(s_pid)s, tried to create "
+ "%(s_size)sG volume - (%(d_consumed)dG of "
+ "%(d_quota)dG already consumed)")
+ LOG.warn(msg % {'s_pid': ctxt.project_id,
+ 's_size': s_size,
+ 'd_consumed': d_consumed,
+ 'd_quota': d_quota})
+ raise exception.VolumeSizeExceedsAvailableQuota(
+ requested=s_size, quota=d_quota, consumed=d_consumed)
+ elif 'volumes' in over:
+ msg = _("Quota exceeded for %(s_pid)s, tried to create "
+ "volume (%(d_consumed)d volumes "
+ "already consumed)")
+
+ LOG.warn(msg % {'s_pid': ctxt.project_id,
+ 'd_consumed': _consumed(over)})
+ raise exception.VolumeLimitExceeded(
+ allowed=quotas[over])
+ return reservations
return self._filter_hosts(request_spec, hosts, **kwargs)
+ def _choose_host_from_list(self, hosts):
+ return hosts[int(random.random() * len(hosts))]
+
def _schedule(self, context, topic, request_spec, **kwargs):
"""Picks a host that is up at random."""
hosts = self._get_weighted_candidates(context, topic,
if not hosts:
msg = _("Could not find another host")
raise exception.NoValidHost(reason=msg)
- return hosts[int(random.random() * len(hosts))]
+ return self._choose_host_from_list(hosts)
def schedule_create_volume(self, context, request_spec, filter_properties):
"""Picks a host that is up at random."""
msg = (_('cannot place volume %(id)s on %(host)s')
% {'id': request_spec['volume_id'], 'host': host})
raise exception.NoValidHost(reason=msg)
+
+ def find_retype_host(self, context, request_spec, filter_properties,
+ migration_policy='never'):
+ """Find a host that can accept the volume with its new type."""
+ current_host = request_spec['volume_properties']['host']
+
+ # The volume already exists on this host, and so we shouldn't check if
+ # it can accept the volume again.
+ filter_properties['vol_exists_on'] = current_host
+
+ weighed_hosts = self._get_weighted_candidates(
+ context,
+ CONF.volume_topic,
+ request_spec,
+ filter_properties=filter_properties)
+ if not weighed_hosts:
+ msg = (_('No valid hosts for volume %(id)s with type %(type)s')
+ % {'id': request_spec['volume_id'],
+ 'type': request_spec['volume_type']})
+ raise exception.NoValidHost(reason=msg)
+
+ target_host = None
+ for weighed_host in weighed_hosts:
+ if weighed_host == current_host:
+ target_host = current_host
+
+ if migration_policy == 'never' and target_host is None:
+ msg = (_('Current host not valid for volume %(id)s with type '
+ '%(type)s, migration not allowed')
+ % {'id': request_spec['volume_id'],
+ 'type': request_spec['volume_type']})
+ raise exception.NoValidHost(reason=msg)
+
+ if not target_host:
+ target_host = self._choose_host_from_list(weighed_hosts)
+
+ elevated = context.elevated()
+ host_states = self.host_manager.get_all_host_states(elevated)
+ for host_state in host_states:
+ if host_state.host == target_host:
+ return (host_state, migration_policy)
+
+ # NOTE(avishay):We should never get here, but raise just in case
+ msg = (_('No host_state for selected host %s') % target_host)
+ raise exception.NoValidHost(reason=msg)
"""Check if the specified host passes the filters."""
raise NotImplementedError(_("Must implement host_passes_filters"))
+ def find_retype_host(self, context, request_spec, filter_properties={},
+ migration_policy='never'):
+ """Find a host that can accept the volume with its new type."""
+ raise NotImplementedError(_("Must implement find_retype_host"))
+
def schedule(self, context, topic, method, *_args, **_kwargs):
"""Must override schedule method for scheduler to work."""
raise NotImplementedError(_("Must implement a fallback schedule"))
% {'id': request_spec['volume_id'], 'host': host})
raise exception.NoValidHost(reason=msg)
+ def find_retype_host(self, context, request_spec, filter_properties={},
+ migration_policy='never'):
+ """Find a host that can accept the volume with its new type."""
+ current_host = request_spec['volume_properties']['host']
+
+ # The volume already exists on this host, and so we shouldn't check if
+ # it can accept the volume again in the CapacityFilter.
+ filter_properties['vol_exists_on'] = current_host
+
+ weighed_hosts = self._get_weighted_candidates(context, request_spec,
+ filter_properties)
+ if not weighed_hosts:
+ msg = (_('No valid hosts for volume %(id)s with type %(type)s')
+ % {'id': request_spec['volume_id'],
+ 'type': request_spec['volume_type']})
+ raise exception.NoValidHost(reason=msg)
+
+ for weighed_host in weighed_hosts:
+ host_state = weighed_host.obj
+ if host_state.host == current_host:
+ return host_state
+
+ if migration_policy == 'never':
+ msg = (_('Current host not valid for volume %(id)s with type '
+ '%(type)s, migration not allowed')
+ % {'id': request_spec['volume_id'],
+ 'type': request_spec['volume_type']})
+ raise exception.NoValidHost(reason=msg)
+
+ top_host = self._choose_top_host(weighed_hosts, request_spec)
+ return top_host.obj
+
def _post_select_populate_filter_properties(self, filter_properties,
host_state):
"""Add additional information to the filter properties after a host has
filter_properties)
if not weighed_hosts:
return None
- best_host = weighed_hosts[0]
- LOG.debug(_("Choosing %s") % best_host)
+ return self._choose_top_host(weighed_hosts, request_spec)
+
+ def _choose_top_host(self, weighed_hosts, request_spec):
+ top_host = weighed_hosts[0]
+ host_state = top_host.obj
+ LOG.debug(_("Choosing %s") % host_state.host)
volume_properties = request_spec['volume_properties']
- best_host.obj.consume_from_volume(volume_properties)
- return best_host
+ host_state.consume_from_volume(volume_properties)
+ return top_host
def host_passes(self, host_state, filter_properties):
"""Return True if host has sufficient capacity."""
+
+ # If the volume already exists on this host, don't fail it for
+ # insufficient capacity (e.g., if we are retyping)
+ if host_state.host == filter_properties.get('vol_exists_on'):
+ return True
+
volume_size = filter_properties.get('size')
if host_state.free_capacity_gb is None:
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common.notifier import api as notifier
+from cinder import quota
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
CONF = cfg.CONF
CONF.register_opt(scheduler_driver_opt)
+QUOTAS = quota.QUOTAS
+
LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
- RPC_API_VERSION = '1.3'
+ RPC_API_VERSION = '1.4'
def __init__(self, scheduler_driver=None, service_name=None,
*args, **kwargs):
def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
- def _migrate_volume_set_error(self, context, ex, request_spec):
- volume_state = {'volume_state': {'migration_status': None}}
- self._set_volume_state_and_notify('migrate_volume_to_host',
- volume_state,
- context, ex, request_spec)
-
def migrate_volume_to_host(self, context, topic, volume_id, host,
force_host_copy, request_spec,
filter_properties=None):
"""Ensure that the host exists and can accept the volume."""
+
+ def _migrate_volume_set_error(self, context, ex, request_spec):
+ volume_state = {'volume_state': {'migration_status': None}}
+ self._set_volume_state_and_notify('migrate_volume_to_host',
+ volume_state,
+ context, ex, request_spec)
+
try:
tgt_host = self.driver.host_passes_filters(context, host,
request_spec,
filter_properties)
except exception.NoValidHost as ex:
- self._migrate_volume_set_error(context, ex, request_spec)
+ _migrate_volume_set_error(self, context, ex, request_spec)
except Exception as ex:
with excutils.save_and_reraise_exception():
- self._migrate_volume_set_error(context, ex, request_spec)
+ _migrate_volume_set_error(self, context, ex, request_spec)
else:
volume_ref = db.volume_get(context, volume_id)
volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
tgt_host,
force_host_copy)
+ def retype(self, context, topic, volume_id,
+ request_spec, filter_properties=None):
+ """Schedule the modification of a volume's type.
+
+ :param context: the request context
+ :param topic: the topic listened on
+ :param volume_id: the ID of the volume to retype
+ :param request_spec: parameters for this retype request
+ :param filter_properties: parameters to filter by
+ """
+ def _retype_volume_set_error(self, context, ex, request_spec,
+ volume_ref, msg, reservations):
+ if reservations:
+ QUOTAS.rollback(context, reservations)
+ if (volume_ref['instance_uuid'] is None and
+ volume_ref['attached_host'] is None):
+ orig_status = 'available'
+ else:
+ orig_status = 'in-use'
+ volume_state = {'volume_state': {'status': orig_status}}
+ self._set_volume_state_and_notify('retype', volume_state,
+ context, ex, request_spec, msg)
+
+ volume_ref = db.volume_get(context, volume_id)
+ reservations = request_spec.get('quota_reservations')
+ new_type = request_spec.get('volume_type')
+ if new_type is None:
+ msg = _('New volume type not specified in request_spec.')
+ ex = exception.ParameterNotFound(param='volume_type')
+ _retype_volume_set_error(self, context, ex, request_spec,
+ volume_ref, msg, reservations)
+
+ # Default migration policy is 'never'
+ migration_policy = request_spec.get('migration_policy')
+ if not migration_policy:
+ migration_policy = 'never'
+
+ try:
+ tgt_host = self.driver.find_retype_host(context, request_spec,
+ filter_properties,
+ migration_policy)
+ except exception.NoValidHost as ex:
+ msg = (_("Could not find a host for volume %(volume_id)s with "
+ "type %(type_id)s.") %
+ {'type_id': new_type['id'], 'volume_id': volume_id})
+ _retype_volume_set_error(self, context, ex, request_spec,
+ volume_ref, msg, reservations)
+ except Exception as ex:
+ with excutils.save_and_reraise_exception():
+ _retype_volume_set_error(self, context, ex, request_spec,
+ volume_ref, None, reservations)
+ else:
+ volume_rpcapi.VolumeAPI().retype(context, volume_ref,
+ new_type['id'], tgt_host,
+ migration_policy, reservations)
+
def _set_volume_state_and_notify(self, method, updates, context, ex,
- request_spec):
+ request_spec, msg=None):
# TODO(harlowja): move into a task that just does this later.
-
- LOG.error(_("Failed to schedule_%(method)s: %(ex)s") %
- {'method': method, 'ex': ex})
+ if not msg:
+ msg = (_("Failed to schedule_%(method)s: %(ex)s") %
+ {'method': method, 'ex': ex})
+ LOG.error(msg)
volume_state = updates['volume_state']
properties = request_spec.get('volume_properties', {})
1.2 - Add request_spec, filter_properties arguments
to create_volume()
1.3 - Add migrate_volume_to_host() method
+ 1.4 - Add retype method
'''
RPC_API_VERSION = '1.0'
filter_properties=filter_properties),
version='1.3')
+ def retype(self, ctxt, topic, volume_id,
+ request_spec=None, filter_properties=None):
+ request_spec_p = jsonutils.to_primitive(request_spec)
+ return self.cast(ctxt, self.make_msg(
+ 'retype',
+ topic=topic,
+ volume_id=volume_id,
+ request_spec=request_spec_p,
+ filter_properties=filter_properties),
+ version='1.4')
+
def update_service_capabilities(self, ctxt,
service_name, host,
capabilities):
from cinder import db
from cinder import exception
from cinder.scheduler import chance
-from cinder.scheduler import driver
from cinder import utils
class SimpleScheduler(chance.ChanceScheduler):
"""Implements Naive Scheduler that tries to find least loaded host."""
- def schedule_create_volume(self, context, request_spec, filter_properties):
+ def _get_weighted_candidates(self, context, topic, request_spec, **kwargs):
"""Picks a host that is up and has the fewest volumes."""
elevated = context.elevated()
volume_properties = request_spec.get('volume_properties')
volume_size = volume_properties.get('size')
availability_zone = volume_properties.get('availability_zone')
+ filter_properties = kwargs.get('filter_properties', {})
zone, host = None, None
if availability_zone:
zone, _x, host = availability_zone.partition(':')
if host and context.is_admin:
- topic = CONF.volume_topic
service = db.service_get_by_args(elevated, host, topic)
if not utils.service_is_up(service):
raise exception.WillNotSchedule(host=host)
- updated_volume = driver.volume_update_db(context, volume_id, host)
- self.volume_rpcapi.create_volume(context, updated_volume, host,
- request_spec, filter_properties,
- snapshot_id=snapshot_id,
- image_id=image_id)
- return None
+ return [host]
+ candidates = []
results = db.service_get_all_volume_sorted(elevated)
if zone:
results = [(s, gigs) for (s, gigs) in results
if s['availability_zone'] == zone]
for result in results:
(service, volume_gigabytes) = result
- if volume_gigabytes + volume_size > CONF.max_gigabytes:
- msg = _("Not enough allocatable volume gigabytes remaining")
- raise exception.NoValidHost(reason=msg)
+ no_skip = service['host'] != filter_properties.get('vol_exists_on')
+ if no_skip and volume_gigabytes + volume_size > CONF.max_gigabytes:
+ continue
if utils.service_is_up(service) and not service['disabled']:
- updated_volume = driver.volume_update_db(context, volume_id,
- service['host'])
- self.volume_rpcapi.create_volume(context, updated_volume,
- service['host'], request_spec,
- filter_properties,
- snapshot_id=snapshot_id,
- image_id=image_id)
- return None
- msg = _("Is the appropriate service running?")
- raise exception.NoValidHost(reason=msg)
+ candidates.append(service['host'])
+
+ if candidates:
+ return candidates
+ else:
+ msg = _("No service with adequate space or no service running")
+ raise exception.NoValidHost(reason=msg)
+
+ def _choose_host_from_list(self, hosts):
+ return hosts[0]
from cinder.volume import api as volume_api
-def fake_volume_api(*args, **kwargs):
- return True
-
-
-def fake_volume_get(*args, **kwargs):
- return {'id': 'fake', 'host': 'fake'}
-
-
class VolumeActionsTest(test.TestCase):
_actions = ('os-detach', 'os-reserve', 'os-unreserve')
def setUp(self):
super(VolumeActionsTest, self).setUp()
- self.stubs.Set(volume.API, 'get', fake_volume_api)
self.UUID = uuid.uuid4()
- for _method in self._methods:
- self.stubs.Set(volume.API, _method, fake_volume_api)
-
- self.stubs.Set(volume.API, 'get', fake_volume_get)
+ self.api_patchers = {}
+ for _meth in self._methods:
+ self.api_patchers[_meth] = mock.patch('cinder.volume.API.' + _meth)
+ self.api_patchers[_meth].start()
+ self.api_patchers[_meth].return_value = True
+
+ vol = {'id': 'fake', 'host': 'fake', 'status': 'available', 'size': 1,
+ 'migration_status': None, 'volume_type_id': 'fake'}
+ self.get_patcher = mock.patch('cinder.volume.API.get')
+ self.mock_volume_get = self.get_patcher.start()
+ self.mock_volume_get.return_value = vol
+ self.update_patcher = mock.patch('cinder.volume.API.update')
+ self.mock_volume_update = self.update_patcher.start()
+ self.mock_volume_update.return_value = vol
+
+ self.flags(rpc_backend='cinder.openstack.common.rpc.impl_fake')
+
+ def tearDown(self):
+ for patcher in self.api_patchers:
+ self.api_patchers[patcher].stop()
+ self.update_patcher.stop()
+ self.get_patcher.stop()
+ super(VolumeActionsTest, self).tearDown()
def test_simple_api_actions(self):
app = fakes.wsgi_app()
make_update_readonly_flag_test(self, None, 400)
+class VolumeRetypeActionsTest(VolumeActionsTest):
+ def setUp(self):
+ def get_vol_type(*args, **kwargs):
+ d1 = {'id': 'fake', 'qos_specs_id': 'fakeqid1', 'extra_specs': {}}
+ d2 = {'id': 'foo', 'qos_specs_id': 'fakeqid2', 'extra_specs': {}}
+ return d1 if d1['id'] == args[1] else d2
+
+ self.retype_patchers = {}
+ self.retype_mocks = {}
+ paths = ['cinder.volume.volume_types.get_volume_type',
+ 'cinder.volume.volume_types.get_volume_type_by_name',
+ 'cinder.volume.qos_specs.get_qos_specs',
+ 'cinder.quota.QUOTAS.add_volume_type_opts',
+ 'cinder.quota.QUOTAS.reserve']
+ for path in paths:
+ name = path.split('.')[-1]
+ self.retype_patchers[name] = mock.patch(path)
+ self.retype_mocks[name] = self.retype_patchers[name].start()
+
+ self.retype_mocks['get_volume_type'].side_effect = get_vol_type
+ self.retype_mocks['get_volume_type_by_name'].side_effect = get_vol_type
+ self.retype_mocks['add_volume_type_opts'].return_value = None
+ self.retype_mocks['reserve'].return_value = None
+
+ super(VolumeRetypeActionsTest, self).setUp()
+
+ def tearDown(self):
+ for name, patcher in self.retype_patchers.iteritems():
+ patcher.stop()
+ super(VolumeRetypeActionsTest, self).tearDown()
+
+ def _retype_volume_exec(self, expected_status, new_type='foo'):
+ req = webob.Request.blank('/v2/fake/volumes/1/action')
+ req.method = 'POST'
+ req.headers['content-type'] = 'application/json'
+ retype_body = {'new_type': new_type, 'migration_policy': 'never'}
+ req.body = jsonutils.dumps({'os-retype': retype_body})
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, expected_status)
+
+ @mock.patch('cinder.volume.qos_specs.get_qos_specs')
+ def test_retype_volume_success(self, _mock_get_qspecs):
+ # Test that the retype API works for both available and in-use
+ self._retype_volume_exec(202)
+ self.mock_volume_get.return_value['status'] = 'in-use'
+ specs = {'qos_specs': {'id': 'fakeqid1', 'consumer': 'back-end'}}
+ _mock_get_qspecs.return_value = specs
+ self._retype_volume_exec(202)
+
+ def test_retype_volume_no_body(self):
+ # Request with no body should fail
+ req = webob.Request.blank('/v2/fake/volumes/1/action')
+ req.method = 'POST'
+ req.headers['content-type'] = 'application/json'
+ req.body = jsonutils.dumps({'os-retype': None})
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_retype_volume_bad_policy(self):
+ # Request with invalid migration policy should fail
+ req = webob.Request.blank('/v2/fake/volumes/1/action')
+ req.method = 'POST'
+ req.headers['content-type'] = 'application/json'
+ retype_body = {'new_type': 'foo', 'migration_policy': 'invalid'}
+ req.body = jsonutils.dumps({'os-retype': retype_body})
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_retype_volume_bad_status(self):
+ # Should fail if volume does not have proper status
+ self.mock_volume_get.return_value['status'] = 'error'
+ self._retype_volume_exec(400)
+
+ def test_retype_type_no_exist(self):
+ # Should fail if new type does not exist
+ exc = exception.VolumeTypeNotFound('exc')
+ self.retype_mocks['get_volume_type'].side_effect = exc
+ self._retype_volume_exec(404)
+
+ def test_retype_same_type(self):
+ # Should fail if new type and old type are the same
+ self._retype_volume_exec(400, new_type='fake')
+
+ def test_retype_over_quota(self):
+ # Should fail if going over quota for new type
+ exc = exception.OverQuota(overs=['gigabytes'],
+ quotas={'gigabytes': 20},
+ usages={'gigabytes': {'reserved': 5,
+ 'in_use': 15}})
+ self.retype_mocks['reserve'].side_effect = exc
+ self._retype_volume_exec(413)
+
+ @mock.patch('cinder.volume.qos_specs.get_qos_specs')
+ def _retype_volume_diff_qos(self, vol_status, consumer, expected_status,
+ _mock_get_qspecs):
+ def fake_get_qos(ctxt, qos_id):
+ d1 = {'qos_specs': {'id': 'fakeqid1', 'consumer': consumer}}
+ d2 = {'qos_specs': {'id': 'fakeqid2', 'consumer': consumer}}
+ return d1 if d1['qos_specs']['id'] == qos_id else d2
+
+ self.mock_volume_get.return_value['status'] = vol_status
+ _mock_get_qspecs.side_effect = fake_get_qos
+ self._retype_volume_exec(expected_status)
+
+ def test_retype_volume_diff_qos_fe_in_use(self):
+ # should fail if changing qos enforced by front-end for in-use volumes
+ self._retype_volume_diff_qos('in-use', 'front-end', 400)
+
+ def test_retype_volume_diff_qos_fe_available(self):
+ # should NOT fail if changing qos enforced by FE for available volumes
+ self._retype_volume_diff_qos('available', 'front-end', 202)
+
+ def test_retype_volume_diff_qos_be(self):
+ # should NOT fail if changing qos enforced by back-end
+ self._retype_volume_diff_qos('available', 'back-end', 202)
+ self._retype_volume_diff_qos('in-use', 'back-end', 202)
+
+
def stub_volume_get(self, context, volume_id):
volume = stubs.stub_volume(volume_id)
if volume_id == 5:
"volume:migrate_volume": [["rule:admin_api"]],
"volume:migrate_volume_completion": [["rule:admin_api"]],
"volume:update_readonly_flag": [],
+ "volume:retype": [],
"volume_extension:volume_admin_actions:reset_status": [["rule:admin_api"]],
"volume_extension:snapshot_admin_actions:reset_status": [["rule:admin_api"]],
'free_capacity_gb': 1024,
'allocated_capacity_gb': 0,
'reserved_percentage': 10,
+ 'volume_backend_name': 'lvm1',
'timestamp': None},
'host2': {'total_capacity_gb': 2048,
'free_capacity_gb': 300,
'allocated_capacity_gb': 1748,
'reserved_percentage': 10,
+ 'volume_backend_name': 'lvm2',
'timestamp': None},
'host3': {'total_capacity_gb': 512,
'free_capacity_gb': 256,
'allocated_capacity_gb': 256,
'reserved_percentage': 0,
+ 'volume_backend_name': 'lvm3',
'timestamp': None},
'host4': {'total_capacity_gb': 2048,
'free_capacity_gb': 200,
'allocated_capacity_gb': 1848,
'reserved_percentage': 5,
+ 'volume_backend_name': 'lvm4',
'timestamp': None},
}
sched.host_passes_filters,
ctx, 'host1', request_spec, {})
self.assertTrue(_mock_service_get_topic.called)
+
+ @mock.patch('cinder.db.service_get_all_by_topic')
+ def test_retype_policy_never_migrate_pass(self, _mock_service_get_topic):
+ # Retype should pass if current host passes filters and
+ # policy=never. host4 doesn't have enough space to hold an additional
+ # 200GB, but it is already the host of this volume and should not be
+ # counted twice.
+ sched, ctx = self._host_passes_filters_setup(
+ _mock_service_get_topic)
+ extra_specs = {'volume_backend_name': 'lvm4'}
+ request_spec = {'volume_id': 1,
+ 'volume_type': {'name': 'LVM_iSCSI',
+ 'extra_specs': extra_specs},
+ 'volume_properties': {'project_id': 1,
+ 'size': 200,
+ 'host': 'host4'}}
+ host_state = sched.find_retype_host(ctx, request_spec,
+ filter_properties={},
+ migration_policy='never')
+ self.assertEqual(host_state.host, 'host4')
+
+ @mock.patch('cinder.db.service_get_all_by_topic')
+ def test_retype_policy_never_migrate_fail(self, _mock_service_get_topic):
+ # Retype should fail if current host doesn't pass filters and
+ # policy=never.
+ sched, ctx = self._host_passes_filters_setup(
+ _mock_service_get_topic)
+ extra_specs = {'volume_backend_name': 'lvm1'}
+ request_spec = {'volume_id': 1,
+ 'volume_type': {'name': 'LVM_iSCSI',
+ 'extra_specs': extra_specs},
+ 'volume_properties': {'project_id': 1,
+ 'size': 200,
+ 'host': 'host4'}}
+ self.assertRaises(exception.NoValidHost, sched.find_retype_host, ctx,
+ request_spec, filter_properties={},
+ migration_policy='never')
+
+ @mock.patch('cinder.db.service_get_all_by_topic')
+ def test_retype_policy_demand_migrate_pass(self, _mock_service_get_topic):
+ # Retype should pass if current host fails filters but another host
+ # is suitable when policy=on-demand.
+ sched, ctx = self._host_passes_filters_setup(
+ _mock_service_get_topic)
+ extra_specs = {'volume_backend_name': 'lvm1'}
+ request_spec = {'volume_id': 1,
+ 'volume_type': {'name': 'LVM_iSCSI',
+ 'extra_specs': extra_specs},
+ 'volume_properties': {'project_id': 1,
+ 'size': 200,
+ 'host': 'host4'}}
+ host_state = sched.find_retype_host(ctx, request_spec,
+ filter_properties={},
+ migration_policy='on-demand')
+ self.assertEqual(host_state.host, 'host1')
+
+ @mock.patch('cinder.db.service_get_all_by_topic')
+ def test_retype_policy_demand_migrate_fail(self, _mock_service_get_topic):
+ # Retype should fail if current host doesn't pass filters and
+ # no other suitable candidates exist even if policy=on-demand.
+ sched, ctx = self._host_passes_filters_setup(
+ _mock_service_get_topic)
+ extra_specs = {'volume_backend_name': 'lvm1'}
+ request_spec = {'volume_id': 1,
+ 'volume_type': {'name': 'LVM_iSCSI',
+ 'extra_specs': extra_specs},
+ 'volume_properties': {'project_id': 1,
+ 'size': 2048,
+ 'host': 'host4'}}
+ self.assertRaises(exception.NoValidHost, sched.find_retype_host, ctx,
+ request_spec, filter_properties={},
+ migration_policy='on-demand')
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.3')
+
+ @mock.patch('cinder.openstack.common.rpc.cast')
+ def test_retype(self, _mock_rpc_method):
+ self._test_scheduler_api('retype',
+ rpc_method='cast',
+ _mock_method=_mock_rpc_method,
+ topic='topic',
+ volume_id='volume_id',
+ request_spec='fake_request_spec',
+ filter_properties='filter_properties',
+ version='1.4')
self.assertEqual(CONF.scheduler_default_weighers,
['AllocatedCapacityWeigher'])
+ @mock.patch('cinder.db.volume_update')
+ @mock.patch('cinder.db.volume_get')
+ def test_retype_volume_exception_returns_volume_state(self, _mock_vol_get,
+ _mock_vol_update):
+ # Test NoValidHost exception behavior for retype.
+ # Puts the volume in original state and eats the exception.
+ fake_volume_id = 1
+ topic = 'fake_topic'
+ volume_id = fake_volume_id
+ request_spec = {'volume_id': fake_volume_id, 'volume_type': {'id': 3},
+ 'migration_policy': 'on-demand'}
+ vol_info = {'id': fake_volume_id, 'status': 'in-use',
+ 'instance_uuid': 'foo', 'attached_host': None}
+
+ _mock_vol_get.return_value = vol_info
+ _mock_vol_update.return_value = {'status': 'in-use'}
+ _mock_find_retype_host = mock.Mock(
+ side_effect=exception.NoValidHost(reason=""))
+ orig_retype = self.manager.driver.find_retype_host
+ self.manager.driver.find_retype_host = _mock_find_retype_host
+
+ self.manager.retype(self.context, topic, volume_id,
+ request_spec=request_spec,
+ filter_properties={})
+
+ _mock_vol_get.assert_called_once_with(self.context, fake_volume_id)
+ _mock_find_retype_host.assert_called_once_with(self.context,
+ request_spec, {},
+ 'on-demand')
+ _mock_vol_update.assert_called_once_with(self.context, fake_volume_id,
+ {'status': 'in-use'})
+ self.manager.driver.find_retype_host = orig_retype
+
class SchedulerTestCase(test.TestCase):
"""Test case for base scheduler driver class."""
from cinder.volume.drivers import lvm
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils
+from cinder.volume import volume_types
QUOTAS = quota.QUOTAS
"""Test volume migration done by driver."""
# stub out driver and rpc functions
self.stubs.Set(self.volume.driver, 'migrate_volume',
- lambda x, y, z: (True, {'user_id': 'foo'}))
+ lambda x, y, z, new_type_id=None: (True,
+ {'user_id': 'foo'}))
volume = tests_utils.create_volume(self.context, size=0,
host=CONF.host,
self.assertEqual(volume['host'], 'newhost')
self.assertIsNone(volume['migration_status'])
+ def _retype_volume_exec(self, driver, snap=False, policy='on-demand',
+ migrate_exc=False, exc=None, diff_equal=False):
+ elevated = context.get_admin_context()
+ project_id = self.context.project_id
+
+ db.volume_type_create(elevated, {'name': 'old', 'extra_specs': {}})
+ old_vol_type = db.volume_type_get_by_name(elevated, 'old')
+ db.volume_type_create(elevated, {'name': 'new', 'extra_specs': {}})
+ vol_type = db.volume_type_get_by_name(elevated, 'new')
+ db.quota_create(elevated, project_id, 'volumes_new', 10)
+
+ volume = tests_utils.create_volume(self.context, size=1,
+ host=CONF.host, status='retyping',
+ volume_type_id=old_vol_type['id'])
+ if snap:
+ self._create_snapshot(volume['id'], size=volume['size'])
+ host_obj = {'host': 'newhost', 'capabilities': {}}
+
+ reserve_opts = {'volumes': 1, 'gigabytes': volume['size']}
+ QUOTAS.add_volume_type_opts(self.context,
+ reserve_opts,
+ vol_type['id'])
+ reservations = QUOTAS.reserve(self.context,
+ project_id=project_id,
+ **reserve_opts)
+
+ with mock.patch.object(self.volume.driver, 'retype') as _retype:
+ with mock.patch.object(volume_types, 'volume_types_diff') as _diff:
+ with mock.patch.object(self.volume, 'migrate_volume') as _mig:
+ _retype.return_value = driver
+ _diff.return_value = ({}, diff_equal)
+ if migrate_exc:
+ _mig.side_effect = KeyError
+ else:
+ _mig.return_value = True
+
+ if not exc:
+ self.volume.retype(self.context, volume['id'],
+ vol_type['id'], host_obj,
+ migration_policy=policy,
+ reservations=reservations)
+ else:
+ self.assertRaises(exc, self.volume.retype,
+ self.context, volume['id'],
+ vol_type['id'], host_obj,
+ migration_policy=policy,
+ reservations=reservations)
+
+ # get volume/quota properties
+ volume = db.volume_get(elevated, volume['id'])
+ try:
+ usage = db.quota_usage_get(elevated, project_id, 'volumes_new')
+ volumes_in_use = usage.in_use
+ except exception.QuotaUsageNotFound:
+ volumes_in_use = 0
+
+ # check properties
+ if not exc:
+ self.assertEqual(volume['volume_type_id'], vol_type['id'])
+ self.assertEqual(volume['status'], 'available')
+ self.assertEqual(volumes_in_use, 1)
+ else:
+ self.assertEqual(volume['volume_type_id'], old_vol_type['id'])
+ self.assertEqual(volume['status'], 'available')
+ self.assertEqual(volumes_in_use, 0)
+
+ def test_retype_volume_driver_success(self):
+ self._retype_volume_exec(True)
+
+ def test_retype_volume_migration_bad_policy(self):
+ # Test volume retype that requires migration by not allowed
+ self._retype_volume_exec(False, policy='never',
+ exc=exception.VolumeMigrationFailed)
+
+ def test_retype_volume_migration_with_snaps(self):
+ self._retype_volume_exec(False, snap=True, exc=exception.InvalidVolume)
+
+ def test_retype_volume_migration_failed(self):
+ self._retype_volume_exec(False, migrate_exc=True, exc=KeyError)
+
+ def test_retype_volume_migration_success(self):
+ self._retype_volume_exec(False, migrate_exc=False, exc=None)
+
+ def test_retype_volume_migration_equal_types(self):
+ self._retype_volume_exec(False, diff_equal=True)
+
def test_update_volume_readonly_flag(self):
"""Test volume readonly flag can be updated at API level."""
# create a volume and assign to host
new_volume=self.fake_volume,
error=False,
version='1.10')
+
+ def test_retype(self):
+ class FakeHost(object):
+ def __init__(self):
+ self.host = 'host'
+ self.capabilities = {}
+ dest_host = FakeHost()
+ self._test_volume_api('retype',
+ rpc_method='cast',
+ volume=self.fake_volume,
+ new_type_id='fake',
+ dest_host=dest_host,
+ migration_policy='never',
+ reservations=None,
+ version='1.12')
'k3': 'v3'}}}
res = volume_types.get_volume_type_qos_specs(type_ref['id'])
self.assertDictMatch(expected, res)
+
+ def test_volume_types_diff(self):
+ #type_ref 1 and 2 have the same extra_specs, while 3 has different
+ keyvals1 = {"key1": "val1", "key2": "val2"}
+ keyvals2 = {"key1": "val0", "key2": "val2"}
+ type_ref1 = volume_types.create(self.ctxt, "type1", keyvals1)
+ type_ref2 = volume_types.create(self.ctxt, "type2", keyvals1)
+ type_ref3 = volume_types.create(self.ctxt, "type3", keyvals2)
+
+ # Check equality with only extra_specs
+ diff, same = volume_types.volume_types_diff(self.ctxt, type_ref1['id'],
+ type_ref2['id'])
+ self.assertEqual(same, True)
+ self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val1'))
+ diff, same = volume_types.volume_types_diff(self.ctxt, type_ref1['id'],
+ type_ref3['id'])
+ self.assertEqual(same, False)
+ self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val0'))
+
+ #qos_ref 1 and 2 have the same specs, while 3 has different
+ qos_keyvals1 = {'k1': 'v1', 'k2': 'v2', 'k3': 'v3'}
+ qos_keyvals2 = {'k1': 'v0', 'k2': 'v2', 'k3': 'v3'}
+ qos_ref1 = qos_specs.create(self.ctxt, 'qos-specs-1', qos_keyvals1)
+ qos_ref2 = qos_specs.create(self.ctxt, 'qos-specs-2', qos_keyvals1)
+ qos_ref3 = qos_specs.create(self.ctxt, 'qos-specs-3', qos_keyvals2)
+
+ # Check equality with qos specs too
+ qos_specs.associate_qos_with_type(self.ctxt, qos_ref1['id'],
+ type_ref1['id'])
+ qos_specs.associate_qos_with_type(self.ctxt, qos_ref2['id'],
+ type_ref2['id'])
+ diff, same = volume_types.volume_types_diff(self.ctxt, type_ref1['id'],
+ type_ref2['id'])
+ self.assertEqual(same, True)
+ self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val1'))
+ self.assertEqual(diff['qos_specs']['k1'], ('v1', 'v1'))
+ qos_specs.disassociate_qos_specs(self.ctxt, qos_ref2['id'],
+ type_ref2['id'])
+ qos_specs.associate_qos_with_type(self.ctxt, qos_ref3['id'],
+ type_ref2['id'])
+ diff, same = volume_types.volume_types_diff(self.ctxt, type_ref1['id'],
+ type_ref2['id'])
+ self.assertEqual(same, False)
+ self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val1'))
+ self.assertEqual(diff['qos_specs']['k1'], ('v1', 'v0'))
+ qos_specs.disassociate_qos_specs(self.ctxt, qos_ref3['id'],
+ type_ref2['id'])
+ qos_specs.associate_qos_with_type(self.ctxt, qos_ref2['id'],
+ type_ref2['id'])
+
+ # And add encryption for good measure
+ enc_keyvals1 = {'cipher': 'c1', 'key_size': 256, 'provider': 'p1',
+ 'control_location': 'front-end'}
+ enc_keyvals2 = {'cipher': 'c1', 'key_size': 128, 'provider': 'p1',
+ 'control_location': 'front-end'}
+ db.volume_type_encryption_update_or_create(self.ctxt, type_ref1['id'],
+ enc_keyvals1)
+ db.volume_type_encryption_update_or_create(self.ctxt, type_ref2['id'],
+ enc_keyvals2)
+ diff, same = volume_types.volume_types_diff(self.ctxt, type_ref1['id'],
+ type_ref2['id'])
+ self.assertEqual(same, False)
+ self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val1'))
+ self.assertEqual(diff['qos_specs']['k1'], ('v1', 'v1'))
+ self.assertEqual(diff['encryption']['key_size'], (256, 128))
migration_status=None,
size=1,
availability_zone='fake_az',
+ volume_type_id=None,
**kwargs):
"""Create a volume object in the DB."""
vol = {}
vol['display_description'] = display_description
vol['attach_status'] = 'detached'
vol['availability_zone'] = availability_zone
+ if volume_type_id:
+ vol['volume_type_id'] = volume_type_id
for key in kwargs:
vol[key] = kwargs[key]
return db.volume_create(ctxt, vol)
from cinder.openstack.common import excutils
from cinder.openstack.common import log as logging
from cinder.openstack.common import timeutils
+from cinder.openstack.common import uuidutils
import cinder.policy
from cinder import quota
+from cinder import quota_utils
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import utils
from cinder.volume.flows import create_volume
+from cinder.volume import qos_specs
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
self.update_volume_admin_metadata(context.elevated(), volume,
{'readonly': str(flag)})
+ @wrap_check_policy
+ def retype(self, context, volume, new_type, migration_policy=None):
+ """Attempt to modify the type associated with an existing volume."""
+ if volume['status'] not in ['available', 'in-use']:
+ msg = _('Unable to update type due to incorrect status '
+ 'on volume: %s') % volume['id']
+ LOG.error(msg)
+ raise exception.InvalidVolume(reason=msg)
+
+ if volume['migration_status'] is not None:
+ msg = (_("Volume %s is already part of an active migration.")
+ % volume['id'])
+ LOG.error(msg)
+ raise exception.InvalidVolume(reason=msg)
+
+ if migration_policy and migration_policy not in ['on-demand', 'never']:
+ msg = _('migration_policy must be \'on-demand\' or \'never\', '
+ 'passed: %s') % str(new_type)
+ LOG.error(msg)
+ raise exception.InvalidInput(reason=msg)
+
+ # Support specifying volume type by ID or name
+ try:
+ if uuidutils.is_uuid_like(new_type):
+ vol_type = volume_types.get_volume_type(context, new_type)
+ else:
+ vol_type = volume_types.get_volume_type_by_name(context,
+ new_type)
+ except exception.InvalidVolumeType:
+ msg = _('Invalid volume_type passed: %s') % str(new_type)
+ LOG.error(msg)
+ raise exception.InvalidInput(reason=msg)
+
+ vol_type_id = vol_type['id']
+ vol_type_qos_id = vol_type['qos_specs_id']
+
+ old_vol_type = None
+ old_vol_type_id = volume['volume_type_id']
+ old_vol_type_qos_id = None
+
+ # Error if the original and new type are the same
+ if volume['volume_type_id'] == vol_type_id:
+ msg = _('New volume_type same as original: %s') % str(new_type)
+ LOG.error(msg)
+ raise exception.InvalidInput(reason=msg)
+
+ if volume['volume_type_id']:
+ old_vol_type = volume_types.get_volume_type(
+ context, old_vol_type_id)
+ old_vol_type_qos_id = old_vol_type['qos_specs_id']
+
+ # We don't support changing encryption requirements yet
+ old_enc = volume_types.get_volume_type_encryption(context,
+ old_vol_type_id)
+ new_enc = volume_types.get_volume_type_encryption(context,
+ vol_type_id)
+ if old_enc != new_enc:
+ msg = _('Retype cannot change encryption requirements')
+ raise exception.InvalidInput(reason=msg)
+
+ # We don't support changing QoS at the front-end yet for in-use volumes
+ # TODO(avishay): Call Nova to change QoS setting (libvirt has support
+ # - virDomainSetBlockIoTune() - Nova does not have support yet).
+ if (volume['status'] != 'available' and
+ old_vol_type_qos_id != vol_type_qos_id):
+ for qos_id in [old_vol_type_qos_id, vol_type_qos_id]:
+ if qos_id:
+ specs = qos_specs.get_qos_specs(context.elevated(), qos_id)
+ if specs['qos_specs']['consumer'] != 'back-end':
+ msg = _('Retype cannot change front-end qos specs for '
+ 'in-use volumes')
+ raise exception.InvalidInput(reason=msg)
+
+ self.update(context, volume, {'status': 'retyping'})
+
+ # We're checking here in so that we can report any quota issues as
+ # early as possible, but won't commit until we change the type. We
+ # pass the reservations onward in case we need to roll back.
+ reservations = quota_utils.get_volume_type_reservation(context, volume,
+ vol_type_id)
+ request_spec = {'volume_properties': volume,
+ 'volume_id': volume['id'],
+ 'volume_type': vol_type,
+ 'migration_policy': migration_policy,
+ 'quota_reservations': reservations}
+
+ self.scheduler_rpcapi.retype(context, CONF.volume_topic, volume['id'],
+ request_spec=request_spec,
+ filter_properties={})
+
class HostAPI(base.Base):
def __init__(self):
Returns a boolean indicating whether the migration occurred, as well as
model_update.
+
+ :param ctxt: Context
+ :param volume: A dictionary describing the volume to migrate
+ :param host: A dictionary describing the host to migrate to, where
+ host['host'] is its name, and host['capabilities'] is a
+ dictionary of its reported capabilities.
"""
return (False, None)
+ def retype(self, context, volume, new_type, diff, host):
+ """Convert the volume to be of the new type.
+
+ Returns a boolean indicating whether the retype occurred.
+
+ :param ctxt: Context
+ :param volume: A dictionary describing the volume to migrate
+ :param new_type: A dictionary describing the volume type to convert to
+ :param diff: A dictionary with the difference between the two types
+ :param host: A dictionary describing the host to migrate to, where
+ host['host'] is its name, and host['capabilities'] is a
+ dictionary of its reported capabilities.
+ """
+ return False
+
class ISCSIDriver(VolumeDriver):
"""Executes commands relating to ISCSI volumes.
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
- RPC_API_VERSION = '1.11'
+ RPC_API_VERSION = '1.12'
def __init__(self, volume_driver=None, service_name=None,
*args, **kwargs):
volume_ref = self.db.volume_get(context.elevated(), volume_id)
self.driver.accept_transfer(context, volume_ref, new_user, new_project)
- def _migrate_volume_generic(self, ctxt, volume, host):
+ def _migrate_volume_generic(self, ctxt, volume, host, new_type_id):
rpcapi = volume_rpcapi.VolumeAPI()
# Create new volume on remote host
# We don't copy volume_type because the db sets that according to
# volume_type_id, which we do copy
del new_vol_values['volume_type']
+ if new_type_id:
+ new_vol_values['volume_type_id'] = new_type_id
new_vol_values['host'] = host['host']
new_vol_values['status'] = 'creating'
new_vol_values['migration_status'] = 'target:%s' % volume['id']
# Copy the source volume to the destination volume
try:
- if volume['status'] == 'available':
+ if (volume['instance_uuid'] is None and
+ volume['attached_host'] is None):
self.driver.copy_volume_data(ctxt, volume, new_volume,
remote='dest')
# The above call is synchronous so we complete the migration
rpcapi.delete_volume(ctxt, new_volume)
new_volume['migration_status'] = None
+ def _get_original_status(self, volume):
+ if (volume['instance_uuid'] is None and
+ volume['attached_host'] is None):
+ return 'available'
+ else:
+ return 'in-use'
+
+ @utils.require_driver_initialized
def migrate_volume_completion(self, ctxt, volume_id, new_volume_id,
error=False):
msg = _("migrate_volume_completion: completing migration for "
new_volume = self.db.volume_get(ctxt, new_volume_id)
rpcapi = volume_rpcapi.VolumeAPI()
+ status_update = None
+ if volume['status'] == 'retyping':
+ status_update = {'status': self._get_original_status(volume)}
+
if error:
msg = _("migrate_volume_completion is cleaning up an error "
"for volume %(vol1)s (temporary volume %(vol2)s")
'vol2': new_volume['id']})
new_volume['migration_status'] = None
rpcapi.delete_volume(ctxt, new_volume)
- self.db.volume_update(ctxt, volume_id, {'migration_status': None})
+ updates = {'migration_status': None}
+ if status_update:
+ updates.update(status_update)
+ self.db.volume_update(ctxt, volume_id, updates)
return volume_id
self.db.volume_update(ctxt, volume_id,
self.db.finish_volume_migration(ctxt, volume_id, new_volume_id)
self.db.volume_destroy(ctxt, new_volume_id)
- self.db.volume_update(ctxt, volume_id, {'migration_status': None})
+ updates = {'migration_status': None}
+ if status_update:
+ updates.update(status_update)
+ self.db.volume_update(ctxt, volume_id, updates)
return volume['id']
@utils.require_driver_initialized
- def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False):
+ def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False,
+ new_type_id=None):
"""Migrate the volume to the specified host (called on source host)."""
volume_ref = self.db.volume_get(ctxt, volume_id)
model_update = None
moved = False
+ status_update = None
+ if volume_ref['status'] == 'retyping':
+ status_update = {'status': self._get_original_status(volume_ref)}
+
self.db.volume_update(ctxt, volume_ref['id'],
{'migration_status': 'migrating'})
- if not force_host_copy:
+ if not force_host_copy and new_type_id is None:
try:
LOG.debug(_("volume %s: calling driver migrate_volume"),
volume_ref['id'])
if moved:
updates = {'host': host['host'],
'migration_status': None}
+ if status_update:
+ updates.update(status_update)
if model_update:
updates.update(model_update)
volume_ref = self.db.volume_update(ctxt,
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': None}
+ if status_update:
+ updates.update(status_update)
model_update = self.driver.create_export(ctxt, volume_ref)
if model_update:
updates.update(model_update)
self.db.volume_update(ctxt, volume_ref['id'], updates)
if not moved:
try:
- self._migrate_volume_generic(ctxt, volume_ref, host)
+ self._migrate_volume_generic(ctxt, volume_ref, host,
+ new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': None}
+ if status_update:
+ updates.update(status_update)
model_update = self.driver.create_export(ctxt, volume_ref)
if model_update:
updates.update(model_update)
self._notify_about_volume_usage(
context, volume, "resize.end",
extra_usage_info={'size': int(new_size)})
+
+ @utils.require_driver_initialized
+ def retype(self, ctxt, volume_id, new_type_id, host,
+ migration_policy='never', reservations=None):
+ def _retype_error(context, volume_id, old_reservations,
+ new_reservations, status_update):
+ try:
+ self.db.volume_update(context, volume_id, status_update)
+ finally:
+ QUOTAS.rollback(context, old_reservations)
+ QUOTAS.rollback(context, new_reservations)
+
+ context = ctxt.elevated()
+
+ volume_ref = self.db.volume_get(ctxt, volume_id)
+ status_update = {'status': self._get_original_status(volume_ref)}
+ if context.project_id != volume_ref['project_id']:
+ project_id = volume_ref['project_id']
+ else:
+ project_id = context.project_id
+
+ # Get old reservations
+ try:
+ reserve_opts = {'volumes': -1, 'gigabytes': -volume_ref['size']}
+ QUOTAS.add_volume_type_opts(context,
+ reserve_opts,
+ volume_ref.get('volume_type_id'))
+ old_reservations = QUOTAS.reserve(context,
+ project_id=project_id,
+ **reserve_opts)
+ except Exception:
+ old_reservations = None
+ self.db.volume_update(context, volume_id, status_update)
+ LOG.exception(_("Failed to update usages while retyping volume."))
+ raise exception.CinderException(_("Failed to get old volume type"
+ " quota reservations"))
+
+ # We already got the new reservations
+ new_reservations = reservations
+
+ # If volume types have the same contents, no need to do anything
+ retyped = False
+ diff, all_equal = volume_types.volume_types_diff(
+ context, volume_ref.get('volume_type_id'), new_type_id)
+ if all_equal:
+ retyped = True
+
+ # Call driver to try and change the type
+ if not retyped:
+ try:
+ new_type = volume_types.get_volume_type(context, new_type_id)
+ retyped = self.driver.retype(context, volume_ref, new_type,
+ diff, host)
+ if retyped:
+ LOG.info(_("Volume %s: retyped succesfully"), volume_id)
+ except Exception:
+ retyped = False
+ LOG.info(_("Volume %s: driver error when trying to retype, "
+ "falling back to generic mechanism."),
+ volume_ref['id'])
+
+ # We could not change the type, so we need to migrate the volume, where
+ # the destination volume will be of the new type
+ if not retyped:
+ if migration_policy == 'never':
+ _retype_error(context, volume_id, old_reservations,
+ new_reservations, status_update)
+ msg = _("Retype requires migration but is not allowed.")
+ raise exception.VolumeMigrationFailed(reason=msg)
+
+ snaps = self.db.snapshot_get_all_for_volume(context,
+ volume_ref['id'])
+ if snaps:
+ _retype_error(context, volume_id, old_reservations,
+ new_reservations, status_update)
+ msg = _("Volume must not have snapshots.")
+ LOG.error(msg)
+ raise exception.InvalidVolume(reason=msg)
+ self.db.volume_update(context, volume_ref['id'],
+ {'migration_status': 'starting'})
+
+ try:
+ self.migrate_volume(context, volume_id, host,
+ new_type_id=new_type_id)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ _retype_error(context, volume_id, old_reservations,
+ new_reservations, status_update)
+
+ self.db.volume_update(context, volume_id,
+ {'volume_type_id': new_type_id,
+ 'status': status_update['status']})
+
+ if old_reservations:
+ QUOTAS.commit(context, old_reservations, project_id=project_id)
+ if new_reservations:
+ QUOTAS.commit(context, new_reservations, project_id=project_id)
+ self.publish_service_capabilities(context)
1.10 - Add migrate_volume_completion, remove rename_volume.
1.11 - Adds mode parameter to attach_volume()
to support volume read-only attaching.
+ 1.12 - Adds retype.
'''
BASE_RPC_API_VERSION = '1.0'
topic=rpc.queue_get_for(ctxt, self.topic,
volume['host']),
version='1.10')
+
+ def retype(self, ctxt, volume, new_type_id, dest_host,
+ migration_policy='never', reservations=None):
+ host_p = {'host': dest_host.host,
+ 'capabilities': dest_host.capabilities}
+ self.cast(ctxt,
+ self.make_msg('retype',
+ volume_id=volume['id'],
+ new_type_id=new_type_id,
+ host=host_p,
+ migration_policy=migration_policy,
+ reservations=reservations),
+ topic=rpc.queue_get_for(ctxt, self.topic, volume['host']),
+ version='1.12')
return encryption is not None
+def get_volume_type_encryption(context, volume_type_id):
+ if volume_type_id is None:
+ return None
+
+ encryption = db.volume_type_encryption_get(context, volume_type_id)
+ return encryption
+
+
def get_volume_type_qos_specs(volume_type_id):
ctxt = context.get_admin_context()
res = db.volume_type_qos_specs_get(ctxt,
volume_type_id)
return res
+
+
+def volume_types_diff(context, vol_type_id1, vol_type_id2):
+ """Returns a 'diff' of two volume types and whether they are equal.
+
+ Returns a tuple of (diff, equal), where 'equal' is a boolean indicating
+ whether there is any difference, and 'diff' is a dictionary with the
+ following format:
+ {'extra_specs': {'key1': (value_in_1st_vol_type, value_in_2nd_vol_type),
+ 'key2': (value_in_1st_vol_type, value_in_2nd_vol_type),
+ ...}
+ 'qos_specs': {'key1': (value_in_1st_vol_type, value_in_2nd_vol_type),
+ 'key2': (value_in_1st_vol_type, value_in_2nd_vol_type),
+ ...}
+ 'encryption': {'cipher': (value_in_1st_vol_type, value_in_2nd_vol_type),
+ {'key_size': (value_in_1st_vol_type, value_in_2nd_vol_type),
+ ...}
+ """
+ def _fix_qos_specs(qos_specs):
+ if qos_specs:
+ qos_specs.pop('id', None)
+ qos_specs.pop('name', None)
+ qos_specs.update(qos_specs.pop('specs', {}))
+
+ def _fix_encryption_specs(encryption):
+ if encryption1:
+ encryption = dict(encryption)
+ for param in ['volume_type_id', 'created_at', 'updated_at',
+ 'deleted_at']:
+ encryption.pop(param, None)
+
+ def _dict_diff(dict1, dict2):
+ res = {}
+ equal = True
+ if dict1 is None:
+ dict1 = {}
+ if dict2 is None:
+ dict2 = {}
+ for k, v in dict1.iteritems():
+ res[k] = (v, dict2.get(k))
+ if k not in dict2 or res[k][0] != res[k][1]:
+ equal = False
+ for k, v in dict2.iteritems():
+ res[k] = (dict1.get(k), v)
+ if k not in dict1 or res[k][0] != res[k][1]:
+ equal = False
+ return (res, equal)
+
+ all_equal = True
+ diff = {}
+ vol_type1 = get_volume_type(context, vol_type_id1)
+ vol_type2 = get_volume_type(context, vol_type_id2)
+
+ extra_specs1 = vol_type1.get('extra_specs')
+ extra_specs2 = vol_type2.get('extra_specs')
+ diff['extra_specs'], equal = _dict_diff(extra_specs1, extra_specs2)
+ if not equal:
+ all_equal = False
+
+ qos_specs1 = get_volume_type_qos_specs(vol_type_id1).get('qos_specs')
+ _fix_qos_specs(qos_specs1)
+ qos_specs2 = get_volume_type_qos_specs(vol_type_id2).get('qos_specs')
+ _fix_qos_specs(qos_specs2)
+ diff['qos_specs'], equal = _dict_diff(qos_specs1, qos_specs2)
+ if not equal:
+ all_equal = False
+
+ encryption1 = get_volume_type_encryption(context, vol_type_id1)
+ _fix_encryption_specs(encryption1)
+ encryption2 = get_volume_type_encryption(context, vol_type_id2)
+ _fix_encryption_specs(encryption2)
+ diff['encryption'], equal = _dict_diff(encryption1, encryption2)
+ if not equal:
+ all_equal = False
+
+ return (diff, all_equal)
"volume:get_all_snapshots": [],
"volume:extend": [],
"volume:update_readonly_flag": [],
+ "volume:retype": [],
"volume_extension:types_manage": [["rule:admin_api"]],
"volume_extension:types_extra_specs": [["rule:admin_api"]],