]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Removed create_rpc_dispatcher methods
authorIhar Hrachyshka <ihrachys@redhat.com>
Mon, 9 Jun 2014 13:10:04 +0000 (15:10 +0200)
committerIhar Hrachyshka <ihrachys@redhat.com>
Fri, 20 Jun 2014 18:44:37 +0000 (20:44 +0200)
Now that we don't have a special dispatcher class and we pass a list of
endpoints to corresponding functions instead, those methods are
unneeded.

blueprint oslo-messaging

Change-Id: If2b187fd8e553594212264f34b51b5b99c4630b2

47 files changed:
neutron/common/rpc_compat.py
neutron/db/metering/metering_rpc.py
neutron/plugins/bigswitch/agent/restproxy_agent.py
neutron/plugins/bigswitch/plugin.py
neutron/plugins/brocade/NeutronPlugin.py
neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py
neutron/plugins/hyperv/agent/hyperv_neutron_agent.py
neutron/plugins/hyperv/hyperv_neutron_plugin.py
neutron/plugins/hyperv/rpc_callbacks.py
neutron/plugins/ibm/agent/sdnve_neutron_agent.py
neutron/plugins/ibm/sdnve_neutron_plugin.py
neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py
neutron/plugins/linuxbridge/lb_neutron_plugin.py
neutron/plugins/midonet/plugin.py
neutron/plugins/ml2/plugin.py
neutron/plugins/ml2/rpc.py
neutron/plugins/mlnx/agent/eswitch_neutron_agent.py
neutron/plugins/mlnx/mlnx_plugin.py
neutron/plugins/mlnx/rpc_callbacks.py
neutron/plugins/nec/agent/nec_neutron_agent.py
neutron/plugins/nec/nec_plugin.py
neutron/plugins/ofagent/agent/ofa_neutron_agent.py
neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py
neutron/plugins/oneconvergence/plugin.py
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/plugins/openvswitch/ovs_neutron_plugin.py
neutron/plugins/ryu/agent/ryu_neutron_agent.py
neutron/plugins/ryu/ryu_neutron_plugin.py
neutron/plugins/vmware/dhcp_meta/rpc.py
neutron/plugins/vmware/dhcpmeta_modes.py
neutron/services/firewall/fwaas_plugin.py
neutron/services/l3_router/l3_router_plugin.py
neutron/services/loadbalancer/drivers/common/agent_driver_base.py
neutron/services/metering/metering_plugin.py
neutron/services/vpn/device_drivers/cisco_ipsec.py
neutron/services/vpn/device_drivers/ipsec.py
neutron/services/vpn/service_drivers/cisco_ipsec.py
neutron/services/vpn/service_drivers/ipsec.py
neutron/tests/unit/bigswitch/test_base.py
neutron/tests/unit/bigswitch/test_security_groups.py
neutron/tests/unit/ml2/test_port_binding.py
neutron/tests/unit/ml2/test_security_group.py
neutron/tests/unit/oneconvergence/test_security_group.py
neutron/tests/unit/openvswitch/test_ovs_security_group.py
neutron/tests/unit/ryu/test_ryu_security_group.py
neutron/tests/unit/services/firewall/test_fwaas_plugin.py
neutron/tests/unit/test_agent_rpc.py

index 697c72a5a88475621bb263f88ce6b019959fcea8..8c16c2c5bfaab07342457eefb3601542d5343317 100644 (file)
@@ -108,15 +108,15 @@ class Service(service.Service):
         LOG.debug("Creating Consumer connection for Service %s" %
                   self.topic)
 
-        dispatcher = [self.manager]
+        endpoints = [self.manager]
 
         # Share this same connection for these Consumers
-        self.conn.create_consumer(self.topic, dispatcher, fanout=False)
+        self.conn.create_consumer(self.topic, endpoints, fanout=False)
 
         node_topic = '%s.%s' % (self.topic, self.host)
-        self.conn.create_consumer(node_topic, dispatcher, fanout=False)
+        self.conn.create_consumer(node_topic, endpoints, fanout=False)
 
-        self.conn.create_consumer(self.topic, dispatcher, fanout=True)
+        self.conn.create_consumer(self.topic, endpoints, fanout=True)
 
         # Hook to allow the manager to do other initializations after
         # the rpc connection is created.
index b55a0cf4c7e291d055df26ae37f4d931593c2437..c0bbd51ad30b660ec42ffeb0aa4f568a8fad78f8 100644 (file)
@@ -30,9 +30,6 @@ class MeteringRpcCallbacks(object):
     def __init__(self, meter_plugin):
         self.meter_plugin = meter_plugin
 
-    def create_rpc_dispatcher(self):
-        return [self]
-
     def get_sync_data_metering(self, context, **kwargs):
         l3_plugin = manager.NeutronManager.get_service_plugins().get(
             service_constants.L3_ROUTER_NAT)
index 6cdf5913b0707f67be738b1c3a6be0ee4f9ba80d..97aa7d0e3f4e22063e6f4a7344356fdb1c366025 100644 (file)
@@ -105,10 +105,10 @@ class RestProxyAgent(rpc_compat.RpcCallback,
         self.topic = topics.AGENT
         self.plugin_rpc = PluginApi(topics.PLUGIN)
         self.context = q_context.get_admin_context_without_session()
-        self.dispatcher = [self]
+        self.endpoints = [self]
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.SECURITY_GROUP, topics.UPDATE]]
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
 
index bdac0cf1a5ffdbdf94454aba501bc75d75b99882..c13c45b656a6d5cbe3b960dfc5ecffafa26cc277 100644 (file)
@@ -119,9 +119,6 @@ class RestProxyCallbacks(rpc_compat.RpcCallback,
 
     RPC_API_VERSION = '1.1'
 
-    def create_rpc_dispatcher(self):
-        return [self, agents_db.AgentExtRpcCallback()]
-
     def get_port_from_device(self, device):
         port_id = re.sub(r"^tap", "", device)
         port = self.get_port_and_sgs(port_id)
@@ -505,9 +502,9 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
         self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
             self._dhcp_agent_notifier
         )
