host = {}
host[:host] = get_option("rabbitmq.pool.#{poolnum}.host")
- host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 6163).to_i
+ host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 61613).to_i
host[:login] = get_env_or_option("STOMP_USER", "rabbitmq.pool.#{poolnum}.user")
host[:passcode] = get_env_or_option("STOMP_PASSWORD", "rabbitmq.pool.#{poolnum}.password")
host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", false)
target = {:name => msg.request.headers["reply-to"], :headers => {}, :id => ""}
elsif [:request, :direct_request].include?(msg.type)
- target = make_target(msg.agent, msg.type, msg.collective, node)
+ target = make_target(msg.agent, msg.type, msg.collective, msg.reply_to, node)
else
raise "Don't now how to create a target for message type #{msg.type}"
return target
end
- def make_target(agent, type, collective, node=nil)
+ def make_target(agent, type, collective, reply_to=nil, node=nil)
raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)
when :broadcast, :request # publishing a request to all nodes with an agent
target[:name] = "/exchange/%s_broadcast/%s" % [collective, agent]
- target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent
+ if reply_to
+ target[:headers]["reply-to"] = reply_to
+ else
+ target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent
+ end
target[:id] = "%s_broadcast_%s" % [collective, agent]
when :direct_request # a request to a specific node