X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=lib%2Fmcollective%2Fclient.rb;fp=lib%2Fmcollective%2Fclient.rb;h=d77eaa5c90fdef7b5f3789705985d7d0d5439c3c;hb=b87d2f4e68281062df1913440ca5753ae63314a9;hp=0000000000000000000000000000000000000000;hpb=ab0ea530b8ac956091f17b104ab2311336cfc250;p=packages%2Fprecise%2Fmcollective.git diff --git a/lib/mcollective/client.rb b/lib/mcollective/client.rb new file mode 100644 index 0000000..d77eaa5 --- /dev/null +++ b/lib/mcollective/client.rb @@ -0,0 +1,219 @@ +module MCollective + # Helpers for writing clients that can talk to agents, do discovery and so forth + class Client + attr_accessor :options, :stats, :discoverer + + def initialize(configfile) + @config = Config.instance + @config.loadconfig(configfile) unless @config.configured + + @connection = PluginManager["connector_plugin"] + @security = PluginManager["security_plugin"] + + @security.initiated_by = :client + @options = nil + @subscriptions = {} + + @discoverer = Discovery.new(self) + @connection.connect + end + + # Returns the configured main collective if no + # specific collective is specified as options + def collective + if @options[:collective].nil? + @config.main_collective + else + @options[:collective] + end + end + + # Disconnects cleanly from the middleware + def disconnect + Log.debug("Disconnecting from the middleware") + @connection.disconnect + end + + # Sends a request and returns the generated request id, doesn't wait for + # responses and doesn't execute any passed in code blocks for responses + def sendreq(msg, agent, filter = {}) + if msg.is_a?(Message) + request = msg + agent = request.agent + else + ttl = @options[:ttl] || @config.ttl + request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl}) + request.reply_to = @options[:reply_to] if @options[:reply_to] + end + + request.encode! + + Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}") + + subscribe(agent, :reply) unless request.reply_to + + request.publish + + request.requestid + end + + def subscribe(agent, type) + unless @subscriptions.include?(agent) + subscription = Util.make_subscriptions(agent, type, collective) + Log.debug("Subscribing to #{type} target for agent #{agent}") + + Util.subscribe(subscription) + @subscriptions[agent] = 1 + end + end + + def unsubscribe(agent, type) + if @subscriptions.include?(agent) + subscription = Util.make_subscriptions(agent, type, collective) + Log.debug("Unsubscribing #{type} target for #{agent}") + + Util.unsubscribe(subscription) + @subscriptions.delete(agent) + end + end + # Blocking call that waits for ever for a message to arrive. + # + # If you give it a requestid this means you've previously send a request + # with that ID and now you just want replies that matches that id, in that + # case the current connection will just ignore all messages not directed at it + # and keep waiting for more till it finds a matching message. + def receive(requestid = nil) + reply = nil + + begin + reply = @connection.receive + reply.type = :reply + reply.expected_msgid = requestid + + reply.decode! + + reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON") + + raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid + rescue SecurityValidationFailed => e + Log.warn("Ignoring a message that did not pass security validations") + retry + rescue MsgDoesNotMatchRequestID => e + Log.debug("Ignoring a message for some other client") + retry + end + + reply + end + + # Performs a discovery of nodes matching the filter passed + # returns an array of nodes + # + # An integer limit can be supplied this will have the effect + # of the discovery being cancelled soon as it reached the + # requested limit of hosts + def discover(filter, timeout, limit=0) + discovered = @discoverer.discover(filter, timeout, limit) + end + + # Send a request, performs the passed block for each response + # + # times = req("status", "mcollectived", options, client) {|resp| + # pp resp + # } + # + # It returns a hash of times and timeouts for discovery and total run is taken from the options + # hash which in turn is generally built using MCollective::Optionparser + def req(body, agent=nil, options=false, waitfor=0) + if body.is_a?(Message) + agent = body.agent + waitfor = body.discovered_hosts.size || 0 + @options = body.options + end + + @options = options if options + + stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0} + + timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter]) + + STDOUT.sync = true + + hosts_responded = 0 + reqid = nil + + begin + Log.debug("Publishing request to agent %s with timeout %d" % [agent, timeout]) + + Timeout.timeout(timeout) do + reqid = sendreq(body, agent, @options[:filter]) + + loop do + resp = receive(reqid) + + hosts_responded += 1 + + yield(resp.payload) + + break if (waitfor != 0 && hosts_responded >= waitfor) + end + end + rescue Interrupt => e + rescue Timeout::Error => e + ensure + unsubscribe(agent, :reply) + end + + stat[:totaltime] = Time.now.to_f - stat[:starttime] + stat[:blocktime] = stat[:totaltime] - stat[:discoverytime] + stat[:responses] = hosts_responded + stat[:noresponsefrom] = [] + stat[:requestid] = reqid + + @stats = stat + return stat + end + + def discovered_req(body, agent, options=false) + raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework" + end + + # Prints out the stats returns from req and discovered_req in a nice way + def display_stats(stats, options=false, caption="stomp call summary") + options = @options unless options + + if options[:verbose] + puts("\n---- #{caption} ----") + + if stats[:discovered] + puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}") + else + puts(" Nodes: #{stats[:responses]}") + end + + printf(" Start Time: %s\n", Time.at(stats[:starttime])) + printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000) + printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000) + printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000) + + else + if stats[:discovered] + printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000) + else + printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000) + end + end + + if stats[:noresponsefrom].size > 0 + puts("\nNo response from:\n") + + stats[:noresponsefrom].each do |c| + puts if c % 4 == 1 + printf("%30s", c) + end + + puts + end + end + end +end