From 83d5b25ed0951de2cb55bde759557c018ed4abb0 Mon Sep 17 00:00:00 2001
From: Michal Dulko
Date: Tue, 21 Jul 2015 12:30:56 +0200
Subject: [PATCH] Refactoring of manager's create_volume flow

This commit resolves a TODO item (bulk metadata create), rephrases some
comments and makes use of new TaskFlow functionality that saves the
results of revert commands, so decisions can be made in the c-vol
manager instead of the previous ugly workaround of injecting the
information into the raised exception.

Partial-Implements: blueprint taskflow-refactoring
Depends-On: I82ebd0102aa5f50d98d9d6b48b13cd659d6dbcec
Change-Id: Ia4739c87ca83ae725e16256e3f2c596c5e6d935b
---
 cinder/db/api.py                             |   6 ++
 cinder/db/sqlalchemy/api.py                  |  28 +++++
 cinder/flow_utils.py                         |   8 +-
 cinder/volume/flows/manager/create_volume.py | 103 +++++++++----------
 cinder/volume/manager.py                     |  21 ++--
 5 files changed, 101 insertions(+), 65 deletions(-)

diff --git a/cinder/db/api.py b/cinder/db/api.py
index 3c5779d81..c7c55b819 100644
--- a/cinder/db/api.py
+++ b/cinder/db/api.py
@@ -639,6 +639,12 @@ def volume_glance_metadata_create(context, volume_id, key, value):
                                              value)
 
 
+def volume_glance_metadata_bulk_create(context, volume_id, metadata):
+    """Add Glance metadata for specified volume (multiple pairs)."""
+    return IMPL.volume_glance_metadata_bulk_create(context, volume_id,
+                                                   metadata)
+
+
 def volume_glance_metadata_get_all(context):
     """Return the glance metadata for all volumes."""
     return IMPL.volume_glance_metadata_get_all(context)
diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py
index 02208a544..87bcb76ab 100644
--- a/cinder/db/sqlalchemy/api.py
+++ b/cinder/db/sqlalchemy/api.py
@@ -3154,6 +3154,34 @@ def volume_glance_metadata_create(context, volume_id, key, value):
     return
 
 
+@require_context
+@require_volume_exists
+def volume_glance_metadata_bulk_create(context, volume_id, metadata):
+    """Update the Glance metadata for a volume by adding new key:value pairs.
+
+    This API does not support changing the value of a key once it has been
+    created.
+ """ + + session = get_session() + with session.begin(): + for (key, value) in metadata.items(): + rows = session.query(models.VolumeGlanceMetadata).\ + filter_by(volume_id=volume_id).\ + filter_by(key=key).\ + filter_by(deleted=False).all() + + if len(rows) > 0: + raise exception.GlanceMetadataExists(key=key, + volume_id=volume_id) + + vol_glance_metadata = models.VolumeGlanceMetadata() + vol_glance_metadata.volume_id = volume_id + vol_glance_metadata.key = key + vol_glance_metadata.value = six.text_type(value) + session.add(vol_glance_metadata) + + @require_context @require_snapshot_exists def volume_glance_metadata_copy_to_snapshot(context, snapshot_id, volume_id): diff --git a/cinder/flow_utils.py b/cinder/flow_utils.py index 3bca9c820..711f5d301 100644 --- a/cinder/flow_utils.py +++ b/cinder/flow_utils.py @@ -40,9 +40,11 @@ class CinderTask(task.Task): """ def __init__(self, addons=None, **kwargs): - super(CinderTask, self).__init__(_make_task_name(self.__class__, - addons), - **kwargs) + super(CinderTask, self).__init__(self.make_name(addons), **kwargs) + + @classmethod + def make_name(cls, addons=None): + return _make_task_name(cls, addons) class DynamicLogListener(logging_listener.DynamicLoggingListener): diff --git a/cinder/volume/flows/manager/create_volume.py b/cinder/volume/flows/manager/create_volume.py index 0f775a639..7b9c6d4f7 100644 --- a/cinder/volume/flows/manager/create_volume.py +++ b/cinder/volume/flows/manager/create_volume.py @@ -49,6 +49,8 @@ IMAGE_ATTRIBUTES = ( class OnFailureRescheduleTask(flow_utils.CinderTask): """Triggers a rescheduling request to be sent when reverting occurs. + If rescheduling doesn't occur this task errors out the volume. + Reversion strategy: Triggers the rescheduling mechanism whereby a cast gets sent to the scheduler rpc api to allow for an attempt X of Y for scheduling this volume elsewhere. @@ -88,6 +90,31 @@ class OnFailureRescheduleTask(flow_utils.CinderTask): def execute(self, **kwargs): pass + def _pre_reschedule(self, context, volume_id): + """Actions that happen before the rescheduling attempt occur here.""" + + try: + # Update volume's timestamp and host. + # + # NOTE(harlowja): this is awkward to be done here, shouldn't + # this happen at the scheduler itself and not before it gets + # sent to the scheduler? (since what happens if it never gets + # there??). It's almost like we need a status of 'on-the-way-to + # scheduler' in the future. + # We don't need to update the volume's status to creating, since + # we haven't changed it to error. + update = { + 'scheduled_at': timeutils.utcnow(), + 'host': None, + } + LOG.debug("Updating volume %(volume_id)s with %(update)s.", + {'update': update, 'volume_id': volume_id}) + self.db.volume_update(context, volume_id, update) + except exception.CinderException: + # Don't let updating the state cause the rescheduling to fail. + LOG.exception(_LE("Volume %s: update volume state failed."), + volume_id) + def _reschedule(self, context, cause, request_spec, filter_properties, volume_id): """Actions that happen during the rescheduling attempt occur here.""" @@ -122,47 +149,18 @@ class OnFailureRescheduleTask(flow_utils.CinderTask): LOG.debug("Volume %s: re-scheduled", volume_id) - def _pre_reschedule(self, context, volume_id): - """Actions that happen before the rescheduling attempt occur here.""" - - try: - # Update volume's timestamp and host. 
-            #
-            # NOTE(harlowja): this is awkward to be done here, shouldn't
-            # this happen at the scheduler itself and not before it gets
-            # sent to the scheduler? (since what happens if it never gets
-            # there??). It's almost like we need a status of 'on-the-way-to
-            # scheduler' in the future.
-            # We don't need to update the volume's status to creating, since
-            # we haven't changed it to error.
-            update = {
-                'scheduled_at': timeutils.utcnow(),
-                'host': None
-            }
-            LOG.debug("Updating volume %(volume_id)s with %(update)s.",
-                      {'update': update, 'volume_id': volume_id})
-            self.db.volume_update(context, volume_id, update)
-        except exception.CinderException:
-            # Don't let updating the state cause the rescheduling to fail.
-            LOG.exception(_LE("Volume %s: update volume state failed."),
-                          volume_id)
-
-    def revert(self, context, result, flow_failures, **kwargs):
-        volume_id = kwargs['volume_id']
-
+    def revert(self, context, result, flow_failures, volume_id, **kwargs):
+        # NOTE(dulek): A revert is occurring and the manager needs to know if
+        # rescheduling happened. We're returning a boolean flag that will
+        # indicate that. It will be available in the flow engine's storage
+        # through the get_revert_result method.
+
         # If do not want to be rescheduled, just set the volume's status to
         # error and return.
         if not self.do_reschedule:
             common.error_out_volume(context, self.db, volume_id)
             LOG.error(_LE("Volume %s: create failed"), volume_id)
-            return
-
-        # NOTE(dulek): Revert is occurring and manager need to know if
-        # rescheduling happened. We're injecting this information into
-        # exception that will be caught there. This is ugly and we need
-        # TaskFlow to support better way of returning data from reverted flow.
-        cause = list(flow_failures.values())[0]
-        cause.exception.rescheduled = False
+            return False
 
         # Check if we have a cause which can tell us not to reschedule and
         # set the volume's status to error.
@@ -170,20 +168,22 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
             if failure.check(*self.no_reschedule_types):
                 common.error_out_volume(context, self.db, volume_id)
                 LOG.error(_LE("Volume %s: create failed"), volume_id)
-                return
+                return False
 
         # Use a different context when rescheduling.
         if self.reschedule_context:
+            cause = list(flow_failures.values())[0]
             context = self.reschedule_context
             try:
                 self._pre_reschedule(context, volume_id)
-                self._reschedule(context, cause, **kwargs)
+                self._reschedule(context, cause, volume_id=volume_id, **kwargs)
                 self._post_reschedule(context, volume_id)
-                # Inject information that we rescheduled
-                cause.exception.rescheduled = True
+                return True
             except exception.CinderException:
                 LOG.exception(_LE("Volume %s: rescheduling failed"), volume_id)
 
+        return False
+
 
 class ExtractVolumeRefTask(flow_utils.CinderTask):
     """Extracts volume reference for given volume id."""
@@ -206,11 +206,11 @@ class ExtractVolumeRefTask(flow_utils.CinderTask):
         return volume_ref
 
     def revert(self, context, volume_id, result, **kwargs):
-        if isinstance(result, ft.Failure):
+        if isinstance(result, ft.Failure) or not self.set_error:
             return
-        if self.set_error:
-            common.error_out_volume(context, self.db, volume_id)
-            LOG.error(_LE("Volume %s: create failed"), volume_id)
+
+        common.error_out_volume(context, self.db, volume_id)
+        LOG.error(_LE("Volume %s: create failed"), volume_id)
 
 
 class ExtractVolumeSpecTask(flow_utils.CinderTask):
@@ -561,21 +561,14 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
             if value is not None:
                 property_metadata[key] = value
 
-        # NOTE(harlowja): The best way for this to happen would be in bulk,
-        # but that doesn't seem to exist (yet), so we go through one by one
-        # which means we can have partial create/update failure.
         volume_metadata = dict(property_metadata)
         volume_metadata.update(base_metadata)
         LOG.debug("Creating volume glance metadata for volume %(volume_id)s"
                   " backed by image %(image_id)s with: %(vol_metadata)s.",
                   {'volume_id': volume_id, 'image_id': image_id,
                    'vol_metadata': volume_metadata})
-        for (key, value) in volume_metadata.items():
-            try:
-                self.db.volume_glance_metadata_create(context, volume_id,
-                                                      key, value)
-            except exception.GlanceMetadataExists:
-                pass
+        self.db.volume_glance_metadata_bulk_create(context, volume_id,
+                                                   volume_metadata)
 
     def _create_from_image(self, context, volume_ref,
                            image_location, image_id, image_meta,
@@ -710,7 +703,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
         # TODO(harlowja): is it acceptable to only log if this fails??
         # or are there other side-effects that this will cause if the
         # status isn't updated correctly (aka it will likely be stuck in
-        # 'building' if this fails)??
+        # 'creating' if this fails)??
         volume_ref = self.db.volume_update(context, volume_id, update)
         # Now use the parent to notify.
         super(CreateVolumeOnFinishTask, self).execute(context, volume_ref)
@@ -737,7 +730,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
     3. Selects 1 of 2 activated only on *failure* tasks (one to update the db
        status & notify or one to update the db status & notify & *reschedule*).
     4. Extracts a volume specification from the provided inputs.
-    5. Notifies that the volume has start to be created.
+    5. Notifies that the volume has started to be created.
     6. Creates a volume from the extracted volume specification.
     7. Attaches a on-success *only* task that notifies that the volume creation
        has ended and performs further database status updates.
@@ -761,7 +754,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
     retry = filter_properties.get('retry', None)
 
     # Always add OnFailureRescheduleTask and we handle the change of volume's
-    # status when revert task flow.  Meanwhile, no need to revert process of
+    # status when reverting the flow. Meanwhile, no need to revert process of
     # ExtractVolumeRefTask.
     do_reschedule = allow_reschedule and request_spec and retry
     volume_flow.add(OnFailureRescheduleTask(reschedule_context, db,
diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py
index b9c6e21b3..a231e2f63 100644
--- a/cinder/volume/manager.py
+++ b/cinder/volume/manager.py
@@ -466,24 +466,31 @@ class VolumeManager(manager.SchedulerDependentManager):
         # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
         # decide if allocated_capacity should be incremented.
         rescheduled = False
+        vol_ref = None
 
         try:
             if locked_action is None:
                 _run_flow()
             else:
                 _run_flow_locked()
-        except Exception as e:
-            if hasattr(e, 'rescheduled'):
-                rescheduled = e.rescheduled
-            raise
         finally:
             try:
                 vol_ref = flow_engine.storage.fetch('volume_ref')
-            except tfe.NotFound as e:
-                # Flow was reverted, fetching volume_ref from the DB.
-                vol_ref = self.db.volume_get(context, volume_id)
+            except tfe.NotFound:
+                # If there's no vol_ref, then the flow was reverted. Let's
+                # check whether rescheduling occurred.
+                try:
+                    rescheduled = flow_engine.storage.get_revert_result(
+                        create_volume.OnFailureRescheduleTask.make_name(
+                            [create_volume.ACTION]))
+                except tfe.NotFound:
+                    pass
 
             if not rescheduled:
+                if not vol_ref:
+                    # Flow was reverted and not rescheduled, so fetch
+                    # volume_ref from the DB, because it will be needed.
+                    vol_ref = self.db.volume_get(context, volume_id)
                 # NOTE(dulek): Volume wasn't rescheduled so we need to update
                 # volume stats as these are decremented on delete.
                 self._update_allocated_capacity(vol_ref)
-- 
2.45.2
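
Note: the manager-side change above relies on TaskFlow persisting whatever a
task's revert() returns, so the caller can read it back from the engine's
storage after the flow has been reverted. The minimal standalone sketch below
is not Cinder code; the task, flow and atom names (ScheduleTask, FailingTask,
"demo", "reschedule", "create") are made up for illustration, and it assumes a
TaskFlow release that already saves revert results, per the Depends-On above.
It shows the same pattern: a revert() that returns a flag, and a caller that
fetches that flag with storage.get_revert_result().

import taskflow.engines
from taskflow import exceptions as tfe
from taskflow import task
from taskflow.patterns import linear_flow


class ScheduleTask(task.Task):
    """Stand-in for OnFailureRescheduleTask: revert() returns a flag."""

    def execute(self):
        pass

    def revert(self, **kwargs):
        # Whatever revert() returns is saved by the engine and can later
        # be read back through storage.get_revert_result(<task name>).
        return True


class FailingTask(task.Task):
    """Stand-in for the volume-creation step that fails, causing a revert."""

    def execute(self):
        raise RuntimeError("create failed")


flow = linear_flow.Flow("demo").add(ScheduleTask(name="reschedule"),
                                    FailingTask(name="create"))
engine = taskflow.engines.load(flow)

try:
    engine.run()
except RuntimeError:
    # The flow was reverted; read what ScheduleTask.revert() returned,
    # analogous to the manager reading OnFailureRescheduleTask's result.
    try:
        rescheduled = engine.storage.get_revert_result("reschedule")
    except tfe.NotFound:
        rescheduled = False
    print("rescheduled: %s" % rescheduled)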