]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add atomic conditional updates to objects
authorGorka Eguileor <geguileo@redhat.com>
Sat, 25 Jul 2015 10:34:28 +0000 (12:34 +0200)
committerGorka Eguileor <geguileo@redhat.com>
Wed, 25 Nov 2015 15:49:57 +0000 (16:49 +0100)
To allow atomic state changes across Cinder services we need to
implement a way to easily do compare-and-swap.

This patch adds methods to allow compare-and-swap on DB models and on
Cinder Versioned Objects as well.

Conditions for the compare part of the update can consist of:
- Inclusion: status == 'available'
- Exclusion: status != 'in-use'
- Multi-inclusion: status in ('available', 'error')
- Multi-exclusion: status not in ('attaching', 'in-use')
- Sqlalchemy filters

A complete example of usage would be the compare-and-swap used in volume
delete requests that has to take in consideration not only the status
but the attach and migration status as well as the volume not having
snapshots:

 now = timeutils.utcnow()
 expected = {'attach_status': db.Not('attached'),
             'migration_status': None,
             'consistencygroup_id': None}
 good_status = ('available', 'error', 'error_restoring',
                'error_extending')
 if not force:
     expected.update(status=good_status)

 # Volume cannot have snapshots if we want to delete it
 filters = [~sql.exists().where(models.Volume.id ==
                                models.Snapshot.volume_id)]

 updated = vol_obj.conditional_update(
     {'status': 'deleting',
      'previous_status': vol_obj.model.status,
      'terminated_at': now},
     expected,
     filters)

It can also be specified whether to save already dirtied fields from the
objects or not and by default (if no expected_values argument is
provided) it will make sure that the entry in the DB has the same values
as the objects we are saving.

We can select values based on conditions using Case objects in the
'values' argument. For example:

 has_snapshot_filter = sql.exists().where(
     models.Snapshot.volume_id == models.Volume.id)
 case_values = volume.Case([(has_snapshot_filter, 'has-snapshot')],
                           else_='no-snapshot')
 volume.conditional_update({'status': case_values},
                           {'status': 'available'})

Exclusion and multi-exclusion will handle, by default, NULL values like
Python does instead of like SQL does, so NULL values will be considered
different than any non NULL values.  That way if we search for something
not equal to 1 we will also get NULL values.

WARNING: SQLAlchemy does not allow selecting order of SET clauses, so
for now we cannot do things like
    {'previous_status': model.status, 'status': 'retyping'}
because it will result in both previous_status and status being set to
'retyping'.  Issue has been reported [1] and a patch to fix it [2] has
been submitted.
[1]: https://bitbucket.org/zzzeek/sqlalchemy/issues/3541
[2]: https://github.com/zzzeek/sqlalchemy/pull/200

Specs: https://review.openstack.org/232599/

Implements: blueprint cinder-volume-active-active-support
Related-Bug: #1490944
Related-Bug: #1238093
Related-Bug: #1490946
Related-Bug: #1469659
Related-Bug: #1493120
Related-Bug: #1493419
Related-Bug: #1493476
Related-Bug: #1494466
Change-Id: If90a37f8c7d6fad8fc1f861d52ba862875920cdc

cinder/db/api.py
cinder/db/sqlalchemy/api.py
cinder/objects/base.py
cinder/objects/snapshot.py
cinder/objects/volume.py
cinder/tests/unit/objects/test_base.py

index c086ee82868a06b57546b4cdc931c601220ec1af..dcd302ba89062f594ad6d3b6e41ac66a8f39e5a4 100644 (file)
@@ -41,6 +41,7 @@ from oslo_db import concurrency as db_concurrency
 from oslo_db import options as db_options
 
 from cinder.api import common
