Update the LBaaS code to stop using the RpcProxy compatibility class.
The equivalent direct usage of oslo.messaging APIs are now used
instead.
Part of blueprint drop-rpc-compat.
Change-Id: I381394507e4f2daf6d774f70087fef8833c9bab5
# License for the specific language governing permissions and limitations
# under the License.
+from oslo import messaging
+
from neutron.common import rpc as n_rpc
-class LbaasAgentApi(n_rpc.RpcProxy):
+class LbaasAgentApi(object):
"""Agent side of the Agent to Plugin RPC API."""
- API_VERSION = '2.0'
# history
# 1.0 Initial version
# 2.0 Generic API for agent based drivers
# - pool_deployed() and update_status() methods added;
def __init__(self, topic, context, host):
- super(LbaasAgentApi, self).__init__(topic, self.API_VERSION)
self.context = context
self.host = host
+ target = messaging.Target(topic=topic, version='2.0')
+ self.client = n_rpc.get_client(target)
def get_ready_devices(self):
- return self.call(
- self.context,
- self.make_msg('get_ready_devices', host=self.host)
- )
+ cctxt = self.client.prepare()
+ return cctxt.call(self.context, 'get_ready_devices', host=self.host)
def pool_destroyed(self, pool_id):
- return self.call(
- self.context,
- self.make_msg('pool_destroyed', pool_id=pool_id)
- )
+ cctxt = self.client.prepare()
+ return cctxt.call(self.context, 'pool_destroyed', pool_id=pool_id)
def pool_deployed(self, pool_id):
- return self.call(
- self.context,
- self.make_msg('pool_deployed', pool_id=pool_id)
- )
+ cctxt = self.client.prepare()
+ return cctxt.call(self.context, 'pool_deployed', pool_id=pool_id)
def get_logical_device(self, pool_id):
- return self.call(
- self.context,
- self.make_msg(
- 'get_logical_device',
- pool_id=pool_id
- )
- )
+ cctxt = self.client.prepare()
+ return cctxt.call(self.context, 'get_logical_device', pool_id=pool_id)
def update_status(self, obj_type, obj_id, status):
- return self.call(
- self.context,
- self.make_msg('update_status', obj_type=obj_type, obj_id=obj_id,
- status=status)
- )
+ cctxt = self.client.prepare()
+ return cctxt.call(self.context, 'update_status', obj_type=obj_type,
+ obj_id=obj_id, status=status)
def plug_vip_port(self, port_id):
- return self.call(
- self.context,
- self.make_msg('plug_vip_port', port_id=port_id, host=self.host)
- )
+ cctxt = self.client.prepare()
+ return cctxt.call(self.context, 'plug_vip_port', port_id=port_id,
+ host=self.host)
def unplug_vip_port(self, port_id):
- return self.call(
- self.context,
- self.make_msg('unplug_vip_port', port_id=port_id, host=self.host)
- )
+ cctxt = self.client.prepare()
+ return cctxt.call(self.context, 'unplug_vip_port', port_id=port_id,
+ host=self.host)
def update_pool_stats(self, pool_id, stats):
- return self.call(
- self.context,
- self.make_msg(
- 'update_pool_stats',
- pool_id=pool_id,
- stats=stats,
- host=self.host
- )
- )
+ cctxt = self.client.prepare()
+ return cctxt.call(self.context, 'update_pool_stats', pool_id=pool_id,
+ stats=stats, host=self.host)
import uuid
from oslo.config import cfg
+from oslo import messaging
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
self.plugin.update_pool_stats(context, pool_id, data=stats)
-class LoadBalancerAgentApi(n_rpc.RpcProxy):
+class LoadBalancerAgentApi(object):
"""Plugin side of plugin to agent RPC API."""
- BASE_RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 1.1 Support agent_updated call
# object individually;
def __init__(self, topic):
- super(LoadBalancerAgentApi, self).__init__(
- topic, default_version=self.BASE_RPC_API_VERSION)
-
- def _cast(self, context, method_name, method_args, host, version=None):
- return self.cast(
- context,
- self.make_msg(method_name, **method_args),
- topic='%s.%s' % (self.topic, host),
- version=version
- )
+ target = messaging.Target(topic=topic, version='2.0')
+ self.client = n_rpc.get_client(target)
def create_vip(self, context, vip, host):
- return self._cast(context, 'create_vip', {'vip': vip}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'create_vip', vip=vip)
def update_vip(self, context, old_vip, vip, host):
- return self._cast(context, 'update_vip',
- {'old_vip': old_vip, 'vip': vip}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'update_vip', old_vip=old_vip, vip=vip)
def delete_vip(self, context, vip, host):
- return self._cast(context, 'delete_vip', {'vip': vip}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'delete_vip', vip=vip)
def create_pool(self, context, pool, host, driver_name):
- return self._cast(context, 'create_pool',
- {'pool': pool, 'driver_name': driver_name}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'create_pool', pool=pool, driver_name=driver_name)
def update_pool(self, context, old_pool, pool, host):
- return self._cast(context, 'update_pool',
- {'old_pool': old_pool, 'pool': pool}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'update_pool', old_pool=old_pool, pool=pool)
def delete_pool(self, context, pool, host):
- return self._cast(context, 'delete_pool', {'pool': pool}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'delete_pool', pool=pool)
def create_member(self, context, member, host):
- return self._cast(context, 'create_member', {'member': member}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'create_member', member=member)
def update_member(self, context, old_member, member, host):
- return self._cast(context, 'update_member',
- {'old_member': old_member, 'member': member}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'update_member', old_member=old_member,
+ member=member)
def delete_member(self, context, member, host):
- return self._cast(context, 'delete_member', {'member': member}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'delete_member', member=member)
def create_pool_health_monitor(self, context, health_monitor, pool_id,
host):
- return self._cast(context, 'create_pool_health_monitor',
- {'health_monitor': health_monitor,
- 'pool_id': pool_id}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'create_pool_health_monitor',
+ health_monitor=health_monitor, pool_id=pool_id)
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id, host):
- return self._cast(context, 'update_pool_health_monitor',
- {'old_health_monitor': old_health_monitor,
- 'health_monitor': health_monitor,
- 'pool_id': pool_id}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'update_pool_health_monitor',
+ old_health_monitor=old_health_monitor,
+ health_monitor=health_monitor, pool_id=pool_id)
def delete_pool_health_monitor(self, context, health_monitor, pool_id,
host):
- return self._cast(context, 'delete_pool_health_monitor',
- {'health_monitor': health_monitor,
- 'pool_id': pool_id}, host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'delete_pool_health_monitor',
+ health_monitor=health_monitor, pool_id=pool_id)
def agent_updated(self, context, admin_state_up, host):
- return self._cast(context, 'agent_updated',
- {'payload': {'admin_state_up': admin_state_up}},
- host)
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, 'agent_updated',
+ payload={'admin_state_up': admin_state_up})
class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
+import copy
import mock
from neutron.services.loadbalancer.agent import agent_api as api
super(TestApiCache, self).setUp()
self.api = api.LbaasAgentApi('topic', mock.sentinel.context, 'host')
- self.make_msg = mock.patch.object(self.api, 'make_msg').start()
- self.mock_call = mock.patch.object(self.api, 'call').start()
def test_init(self):
self.assertEqual(self.api.host, 'host')
self.assertEqual(self.api.context, mock.sentinel.context)
- def test_get_ready_devices(self):
- self.assertEqual(
- self.api.get_ready_devices(),
- self.mock_call.return_value
- )
+ def _test_method(self, method, **kwargs):
+ add_host = ('get_ready_devices', 'plug_vip_port', 'unplug_vip_port',
+ 'update_pool_stats')
+ expected_kwargs = copy.copy(kwargs)
+ if method in add_host:
+ expected_kwargs['host'] = self.api.host
- self.make_msg.assert_called_once_with('get_ready_devices', host='host')
- self.mock_call.assert_called_once_with(
- mock.sentinel.context,
- self.make_msg.return_value
- )
+ with contextlib.nested(
+ mock.patch.object(self.api.client, 'call'),
+ mock.patch.object(self.api.client, 'prepare'),
+ ) as (
+ rpc_mock, prepare_mock
+ ):
+ prepare_mock.return_value = self.api.client
+ rpc_mock.return_value = 'foo'
+ rv = getattr(self.api, method)(**kwargs)
- def test_get_logical_device(self):
- self.assertEqual(
- self.api.get_logical_device('pool_id'),
- self.mock_call.return_value
- )
+ self.assertEqual(rv, 'foo')
- self.make_msg.assert_called_once_with(
- 'get_logical_device',
- pool_id='pool_id')
+ prepare_args = {}
+ prepare_mock.assert_called_once_with(**prepare_args)
- self.mock_call.assert_called_once_with(
- mock.sentinel.context,
- self.make_msg.return_value
- )
+ rpc_mock.assert_called_once_with(mock.sentinel.context, method,
+ **expected_kwargs)
- def test_pool_destroyed(self):
- self.assertEqual(
- self.api.pool_destroyed('pool_id'),
- self.mock_call.return_value
- )
+ def test_get_ready_devices(self):
+ self._test_method('get_ready_devices')
- self.make_msg.assert_called_once_with(
- 'pool_destroyed',
- pool_id='pool_id')
+ def test_get_logical_device(self):
+ self._test_method('get_logical_device', pool_id='pool_id')
- self.mock_call.assert_called_once_with(
- mock.sentinel.context,
- self.make_msg.return_value
- )
+ def test_pool_destroyed(self):
+ self._test_method('pool_destroyed', pool_id='pool_id')
def test_pool_deployed(self):
- self.assertEqual(
- self.api.pool_deployed('pool_id'),
- self.mock_call.return_value
- )
-
- self.make_msg.assert_called_once_with(
- 'pool_deployed',
- pool_id='pool_id')
-
- self.mock_call.assert_called_once_with(
- mock.sentinel.context,
- self.make_msg.return_value
- )
+ self._test_method('pool_deployed', pool_id='pool_id')
def test_update_status(self):
- self.assertEqual(
- self.api.update_status('pool', 'pool_id', 'ACTIVE'),
- self.mock_call.return_value
- )
-
- self.make_msg.assert_called_once_with(
- 'update_status',
- obj_type='pool',
- obj_id='pool_id',
- status='ACTIVE')
-
- self.mock_call.assert_called_once_with(
- mock.sentinel.context,
- self.make_msg.return_value,
- )
+ self._test_method('update_status', obj_type='type', obj_id='id',
+ status='status')
def test_plug_vip_port(self):
- self.assertEqual(
- self.api.plug_vip_port('port_id'),
- self.mock_call.return_value
- )
-
- self.make_msg.assert_called_once_with(
- 'plug_vip_port',
- port_id='port_id',
- host='host')
-
- self.mock_call.assert_called_once_with(
- mock.sentinel.context,
- self.make_msg.return_value
- )
+ self._test_method('plug_vip_port', port_id='port_id')
def test_unplug_vip_port(self):
- self.assertEqual(
- self.api.unplug_vip_port('port_id'),
- self.mock_call.return_value
- )
-
- self.make_msg.assert_called_once_with(
- 'unplug_vip_port',
- port_id='port_id',
- host='host')
-
- self.mock_call.assert_called_once_with(
- mock.sentinel.context,
- self.make_msg.return_value
- )
+ self._test_method('unplug_vip_port', port_id='port_id')
def test_update_pool_stats(self):
- self.assertEqual(
- self.api.update_pool_stats('pool_id', {'stat': 'stat'}),
- self.mock_call.return_value
- )
-
- self.make_msg.assert_called_once_with(
- 'update_pool_stats',
- pool_id='pool_id',
- stats={'stat': 'stat'},
- host='host')
-
- self.mock_call.assert_called_once_with(
- mock.sentinel.context,
- self.make_msg.return_value
- )
+ self._test_method('update_pool_stats', pool_id='id', stats='stats')
super(TestLoadBalancerAgentApi, self).setUp()
self.api = agent_driver_base.LoadBalancerAgentApi('topic')
- self.mock_cast = mock.patch.object(self.api, 'cast').start()
- self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
def test_init(self):
- self.assertEqual(self.api.topic, 'topic')
+ self.assertEqual(self.api.client.target.topic, 'topic')
def _call_test_helper(self, method_name, method_args):
- rv = getattr(self.api, method_name)(mock.sentinel.context,
- host='host',
- **method_args)
- self.assertEqual(rv, self.mock_cast.return_value)
- self.mock_cast.assert_called_once_with(
- mock.sentinel.context,
- self.mock_msg.return_value,
- topic='topic.host',
- version=None
- )
+ with contextlib.nested(
+ mock.patch.object(self.api.client, 'cast'),
+ mock.patch.object(self.api.client, 'prepare'),
+ ) as (
+ rpc_mock, prepare_mock
+ ):
+ prepare_mock.return_value = self.api.client
+ getattr(self.api, method_name)(mock.sentinel.context,
+ host='host',
+ **method_args)
+
+ prepare_args = {'server': 'host'}
+ prepare_mock.assert_called_once_with(**prepare_args)
if method_name == 'agent_updated':
method_args = {'payload': method_args}
- self.mock_msg.assert_called_once_with(
- method_name,
- **method_args
- )
+ rpc_mock.assert_called_once_with(mock.sentinel.context, method_name,
+ **method_args)
def test_agent_updated(self):
self._call_test_helper('agent_updated', {'admin_state_up': 'test'})