-        self.callbacks = RestProxyCallbacks()
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
-        self.conn.create_consumer(self.topic, self.dispatcher,
+        self.endpoints = [RestProxyCallbacks(),
+                          agents_db.AgentExtRpcCallback()]
+        self.conn.create_consumer(self.topic, self.endpoints,
                                   fanout=False)
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
index 5e0ee782708b4fbf82377a2f17e7c037bd94a284..c633085d074fae72bb3947a735f5ebd0a71bbd59 100644 (file)
@@ -91,14 +91,6 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback,
     #   1.1 Support Security Group RPC
     TAP_PREFIX_LEN = 3
 
-    def create_rpc_dispatcher(self):
-        """Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        """
-        return [self, agents_db.AgentExtRpcCallback()]
-
     @classmethod
     def get_port_from_device(cls, device):
         """Get port from the brocade specific db."""
@@ -262,10 +254,10 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.rpc_context = context.RequestContext('neutron', 'neutron',
                                                   is_admin=False)
         self.conn = rpc_compat.create_connection(new=True)
-        self.callbacks = BridgeRpcCallbacks()
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.endpoints = [BridgeRpcCallbacks(),
+                          agents_db.AgentExtRpcCallback()]
         for svc_topic in self.service_topics.values():
-            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
         self.notifier = AgentNotifierApi(topics.AGENT)
index 83defcf7cde355999923a0f382811f7451bbee34..a31e570f0f05536592c2f2b6c14dd094b85ac098 100644 (file)
@@ -68,14 +68,6 @@ class N1kvRpcCallbacks(rpc_compat.RpcCallback,
     # Set RPC API version to 1.1 by default.
     RPC_API_VERSION = '1.1'
 
-    def create_rpc_dispatcher(self):
-        """Get the rpc dispatcher for this rpc manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        """
-        return [self, agents_db.AgentExtRpcCallback()]
-
 
 class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
                           external_net_db.External_net_db_mixin,
@@ -135,9 +127,9 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.service_topics = {svc_constants.CORE: topics.PLUGIN,
                                svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
         self.conn = rpc_compat.create_connection(new=True)
-        self.dispatcher = N1kvRpcCallbacks().create_rpc_dispatcher()
+        self.endpoints = [N1kvRpcCallbacks(), agents_db.AgentExtRpcCallback()]
         for svc_topic in self.service_topics.values():
-            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
         self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
         self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
         # Consume from all consumers in threads
index f76f751f843dd2f856975d3e48d97eddc7951e92..07a5ed7764ae6d0a3df40c06c56be231d9791280 100644 (file)
@@ -97,16 +97,13 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback,
 
     def _setup_rpc(self):
         self.topic = topics.AGENT
-        self.dispatcher = self._create_rpc_dispatcher()
+        self.endpoints = [HyperVSecurityCallbackMixin(self)]
         consumers = [[topics.SECURITY_GROUP, topics.UPDATE]]
 
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
 
-    def _create_rpc_dispatcher(self):
-        return [HyperVSecurityCallbackMixin(self)]
-
 
 class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback,
                                   sg_rpc.SecurityGroupAgentRpcCallbackMixin):
@@ -165,13 +162,13 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback):
         # RPC network init
         self.context = context.get_admin_context_without_session()
         # Handle updates from service
-        self.dispatcher = self._create_rpc_dispatcher()
+        self.endpoints = [self]
         # Define the listening consumers for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.NETWORK, topics.DELETE],
                      [topics.PORT, topics.DELETE],
                      [constants.TUNNEL, topics.UPDATE]]
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
 
@@ -233,9 +230,6 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback):
             network_type, physical_network,
             segmentation_id, port['admin_state_up'])
 
-    def _create_rpc_dispatcher(self):
-        return [self]
-
     def _get_vswitch_name(self, network_type, physical_network):
         if network_type != p_const.TYPE_LOCAL:
             vswitch_name = self._get_vswitch_for_physical_network(
index 2b2414845c41e46c5fff302dda8dde29ec0a5e20..4307e5133577d16f137862e391b35c8ed3af00c4 100644 (file)
@@ -190,10 +190,10 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
         self.conn = rpc_compat.create_connection(new=True)
         self.notifier = agent_notifier_api.AgentNotifierApi(
             topics.AGENT)
-        self.callbacks = rpc_callbacks.HyperVRpcCallbacks(self.notifier)
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.endpoints = [rpc_callbacks.HyperVRpcCallbacks(self.notifier),
+                          agents_db.AgentExtRpcCallback()]
         for svc_topic in self.service_topics.values():
-            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
 
index e967286d58578315fe388f00fc31f27dd03ba5f9..874059a58365569c874f7198c66075fc10b5f5ef 100644 (file)
@@ -18,7 +18,6 @@
 
 from neutron.common import constants as q_const
 from neutron.common import rpc_compat
-from neutron.db import agents_db
 from neutron.db import dhcp_rpc_base
 from neutron.db import l3_rpc_base
 from neutron.openstack.common import log as logging
@@ -41,14 +40,6 @@ class HyperVRpcCallbacks(
         self.notifier = notifier
         self._db = hyperv_db.HyperVPluginDB()
 
-    def create_rpc_dispatcher(self):
-        '''Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        '''
-        return [self, agents_db.AgentExtRpcCallback()]
-
     def get_device_details(self, rpc_context, **kwargs):
         """Agent requests device details."""
         agent_id = kwargs.get('agent_id')
index b1fa1e8b65aecb91216c9e65de479ee10c3a8502..e1c8d3ed71c324f3b03a626d76fb3a18c278cacf 100644 (file)
@@ -123,10 +123,10 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback):
         self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
 
         self.context = context.get_admin_context_without_session()
-        self.dispatcher = self.create_rpc_dispatcher()
+        self.endpoints = [self]
         consumers = [[constants.INFO, topics.UPDATE]]
 
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
         if self.polling_interval:
@@ -154,9 +154,6 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback):
                                              "connection-mode",
                                              "out-of-band")
 
