]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add base support for rpc API versioning.
authorRussell Bryant <rbryant@redhat.com>
Mon, 7 May 2012 18:10:29 +0000 (14:10 -0400)
committerJenkins <jenkins@review.openstack.org>
Wed, 18 Jul 2012 17:27:49 +0000 (17:27 +0000)
Part of blueprint versioned-rpc-apis.

This commit includes the base support for versioned RPC APIs.  It
introduces the RpcProxy and RpcDispatcher classes that have common code
for handling versioning on the client and server sides, respectively.

RPC APIs will be converted one at a time using this infrastructure.

Change-Id: I07bd82e9ff60c356123950e466caaffdfce79eba
Reviewed-on: https://review.openstack.org/9901
Reviewed-by: Huang Zhiteng <zhiteng.huang@intel.com>
Reviewed-by: Vish Ishaya <vishvananda@gmail.com>
Approved: John Griffith <john.griffith@solidfire.com>
Tested-by: Jenkins
cinder/manager.py
cinder/rpc/__init__.py
cinder/rpc/amqp.py
cinder/rpc/common.py
cinder/rpc/dispatcher.py [new file with mode: 0644]
cinder/rpc/impl_fake.py
cinder/rpc/proxy.py [new file with mode: 0644]
cinder/service.py
cinder/tests/rpc/common.py
cinder/tests/rpc/test_dispatcher.py [new file with mode: 0644]
cinder/tests/rpc/test_proxy.py [new file with mode: 0644]

index 9609e3906ea85ef577bdb2e971ebf70a71b8d0e1..bd2e2423e8cccd8eec95d9ed8108f2feeb71c476 100644 (file)
@@ -56,6 +56,7 @@ This module provides Manager, a base class for managers.
 from cinder.db import base
 from cinder import flags
 from cinder import log as logging
+from cinder.rpc import dispatcher as rpc_dispatcher
 from cinder.scheduler import api
 from cinder import version
 
@@ -130,12 +131,23 @@ class ManagerMeta(type):
 class Manager(base.Base):
     __metaclass__ = ManagerMeta
 
+    # Set RPC API version to 1.0 by default.
+    RPC_API_VERSION = '1.0'
+
     def __init__(self, host=None, db_driver=None):
         if not host:
             host = FLAGS.host
         self.host = host
         super(Manager, self).__init__(db_driver)
 
+    def create_rpc_dispatcher(self):
+        '''Get the rpc dispatcher for this manager.
+
+        If a manager would like to set an rpc API version, or support more than
+        one class as the target of rpc messages, override this method.
+        '''
+        return rpc_dispatcher.RpcDispatcher([self])
+
     def periodic_tasks(self, context, raise_on_error=False):
         """Tasks to be run at a periodic interval."""
         for task_name, task in self._periodic_tasks:
index ea36c9c38ca4a190bcbb1f3c9ab4412634b6ef64..becdefa9ed81e7ef3ba8914560bc7139995ad51b 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+"""
+A remote procedure call (rpc) abstraction.
+
+For some wrappers that add message versioning to rpc, see:
+    rpc.dispatcher
+    rpc.proxy
+"""
+
 from cinder.openstack.common import cfg
 from cinder.openstack.common import importutils
 
index 1c1341e431933f20ae50f6b591008378976126ed..c3488037d51e5ba146b92f995c884841f121e781 100644 (file)
@@ -241,23 +241,26 @@ class ProxyCallback(object):
         ctxt = unpack_context(self.conf, message_data)
         method = message_data.get('method')
         args = message_data.get('args', {})
+        version = message_data.get('version', None)
         if not method:
             LOG.warn(_('no method for message: %s') % message_data)
             ctxt.reply(_('No method for message: %s') % message_data,
                        connection_pool=self.connection_pool)
             return
-        self.pool.spawn_n(self._process_data, ctxt, method, args)
+        self.pool.spawn_n(self._process_data, ctxt, version, method, args)
 
