X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=lib%2Fmcollective%2Fclient.rb;h=9c8fd83171d40968400609ad75c3867c9687c338;hb=d1f1649ba43c5cbc43c4beb2380096ba051d646a;hp=d77eaa5c90fdef7b5f3789705985d7d0d5439c3c;hpb=b87d2f4e68281062df1913440ca5753ae63314a9;p=packages%2Fprecise%2Fmcollective.git diff --git a/lib/mcollective/client.rb b/lib/mcollective/client.rb index d77eaa5..9c8fd83 100644 --- a/lib/mcollective/client.rb +++ b/lib/mcollective/client.rb @@ -37,6 +37,15 @@ module MCollective # Sends a request and returns the generated request id, doesn't wait for # responses and doesn't execute any passed in code blocks for responses def sendreq(msg, agent, filter = {}) + request = createreq(msg, agent, filter) + + Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}") + + request.publish + request.requestid + end + + def createreq(msg, agent, filter ={}) if msg.is_a?(Message) request = msg agent = request.agent @@ -47,14 +56,8 @@ module MCollective end request.encode! - - Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}") - subscribe(agent, :reply) unless request.reply_to - - request.publish - - request.requestid + request end def subscribe(agent, type) @@ -91,15 +94,14 @@ module MCollective reply.expected_msgid = requestid reply.decode! - - reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON") - - raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid + unless reply.requestid == requestid + raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}") + end rescue SecurityValidationFailed => e Log.warn("Ignoring a message that did not pass security validations") retry rescue MsgDoesNotMatchRequestID => e - Log.debug("Ignoring a message for some other client") + Log.debug("Ignoring a message for some other client : #{e.message}") retry end @@ -124,7 +126,7 @@ module MCollective # # It returns a hash of times and timeouts for discovery and total run is taken from the options # hash which in turn is generally built using MCollective::Optionparser - def req(body, agent=nil, options=false, waitfor=0) + def req(body, agent=nil, options=false, waitfor=0, &block) if body.is_a?(Message) agent = body.agent waitfor = body.discovered_hosts.size || 0 @@ -132,46 +134,100 @@ module MCollective end @options = options if options - - stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0} - + threaded = @options[:threaded] timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter]) - + request = createreq(body, agent, @options[:filter]) + publish_timeout = @options[:publish_timeout] + stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0} STDOUT.sync = true - hosts_responded = 0 - reqid = nil + begin - Log.debug("Publishing request to agent %s with timeout %d" % [agent, timeout]) + if threaded + hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block) + else + hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block) + end + rescue Interrupt => e + ensure + unsubscribe(agent, :reply) + end - Timeout.timeout(timeout) do - reqid = sendreq(body, agent, @options[:filter]) + return update_stat(stat, hosts_responded, request.requestid) + end - loop do - resp = receive(reqid) + # Starts the client receiver and publisher unthreaded. + # This is the default client behaviour. + def unthreaded_req(request, publish_timeout, timeout, waitfor, &block) + start_publisher(request, publish_timeout) + start_receiver(request.requestid, waitfor, timeout, &block) + end - hosts_responded += 1 + # Starts the client receiver and publisher in threads. + # This is activated when the 'threader_client' configuration + # option is set. + def threaded_req(request, publish_timeout, timeout, waitfor, &block) + Log.debug("Starting threaded client") + publisher = Thread.new do + start_publisher(request, publish_timeout) + end - yield(resp.payload) + # When the client is threaded we add the publishing timeout to + # the agent timeout so that the receiver doesn't time out before + # publishing has finished in cases where publish_timeout >= timeout. + total_timeout = publish_timeout + timeout + hosts_responded = 0 + + receiver = Thread.new do + hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block) + end - break if (waitfor != 0 && hosts_responded >= waitfor) - end + receiver.join + hosts_responded + end + + # Starts the request publishing routine + def start_publisher(request, publish_timeout) + Log.debug("Starting publishing with publish timeout of #{publish_timeout}") + begin + Timeout.timeout(publish_timeout) do + Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}") + request.publish end - rescue Interrupt => e rescue Timeout::Error => e - ensure - unsubscribe(agent, :reply) + Log.warn("Could not publish all messages. Publishing timed out.") end + end + + # Starts the response receiver routine + # Expected to return the amount of received responses. + def start_receiver(requestid, waitfor, timeout, &block) + Log.debug("Starting response receiver with timeout of #{timeout}") + hosts_responded = 0 + begin + Timeout.timeout(timeout) do + begin + resp = receive(requestid) + yield resp.payload + hosts_responded += 1 + end while (waitfor == 0 || hosts_responded < waitfor) + end + rescue Timeout::Error => e + Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}") + end + + hosts_responded + end + def update_stat(stat, hosts_responded, requestid) stat[:totaltime] = Time.now.to_f - stat[:starttime] stat[:blocktime] = stat[:totaltime] - stat[:discoverytime] stat[:responses] = hosts_responded stat[:noresponsefrom] = [] - stat[:requestid] = reqid + stat[:requestid] = requestid @stats = stat - return stat end def discovered_req(body, agent, options=false)