2 # Helpers for writing clients that can talk to agents, do discovery and so forth
# Declares public reader and writer methods for the options, stats and
# discoverer attributes (backed by @options, @stats and @discoverer).
4 attr_accessor :options, :stats, :discoverer
# Builds a client from the given configuration file.
#
# configfile - passed to @config.loadconfig, but only when the Config
#              instance does not already report itself as configured.
#
# Fetches the connector and security plugins from PluginManager, flags the
# security plugin as being initiated by a client, and creates a Discovery
# object that is handed this client.
#
# NOTE(review): the embedded numbering skips 14-16 and nothing after 17 is
# shown, so part of this method (including its closing `end`) is not visible
# here. @options and @subscriptions are read by other methods but are not
# assigned in the visible lines - confirm where they are initialised.
6 def initialize(configfile)
7 @config = Config.instance
# Skip loading when the configuration has already been loaded.
8 @config.loadconfig(configfile) unless @config.configured
10 @connection = PluginManager["connector_plugin"]
11 @security = PluginManager["security_plugin"]
13 @security.initiated_by = :client
17 @discoverer = Discovery.new(self)
21 # Returns the configured main collective if no
22 # specific collective is specified in the options
#
# NOTE(review): the `def` line (original line 23) and everything after the
# first branch (original lines 26-30) are not visible here. Other methods call
# `collective` without a receiver, so this is presumably that method's body -
# confirm against the full file.
24 if @options[:collective].nil?
# No collective in the options: use the main collective from the configuration.
25 @config.main_collective
31 # Disconnects cleanly from the middleware
#
# Logs at debug level, then calls disconnect on the connector plugin held in
# @connection.
#
# NOTE(review): the `def` line (original line 32) and the closing `end` are
# not visible here; presumably this is `def disconnect` - confirm.
33 Log.debug("Disconnecting from the middleware")
34 @connection.disconnect
37 # Sends a request and returns the generated request id, doesn't wait for
38 # responses and doesn't execute any passed in code blocks for responses
#
# msg    - passed as the first argument to Message.new
# agent  - agent the request is addressed to; also the agent whose :reply
#          target gets subscribed
# filter - stored on the message as :filter (defaults to {})
#
# NOTE(review): original lines 40-43, 47-50, 52 and 54-59 are not visible
# here, so the statement that actually sends the request and the expression
# that yields the returned request id cannot be confirmed from this view.
39 def sendreq(msg, agent, filter = {})
# A :ttl given in @options takes precedence over the configured ttl.
44 ttl = @options[:ttl] || @config.ttl
45 request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
# Only set a reply target when one was supplied in @options.
46 request.reply_to = @options[:reply_to] if @options[:reply_to]
51 Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
# Subscribe to the agent's :reply target unless the request carries its own reply_to.
53 subscribe(agent, :reply) unless request.reply_to
# Subscribes to a target of the given type for an agent, unless that agent is
# already recorded in @subscriptions.
#
# agent - agent to subscribe for; used as the key in @subscriptions
# type  - target type handed to Util.make_subscriptions (sendreq passes :reply)
#
# NOTE(review): @subscriptions is keyed by agent only, so a later call for the
# same agent with a different type does nothing - confirm that is intended.
# Original lines 64 and 67-69 (including the closing `end`s) are not visible.
60 def subscribe(agent, type)
61 unless @subscriptions.include?(agent)
# Build the subscription for this agent/type within the current collective.
62 subscription = Util.make_subscriptions(agent, type, collective)
63 Log.debug("Subscribing to #{type} target for agent #{agent}")
65 Util.subscribe(subscription)
# Remember the agent; the visible code only ever tests for the key, the value 1 is a marker.
66 @subscriptions[agent] = 1
# Removes the subscription for an agent, but only when that agent is recorded
# in @subscriptions.
#
# agent - agent to unsubscribe; looked up and deleted by this key
# type  - target type handed to Util.make_subscriptions (req passes :reply)
#
# NOTE(review): the lookup ignores `type`, mirroring subscribe - confirm that
# one subscription per agent is the intended model. Original lines 74 and
# 77-78 (including the closing `end`s) are not visible here.
70 def unsubscribe(agent, type)
71 if @subscriptions.include?(agent)
# Rebuild the same subscription description that subscribe would have used.
72 subscription = Util.make_subscriptions(agent, type, collective)
73 Log.debug("Unsubscribing #{type} target for #{agent}")
75 Util.unsubscribe(subscription)
# Forget the agent so that a later subscribe call subscribes again.
76 @subscriptions.delete(agent)
79 # Blocking call that waits forever for a message to arrive.
81 # If you give it a requestid this means you've previously sent a request
82 # with that ID and now you just want replies that match that id, in that
83 # case the current connection will just ignore all messages not directed at it
84 # and keep waiting for more till it finds a matching message.
#
# requestid - id that the reply must carry (defaults to nil)
#
# NOTE(review): original lines 86-88, 90, 92-94, 96, 100 and 103-108 are not
# visible here, so what happens after each rescue and what the method returns
# cannot be confirmed from this view.
85 def receive(requestid = nil)
89 reply = @connection.receive
91 reply.expected_msgid = requestid
# When MCOLLECTIVE_ANON is present in the environment, replace the sender id
# in the payload with its MD5 hex digest.
95 reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")
# NOTE(review): the message prints the expected id after "Message reqid" and
# the reply's id after "our reqid" - the labels look swapped; confirm. As
# written the check also applies when requestid is nil (the default), unless a
# line not visible here guards it.
97 raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid
# Both rescue branches only log in the visible lines; `e` is not used in them.
98 rescue SecurityValidationFailed => e
99 Log.warn("Ignoring a message that did not pass security validations")
101 rescue MsgDoesNotMatchRequestID => e
102 Log.debug("Ignoring a message for some other client")
109 # Performs a discovery of nodes matching the filter passed
110 # returns an array of nodes
112 # An integer limit can be supplied; this will have the effect
113 # of the discovery being cancelled as soon as it reaches the
114 # requested limit of hosts
#
# filter, timeout and limit are forwarded unchanged to @discoverer.discover;
# limit defaults to 0.
115 def discover(filter, timeout, limit=0)
# NOTE(review): the local `discovered` is not used again in the visible lines;
# if this is the method's last statement, the assigned value is what gets
# returned. Original line 117 onwards is not visible here - confirm.
116 discovered = @discoverer.discover(filter, timeout, limit)
119 # Send a request, performs the passed block for each response
121 # times = req("status", "mcollectived", options, client) {|resp|
125 # It returns a hash of times; the timeouts for discovery and total run are taken from the options
126 # hash which in turn is generally built using MCollective::Optionparser
#
# body    - either a Message or a request body that is handed to sendreq
# agent   - agent to address; passed to sendreq and unsubscribe (default nil)
# options - when truthy, replaces @options on this client (default false)
# waitfor - number of responses after which the receive loop stops; 0 means
#           the count never ends the loop (default 0)
#
# NOTE(review): many original lines are not visible here (129, 132-133, 135,
# 137, 139-145, 147, 150-151, 153-157, 159-160, 163, 165-166, 172-176). In
# particular the initialisation and increment of hosts_responded, the loop
# around receive, the bodies of both rescue clauses, and the final return
# expression cannot be confirmed from this view.
127 def req(body, agent=nil, options=false, waitfor=0)
128 if body.is_a?(Message)
# NOTE(review): if discovered_hosts is a core collection, `size` is never nil,
# so `|| 0` has no effect; a nil discovered_hosts would raise instead - confirm.
130 waitfor = body.discovered_hosts.size || 0
131 @options = body.options
# Assigns to the instance variable, so the options persist after this call.
134 @options = options if options
# Times are float seconds; :starttime is seconds since the epoch.
136 stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
138 timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
146 Log.debug("Publishing request to agent %s with timeout %d" % [agent, timeout])
# Sending and receiving both happen inside the Timeout block.
148 Timeout.timeout(timeout) do
149 reqid = sendreq(body, agent, @options[:filter])
152 resp = receive(reqid)
# Stop early once the expected number of hosts has responded.
158 break if (waitfor != 0 && hosts_responded >= waitfor)
161 rescue Interrupt => e
162 rescue Timeout::Error => e
164 unsubscribe(agent, :reply)
# :discoverytime is not changed in the visible lines, so :blocktime equals
# :totaltime here unless a line not shown updates it.
167 stat[:totaltime] = Time.now.to_f - stat[:starttime]
168 stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
169 stat[:responses] = hosts_responded
170 stat[:noresponsefrom] = []
171 stat[:requestid] = reqid
# Stub for a removed API: always raises a RuntimeError telling the caller to
# port to the SimpleRPC framework. None of the three parameters is used.
# The closing `end` (original line 179) is not visible in this extract.
177 def discovered_req(body, agent, options=false)
178 raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
181 # Prints out the stats returned from req and discovered_req in a nice way
182 def display_stats(stats, options=false, caption="stomp call summary")
183 options = @options unless options
186 puts("\n---- #{caption} ----")
188 if stats[:discovered]
189 puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}")
191 puts(" Nodes: #{stats[:responses]}")
194 printf(" Start Time: %s\n", Time.at(stats[:starttime]))
195 printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
196 printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000)
197 printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000)
200 if stats[:discovered]
201 printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
203 printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
207 if stats[:noresponsefrom].size > 0
208 puts("\nNo response from:\n")
210 stats[:noresponsefrom].each do |c|