Updated mcollective.init according to OSCI-658
[packages/precise/mcollective.git] / lib / mcollective / client.rb
diff --git a/lib/mcollective/client.rb b/lib/mcollective/client.rb
new file mode 100644 (file)
index 0000000..d77eaa5
--- /dev/null
@@ -0,0 +1,219 @@
+module MCollective
+  # Helpers for writing clients that can talk to agents, do discovery and so forth
+  class Client
+    attr_accessor :options, :stats, :discoverer
+
+    def initialize(configfile)
+      @config = Config.instance
+      @config.loadconfig(configfile) unless @config.configured
+
+      @connection = PluginManager["connector_plugin"]
+      @security = PluginManager["security_plugin"]
+
+      @security.initiated_by = :client
+      @options = nil
+      @subscriptions = {}
+
+      @discoverer = Discovery.new(self)
+      @connection.connect
+    end
+
+    # Returns the configured main collective if no
+    # specific collective is specified as options
+    def collective
+      if @options[:collective].nil?
+        @config.main_collective
+      else
+        @options[:collective]
+      end
+    end
+
+    # Disconnects cleanly from the middleware
+    def disconnect
+      Log.debug("Disconnecting from the middleware")
+      @connection.disconnect
+    end
+
+    # Sends a request and returns the generated request id, doesn't wait for
+    # responses and doesn't execute any passed in code blocks for responses
+    def sendreq(msg, agent, filter = {})
+      if msg.is_a?(Message)
+        request = msg
+        agent = request.agent
+      else
+        ttl = @options[:ttl] || @config.ttl
+        request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
+        request.reply_to = @options[:reply_to] if @options[:reply_to]
+      end
+
+      request.encode!
+
+      Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
+
+      subscribe(agent, :reply) unless request.reply_to
+
+      request.publish
+
+      request.requestid
+    end
+
+    def subscribe(agent, type)
+      unless @subscriptions.include?(agent)
+        subscription = Util.make_subscriptions(agent, type, collective)
+        Log.debug("Subscribing to #{type} target for agent #{agent}")
+
+        Util.subscribe(subscription)
+        @subscriptions[agent] = 1
+      end
+    end
+
+    def unsubscribe(agent, type)
+      if @subscriptions.include?(agent)
+        subscription = Util.make_subscriptions(agent, type, collective)
+        Log.debug("Unsubscribing #{type} target for #{agent}")
+
+        Util.unsubscribe(subscription)
+        @subscriptions.delete(agent)
+      end
+    end
+    # Blocking call that waits for ever for a message to arrive.
+    #
+    # If you give it a requestid this means you've previously send a request
+    # with that ID and now you just want replies that matches that id, in that
+    # case the current connection will just ignore all messages not directed at it
+    # and keep waiting for more till it finds a matching message.
+    def receive(requestid = nil)
+      reply = nil
+
+      begin
+        reply = @connection.receive
+        reply.type = :reply
+        reply.expected_msgid = requestid
+
+        reply.decode!
+
+        reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")
+
+        raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid
+      rescue SecurityValidationFailed => e
+        Log.warn("Ignoring a message that did not pass security validations")
+        retry
+      rescue MsgDoesNotMatchRequestID => e
+        Log.debug("Ignoring a message for some other client")
+        retry
+      end
+
+      reply
+    end
+
+    # Performs a discovery of nodes matching the filter passed
+    # returns an array of nodes
+    #
+    # An integer limit can be supplied this will have the effect
+    # of the discovery being cancelled soon as it reached the
+    # requested limit of hosts
+    def discover(filter, timeout, limit=0)
+      discovered = @discoverer.discover(filter, timeout, limit)
+    end
+
+    # Send a request, performs the passed block for each response
+    #
+    # times = req("status", "mcollectived", options, client) {|resp|
+    #   pp resp
+    # }
+    #
+    # It returns a hash of times and timeouts for discovery and total run is taken from the options
+    # hash which in turn is generally built using MCollective::Optionparser
+    def req(body, agent=nil, options=false, waitfor=0)
+      if body.is_a?(Message)
+        agent = body.agent
+        waitfor = body.discovered_hosts.size || 0
+        @options = body.options
+      end
+
+      @options = options if options
+
+      stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
+
+      timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
+
+      STDOUT.sync = true
+
+      hosts_responded = 0
+      reqid = nil
+
+      begin
+        Log.debug("Publishing request to agent %s with timeout %d" % [agent, timeout])
+
+        Timeout.timeout(timeout) do
+          reqid = sendreq(body, agent, @options[:filter])
+
+          loop do
+            resp = receive(reqid)
+
+            hosts_responded += 1
+
+            yield(resp.payload)
+
+            break if (waitfor != 0 && hosts_responded >= waitfor)
+          end
+        end
+      rescue Interrupt => e
+      rescue Timeout::Error => e
+      ensure
+        unsubscribe(agent, :reply)
+      end
+
+      stat[:totaltime] = Time.now.to_f - stat[:starttime]
+      stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
+      stat[:responses] = hosts_responded
+      stat[:noresponsefrom] = []
+      stat[:requestid] = reqid
+
+      @stats = stat
+      return stat
+    end
+
+    def discovered_req(body, agent, options=false)
+      raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
+    end
+
+    # Prints out the stats returns from req and discovered_req in a nice way
+    def display_stats(stats, options=false, caption="stomp call summary")
+      options = @options unless options
+
+      if options[:verbose]
+        puts("\n---- #{caption} ----")
+
+        if stats[:discovered]
+          puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
+        else
+          puts("           Nodes: #{stats[:responses]}")
+        end
+
+        printf("      Start Time: %s\n", Time.at(stats[:starttime]))
+        printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
+        printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
+        printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)
+
+      else
+        if stats[:discovered]
+          printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
+        else
+          printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
+        end
+      end
+
+      if stats[:noresponsefrom].size > 0
+        puts("\nNo response from:\n")
+
+        stats[:noresponsefrom].each do |c|
+          puts if c % 4 == 1
+          printf("%30s", c)
+        end
+
+        puts
+      end
+    end
+  end
+end