Update version according to OSCI-856
[packages/precise/mcollective.git] / lib / mcollective / client.rb
index d77eaa5c90fdef7b5f3789705985d7d0d5439c3c..9c8fd83171d40968400609ad75c3867c9687c338 100644 (file)
@@ -37,6 +37,15 @@ module MCollective
     # Sends a request and returns the generated request id, doesn't wait for
     # responses and doesn't execute any passed in code blocks for responses
     def sendreq(msg, agent, filter = {})
+      request = createreq(msg, agent, filter)
+
+      Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
+
+      request.publish
+      request.requestid
+    end
+
+    def createreq(msg, agent, filter ={})
       if msg.is_a?(Message)
         request = msg
         agent = request.agent
@@ -47,14 +56,8 @@ module MCollective
       end
 
       request.encode!
-
-      Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
-
       subscribe(agent, :reply) unless request.reply_to
-
-      request.publish
-
-      request.requestid
+      request
     end
 
     def subscribe(agent, type)
@@ -91,15 +94,14 @@ module MCollective
         reply.expected_msgid = requestid
 
         reply.decode!
-
-        reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")
-
-        raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid
+        unless reply.requestid == requestid
+          raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
+        end
       rescue SecurityValidationFailed => e
         Log.warn("Ignoring a message that did not pass security validations")
         retry
       rescue MsgDoesNotMatchRequestID => e
-        Log.debug("Ignoring a message for some other client")
+        Log.debug("Ignoring a message for some other client : #{e.message}")
         retry
       end
 
@@ -124,7 +126,7 @@ module MCollective
     #
     # It returns a hash of times and timeouts for discovery and total run is taken from the options
     # hash which in turn is generally built using MCollective::Optionparser
-    def req(body, agent=nil, options=false, waitfor=0)
+    def req(body, agent=nil, options=false, waitfor=0, &block)
       if body.is_a?(Message)
         agent = body.agent
         waitfor = body.discovered_hosts.size || 0
@@ -132,46 +134,100 @@ module MCollective
       end
 
       @options = options if options
-
-      stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
-
+      threaded = @options[:threaded]
       timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
-
+      request = createreq(body, agent, @options[:filter])
+      publish_timeout = @options[:publish_timeout]
+      stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
       STDOUT.sync = true
-
       hosts_responded = 0
-      reqid = nil
+
 
       begin
-        Log.debug("Publishing request to agent %s with timeout %d" % [agent, timeout])
+        if threaded
+          hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
+        else
+          hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
+        end
+      rescue Interrupt => e
+      ensure
+        unsubscribe(agent, :reply)
+      end
 
-        Timeout.timeout(timeout) do
-          reqid = sendreq(body, agent, @options[:filter])
+      return update_stat(stat, hosts_responded, request.requestid)
+    end
 
-          loop do
-            resp = receive(reqid)
+    # Starts the client receiver and publisher unthreaded.
+    # This is the default client behaviour.
+    def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
+      start_publisher(request, publish_timeout)
+      start_receiver(request.requestid, waitfor, timeout, &block)
+    end
 
-            hosts_responded += 1
+    # Starts the client receiver and publisher in threads.
+    # This is activated when the 'threader_client' configuration
+    # option is set.
+    def threaded_req(request, publish_timeout, timeout, waitfor, &block)
+      Log.debug("Starting threaded client")
+      publisher = Thread.new do
+        start_publisher(request, publish_timeout)
+      end
 
-            yield(resp.payload)
+      # When the client is threaded we add the publishing timeout to
+      # the agent timeout so that the receiver doesn't time out before
+      # publishing has finished in cases where publish_timeout >= timeout.
+      total_timeout = publish_timeout + timeout
+      hosts_responded = 0
+
+      receiver = Thread.new do
+        hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
+      end
 
-            break if (waitfor != 0 && hosts_responded >= waitfor)
-          end
+      receiver.join
+      hosts_responded
+    end
+
+    # Starts the request publishing routine
+    def start_publisher(request, publish_timeout)
+      Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
+      begin
+        Timeout.timeout(publish_timeout) do
+          Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
+          request.publish
         end
-      rescue Interrupt => e
       rescue Timeout::Error => e
-      ensure
-        unsubscribe(agent, :reply)
+        Log.warn("Could not publish all messages. Publishing timed out.")
       end
+    end
+
+    # Starts the response receiver routine
+    # Expected to return the amount of received responses.
+    def start_receiver(requestid, waitfor, timeout, &block)
+      Log.debug("Starting response receiver with timeout of #{timeout}")
+      hosts_responded = 0
+      begin
+        Timeout.timeout(timeout) do
+          begin
+            resp = receive(requestid)
+            yield resp.payload
+            hosts_responded += 1
+          end while (waitfor == 0 || hosts_responded < waitfor)
+        end
+      rescue Timeout::Error => e
+        Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
+      end
+
+      hosts_responded
+    end
 
+    def update_stat(stat, hosts_responded, requestid)
       stat[:totaltime] = Time.now.to_f - stat[:starttime]
       stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
       stat[:responses] = hosts_responded
       stat[:noresponsefrom] = []
-      stat[:requestid] = reqid
+      stat[:requestid] = requestid
 
       @stats = stat
-      return stat
     end
 
     def discovered_req(body, agent, options=false)