From: Russell Bryant Date: Mon, 1 Dec 2014 18:42:30 +0000 (+0000) Subject: Drop RpcProxy usage from ml2 AgentNotifierApi X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=1bc911ca8a487ad82e0422be8f74ce510afa5f8c;p=openstack-build%2Fneutron-build.git Drop RpcProxy usage from ml2 AgentNotifierApi Remove usage of the RpcProxy compatibility class from the ml2 AgentNotifierApi. The equivalent oslo.messaging APIs are now used instead. A couple of other mixin APIs had to be converted at the same time. Note that there is one very minor functional change here. The base rpc version is set to '1.0' now instead of '1.1'. The right pattern to use is to always set the base to be N.0. Any method that needs a newer version should specify it. Part of blueprint drop-rpc-compat. Change-Id: I640568e2d73c9eb7a9505db640dc1427a1ae2abe --- diff --git a/neutron/api/rpc/handlers/dvr_rpc.py b/neutron/api/rpc/handlers/dvr_rpc.py index ba648bb0a..43a5d68ac 100644 --- a/neutron/api/rpc/handlers/dvr_rpc.py +++ b/neutron/api/rpc/handlers/dvr_rpc.py @@ -98,11 +98,9 @@ class DVRAgentRpcApiMixin(object): """Notify dvr mac address updates.""" if not dvr_macs: return - self.fanout_cast(context, - self.make_msg('dvr_mac_address_update', - dvr_macs=dvr_macs), - version=self.DVR_RPC_VERSION, - topic=self._get_dvr_update_topic()) + cctxt = self.client.prepare(topic=self._get_dvr_update_topic(), + version=self.DVR_RPC_VERSION, fanout=True) + cctxt.cast(context, 'dvr_mac_address_update', dvr_macs=dvr_macs) class DVRAgentRpcCallbackMixin(object): diff --git a/neutron/plugins/ml2/drivers/type_tunnel.py b/neutron/plugins/ml2/drivers/type_tunnel.py index b6f34eaa5..b0bad1783 100644 --- a/neutron/plugins/ml2/drivers/type_tunnel.py +++ b/neutron/plugins/ml2/drivers/type_tunnel.py @@ -187,8 +187,7 @@ class TunnelAgentRpcApiMixin(object): topics.UPDATE) def tunnel_update(self, context, tunnel_ip, tunnel_type): - self.fanout_cast(context, - self.make_msg('tunnel_update', - tunnel_ip=tunnel_ip, - tunnel_type=tunnel_type), - topic=self._get_tunnel_update_topic()) + cctxt = self.client.prepare(topic=self._get_tunnel_update_topic(), + fanout=True) + cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip, + tunnel_type=tunnel_type) diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 72fc955d1..d51d6d23b 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo import messaging + from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.handlers import dvr_rpc from neutron.common import constants as q_const @@ -172,8 +174,7 @@ class RpcCallbacks(n_rpc.RpcCallback, LOG.debug('Port %s not found during ARP update', port_id) -class AgentNotifierApi(n_rpc.RpcProxy, - dvr_rpc.DVRAgentRpcApiMixin, +class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, sg_rpc.SecurityGroupAgentRpcApiMixin, type_tunnel.TunnelAgentRpcApiMixin): """Agent side of the openvswitch rpc API. @@ -185,30 +186,26 @@ class AgentNotifierApi(n_rpc.RpcProxy, """ - BASE_RPC_API_VERSION = '1.1' - def __init__(self, topic): - super(AgentNotifierApi, self).__init__( - topic=topic, default_version=self.BASE_RPC_API_VERSION) + self.topic = topic self.topic_network_delete = topics.get_topic_name(topic, topics.NETWORK, topics.DELETE) self.topic_port_update = topics.get_topic_name(topic, topics.PORT, topics.UPDATE) + target = messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) def network_delete(self, context, network_id): - self.fanout_cast(context, - self.make_msg('network_delete', - network_id=network_id), - topic=self.topic_network_delete) + cctxt = self.client.prepare(topic=self.topic_network_delete, + fanout=True) + cctxt.cast(context, 'network_delete', network_id=network_id) def port_update(self, context, port, network_type, segmentation_id, physical_network): - self.fanout_cast(context, - self.make_msg('port_update', - port=port, - network_type=network_type, - segmentation_id=segmentation_id, - physical_network=physical_network), - topic=self.topic_port_update) + cctxt = self.client.prepare(topic=self.topic_port_update, + fanout=True) + cctxt.cast(context, 'port_update', port=port, + network_type=network_type, segmentation_id=segmentation_id, + physical_network=physical_network) diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index a05d1dabf..f4a5fb17a 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -25,7 +25,6 @@ import mock from neutron.agent import rpc as agent_rpc from neutron.common import constants from neutron.common import exceptions -from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.openstack.common import context from neutron.plugins.ml2.drivers import type_tunnel @@ -166,35 +165,11 @@ class RpcCallbacksTestCase(base.BaseTestCase): class RpcApiTestCase(base.BaseTestCase): - def _test_rpc_api_legacy(self, rpcapi, topic, method, rpc_method, - **kwargs): - # NOTE(russellb) This can be removed once AgentNotifierApi has been - # converted over to no longer use the RpcProxy compatibility class. - ctxt = context.RequestContext('fake_user', 'fake_project') - expected_retval = 'foo' if rpc_method == 'call' else None - expected_version = kwargs.pop('version', None) - expected_msg = rpcapi.make_msg(method, **kwargs) - - rpc = n_rpc.RpcProxy - with mock.patch.object(rpc, rpc_method) as rpc_method_mock: - rpc_method_mock.return_value = expected_retval - retval = getattr(rpcapi, method)(ctxt, **kwargs) - - self.assertEqual(retval, expected_retval) - additional_args = {} - if topic: - additional_args['topic'] = topic - if expected_version: - additional_args['version'] = expected_version - expected = [ - mock.call(ctxt, expected_msg, **additional_args) - ] - rpc_method_mock.assert_has_calls(expected) - def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if rpc_method == 'call' else None expected_version = kwargs.pop('version', None) + fanout = kwargs.pop('fanout', False) with contextlib.nested( mock.patch.object(rpcapi.client, rpc_method), @@ -209,6 +184,10 @@ class RpcApiTestCase(base.BaseTestCase): prepare_args = {} if expected_version: prepare_args['version'] = expected_version + if fanout: + prepare_args['fanout'] = fanout + if topic: + prepare_args['topic'] = topic prepare_mock.assert_called_once_with(**prepare_args) self.assertEqual(retval, expected_retval) @@ -216,35 +195,36 @@ class RpcApiTestCase(base.BaseTestCase): def test_delete_network(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) - self._test_rpc_api_legacy( + self._test_rpc_api( rpcapi, topics.get_topic_name(topics.AGENT, topics.NETWORK, topics.DELETE), - 'network_delete', rpc_method='fanout_cast', - network_id='fake_request_spec') + 'network_delete', rpc_method='cast', + fanout=True, network_id='fake_request_spec') def test_port_update(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) - self._test_rpc_api_legacy( + self._test_rpc_api( rpcapi, topics.get_topic_name(topics.AGENT, topics.PORT, topics.UPDATE), - 'port_update', rpc_method='fanout_cast', - port='fake_port', + 'port_update', rpc_method='cast', + fanout=True, port='fake_port', network_type='fake_network_type', segmentation_id='fake_segmentation_id', physical_network='fake_physical_network') def test_tunnel_update(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) - self._test_rpc_api_legacy( + self._test_rpc_api( rpcapi, topics.get_topic_name(topics.AGENT, type_tunnel.TUNNEL, topics.UPDATE), - 'tunnel_update', rpc_method='fanout_cast', + 'tunnel_update', rpc_method='cast', + fanout=True, tunnel_ip='fake_ip', tunnel_type='gre') def test_device_details(self):