]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Pin RPC and object version to lowest running
authorMichał Dulko <michal.dulko@intel.com>
Thu, 14 Jan 2016 19:57:12 +0000 (20:57 +0100)
committerMichał Dulko <michal.dulko@intel.com>
Fri, 29 Jan 2016 13:36:04 +0000 (08:36 -0500)
This commit adds detection and pinning to lowest RPC API version to all
rpcapi modules. The version pin is determined by a DB call done once per
service lifetime thanks to caching. Handling the compatibility is
guaranteed by oslo.messaging and our shims in rpcapi modules.

To achieve o.vo compatibility, a similar approach is implemented. Custom
oslo.messaging serializer is implemented that backports objects to the
lowest running version on sending.

During the process of upgrade it may happen that manager receives an
object in the version lower than current one. Handling of such
situations is up to the manager and it should do that explicitely by
checking obj.VERSION.

The patch also adds required methods to db.api and Service object.

Co-Authored-By: Thang Pham <thang.g.pham@gmail.com>
Change-Id: I649892da64f9734928a6cf0f004a369aa7aa375f
Partial-Implements: blueprint rpc-object-compatibility

15 files changed:
cinder/backup/rpcapi.py
cinder/db/api.py
cinder/db/sqlalchemy/api.py
cinder/objects/base.py
cinder/objects/service.py
cinder/rpc.py
cinder/scheduler/rpcapi.py
cinder/tests/unit/backup/test_rpcapi.py
cinder/tests/unit/objects/test_base.py
cinder/tests/unit/objects/test_objects.py
cinder/tests/unit/objects/test_service.py
cinder/tests/unit/test_db_api.py
cinder/tests/unit/test_rpc.py [new file with mode: 0644]
cinder/tests/unit/test_volume_rpcapi.py
cinder/volume/rpcapi.py

index d70e3bb3b25bbe5611099d60b4ed386b0086e22d..9ed3ccacc2b352299498be78f9f4ca7f693aff19 100644 (file)
@@ -20,18 +20,15 @@ Client side of the volume backup RPC API.
 
 from oslo_config import cfg
 from oslo_log import log as logging
-import oslo_messaging as messaging
 
-from cinder.objects import base as objects_base
 from cinder import rpc
 
 
 CONF = cfg.CONF
-
 LOG = logging.getLogger(__name__)
 
 
-class BackupAPI(object):
+class BackupAPI(rpc.RPCAPI):
     """Client side of the volume rpc API.
 
     API version history:
@@ -40,16 +37,9 @@ class BackupAPI(object):
         1.1 - Changed methods to accept backup objects instead of IDs.
     """
 
-    BASE_RPC_API_VERSION = '1.0'
     RPC_API_VERSION = '1.1'
-
-    def __init__(self):
-        super(BackupAPI, self).__init__()
-        target = messaging.Target(topic=CONF.backup_topic,
-                                  version=self.BASE_RPC_API_VERSION)
-        serializer = objects_base.CinderObjectSerializer()
-        self.client = rpc.get_client(target, self.RPC_API_VERSION,
-                                     serializer=serializer)
+    TOPIC = CONF.backup_topic
+    BINARY = 'cinder-backup'
 
     def create_backup(self, ctxt, backup):
         LOG.debug("create_backup in rpcapi backup_id %s", backup.id)
index d223e865ec546220529b3c2e749c880eb3c3da3c..89fceac006f635865404ff072972e570edeb16e5 100644 (file)
@@ -116,6 +116,11 @@ def service_get_all_by_topic(context, topic, disabled=None):
     return IMPL.service_get_all_by_topic(context, topic, disabled=disabled)
 
 
+def service_get_all_by_binary(context, binary, disabled=None):
+    """Get all services for a given binary."""
+    return IMPL.service_get_all_by_binary(context, binary, disabled)
+
+
 def service_get_by_args(context, host, binary):
     """Get the state of a service by node name and binary."""
     return IMPL.service_get_by_args(context, host, binary)
