from oslo_config import cfg
from oslo_log import log as logging
-import oslo_messaging as messaging
-from cinder.objects import base as objects_base
from cinder import rpc
CONF = cfg.CONF
-
LOG = logging.getLogger(__name__)
-class BackupAPI(object):
+class BackupAPI(rpc.RPCAPI):
"""Client side of the volume rpc API.
API version history:
1.1 - Changed methods to accept backup objects instead of IDs.
"""
- BASE_RPC_API_VERSION = '1.0'
RPC_API_VERSION = '1.1'
-
- def __init__(self):
- super(BackupAPI, self).__init__()
- target = messaging.Target(topic=CONF.backup_topic,
- version=self.BASE_RPC_API_VERSION)
- serializer = objects_base.CinderObjectSerializer()
- self.client = rpc.get_client(target, self.RPC_API_VERSION,
- serializer=serializer)
+ TOPIC = CONF.backup_topic
+ BINARY = 'cinder-backup'
def create_backup(self, ctxt, backup):
LOG.debug("create_backup in rpcapi backup_id %s", backup.id)
return IMPL.service_get_all_by_topic(context, topic, disabled=disabled)
+def service_get_all_by_binary(context, binary, disabled=None):
+ """Get all services for a given binary."""
+ return IMPL.service_get_all_by_binary(context, binary, disabled)
+
+
def service_get_by_args(context, host, binary):
"""Get the state of a service by node name and binary."""
return IMPL.service_get_by_args(context, host, binary)
return query.all()
+@require_admin_context
+def service_get_all_by_binary(context, binary, disabled=None):
+ query = model_query(
+ context, models.Service, read_deleted="no").filter_by(binary=binary)
+
+ if disabled is not None:
+ query = query.filter_by(disabled=disabled)
+
+ return query.all()
+
+
@require_admin_context
def service_get_by_host_and_topic(context, host, topic):
result = model_query(
'ConsistencyGroup': '1.2',
'ConsistencyGroupList': '1.1', 'Service': '1.1',
'Volume': '1.3', 'VolumeTypeList': '1.1'})
+OBJ_VERSIONS.add('1.1', {'Service': '1.2', 'ServiceList': '1.1'})
class CinderObjectRegistry(base.VersionedObjectRegistry):
class CinderObjectSerializer(base.VersionedObjectSerializer):
OBJ_BASE_CLASS = CinderObject
+
+ def __init__(self, version_cap=None):
+ super(CinderObjectSerializer, self).__init__()
+ self.version_cap = version_cap
+
+ def _get_capped_obj_version(self, obj):
+ objname = obj.obj_name()
+ objver = OBJ_VERSIONS.get(self.version_cap, {})
+ return objver.get(objname, None)
+
+ def serialize_entity(self, context, entity):
+ if isinstance(entity, (tuple, list, set, dict)):
+ entity = self._process_iterable(context, self.serialize_entity,
+ entity)
+ elif (hasattr(entity, 'obj_to_primitive') and
+ callable(entity.obj_to_primitive)):
+ # NOTE(dulek): Backport outgoing object to the capped version.
+ backport_ver = self._get_capped_obj_version(entity)
+ entity = entity.obj_to_primitive(backport_ver)
+ return entity
from oslo_config import cfg
from oslo_log import log as logging
+from oslo_utils import versionutils
from oslo_versionedobjects import fields
from cinder import db
base.CinderComparableObject):
# Version 1.0: Initial version
# Version 1.1: Add rpc_current_version and object_current_version fields
- VERSION = '1.1'
+ # Version 1.2: Add get_minimum_rpc_version() and get_minimum_obj_version()
+ VERSION = '1.2'
fields = {
'id': fields.IntegerField(),
with self.obj_as_admin():
db.service_destroy(self._context, self.id)
+ @classmethod
+ def _get_minimum_version(cls, attribute, context, binary):
+ services = ServiceList.get_all_by_binary(context, binary)
+ min_ver = None
+ min_ver_str = None
+ for s in services:
+ ver_str = getattr(s, attribute)
+ if ver_str is None:
+ # FIXME(dulek) None in *_current_version means that this
+ # service is in Liberty version, so we must assume this is the
+ # lowest one. We use handy and easy to remember token to
+ # indicate that. This may go away as soon as we drop
+ # compatibility with Liberty, possibly in early N.
+ return 'liberty'
+ ver = versionutils.convert_version_to_int(ver_str)
+ if min_ver is None or ver < min_ver:
+ min_ver = ver
+ min_ver_str = ver_str
+
+ return min_ver_str
+
+ @base.remotable_classmethod
+ def get_minimum_rpc_version(cls, context, binary):
+ return cls._get_minimum_version('rpc_current_version', context, binary)
+
+ @base.remotable_classmethod
+ def get_minimum_obj_version(cls, context, binary):
+ return cls._get_minimum_version('object_current_version', context,
+ binary)
+
@base.CinderObjectRegistry.register
class ServiceList(base.ObjectListBase, base.CinderObject):
- VERSION = '1.0'
+ # Version 1.0: Initial version
+ # Version 1.1: Service object 1.2
+ VERSION = '1.1'
fields = {
'objects': fields.ListOfObjectsField('Service'),
}
child_versions = {
- '1.0': '1.0'
+ '1.0': '1.0',
+ '1.1': '1.2',
}
@base.remotable_classmethod
disabled=disabled)
return base.obj_make_list(context, cls(context), objects.Service,
services)
+
+ @base.remotable_classmethod
+ def get_all_by_binary(cls, context, binary, disabled=None):
+ services = db.service_get_all_by_binary(context, binary,
+ disabled=disabled)
+ return base.obj_make_list(context, cls(context), objects.Service,
+ services)
]
from oslo_config import cfg
+from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from osprofiler import profiler
import cinder.context
import cinder.exception
+from cinder.i18n import _LI
+from cinder import objects
+from cinder.objects import base
CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
TRANSPORT = None
NOTIFIER = None
if not publisher_id:
publisher_id = "%s.%s" % (service, host or CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)
+
+
+LAST_RPC_VERSIONS = {}
+LAST_OBJ_VERSIONS = {}
+
+
+class RPCAPI(object):
+ """Mixin class aggregating methods related to RPC API compatibility."""
+
+ RPC_API_VERSION = '1.0'
+ TOPIC = ''
+ BINARY = ''
+
+ def __init__(self):
+ target = messaging.Target(topic=self.TOPIC,
+ version=self.RPC_API_VERSION)
+ obj_version_cap = self._determine_obj_version_cap()
+ serializer = base.CinderObjectSerializer(obj_version_cap)
+
+ rpc_version_cap = self._determine_rpc_version_cap()
+ self.client = get_client(target, version_cap=rpc_version_cap,
+ serializer=serializer)
+
+ def _determine_rpc_version_cap(self):
+ global LAST_RPC_VERSIONS
+ if self.BINARY in LAST_RPC_VERSIONS:
+ return LAST_RPC_VERSIONS[self.BINARY]
+
+ version_cap = objects.Service.get_minimum_rpc_version(
+ cinder.context.get_admin_context(), self.BINARY)
+ if version_cap == 'liberty':
+ # NOTE(dulek): This means that one of the services is Liberty,
+ # we should cap to it's RPC version.
+ version_cap = LIBERTY_RPC_VERSIONS[self.BINARY]
+ LOG.info(_LI('Automatically selected %(binary)s RPC version '
+ '%(version)s as minimum service version.'),
+ {'binary': self.BINARY, 'version': version_cap})
+ LAST_RPC_VERSIONS[self.BINARY] = version_cap
+ return version_cap
+
+ def _determine_obj_version_cap(self):
+ global LAST_OBJ_VERSIONS
+ if self.BINARY in LAST_OBJ_VERSIONS:
+ return LAST_OBJ_VERSIONS[self.BINARY]
+
+ version_cap = objects.Service.get_minimum_obj_version(
+ cinder.context.get_admin_context(), self.BINARY)
+ LOG.info(_LI('Automatically selected %(binary)s objects version '
+ '%(version)s as minimum service version.'),
+ {'binary': self.BINARY, 'version': version_cap})
+ LAST_OBJ_VERSIONS[self.BINARY] = version_cap
+ return version_cap
+
+
+# FIXME(dulek): Liberty haven't reported its RPC versions, so we need to have
+# them hardcoded. This dict may go away as soon as we drop compatibility with
+# L, which should be in early N.
+#
+# This is the only time we need to have such dictionary. We don't need to add
+# similar ones for any release following Liberty.
+LIBERTY_RPC_VERSIONS = {
+ 'cinder-volume': '1.30',
+ 'cinder-scheduler': '1.8',
+ # NOTE(dulek) backup.manager had specified version '1.2', but backup.rpcapi
+ # was really only sending messages up to '1.1'.
+ 'cinder-backup': '1.1',
+}
"""
from oslo_config import cfg
-import oslo_messaging as messaging
from oslo_serialization import jsonutils
-from cinder.objects import base as objects_base
from cinder import rpc
CONF = cfg.CONF
-class SchedulerAPI(object):
+class SchedulerAPI(rpc.RPCAPI):
"""Client side of the scheduler rpc API.
API version history:
migrate_volume_to_host()
"""
- RPC_API_VERSION = '1.0'
-
- def __init__(self):
- super(SchedulerAPI, self).__init__()
- target = messaging.Target(topic=CONF.scheduler_topic,
- version=self.RPC_API_VERSION)
- serializer = objects_base.CinderObjectSerializer()
-
- # NOTE(thangp): Until version pinning is impletemented, set the client
- # version_cap to None
- self.client = rpc.get_client(target, version_cap=None,
- serializer=serializer)
+ RPC_API_VERSION = '1.11'
+ TOPIC = CONF.scheduler_topic
+ BINARY = 'cinder-scheduler'
def create_consistencygroup(self, ctxt, topic, group,
request_spec_list=None,
target = {
"server": server,
"fanout": fanout,
- "version": kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION)
+ "version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
}
expected_msg = copy.deepcopy(kwargs)
import uuid
from iso8601 import iso8601
+from oslo_utils import versionutils
from oslo_versionedobjects import fields
from sqlalchemy import sql
@objects.base.CinderObjectRegistry.register_if(False)
class TestObject(objects.base.CinderObject):
+ VERSION = '1.1'
+
fields = {
'scheduled_at': objects.base.fields.DateTimeField(nullable=True),
'uuid': objects.base.fields.UUIDField(),
'text': objects.base.fields.StringField(nullable=True),
}
+ def obj_make_compatible(self, primitive, target_version):
+ super(TestObject, self).obj_make_compatible(primitive,
+ target_version)
+ target_version = versionutils.convert_version_to_tuple(target_version)
+ if target_version < (1, 1):
+ primitive.pop('text', None)
+
class TestCinderObject(test_objects.BaseObjectsTestCase):
"""Tests methods from CinderObject."""
self.assertTrue('foo' in obj)
self.assertTrue('abc' in obj)
self.assertFalse('def' in obj)
+
+
+@mock.patch('cinder.objects.base.OBJ_VERSIONS', {'1.0': {'TestObject': '1.0'},
+ '1.1': {'TestObject': '1.1'},
+ })
+class TestCinderObjectSerializer(test_objects.BaseObjectsTestCase):
+ def setUp(self):
+ super(TestCinderObjectSerializer, self).setUp()
+ self.obj = TestObject(scheduled_at=None, uuid=uuid.uuid4(),
+ text='text')
+
+ def test_serialize_entity_backport(self):
+ serializer = objects.base.CinderObjectSerializer('1.0')
+ primitive = serializer.serialize_entity(self.context, self.obj)
+ self.assertEqual('1.0', primitive['versioned_object.version'])
+
+ def test_serialize_entity_unknown_version(self):
+ serializer = objects.base.CinderObjectSerializer('0.9')
+ primitive = serializer.serialize_entity(self.context, self.obj)
+ self.assertEqual('1.1', primitive['versioned_object.version'])
'CGSnapshotList': '1.0-e8c3f4078cd0ee23487b34d173eec776',
'ConsistencyGroup': '1.2-ed7f90a6871991a19af716ade7337fc9',
'ConsistencyGroupList': '1.1-73916823b697dfa0c7f02508d87e0f28',
- 'Service': '1.1-9eb00cbd8e2bfb7371343429af54d6e8',
- 'ServiceList': '1.0-d242d3384b68e5a5a534e090ff1d5161',
+ 'Service': '1.2-4d3dd6c9906da364739fbf3f90c80505',
+ 'ServiceList': '1.1-cb758b200f0a3a90efabfc5aa2ffb627',
'Snapshot': '1.0-a6c33eefeadefb324d79f72f66c54e9a',
'SnapshotList': '1.0-71661e7180ef6cc51501704a9bea4bf1',
'Volume': '1.3-97c3977846dae6588381e7bd3e6e6558',
mock.call.__nonzero__(),
mock.call(self.context, 123)])
+ @mock.patch('cinder.db.service_get_all_by_binary')
+ def _test_get_minimum_version(self, services_update, expected,
+ service_get_all_by_binary):
+ services = [fake_service.fake_db_service(**s) for s in services_update]
+ service_get_all_by_binary.return_value = services
+
+ min_rpc = objects.Service.get_minimum_rpc_version(self.context, 'foo')
+ self.assertEqual(expected[0], min_rpc)
+ min_obj = objects.Service.get_minimum_obj_version(self.context, 'foo')
+ self.assertEqual(expected[1], min_obj)
+ service_get_all_by_binary.assert_has_calls(
+ [mock.call(self.context, 'foo', disabled=None)] * 2)
+
+ @mock.patch('cinder.db.service_get_all_by_binary')
+ def test_get_minimum_version(self, service_get_all_by_binary):
+ services_update = [
+ {'rpc_current_version': '1.0', 'object_current_version': '1.3'},
+ {'rpc_current_version': '1.1', 'object_current_version': '1.2'},
+ {'rpc_current_version': '2.0', 'object_current_version': '2.5'},
+ ]
+ expected = ('1.0', '1.2')
+ self._test_get_minimum_version(services_update, expected)
+
+ @mock.patch('cinder.db.service_get_all_by_binary')
+ def test_get_minimum_version_liberty(self, service_get_all_by_binary):
+ services_update = [
+ {'rpc_current_version': '1.0', 'object_current_version': '1.3'},
+ {'rpc_current_version': '1.1', 'object_current_version': None},
+ {'rpc_current_version': None, 'object_current_version': '2.5'},
+ ]
+ expected = ('liberty', 'liberty')
+ self._test_get_minimum_version(services_update, expected)
+
class TestServiceList(test_objects.BaseObjectsTestCase):
@mock.patch('cinder.db.service_get_all')
self.context, 'foo', disabled='bar')
self.assertEqual(1, len(services))
TestService._compare(self, db_service, services[0])
+
+ @mock.patch('cinder.db.service_get_all_by_binary')
+ def test_get_all_by_binary(self, service_get_all_by_binary):
+ db_service = fake_service.fake_db_service()
+ service_get_all_by_binary.return_value = [db_service]
+
+ services = objects.ServiceList.get_all_by_binary(
+ self.context, 'foo', 'bar')
+ service_get_all_by_binary.assert_called_once_with(
+ self.context, 'foo', disabled='bar')
+ self.assertEqual(1, len(services))
+ TestService._compare(self, db_service, services[0])
real = db.service_get_all_by_topic(self.ctxt, 't1')
self._assertEqualListsOfObjects(expected, real)
+ def test_service_get_all_by_binary(self):
+ values = [
+ {'host': 'host1', 'binary': 'b1'},
+ {'host': 'host2', 'binary': 'b1'},
+ {'host': 'host4', 'disabled': True, 'binary': 'b1'},
+ {'host': 'host3', 'binary': 'b2'}
+ ]
+ services = [self._create_service(vals) for vals in values]
+ expected = services[:3]
+ real = db.service_get_all_by_binary(self.ctxt, 'b1')
+ self._assertEqualListsOfObjects(expected, real)
+
def test_service_get_by_args(self):
values = [
{'host': 'host1', 'binary': 'a'},
--- /dev/null
+# Copyright 2015 Intel Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from cinder import rpc
+from cinder import test
+
+
+class FakeAPI(rpc.RPCAPI):
+ RPC_API_VERSION = '1.5'
+ TOPIC = 'cinder-scheduler-topic'
+ BINARY = 'cinder-scheduler'
+
+
+class RPCAPITestCase(test.TestCase):
+ """Tests RPCAPI mixin aggregating stuff related to RPC compatibility."""
+
+ def setUp(self):
+ super(RPCAPITestCase, self).setUp()
+ # Reset cached version pins
+ rpc.LAST_RPC_VERSIONS = {}
+ rpc.LAST_OBJ_VERSIONS = {}
+
+ @mock.patch('cinder.objects.Service.get_minimum_rpc_version',
+ return_value='1.2')
+ @mock.patch('cinder.objects.Service.get_minimum_obj_version',
+ return_value='1.7')
+ @mock.patch('cinder.rpc.get_client')
+ def test_init(self, get_client, get_min_obj, get_min_rpc):
+ def fake_get_client(target, version_cap, serializer):
+ self.assertEqual(FakeAPI.TOPIC, target.topic)
+ self.assertEqual(FakeAPI.RPC_API_VERSION, target.version)
+ self.assertEqual('1.2', version_cap)
+ self.assertEqual('1.7', serializer.version_cap)
+
+ get_client.side_effect = fake_get_client
+ FakeAPI()
+
+ @mock.patch('cinder.objects.Service.get_minimum_rpc_version',
+ return_value='liberty')
+ @mock.patch('cinder.objects.Service.get_minimum_obj_version',
+ return_value='liberty')
+ @mock.patch('cinder.rpc.get_client')
+ def test_init_liberty_caps(self, get_client, get_min_obj, get_min_rpc):
+ def fake_get_client(target, version_cap, serializer):
+ self.assertEqual(FakeAPI.TOPIC, target.topic)
+ self.assertEqual(FakeAPI.RPC_API_VERSION, target.version)
+ self.assertEqual(rpc.LIBERTY_RPC_VERSIONS[FakeAPI.BINARY],
+ version_cap)
+ self.assertEqual('liberty', serializer.version_cap)
+
+ get_client.side_effect = fake_get_client
+ FakeAPI()
+
+ @mock.patch('cinder.objects.Service.get_minimum_rpc_version')
+ @mock.patch('cinder.objects.Service.get_minimum_obj_version')
+ @mock.patch('cinder.rpc.get_client')
+ @mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-scheduler': '1.4'})
+ @mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-scheduler': '1.3'})
+ def test_init_cached_caps(self, get_client, get_min_obj, get_min_rpc):
+ def fake_get_client(target, version_cap, serializer):
+ self.assertEqual(FakeAPI.TOPIC, target.topic)
+ self.assertEqual(FakeAPI.RPC_API_VERSION, target.version)
+ self.assertEqual('1.4', version_cap)
+ self.assertEqual('1.3', serializer.version_cap)
+
+ get_client.side_effect = fake_get_client
+ FakeAPI()
+
+ self.assertFalse(get_min_obj.called)
+ self.assertFalse(get_min_rpc.called)
expected_retval = 'foo' if method == 'call' else None
target = {
- "version": kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION)
+ "version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
}
if 'request_spec' in kwargs:
"""
from oslo_config import cfg
-import oslo_messaging as messaging
from oslo_serialization import jsonutils
-from cinder.objects import base as objects_base
from cinder import rpc
from cinder.volume import utils
CONF = cfg.CONF
-class VolumeAPI(object):
+class VolumeAPI(rpc.RPCAPI):
"""Client side of the volume rpc API.
API version history:
checks in the API.
"""
- BASE_RPC_API_VERSION = '1.0'
-
- def __init__(self, topic=None):
- super(VolumeAPI, self).__init__()
- target = messaging.Target(topic=CONF.volume_topic,
- version=self.BASE_RPC_API_VERSION)
- serializer = objects_base.CinderObjectSerializer()
-
- # NOTE(thangp): Until version pinning is impletemented, set the client
- # version_cap to None
- self.client = rpc.get_client(target, version_cap=None,
- serializer=serializer)
+ RPC_API_VERSION = '1.37'
+ TOPIC = CONF.volume_topic
+ BINARY = 'cinder-volume'
def create_consistencygroup(self, ctxt, group, host):
new_host = utils.extract_host(host)