-    def create_rpc_dispatcher(self):
-        return [self]
-
     def setup_integration_br(self, bridge_name, reset_br, out_of_band,
                              controller_ip=None):
         '''Sets up the integration bridge.
index 80ddf4f70bba4d46bab1af9ad0286da9726859da..cf127f0014604d169c73126e2a879e277e7995c1 100644 (file)
@@ -48,13 +48,6 @@ class SdnveRpcCallbacks():
     def __init__(self, notifier):
         self.notifier = notifier  # used to notify the agent
 
-    def create_rpc_dispatcher(self):
-        '''Get the rpc dispatcher for this manager.
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        '''
-        return [self, agents_db.AgentExtRpcCallback()]
-
     def sdnve_info(self, rpc_context, **kwargs):
         '''Update new information.'''
         info = kwargs.get('info')
@@ -140,9 +133,9 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.topic = topics.PLUGIN
         self.conn = rpc_compat.create_connection(new=True)
         self.notifier = AgentNotifierApi(topics.AGENT)
-        self.callbacks = SdnveRpcCallbacks(self.notifier)
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
-        self.conn.create_consumer(self.topic, self.dispatcher,
+        self.endpoints = [SdnveRpcCallbacks(self.notifier),
+                          agents_db.AgentExtRpcCallback()]
+        self.conn.create_consumer(self.topic, self.endpoints,
                                   fanout=False)
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
index 5af3f674a37085f421791e958a4086901735f4a7..65fddfa1fcca1aca468214f801bc9d3284fc776b 100755 (executable)
@@ -809,14 +809,6 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
 
             getattr(self, method)(context, values)
 
-    def create_rpc_dispatcher(self):
-        '''Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        '''
-        return [self]
-
 
 class LinuxBridgePluginApi(agent_rpc.PluginApi,
                            sg_rpc.SecurityGroupServerRpcApiMixin):
@@ -876,9 +868,7 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
         # RPC network init
         self.context = context.get_admin_context_without_session()
         # Handle updates from service
-        self.callbacks = LinuxBridgeRpcCallbacks(self.context,
-                                                 self)
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self)]
         # Define the listening consumers for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.NETWORK, topics.DELETE],
@@ -886,7 +876,7 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
         if cfg.CONF.VXLAN.l2_population:
             consumers.append([topics.L2POPULATION,
                               topics.UPDATE, cfg.CONF.host])
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
         report_interval = cfg.CONF.AGENT.report_interval
index 025048e0aad72e138baf4a06a483c6f02bd4ee6e..412275d24eb96e83570182d205cbdf2b6586801f 100644 (file)
@@ -65,14 +65,6 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
     # Device names start with "tap"
     TAP_PREFIX_LEN = 3
 
-    def create_rpc_dispatcher(self):
-        '''Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        '''
-        return [self, agents_db.AgentExtRpcCallback()]
-
     @classmethod
     def get_port_from_device(cls, device):
         port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
@@ -281,10 +273,10 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.service_topics = {svc_constants.CORE: topics.PLUGIN,
                                svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
         self.conn = rpc_compat.create_connection(new=True)
-        self.callbacks = LinuxBridgeRpcCallbacks()
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.endpoints = [LinuxBridgeRpcCallbacks(),
+                          agents_db.AgentExtRpcCallback()]
         for svc_topic in self.service_topics.values():
-            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
         self.notifier = AgentNotifierApi(topics.AGENT)
index 7f2dcdfd1352ae9b62c89c5f0657fc214b7f19fa..9a706d4a5233f69e12d49396faa301e9ab0477ec 100644 (file)
@@ -180,16 +180,6 @@ class MidoRpcCallbacks(rpc_compat.RpcCallback,
                        dhcp_rpc_base.DhcpRpcCallbackMixin):
     RPC_API_VERSION = '1.1'
 
-    def create_rpc_dispatcher(self):
-        """Get the rpc dispatcher for this manager.
-
-        This a basic implementation that will call the plugin like get_ports
-        and handle basic events
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        """
-        return [self, agents_db.AgentExtRpcCallback()]
-
 
 class MidonetPluginException(n_exc.NeutronException):
     message = _("%(msg)s")
@@ -382,9 +372,9 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         # RPC support
         self.topic = topics.PLUGIN
         self.conn = rpc_compat.create_connection(new=True)
-        self.callbacks = MidoRpcCallbacks()
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
-        self.conn.create_consumer(self.topic, self.dispatcher,
+        self.endpoints = [MidoRpcCallbacks(),
+                          agents_db.AgentExtRpcCallback()]
+        self.conn.create_consumer(self.topic, self.endpoints,
                                   fanout=False)
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
index e839538e3e49bf2e9cd4940c561aa1e821376b93..a324637c5f89808c56250fe8933a2bf5820262e3 100644 (file)
@@ -25,6 +25,7 @@ from neutron.common import constants as const
 from neutron.common import exceptions as exc
 from neutron.common import rpc_compat
 from neutron.common import topics
+from neutron.db import agents_db
 from neutron.db import agentschedulers_db
 from neutron.db import allowedaddresspairs_db as addr_pair_db
 from neutron.db import db_base_plugin_v2
@@ -126,11 +127,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         )
 
     def start_rpc_listeners(self):
-        self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
+        self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
+                          agents_db.AgentExtRpcCallback()]
         self.topic = topics.PLUGIN
         self.conn = rpc_compat.create_connection(new=True)
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
-        self.conn.create_consumer(self.topic, self.dispatcher,
+        self.conn.create_consumer(self.topic, self.endpoints,
                                   fanout=False)
         return self.conn.consume_in_threads()
 
index e5068afb4c266716fa6e194cde0ef0475bebe2b4..c744147c687f6c8fd228a7b6a378f5248dec9c72 100644 (file)
@@ -19,7 +19,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.common import constants as q_const
 from neutron.common import rpc_compat
 from neutron.common import topics
-from neutron.db import agents_db
 from neutron.db import api as db_api
 from neutron.db import dhcp_rpc_base
 from neutron.db import securitygroups_rpc_base as sg_db_rpc
@@ -58,14 +57,6 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
         # test in H3.
         super(RpcCallbacks, self).__init__(notifier, type_manager)
 
-    def create_rpc_dispatcher(self):
-        '''Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        '''
-        return [self, agents_db.AgentExtRpcCallback()]
-
     @classmethod
     def _device_to_port_id(cls, device):
         # REVISIT(rkukura): Consider calling into MechanismDrivers to
index 94fd2b89a4278f8302cb4afac297f22ecea13c29..f60f02bb7747b9c6bbf1dda2d86b11e4636f3ba7 100644 (file)
@@ -210,15 +210,6 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback,
         else:
             LOG.debug(_("No port %s defined on agent."), port['id'])
 
-    def create_rpc_dispatcher(self):
-        """Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version,
-        or support more than one class as the target of rpc messages,
-        override this method.
-        """
-        return [self]
-
 
 class MlnxEswitchPluginApi(agent_rpc.PluginApi,
                            sg_rpc.SecurityGroupServerRpcApiMixin):
@@ -268,14 +259,12 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
         # RPC network init
         self.context = context.get_admin_context_without_session()
         # Handle updates from service
-        self.callbacks = MlnxEswitchRpcCallbacks(self.context,
-                                                 self)
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.endpoints = [MlnxEswitchRpcCallbacks(self.context, self)]
         # Define the listening consumers for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.NETWORK, topics.DELETE],
                      [topics.SECURITY_GROUP, topics.UPDATE]]
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
 
index 05d639a497e83aa72fc770b43c7c7ed845ee7760..16d72df553443d37a732da36a66768e77e8e5b5a 100644 (file)
@@ -28,6 +28,7 @@ from neutron.common import exceptions as n_exc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.common import utils
+from neutron.db import agents_db
 from neutron.db import agentschedulers_db
 from neutron.db import db_base_plugin_v2
 from neutron.db import external_net_db
@@ -120,10 +121,10 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
         self.service_topics = {svc_constants.CORE: topics.PLUGIN,
                                svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
         self.conn = rpc_compat.create_connection(new=True)
-        self.callbacks = rpc_callbacks.MlnxRpcCallbacks()
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(),
+                          agents_db.AgentExtRpcCallback()]
         for svc_topic in self.service_topics.values():
-            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
         self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
index 0eda5143687e662cae13c6454b9926ca27aeaec1..346d35822d1b24763a5f5b694d6bf99ba7eae99e 100644 (file)
@@ -18,7 +18,6 @@ from oslo.config import cfg
 
 from neutron.common import constants as q_const
 from neutron.common import rpc_compat
-from neutron.db import agents_db
 from neutron.db import api as db_api
 from neutron.db import dhcp_rpc_base
 from neutron.db import l3_rpc_base
@@ -40,15 +39,6 @@ class MlnxRpcCallbacks(rpc_compat.RpcCallback,
     #to be compatible with Linux Bridge Agent on Network Node
     TAP_PREFIX_LEN = 3
 
-    def create_rpc_dispatcher(self):
-        """Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an RPC API version,
-        or support more than one class as the target of RPC messages,
-        override this method.
-        """
-        return [self, agents_db.AgentExtRpcCallback()]
-
     @classmethod
     def get_port_from_device(cls, device):
         """Get port according to device.
index c1f580ac2e0384a57705c8e3ca196c7693be47db..6ab5f82b4eede1db0218a2f5d18e114160dc5b51 100755 (executable)
@@ -156,11 +156,11 @@ class NECNeutronAgent(object):
                                                 self, self.sg_agent)
         self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
                                                          self.sg_agent)
-        self.dispatcher = [self.callback_nec, self.callback_sg]
+        self.endpoints = [self.callback_nec, self.callback_sg]
         # Define the listening consumer for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.SECURITY_GROUP, topics.UPDATE]]
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
 
index 266dab7468f7438163f8fd5d50df6c8ade1365b3..f2225e733bcf3c0d4849e313be5e3d81c2adabce 100644 (file)
@@ -146,14 +146,14 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 
         # NOTE: callback_sg is referred to from the sg unit test.
         self.callback_sg = SecurityGroupServerRpcCallback()
-        self.dispatcher = [
+        self.endpoints = [
             NECPluginV2RPCCallbacks(self.safe_reference),
             DhcpRpcCallback(),
             L3RpcCallback(),
             self.callback_sg,
             agents_db.AgentExtRpcCallback()]
         for svc_topic in self.service_topics.values():
-            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
 
@@ -715,14 +715,6 @@ class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback):
         super(NECPluginV2RPCCallbacks, self).__init__()
         self.plugin = plugin
 
-    def create_rpc_dispatcher(self):
-        '''Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        '''
-        return [self]
-
     def update_ports(self, rpc_context, **kwargs):
         """Update ports' information and activate/deavtivate them.
 
index c79d77a9154fa82e95abaefc4b64e22b019edb89..6e6cd84d7f7f8056f05a272be0ad7261eaac78c9 100644 (file)
@@ -286,13 +286,13 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
         # RPC network init
         self.context = context.get_admin_context_without_session()
         # Handle updates from service
-        self.dispatcher = self.create_rpc_dispatcher()
+        self.endpoints = [self]
         # Define the listening consumers for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.NETWORK, topics.DELETE],
                      [constants.TUNNEL, topics.UPDATE],
                      [topics.SECURITY_GROUP, topics.UPDATE]]
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
         report_interval = cfg.CONF.AGENT.report_interval
@@ -344,14 +344,6 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
             return
         self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
 
-    def create_rpc_dispatcher(self):
-        """Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        """
-        return [self]
-
     def _provision_local_vlan_outbound_for_tunnel(self, lvid,
                                                   segmentation_id, ofports):
         br = self.tun_br
index 0ef6348dfb428435dc3ea700ceffbbbd25bcc53e..377cdda1e99a636214e506701ecda8f0184f5668 100644 (file)
@@ -119,11 +119,11 @@ class NVSDNeutronAgent(rpc_compat.RpcCallback):
                                                 self, self.sg_agent)
         self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
                                                          self.sg_agent)
-        self.dispatcher = [self.callback_oc, self.callback_sg]
+        self.endpoints = [self.callback_oc, self.callback_sg]
         # Define the listening consumer for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.SECURITY_GROUP, topics.UPDATE]]
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
 
index 4411bd20d6091a13000dac3fc885aa5f20e17733..257ab5494e24049e5930692e44ff9560986cf4ce 100644 (file)
@@ -58,10 +58,6 @@ class NVSDPluginRpcCallbacks(rpc_compat.RpcCallback,
 
     RPC_API_VERSION = '1.1'
 
-    def create_rpc_dispatcher(self):
-        """Get the rpc dispatcher for this manager."""
-        return [self, agents_db.AgentExtRpcCallback()]
-
     @staticmethod
     def get_port_from_device(device):
         port = nvsd_db.get_port_from_device(device)
@@ -165,10 +161,10 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
             l3_rpc_agent_api.L3AgentNotifyAPI()
         )
-        self.callbacks = NVSDPluginRpcCallbacks()
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.endpoints = [NVSDPluginRpcCallbacks(),
+                          agents_db.AgentExtRpcCallback()]
         for svc_topic in self.service_topics.values():
-            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
 
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
index a66d773c82c745c9d4d049f8cb953989c437c16c..c5b136b068f7a073f088c98e94c343459ad8b177 100644 (file)
@@ -249,7 +249,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
         # RPC network init
         self.context = context.get_admin_context_without_session()
         # Handle updates from service
-        self.dispatcher = self.create_rpc_dispatcher()
+        self.endpoints = [self]
         # Define the listening consumers for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.NETWORK, topics.DELETE],
@@ -258,7 +258,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
         if self.l2_pop:
             consumers.append([topics.L2POPULATION,
                               topics.UPDATE, cfg.CONF.host])
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
         report_interval = cfg.CONF.AGENT.report_interval
@@ -493,14 +493,6 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
         else:
             LOG.warning(_('Action %s not supported'), action)
 
-    def create_rpc_dispatcher(self):
-        '''Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        '''
-        return [self]
-
     def provision_local_vlan(self, net_uuid, network_type, physical_network,
                              segmentation_id):
         '''Provisions a local VLAN.
