# make it a singleton
if not hasattr(cls, '_instance'):
cls._instance = super(ResourcesPullRpcApi, cls).__new__(cls)
+ target = oslo_messaging.Target(
+ topic=topics.PLUGIN, version='1.0',
+ namespace=constants.RPC_NAMESPACE_RESOURCES)
+ cls._instance.client = n_rpc.get_client(target)
return cls._instance
- def __init__(self):
- target = oslo_messaging.Target(
- topic=topics.PLUGIN, version='1.0',
- namespace=constants.RPC_NAMESPACE_RESOURCES)
- self.client = n_rpc.get_client(target)
-
@log_helpers.log_method_call
def pull(self, context, resource_type, resource_id):
_validate_resource_type(resource_type)
def setUp(self):
super(ResourcesPullRpcApiTestCase, self).setUp()
- mock.patch.object(resources_rpc.n_rpc, 'get_client').start()
mock.patch.object(resources_rpc, '_validate_resource_type').start()
mock.patch('neutron.api.rpc.callbacks.resources.get_resource_cls',
return_value=FakeResource).start()
self.rpc = resources_rpc.ResourcesPullRpcApi()
+ mock.patch.object(self.rpc, 'client').start()
self.cctxt_mock = self.rpc.client.prepare.return_value
def test_is_singleton(self):