From 159bb1bf37bec6cd6909c9f3a531c9468148fa4a Mon Sep 17 00:00:00 2001 From: Ihar Hrachyshka Date: Mon, 9 Jun 2014 14:38:09 +0200 Subject: [PATCH] Renamed consume_in_thread -> consume_in_threads Now that we explicitly start multiple RPC servers, and each listener is served in a separate thread, renamed the method to reflect new behaviour. blueprint oslo-messaging Change-Id: I616f3a23e23e982e13f9b56ce417ca3623247f95 --- neutron/agent/rpc.py | 2 +- neutron/common/rpc_compat.py | 6 +++--- neutron/plugins/bigswitch/plugin.py | 4 ++-- neutron/plugins/brocade/NeutronPlugin.py | 4 ++-- neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py | 4 ++-- neutron/plugins/hyperv/hyperv_neutron_plugin.py | 4 ++-- neutron/plugins/ibm/sdnve_neutron_plugin.py | 4 ++-- neutron/plugins/linuxbridge/lb_neutron_plugin.py | 4 ++-- neutron/plugins/midonet/plugin.py | 4 ++-- neutron/plugins/ml2/plugin.py | 2 +- neutron/plugins/mlnx/mlnx_plugin.py | 4 ++-- neutron/plugins/nec/nec_plugin.py | 4 ++-- neutron/plugins/oneconvergence/plugin.py | 4 ++-- neutron/plugins/openvswitch/ovs_neutron_plugin.py | 4 ++-- neutron/plugins/ryu/ryu_neutron_plugin.py | 2 +- neutron/plugins/vmware/dhcpmeta_modes.py | 2 +- neutron/services/firewall/fwaas_plugin.py | 2 +- neutron/services/l3_router/l3_router_plugin.py | 2 +- .../loadbalancer/drivers/common/agent_driver_base.py | 2 +- neutron/services/metering/metering_plugin.py | 2 +- neutron/services/vpn/device_drivers/cisco_ipsec.py | 2 +- neutron/services/vpn/device_drivers/ipsec.py | 2 +- neutron/services/vpn/service_drivers/cisco_ipsec.py | 2 +- neutron/services/vpn/service_drivers/ipsec.py | 2 +- neutron/tests/base.py | 2 +- neutron/tests/unit/test_agent_rpc.py | 4 ++-- 26 files changed, 40 insertions(+), 40 deletions(-) diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 5d2abf978..62b6860c4 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -51,7 +51,7 @@ def create_consumers(dispatcher, prefix, topic_details): connection.create_consumer(node_topic_name, dispatcher, fanout=False) - connection.consume_in_thread() + connection.consume_in_threads() return connection diff --git a/neutron/common/rpc_compat.py b/neutron/common/rpc_compat.py index 939551d49..45588d9bc 100644 --- a/neutron/common/rpc_compat.py +++ b/neutron/common/rpc_compat.py @@ -123,8 +123,8 @@ class Service(service.Service): if callable(getattr(self.manager, 'initialize_service_hook', None)): self.manager.initialize_service_hook(self) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() def stop(self): # Try to shut the connection down, but if we get any sort of @@ -148,7 +148,7 @@ class Connection(object): server = n_rpc.get_server(target, proxy) self.servers.append(server) - def consume_in_thread(self): + def consume_in_threads(self): for server in self.servers: server.start() return self.servers diff --git a/neutron/plugins/bigswitch/plugin.py b/neutron/plugins/bigswitch/plugin.py index 712f02b3c..bdac0cf1a 100644 --- a/neutron/plugins/bigswitch/plugin.py +++ b/neutron/plugins/bigswitch/plugin.py @@ -509,8 +509,8 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self.dispatcher = self.callbacks.create_rpc_dispatcher() self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() def create_network(self, context, network): """Create a network. diff --git a/neutron/plugins/brocade/NeutronPlugin.py b/neutron/plugins/brocade/NeutronPlugin.py index 5ec3fb401..5e0ee7827 100644 --- a/neutron/plugins/brocade/NeutronPlugin.py +++ b/neutron/plugins/brocade/NeutronPlugin.py @@ -266,8 +266,8 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.dispatcher = self.callbacks.create_rpc_dispatcher() for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() self.notifier = AgentNotifierApi(topics.AGENT) self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = ( dhcp_rpc_agent_api.DhcpAgentNotifyAPI() diff --git a/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py b/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py index e5c701e7d..83defcf7c 100644 --- a/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py +++ b/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py @@ -140,8 +140,8 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI() - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() def _setup_vsm(self): """ diff --git a/neutron/plugins/hyperv/hyperv_neutron_plugin.py b/neutron/plugins/hyperv/hyperv_neutron_plugin.py index 3bfb043ed..2b2414845 100644 --- a/neutron/plugins/hyperv/hyperv_neutron_plugin.py +++ b/neutron/plugins/hyperv/hyperv_neutron_plugin.py @@ -194,8 +194,8 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin, self.dispatcher = self.callbacks.create_rpc_dispatcher() for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() def _parse_network_vlan_ranges(self): self._network_vlan_ranges = plugin_utils.parse_network_vlan_ranges( diff --git a/neutron/plugins/ibm/sdnve_neutron_plugin.py b/neutron/plugins/ibm/sdnve_neutron_plugin.py index 8a6615f2e..80ddf4f70 100644 --- a/neutron/plugins/ibm/sdnve_neutron_plugin.py +++ b/neutron/plugins/ibm/sdnve_neutron_plugin.py @@ -144,8 +144,8 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.dispatcher = self.callbacks.create_rpc_dispatcher() self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() def _update_base_binding_dict(self, tenant_type): if tenant_type == constants.TENANT_TYPE_OVERLAY: diff --git a/neutron/plugins/linuxbridge/lb_neutron_plugin.py b/neutron/plugins/linuxbridge/lb_neutron_plugin.py index 61089f63c..025048e0a 100644 --- a/neutron/plugins/linuxbridge/lb_neutron_plugin.py +++ b/neutron/plugins/linuxbridge/lb_neutron_plugin.py @@ -285,8 +285,8 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.dispatcher = self.callbacks.create_rpc_dispatcher() for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() self.notifier = AgentNotifierApi(topics.AGENT) self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = ( dhcp_rpc_agent_api.DhcpAgentNotifyAPI() diff --git a/neutron/plugins/midonet/plugin.py b/neutron/plugins/midonet/plugin.py index 4495dda01..7f2dcdfd1 100644 --- a/neutron/plugins/midonet/plugin.py +++ b/neutron/plugins/midonet/plugin.py @@ -386,8 +386,8 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.dispatcher = self.callbacks.create_rpc_dispatcher() self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() def create_subnet(self, context, subnet): """Create Neutron subnet. diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index aab9b8b89..e839538e3 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -132,7 +132,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.dispatcher = self.callbacks.create_rpc_dispatcher() self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) - return self.conn.consume_in_thread() + return self.conn.consume_in_threads() def _process_provider_segment(self, segment): network_type = self._get_attribute(segment, provider.NETWORK_TYPE) diff --git a/neutron/plugins/mlnx/mlnx_plugin.py b/neutron/plugins/mlnx/mlnx_plugin.py index f0a469bd5..05d639a49 100644 --- a/neutron/plugins/mlnx/mlnx_plugin.py +++ b/neutron/plugins/mlnx/mlnx_plugin.py @@ -124,8 +124,8 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2, self.dispatcher = self.callbacks.create_rpc_dispatcher() for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT) self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = ( dhcp_rpc_agent_api.DhcpAgentNotifyAPI() diff --git a/neutron/plugins/nec/nec_plugin.py b/neutron/plugins/nec/nec_plugin.py index 2bea5c04e..266dab746 100644 --- a/neutron/plugins/nec/nec_plugin.py +++ b/neutron/plugins/nec/nec_plugin.py @@ -154,8 +154,8 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2, agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() def _update_resource_status(self, context, resource, id, status): """Update status of specified resource.""" diff --git a/neutron/plugins/oneconvergence/plugin.py b/neutron/plugins/oneconvergence/plugin.py index 732ead70a..4411bd20d 100644 --- a/neutron/plugins/oneconvergence/plugin.py +++ b/neutron/plugins/oneconvergence/plugin.py @@ -170,8 +170,8 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2, for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() def create_network(self, context, network): diff --git a/neutron/plugins/openvswitch/ovs_neutron_plugin.py b/neutron/plugins/openvswitch/ovs_neutron_plugin.py index 5e3f387b0..bf35bb7a4 100644 --- a/neutron/plugins/openvswitch/ovs_neutron_plugin.py +++ b/neutron/plugins/openvswitch/ovs_neutron_plugin.py @@ -345,8 +345,8 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.dispatcher = self.callbacks.create_rpc_dispatcher() for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() + # Consume from all consumers in threads + self.conn.consume_in_threads() def _parse_network_vlan_ranges(self): try: diff --git a/neutron/plugins/ryu/ryu_neutron_plugin.py b/neutron/plugins/ryu/ryu_neutron_plugin.py index 787ccb21c..0e9405a98 100644 --- a/neutron/plugins/ryu/ryu_neutron_plugin.py +++ b/neutron/plugins/ryu/ryu_neutron_plugin.py @@ -147,7 +147,7 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.dispatcher = self.callbacks.create_rpc_dispatcher() for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) - self.conn.consume_in_thread() + self.conn.consume_in_threads() def _create_all_tenant_network(self): for net in db_api_v2.network_all_tenant_list(): diff --git a/neutron/plugins/vmware/dhcpmeta_modes.py b/neutron/plugins/vmware/dhcpmeta_modes.py index 734358a63..b878503a2 100644 --- a/neutron/plugins/vmware/dhcpmeta_modes.py +++ b/neutron/plugins/vmware/dhcpmeta_modes.py @@ -74,7 +74,7 @@ class DhcpMetadataAccess(object): self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) self.agent_notifiers[const.AGENT_TYPE_DHCP] = ( notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI()) - self.conn.consume_in_thread() + self.conn.consume_in_threads() self.network_scheduler = importutils.import_object( cfg.CONF.network_scheduler_driver ) diff --git a/neutron/services/firewall/fwaas_plugin.py b/neutron/services/firewall/fwaas_plugin.py index 0238902f3..dbda0c5bf 100644 --- a/neutron/services/firewall/fwaas_plugin.py +++ b/neutron/services/firewall/fwaas_plugin.py @@ -172,7 +172,7 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin): topics.FIREWALL_PLUGIN, self.callbacks.create_rpc_dispatcher(), fanout=False) - self.conn.consume_in_thread() + self.conn.consume_in_threads() self.agent_rpc = FirewallAgentApi( topics.L3_AGENT, diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index 29950c984..b13074193 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -80,7 +80,7 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin, self.dispatcher = self.callbacks.create_rpc_dispatcher() self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) - self.conn.consume_in_thread() + self.conn.consume_in_threads() def get_plugin_type(self): return constants.L3_ROUTER_NAT diff --git a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py index 85be0bacd..da9c438f1 100644 --- a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py +++ b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py @@ -348,7 +348,7 @@ class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver): topics.LOADBALANCER_PLUGIN, self.plugin.agent_callbacks.create_rpc_dispatcher(), fanout=False) - self.plugin.conn.consume_in_thread() + self.plugin.conn.consume_in_threads() def get_pool_agent(self, context, pool_id): agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id) diff --git a/neutron/services/metering/metering_plugin.py b/neutron/services/metering/metering_plugin.py index 61385cc04..dd1114935 100644 --- a/neutron/services/metering/metering_plugin.py +++ b/neutron/services/metering/metering_plugin.py @@ -35,7 +35,7 @@ class MeteringPlugin(metering_db.MeteringDbMixin): topics.METERING_PLUGIN, self.callbacks.create_rpc_dispatcher(), fanout=False) - self.conn.consume_in_thread() + self.conn.consume_in_threads() self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI() diff --git a/neutron/services/vpn/device_drivers/cisco_ipsec.py b/neutron/services/vpn/device_drivers/cisco_ipsec.py index 12904f23e..22fe15589 100644 --- a/neutron/services/vpn/device_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/device_drivers/cisco_ipsec.py @@ -202,7 +202,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): node_topic, self.create_rpc_dispatcher(), fanout=False) - self.conn.consume_in_thread() + self.conn.consume_in_threads() self.agent_rpc = ( CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0')) self.periodic_report = loopingcall.FixedIntervalLoopingCall( diff --git a/neutron/services/vpn/device_drivers/ipsec.py b/neutron/services/vpn/device_drivers/ipsec.py index 2480eb272..2ed7c08a6 100644 --- a/neutron/services/vpn/device_drivers/ipsec.py +++ b/neutron/services/vpn/device_drivers/ipsec.py @@ -508,7 +508,7 @@ class IPsecDriver(device_drivers.DeviceDriver): node_topic, self.create_rpc_dispatcher(), fanout=False) - self.conn.consume_in_thread() + self.conn.consume_in_threads() self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0') self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall( self.report_status, self.context) diff --git a/neutron/services/vpn/service_drivers/cisco_ipsec.py b/neutron/services/vpn/service_drivers/cisco_ipsec.py index c2b39da9e..4c4bd7a05 100644 --- a/neutron/services/vpn/service_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/service_drivers/cisco_ipsec.py @@ -94,7 +94,7 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver): topics.CISCO_IPSEC_DRIVER_TOPIC, self.callbacks.create_rpc_dispatcher(), fanout=False) - self.conn.consume_in_thread() + self.conn.consume_in_threads() self.agent_rpc = CiscoCsrIPsecVpnAgentApi( topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION) diff --git a/neutron/services/vpn/service_drivers/ipsec.py b/neutron/services/vpn/service_drivers/ipsec.py index 13b7c171b..662662044 100644 --- a/neutron/services/vpn/service_drivers/ipsec.py +++ b/neutron/services/vpn/service_drivers/ipsec.py @@ -79,7 +79,7 @@ class IPsecVPNDriver(service_drivers.VpnDriver): topics.IPSEC_DRIVER_TOPIC, self.callbacks.create_rpc_dispatcher(), fanout=False) - self.conn.consume_in_thread() + self.conn.consume_in_threads() self.agent_rpc = IPsecVpnAgentApi( topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION) diff --git a/neutron/tests/base.py b/neutron/tests/base.py index 95034f653..dfbbe1386 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -172,7 +172,7 @@ class BaseTestCase(testtools.TestCase): # don't actually start RPC listeners when testing self.useFixture(fixtures.MonkeyPatch( - 'neutron.common.rpc_compat.Connection.consume_in_thread', + 'neutron.common.rpc_compat.Connection.consume_in_threads', fake_consume_in_threads)) self.useFixture(fixtures.MonkeyPatch( diff --git a/neutron/tests/unit/test_agent_rpc.py b/neutron/tests/unit/test_agent_rpc.py index 569a73956..db553a9fa 100644 --- a/neutron/tests/unit/test_agent_rpc.py +++ b/neutron/tests/unit/test_agent_rpc.py @@ -88,7 +88,7 @@ class AgentRPCMethods(base.BaseTestCase): mock.call(new=True), mock.call().create_consumer('foo-topic-op', dispatcher, fanout=True), - mock.call().consume_in_thread() + mock.call().consume_in_threads() ] call_to_patch = 'neutron.common.rpc_compat.create_connection' @@ -104,7 +104,7 @@ class AgentRPCMethods(base.BaseTestCase): fanout=True), mock.call().create_consumer('foo-topic-op.node1', dispatcher, fanout=False), - mock.call().consume_in_thread() + mock.call().consume_in_threads() ] call_to_patch = 'neutron.common.rpc_compat.create_connection' -- 2.45.2