index bf35bb7a4da5e98ae3d0cf088f5de4287c861afc..31698a3df5467ab2ff76b7caa871ea5bbb3378a6 100644 (file)
@@ -73,14 +73,6 @@ class OVSRpcCallbacks(rpc_compat.RpcCallback,
         self.notifier = notifier
         self.tunnel_type = tunnel_type
 
-    def create_rpc_dispatcher(self):
-        '''Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        '''
-        return [self, agents_db.AgentExtRpcCallback()]
-
     @classmethod
     def get_port_from_device(cls, device):
         port = ovs_db_v2.get_port_from_device(device)
@@ -341,10 +333,10 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
             l3_rpc_agent_api.L3AgentNotifyAPI()
         )
-        self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type),
+                          agents_db.AgentExtRpcCallback()]
         for svc_topic in self.service_topics.values():
-            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
 
index 6086113c7f59cd6c8e169e1b2825fcd83cc3fffe..d1fac31852527966577d90b830b20ce738b9b3ed 100755 (executable)
@@ -200,16 +200,13 @@ class OVSNeutronOFPRyuAgent(rpc_compat.RpcCallback,
         self.topic = topics.AGENT
         self.plugin_rpc = RyuPluginApi(topics.PLUGIN)
         self.context = q_context.get_admin_context_without_session()
-        self.dispatcher = self._create_rpc_dispatcher()
+        self.endpoints = [self]
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.SECURITY_GROUP, topics.UPDATE]]
-        self.connection = agent_rpc.create_consumers(self.dispatcher,
+        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
 
-    def _create_rpc_dispatcher(self):
-        return [self]
-
     def _setup_integration_br(self, root_helper, integ_br,
                               tunnel_ip, ovsdb_port, ovsdb_ip):
         self.int_br = OVSBridge(integ_br, root_helper)
index 0e9405a985fa2a40f2b23bfb11679f49dea49129..9fd6bf9893ad7dbbd83048d54f05087ce622297d 100644 (file)
@@ -57,9 +57,6 @@ class RyuRpcCallbacks(rpc_compat.RpcCallback,
         super(RyuRpcCallbacks, self).__init__()
         self.ofp_rest_api_addr = ofp_rest_api_addr
 
-    def create_rpc_dispatcher(self):
-        return [self]
-
     def get_ofp_rest_api(self, context, **kwargs):
         LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
         return self.ofp_rest_api_addr
@@ -143,10 +140,9 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
                                svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
         self.conn = rpc_compat.create_connection(new=True)
         self.notifier = AgentNotifierApi(topics.AGENT)
-        self.callbacks = RyuRpcCallbacks(self.ofp_api_host)
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.endpoints = [RyuRpcCallbacks(self.ofp_api_host)]
         for svc_topic in self.service_topics.values():
-            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
         self.conn.consume_in_threads()
 
     def _create_all_tenant_network(self):
index c32a39b37246b053da8bf78fb0be883cd1899afd..9d409d01a7920978bcae2af723de6a20349c139f 100644 (file)
@@ -25,7 +25,6 @@ from neutron.api.v2 import attributes
 from neutron.common import constants as const
 from neutron.common import exceptions as ntn_exc
 from neutron.common import rpc_compat
-from neutron.db import agents_db
 from neutron.db import db_base_plugin_v2
 from neutron.db import dhcp_rpc_base
 from neutron.db import l3_db
@@ -48,14 +47,6 @@ class NSXRpcCallbacks(rpc_compat.RpcCallback,
 
     RPC_API_VERSION = '1.1'
 
-    def create_rpc_dispatcher(self):
-        '''Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        '''
-        return [self, agents_db.AgentExtRpcCallback()]
-
 
 def handle_network_dhcp_access(plugin, context, network, action):
     pass
index b878503a2776b81aaac8641860415e667b86193f..0ce2112f60aad0f5ba3cd76ed0fa8316c9d568fc 100644 (file)
@@ -21,6 +21,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from neutron.common import constants as const
 from neutron.common import rpc_compat
 from neutron.common import topics
+from neutron.db import agents_db
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
 from neutron.plugins.vmware.common import config
@@ -70,8 +71,9 @@ class DhcpMetadataAccess(object):
     def _setup_rpc_dhcp_metadata(self, notifier=None):
         self.topic = topics.PLUGIN
         self.conn = rpc_compat.create_connection(new=True)
-        self.dispatcher = nsx_rpc.NSXRpcCallbacks().create_rpc_dispatcher()
-        self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
+        self.endpoints = [nsx_rpc.NSXRpcCallbacks(),
+                          agents_db.AgentExtRpcCallback()]
+        self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
         self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
             notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
         self.conn.consume_in_threads()
index dbda0c5bfaa63a34385cee0d22554ecc0a4ac845..f0eaf1b2c6092064635afee30b816806fdf054a8 100644 (file)
@@ -40,9 +40,6 @@ class FirewallCallbacks(rpc_compat.RpcCallback):
         super(FirewallCallbacks, self).__init__()
         self.plugin = plugin
 
-    def create_rpc_dispatcher(self):
-        return [self]
-
     def set_firewall_status(self, context, firewall_id, status, **kwargs):
         """Agent uses this to set a firewall's status."""
         LOG.debug(_("set_firewall_status() called"))
@@ -165,13 +162,11 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin):
         """Do the initialization for the firewall service plugin here."""
         qdbapi.register_models()
 
-        self.callbacks = FirewallCallbacks(self)
+        self.endpoints = [FirewallCallbacks(self)]
 
         self.conn = rpc_compat.create_connection(new=True)
         self.conn.create_consumer(
-            topics.FIREWALL_PLUGIN,
-            self.callbacks.create_rpc_dispatcher(),
-            fanout=False)
+            topics.FIREWALL_PLUGIN, self.endpoints, fanout=False)
         self.conn.consume_in_threads()
 
         self.agent_rpc = FirewallAgentApi(
index b13074193f16880c34d12911b4008bc3eb75e8c2..bd1378bc8dbaa7efd935ecf4ae6bfc612e320780 100644 (file)
@@ -39,14 +39,6 @@ class L3RouterPluginRpcCallbacks(rpc_compat.RpcCallback,
 
     RPC_API_VERSION = '1.1'
 
-    def create_rpc_dispatcher(self):
-        """Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        """
-        return [self]
-
 
 class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
                      extraroute_db.ExtraRoute_db_mixin,
@@ -76,9 +68,8 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
         self.conn = rpc_compat.create_connection(new=True)
         self.agent_notifiers.update(
             {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
-        self.callbacks = L3RouterPluginRpcCallbacks()
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
-        self.conn.create_consumer(self.topic, self.dispatcher,
+        self.endpoints = [L3RouterPluginRpcCallbacks()]
+        self.conn.create_consumer(self.topic, self.endpoints,
                                   fanout=False)
         self.conn.consume_in_threads()
 
index da9c438f14f8a24529baba39098098a3c539679b..5849c95975311a47831e4773e70fdbd037eb3a3b 100644 (file)
@@ -64,9 +64,6 @@ class LoadBalancerCallbacks(rpc_compat.RpcCallback):
         super(LoadBalancerCallbacks, self).__init__()
         self.plugin = plugin
 
-    def create_rpc_dispatcher(self):
-        return [self, agents_db.AgentExtRpcCallback(self.plugin)]
-
     def get_ready_devices(self, context, host=None):
         with context.session.begin(subtransactions=True):
             agents = self.plugin.get_lbaas_agents(context,
@@ -342,11 +339,14 @@ class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
         if hasattr(self.plugin, 'agent_callbacks'):
             return
 
-        self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
+        self.plugin.agent_endpoints = [
+            LoadBalancerCallbacks(self.plugin),
+            agents_db.AgentExtRpcCallback(self.plugin)
+        ]
         self.plugin.conn = rpc_compat.create_connection(new=True)
         self.plugin.conn.create_consumer(
             topics.LOADBALANCER_PLUGIN,
-            self.plugin.agent_callbacks.create_rpc_dispatcher(),
+            self.plugin.agent_endpoints,
             fanout=False)
         self.plugin.conn.consume_in_threads()
 
index dd11149355953d0ce06897e420c0b6182ea0d96d..e67dbab60d4c181f00eb6ccb65911720269f5ea0 100644 (file)
@@ -28,13 +28,11 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
     def __init__(self):
         super(MeteringPlugin, self).__init__()
 
-        self.callbacks = metering_rpc.MeteringRpcCallbacks(self)
+        self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
 
         self.conn = rpc_compat.create_connection(new=True)
         self.conn.create_consumer(
-            topics.METERING_PLUGIN,
-            self.callbacks.create_rpc_dispatcher(),
-            fanout=False)
+            topics.METERING_PLUGIN, self.endpoints, fanout=False)
         self.conn.consume_in_threads()
 
         self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
index 22fe1558941aba858d5b9f58ebabb656c108b6de..7d73735c1cced572169ceccbab7d2e562bd75f3c 100644 (file)
@@ -198,10 +198,8 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
 
         self.service_state = {}
 
-        self.conn.create_consumer(
-            node_topic,
-            self.create_rpc_dispatcher(),
-            fanout=False)
+        self.endpoints = [self]
+        self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
         self.conn.consume_in_threads()
         self.agent_rpc = (
             CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
@@ -225,9 +223,6 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
                                                        v['timeout']))
                           for k, v in csrs_found.items()])
 
-    def create_rpc_dispatcher(self):
-        return [self]
-
     def vpnservice_updated(self, context, **kwargs):
         """Handle VPNaaS service driver change notifications."""
         LOG.debug(_("Handling VPN service update notification '%s'"),
index 2ed7c08a6e582e6f358024a1b5b2ab8f14dc1a2f..aef47919c0d7a70649b08984b3f89de32a6efc76 100644 (file)
@@ -504,10 +504,8 @@ class IPsecDriver(device_drivers.DeviceDriver):
         self.processes = {}
         self.process_status_cache = {}
 
-        self.conn.create_consumer(
-            node_topic,
-            self.create_rpc_dispatcher(),
-            fanout=False)
+        self.endpoints = [self]
+        self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
         self.conn.consume_in_threads()
         self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
         self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
@@ -515,9 +513,6 @@ class IPsecDriver(device_drivers.DeviceDriver):
         self.process_status_cache_check.start(
             interval=self.conf.ipsec.ipsec_status_check_interval)
 
-    def create_rpc_dispatcher(self):
-        return [self]
-
     def _update_nat(self, vpnservice, func):
         """Setting up nat rule in iptables.
 
index 4c4bd7a059bbf56eba1812eeb89bc1bc45e4cb6e..ed34f41ff746007f6fe5cd5859b507abe4d808f8 100644 (file)
@@ -53,9 +53,6 @@ class CiscoCsrIPsecVpnDriverCallBack(rpc_compat.RpcCallback):
         super(CiscoCsrIPsecVpnDriverCallBack, self).__init__()
         self.driver = driver
 
-    def create_rpc_dispatcher(self):
-        return [self]
-
     def get_vpn_services_on_host(self, context, host=None):
         """Retuns info on the vpnservices on the host."""
         plugin = self.driver.service_plugin
@@ -88,12 +85,10 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
 
     def __init__(self, service_plugin):
         super(CiscoCsrIPsecVPNDriver, self).__init__(service_plugin)
-        self.callbacks = CiscoCsrIPsecVpnDriverCallBack(self)
+        self.endpoints = [CiscoCsrIPsecVpnDriverCallBack(self)]
         self.conn = rpc_compat.create_connection(new=True)
         self.conn.create_consumer(
-            topics.CISCO_IPSEC_DRIVER_TOPIC,
-            self.callbacks.create_rpc_dispatcher(),
-            fanout=False)
+            topics.CISCO_IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
         self.conn.consume_in_threads()
         self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
             topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
index 662662044832ee6e79c8f2199a957d91c729fda7..be6aa9e26d0cd8cbef67d39e15f038d18aa5685c 100644 (file)
@@ -40,9 +40,6 @@ class IPsecVpnDriverCallBack(rpc_compat.RpcCallback):
         super(IPsecVpnDriverCallBack, self).__init__()
         self.driver = driver
 
-    def create_rpc_dispatcher(self):
-        return [self]
-
     def get_vpn_services_on_host(self, context, host=None):
         """Returns the vpnservices on the host."""
         plugin = self.driver.service_plugin
@@ -73,12 +70,10 @@ class IPsecVPNDriver(service_drivers.VpnDriver):
 
     def __init__(self, service_plugin):
         super(IPsecVPNDriver, self).__init__(service_plugin)
-        self.callbacks = IPsecVpnDriverCallBack(self)
+        self.endpoints = [IPsecVpnDriverCallBack(self)]
         self.conn = rpc_compat.create_connection(new=True)
         self.conn.create_consumer(
-            topics.IPSEC_DRIVER_TOPIC,
-            self.callbacks.create_rpc_dispatcher(),
-            fanout=False)
+            topics.IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
         self.conn.consume_in_threads()
         self.agent_rpc = IPsecVpnAgentApi(
             topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
index 6e5a0f7fe781929bb4aced0a3b36ef315117626f..6fc5580eb5306ab13ea8f17929fc062e1d708a21 100644 (file)
@@ -33,8 +33,6 @@ from neutron.db import portbindings_db  # noqa
 
 RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
 NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
-CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
-DISPATCHER = CALLBACKS + '.create_rpc_dispatcher'
 CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
 SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager'
 HTTPCON = 'neutron.plugins.bigswitch.servermanager.httplib.HTTPConnection'
@@ -61,15 +59,11 @@ class BigSwitchTestBase(object):
 
     def setup_patches(self):
         self.plugin_notifier_p = mock.patch(NOTIFIER)
-        # prevent rpc callback dispatcher from being created
-        self.callbacks_p = mock.patch(DISPATCHER,
-                                      new=lambda *args, **kwargs: None)
         # prevent any greenthreads from spawning
         self.spawn_p = mock.patch(SPAWN, new=lambda *args, **kwargs: None)
         # prevent the consistency watchdog from starting
         self.watch_p = mock.patch(CWATCH, new=lambda *args, **kwargs: None)
         self.addCleanup(db.clear_db)
-        self.callbacks_p.start()
         self.plugin_notifier_p.start()
         self.spawn_p.start()
         self.watch_p.start()
index f08623a729ada435b4de893b498d02c5242448e1..1e3a7aa56f7404f212f345dddfa4f35eb92ef0d5 100644 (file)
@@ -32,7 +32,7 @@ class RestProxySecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase,
         super(RestProxySecurityGroupsTestCase, self).setUp(self.plugin_str)
         plugin = manager.NeutronManager.get_plugin()
         self.notifier = plugin.notifier
-        self.rpc = plugin.callbacks
+        self.rpc = plugin.endpoints[0]
         self.startHttpPatch()
 
 
index 19523474a2b626fab7075c16ca7b41ae4c7faf90..b4aa19a9ca66c480df185892d11649a6a6de4681 100644 (file)
@@ -62,7 +62,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
                                  bound, status)
             port_id = port['port']['id']
             neutron_context = context.get_admin_context()
-            details = self.plugin.callbacks.get_device_details(
+            details = self.plugin.endpoints[0].get_device_details(
                 neutron_context, agent_id="theAgentId", device=port_id)
             if bound:
                 self.assertEqual(details['network_type'], 'local')
index 46aba96126d84454d2c08836cf7cf95028d46975..3e82c91e4fdea41a24aad86a0e2ac76a4e8463dd 100644 (file)
@@ -74,7 +74,8 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
                                            req.get_response(self.api))
                     port_id = res['port']['id']
                     plugin = manager.NeutronManager.get_plugin()
-                    port_dict = plugin.callbacks.get_port_from_device(port_id)
+                    callbacks = plugin.endpoints[0]
+                    port_dict = callbacks.get_port_from_device(port_id)
                     self.assertEqual(port_id, port_dict['id'])
                     self.assertEqual([security_group_id],
                                      port_dict[ext_sg.SECURITYGROUPS])
@@ -85,7 +86,7 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
 
     def test_security_group_get_port_from_device_with_no_port(self):
         plugin = manager.NeutronManager.get_plugin()
-        port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
+        port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
         self.assertIsNone(port_dict)
 
 
index 051d5825b0786dab04b7d95cec776e4be55fa983..af08132c58df6fb7523d3a2c5a94351d76fa3cb1 100644 (file)
@@ -136,7 +136,8 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
                                            req.get_response(self.api))
                     port_id = res['port']['id']
                     plugin = manager.NeutronManager.get_plugin()
-                    port_dict = plugin.callbacks.get_port_from_device(port_id)
+                    callbacks = plugin.endpoints[0]
+                    port_dict = callbacks.get_port_from_device(port_id)
                     self.assertEqual(port_id, port_dict['id'])
                     self.assertEqual([security_group_id],
                                      port_dict[ext_sg.SECURITYGROUPS])
@@ -148,7 +149,7 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
     def test_security_group_get_port_from_device_with_no_port(self):
 
         plugin = manager.NeutronManager.get_plugin()
-        port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
+        port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
         self.assertIsNone(port_dict)
 
 
index c681af827133b373afb9b149f30ae28615f208b4..50e2caf27c94ace97dda1116723c4035c39b8e51 100644 (file)
@@ -84,7 +84,8 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase,
                                            req.get_response(self.api))
                     port_id = res['port']['id']
                     plugin = manager.NeutronManager.get_plugin()
-                    port_dict = plugin.callbacks.get_port_from_device(port_id)
+                    callbacks = plugin.endpoints[0]
+                    port_dict = callbacks.get_port_from_device(port_id)
                     self.assertEqual(port_id, port_dict['id'])
                     self.assertEqual([security_group_id],
                                      port_dict[ext_sg.SECURITYGROUPS])
@@ -95,7 +96,7 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase,
 
     def test_security_group_get_port_from_device_with_no_port(self):
         plugin = manager.NeutronManager.get_plugin()
-        port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
+        port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
         self.assertIsNone(port_dict)
 
 
index aadbe8266b6636fc8bc618bc452ad950912aab9c..a023136efc7103514944cc5ee0fdcacd1ba77e65 100644 (file)
@@ -73,7 +73,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
                                        req.get_response(self.api))
                 port_id = res['port']['id']
                 plugin = manager.NeutronManager.get_plugin()
-                port_dict = plugin.callbacks.get_port_from_device(port_id)
+                port_dict = plugin.endpoints[0].get_port_from_device(port_id)
                 self.assertEqual(port_id, port_dict['id'])
                 self.assertEqual([security_group_id],
                                  port_dict[ext_sg.SECURITYGROUPS])
@@ -84,7 +84,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
 
     def test_security_group_get_port_from_device_with_no_port(self):
         plugin = manager.NeutronManager.get_plugin()
-        port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
+        port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
         self.assertIsNone(port_dict)
 
 
index 73e94698b0e0f3f57a836da0a524dff57ef63949..2430d69c6dafdcc492162c4f3d0fa28e32d3d77a 100644 (file)
@@ -41,7 +41,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
     def setUp(self):
         super(TestFirewallCallbacks,
               self).setUp(fw_plugin=FW_PLUGIN_KLASS)
-        self.callbacks = self.plugin.callbacks
+        self.callbacks = self.plugin.endpoints[0]
 
     def test_set_firewall_status(self):
         ctx = context.get_admin_context()
@@ -210,7 +210,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
 
     def setUp(self):
         super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS)
-        self.callbacks = self.plugin.callbacks
+        self.callbacks = self.plugin.endpoints[0]
 
     def test_create_second_firewall_not_permitted(self):
         with self.firewall():
@@ -342,7 +342,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
                 for k, v in attrs.iteritems():
                     self.assertEqual(fw_db[k], v)
             # cleanup the pending firewall
-            self.plugin.callbacks.firewall_deleted(ctx, fw_id)
+            self.plugin.endpoints[0].firewall_deleted(ctx, fw_id)
 
     def test_delete_firewall_after_agent_delete(self):
         ctx = context.get_admin_context()
index db553a9faf587051a161d96515659de620b5a119..ed571376d65c2192d08e67d4a20050c825f5f9cc 100644 (file)
@@ -83,31 +83,31 @@ class AgentPluginReportState(base.BaseTestCase):
 
 class AgentRPCMethods(base.BaseTestCase):
     def test_create_consumers(self):
-        dispatcher = mock.Mock()
+        endpoints = [mock.Mock()]
         expected = [
             mock.call(new=True),
-            mock.call().create_consumer('foo-topic-op', dispatcher,
+            mock.call().create_consumer('foo-topic-op', endpoints,
                                         fanout=True),
             mock.call().consume_in_threads()
         ]
 
         call_to_patch = 'neutron.common.rpc_compat.create_connection'
         with mock.patch(call_to_patch) as create_connection:
-            rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
+            rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
             create_connection.assert_has_calls(expected)
 
     def test_create_consumers_with_node_name(self):
-        dispatcher = mock.Mock()
+        endpoints = [mock.Mock()]
         expected = [
             mock.call(new=True),
-            mock.call().create_consumer('foo-topic-op', dispatcher,
+            mock.call().create_consumer('foo-topic-op', endpoints,
                                         fanout=True),
-            mock.call().create_consumer('foo-topic-op.node1', dispatcher,
+            mock.call().create_consumer('foo-topic-op.node1', endpoints,
                                         fanout=False),
             mock.call().consume_in_threads()
         ]
 
         call_to_patch = 'neutron.common.rpc_compat.create_connection'
         with mock.patch(call_to_patch) as create_connection:
-            rpc.create_consumers(dispatcher, 'foo', [('topic', 'op', 'node1')])
+            rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
             create_connection.assert_has_calls(expected)