-    def _process_data(self, ctxt, method, args):
-        """Thread that magically looks for a method on the proxy
-        object and calls it.
+    def _process_data(self, ctxt, version, method, args):
+        """Process a message in a new thread.
+
+        If the proxy object we have has a dispatch method
+        (see rpc.dispatcher.RpcDispatcher), pass it the version,
+        method, and args and let it dispatch as appropriate.  If not, use
+        the old behavior of magically calling the specified method on the
+        proxy we have here.
         """
         ctxt.update_store()
         try:
-            node_func = getattr(self.proxy, str(method))
-            node_args = dict((str(k), v) for k, v in args.iteritems())
-            # NOTE(vish): magic is fun!
-            rval = node_func(context=ctxt, **node_args)
+            rval = self.proxy.dispatch(ctxt, version, method, **args)
             # Check if the result was a generator
             if inspect.isgenerator(rval):
                 for x in rval:
index 501f69a8a951bbabea3fb637756fa50d4d3046c9..0c370e90a579297e1739b236a827b4dcac7797fb 100644 (file)
@@ -85,6 +85,11 @@ class InvalidRPCConnectionReuse(RPCException):
     message = _("Invalid reuse of an RPC connection.")
 
 
+class UnsupportedRpcVersion(RPCException):
+    message = _("Specified RPC version, %(version)s, not supported by "
+                "this endpoint.")
+
+
 class Connection(object):
     """A connection, returned by rpc.create_connection().
 
