from oslo.messaging import serializer as om_serializer
from neutron.common import exceptions
-from neutron.common import log
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import service
load_admin_roles=False, **rpc_ctxt_dict)
-class RpcProxy(object):
- '''
- This class is created to facilitate migration from oslo-incubator
- RPC layer implementation to oslo.messaging and is intended to
- emulate RpcProxy class behaviour using oslo.messaging API once the
- migration is applied.
- '''
- RPC_API_NAMESPACE = None
-
- def __init__(self, topic, default_version, version_cap=None):
- super(RpcProxy, self).__init__()
- self.topic = topic
- target = messaging.Target(topic=topic, version=default_version)
- self._client = get_client(target, version_cap=version_cap)
-
- def make_msg(self, method, **kwargs):
- return {'method': method,
- 'namespace': self.RPC_API_NAMESPACE,
- 'args': kwargs}
-
- @log.log
- def call(self, context, msg, **kwargs):
- return self.__call_rpc_method(
- context, msg, rpc_method='call', **kwargs)
-
- @log.log
- def cast(self, context, msg, **kwargs):
- self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
-
- @log.log
- def fanout_cast(self, context, msg, **kwargs):
- kwargs['fanout'] = True
- self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
-
- def __call_rpc_method(self, context, msg, **kwargs):
- options = dict(
- ((opt, kwargs[opt])
- for opt in ('fanout', 'timeout', 'topic', 'version')
- if kwargs.get(opt))
- )
- if msg['namespace']:
- options['namespace'] = msg['namespace']
-
- if options:
- callee = self._client.prepare(**options)
- else:
- callee = self._client
-
- func = getattr(callee, kwargs['rpc_method'])
- return func(context, msg['method'], **msg['args'])
-
-
class RpcCallback(object):
'''
This class is created to facilitate migration from oslo-incubator
import sys
import fixtures
-import mock
from oslo.config import cfg
from oslo.messaging import conffixture as messaging_conffixture
'neutron.common.rpc.Connection.consume_in_threads',
fake_consume_in_threads))
- # immediately return RPC calls
- self.useFixture(fixtures.MonkeyPatch(
- 'neutron.common.rpc.RpcProxy._RpcProxy__call_rpc_method',
- mock.MagicMock()))
-
self.useFixture(fixtures.MonkeyPatch(
'oslo.messaging.Notifier', fake_notifier.FakeNotifier))