]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Renamed consume_in_thread -> consume_in_threads
authorIhar Hrachyshka <ihrachys@redhat.com>
Mon, 9 Jun 2014 12:38:09 +0000 (14:38 +0200)
committerIhar Hrachyshka <ihrachys@redhat.com>
Thu, 19 Jun 2014 10:58:01 +0000 (12:58 +0200)
Now that we explicitly start multiple RPC servers, and each listener is
served in a separate thread, renamed the method to reflect new
behaviour.

blueprint oslo-messaging

Change-Id: I616f3a23e23e982e13f9b56ce417ca3623247f95

26 files changed:
neutron/agent/rpc.py
neutron/common/rpc_compat.py
neutron/plugins/bigswitch/plugin.py
neutron/plugins/brocade/NeutronPlugin.py
neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py
neutron/plugins/hyperv/hyperv_neutron_plugin.py
neutron/plugins/ibm/sdnve_neutron_plugin.py
neutron/plugins/linuxbridge/lb_neutron_plugin.py
neutron/plugins/midonet/plugin.py
neutron/plugins/ml2/plugin.py
neutron/plugins/mlnx/mlnx_plugin.py
neutron/plugins/nec/nec_plugin.py
neutron/plugins/oneconvergence/plugin.py
neutron/plugins/openvswitch/ovs_neutron_plugin.py
neutron/plugins/ryu/ryu_neutron_plugin.py
neutron/plugins/vmware/dhcpmeta_modes.py
neutron/services/firewall/fwaas_plugin.py
neutron/services/l3_router/l3_router_plugin.py
neutron/services/loadbalancer/drivers/common/agent_driver_base.py
neutron/services/metering/metering_plugin.py
neutron/services/vpn/device_drivers/cisco_ipsec.py
neutron/services/vpn/device_drivers/ipsec.py
neutron/services/vpn/service_drivers/cisco_ipsec.py
neutron/services/vpn/service_drivers/ipsec.py
neutron/tests/base.py
neutron/tests/unit/test_agent_rpc.py

index 5d2abf978e31b7c4133f763ec40c0b3feae43c4e..62b6860c410c17f23d25fd58e6325c1a7aca62c2 100644 (file)
@@ -51,7 +51,7 @@ def create_consumers(dispatcher, prefix, topic_details):
             connection.create_consumer(node_topic_name,
                                        dispatcher,
                                        fanout=False)
-    connection.consume_in_thread()
+    connection.consume_in_threads()
     return connection
 
 
index 939551d49300707d1895908b84eaf69c78f4e8b4..45588d9bcf3ea080a43dddb43ddd4c7fc580d1a0 100644 (file)
@@ -123,8 +123,8 @@ class Service(service.Service):
         if callable(getattr(self.manager, 'initialize_service_hook', None)):
             self.manager.initialize_service_hook(self)
 
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
 
     def stop(self):
         # Try to shut the connection down, but if we get any sort of
@@ -148,7 +148,7 @@ class Connection(object):
         server = n_rpc.get_server(target, proxy)
         self.servers.append(server)
 
-    def consume_in_thread(self):
+    def consume_in_threads(self):
         for server in self.servers:
             server.start()
         return self.servers
index 712f02b3c30f6beaa4c5312e83eb651c77648910..bdac0cf1a5ffdbdf94454aba501bc75d75b99882 100644 (file)
@@ -509,8 +509,8 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         self.conn.create_consumer(self.topic, self.dispatcher,
                                   fanout=False)
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
 
     def create_network(self, context, network):
         """Create a network.
index 5ec3fb40169d947bf6f231575da9923ab05e5970..5e0ee782708b4fbf82377a2f17e7c037bd94a284 100644 (file)
@@ -266,8 +266,8 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         for svc_topic in self.service_topics.values():
             self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
         self.notifier = AgentNotifierApi(topics.AGENT)
         self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
             dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
index e5c701e7d24893d1a3f0efe3a7e13cb48e6cc4db..83defcf7cde355999923a0f382811f7451bbee34 100644 (file)
@@ -140,8 +140,8 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
             self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
         self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
         self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
 
     def _setup_vsm(self):
         """