diff --git a/cinder/rpc/dispatcher.py b/cinder/rpc/dispatcher.py
new file mode 100644 (file)
index 0000000..95c3ff8
--- /dev/null
@@ -0,0 +1,105 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Code for rpc message dispatching.
+
+Messages that come in have a version number associated with them.  RPC API
+version numbers are in the form:
+
+    Major.Minor
+
+For a given message with version X.Y, the receiver must be marked as able to
+handle messages of version A.B, where:
+
+    A = X
+
+    B >= Y
+
+The Major version number would be incremented for an almost completely new API.
+The Minor version number would be incremented for backwards compatible changes
+to an existing API.  A backwards compatible change could be something like
+adding a new method, adding an argument to an existing method (but not
+requiring it), or changing the type for an existing argument (but still
+handling the old type as well).
+
+The conversion over to a versioned API must be done on both the client side and
+server side of the API at the same time.  However, as the code stands today,
+there can be both versioned and unversioned APIs implemented in the same code
+base.
+"""
+
+from cinder.rpc import common as rpc_common
+
+
+class RpcDispatcher(object):
+    """Dispatch rpc messages according to the requested API version.
+
+    This class can be used as the top level 'manager' for a service.  It
+    contains a list of underlying managers that have an API_VERSION attribute.
+    """
+
+    def __init__(self, callbacks):
+        """Initialize the rpc dispatcher.
+
+        :param callbacks: List of proxy objects that are an instance
+                          of a class with rpc methods exposed.  Each proxy
+                          object should have an RPC_API_VERSION attribute.
+        """
+        self.callbacks = callbacks
+        super(RpcDispatcher, self).__init__()
+
+    @staticmethod
+    def _is_compatible(mversion, version):
+        """Determine whether versions are compatible.
+
+        :param mversion: The API version implemented by a callback.
+        :param version: The API version requested by an incoming message.
+        """
+        version_parts = version.split('.')
+        mversion_parts = mversion.split('.')
+        if int(version_parts[0]) != int(mversion_parts[0]):  # Major
+            return False
+        if int(version_parts[1]) > int(mversion_parts[1]):  # Minor
+            return False
+        return True
+
+    def dispatch(self, ctxt, version, method, **kwargs):
+        """Dispatch a message based on a requested version.
+
+        :param ctxt: The request context
+        :param version: The requested API version from the incoming message
+        :param method: The method requested to be called by the incoming
+                       message.
+        :param kwargs: A dict of keyword arguments to be passed to the method.
+
+        :returns: Whatever is returned by the underlying method that gets
+                  called.
+        """
+        if not version:
+            version = '1.0'
+
+        for proxyobj in self.callbacks:
+            if hasattr(proxyobj, 'RPC_API_VERSION'):
+                rpc_api_version = proxyobj.RPC_API_VERSION
+            else:
+                rpc_api_version = '1.0'
+            if not hasattr(proxyobj, method):
+                continue
+            if self._is_compatible(rpc_api_version, version):
+                return getattr(proxyobj, method)(ctxt, **kwargs)
+
+        raise rpc_common.UnsupportedRpcVersion(version=version)
index dc3b00ecaa5cc62e68ce6c557678e91149a59a5f..d373f9c40fa32cb7e321a318f21894a7e6c0925f 100644 (file)
@@ -50,15 +50,13 @@ class Consumer(object):
         self.topic = topic
         self.proxy = proxy
 
-    def call(self, context, method, args, timeout):
-        node_func = getattr(self.proxy, method)
-        node_args = dict((str(k), v) for k, v in args.iteritems())
+    def call(self, context, version, method, args, timeout):
         done = eventlet.event.Event()
 
         def _inner():
             ctxt = RpcContext.from_dict(context.to_dict())
             try:
-                rval = node_func(context=ctxt, **node_args)
+                rval = self.proxy.dispatch(context, version, method, **args)
                 res = []
                 # Caller might have called ctxt.reply() manually
                 for (reply, failure) in ctxt._response:
@@ -132,13 +130,14 @@ def multicall(conf, context, topic, msg, timeout=None):
     if not method:
         return
     args = msg.get('args', {})
+    version = msg.get('version', None)
 
     try:
         consumer = CONSUMERS[topic][0]
     except (KeyError, IndexError):
         return iter([None])
     else:
-        return consumer.call(context, method, args, timeout)
+        return consumer.call(context, version, method, args, timeout)
 
 
 def call(conf, context, topic, msg, timeout=None):
diff --git a/cinder/rpc/proxy.py b/cinder/rpc/proxy.py
new file mode 100644 (file)
index 0000000..c9f8fa6
--- /dev/null
@@ -0,0 +1,161 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+A helper class for proxy objects to remote APIs.
+
+For more information about rpc API version numbers, see:
+    rpc/dispatcher.py
+"""
+
+
+from cinder import rpc
+
+
+class RpcProxy(object):
+    """A helper class for rpc clients.
+
+    This class is a wrapper around the RPC client API.  It allows you to
+    specify the topic and API version in a single place.  This is intended to
+    be used as a base class for a class that implements the client side of an
+    rpc API.
+    """
+
+    def __init__(self, topic, default_version):
+        """Initialize an RpcProxy.
+
+        :param topic: The topic to use for all messages.
+        :param default_version: The default API version to request in all
+               outgoing messages.  This can be overridden on a per-message
+               basis.
+        """
+        self.topic = topic
+        self.default_version = default_version
+        super(RpcProxy, self).__init__()
+
+    def _set_version(self, msg, vers):
+        """Helper method to set the version in a message.
+
+        :param msg: The message having a version added to it.
+        :param vers: The version number to add to the message.
+        """
+        msg['version'] = vers if vers else self.default_version
+
+    def _get_topic(self, topic):
+        """Return the topic to use for a message."""
+        return topic if topic else self.topic
+
+    @staticmethod
+    def make_msg(method, **kwargs):
+        return {'method': method, 'args': kwargs}
+
+    def call(self, context, msg, topic=None, version=None, timeout=None):
+        """rpc.call() a remote method.
+
+        :param context: The request context
+        :param msg: The message to send, including the method and args.
+        :param topic: Override the topic for this message.
+        :param timeout: (Optional) A timeout to use when waiting for the
+               response.  If no timeout is specified, a default timeout will be
+               used that is usually sufficient.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: The return value from the remote method.
+        """
+        self._set_version(msg, version)
+        return rpc.call(context, self._get_topic(topic), msg, timeout)
+
+    def multicall(self, context, msg, topic=None, version=None, timeout=None):
+        """rpc.multicall() a remote method.
+
+        :param context: The request context
+        :param msg: The message to send, including the method and args.
+        :param topic: Override the topic for this message.
+        :param timeout: (Optional) A timeout to use when waiting for the
+               response.  If no timeout is specified, a default timeout will be
+               used that is usually sufficient.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: An iterator that lets you process each of the returned values
+                  from the remote method as they arrive.
+        """
+        self._set_version(msg, version)
+        return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+
+    def cast(self, context, msg, topic=None, version=None):
+        """rpc.cast() a remote method.
+
+        :param context: The request context
+        :param msg: The message to send, including the method and args.
+        :param topic: Override the topic for this message.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: None.  rpc.cast() does not wait on any return value from the
+                  remote method.
+        """
+        self._set_version(msg, version)
+        rpc.cast(context, self._get_topic(topic), msg)
+
+    def fanout_cast(self, context, msg, version=None):
+        """rpc.fanout_cast() a remote method.
+
+        :param context: The request context
+        :param msg: The message to send, including the method and args.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: None.  rpc.fanout_cast() does not wait on any return value
+                  from the remote method.
+        """
+        self._set_version(msg, version)
+        rpc.fanout_cast(context, self.topic, msg)
+
+    def cast_to_server(self, context, server_params, msg, topic=None,
+            version=None):
+        """rpc.cast_to_server() a remote method.
+
+        :param context: The request context
+        :param server_params: Server parameters.  See rpc.cast_to_server() for
+               details.
+        :param msg: The message to send, including the method and args.
+        :param topic: Override the topic for this message.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: None.  rpc.cast_to_server() does not wait on any
+                  return values.
+        """
+        self._set_version(msg, version)
+        rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
+
+    def fanout_cast_to_server(self, context, server_params, msg, version=None):
+        """rpc.fanout_cast_to_server() a remote method.
+
+        :param context: The request context
+        :param server_params: Server parameters.  See rpc.cast_to_server() for
+               details.
+        :param msg: The message to send, including the method and args.
+        :param version: (Optional) Override the requested API version in this
+               message.
+
+        :returns: None.  rpc.fanout_cast_to_server() does not wait on any
+                  return values.
+        """
+        self._set_version(msg, version)
+        rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
index 3c90b7c4d78b31d2222add1d1f6ade36fc27c0d3..4f6cd54b3292ed6da2a294d136b03b28016e2705 100644 (file)
@@ -175,13 +175,15 @@ class Service(object):
         LOG.debug(_("Creating Consumer connection for Service %s") %
                   self.topic)
 
