]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Drop RpcProxy usage from ml2 AgentNotifierApi
authorRussell Bryant <rbryant@redhat.com>
Mon, 1 Dec 2014 18:42:30 +0000 (18:42 +0000)
committerRussell Bryant <rbryant@redhat.com>
Mon, 1 Dec 2014 18:42:30 +0000 (18:42 +0000)
Remove usage of the RpcProxy compatibility class from the ml2
AgentNotifierApi.  The equivalent oslo.messaging APIs are now used
instead.  A couple of other mixin APIs had to be converted at the same
time.

Note that there is one very minor functional change here.  The base
rpc version is set to '1.0' now instead of '1.1'.  The right pattern
to use is to always set the base to be N.0.  Any method that needs a
newer version should specify it.

Part of blueprint drop-rpc-compat.

Change-Id: I640568e2d73c9eb7a9505db640dc1427a1ae2abe

neutron/api/rpc/handlers/dvr_rpc.py
neutron/plugins/ml2/drivers/type_tunnel.py
neutron/plugins/ml2/rpc.py
neutron/tests/unit/ml2/test_rpcapi.py

index ba648bb0ae1dbb26b38bea3961322265c5b07611..43a5d68acbcea520d602093a864b9c549874f8b2 100644 (file)
@@ -98,11 +98,9 @@ class DVRAgentRpcApiMixin(object):
         """Notify dvr mac address updates."""
         if not dvr_macs:
             return
-        self.fanout_cast(context,
-                         self.make_msg('dvr_mac_address_update',
-                                       dvr_macs=dvr_macs),
-                         version=self.DVR_RPC_VERSION,
-                         topic=self._get_dvr_update_topic())
+        cctxt = self.client.prepare(topic=self._get_dvr_update_topic(),
+                                    version=self.DVR_RPC_VERSION, fanout=True)
+        cctxt.cast(context, 'dvr_mac_address_update', dvr_macs=dvr_macs)
 
 
 class DVRAgentRpcCallbackMixin(object):
index b6f34eaa58ed9892ae2dfb70e5410b55e946396c..b0bad17834c719465b093cab89c3166f06de484e 100644 (file)
@@ -187,8 +187,7 @@ class TunnelAgentRpcApiMixin(object):
                                      topics.UPDATE)
 
     def tunnel_update(self, context, tunnel_ip, tunnel_type):
-        self.fanout_cast(context,
-                         self.make_msg('tunnel_update',
-                                       tunnel_ip=tunnel_ip,
-                                       tunnel_type=tunnel_type),
-                         topic=self._get_tunnel_update_topic())
+        cctxt = self.client.prepare(topic=self._get_tunnel_update_topic(),
+                                    fanout=True)
+        cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip,
+                   tunnel_type=tunnel_type)
index 72fc955d17fba3a7abacd926802f8c8517db0f4c..d51d6d23b06173868db496b011154dae67193602 100644 (file)
@@ -13,6 +13,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from oslo import messaging
+
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.api.rpc.handlers import dvr_rpc
 from neutron.common import constants as q_const
