connection.create_consumer(node_topic_name,
dispatcher,
fanout=False)
- connection.consume_in_thread()
+ connection.consume_in_threads()
return connection
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
def stop(self):
# Try to shut the connection down, but if we get any sort of
server = n_rpc.get_server(target, proxy)
self.servers.append(server)
- def consume_in_thread(self):
+ def consume_in_threads(self):
for server in self.servers:
server.start()
return self.servers
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
def create_network(self, context, network):
"""Create a network.
self.dispatcher = self.callbacks.create_rpc_dispatcher()
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
self.notifier = AgentNotifierApi(topics.AGENT)
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
def _setup_vsm(self):
"""
self.dispatcher = self.callbacks.create_rpc_dispatcher()
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
def _parse_network_vlan_ranges(self):
self._network_vlan_ranges = plugin_utils.parse_network_vlan_ranges(
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
def _update_base_binding_dict(self, tenant_type):
if tenant_type == constants.TENANT_TYPE_OVERLAY:
self.dispatcher = self.callbacks.create_rpc_dispatcher()
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
self.notifier = AgentNotifierApi(topics.AGENT)
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
def create_subnet(self, context, subnet):
"""Create Neutron subnet.
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
- return self.conn.consume_in_thread()
+ return self.conn.consume_in_threads()
def _process_provider_segment(self, segment):
network_type = self._get_attribute(segment, provider.NETWORK_TYPE)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
def _update_resource_status(self, context, resource, id, status):
"""Update status of specified resource."""
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
def create_network(self, context, network):
self.dispatcher = self.callbacks.create_rpc_dispatcher()
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
+ # Consume from all consumers in threads
+ self.conn.consume_in_threads()
def _parse_network_vlan_ranges(self):
try:
self.dispatcher = self.callbacks.create_rpc_dispatcher()
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
- self.conn.consume_in_thread()
+ self.conn.consume_in_threads()
def _create_all_tenant_network(self):
for net in db_api_v2.network_all_tenant_list():
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
- self.conn.consume_in_thread()
+ self.conn.consume_in_threads()
self.network_scheduler = importutils.import_object(
cfg.CONF.network_scheduler_driver
)
topics.FIREWALL_PLUGIN,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
- self.conn.consume_in_thread()
+ self.conn.consume_in_threads()
self.agent_rpc = FirewallAgentApi(
topics.L3_AGENT,
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
- self.conn.consume_in_thread()
+ self.conn.consume_in_threads()
def get_plugin_type(self):
return constants.L3_ROUTER_NAT
topics.LOADBALANCER_PLUGIN,
self.plugin.agent_callbacks.create_rpc_dispatcher(),
fanout=False)
- self.plugin.conn.consume_in_thread()
+ self.plugin.conn.consume_in_threads()
def get_pool_agent(self, context, pool_id):
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
topics.METERING_PLUGIN,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
- self.conn.consume_in_thread()
+ self.conn.consume_in_threads()
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
node_topic,
self.create_rpc_dispatcher(),
fanout=False)
- self.conn.consume_in_thread()
+ self.conn.consume_in_threads()
self.agent_rpc = (
CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
self.periodic_report = loopingcall.FixedIntervalLoopingCall(
node_topic,
self.create_rpc_dispatcher(),
fanout=False)
- self.conn.consume_in_thread()
+ self.conn.consume_in_threads()
self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
self.report_status, self.context)
topics.CISCO_IPSEC_DRIVER_TOPIC,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
- self.conn.consume_in_thread()
+ self.conn.consume_in_threads()
self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
topics.IPSEC_DRIVER_TOPIC,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
- self.conn.consume_in_thread()
+ self.conn.consume_in_threads()
self.agent_rpc = IPsecVpnAgentApi(
topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
# don't actually start RPC listeners when testing
self.useFixture(fixtures.MonkeyPatch(
- 'neutron.common.rpc_compat.Connection.consume_in_thread',
+ 'neutron.common.rpc_compat.Connection.consume_in_threads',
fake_consume_in_threads))
self.useFixture(fixtures.MonkeyPatch(
mock.call(new=True),
mock.call().create_consumer('foo-topic-op', dispatcher,
fanout=True),
- mock.call().consume_in_thread()
+ mock.call().consume_in_threads()
]
call_to_patch = 'neutron.common.rpc_compat.create_connection'
fanout=True),
mock.call().create_consumer('foo-topic-op.node1', dispatcher,
fanout=False),
- mock.call().consume_in_thread()
+ mock.call().consume_in_threads()
]
call_to_patch = 'neutron.common.rpc_compat.create_connection'