+from cinder.i18n import _
 
 db_opts = [
     cfg.BoolOpt('enable_new_services',
@@ -1089,3 +1090,113 @@ def get_model_for_versioned_object(versioned_object):
 
 def get_by_id(context, model, id, *args, **kwargs):
     return IMPL.get_by_id(context, model, id, *args, **kwargs)
+
+
+class Condition(object):
+    """Class for normal condition values for conditional_update."""
+    def __init__(self, value, field=None):
+        self.value = value
+        # Field is optional and can be passed when getting the filter
+        self.field = field
+
+    def get_filter(self, model, field=None):
+        return IMPL.condition_db_filter(model, self._get_field(field),
+                                        self.value)
+
+    def _get_field(self, field=None):
+        # We must have a defined field on initialization or when called
+        field = field or self.field
+        if not field:
+            raise ValueError(_('Condition has no field.'))
+        return field
+
+
+class Not(Condition):
+    """Class for negated condition values for conditional_update.
+
+    By default NULL values will be treated like Python treats None instead of
+    how SQL treats it.
+
+    So for example when values are (1, 2) it will evaluate to True when we have
+    value 3 or NULL, instead of only with 3 like SQL does.
+    """
+    def __init__(self, value, field=None, auto_none=True):
+        super(Not, self).__init__(value, field)
+        self.auto_none = auto_none
+
+    def get_filter(self, model, field=None):
+        # If implementation has a specific method use it
+        if hasattr(IMPL, 'condition_not_db_filter'):
+            return IMPL.condition_not_db_filter(model, self._get_field(field),
+                                                self.value, self.auto_none)
+
+        # Otherwise non negated object must adming ~ operator for not
+        return ~super(Not, self).get_filter(model, field)
+
+
+class Case(object):
+    """Class for conditional value selection for conditional_update."""
+    def __init__(self, whens, value=None, else_=None):
+        self.whens = whens
+        self.value = value
+        self.else_ = else_
+
+
+def is_orm_value(obj):
+    """Check if object is an ORM field."""
+    return IMPL.is_orm_value(obj)
+
+
+def conditional_update(context, model, values, expected_values, filters=(),
+                       include_deleted='no', project_only=False):
+    """Compare-and-swap conditional update.
+
+       Update will only occur in the DB if conditions are met.
+
+       We have 4 different condition types we can use in expected_values:
+        - Equality:  {'status': 'available'}
+        - Inequality: {'status': vol_obj.Not('deleting')}
+        - In range: {'status': ['available', 'error']
+        - Not in range: {'status': vol_obj.Not(['in-use', 'attaching'])
+
+       Method accepts additional filters, which are basically anything that
+       can be passed to a sqlalchemy query's filter method, for example:
+       [~sql.exists().where(models.Volume.id == models.Snapshot.volume_id)]
+
+       We can select values based on conditions using Case objects in the
+       'values' argument. For example:
+       has_snapshot_filter = sql.exists().where(
+           models.Snapshot.volume_id == models.Volume.id)
+       case_values = db.Case([(has_snapshot_filter, 'has-snapshot')],
+                             else_='no-snapshot')
+       db.conditional_update(context, models.Volume, {'status': case_values},
+                             {'status': 'available'})
+
+       And we can use DB fields for example to store previous status in the
+       corresponding field even though we don't know which value is in the db
+       from those we allowed:
+       db.conditional_update(context, models.Volume,
+                             {'status': 'deleting',
+                              'previous_status': models.Volume.status},
+                             {'status': ('available', 'error')})
+
+       WARNING: SQLAlchemy does not allow selecting order of SET clauses, so
+       for now we cannot do things like
+           {'previous_status': model.status, 'status': 'retyping'}
+       because it will result in both previous_status and status being set to
+       'retyping'.  Issue has been reported [1] and a patch to fix it [2] has
+       been submitted.
+       [1]: https://bitbucket.org/zzzeek/sqlalchemy/issues/3541/
+       [2]: https://github.com/zzzeek/sqlalchemy/pull/200
+
+       :param values: Dictionary of key-values to update in the DB.
+       :param expected_values: Dictionary of conditions that must be met
+                               for the update to be executed.
+       :param filters: Iterable with additional filters
+       :param include_deleted: Should the update include deleted items, this
+                               is equivalent to read_deleted
+       :param project_only: Should the query be limited to context's project.
+       :returns number of db rows that were updated
+    """
+    return IMPL.conditional_update(context, model, values, expected_values,
+                                   filters, include_deleted, project_only)
index d31f3aed31e698d997b096c0304785c3426a3e30..bcd1b2eaa918e5d3fecec14f019451f83462894e 100644 (file)
@@ -19,6 +19,7 @@
 """Implementation of SQLAlchemy backend."""
 
 
+import collections
 import datetime as dt
 import functools
 import re
@@ -38,7 +39,7 @@ import osprofiler.sqlalchemy
 import six
 import sqlalchemy
 from sqlalchemy import MetaData
-from sqlalchemy import or_
+from sqlalchemy import or_, case
 from sqlalchemy.orm import joinedload, joinedload_all
 from sqlalchemy.orm import RelationshipProperty
 from sqlalchemy.schema import Table
@@ -50,13 +51,13 @@ from sqlalchemy.sql import sqltypes
 
 from cinder.api import common
 from cinder.common import sqlalchemyutils
+from cinder import db
 from cinder.db.sqlalchemy import models
 from cinder import exception
 from cinder.i18n import _, _LW, _LE, _LI
 
 
 CONF = cfg.CONF
-CONF.import_group("profiler", "cinder.service")
 LOG = logging.getLogger(__name__)
 
 options.set_defaults(CONF, connection='sqlite:///$state_path/cinder.sqlite')
@@ -75,6 +76,11 @@ def _create_facade_lazily():
                 **dict(CONF.database)
             )
 
+            # NOTE(geguileo): To avoid a cyclical dependency we import the
+            # group here.  Dependency cycle is objects.base requires db.api,
+            # which requires db.sqlalchemy.api, which requires service which
+            # requires objects.base
+            CONF.import_group("profiler", "cinder.service")
             if CONF.profiler.profiler_enabled:
                 if CONF.profiler.trace_sqlalchemy:
                     osprofiler.sqlalchemy.add_tracing(sqlalchemy,
@@ -4191,3 +4197,84 @@ def get_by_id(context, model, id, *args, **kwargs):
         _GET_METHODS[model] = _get_get_method(model)
 
     return _GET_METHODS[model](context, id, *args, **kwargs)
+
+
+def condition_db_filter(model, field, value):
+    """Create matching filter.
+
+    If value is an iterable other than a string, any of the values is
+    a valid match (OR), so we'll use SQL IN operator.
+
+    If it's not an iterator == operator will be used.
+    """
+    orm_field = getattr(model, field)
+    # For values that must match and are iterables we use IN
+    if (isinstance(value, collections.Iterable) and
+            not isinstance(value, six.string_types)):
+        # We cannot use in_ when one of the values is None
+        if None not in value:
+            return orm_field.in_(value)
+
+        return or_(orm_field == v for v in value)
+
+    # For values that must match and are not iterables we use ==
+    return orm_field == value
+
+
+def condition_not_db_filter(model, field, value, auto_none=True):
+    """Create non matching filter.
+
+    If value is an iterable other than a string, any of the values is
+    a valid match (OR), so we'll use SQL IN operator.
+
+    If it's not an iterator == operator will be used.
+
+    If auto_none is True then we'll consider NULL values as different as well,
+    like we do in Python and not like SQL does.
+    """
+    result = ~condition_db_filter(model, field, value)
+
+    if (auto_none
+            and ((isinstance(value, collections.Iterable) and
+                  not isinstance(value, six.string_types)
+                  and None not in value)
+                 or (value is not None))):
+        orm_field = getattr(model, field)
+        result = or_(result, orm_field.is_(None))
+
+    return result
+
+
+def is_orm_value(obj):
+    """Check if object is an ORM field or expression."""
+    return isinstance(obj, (sqlalchemy.orm.attributes.InstrumentedAttribute,
+                            sqlalchemy.sql.expression.ColumnElement))
+
+
+@_retry_on_deadlock
+@require_context
+def conditional_update(context, model, values, expected_values, filters=(),
+                       include_deleted='no', project_only=False):
+    """Compare-and-swap conditional update SQLAlchemy implementation."""
+    # Provided filters will become part of the where clause
+    where_conds = list(filters)
+
+    # Build where conditions with operators ==, !=, NOT IN and IN
+    for field, condition in expected_values.items():
+        if not isinstance(condition, db.Condition):
+            condition = db.Condition(condition, field)
+        where_conds.append(condition.get_filter(model, field))
+
+    # Transform case values
+    values = {field: case(value.whens, value.value, value.else_)
+              if isinstance(value, db.Case)
+              else value
+              for field, value in values.items()}
+
+    query = model_query(context, model, read_deleted=include_deleted,
+                        project_only=project_only)
+
+    # Return True if we were able to change any DB entry, False otherwise
+    result = query.filter(*where_conds).update(values,
+                                               synchronize_session=False)
+    return 0 != result
index cb6db1ebd563fc4cab5d2b8e8bd666c4564b3476..efa1293f83750ebc86dcd0fbf3a83c47da7df8ca 100644 (file)
@@ -36,6 +36,12 @@ obj_make_list = base.obj_make_list
 class CinderObjectRegistry(base.VersionedObjectRegistry):
     def registration_hook(self, cls, index):
         setattr(objects, cls.obj_name(), cls)
+        # For Versioned Object Classes that have a model store the model in
+        # a Class attribute named model
+        try:
+            cls.model = db.get_model_for_versioned_object(cls)
+        except (ImportError, AttributeError):
+            pass
 
 
 class CinderObject(base.VersionedObject):
@@ -49,6 +55,9 @@ class CinderObject(base.VersionedObject):
     # version compatibility.
     VERSION_COMPATIBILITY = {'7.0.0': '1.0'}
 
+    Not = db.Not
+    Case = db.Case
+
     def cinder_obj_get_changes(self):
         """Returns a dict of changed fields with tz unaware datetimes.
 
@@ -87,6 +96,115 @@ class CinderObject(base.VersionedObject):
             kargs = {'expected_attrs': getattr(cls, 'DEFAULT_EXPECTED_ATTR')}
         return cls._from_db_object(context, cls(context), orm_obj, **kargs)
 
+    def conditional_update(self, values, expected_values=None, filters=(),
+                           save_all=False, session=None, reflect_changes=True):
+        """Compare-and-swap update.
+
+           A conditional object update that, unlike normal update, will SAVE
+           the contents of the update to the DB.
+
+           Update will only occur in the DB and the object if conditions are
+           met.
+
+           If no expected_values are passed in we will default to make sure
+           that all fields have not been changed in the DB. Since we cannot
+           know the original value in the DB for dirty fields in the object
+           those will be excluded.
+
+           We have 4 different condition types we can use in expected_values:
+            - Equality:  {'status': 'available'}
+            - Inequality: {'status': vol_obj.Not('deleting')}
+            - In range: {'status': ['available', 'error']
+            - Not in range: {'status': vol_obj.Not(['in-use', 'attaching'])
+
+           Method accepts additional filters, which are basically anything that
+           can be passed to a sqlalchemy query's filter method, for example:
+           [~sql.exists().where(models.Volume.id == models.Snapshot.volume_id)]
+
+           We can select values based on conditions using Case objects in the
+           'values' argument. For example:
+           has_snapshot_filter = sql.exists().where(
+               models.Snapshot.volume_id == models.Volume.id)
+           case_values = volume.Case([(has_snapshot_filter, 'has-snapshot')],
+                                     else_='no-snapshot')
+           volume.conditional_update({'status': case_values},
+                                     {'status': 'available'}))
+
+            And we can use DB fields using model class attribute for example to
+            store previous status in the corresponding field even though we
+            don't know which value is in the db from those we allowed:
+            volume.conditional_update({'status': 'deleting',
+                                       'previous_status': volume.model.status},
+                                      {'status': ('available', 'error')})
+
+           :param values: Dictionary of key-values to update in the DB.
+           :param expected_values: Dictionary of conditions that must be met
+                                   for the update to be executed.
+           :param filters: Iterable with additional filters
+           :param save_all: Object may have changes that are not in the DB,
+                            this will say whether we want those changes saved
+                            as well.
+           :param session: Session to use for the update
+           :param reflect_changes: If we want changes made in the database to
+                                   be reflected in the versioned object.  This
+                                   may mean in some cases that we have to
+                                   reload the object from the database.
+           :returns number of db rows that were updated, which can be used as a
+                    boolean, since it will be 0 if we couldn't update the DB
+                    and 1 if we could, because we are using unique index id.
+        """
+        if 'id' not in self.fields:
+            msg = (_('VersionedObject %s does not support conditional update.')
+                   % (self.obj_name()))
+            raise NotImplementedError(msg)
+
+        # If no conditions are set we will require object in DB to be unchanged
+        if expected_values is None:
+            changes = self.obj_what_changed()
+
+            expected = {key: getattr(self, key)
+                        for key in self.fields.keys()
+                        if self.obj_attr_is_set(key) and key not in changes and
+                        key not in self.OPTIONAL_FIELDS}
+        else:
+            # Set the id in expected_values to limit conditional update to only
+            # change this object
+            expected = expected_values.copy()
+            expected['id'] = self.id
+
+        # If we want to save any additional changes the object has besides the
+        # ones referred in values
+        if save_all:
+            changes = self.cinder_obj_get_changes()
+            changes.update(values)
+            values = changes
+
+        result = db.conditional_update(self._context, self.model, values,
+                                       expected, filters)
+
+        # If we were able to update the DB then we need to update this object
+        # as well to reflect new DB contents and clear the object's dirty flags
+        # for those fields.
+        if result and reflect_changes:
+            # If we have used a Case, a db field or an expression in values we
+            # don't know which value was used, so we need to read the object
+            # back from the DB
+            if any(isinstance(v, self.Case) or db.is_orm_value(v)
+                   for v in values.values()):
+                # Read back object from DB
+                obj = type(self).get_by_id(self._context, self.id)
+                db_values = obj.obj_to_primitive()['versioned_object.data']
+                # Only update fields were changes were requested
+                values = {field: db_values[field]
+                          for field, value in values.items()}
+
+            # NOTE(geguileo): We don't use update method because our objects
+            # will eventually move away from VersionedObjectDictCompat
+            for key, value in values.items():
+                setattr(self, key, value)
+            self.obj_reset_changes(values.keys())
+        return result
+
 
 class CinderObjectDictCompat(base.VersionedObjectDictCompat):
     """Mix-in to provide dictionary key access compat.
index ef5d8bb8c26f32f49468350830803874ecdd0300..7a9806574a78d2691a13f25756418ec496242720 100644 (file)
@@ -24,9 +24,6 @@ from cinder import objects
 from cinder.objects import base
 
 CONF = cfg.CONF
-# NOTE(thangp): OPTIONAL_FIELDS are fields that would be lazy-loaded. They are
-# typically the relationship in the sqlalchemy object.
-OPTIONAL_FIELDS = ['volume', 'metadata', 'cgsnapshot']
 LOG = logging.getLogger(__name__)
 
 
@@ -36,6 +33,10 @@ class Snapshot(base.CinderPersistentObject, base.CinderObject,
     # Version 1.0: Initial version
     VERSION = '1.0'
 
+    # NOTE(thangp): OPTIONAL_FIELDS are fields that would be lazy-loaded. They
+    # are typically the relationship in the sqlalchemy object.
+    OPTIONAL_FIELDS = ('volume', 'metadata', 'cgsnapshot')
+
     DEFAULT_EXPECTED_ATTR = ('metadata',)
 
     fields = {
@@ -109,7 +110,7 @@ class Snapshot(base.CinderPersistentObject, base.CinderObject,
         if expected_attrs is None:
             expected_attrs = []
         for name, field in snapshot.fields.items():
-            if name in OPTIONAL_FIELDS:
+            if name in Snapshot.OPTIONAL_FIELDS:
                 continue
             value = db_snapshot.get(name)
             if isinstance(field, fields.IntegerField):
@@ -180,7 +181,7 @@ class Snapshot(base.CinderPersistentObject, base.CinderObject,
         db.snapshot_destroy(self._context, self.id)
 
     def obj_load_attr(self, attrname):
-        if attrname not in OPTIONAL_FIELDS:
+        if attrname not in self.OPTIONAL_FIELDS:
             raise exception.ObjectActionError(
                 action='obj_load_attr',
                 reason=_('attribute %s not lazy-loadable') % attrname)
index b8790f7450c8e188b4d7426ba17c3101ea2b4892..85579fe58a3ca3a397a15d8eb09051d509c09d30 100644 (file)
@@ -24,8 +24,6 @@ from cinder import objects
 from cinder.objects import base
 
 CONF = cfg.CONF
-OPTIONAL_FIELDS = ['metadata', 'admin_metadata',
-                   'volume_type', 'volume_attachment']
 LOG = logging.getLogger(__name__)
 
 
@@ -37,6 +35,9 @@ class Volume(base.CinderPersistentObject, base.CinderObject,
     #              volume_type
     VERSION = '1.1'
 
+    OPTIONAL_FIELDS = ('metadata', 'admin_metadata',
+                       'volume_type', 'volume_attachment')
+
     DEFAULT_EXPECTED_ATTR = ('admin_metadata', 'metadata')
 
     fields = {
@@ -146,7 +147,7 @@ class Volume(base.CinderPersistentObject, base.CinderObject,
         if expected_attrs is None:
             expected_attrs = []
         for name, field in volume.fields.items():
-            if name in OPTIONAL_FIELDS:
+            if name in Volume.OPTIONAL_FIELDS:
                 continue
             value = db_volume.get(name)
             if isinstance(field, fields.IntegerField):
@@ -218,7 +219,7 @@ class Volume(base.CinderPersistentObject, base.CinderObject,
             db.volume_destroy(self._context, self.id)
 
     def obj_load_attr(self, attrname):
-        if attrname not in OPTIONAL_FIELDS:
+        if attrname not in self.OPTIONAL_FIELDS:
             raise exception.ObjectActionError(
                 action='obj_load_attr',
                 reason=_('attribute %s not lazy-loadable') % attrname)
index 50354277c9ede231f80b9cc95e6d1d7bda84b1a6..ec12ab6b87c42b5398ea9625c6193bcc76bb9486 100644 (file)
@@ -17,17 +17,22 @@ import uuid
 
 from iso8601 import iso8601
 from oslo_versionedobjects import fields
+from sqlalchemy import sql
 
-from cinder.objects import base
+from cinder import context
+from cinder import db
+from cinder.db.sqlalchemy import models
+from cinder import objects
+from cinder import test
 from cinder.tests.unit import objects as test_objects
 
 
-@base.CinderObjectRegistry.register_if(False)
-class TestObject(base.CinderObject):
+@objects.base.CinderObjectRegistry.register_if(False)
+class TestObject(objects.base.CinderObject):
     fields = {
-        'scheduled_at': base.fields.DateTimeField(nullable=True),
-        'uuid': base.fields.UUIDField(),
-        'text': base.fields.StringField(nullable=True),
+        'scheduled_at': objects.base.fields.DateTimeField(nullable=True),
+        'uuid': objects.base.fields.UUIDField(),
+        'text': objects.base.fields.StringField(nullable=True),
     }
 
 
@@ -80,10 +85,10 @@ class TestCinderObject(test_objects.BaseObjectsTestCase):
 
 class TestCinderComparableObject(test_objects.BaseObjectsTestCase):
     def test_comparable_objects(self):
-        @base.CinderObjectRegistry.register
-        class MyComparableObj(base.CinderObject,
-                              base.CinderObjectDictCompat,
-                              base.CinderComparableObject):
+        @objects.base.CinderObjectRegistry.register
+        class MyComparableObj(objects.base.CinderObject,
+                              objects.base.CinderObjectDictCompat,
+                              objects.base.CinderComparableObject):
             fields = {'foo': fields.Field(fields.Integer())}
 
         class NonVersionedObject(object):
@@ -97,3 +102,439 @@ class TestCinderComparableObject(test_objects.BaseObjectsTestCase):
         self.assertFalse(obj1 == obj3)
         self.assertFalse(obj1 == obj4)
         self.assertNotEqual(obj1, None)
+
+
+class TestCinderObjectConditionalUpdate(test.TestCase):
+
+    def setUp(self):
+        super(TestCinderObjectConditionalUpdate, self).setUp()
+        self.context = context.get_admin_context()
+
+    def _create_volume(self):
+        vol = {
+            'display_description': 'Test Desc',
+            'size': 1,
+            'status': 'available',
+            'availability_zone': 'az',
+            'host': 'dummy',
+            'attach_status': 'no',
+        }
+        volume = objects.Volume(context=self.context, **vol)
+        volume.create()
+        return volume
+
+    def _create_snapshot(self, volume):
+        snapshot = objects.Snapshot(context=self.context, volume_id=volume.id)
+        snapshot.create()
+        return snapshot
+
+    def _check_volume(self, volume, status, size, reload=False, dirty_keys=(),
+                      **kwargs):
+        if reload:
+            volume = objects.Volume.get_by_id(self.context, volume.id)
+        self.assertEqual(status, volume.status)
+        self.assertEqual(size, volume.size)
+        dirty = volume.cinder_obj_get_changes()
+        self.assertEqual(list(dirty_keys), dirty.keys())
+        for key, value in kwargs.items():
+            self.assertEqual(value, getattr(volume, key))
+
+    def test_conditional_update_non_iterable_expected(self):
+        volume = self._create_volume()
+        # We also check that we can check for None values
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting', 'size': 2},
+            {'status': 'available', 'migration_status': None}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', 2)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 2, True)
+
+    def test_conditional_update_non_iterable_expected_model_field(self):
+        volume = self._create_volume()
+        # We also check that we can check for None values
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting', 'size': 2,
+             'previous_status': volume.model.status},
+            {'status': 'available', 'migration_status': None}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', 2, previous_status='available')
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 2, True,
+                           previous_status='available')
+
+    def test_conditional_update_non_iterable_expected_save_all(self):
+        volume = self._create_volume()
+        volume.size += 1
+        # We also check that we can check for not None values
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting'},
+            {'status': 'available', 'availability_zone': volume.Not(None)},
+            save_all=True))
+
+        # Check that the object in memory has been updated and that the size
+        # is not a dirty key
+        self._check_volume(volume, 'deleting', 2)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 2, True)
+
+    def test_conditional_update_non_iterable_expected_dont_save_all(self):
+        volume = self._create_volume()
+        volume.size += 1
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting'},
+            {'status': 'available'}, save_all=False))
+
+        # Check that the object in memory has been updated with the new status
+        # but that size has not been saved and is a dirty key
+        self._check_volume(volume, 'deleting', 2, False, ['size'])
+
+        # Check that the volume in the DB also has been updated but not the
+        # size
+        self._check_volume(volume, 'deleting', 1, True)
+
+    def test_conditional_update_fail_non_iterable_expected_save_all(self):
+        volume = self._create_volume()
+        volume.size += 1
+        self.assertFalse(volume.conditional_update(
+            {'status': 'available'},
+            {'status': 'deleting'}, save_all=True))
+
+        # Check that the object in memory has not been updated and that the
+        # size is still a dirty key
+        self._check_volume(volume, 'available', 2, False, ['size'])
+
+        # Check that the volume in the DB hasn't been updated
+        self._check_volume(volume, 'available', 1, True)
+
+    def test_default_conditional_update_non_iterable_expected(self):
+        volume = self._create_volume()
+        self.assertTrue(volume.conditional_update({'status': 'deleting'}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', 1)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 1, True)
+
+    def test_default_conditional_fail_update_non_iterable_expected(self):
+        volume_in_db = self._create_volume()
+        volume = objects.Volume.get_by_id(self.context, volume_in_db.id)
+        volume_in_db.size += 1
+        volume_in_db.save()
+        # This will fail because size in DB is different
+        self.assertFalse(volume.conditional_update({'status': 'deleting'}))
+
+        # Check that the object in memory has not been updated
+        self._check_volume(volume, 'available', 1)
+
+        # Check that the volume in the DB hasn't changed the status but has
+        # the size we changed before the conditional update
+        self._check_volume(volume_in_db, 'available', 2, True)
+
+    def test_default_conditional_update_non_iterable_expected_with_dirty(self):
+        volume_in_db = self._create_volume()
+        volume = objects.Volume.get_by_id(self.context, volume_in_db.id)
+        volume_in_db.size += 1
+        volume_in_db.save()
+        volume.size = 33
+        # This will fail because even though we have excluded the size from
+        # the default condition when we dirtied it in the volume object, we
+        # still have the last update timestamp that will be included in the
+        # condition
+        self.assertFalse(volume.conditional_update({'status': 'deleting'}))
+
+        # Check that the object in memory has not been updated
+        self._check_volume(volume, 'available', 33, False, ['size'])
+
+        # Check that the volume in the DB hasn't changed the status but has
+        # the size we changed before the conditional update
+        self._check_volume(volume_in_db, 'available', 2, True)
+
+    def test_conditional_update_negated_non_iterable_expected(self):
+        volume = self._create_volume()
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting', 'size': 2},
+            {'status': db.Not('in-use'), 'size': db.Not(2)}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', 2)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 2, True)
+
+    def test_conditional_update_non_iterable_expected_filter(self):
+        # Volume we want to change
+        volume = self._create_volume()
+
+        # Another volume that has no snapshots
+        volume2 = self._create_volume()
+
+        # A volume with snapshots
+        volume3 = self._create_volume()
+        self._create_snapshot(volume3)
+
+        # Update only it it has no snapshot
+        filters = (~sql.exists().where(
+            models.Snapshot.volume_id == models.Volume.id),)
+
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting', 'size': 2},
+            {'status': 'available'},
+            filters))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', 2)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 2, True)
+
+        # Check that the other volumes in the DB haven't changed
+        self._check_volume(volume2, 'available', 1, True)
+        self._check_volume(volume3, 'available', 1, True)
+
+    def test_conditional_update_iterable_expected(self):
+        volume = self._create_volume()
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting', 'size': 20},
+            {'status': ('error', 'available'), 'size': range(10)}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', 20)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 20, True)
+
+    def test_conditional_update_negated_iterable_expected(self):
+        volume = self._create_volume()
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting', 'size': 20},
+            {'status': db.Not(('creating', 'in-use')), 'size': range(10)}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', 20)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 20, True)
+
+    def test_conditional_update_fail_non_iterable_expected(self):
+        volume = self._create_volume()
+        self.assertFalse(volume.conditional_update(
+            {'status': 'deleting'},
+            {'status': 'available', 'size': 2}))
+
+        # Check that the object in memory hasn't changed
+        self._check_volume(volume, 'available', 1)
+
+        # Check that the volume in the DB hasn't changed either
+        self._check_volume(volume, 'available', 1, True)
+
+    def test_conditional_update_fail_negated_non_iterable_expected(self):
+        volume = self._create_volume()
+        result = volume.conditional_update({'status': 'deleting'},
+                                           {'status': db.Not('in-use'),
+                                            'size': 2})
+        self.assertFalse(result)
+
+        # Check that the object in memory hasn't changed
+        self._check_volume(volume, 'available', 1)
+
+        # Check that the volume in the DB hasn't changed either
+        self._check_volume(volume, 'available', 1, True)
+
+    def test_conditional_update_fail_iterable_expected(self):
+        volume = self._create_volume()
+        self.assertFalse(volume.conditional_update(
+            {'status': 'available'},
+            {'status': ('error', 'creating'), 'size': range(2, 10)}))
+
+        # Check that the object in memory hasn't changed
+        self._check_volume(volume, 'available', 1)
+
+        # Check that the volume in the DB hasn't changed either
+        self._check_volume(volume, 'available', 1, True)
+
+    def test_conditional_update_fail_negated_iterable_expected(self):
+        volume = self._create_volume()
+        self.assertFalse(volume.conditional_update(
+            {'status': 'error'},
+            {'status': db.Not(('available', 'in-use')), 'size': range(2, 10)}))
+
+        # Check that the object in memory hasn't changed
+        self._check_volume(volume, 'available', 1)
+
+        # Check that the volume in the DB hasn't changed either
+        self._check_volume(volume, 'available', 1, True)
+
+    def test_conditional_update_fail_non_iterable_expected_filter(self):
+        # Volume we want to change
+        volume = self._create_volume()
+        self._create_snapshot(volume)
+
+        # A volume that has no snapshots
+        volume2 = self._create_volume()
+
+        # Another volume with snapshots
+        volume3 = self._create_volume()
+        self._create_snapshot(volume3)
+
+        # Update only it it has no snapshot
+        filters = (~sql.exists().where(
+            models.Snapshot.volume_id == models.Volume.id),)
+
+        self.assertFalse(volume.conditional_update(
+            {'status': 'deleting', 'size': 2},
+            {'status': 'available'},
+            filters))
+
+        # Check that the object in memory hasn't been updated
+        self._check_volume(volume, 'available', 1)
+
+        # Check that no volume in the DB also has been updated
+        self._check_volume(volume, 'available', 1, True)
+        self._check_volume(volume2, 'available', 1, True)
+        self._check_volume(volume3, 'available', 1, True)
+
+    def test_conditional_update_non_iterable_case_value(self):
+        # Volume we want to change and has snapshots
+        volume = self._create_volume()
+        self._create_snapshot(volume)
+
+        # Filter that checks if a volume has snapshots
+        has_snapshot_filter = sql.exists().where(
+            models.Snapshot.volume_id == models.Volume.id)
+
+        # We want the updated value to depend on whether it has snapshots or
+        # not
+        case_values = volume.Case([(has_snapshot_filter, 'has-snapshot')],
+                                  else_='no-snapshot')
+        self.assertTrue(volume.conditional_update({'status': case_values},
+                                                  {'status': 'available'}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'has-snapshot', 1)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'has-snapshot', 1, True)
+
+    def test_conditional_update_non_iterable_case_value_else(self):
+        # Volume we want to change
+        volume = self._create_volume()
+
+        # Filter that checks if a volume has snapshots
+        has_snapshot_filter = sql.exists().where(
+            models.Snapshot.volume_id == models.Volume.id)
+
+        # We want the updated value to depend on whether it has snapshots or
+        # not
+        case_values = volume.Case([(has_snapshot_filter, 'has-snapshot')],
+                                  else_='no-snapshot')
+        self.assertTrue(volume.conditional_update({'status': case_values},
+                                                  {'status': 'available'}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'no-snapshot', 1)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'no-snapshot', 1, True)
+
+    def test_conditional_update_non_iterable_case_value_fail(self):
+        # Volume we want to change doesn't have snapshots
+        volume = self._create_volume()
+
+        # Filter that checks if a volume has snapshots
+        has_snapshot_filter = sql.exists().where(
+            models.Snapshot.volume_id == models.Volume.id)
+
+        # We want the updated value to depend on whether it has snapshots or
+        # not
+        case_values = volume.Case([(has_snapshot_filter, 'has-snapshot')],
+                                  else_='no-snapshot')
+        # We won't update because volume status is available
+        self.assertFalse(volume.conditional_update({'status': case_values},
+                                                   {'status': 'deleting'}))
+
+        # Check that the object in memory has not been updated
+        self._check_volume(volume, 'available', 1)
+
+        # Check that the volume in the DB also hasn't been updated either
+        self._check_volume(volume, 'available', 1, True)
+
+    def test_conditional_update_iterable_with_none_expected(self):
+        volume = self._create_volume()
+        # We also check that we can check for None values in an iterable
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting'},
+            {'status': (None, 'available'),
+             'migration_status': (None, 'finished')}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', 1)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 1, True)
+
+    def test_conditional_update_iterable_with_not_none_expected(self):
+        volume = self._create_volume()
+        # We also check that we can check for None values in a negated iterable
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting'},
+            {'status': volume.Not((None, 'in-use'))}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', 1)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 1, True)
+
+    def test_conditional_update_iterable_with_not_includes_null(self):
+        volume = self._create_volume()
+        # We also check that negation includes None values by default like we
+        # do in Python and not like MySQL does
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting'},
+            {'status': 'available',
+             'migration_status': volume.Not(('migrating', 'error'))}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', 1)
+
+        # Check that the volume in the DB also has been updated
+        self._check_volume(volume, 'deleting', 1, True)
+
+    def test_conditional_update_iterable_with_not_includes_null_fails(self):
+        volume = self._create_volume()
+        # We also check that negation excludes None values if we ask it to
+        self.assertFalse(volume.conditional_update(
+            {'status': 'deleting'},
+            {'status': 'available',
+             'migration_status': volume.Not(('migrating', 'error'),
+                                            auto_none=False)}))
+
+        # Check that the object in memory has not been updated
+        self._check_volume(volume, 'available', 1, False)
+
+        # Check that the volume in the DB hasn't been updated
+        self._check_volume(volume, 'available', 1, True)
+
+    def test_conditional_update_use_operation_in_value(self):
+        volume = self._create_volume()
+        expected_size = volume.size + 1
+
+        # We also check that using fields in requested changes will work as
+        # expected
+        self.assertTrue(volume.conditional_update(
+            {'status': 'deleting',
+             'size': volume.model.size + 1},
+            {'status': 'available'}))
+
+        # Check that the object in memory has been updated
+        self._check_volume(volume, 'deleting', expected_size, False)
+
+        # Check that the volume in the DB has also been updated
+        self._check_volume(volume, 'deleting', expected_size, True)