@@ -172,8 +174,7 @@ class RpcCallbacks(n_rpc.RpcCallback,
                 LOG.debug('Port %s not found during ARP update', port_id)
 
 
-class AgentNotifierApi(n_rpc.RpcProxy,
-                       dvr_rpc.DVRAgentRpcApiMixin,
+class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
                        sg_rpc.SecurityGroupAgentRpcApiMixin,
                        type_tunnel.TunnelAgentRpcApiMixin):
     """Agent side of the openvswitch rpc API.
@@ -185,30 +186,26 @@ class AgentNotifierApi(n_rpc.RpcProxy,
 
     """
 
-    BASE_RPC_API_VERSION = '1.1'
-
     def __init__(self, topic):
-        super(AgentNotifierApi, self).__init__(
-            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+        self.topic = topic
         self.topic_network_delete = topics.get_topic_name(topic,
                                                           topics.NETWORK,
                                                           topics.DELETE)
         self.topic_port_update = topics.get_topic_name(topic,
                                                        topics.PORT,
                                                        topics.UPDATE)
+        target = messaging.Target(topic=topic, version='1.0')
+        self.client = n_rpc.get_client(target)
 
     def network_delete(self, context, network_id):
-        self.fanout_cast(context,
-                         self.make_msg('network_delete',
-                                       network_id=network_id),
-                         topic=self.topic_network_delete)
+        cctxt = self.client.prepare(topic=self.topic_network_delete,
+                                    fanout=True)
+        cctxt.cast(context, 'network_delete', network_id=network_id)
 
     def port_update(self, context, port, network_type, segmentation_id,
                     physical_network):
-        self.fanout_cast(context,
-                         self.make_msg('port_update',
-                                       port=port,
-                                       network_type=network_type,
-                                       segmentation_id=segmentation_id,
-                                       physical_network=physical_network),
-                         topic=self.topic_port_update)
+        cctxt = self.client.prepare(topic=self.topic_port_update,
+                                    fanout=True)
+        cctxt.cast(context, 'port_update', port=port,
+                   network_type=network_type, segmentation_id=segmentation_id,
+                   physical_network=physical_network)
index a05d1dabf46768966b36ab426f65382aec3e9467..f4a5fb17ab5d23e3a7c31e294d6f01b73a7cdfad 100644 (file)
@@ -25,7 +25,6 @@ import mock
 from neutron.agent import rpc as agent_rpc
 from neutron.common import constants
 from neutron.common import exceptions
-from neutron.common import rpc as n_rpc
 from neutron.common import topics
 from neutron.openstack.common import context
 from neutron.plugins.ml2.drivers import type_tunnel
@@ -166,35 +165,11 @@ class RpcCallbacksTestCase(base.BaseTestCase):
 
 class RpcApiTestCase(base.BaseTestCase):
 
-    def _test_rpc_api_legacy(self, rpcapi, topic, method, rpc_method,
-                             **kwargs):
-        # NOTE(russellb) This can be removed once AgentNotifierApi has been
-        # converted over to no longer use the RpcProxy compatibility class.
-        ctxt = context.RequestContext('fake_user', 'fake_project')
-        expected_retval = 'foo' if rpc_method == 'call' else None
-        expected_version = kwargs.pop('version', None)
-        expected_msg = rpcapi.make_msg(method, **kwargs)
-
-        rpc = n_rpc.RpcProxy
-        with mock.patch.object(rpc, rpc_method) as rpc_method_mock:
-            rpc_method_mock.return_value = expected_retval
-            retval = getattr(rpcapi, method)(ctxt, **kwargs)
-
-        self.assertEqual(retval, expected_retval)
-        additional_args = {}
-        if topic:
-            additional_args['topic'] = topic
-        if expected_version:
-            additional_args['version'] = expected_version
-        expected = [
-            mock.call(ctxt, expected_msg, **additional_args)
-        ]
-        rpc_method_mock.assert_has_calls(expected)
-
     def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs):
         ctxt = context.RequestContext('fake_user', 'fake_project')
         expected_retval = 'foo' if rpc_method == 'call' else None
         expected_version = kwargs.pop('version', None)
+        fanout = kwargs.pop('fanout', False)
 
         with contextlib.nested(
             mock.patch.object(rpcapi.client, rpc_method),
@@ -209,6 +184,10 @@ class RpcApiTestCase(base.BaseTestCase):
         prepare_args = {}
         if expected_version:
             prepare_args['version'] = expected_version
+        if fanout:
+            prepare_args['fanout'] = fanout
+        if topic:
+            prepare_args['topic'] = topic
         prepare_mock.assert_called_once_with(**prepare_args)
 
         self.assertEqual(retval, expected_retval)
@@ -216,35 +195,36 @@ class RpcApiTestCase(base.BaseTestCase):
 
     def test_delete_network(self):
         rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
-        self._test_rpc_api_legacy(
+        self._test_rpc_api(
                 rpcapi,
                 topics.get_topic_name(topics.AGENT,
                                       topics.NETWORK,
                                       topics.DELETE),
-                'network_delete', rpc_method='fanout_cast',
-                network_id='fake_request_spec')
+                'network_delete', rpc_method='cast',
+                fanout=True, network_id='fake_request_spec')
 
     def test_port_update(self):
         rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
-        self._test_rpc_api_legacy(
+        self._test_rpc_api(
                 rpcapi,
                 topics.get_topic_name(topics.AGENT,
                                       topics.PORT,
                                       topics.UPDATE),
-                'port_update', rpc_method='fanout_cast',
-                port='fake_port',
+                'port_update', rpc_method='cast',
+                fanout=True, port='fake_port',
                 network_type='fake_network_type',
                 segmentation_id='fake_segmentation_id',
                 physical_network='fake_physical_network')
 
     def test_tunnel_update(self):
         rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
-        self._test_rpc_api_legacy(
+        self._test_rpc_api(
                 rpcapi,
                 topics.get_topic_name(topics.AGENT,
                                       type_tunnel.TUNNEL,
                                       topics.UPDATE),
-                'tunnel_update', rpc_method='fanout_cast',
+                'tunnel_update', rpc_method='cast',
+                fanout=True,
                 tunnel_ip='fake_ip', tunnel_type='gre')
 
     def test_device_details(self):