+        rpc_dispatcher = self.manager.create_rpc_dispatcher()
+
         # Share this same connection for these Consumers
-        self.conn.create_consumer(self.topic, self, fanout=False)
+        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
 
         node_topic = '%s.%s' % (self.topic, self.host)
-        self.conn.create_consumer(node_topic, self, fanout=False)
+        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
 
-        self.conn.create_consumer(self.topic, self, fanout=True)
+        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
 
         # Consume from all consumers in a thread
         self.conn.consume_in_thread()
index 37b315ba4d75da0164824d3eebf9dfceb2e81373..da7b946ff7b145dceeefd6c7bbe703690b2bf48a 100644 (file)
@@ -30,6 +30,7 @@ from cinder import flags
 from cinder import log as logging
 from cinder.rpc import amqp as rpc_amqp
 from cinder.rpc import common as rpc_common
+from cinder.rpc import dispatcher as rpc_dispatcher
 from cinder import test
 
 
@@ -44,8 +45,9 @@ class BaseRpcTestCase(test.TestCase):
         self.context = context.get_admin_context()
         if self.rpc:
             self.conn = self.rpc.create_connection(FLAGS, True)
-            self.receiver = TestReceiver()
-            self.conn.create_consumer('test', self.receiver, False)
+            receiver = TestReceiver()
+            self.dispatcher = rpc_dispatcher.RpcDispatcher([receiver])
+            self.conn.create_consumer('test', self.dispatcher, False)
             self.conn.consume_in_thread()
 
     def tearDown(self):
@@ -145,8 +147,9 @@ class BaseRpcTestCase(test.TestCase):
                 return value
 
         nested = Nested()
+        dispatcher = rpc_dispatcher.RpcDispatcher([nested])
         conn = self.rpc.create_connection(FLAGS, True)
