]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add scheduler RPC API v2.0
authorMichał Dulko <michal.dulko@intel.com>
Wed, 17 Feb 2016 10:15:57 +0000 (11:15 +0100)
committerMichał Dulko <michal.dulko@intel.com>
Mon, 29 Feb 2016 13:55:22 +0000 (14:55 +0100)
This patch creates scheduler RPC API version 2.0, while retaining
compatibility in rpcapi and manager for 1.x, allowing for continuous
deployment scenarios.

This should be merged just before the Mitaka release.

UpgradeImpact - Deployments doing continous deployment should not
upgrade into Newton before doing an upgrade which includes all the
Mitaka's RPC API version bump commits (scheduler, volume, backup).

Change-Id: I9870462cf32be102a895f6e70ef843bfadf85a9d
Related-Blueprint: rpc-object-compatibility

cinder/scheduler/manager.py
cinder/scheduler/rpcapi.py
cinder/tests/unit/scheduler/test_rpcapi.py

index d68508a7e65e7a8e273578e1180d6a2a324c7a71..309f8a5b1b38429ee24aee82fa16ffe96c3954b4 100644 (file)
@@ -66,6 +66,7 @@ class SchedulerManager(manager.Manager):
             scheduler_driver = CONF.scheduler_driver
         self.driver = importutils.import_object(scheduler_driver)
         super(SchedulerManager, self).__init__(*args, **kwargs)
+        self.additional_endpoints.append(_SchedulerV2Proxy(self))
         self._startup_delay = True
 
     def init_host_with_rpc(self):
@@ -315,3 +316,59 @@ class SchedulerManager(manager.Manager):
         rpc.get_notifier("scheduler").error(context,
                                             'scheduler.' + method,
                                             payload)
+
+
+# TODO(dulek): This goes away immediately in Newton and is just present in
+# Mitaka so that we can receive v1.x and v2.0 messages.
+class _SchedulerV2Proxy(object):
+
+    target = messaging.Target(version='2.0')
+
+    def __init__(self, manager):
+        self.manager = manager
+
+    def update_service_capabilities(self, context, service_name=None,
+                                    host=None, capabilities=None, **kwargs):
+        return self.manager.update_service_capabilities(
+            context, service_name=service_name, host=host,
+            capabilities=capabilities, **kwargs)
+
+    def create_consistencygroup(self, context, topic, group,
+                                request_spec_list=None,
+                                filter_properties_list=None):
+        return self.manager.create_consistencygroup(
+            context, topic, group, request_spec_list=request_spec_list,
+            filter_properties_list=None)
+
+    def create_volume(self, context, topic, volume_id, snapshot_id=None,
+                      image_id=None, request_spec=None, filter_properties=None,
+                      volume=None):
+        return self.manager.create_volume(
+            context, topic, volume_id, snapshot_id=snapshot_id,
+            image_id=image_id, request_spec=request_spec,
+            filter_properties=filter_properties, volume=volume)
+
+    def request_service_capabilities(self, context):
+        return self.manager.request_service_capabilities(context)
+
+    def migrate_volume_to_host(self, context, topic, volume_id, host,
+                               force_host_copy, request_spec,
+                               filter_properties=None, volume=None):
+        return self.manager.migrate_volume_to_host(
+            context, topic, volume_id, host, force_host_copy, request_spec,
+            filter_properties=filter_properties, volume=volume)
+
+    def retype(self, context, topic, volume_id, request_spec,
+               filter_properties=None, volume=None):
+        return self.manager.retype(context, topic, volume_id, request_spec,
+                                   filter_properties=filter_properties,
+                                   volume=volume)
+
+    def manage_existing(self, context, topic, volume_id, request_spec,
+                        filter_properties=None):
+        return self.manager.manage_existing(
+            context, topic, volume_id, request_spec,
+            filter_properties=filter_properties)
+
+    def get_pools(self, context, filters=None):
+        return self.manager.get_pools(context, filters=filters)
index ee76981f9941b7c7f12342a6ac8e8807428d889a..3faf0390fd1b5d0a6bc71ad7c8b36cf9cc81658c 100644 (file)
@@ -44,17 +44,29 @@ class SchedulerAPI(rpc.RPCAPI):
         1.10 - Adds support for sending objects over RPC in retype()
         1.11 - Adds support for sending objects over RPC in
                migrate_volume_to_host()
