From 2f7d2dce0e89ace77a23db834df4dd3f64c360c0 Mon Sep 17 00:00:00 2001 From: Roman Podolyaka Date: Mon, 22 Jul 2013 18:27:51 +0300 Subject: [PATCH] Execute DB API methods in a single transaction Many DB API methods do a few queries to a DB (e. g. SELECT and then UPDATE, or SELECT and then DELETE, etc). By default, a Session instance is used with autocommit=True, which means, that each query to a DB is done in a separate transaction. This is error-prone (as it may lead to race conditions or returning of unexpected results) and makes rollbacks harder (if one of transactions fail, the previous ones can not be rolled back). This patch ensures that all DB API methods, which do a few queries (or call private DB API methods), are executed inside a single transaction. Blueprint: db-session-cleanup Change-Id: Ie6510becffdeb78048fe4a09511ab326627d3412 --- cinder/db/sqlalchemy/api.py | 256 ++++++++++++++++++++---------------- 1 file changed, 146 insertions(+), 110 deletions(-) diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 493dc40a0..83fc5c10b 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -1128,18 +1128,20 @@ def volume_get(context, volume_id): @require_admin_context def volume_get_all(context, marker, limit, sort_key, sort_dir): - query = _volume_get_query(context) + session = get_session() + with session.begin(): + query = _volume_get_query(context, session=session) - marker_volume = None - if marker is not None: - marker_volume = _volume_get(context, marker) + marker_volume = None + if marker is not None: + marker_volume = _volume_get(context, marker, session=session) - query = sqlalchemyutils.paginate_query(query, models.Volume, limit, - [sort_key, 'created_at', 'id'], - marker=marker_volume, - sort_dir=sort_dir) + query = sqlalchemyutils.paginate_query(query, models.Volume, limit, + [sort_key, 'created_at', 'id'], + marker=marker_volume, + sort_dir=sort_dir) - return query.all() + return query.all() @require_admin_context @@ -1164,19 +1166,22 @@ def volume_get_all_by_instance_uuid(context, instance_uuid): @require_context def volume_get_all_by_project(context, project_id, marker, limit, sort_key, sort_dir): - authorize_project_context(context, project_id) - query = _volume_get_query(context).filter_by(project_id=project_id) + session = get_session() + with session.begin(): + authorize_project_context(context, project_id) + query = _volume_get_query(context, session).\ + filter_by(project_id=project_id) - marker_volume = None - if marker is not None: - marker_volume = _volume_get(context, marker) + marker_volume = None + if marker is not None: + marker_volume = _volume_get(context, marker, session) - query = sqlalchemyutils.paginate_query(query, models.Volume, limit, - [sort_key, 'created_at', 'id'], - marker=marker_volume, - sort_dir=sort_dir) + query = sqlalchemyutils.paginate_query(query, models.Volume, limit, + [sort_key, 'created_at', 'id'], + marker=marker_volume, + sort_dir=sort_dir) - return query.all() + return query.all() @require_admin_context @@ -1194,13 +1199,15 @@ def volume_get_iscsi_target_num(context, volume_id): @require_context def volume_update(context, volume_id, values): session = get_session() - metadata = values.get('metadata') - if metadata is not None: - volume_metadata_update(context, - volume_id, - values.pop('metadata'), - delete=True) with session.begin(): + metadata = values.get('metadata') + if metadata is not None: + _volume_metadata_update(context, + volume_id, + values.pop('metadata'), + delete=True, + session=session) + volume_ref = _volume_get(context, volume_id, session=session) volume_ref.update(values) volume_ref.save(session=session) @@ -1217,8 +1224,8 @@ def _volume_metadata_get_query(context, volume_id, session=None): @require_context @require_volume_exists -def volume_metadata_get(context, volume_id): - rows = _volume_metadata_get_query(context, volume_id).all() +def _volume_metadata_get(context, volume_id, session=None): + rows = _volume_metadata_get_query(context, volume_id, session).all() result = {} for row in rows: result[row['key']] = row['value'] @@ -1226,6 +1233,12 @@ def volume_metadata_get(context, volume_id): return result +@require_context +@require_volume_exists +def volume_metadata_get(context, volume_id): + return _volume_metadata_get(context, volume_id) + + @require_context @require_volume_exists def volume_metadata_delete(context, volume_id, key): @@ -1237,7 +1250,6 @@ def volume_metadata_delete(context, volume_id, key): @require_context -@require_volume_exists def _volume_metadata_get_item(context, volume_id, key, session=None): result = _volume_metadata_get_query(context, volume_id, session=session).\ filter_by(key=key).\ @@ -1257,38 +1269,49 @@ def volume_metadata_get_item(context, volume_id, key): @require_context @require_volume_exists -def volume_metadata_update(context, volume_id, metadata, delete): - session = get_session() - - # Set existing metadata to deleted if delete argument is True - if delete: - original_metadata = volume_metadata_get(context, volume_id) - for meta_key, meta_value in original_metadata.iteritems(): - if meta_key not in metadata: +def _volume_metadata_update(context, volume_id, metadata, delete, + session=None): + if not session: + session = get_session() + + with session.begin(subtransactions=True): + # Set existing metadata to deleted if delete argument is True + if delete: + original_metadata = _volume_metadata_get(context, volume_id, + session) + for meta_key, meta_value in original_metadata.iteritems(): + if meta_key not in metadata: + meta_ref = _volume_metadata_get_item(context, volume_id, + meta_key, session) + meta_ref.update({'deleted': True}) + meta_ref.save(session=session) + + meta_ref = None + + # Now update all existing items with new values, or create new meta + # objects + for meta_key, meta_value in metadata.items(): + + # update the value whether it exists or not + item = {"value": meta_value} + + try: meta_ref = _volume_metadata_get_item(context, volume_id, meta_key, session) - meta_ref.update({'deleted': True}) - meta_ref.save(session=session) + except exception.VolumeMetadataNotFound as e: + meta_ref = models.VolumeMetadata() + item.update({"key": meta_key, "volume_id": volume_id}) - meta_ref = None + meta_ref.update(item) + meta_ref.save(session=session) - # Now update all existing items with new values, or create new meta objects - for meta_key, meta_value in metadata.items(): - - # update the value whether it exists or not - item = {"value": meta_value} - - try: - meta_ref = _volume_metadata_get_item(context, volume_id, - meta_key, session) - except exception.VolumeMetadataNotFound as e: - meta_ref = models.VolumeMetadata() - item.update({"key": meta_key, "volume_id": volume_id}) + return metadata - meta_ref.update(item) - meta_ref.save(session=session) - return metadata +@require_context +@require_volume_exists +def volume_metadata_update(context, volume_id, metadata, delete): + return _volume_metadata_update(context, volume_id, metadata, delete) ################### @@ -1434,8 +1457,8 @@ def _snapshot_metadata_get_query(context, snapshot_id, session=None): @require_context @require_snapshot_exists -def snapshot_metadata_get(context, snapshot_id): - rows = _snapshot_metadata_get_query(context, snapshot_id).all() +def _snapshot_metadata_get(context, snapshot_id, session=None): + rows = _snapshot_metadata_get_query(context, snapshot_id, session).all() result = {} for row in rows: result[row['key']] = row['value'] @@ -1443,6 +1466,12 @@ def snapshot_metadata_get(context, snapshot_id): return result +@require_context +@require_snapshot_exists +def snapshot_metadata_get(context, snapshot_id): + return _snapshot_metadata_get(context, snapshot_id) + + @require_context @require_snapshot_exists def snapshot_metadata_delete(context, snapshot_id, key): @@ -1454,7 +1483,6 @@ def snapshot_metadata_delete(context, snapshot_id, key): @require_context -@require_snapshot_exists def _snapshot_metadata_get_item(context, snapshot_id, key, session=None): result = _snapshot_metadata_get_query(context, snapshot_id, @@ -1478,36 +1506,39 @@ def snapshot_metadata_get_item(context, snapshot_id, key): @require_snapshot_exists def snapshot_metadata_update(context, snapshot_id, metadata, delete): session = get_session() - - # Set existing metadata to deleted if delete argument is True - if delete: - original_metadata = snapshot_metadata_get(context, snapshot_id) - for meta_key, meta_value in original_metadata.iteritems(): - if meta_key not in metadata: + with session.begin(): + # Set existing metadata to deleted if delete argument is True + if delete: + original_metadata = _snapshot_metadata_get(context, snapshot_id, + session) + for meta_key, meta_value in original_metadata.iteritems(): + if meta_key not in metadata: + meta_ref = _snapshot_metadata_get_item(context, + snapshot_id, + meta_key, session) + meta_ref.update({'deleted': True}) + meta_ref.save(session=session) + + meta_ref = None + + # Now update all existing items with new values, or create new meta + # objects + for meta_key, meta_value in metadata.items(): + + # update the value whether it exists or not + item = {"value": meta_value} + + try: meta_ref = _snapshot_metadata_get_item(context, snapshot_id, meta_key, session) - meta_ref.update({'deleted': True}) - meta_ref.save(session=session) - - meta_ref = None + except exception.SnapshotMetadataNotFound as e: + meta_ref = models.SnapshotMetadata() + item.update({"key": meta_key, "snapshot_id": snapshot_id}) - # Now update all existing items with new values, or create new meta objects - for meta_key, meta_value in metadata.items(): + meta_ref.update(item) + meta_ref.save(session=session) - # update the value whether it exists or not - item = {"value": meta_value} - - try: - meta_ref = _snapshot_metadata_get_item(context, snapshot_id, - meta_key, session) - except exception.SnapshotMetadataNotFound as e: - meta_ref = models.SnapshotMetadata() - item.update({"key": meta_key, "snapshot_id": snapshot_id}) - - meta_ref.update(item) - meta_ref.save(session=session) - - return metadata + return metadata ################### @@ -1540,7 +1571,7 @@ def volume_type_create(context, values): models.VolumeTypeExtraSpecs) volume_type_ref = models.VolumeTypes() volume_type_ref.update(values) - volume_type_ref.save() + volume_type_ref.save(session=session) except Exception as e: raise db_exc.DBError(e) return volume_type_ref @@ -1615,10 +1646,10 @@ def volume_type_get_by_name(context, name): @require_admin_context def volume_type_destroy(context, id): - _volume_type_get(context, id) - session = get_session() with session.begin(): + _volume_type_get(context, id, session) + session.query(models.VolumeTypes).\ filter_by(id=id).\ update({'deleted': True, @@ -1674,12 +1705,14 @@ def volume_type_extra_specs_get(context, volume_type_id): @require_context def volume_type_extra_specs_delete(context, volume_type_id, key): session = get_session() - _volume_type_extra_specs_get_item(context, volume_type_id, key, session) - _volume_type_extra_specs_query(context, volume_type_id).\ - filter_by(key=key).\ - update({'deleted': True, - 'deleted_at': timeutils.utcnow(), - 'updated_at': literal_column('updated_at')}) + with session.begin(): + _volume_type_extra_specs_get_item(context, volume_type_id, key, + session) + _volume_type_extra_specs_query(context, volume_type_id, session).\ + filter_by(key=key).\ + update({'deleted': True, + 'deleted_at': timeutils.utcnow(), + 'updated_at': literal_column('updated_at')}) @require_context @@ -1707,18 +1740,20 @@ def volume_type_extra_specs_get_item(context, volume_type_id, key): def volume_type_extra_specs_update_or_create(context, volume_type_id, specs): session = get_session() - spec_ref = None - for key, value in specs.iteritems(): - try: - spec_ref = _volume_type_extra_specs_get_item( - context, volume_type_id, key, session) - except exception.VolumeTypeExtraSpecsNotFound as e: - spec_ref = models.VolumeTypeExtraSpecs() - spec_ref.update({"key": key, "value": value, - "volume_type_id": volume_type_id, - "deleted": False}) - spec_ref.save(session=session) - return specs + with session.begin(): + spec_ref = None + for key, value in specs.iteritems(): + try: + spec_ref = _volume_type_extra_specs_get_item( + context, volume_type_id, key, session) + except exception.VolumeTypeExtraSpecsNotFound as e: + spec_ref = models.VolumeTypeExtraSpecs() + spec_ref.update({"key": key, "value": value, + "volume_type_id": volume_type_id, + "deleted": False}) + spec_ref.save(session=session) + + return specs #################### @@ -1798,8 +1833,9 @@ def volume_glance_metadata_copy_to_snapshot(context, snapshot_id, volume_id): """ session = get_session() - metadata = _volume_glance_metadata_get(context, volume_id, session=session) with session.begin(): + metadata = _volume_glance_metadata_get(context, volume_id, + session=session) for meta in metadata: vol_glance_metadata = models.VolumeGlanceMetadata() vol_glance_metadata.snapshot_id = snapshot_id @@ -1821,10 +1857,10 @@ def volume_glance_metadata_copy_from_volume_to_volume(context, """ session = get_session() - metadata = _volume_glance_metadata_get(context, - src_volume_id, - session=session) with session.begin(): + metadata = _volume_glance_metadata_get(context, + src_volume_id, + session=session) for meta in metadata: vol_glance_metadata = models.VolumeGlanceMetadata() vol_glance_metadata.volume_id = volume_id @@ -1844,9 +1880,9 @@ def volume_glance_metadata_copy_to_volume(context, volume_id, snapshot_id): """ session = get_session() - metadata = _volume_snapshot_glance_metadata_get(context, snapshot_id, - session=session) with session.begin(): + metadata = _volume_snapshot_glance_metadata_get(context, snapshot_id, + session=session) for meta in metadata: vol_glance_metadata = models.VolumeGlanceMetadata() vol_glance_metadata.volume_id = volume_id -- 2.45.2