# Sends a request and returns the generated request id, doesn't wait for
# responses and doesn't execute any passed in code blocks for responses
def sendreq(msg, agent, filter = {})
+ request = createreq(msg, agent, filter)
+
+ Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
+
+ request.publish
+ request.requestid
+ end
+
+ def createreq(msg, agent, filter ={})
if msg.is_a?(Message)
request = msg
agent = request.agent
end
request.encode!
-
- Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
-
subscribe(agent, :reply) unless request.reply_to
-
- request.publish
-
- request.requestid
+ request
end
def subscribe(agent, type)
reply.expected_msgid = requestid
reply.decode!
-
- reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")
-
- raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid
+ unless reply.requestid == requestid
+ raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
+ end
rescue SecurityValidationFailed => e
Log.warn("Ignoring a message that did not pass security validations")
retry
rescue MsgDoesNotMatchRequestID => e
- Log.debug("Ignoring a message for some other client")
+ Log.debug("Ignoring a message for some other client : #{e.message}")
retry
end
#
# It returns a hash of times and timeouts for discovery and total run is taken from the options
# hash which in turn is generally built using MCollective::Optionparser
- def req(body, agent=nil, options=false, waitfor=0)
+ def req(body, agent=nil, options=false, waitfor=0, &block)
if body.is_a?(Message)
agent = body.agent
waitfor = body.discovered_hosts.size || 0
end
@options = options if options
-
- stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
-
+ threaded = @options[:threaded]
timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
-
+ request = createreq(body, agent, @options[:filter])
+ publish_timeout = @options[:publish_timeout]
+ stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
STDOUT.sync = true
-
hosts_responded = 0
- reqid = nil
+
begin
- Log.debug("Publishing request to agent %s with timeout %d" % [agent, timeout])
+ if threaded
+ hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
+ else
+ hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
+ end
+ rescue Interrupt => e
+ ensure
+ unsubscribe(agent, :reply)
+ end
- Timeout.timeout(timeout) do
- reqid = sendreq(body, agent, @options[:filter])
+ return update_stat(stat, hosts_responded, request.requestid)
+ end
- loop do
- resp = receive(reqid)
+ # Starts the client receiver and publisher unthreaded.
+ # This is the default client behaviour.
+ def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
+ start_publisher(request, publish_timeout)
+ start_receiver(request.requestid, waitfor, timeout, &block)
+ end
- hosts_responded += 1
+ # Starts the client receiver and publisher in threads.
+ # This is activated when the 'threader_client' configuration
+ # option is set.
+ def threaded_req(request, publish_timeout, timeout, waitfor, &block)
+ Log.debug("Starting threaded client")
+ publisher = Thread.new do
+ start_publisher(request, publish_timeout)
+ end
- yield(resp.payload)
+ # When the client is threaded we add the publishing timeout to
+ # the agent timeout so that the receiver doesn't time out before
+ # publishing has finished in cases where publish_timeout >= timeout.
+ total_timeout = publish_timeout + timeout
+ hosts_responded = 0
+
+ receiver = Thread.new do
+ hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
+ end
- break if (waitfor != 0 && hosts_responded >= waitfor)
- end
+ receiver.join
+ hosts_responded
+ end
+
+ # Starts the request publishing routine
+ def start_publisher(request, publish_timeout)
+ Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
+ begin
+ Timeout.timeout(publish_timeout) do
+ Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
+ request.publish
end
- rescue Interrupt => e
rescue Timeout::Error => e
- ensure
- unsubscribe(agent, :reply)
+ Log.warn("Could not publish all messages. Publishing timed out.")
end
+ end
+
+ # Starts the response receiver routine
+ # Expected to return the amount of received responses.
+ def start_receiver(requestid, waitfor, timeout, &block)
+ Log.debug("Starting response receiver with timeout of #{timeout}")
+ hosts_responded = 0
+ begin
+ Timeout.timeout(timeout) do
+ begin
+ resp = receive(requestid)
+ yield resp.payload
+ hosts_responded += 1
+ end while (waitfor == 0 || hosts_responded < waitfor)
+ end
+ rescue Timeout::Error => e
+ Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
+ end
+
+ hosts_responded
+ end
+ def update_stat(stat, hosts_responded, requestid)
stat[:totaltime] = Time.now.to_f - stat[:starttime]
stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
stat[:responses] = hosts_responded
stat[:noresponsefrom] = []
- stat[:requestid] = reqid
+ stat[:requestid] = requestid
@stats = stat
- return stat
end
def discovered_req(body, agent, options=false)