index 3bfb043ed587dac6f423323318d11b425a6c4852..2b2414845c41e46c5fff302dda8dde29ec0a5e20 100644 (file)
@@ -194,8 +194,8 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         for svc_topic in self.service_topics.values():
             self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
 
     def _parse_network_vlan_ranges(self):
         self._network_vlan_ranges = plugin_utils.parse_network_vlan_ranges(
index 8a6615f2e4443115e45900af5ec09f5402ef6ff3..80ddf4f70bba4d46bab1af9ad0286da9726859da 100644 (file)
@@ -144,8 +144,8 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         self.conn.create_consumer(self.topic, self.dispatcher,
                                   fanout=False)
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
 
     def _update_base_binding_dict(self, tenant_type):
         if tenant_type == constants.TENANT_TYPE_OVERLAY:
index 61089f63cfcd7f5ceaa043626cc23e7205c9cfef..025048e0aad72e138baf4a06a483c6f02bd4ee6e 100644 (file)
@@ -285,8 +285,8 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         for svc_topic in self.service_topics.values():
             self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
         self.notifier = AgentNotifierApi(topics.AGENT)
         self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
             dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
index 4495dda01d298b9ce10632cab51f487b9c7f5ec2..7f2dcdfd1352ae9b62c89c5f0657fc214b7f19fa 100644 (file)
@@ -386,8 +386,8 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         self.conn.create_consumer(self.topic, self.dispatcher,
                                   fanout=False)
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
 
     def create_subnet(self, context, subnet):
         """Create Neutron subnet.
index aab9b8b8911f290076138ba0bd467b4df350c889..e839538e3e49bf2e9cd4940c561aa1e821376b93 100644 (file)
@@ -132,7 +132,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         self.conn.create_consumer(self.topic, self.dispatcher,
                                   fanout=False)
-        return self.conn.consume_in_thread()
+        return self.conn.consume_in_threads()
 
     def _process_provider_segment(self, segment):
         network_type = self._get_attribute(segment, provider.NETWORK_TYPE)
index f0a469bd55b501d85bd15793b309f314b730e518..05d639a497e83aa72fc770b43c7c7ed845ee7760 100644 (file)
@@ -124,8 +124,8 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         for svc_topic in self.service_topics.values():
             self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
         self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
         self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
             dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
index 2bea5c04effa059f9f9df1bc0fcd00eb419f8184..266dab7468f7438163f8fd5d50df6c8ade1365b3 100644 (file)
@@ -154,8 +154,8 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
             agents_db.AgentExtRpcCallback()]
         for svc_topic in self.service_topics.values():
             self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
 
     def _update_resource_status(self, context, resource, id, status):
         """Update status of specified resource."""
index 732ead70ae631a2f6f5b3fcb23c4e5905a83f8f1..4411bd20d6091a13000dac3fc885aa5f20e17733 100644 (file)
@@ -170,8 +170,8 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         for svc_topic in self.service_topics.values():
             self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
 
     def create_network(self, context, network):
 
index 5e3f387b0f4f68ef9ba723d2e3029f774dbffcf2..bf35bb7a4da5e98ae3d0cf088f5de4287c861afc 100644 (file)
@@ -345,8 +345,8 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         for svc_topic in self.service_topics.values():
             self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
+        # Consume from all consumers in threads
+        self.conn.consume_in_threads()
 
     def _parse_network_vlan_ranges(self):
         try:
index 787ccb21c745141efa3f35542bd562ae353f5f3c..0e9405a985fa2a40f2b23bfb11679f49dea49129 100644 (file)
@@ -147,7 +147,7 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         for svc_topic in self.service_topics.values():
             self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
-        self.conn.consume_in_thread()
+        self.conn.consume_in_threads()
 
     def _create_all_tenant_network(self):
         for net in db_api_v2.network_all_tenant_list():
index 734358a63944d676fcc9575ac0b00f3ec6cdc837..b878503a2776b81aaac8641860415e667b86193f 100644 (file)
@@ -74,7 +74,7 @@ class DhcpMetadataAccess(object):
         self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
         self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
             notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
-        self.conn.consume_in_thread()
+        self.conn.consume_in_threads()
         self.network_scheduler = importutils.import_object(
             cfg.CONF.network_scheduler_driver
         )
index 0238902f3beb7cdac63fc8879b616046c87170f5..dbda0c5bfaa63a34385cee0d22554ecc0a4ac845 100644 (file)
@@ -172,7 +172,7 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin):
             topics.FIREWALL_PLUGIN,
             self.callbacks.create_rpc_dispatcher(),
             fanout=False)
-        self.conn.consume_in_thread()
+        self.conn.consume_in_threads()
 
         self.agent_rpc = FirewallAgentApi(
             topics.L3_AGENT,
index 29950c984d8d410a9890d93bef7cfcbb0e4c75e6..b13074193f16880c34d12911b4008bc3eb75e8c2 100644 (file)
@@ -80,7 +80,7 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         self.conn.create_consumer(self.topic, self.dispatcher,
                                   fanout=False)
-        self.conn.consume_in_thread()
+        self.conn.consume_in_threads()
 
     def get_plugin_type(self):
         return constants.L3_ROUTER_NAT
index 85be0bacd0ebc88d76f87201bcf020012a41cf04..da9c438f14f8a24529baba39098098a3c539679b 100644 (file)
@@ -348,7 +348,7 @@ class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
             topics.LOADBALANCER_PLUGIN,
             self.plugin.agent_callbacks.create_rpc_dispatcher(),
             fanout=False)
-        self.plugin.conn.consume_in_thread()
+        self.plugin.conn.consume_in_threads()
 
     def get_pool_agent(self, context, pool_id):
         agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
index 61385cc046233fea4ff38ea1f60dd71f15902b5a..dd11149355953d0ce06897e420c0b6182ea0d96d 100644 (file)
@@ -35,7 +35,7 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
             topics.METERING_PLUGIN,
             self.callbacks.create_rpc_dispatcher(),
             fanout=False)
-        self.conn.consume_in_thread()
+        self.conn.consume_in_threads()
 
         self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
 
index 12904f23e342854d5528396acb963983dadbb1ce..22fe1558941aba858d5b9f58ebabb656c108b6de 100644 (file)
@@ -202,7 +202,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
             node_topic,
             self.create_rpc_dispatcher(),
             fanout=False)
-        self.conn.consume_in_thread()
+        self.conn.consume_in_threads()
         self.agent_rpc = (
             CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
         self.periodic_report = loopingcall.FixedIntervalLoopingCall(
index 2480eb27279776b306e5beff131c70f48dde4606..2ed7c08a6e582e6f358024a1b5b2ab8f14dc1a2f 100644 (file)
@@ -508,7 +508,7 @@ class IPsecDriver(device_drivers.DeviceDriver):
             node_topic,
             self.create_rpc_dispatcher(),
             fanout=False)
-        self.conn.consume_in_thread()
+        self.conn.consume_in_threads()
         self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
         self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
             self.report_status, self.context)
index c2b39da9e353e60c7a270a67694a12fbebbdcd91..4c4bd7a059bbf56eba1812eeb89bc1bc45e4cb6e 100644 (file)
@@ -94,7 +94,7 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
             topics.CISCO_IPSEC_DRIVER_TOPIC,
             self.callbacks.create_rpc_dispatcher(),
             fanout=False)
-        self.conn.consume_in_thread()
+        self.conn.consume_in_threads()
         self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
             topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
 
index 13b7c171b4150246b83f2ed436ff4ce85ba95e02..662662044832ee6e79c8f2199a957d91c729fda7 100644 (file)
@@ -79,7 +79,7 @@ class IPsecVPNDriver(service_drivers.VpnDriver):
             topics.IPSEC_DRIVER_TOPIC,
             self.callbacks.create_rpc_dispatcher(),
             fanout=False)
-        self.conn.consume_in_thread()
+        self.conn.consume_in_threads()
         self.agent_rpc = IPsecVpnAgentApi(
             topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
 
index 95034f65381d01b2006f0b62c0e161cd8a0a9700..dfbbe13868a1e4f82a77fa398b39fd3d16f06508 100644 (file)
@@ -172,7 +172,7 @@ class BaseTestCase(testtools.TestCase):
 
         # don't actually start RPC listeners when testing
         self.useFixture(fixtures.MonkeyPatch(
-            'neutron.common.rpc_compat.Connection.consume_in_thread',
+            'neutron.common.rpc_compat.Connection.consume_in_threads',
             fake_consume_in_threads))
 
         self.useFixture(fixtures.MonkeyPatch(
index 569a739566611b764e4dce30bab3530c410b9248..db553a9faf587051a161d96515659de620b5a119 100644 (file)
@@ -88,7 +88,7 @@ class AgentRPCMethods(base.BaseTestCase):
             mock.call(new=True),
             mock.call().create_consumer('foo-topic-op', dispatcher,
                                         fanout=True),
-            mock.call().consume_in_thread()
+            mock.call().consume_in_threads()
         ]
 
         call_to_patch = 'neutron.common.rpc_compat.create_connection'
@@ -104,7 +104,7 @@ class AgentRPCMethods(base.BaseTestCase):
                                         fanout=True),
             mock.call().create_consumer('foo-topic-op.node1', dispatcher,
                                         fanout=False),
-            mock.call().consume_in_thread()
+            mock.call().consume_in_threads()
         ]
 
         call_to_patch = 'neutron.common.rpc_compat.create_connection'