+module MCollective
+ # Helpers for writing clients that can talk to agents, do discovery and so forth
+ class Client
+ attr_accessor :options, :stats, :discoverer
+
+ def initialize(configfile)
+ @config = Config.instance
+ @config.loadconfig(configfile) unless @config.configured
+
+ @connection = PluginManager["connector_plugin"]
+ @security = PluginManager["security_plugin"]
+
+ @security.initiated_by = :client
+ @options = nil
+ @subscriptions = {}
+
+ @discoverer = Discovery.new(self)
+ @connection.connect
+ end
+
+ # Returns the configured main collective if no
+ # specific collective is specified as options
+ def collective
+ if @options[:collective].nil?
+ @config.main_collective
+ else
+ @options[:collective]
+ end
+ end
+
+ # Disconnects cleanly from the middleware
+ def disconnect
+ Log.debug("Disconnecting from the middleware")
+ @connection.disconnect
+ end
+
+ # Sends a request and returns the generated request id, doesn't wait for
+ # responses and doesn't execute any passed in code blocks for responses
+ def sendreq(msg, agent, filter = {})
+ if msg.is_a?(Message)
+ request = msg
+ agent = request.agent
+ else
+ ttl = @options[:ttl] || @config.ttl
+ request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
+ request.reply_to = @options[:reply_to] if @options[:reply_to]
+ end
+
+ request.encode!
+
+ Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
+
+ subscribe(agent, :reply) unless request.reply_to
+
+ request.publish
+
+ request.requestid
+ end
+
+ def subscribe(agent, type)
+ unless @subscriptions.include?(agent)
+ subscription = Util.make_subscriptions(agent, type, collective)
+ Log.debug("Subscribing to #{type} target for agent #{agent}")
+
+ Util.subscribe(subscription)
+ @subscriptions[agent] = 1
+ end
+ end
+
+ def unsubscribe(agent, type)
+ if @subscriptions.include?(agent)
+ subscription = Util.make_subscriptions(agent, type, collective)
+ Log.debug("Unsubscribing #{type} target for #{agent}")
+
+ Util.unsubscribe(subscription)
+ @subscriptions.delete(agent)
+ end
+ end
+ # Blocking call that waits for ever for a message to arrive.
+ #
+ # If you give it a requestid this means you've previously send a request
+ # with that ID and now you just want replies that matches that id, in that
+ # case the current connection will just ignore all messages not directed at it
+ # and keep waiting for more till it finds a matching message.
+ def receive(requestid = nil)
+ reply = nil
+
+ begin
+ reply = @connection.receive
+ reply.type = :reply
+ reply.expected_msgid = requestid
+
+ reply.decode!
+
+ reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")
+
+ raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid
+ rescue SecurityValidationFailed => e
+ Log.warn("Ignoring a message that did not pass security validations")
+ retry
+ rescue MsgDoesNotMatchRequestID => e
+ Log.debug("Ignoring a message for some other client")
+ retry
+ end
+
+ reply
+ end
+
+ # Performs a discovery of nodes matching the filter passed
+ # returns an array of nodes
+ #
+ # An integer limit can be supplied this will have the effect
+ # of the discovery being cancelled soon as it reached the
+ # requested limit of hosts
+ def discover(filter, timeout, limit=0)
+ discovered = @discoverer.discover(filter, timeout, limit)
+ end
+
+ # Send a request, performs the passed block for each response
+ #
+ # times = req("status", "mcollectived", options, client) {|resp|
+ # pp resp
+ # }
+ #
+ # It returns a hash of times and timeouts for discovery and total run is taken from the options
+ # hash which in turn is generally built using MCollective::Optionparser
+ def req(body, agent=nil, options=false, waitfor=0)
+ if body.is_a?(Message)
+ agent = body.agent
+ waitfor = body.discovered_hosts.size || 0
+ @options = body.options
+ end
+
+ @options = options if options
+
+ stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
+
+ timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
+
+ STDOUT.sync = true
+
+ hosts_responded = 0
+ reqid = nil
+
+ begin
+ Log.debug("Publishing request to agent %s with timeout %d" % [agent, timeout])
+
+ Timeout.timeout(timeout) do
+ reqid = sendreq(body, agent, @options[:filter])
+
+ loop do
+ resp = receive(reqid)
+
+ hosts_responded += 1
+
+ yield(resp.payload)
+
+ break if (waitfor != 0 && hosts_responded >= waitfor)
+ end
+ end
+ rescue Interrupt => e
+ rescue Timeout::Error => e
+ ensure
+ unsubscribe(agent, :reply)
+ end
+
+ stat[:totaltime] = Time.now.to_f - stat[:starttime]
+ stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
+ stat[:responses] = hosts_responded
+ stat[:noresponsefrom] = []
+ stat[:requestid] = reqid
+
+ @stats = stat
+ return stat
+ end
+
+ def discovered_req(body, agent, options=false)
+ raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
+ end
+
+ # Prints out the stats returns from req and discovered_req in a nice way
+ def display_stats(stats, options=false, caption="stomp call summary")
+ options = @options unless options
+
+ if options[:verbose]
+ puts("\n---- #{caption} ----")
+
+ if stats[:discovered]
+ puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}")
+ else
+ puts(" Nodes: #{stats[:responses]}")
+ end
+
+ printf(" Start Time: %s\n", Time.at(stats[:starttime]))
+ printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
+ printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000)
+ printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000)
+
+ else
+ if stats[:discovered]
+ printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
+ else
+ printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
+ end
+ end
+
+ if stats[:noresponsefrom].size > 0
+ puts("\nNo response from:\n")
+
+ stats[:noresponsefrom].each do |c|
+ puts if c % 4 == 1
+ printf("%30s", c)
+ end
+
+ puts
+ end
+ end
+ end
+end