From: MichaƂ Dulko Date: Thu, 14 Jan 2016 19:57:12 +0000 (+0100) Subject: Pin RPC and object version to lowest running X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=e6d525e56d3d266abbb5894eb4afad33670ef869;p=openstack-build%2Fcinder-build.git Pin RPC and object version to lowest running This commit adds detection and pinning to lowest RPC API version to all rpcapi modules. The version pin is determined by a DB call done once per service lifetime thanks to caching. Handling the compatibility is guaranteed by oslo.messaging and our shims in rpcapi modules. To achieve o.vo compatibility, a similar approach is implemented. Custom oslo.messaging serializer is implemented that backports objects to the lowest running version on sending. During the process of upgrade it may happen that manager receives an object in the version lower than current one. Handling of such situations is up to the manager and it should do that explicitely by checking obj.VERSION. The patch also adds required methods to db.api and Service object. Co-Authored-By: Thang Pham Change-Id: I649892da64f9734928a6cf0f004a369aa7aa375f Partial-Implements: blueprint rpc-object-compatibility --- diff --git a/cinder/backup/rpcapi.py b/cinder/backup/rpcapi.py index d70e3bb3b..9ed3ccacc 100644 --- a/cinder/backup/rpcapi.py +++ b/cinder/backup/rpcapi.py @@ -20,18 +20,15 @@ Client side of the volume backup RPC API. from oslo_config import cfg from oslo_log import log as logging -import oslo_messaging as messaging -from cinder.objects import base as objects_base from cinder import rpc CONF = cfg.CONF - LOG = logging.getLogger(__name__) -class BackupAPI(object): +class BackupAPI(rpc.RPCAPI): """Client side of the volume rpc API. API version history: @@ -40,16 +37,9 @@ class BackupAPI(object): 1.1 - Changed methods to accept backup objects instead of IDs. """ - BASE_RPC_API_VERSION = '1.0' RPC_API_VERSION = '1.1' - - def __init__(self): - super(BackupAPI, self).__init__() - target = messaging.Target(topic=CONF.backup_topic, - version=self.BASE_RPC_API_VERSION) - serializer = objects_base.CinderObjectSerializer() - self.client = rpc.get_client(target, self.RPC_API_VERSION, - serializer=serializer) + TOPIC = CONF.backup_topic + BINARY = 'cinder-backup' def create_backup(self, ctxt, backup): LOG.debug("create_backup in rpcapi backup_id %s", backup.id) diff --git a/cinder/db/api.py b/cinder/db/api.py index d223e865e..89fceac00 100644 --- a/cinder/db/api.py +++ b/cinder/db/api.py @@ -116,6 +116,11 @@ def service_get_all_by_topic(context, topic, disabled=None): return IMPL.service_get_all_by_topic(context, topic, disabled=disabled) +def service_get_all_by_binary(context, binary, disabled=None): + """Get all services for a given binary.""" + return IMPL.service_get_all_by_binary(context, binary, disabled) + + def service_get_by_args(context, host, binary): """Get the state of a service by node name and binary.""" return IMPL.service_get_by_args(context, host, binary) diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 222ab1e58..e82edd890 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -393,6 +393,17 @@ def service_get_all_by_topic(context, topic, disabled=None): return query.all() +@require_admin_context +def service_get_all_by_binary(context, binary, disabled=None): + query = model_query( + context, models.Service, read_deleted="no").filter_by(binary=binary) + + if disabled is not None: + query = query.filter_by(disabled=disabled) + + return query.all() + + @require_admin_context def service_get_by_host_and_topic(context, host, topic): result = model_query( diff --git a/cinder/objects/base.py b/cinder/objects/base.py index f55ea4c6f..24f775a1d 100644 --- a/cinder/objects/base.py +++ b/cinder/objects/base.py @@ -96,6 +96,7 @@ OBJ_VERSIONS.add('1.0', {'Backup': '1.3', 'BackupImport': '1.3', 'ConsistencyGroup': '1.2', 'ConsistencyGroupList': '1.1', 'Service': '1.1', 'Volume': '1.3', 'VolumeTypeList': '1.1'}) +OBJ_VERSIONS.add('1.1', {'Service': '1.2', 'ServiceList': '1.1'}) class CinderObjectRegistry(base.VersionedObjectRegistry): @@ -385,3 +386,23 @@ class ObjectListBase(base.ObjectListBase): class CinderObjectSerializer(base.VersionedObjectSerializer): OBJ_BASE_CLASS = CinderObject + + def __init__(self, version_cap=None): + super(CinderObjectSerializer, self).__init__() + self.version_cap = version_cap + + def _get_capped_obj_version(self, obj): + objname = obj.obj_name() + objver = OBJ_VERSIONS.get(self.version_cap, {}) + return objver.get(objname, None) + + def serialize_entity(self, context, entity): + if isinstance(entity, (tuple, list, set, dict)): + entity = self._process_iterable(context, self.serialize_entity, + entity) + elif (hasattr(entity, 'obj_to_primitive') and + callable(entity.obj_to_primitive)): + # NOTE(dulek): Backport outgoing object to the capped version. + backport_ver = self._get_capped_obj_version(entity) + entity = entity.obj_to_primitive(backport_ver) + return entity diff --git a/cinder/objects/service.py b/cinder/objects/service.py index ab4847db3..8f28bfa67 100644 --- a/cinder/objects/service.py +++ b/cinder/objects/service.py @@ -14,6 +14,7 @@ from oslo_config import cfg from oslo_log import log as logging +from oslo_utils import versionutils from oslo_versionedobjects import fields from cinder import db @@ -33,7 +34,8 @@ class Service(base.CinderPersistentObject, base.CinderObject, base.CinderComparableObject): # Version 1.0: Initial version # Version 1.1: Add rpc_current_version and object_current_version fields - VERSION = '1.1' + # Version 1.2: Add get_minimum_rpc_version() and get_minimum_obj_version() + VERSION = '1.2' fields = { 'id': fields.IntegerField(), @@ -100,16 +102,49 @@ class Service(base.CinderPersistentObject, base.CinderObject, with self.obj_as_admin(): db.service_destroy(self._context, self.id) + @classmethod + def _get_minimum_version(cls, attribute, context, binary): + services = ServiceList.get_all_by_binary(context, binary) + min_ver = None + min_ver_str = None + for s in services: + ver_str = getattr(s, attribute) + if ver_str is None: + # FIXME(dulek) None in *_current_version means that this + # service is in Liberty version, so we must assume this is the + # lowest one. We use handy and easy to remember token to + # indicate that. This may go away as soon as we drop + # compatibility with Liberty, possibly in early N. + return 'liberty' + ver = versionutils.convert_version_to_int(ver_str) + if min_ver is None or ver < min_ver: + min_ver = ver + min_ver_str = ver_str + + return min_ver_str + + @base.remotable_classmethod + def get_minimum_rpc_version(cls, context, binary): + return cls._get_minimum_version('rpc_current_version', context, binary) + + @base.remotable_classmethod + def get_minimum_obj_version(cls, context, binary): + return cls._get_minimum_version('object_current_version', context, + binary) + @base.CinderObjectRegistry.register class ServiceList(base.ObjectListBase, base.CinderObject): - VERSION = '1.0' + # Version 1.0: Initial version + # Version 1.1: Service object 1.2 + VERSION = '1.1' fields = { 'objects': fields.ListOfObjectsField('Service'), } child_versions = { - '1.0': '1.0' + '1.0': '1.0', + '1.1': '1.2', } @base.remotable_classmethod @@ -124,3 +159,10 @@ class ServiceList(base.ObjectListBase, base.CinderObject): disabled=disabled) return base.obj_make_list(context, cls(context), objects.Service, services) + + @base.remotable_classmethod + def get_all_by_binary(cls, context, binary, disabled=None): + services = db.service_get_all_by_binary(context, binary, + disabled=disabled) + return base.obj_make_list(context, cls(context), objects.Service, + services) diff --git a/cinder/rpc.py b/cinder/rpc.py index 8ffab0e8b..9ade49d14 100644 --- a/cinder/rpc.py +++ b/cinder/rpc.py @@ -27,14 +27,19 @@ __all__ = [ ] from oslo_config import cfg +from oslo_log import log as logging import oslo_messaging as messaging from oslo_serialization import jsonutils from osprofiler import profiler import cinder.context import cinder.exception +from cinder.i18n import _LI +from cinder import objects +from cinder.objects import base CONF = cfg.CONF +LOG = logging.getLogger(__name__) TRANSPORT = None NOTIFIER = None @@ -160,3 +165,70 @@ def get_notifier(service=None, host=None, publisher_id=None): if not publisher_id: publisher_id = "%s.%s" % (service, host or CONF.host) return NOTIFIER.prepare(publisher_id=publisher_id) + + +LAST_RPC_VERSIONS = {} +LAST_OBJ_VERSIONS = {} + + +class RPCAPI(object): + """Mixin class aggregating methods related to RPC API compatibility.""" + + RPC_API_VERSION = '1.0' + TOPIC = '' + BINARY = '' + + def __init__(self): + target = messaging.Target(topic=self.TOPIC, + version=self.RPC_API_VERSION) + obj_version_cap = self._determine_obj_version_cap() + serializer = base.CinderObjectSerializer(obj_version_cap) + + rpc_version_cap = self._determine_rpc_version_cap() + self.client = get_client(target, version_cap=rpc_version_cap, + serializer=serializer) + + def _determine_rpc_version_cap(self): + global LAST_RPC_VERSIONS + if self.BINARY in LAST_RPC_VERSIONS: + return LAST_RPC_VERSIONS[self.BINARY] + + version_cap = objects.Service.get_minimum_rpc_version( + cinder.context.get_admin_context(), self.BINARY) + if version_cap == 'liberty': + # NOTE(dulek): This means that one of the services is Liberty, + # we should cap to it's RPC version. + version_cap = LIBERTY_RPC_VERSIONS[self.BINARY] + LOG.info(_LI('Automatically selected %(binary)s RPC version ' + '%(version)s as minimum service version.'), + {'binary': self.BINARY, 'version': version_cap}) + LAST_RPC_VERSIONS[self.BINARY] = version_cap + return version_cap + + def _determine_obj_version_cap(self): + global LAST_OBJ_VERSIONS + if self.BINARY in LAST_OBJ_VERSIONS: + return LAST_OBJ_VERSIONS[self.BINARY] + + version_cap = objects.Service.get_minimum_obj_version( + cinder.context.get_admin_context(), self.BINARY) + LOG.info(_LI('Automatically selected %(binary)s objects version ' + '%(version)s as minimum service version.'), + {'binary': self.BINARY, 'version': version_cap}) + LAST_OBJ_VERSIONS[self.BINARY] = version_cap + return version_cap + + +# FIXME(dulek): Liberty haven't reported its RPC versions, so we need to have +# them hardcoded. This dict may go away as soon as we drop compatibility with +# L, which should be in early N. +# +# This is the only time we need to have such dictionary. We don't need to add +# similar ones for any release following Liberty. +LIBERTY_RPC_VERSIONS = { + 'cinder-volume': '1.30', + 'cinder-scheduler': '1.8', + # NOTE(dulek) backup.manager had specified version '1.2', but backup.rpcapi + # was really only sending messages up to '1.1'. + 'cinder-backup': '1.1', +} diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index f227be62b..ee76981f9 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -17,17 +17,15 @@ Client side of the scheduler manager RPC API. """ from oslo_config import cfg -import oslo_messaging as messaging from oslo_serialization import jsonutils -from cinder.objects import base as objects_base from cinder import rpc CONF = cfg.CONF -class SchedulerAPI(object): +class SchedulerAPI(rpc.RPCAPI): """Client side of the scheduler rpc API. API version history: @@ -48,18 +46,9 @@ class SchedulerAPI(object): migrate_volume_to_host() """ - RPC_API_VERSION = '1.0' - - def __init__(self): - super(SchedulerAPI, self).__init__() - target = messaging.Target(topic=CONF.scheduler_topic, - version=self.RPC_API_VERSION) - serializer = objects_base.CinderObjectSerializer() - - # NOTE(thangp): Until version pinning is impletemented, set the client - # version_cap to None - self.client = rpc.get_client(target, version_cap=None, - serializer=serializer) + RPC_API_VERSION = '1.11' + TOPIC = CONF.scheduler_topic + BINARY = 'cinder-scheduler' def create_consistencygroup(self, ctxt, topic, group, request_spec_list=None, diff --git a/cinder/tests/unit/backup/test_rpcapi.py b/cinder/tests/unit/backup/test_rpcapi.py index 4f6f745e8..4354a8194 100644 --- a/cinder/tests/unit/backup/test_rpcapi.py +++ b/cinder/tests/unit/backup/test_rpcapi.py @@ -45,7 +45,7 @@ class BackupRpcAPITestCase(test.TestCase): target = { "server": server, "fanout": fanout, - "version": kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION) + "version": kwargs.pop('version', rpcapi.RPC_API_VERSION) } expected_msg = copy.deepcopy(kwargs) diff --git a/cinder/tests/unit/objects/test_base.py b/cinder/tests/unit/objects/test_base.py index e487ac48f..7f53518ef 100644 --- a/cinder/tests/unit/objects/test_base.py +++ b/cinder/tests/unit/objects/test_base.py @@ -17,6 +17,7 @@ import mock import uuid from iso8601 import iso8601 +from oslo_utils import versionutils from oslo_versionedobjects import fields from sqlalchemy import sql @@ -30,12 +31,21 @@ from cinder.tests.unit import objects as test_objects @objects.base.CinderObjectRegistry.register_if(False) class TestObject(objects.base.CinderObject): + VERSION = '1.1' + fields = { 'scheduled_at': objects.base.fields.DateTimeField(nullable=True), 'uuid': objects.base.fields.UUIDField(), 'text': objects.base.fields.StringField(nullable=True), } + def obj_make_compatible(self, primitive, target_version): + super(TestObject, self).obj_make_compatible(primitive, + target_version) + target_version = versionutils.convert_version_to_tuple(target_version) + if target_version < (1, 1): + primitive.pop('text', None) + class TestCinderObject(test_objects.BaseObjectsTestCase): """Tests methods from CinderObject.""" @@ -595,3 +605,23 @@ class TestCinderDictObject(test_objects.BaseObjectsTestCase): self.assertTrue('foo' in obj) self.assertTrue('abc' in obj) self.assertFalse('def' in obj) + + +@mock.patch('cinder.objects.base.OBJ_VERSIONS', {'1.0': {'TestObject': '1.0'}, + '1.1': {'TestObject': '1.1'}, + }) +class TestCinderObjectSerializer(test_objects.BaseObjectsTestCase): + def setUp(self): + super(TestCinderObjectSerializer, self).setUp() + self.obj = TestObject(scheduled_at=None, uuid=uuid.uuid4(), + text='text') + + def test_serialize_entity_backport(self): + serializer = objects.base.CinderObjectSerializer('1.0') + primitive = serializer.serialize_entity(self.context, self.obj) + self.assertEqual('1.0', primitive['versioned_object.version']) + + def test_serialize_entity_unknown_version(self): + serializer = objects.base.CinderObjectSerializer('0.9') + primitive = serializer.serialize_entity(self.context, self.obj) + self.assertEqual('1.1', primitive['versioned_object.version']) diff --git a/cinder/tests/unit/objects/test_objects.py b/cinder/tests/unit/objects/test_objects.py index 8908c25f9..a5b89e010 100644 --- a/cinder/tests/unit/objects/test_objects.py +++ b/cinder/tests/unit/objects/test_objects.py @@ -28,8 +28,8 @@ object_data = { 'CGSnapshotList': '1.0-e8c3f4078cd0ee23487b34d173eec776', 'ConsistencyGroup': '1.2-ed7f90a6871991a19af716ade7337fc9', 'ConsistencyGroupList': '1.1-73916823b697dfa0c7f02508d87e0f28', - 'Service': '1.1-9eb00cbd8e2bfb7371343429af54d6e8', - 'ServiceList': '1.0-d242d3384b68e5a5a534e090ff1d5161', + 'Service': '1.2-4d3dd6c9906da364739fbf3f90c80505', + 'ServiceList': '1.1-cb758b200f0a3a90efabfc5aa2ffb627', 'Snapshot': '1.0-a6c33eefeadefb324d79f72f66c54e9a', 'SnapshotList': '1.0-71661e7180ef6cc51501704a9bea4bf1', 'Volume': '1.3-97c3977846dae6588381e7bd3e6e6558', diff --git a/cinder/tests/unit/objects/test_service.py b/cinder/tests/unit/objects/test_service.py index dc10236ac..923cffc21 100644 --- a/cinder/tests/unit/objects/test_service.py +++ b/cinder/tests/unit/objects/test_service.py @@ -97,6 +97,39 @@ class TestService(test_objects.BaseObjectsTestCase): mock.call.__nonzero__(), mock.call(self.context, 123)]) + @mock.patch('cinder.db.service_get_all_by_binary') + def _test_get_minimum_version(self, services_update, expected, + service_get_all_by_binary): + services = [fake_service.fake_db_service(**s) for s in services_update] + service_get_all_by_binary.return_value = services + + min_rpc = objects.Service.get_minimum_rpc_version(self.context, 'foo') + self.assertEqual(expected[0], min_rpc) + min_obj = objects.Service.get_minimum_obj_version(self.context, 'foo') + self.assertEqual(expected[1], min_obj) + service_get_all_by_binary.assert_has_calls( + [mock.call(self.context, 'foo', disabled=None)] * 2) + + @mock.patch('cinder.db.service_get_all_by_binary') + def test_get_minimum_version(self, service_get_all_by_binary): + services_update = [ + {'rpc_current_version': '1.0', 'object_current_version': '1.3'}, + {'rpc_current_version': '1.1', 'object_current_version': '1.2'}, + {'rpc_current_version': '2.0', 'object_current_version': '2.5'}, + ] + expected = ('1.0', '1.2') + self._test_get_minimum_version(services_update, expected) + + @mock.patch('cinder.db.service_get_all_by_binary') + def test_get_minimum_version_liberty(self, service_get_all_by_binary): + services_update = [ + {'rpc_current_version': '1.0', 'object_current_version': '1.3'}, + {'rpc_current_version': '1.1', 'object_current_version': None}, + {'rpc_current_version': None, 'object_current_version': '2.5'}, + ] + expected = ('liberty', 'liberty') + self._test_get_minimum_version(services_update, expected) + class TestServiceList(test_objects.BaseObjectsTestCase): @mock.patch('cinder.db.service_get_all') @@ -120,3 +153,15 @@ class TestServiceList(test_objects.BaseObjectsTestCase): self.context, 'foo', disabled='bar') self.assertEqual(1, len(services)) TestService._compare(self, db_service, services[0]) + + @mock.patch('cinder.db.service_get_all_by_binary') + def test_get_all_by_binary(self, service_get_all_by_binary): + db_service = fake_service.fake_db_service() + service_get_all_by_binary.return_value = [db_service] + + services = objects.ServiceList.get_all_by_binary( + self.context, 'foo', 'bar') + service_get_all_by_binary.assert_called_once_with( + self.context, 'foo', disabled='bar') + self.assertEqual(1, len(services)) + TestService._compare(self, db_service, services[0]) diff --git a/cinder/tests/unit/test_db_api.py b/cinder/tests/unit/test_db_api.py index 058d895b0..ed3215378 100644 --- a/cinder/tests/unit/test_db_api.py +++ b/cinder/tests/unit/test_db_api.py @@ -209,6 +209,18 @@ class DBAPIServiceTestCase(BaseTest): real = db.service_get_all_by_topic(self.ctxt, 't1') self._assertEqualListsOfObjects(expected, real) + def test_service_get_all_by_binary(self): + values = [ + {'host': 'host1', 'binary': 'b1'}, + {'host': 'host2', 'binary': 'b1'}, + {'host': 'host4', 'disabled': True, 'binary': 'b1'}, + {'host': 'host3', 'binary': 'b2'} + ] + services = [self._create_service(vals) for vals in values] + expected = services[:3] + real = db.service_get_all_by_binary(self.ctxt, 'b1') + self._assertEqualListsOfObjects(expected, real) + def test_service_get_by_args(self): values = [ {'host': 'host1', 'binary': 'a'}, diff --git a/cinder/tests/unit/test_rpc.py b/cinder/tests/unit/test_rpc.py new file mode 100644 index 000000000..cd52325e0 --- /dev/null +++ b/cinder/tests/unit/test_rpc.py @@ -0,0 +1,83 @@ +# Copyright 2015 Intel Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from cinder import rpc +from cinder import test + + +class FakeAPI(rpc.RPCAPI): + RPC_API_VERSION = '1.5' + TOPIC = 'cinder-scheduler-topic' + BINARY = 'cinder-scheduler' + + +class RPCAPITestCase(test.TestCase): + """Tests RPCAPI mixin aggregating stuff related to RPC compatibility.""" + + def setUp(self): + super(RPCAPITestCase, self).setUp() + # Reset cached version pins + rpc.LAST_RPC_VERSIONS = {} + rpc.LAST_OBJ_VERSIONS = {} + + @mock.patch('cinder.objects.Service.get_minimum_rpc_version', + return_value='1.2') + @mock.patch('cinder.objects.Service.get_minimum_obj_version', + return_value='1.7') + @mock.patch('cinder.rpc.get_client') + def test_init(self, get_client, get_min_obj, get_min_rpc): + def fake_get_client(target, version_cap, serializer): + self.assertEqual(FakeAPI.TOPIC, target.topic) + self.assertEqual(FakeAPI.RPC_API_VERSION, target.version) + self.assertEqual('1.2', version_cap) + self.assertEqual('1.7', serializer.version_cap) + + get_client.side_effect = fake_get_client + FakeAPI() + + @mock.patch('cinder.objects.Service.get_minimum_rpc_version', + return_value='liberty') + @mock.patch('cinder.objects.Service.get_minimum_obj_version', + return_value='liberty') + @mock.patch('cinder.rpc.get_client') + def test_init_liberty_caps(self, get_client, get_min_obj, get_min_rpc): + def fake_get_client(target, version_cap, serializer): + self.assertEqual(FakeAPI.TOPIC, target.topic) + self.assertEqual(FakeAPI.RPC_API_VERSION, target.version) + self.assertEqual(rpc.LIBERTY_RPC_VERSIONS[FakeAPI.BINARY], + version_cap) + self.assertEqual('liberty', serializer.version_cap) + + get_client.side_effect = fake_get_client + FakeAPI() + + @mock.patch('cinder.objects.Service.get_minimum_rpc_version') + @mock.patch('cinder.objects.Service.get_minimum_obj_version') + @mock.patch('cinder.rpc.get_client') + @mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-scheduler': '1.4'}) + @mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-scheduler': '1.3'}) + def test_init_cached_caps(self, get_client, get_min_obj, get_min_rpc): + def fake_get_client(target, version_cap, serializer): + self.assertEqual(FakeAPI.TOPIC, target.topic) + self.assertEqual(FakeAPI.RPC_API_VERSION, target.version) + self.assertEqual('1.4', version_cap) + self.assertEqual('1.3', serializer.version_cap) + + get_client.side_effect = fake_get_client + FakeAPI() + + self.assertFalse(get_min_obj.called) + self.assertFalse(get_min_rpc.called) diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index 05ec502f2..e03a49f0e 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -110,7 +110,7 @@ class VolumeRpcAPITestCase(test.TestCase): expected_retval = 'foo' if method == 'call' else None target = { - "version": kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION) + "version": kwargs.pop('version', rpcapi.RPC_API_VERSION) } if 'request_spec' in kwargs: diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 9600a140b..7fd60cb86 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -17,10 +17,8 @@ Client side of the volume RPC API. """ from oslo_config import cfg -import oslo_messaging as messaging from oslo_serialization import jsonutils -from cinder.objects import base as objects_base from cinder import rpc from cinder.volume import utils @@ -28,7 +26,7 @@ from cinder.volume import utils CONF = cfg.CONF -class VolumeAPI(object): +class VolumeAPI(rpc.RPCAPI): """Client side of the volume rpc API. API version history: @@ -89,18 +87,9 @@ class VolumeAPI(object): checks in the API. """ - BASE_RPC_API_VERSION = '1.0' - - def __init__(self, topic=None): - super(VolumeAPI, self).__init__() - target = messaging.Target(topic=CONF.volume_topic, - version=self.BASE_RPC_API_VERSION) - serializer = objects_base.CinderObjectSerializer() - - # NOTE(thangp): Until version pinning is impletemented, set the client - # version_cap to None - self.client = rpc.get_client(target, version_cap=None, - serializer=serializer) + RPC_API_VERSION = '1.37' + TOPIC = CONF.volume_topic + BINARY = 'cinder-volume' def create_consistencygroup(self, ctxt, group, host): new_host = utils.extract_host(host)