+
+        ... Mitaka supports messaging 1.11. Any changes to existing methods in
+        1.x after this point should be done so that they can handle version cap
+        set to 1.11.
+
+        2.0 - Remove 1.x compatibility
     """
 
-    RPC_API_VERSION = '1.11'
+    RPC_API_VERSION = '2.0'
     TOPIC = CONF.scheduler_topic
     BINARY = 'cinder-scheduler'
 
+    def _compat_ver(self, current, legacy):
+        if self.client.can_send_version(current):
+            return current
+        else:
+            return legacy
+
     def create_consistencygroup(self, ctxt, topic, group,
                                 request_spec_list=None,
                                 filter_properties_list=None):
-
-        cctxt = self.client.prepare(version='1.8')
+        version = self._compat_ver('2.0', '1.8')
+        cctxt = self.client.prepare(version=version)
         request_spec_p_list = []
         for request_spec in request_spec_list:
             request_spec_p = jsonutils.to_primitive(request_spec)
@@ -69,13 +81,15 @@ class SchedulerAPI(rpc.RPCAPI):
     def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
                       image_id=None, request_spec=None,
                       filter_properties=None, volume=None):
-
         request_spec_p = jsonutils.to_primitive(request_spec)
         msg_args = {'topic': topic, 'volume_id': volume_id,
                     'snapshot_id': snapshot_id, 'image_id': image_id,
                     'request_spec': request_spec_p,
                     'filter_properties': filter_properties}
-        if self.client.can_send_version('1.9'):
+        if self.client.can_send_version('2.0'):
+            version = '2.0'
+            msg_args['volume'] = volume
+        elif self.client.can_send_version('1.9'):
             version = '1.9'
             msg_args['volume'] = volume
         else:
@@ -92,7 +106,10 @@ class SchedulerAPI(rpc.RPCAPI):
                     'host': host, 'force_host_copy': force_host_copy,
                     'request_spec': request_spec_p,
                     'filter_properties': filter_properties}
-        if self.client.can_send_version('1.11'):
+        if self.client.can_send_version('2.0'):
+            version = '2.0'
+            msg_args['volume'] = volume
+        elif self.client.can_send_version('1.11'):
             version = '1.11'
             msg_args['volume'] = volume
         else:
@@ -108,7 +125,10 @@ class SchedulerAPI(rpc.RPCAPI):
         msg_args = {'topic': topic, 'volume_id': volume_id,
                     'request_spec': request_spec_p,
                     'filter_properties': filter_properties}
-        if self.client.can_send_version('1.10'):
+        if self.client.can_send_version('2.0'):
+            version = '2.0'
+            msg_args['volume'] = volume
+        elif self.client.can_send_version('1.10'):
             version = '1.10'
             msg_args['volume'] = volume
         else:
@@ -119,7 +139,8 @@ class SchedulerAPI(rpc.RPCAPI):
 
     def manage_existing(self, ctxt, topic, volume_id,
                         request_spec=None, filter_properties=None):
-        cctxt = self.client.prepare(version='1.5')
+        version = self._compat_ver('2.0', '1.5')
+        cctxt = self.client.prepare(version=version)
         request_spec_p = jsonutils.to_primitive(request_spec)
         return cctxt.cast(ctxt, 'manage_existing',
                           topic=topic,
@@ -128,7 +149,8 @@ class SchedulerAPI(rpc.RPCAPI):
                           filter_properties=filter_properties)
 
     def get_pools(self, ctxt, filters=None):
-        cctxt = self.client.prepare(version='1.7')
+        version = self._compat_ver('2.0', '1.7')
+        cctxt = self.client.prepare(version=version)
         return cctxt.call(ctxt, 'get_pools',
                           filters=filters)
 
@@ -136,7 +158,8 @@ class SchedulerAPI(rpc.RPCAPI):
                                     service_name, host,
                                     capabilities):
         # FIXME(flaper87): What to do with fanout?
-        cctxt = self.client.prepare(fanout=True, version='1.0')
+        version = self._compat_ver('2.0', '1.0')
+        cctxt = self.client.prepare(fanout=True, version=version)
         cctxt.cast(ctxt, 'update_service_capabilities',
                    service_name=service_name, host=host,
                    capabilities=capabilities)
index 370d617bfeb223ac134c1b7a060365198767406e..99a98ecc22245d84a3eb7c9923cf5803659beff7 100644 (file)
@@ -75,7 +75,20 @@ class SchedulerRpcAPITestCase(test.TestCase):
                 for kwarg, value in self.fake_kwargs.items():
                     self.assertEqual(expected_msg[kwarg], value)
 
-    def test_update_service_capabilities(self):
+    @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
+    def test_update_service_capabilities(self, can_send_version):
+        self._test_scheduler_api('update_service_capabilities',
+                                 rpc_method='cast',
+                                 service_name='fake_name',
+                                 host='fake_host',
+                                 capabilities='fake_capabilities',
+                                 fanout=True,
+                                 version='2.0')
+        can_send_version.assert_called_once_with('2.0')
+
+    @mock.patch('oslo_messaging.RPCClient.can_send_version',
+                return_value=False)
+    def test_update_service_capabilities_old(self, can_send_version):
         self._test_scheduler_api('update_service_capabilities',
                                  rpc_method='cast',
                                  service_name='fake_name',
@@ -83,6 +96,7 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  capabilities='fake_capabilities',
                                  fanout=True,
                                  version='1.0')
+        can_send_version.assert_called_once_with('2.0')
 
     @mock.patch('oslo_messaging.RPCClient.can_send_version',
                 return_value=True)
@@ -96,8 +110,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  request_spec='fake_request_spec',
                                  filter_properties='filter_properties',
                                  volume='volume',
-                                 version='1.9')
-        can_send_version.assert_called_once_with('1.9')
+                                 version='2.0')
+        can_send_version.assert_called_once_with('2.0')
 
     @mock.patch('oslo_messaging.RPCClient.can_send_version',
                 return_value=False)
@@ -112,7 +126,7 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  request_spec='fake_request_spec',
                                  filter_properties='filter_properties',
                                  version='1.2')
-        can_send_version.assert_called_once_with('1.9')
+        can_send_version.assert_has_calls([mock.call('2.0'), mock.call('1.9')])
 
     @mock.patch('oslo_messaging.RPCClient.can_send_version',
                 return_value=True)
@@ -126,8 +140,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  request_spec='fake_request_spec',
                                  filter_properties='filter_properties',
                                  volume='volume',
-                                 version='1.11')
-        can_send_version.assert_called_once_with('1.11')
+                                 version='2.0')
+        can_send_version.assert_called_once_with('2.0')
 
     @mock.patch('oslo_messaging.RPCClient.can_send_version',
                 return_value=False)
@@ -142,7 +156,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  filter_properties='filter_properties',
                                  volume='volume',
                                  version='1.3')
-        can_send_version.assert_called_once_with('1.11')
+        can_send_version.assert_has_calls([mock.call('2.0'),
+                                           mock.call('1.11')])
 
     @mock.patch('oslo_messaging.RPCClient.can_send_version',
                 return_value=True)
@@ -154,8 +169,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  request_spec='fake_request_spec',
                                  filter_properties='filter_properties',
                                  volume='volume',
-                                 version='1.10')
-        can_send_version.assert_called_with('1.10')
+                                 version='2.0')
+        can_send_version.assert_called_with('2.0')
 
     @mock.patch('oslo_messaging.RPCClient.can_send_version',
                 return_value=False)
@@ -168,9 +183,23 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  filter_properties='filter_properties',
                                  volume='volume',
                                  version='1.4')
-        can_send_version.assert_called_with('1.10')
+        can_send_version.assert_has_calls([mock.call('2.0'),
+                                           mock.call('1.10')])
+
+    @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
+    def test_manage_existing(self, can_send_version):
+        self._test_scheduler_api('manage_existing',
+                                 rpc_method='cast',
+                                 topic='topic',
+                                 volume_id='volume_id',
+                                 request_spec='fake_request_spec',
+                                 filter_properties='filter_properties',
+                                 version='2.0')
+        can_send_version.assert_called_with('2.0')
 
-    def test_manage_existing(self):
+    @mock.patch('oslo_messaging.RPCClient.can_send_version',
+                return_value=False)
+    def test_manage_existing_old(self, can_send_version):
         self._test_scheduler_api('manage_existing',
                                  rpc_method='cast',
                                  topic='topic',
@@ -178,9 +207,21 @@ class SchedulerRpcAPITestCase(test.TestCase):
                                  request_spec='fake_request_spec',
                                  filter_properties='filter_properties',
                                  version='1.5')
+        can_send_version.assert_called_with('2.0')
 
-    def test_get_pools(self):
+    @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
+    def test_get_pools(self, can_send_version):
+        self._test_scheduler_api('get_pools',
+                                 rpc_method='call',
+                                 filters=None,
+                                 version='2.0')
+        can_send_version.assert_called_with('2.0')
+
+    @mock.patch('oslo_messaging.RPCClient.can_send_version',
+                return_value=False)
+    def test_get_pools_old(self, can_send_version):
         self._test_scheduler_api('get_pools',
                                  rpc_method='call',
                                  filters=None,
                                  version='1.7')
+        can_send_version.assert_called_with('2.0')