LOG = logging.getLogger(__name__)
-def create_consumers(dispatcher, prefix, topic_details):
+def create_consumers(endpoints, prefix, topic_details):
"""Create agent RPC consumers.
- :param dispatcher: The dispatcher to process the incoming messages.
+ :param endpoints: The list of endpoints to process the incoming messages.
:param prefix: Common prefix for the plugin/agent message queues.
:param topic_details: A list of topics. Each topic has a name, an
operation, and an optional host param keying the
itertools.chain(details, [None]), 3)
topic_name = topics.get_topic_name(prefix, topic, operation)
- connection.create_consumer(topic_name, dispatcher, fanout=True)
+ connection.create_consumer(topic_name, endpoints, fanout=True)
if node_name:
node_topic_name = '%s.%s' % (topic_name, node_name)
connection.create_consumer(node_topic_name,
- dispatcher,
+ endpoints,
fanout=False)
connection.consume_in_threads()
return connection
super(Connection, self).__init__()
self.servers = []
- def create_consumer(self, topic, proxy, fanout=False):
+ def create_consumer(self, topic, endpoints, fanout=False):
target = messaging.Target(
topic=topic, server=cfg.CONF.host, fanout=fanout)
- server = n_rpc.get_server(target, proxy)
+ server = n_rpc.get_server(target, endpoints)
self.servers.append(server)
def consume_in_threads(self):