index 222ab1e58a9f3173fed086dba3bf91d4ba4ec996..e82edd8900066ae84e5e6d7234e2fd4c11d9e389 100644 (file)
@@ -393,6 +393,17 @@ def service_get_all_by_topic(context, topic, disabled=None):
     return query.all()
 
 
+@require_admin_context
+def service_get_all_by_binary(context, binary, disabled=None):
+    query = model_query(
+        context, models.Service, read_deleted="no").filter_by(binary=binary)
+
+    if disabled is not None:
+        query = query.filter_by(disabled=disabled)
+
+    return query.all()
+
+
 @require_admin_context
 def service_get_by_host_and_topic(context, host, topic):
     result = model_query(
index f55ea4c6f9816da4c0f872812b39588b4bcd63ef..24f775a1d5631554bd714ebdf83e043d1d176aba 100644 (file)
@@ -96,6 +96,7 @@ OBJ_VERSIONS.add('1.0', {'Backup': '1.3', 'BackupImport': '1.3',
                          'ConsistencyGroup': '1.2',
                          'ConsistencyGroupList': '1.1', 'Service': '1.1',
                          'Volume': '1.3', 'VolumeTypeList': '1.1'})
+OBJ_VERSIONS.add('1.1', {'Service': '1.2', 'ServiceList': '1.1'})
 
 
 class CinderObjectRegistry(base.VersionedObjectRegistry):
@@ -385,3 +386,23 @@ class ObjectListBase(base.ObjectListBase):
 
 class CinderObjectSerializer(base.VersionedObjectSerializer):
     OBJ_BASE_CLASS = CinderObject
+
+    def __init__(self, version_cap=None):
+        super(CinderObjectSerializer, self).__init__()
+        self.version_cap = version_cap
+
+    def _get_capped_obj_version(self, obj):
+        objname = obj.obj_name()
+        objver = OBJ_VERSIONS.get(self.version_cap, {})
+        return objver.get(objname, None)
+
+    def serialize_entity(self, context, entity):
+        if isinstance(entity, (tuple, list, set, dict)):
+            entity = self._process_iterable(context, self.serialize_entity,
+                                            entity)
+        elif (hasattr(entity, 'obj_to_primitive') and
+              callable(entity.obj_to_primitive)):
+            # NOTE(dulek): Backport outgoing object to the capped version.
+            backport_ver = self._get_capped_obj_version(entity)
+            entity = entity.obj_to_primitive(backport_ver)
+        return entity
index ab4847db3391416567a0598ec4e1e62a6bdf16b7..8f28bfa674604466908df91a688b19acafdd7d85 100644 (file)
@@ -14,6 +14,7 @@
 
 from oslo_config import cfg
 from oslo_log import log as logging
+from oslo_utils import versionutils
 from oslo_versionedobjects import fields
 
 from cinder import db
@@ -33,7 +34,8 @@ class Service(base.CinderPersistentObject, base.CinderObject,
               base.CinderComparableObject):
     # Version 1.0: Initial version
     # Version 1.1: Add rpc_current_version and object_current_version fields
-    VERSION = '1.1'
+    # Version 1.2: Add get_minimum_rpc_version() and get_minimum_obj_version()
+    VERSION = '1.2'
 
     fields = {
         'id': fields.IntegerField(),
@@ -100,16 +102,49 @@ class Service(base.CinderPersistentObject, base.CinderObject,
         with self.obj_as_admin():
             db.service_destroy(self._context, self.id)
 
+    @classmethod
+    def _get_minimum_version(cls, attribute, context, binary):
+        services = ServiceList.get_all_by_binary(context, binary)
+        min_ver = None
+        min_ver_str = None
+        for s in services:
+            ver_str = getattr(s, attribute)
+            if ver_str is None:
+                # FIXME(dulek) None in *_current_version means that this
+                # service is in Liberty version, so we must assume this is the
+                # lowest one. We use handy and easy to remember token to
+                # indicate that. This may go away as soon as we drop
+                # compatibility with Liberty, possibly in early N.
+                return 'liberty'
+            ver = versionutils.convert_version_to_int(ver_str)
+            if min_ver is None or ver < min_ver:
+                min_ver = ver
+                min_ver_str = ver_str
+
+        return min_ver_str
+
+    @base.remotable_classmethod
+    def get_minimum_rpc_version(cls, context, binary):
+        return cls._get_minimum_version('rpc_current_version', context, binary)
+
+    @base.remotable_classmethod
+    def get_minimum_obj_version(cls, context, binary):
+        return cls._get_minimum_version('object_current_version', context,
+                                        binary)
+
 
 @base.CinderObjectRegistry.register
 class ServiceList(base.ObjectListBase, base.CinderObject):
-    VERSION = '1.0'
+    # Version 1.0: Initial version
+    # Version 1.1: Service object 1.2
+    VERSION = '1.1'
 
     fields = {
         'objects': fields.ListOfObjectsField('Service'),
     }
     child_versions = {
-        '1.0': '1.0'
+        '1.0': '1.0',
+        '1.1': '1.2',
     }
 
     @base.remotable_classmethod
@@ -124,3 +159,10 @@ class ServiceList(base.ObjectListBase, base.CinderObject):
                                                disabled=disabled)
         return base.obj_make_list(context, cls(context), objects.Service,
                                   services)
+
+    @base.remotable_classmethod
+    def get_all_by_binary(cls, context, binary, disabled=None):
+        services = db.service_get_all_by_binary(context, binary,
+                                                disabled=disabled)
+        return base.obj_make_list(context, cls(context), objects.Service,
+                                  services)
index 8ffab0e8bc6f0e071b1f9ca0feccf414f88f6ee6..9ade49d14d8fb778efd7fcad948deea8ea364afe 100644 (file)
@@ -27,14 +27,19 @@ __all__ = [
 ]
 
 from oslo_config import cfg
+from oslo_log import log as logging
 import oslo_messaging as messaging
 from oslo_serialization import jsonutils
 from osprofiler import profiler
 
 import cinder.context
 import cinder.exception
+from cinder.i18n import _LI
+from cinder import objects
+from cinder.objects import base
 
 CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
 TRANSPORT = None
 NOTIFIER = None
 
@@ -160,3 +165,70 @@ def get_notifier(service=None, host=None, publisher_id=None):
     if not publisher_id:
         publisher_id = "%s.%s" % (service, host or CONF.host)
     return NOTIFIER.prepare(publisher_id=publisher_id)
+
+
+LAST_RPC_VERSIONS = {}
+LAST_OBJ_VERSIONS = {}
+
+
+class RPCAPI(object):
+    """Mixin class aggregating methods related to RPC API compatibility."""
+
+    RPC_API_VERSION = '1.0'
+    TOPIC = ''
+    BINARY = ''
+
+    def __init__(self):
+        target = messaging.Target(topic=self.TOPIC,
+                                  version=self.RPC_API_VERSION)
+        obj_version_cap = self._determine_obj_version_cap()
+        serializer = base.CinderObjectSerializer(obj_version_cap)
+
+        rpc_version_cap = self._determine_rpc_version_cap()
+        self.client = get_client(target, version_cap=rpc_version_cap,
+                                 serializer=serializer)
+
+    def _determine_rpc_version_cap(self):
+        global LAST_RPC_VERSIONS
+        if self.BINARY in LAST_RPC_VERSIONS:
+            return LAST_RPC_VERSIONS[self.BINARY]
+
+        version_cap = objects.Service.get_minimum_rpc_version(
+            cinder.context.get_admin_context(), self.BINARY)
+        if version_cap == 'liberty':
+            # NOTE(dulek): This means that one of the services is Liberty,
+            # we should cap to it's RPC version.
+            version_cap = LIBERTY_RPC_VERSIONS[self.BINARY]
+        LOG.info(_LI('Automatically selected %(binary)s RPC version '
+                     '%(version)s as minimum service version.'),
+                 {'binary': self.BINARY, 'version': version_cap})
+        LAST_RPC_VERSIONS[self.BINARY] = version_cap
+        return version_cap
+
+    def _determine_obj_version_cap(self):
+        global LAST_OBJ_VERSIONS
+        if self.BINARY in LAST_OBJ_VERSIONS:
+            return LAST_OBJ_VERSIONS[self.BINARY]
+
+        version_cap = objects.Service.get_minimum_obj_version(
+            cinder.context.get_admin_context(), self.BINARY)
+        LOG.info(_LI('Automatically selected %(binary)s objects version '
+                     '%(version)s as minimum service version.'),
+                 {'binary': self.BINARY, 'version': version_cap})
+        LAST_OBJ_VERSIONS[self.BINARY] = version_cap
+        return version_cap
+
+
+# FIXME(dulek): Liberty haven't reported its RPC versions, so we need to have
+# them hardcoded. This dict may go away as soon as we drop compatibility with
+# L, which should be in early N.
+#
+# This is the only time we need to have such dictionary. We don't need to add
+# similar ones for any release following Liberty.
+LIBERTY_RPC_VERSIONS = {
+    'cinder-volume': '1.30',
+    'cinder-scheduler': '1.8',
+    # NOTE(dulek) backup.manager had specified version '1.2', but backup.rpcapi
+    # was really only sending messages up to '1.1'.
+    'cinder-backup': '1.1',
+}
index f227be62b14a6cd8cfd19abf210b700d83f75152..ee76981f9941b7c7f12342a6ac8e8807428d889a 100644 (file)
@@ -17,17 +17,15 @@ Client side of the scheduler manager RPC API.
 """
 
 from oslo_config import cfg
-import oslo_messaging as messaging
 from oslo_serialization import jsonutils
 
-from cinder.objects import base as objects_base
 from cinder import rpc
 
 
 CONF = cfg.CONF
 
 
-class SchedulerAPI(object):
+class SchedulerAPI(rpc.RPCAPI):
     """Client side of the scheduler rpc API.
 
     API version history:
@@ -48,18 +46,9 @@ class SchedulerAPI(object):
                migrate_volume_to_host()
     """
 
-    RPC_API_VERSION = '1.0'
-
-    def __init__(self):
-        super(SchedulerAPI, self).__init__()
-        target = messaging.Target(topic=CONF.scheduler_topic,
-                                  version=self.RPC_API_VERSION)
-        serializer = objects_base.CinderObjectSerializer()
-
-        # NOTE(thangp): Until version pinning is impletemented, set the client
-        # version_cap to None
-        self.client = rpc.get_client(target, version_cap=None,
-                                     serializer=serializer)
+    RPC_API_VERSION = '1.11'
+    TOPIC = CONF.scheduler_topic
+    BINARY = 'cinder-scheduler'
 
     def create_consistencygroup(self, ctxt, topic, group,
                                 request_spec_list=None,
index 4f6f745e8ffc65365a35a96fca7f13de961d7186..4354a8194194053d3b250258fa6f5fae902dc4c3 100644 (file)
@@ -45,7 +45,7 @@ class BackupRpcAPITestCase(test.TestCase):
         target = {
             "server": server,
             "fanout": fanout,
-            "version": kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION)
+            "version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
         }
 
         expected_msg = copy.deepcopy(kwargs)
index e487ac48f96198c9f1fecd2843593fbd25a02a8b..7f53518ef462554ecd390458071f97c6b9b41730 100644 (file)
@@ -17,6 +17,7 @@ import mock
 import uuid
 
 from iso8601 import iso8601
+from oslo_utils import versionutils
 from oslo_versionedobjects import fields
 from sqlalchemy import sql
 
@@ -30,12 +31,21 @@ from cinder.tests.unit import objects as test_objects
 
 @objects.base.CinderObjectRegistry.register_if(False)
 class TestObject(objects.base.CinderObject):
+    VERSION = '1.1'
+
     fields = {
         'scheduled_at': objects.base.fields.DateTimeField(nullable=True),
         'uuid': objects.base.fields.UUIDField(),
         'text': objects.base.fields.StringField(nullable=True),
     }
 
+    def obj_make_compatible(self, primitive, target_version):
+        super(TestObject, self).obj_make_compatible(primitive,
+                                                    target_version)
+        target_version = versionutils.convert_version_to_tuple(target_version)
+        if target_version < (1, 1):
+            primitive.pop('text', None)
+
 
 class TestCinderObject(test_objects.BaseObjectsTestCase):
     """Tests methods from CinderObject."""
@@ -595,3 +605,23 @@ class TestCinderDictObject(test_objects.BaseObjectsTestCase):
         self.assertTrue('foo' in obj)
         self.assertTrue('abc' in obj)
         self.assertFalse('def' in obj)
+
+
+@mock.patch('cinder.objects.base.OBJ_VERSIONS', {'1.0': {'TestObject': '1.0'},
+                                                 '1.1': {'TestObject': '1.1'},
+                                                 })
+class TestCinderObjectSerializer(test_objects.BaseObjectsTestCase):
+    def setUp(self):
+        super(TestCinderObjectSerializer, self).setUp()
+        self.obj = TestObject(scheduled_at=None, uuid=uuid.uuid4(),
+                              text='text')
+
+    def test_serialize_entity_backport(self):
+        serializer = objects.base.CinderObjectSerializer('1.0')
+        primitive = serializer.serialize_entity(self.context, self.obj)
+        self.assertEqual('1.0', primitive['versioned_object.version'])
+
+    def test_serialize_entity_unknown_version(self):
+        serializer = objects.base.CinderObjectSerializer('0.9')
+        primitive = serializer.serialize_entity(self.context, self.obj)
+        self.assertEqual('1.1', primitive['versioned_object.version'])
index 8908c25f9a86038068d598c8a0f38b73a5841a1f..a5b89e0106ca82e342a850f99d7fe553c48d1f69 100644 (file)
@@ -28,8 +28,8 @@ object_data = {
     'CGSnapshotList': '1.0-e8c3f4078cd0ee23487b34d173eec776',
     'ConsistencyGroup': '1.2-ed7f90a6871991a19af716ade7337fc9',
     'ConsistencyGroupList': '1.1-73916823b697dfa0c7f02508d87e0f28',
-    'Service': '1.1-9eb00cbd8e2bfb7371343429af54d6e8',
-    'ServiceList': '1.0-d242d3384b68e5a5a534e090ff1d5161',
+    'Service': '1.2-4d3dd6c9906da364739fbf3f90c80505',
+    'ServiceList': '1.1-cb758b200f0a3a90efabfc5aa2ffb627',
     'Snapshot': '1.0-a6c33eefeadefb324d79f72f66c54e9a',
     'SnapshotList': '1.0-71661e7180ef6cc51501704a9bea4bf1',
     'Volume': '1.3-97c3977846dae6588381e7bd3e6e6558',
index dc10236aca1b10cead47d023c50a8bee225f275e..923cffc2173486cc8a897eaea76ddc0d26bf30f9 100644 (file)
@@ -97,6 +97,39 @@ class TestService(test_objects.BaseObjectsTestCase):
                                       mock.call.__nonzero__(),
                                       mock.call(self.context, 123)])
 
+    @mock.patch('cinder.db.service_get_all_by_binary')
+    def _test_get_minimum_version(self, services_update, expected,
+                                  service_get_all_by_binary):
+        services = [fake_service.fake_db_service(**s) for s in services_update]
+        service_get_all_by_binary.return_value = services
+
+        min_rpc = objects.Service.get_minimum_rpc_version(self.context, 'foo')
+        self.assertEqual(expected[0], min_rpc)
+        min_obj = objects.Service.get_minimum_obj_version(self.context, 'foo')
+        self.assertEqual(expected[1], min_obj)
+        service_get_all_by_binary.assert_has_calls(
+            [mock.call(self.context, 'foo', disabled=None)] * 2)
+
+    @mock.patch('cinder.db.service_get_all_by_binary')
+    def test_get_minimum_version(self, service_get_all_by_binary):
+        services_update = [
+            {'rpc_current_version': '1.0', 'object_current_version': '1.3'},
+            {'rpc_current_version': '1.1', 'object_current_version': '1.2'},
+            {'rpc_current_version': '2.0', 'object_current_version': '2.5'},
+        ]
+        expected = ('1.0', '1.2')
+        self._test_get_minimum_version(services_update, expected)
+
+    @mock.patch('cinder.db.service_get_all_by_binary')
+    def test_get_minimum_version_liberty(self, service_get_all_by_binary):
+        services_update = [
+            {'rpc_current_version': '1.0', 'object_current_version': '1.3'},
+            {'rpc_current_version': '1.1', 'object_current_version': None},
+            {'rpc_current_version': None, 'object_current_version': '2.5'},
+        ]
+        expected = ('liberty', 'liberty')
+        self._test_get_minimum_version(services_update, expected)
+
 
 class TestServiceList(test_objects.BaseObjectsTestCase):
     @mock.patch('cinder.db.service_get_all')
@@ -120,3 +153,15 @@ class TestServiceList(test_objects.BaseObjectsTestCase):
             self.context, 'foo', disabled='bar')
         self.assertEqual(1, len(services))
         TestService._compare(self, db_service, services[0])
+
+    @mock.patch('cinder.db.service_get_all_by_binary')
+    def test_get_all_by_binary(self, service_get_all_by_binary):
+        db_service = fake_service.fake_db_service()
+        service_get_all_by_binary.return_value = [db_service]
+
+        services = objects.ServiceList.get_all_by_binary(
+            self.context, 'foo', 'bar')
+        service_get_all_by_binary.assert_called_once_with(
+            self.context, 'foo', disabled='bar')
+        self.assertEqual(1, len(services))
+        TestService._compare(self, db_service, services[0])
index 058d895b0573189d80e5a72239625ea58a00fa1b..ed321537856ac8f753d7c07163c73b0c392f3cf0 100644 (file)
@@ -209,6 +209,18 @@ class DBAPIServiceTestCase(BaseTest):
         real = db.service_get_all_by_topic(self.ctxt, 't1')
         self._assertEqualListsOfObjects(expected, real)
 
+    def test_service_get_all_by_binary(self):
+        values = [
+            {'host': 'host1', 'binary': 'b1'},
+            {'host': 'host2', 'binary': 'b1'},
+            {'host': 'host4', 'disabled': True, 'binary': 'b1'},
+            {'host': 'host3', 'binary': 'b2'}
+        ]
+        services = [self._create_service(vals) for vals in values]
+        expected = services[:3]
+        real = db.service_get_all_by_binary(self.ctxt, 'b1')
+        self._assertEqualListsOfObjects(expected, real)
+
     def test_service_get_by_args(self):
         values = [
             {'host': 'host1', 'binary': 'a'},
diff --git a/cinder/tests/unit/test_rpc.py b/cinder/tests/unit/test_rpc.py
new file mode 100644 (file)
index 0000000..cd52325
--- /dev/null
@@ -0,0 +1,83 @@
+#    Copyright 2015 Intel Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+
+from cinder import rpc
+from cinder import test
+
+
+class FakeAPI(rpc.RPCAPI):
+    RPC_API_VERSION = '1.5'
+    TOPIC = 'cinder-scheduler-topic'
+    BINARY = 'cinder-scheduler'
+
+
+class RPCAPITestCase(test.TestCase):
+    """Tests RPCAPI mixin aggregating stuff related to RPC compatibility."""
+
+    def setUp(self):
+        super(RPCAPITestCase, self).setUp()
+        # Reset cached version pins
+        rpc.LAST_RPC_VERSIONS = {}
+        rpc.LAST_OBJ_VERSIONS = {}
+
+    @mock.patch('cinder.objects.Service.get_minimum_rpc_version',
+                return_value='1.2')
+    @mock.patch('cinder.objects.Service.get_minimum_obj_version',
+                return_value='1.7')
+    @mock.patch('cinder.rpc.get_client')
+    def test_init(self, get_client, get_min_obj, get_min_rpc):
+        def fake_get_client(target, version_cap, serializer):
+            self.assertEqual(FakeAPI.TOPIC, target.topic)
+            self.assertEqual(FakeAPI.RPC_API_VERSION, target.version)
+            self.assertEqual('1.2', version_cap)
+            self.assertEqual('1.7', serializer.version_cap)
+
+        get_client.side_effect = fake_get_client
+        FakeAPI()
+
+    @mock.patch('cinder.objects.Service.get_minimum_rpc_version',
+                return_value='liberty')
+    @mock.patch('cinder.objects.Service.get_minimum_obj_version',
+                return_value='liberty')
+    @mock.patch('cinder.rpc.get_client')
+    def test_init_liberty_caps(self, get_client, get_min_obj, get_min_rpc):
+        def fake_get_client(target, version_cap, serializer):
+            self.assertEqual(FakeAPI.TOPIC, target.topic)
+            self.assertEqual(FakeAPI.RPC_API_VERSION, target.version)
+            self.assertEqual(rpc.LIBERTY_RPC_VERSIONS[FakeAPI.BINARY],
+                             version_cap)
+            self.assertEqual('liberty', serializer.version_cap)
+
+        get_client.side_effect = fake_get_client
+        FakeAPI()
+
+    @mock.patch('cinder.objects.Service.get_minimum_rpc_version')
+    @mock.patch('cinder.objects.Service.get_minimum_obj_version')
+    @mock.patch('cinder.rpc.get_client')
+    @mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-scheduler': '1.4'})
+    @mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-scheduler': '1.3'})
+    def test_init_cached_caps(self, get_client, get_min_obj, get_min_rpc):
+        def fake_get_client(target, version_cap, serializer):
+            self.assertEqual(FakeAPI.TOPIC, target.topic)
+            self.assertEqual(FakeAPI.RPC_API_VERSION, target.version)
+            self.assertEqual('1.4', version_cap)
+            self.assertEqual('1.3', serializer.version_cap)
+
+        get_client.side_effect = fake_get_client
+        FakeAPI()
+
+        self.assertFalse(get_min_obj.called)
+        self.assertFalse(get_min_rpc.called)
index 05ec502f2f50661e213fd4904e4d0bacdbb54b26..e03a49f0e7b57b2ad4321ab40d1e367013cab5d6 100644 (file)
@@ -110,7 +110,7 @@ class VolumeRpcAPITestCase(test.TestCase):
         expected_retval = 'foo' if method == 'call' else None
 
         target = {
-            "version": kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION)
+            "version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
         }
 
         if 'request_spec' in kwargs:
index 9600a140bd4d7b4827f24423372e16eec824768e..7fd60cb86b0bfdf03c77ce327e4034f3b2617f0d 100644 (file)
@@ -17,10 +17,8 @@ Client side of the volume RPC API.
 """
 
 from oslo_config import cfg
-import oslo_messaging as messaging
 from oslo_serialization import jsonutils
 
-from cinder.objects import base as objects_base
 from cinder import rpc
 from cinder.volume import utils
 
@@ -28,7 +26,7 @@ from cinder.volume import utils
 CONF = cfg.CONF
 
 
-class VolumeAPI(object):
+class VolumeAPI(rpc.RPCAPI):
     """Client side of the volume rpc API.
 
     API version history:
@@ -89,18 +87,9 @@ class VolumeAPI(object):
                checks in the API.
     """
 
-    BASE_RPC_API_VERSION = '1.0'
-
-    def __init__(self, topic=None):
-        super(VolumeAPI, self).__init__()
-        target = messaging.Target(topic=CONF.volume_topic,
-                                  version=self.BASE_RPC_API_VERSION)
-        serializer = objects_base.CinderObjectSerializer()
-
-        # NOTE(thangp): Until version pinning is impletemented, set the client
-        # version_cap to None
-        self.client = rpc.get_client(target, version_cap=None,
-                                     serializer=serializer)
+    RPC_API_VERSION = '1.37'
+    TOPIC = CONF.volume_topic
+    BINARY = 'cinder-volume'
 
     def create_consistencygroup(self, ctxt, group, host):
         new_host = utils.extract_host(host)