-        conn.create_consumer('nested', nested, False)
+        conn.create_consumer('nested', dispatcher, False)
         conn.consume_in_thread()
         value = 42
         result = self.rpc.call(FLAGS, self.context,
@@ -228,7 +231,6 @@ class TestReceiver(object):
     Uses static methods because we aren't actually storing any state.
 
     """
-
     @staticmethod
     def echo(context, value):
         """Simply returns whatever value is sent in."""
diff --git a/cinder/tests/rpc/test_dispatcher.py b/cinder/tests/rpc/test_dispatcher.py
new file mode 100644 (file)
index 0000000..7a688c2
--- /dev/null
@@ -0,0 +1,109 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Unit Tests for rpc.dispatcher
+"""
+
+from cinder import context
+from cinder.rpc import dispatcher
+from cinder.rpc import common as rpc_common
+from cinder import test
+
+
+class RpcDispatcherTestCase(test.TestCase):
+    class API1(object):
+        RPC_API_VERSION = '1.0'
+
+        def __init__(self):
+            self.test_method_ctxt = None
+            self.test_method_arg1 = None
+
+        def test_method(self, ctxt, arg1):
+            self.test_method_ctxt = ctxt
+            self.test_method_arg1 = arg1
+
+    class API2(object):
+        RPC_API_VERSION = '2.1'
+
+        def __init__(self):
+            self.test_method_ctxt = None
+            self.test_method_arg1 = None
+
+        def test_method(self, ctxt, arg1):
+            self.test_method_ctxt = ctxt
+            self.test_method_arg1 = arg1
+
+    class API3(object):
+        RPC_API_VERSION = '3.5'
+
+        def __init__(self):
+            self.test_method_ctxt = None
+            self.test_method_arg1 = None
+
+        def test_method(self, ctxt, arg1):
+            self.test_method_ctxt = ctxt
+            self.test_method_arg1 = arg1
+
+    def setUp(self):
+        self.ctxt = context.RequestContext('fake_user', 'fake_project')
+        super(RpcDispatcherTestCase, self).setUp()
+
+    def tearDown(self):
+        super(RpcDispatcherTestCase, self).tearDown()
+
+    def _test_dispatch(self, version, expectations):
+        v2 = self.API2()
+        v3 = self.API3()
+        disp = dispatcher.RpcDispatcher([v2, v3])
+
+        disp.dispatch(self.ctxt, version, 'test_method', arg1=1)
+
+        self.assertEqual(v2.test_method_ctxt, expectations[0])
+        self.assertEqual(v2.test_method_arg1, expectations[1])
+        self.assertEqual(v3.test_method_ctxt, expectations[2])
+        self.assertEqual(v3.test_method_arg1, expectations[3])
+
+    def test_dispatch(self):
+        self._test_dispatch('2.1', (self.ctxt, 1, None, None))
+        self._test_dispatch('3.5', (None, None, self.ctxt, 1))
+
+    def test_dispatch_lower_minor_version(self):
+        self._test_dispatch('2.0', (self.ctxt, 1, None, None))
+        self._test_dispatch('3.1', (None, None, self.ctxt, 1))
+
+    def test_dispatch_higher_minor_version(self):
+        self.assertRaises(rpc_common.UnsupportedRpcVersion,
+                self._test_dispatch, '2.6', (None, None, None, None))
+        self.assertRaises(rpc_common.UnsupportedRpcVersion,
+                self._test_dispatch, '3.6', (None, None, None, None))
+
+    def test_dispatch_lower_major_version(self):
+        self.assertRaises(rpc_common.UnsupportedRpcVersion,
+                self._test_dispatch, '1.0', (None, None, None, None))
+
+    def test_dispatch_higher_major_version(self):
+        self.assertRaises(rpc_common.UnsupportedRpcVersion,
+                self._test_dispatch, '4.0', (None, None, None, None))
+
+    def test_dispatch_no_version_uses_v1(self):
+        v1 = self.API1()
+        disp = dispatcher.RpcDispatcher([v1])
+
+        disp.dispatch(self.ctxt, None, 'test_method', arg1=1)
+
+        self.assertEqual(v1.test_method_ctxt, self.ctxt)
+        self.assertEqual(v1.test_method_arg1, 1)
diff --git a/cinder/tests/rpc/test_proxy.py b/cinder/tests/rpc/test_proxy.py
new file mode 100644 (file)
index 0000000..6a29345
--- /dev/null
@@ -0,0 +1,124 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Unit Tests for rpc.proxy
+"""
+
+import copy
+
+from cinder import context
+from cinder import rpc
+from cinder.rpc import proxy
+from cinder import test
+
+
+class RpcProxyTestCase(test.TestCase):
+
+    def setUp(self):
+        super(RpcProxyTestCase, self).setUp()
+
+    def tearDown(self):
+        super(RpcProxyTestCase, self).tearDown()
+
+    def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
+            server_params=None, supports_topic_override=True):
+        topic = 'fake_topic'
+        timeout = 123
+        rpc_proxy = proxy.RpcProxy(topic, '1.0')
+        ctxt = context.RequestContext('fake_user', 'fake_project')
+        msg = {'method': 'fake_method', 'args': {'x': 'y'}}
+        expected_msg = {'method': 'fake_method', 'args': {'x': 'y'},
+                'version': '1.0'}
+
+        expected_retval = 'hi' if has_retval else None
+
+        self.fake_args = None
+        self.fake_kwargs = None
+
+        def _fake_rpc_method(*args, **kwargs):
+            self.fake_args = args
+            self.fake_kwargs = kwargs
+            if has_retval:
+                return expected_retval
+
+        self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+
+        args = [ctxt, msg]
+        if server_params:
+            args.insert(1, server_params)
+
+        # Base method usage
+        retval = getattr(rpc_proxy, rpc_method)(*args)
+        self.assertEqual(retval, expected_retval)
+        expected_args = [ctxt, topic, expected_msg]
+        if server_params:
+            expected_args.insert(1, server_params)
+        for arg, expected_arg in zip(self.fake_args, expected_args):
+            self.assertEqual(arg, expected_arg)
+
+        # overriding the version
+        retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1')
+        self.assertEqual(retval, expected_retval)
+        new_msg = copy.deepcopy(expected_msg)
+        new_msg['version'] = '1.1'
+        expected_args = [ctxt, topic, new_msg]
+        if server_params:
+            expected_args.insert(1, server_params)
+        for arg, expected_arg in zip(self.fake_args, expected_args):
+            self.assertEqual(arg, expected_arg)
+
+        if has_timeout:
+            # set a timeout
+            retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout)
+            self.assertEqual(retval, expected_retval)
+            expected_args = [ctxt, topic, expected_msg, timeout]
+            for arg, expected_arg in zip(self.fake_args, expected_args):
+                self.assertEqual(arg, expected_arg)
+
+        if supports_topic_override:
+            # set a topic
+            new_topic = 'foo.bar'
+            retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic)
+            self.assertEqual(retval, expected_retval)
+            expected_args = [ctxt, new_topic, expected_msg]
+            if server_params:
+                expected_args.insert(1, server_params)
+            for arg, expected_arg in zip(self.fake_args, expected_args):
+                self.assertEqual(arg, expected_arg)
+
+    def test_call(self):
+        self._test_rpc_method('call', has_timeout=True, has_retval=True)
+
+    def test_multicall(self):
+        self._test_rpc_method('multicall', has_timeout=True, has_retval=True)
+
+    def test_cast(self):
+        self._test_rpc_method('cast')
+
+    def test_fanout_cast(self):
+        self._test_rpc_method('fanout_cast', supports_topic_override=False)
+
+    def test_cast_to_server(self):
+        self._test_rpc_method('cast_to_server', server_params={'blah': 1})
+
+    def test_fanout_cast_to_server(self):
+        self._test_rpc_method('fanout_cast_to_server',
+                server_params={'blah': 1}, supports_topic_override=False)
+
+    def test_make_msg(self):
+        self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2),
+                         {'method': 'test_method', 'args': {'a': 1, 'b': 2}})