2 # Helpers for writing clients that can talk to agents, do discovery and so forth
# Read/write accessors for the @options, @stats and @discoverer instance
# variables.
4 attr_accessor :options, :stats, :discoverer
# Sets up a client from the given configuration file.
#
# configfile - handed to @config.loadconfig, but only when Config.instance
#              reports that it is not configured yet.
#
# Looks up the connector and security plugins through PluginManager, marks
# the security plugin as initiated by the client and creates a Discovery
# object bound to this client.
6 def initialize(configfile)
7 @config = Config.instance
8 @config.loadconfig(configfile) unless @config.configured
10 @connection = PluginManager["connector_plugin"]
11 @security = PluginManager["security_plugin"]
13 @security.initiated_by = :client
17 @discoverer = Discovery.new(self)
21 # Returns the configured main collective if no
22 # specific collective is specified in the options,
# i.e. @config.main_collective is used when @options[:collective] is nil.
# NOTE(review): @options must already be set when this runs, otherwise the
# [] lookup raises - confirm against callers.
24 if @options[:collective].nil?
25 @config.main_collective
31 # Disconnects cleanly from the middleware
# Logs a debug message and then calls disconnect on the connector plugin
# held in @connection.
33 Log.debug("Disconnecting from the middleware")
34 @connection.disconnect
37 # Sends a request and returns the generated request id, doesn't wait for
38 # responses and doesn't execute any passed in code blocks for responses
#
# msg    - forwarded to createreq as the request body
# agent  - forwarded to createreq as the target agent
# filter - forwarded to createreq (defaults to {})
39 def sendreq(msg, agent, filter = {})
40 request = createreq(msg, agent, filter)
# Debug trace of the request id, agent, ttl and collective of the request.
42 Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
# Builds the request Message for msg addressed to agent.
#
# msg    - body handed to Message.new
# agent  - agent the request is for; also used for the reply subscription
# filter - stored on the Message as :filter (defaults to {})
#
# The ttl is @options[:ttl], falling back to @config.ttl, and the collective
# comes from the collective helper.  A reply_to found in @options is copied
# onto the request; when the request has no reply_to the client subscribes
# to the agent's :reply target itself.
48 def createreq(msg, agent, filter ={})
53 ttl = @options[:ttl] || @config.ttl
54 request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
55 request.reply_to = @options[:reply_to] if @options[:reply_to]
# Only listen for replies ourselves when they are not redirected elsewhere.
59 subscribe(agent, :reply) unless request.reply_to
# Subscribes to the target of the given type for agent, once per agent.
#
# agent - agent name; used as the key in @subscriptions
# type  - target type passed to Util.make_subscriptions (createreq uses :reply)
#
# NOTE(review): the bookkeeping is keyed on agent only, so a second type for
# an already subscribed agent would be skipped - confirm this is intended.
63 def subscribe(agent, type)
64 unless @subscriptions.include?(agent)
65 subscription = Util.make_subscriptions(agent, type, collective)
66 Log.debug("Subscribing to #{type} target for agent #{agent}")
68 Util.subscribe(subscription)
# Remember the agent so later calls do not subscribe again.
69 @subscriptions[agent] = 1
# Removes the subscription for agent if one was recorded by subscribe.
#
# agent - agent name; looked up in and deleted from @subscriptions
# type  - target type passed to Util.make_subscriptions
73 def unsubscribe(agent, type)
74 if @subscriptions.include?(agent)
75 subscription = Util.make_subscriptions(agent, type, collective)
76 Log.debug("Unsubscribing #{type} target for #{agent}")
78 Util.unsubscribe(subscription)
# Forget the agent so a later subscribe call subscribes again.
79 @subscriptions.delete(agent)
82 # Blocking call that waits forever for a message to arrive.
84 # If you give it a requestid this means you've previously sent a request
85 # with that ID and now you just want replies that match that id, in that
86 # case the current connection will just ignore all messages not directed at it
87 # and keep waiting for more till it finds a matching message.
#
# requestid - id that replies are compared with (defaults to nil); it is
#             also stored on each reply as expected_msgid.
88 def receive(requestid = nil)
92 reply = @connection.receive
94 reply.expected_msgid = requestid
# A reply carrying a different request id is rejected with an exception
# that is rescued and logged further down.
97 unless reply.requestid == requestid
98 raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
# Messages failing security validation are logged as a warning.
100 rescue SecurityValidationFailed => e
101 Log.warn("Ignoring a message that did not pass security validations")
# Replies meant for another request are only logged at debug level.
103 rescue MsgDoesNotMatchRequestID => e
104 Log.debug("Ignoring a message for some other client : #{e.message}")
111 # Performs a discovery of nodes matching the filter passed and
112 # returns an array of nodes.
114 # An integer limit can be supplied; this will have the effect
115 # of the discovery being cancelled as soon as it reaches the
116 # requested limit of hosts.
#
# filter, timeout and limit are handed unchanged to @discoverer.discover;
# limit defaults to 0.
117 def discover(filter, timeout, limit=0)
118 discovered = @discoverer.discover(filter, timeout, limit)
121 # Send a request, performs the passed block for each response
123 # times = req("status", "mcollectived", options, client) {|resp|
127 # It returns a hash of times; timeouts for discovery and total run are taken from the options
128 # hash which in turn is generally built using MCollective::Optionparser
#
# body    - request body, or a Message
# agent   - target agent (defaults to nil)
# options - replaces @options when truthy (defaults to false)
# waitfor - number of responses to wait for (defaults to 0)
# block   - forwarded to threaded_req / unthreaded_req
#
# Returns whatever update_stat returns for the stat hash built below.
129 def req(body, agent=nil, options=false, waitfor=0, &block)
130 if body.is_a?(Message)
# NOTE(review): size does not return nil, so the "|| 0" fallback looks
# unreachable and would not protect against a nil discovered_hosts - confirm.
132 waitfor = body.discovered_hosts.size || 0
133 @options = body.options
136 @options = options if options
# NOTE(review): presumably decides between threaded_req and unthreaded_req
# below - confirm.
137 threaded = @options[:threaded]
138 timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
139 request = createreq(body, agent, @options[:filter])
140 publish_timeout = @options[:publish_timeout]
# Timing record; :starttime is the wall clock time as a float.
141 stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
148 hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
150 hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
152 rescue Interrupt => e
154 unsubscribe(agent, :reply)
157 return update_stat(stat, hosts_responded, request.requestid)
160 # Starts the client receiver and publisher unthreaded.
161 # This is the default client behaviour.
#
# Publishing happens first through start_publisher; afterwards
# start_receiver is called for request.requestid with waitfor, timeout and
# the given block.
162 def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
163 start_publisher(request, publish_timeout)
164 start_receiver(request.requestid, waitfor, timeout, &block)
167 # Starts the client receiver and publisher in threads.
168 # This is activated when the 'threader_client' configuration
#
# start_publisher and start_receiver each run inside their own Thread.new
# block; the arguments match those of unthreaded_req.
170 def threaded_req(request, publish_timeout, timeout, waitfor, &block)
171 Log.debug("Starting threaded client")
172 publisher = Thread.new do
173 start_publisher(request, publish_timeout)
176 # When the client is threaded we add the publishing timeout to
177 # the agent timeout so that the receiver doesn't time out before
178 # publishing has finished in cases where publish_timeout >= timeout.
# NOTE(review): req reads publish_timeout from @options[:publish_timeout];
# a nil value would make this addition raise - confirm it is always set.
179 total_timeout = publish_timeout + timeout
182 receiver = Thread.new do
183 hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
190 # Starts the request publishing routine
#
# request         - request being published; its id, agent, ttl and
#                   collective are written to the debug log
# publish_timeout - limit passed to Timeout.timeout around the publishing
#
# A Timeout::Error is rescued and reported as a warning.
191 def start_publisher(request, publish_timeout)
192 Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
194 Timeout.timeout(publish_timeout) do
195 Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
198 rescue Timeout::Error => e
199 Log.warn("Could not publish all messages. Publishing timed out.")
203 # Starts the response receiver routine
204 # Expected to return the amount of received responses.
#
# requestid - id handed to receive for every response
# waitfor   - number of responses to collect; with 0 the loop condition
#             never stops the loop
# timeout   - limit passed to Timeout.timeout around the receive loop
#
# A Timeout::Error is rescued and logged as a warning that includes the
# expected and received counts.
205 def start_receiver(requestid, waitfor, timeout, &block)
206 Log.debug("Starting response receiver with timeout of #{timeout}")
209 Timeout.timeout(timeout) do
211 resp = receive(requestid)
# Keep receiving until waitfor responses arrived (or without bound when
# waitfor is 0).
214 end while (waitfor == 0 || hosts_responded < waitfor)
216 rescue Timeout::Error => e
217 Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
# Fills in the final figures on the stat hash of a request.
#
# stat            - hash that already holds :starttime and :discoverytime
# hosts_responded - stored under :responses
# requestid       - stored under :requestid
223 def update_stat(stat, hosts_responded, requestid)
# Total run time is measured from the recorded :starttime.
224 stat[:totaltime] = Time.now.to_f - stat[:starttime]
# Block time is the total time minus the time spent on discovery.
225 stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
226 stat[:responses] = hosts_responded
# :noresponsefrom always starts out as an empty list here.
227 stat[:noresponsefrom] = []
228 stat[:requestid] = requestid
# Removed API kept as a stub: every call raises a RuntimeError that tells
# the caller to port to the SimpleRPC framework.  The arguments are unused.
233 def discovered_req(body, agent, options=false)
234 raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
237 # Prints out the stats returned from req and discovered_req in a nice way
238 def display_stats(stats, options=false, caption="stomp call summary")
239 options = @options unless options
242 puts("\n---- #{caption} ----")
244 if stats[:discovered]
245 puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}")
247 puts(" Nodes: #{stats[:responses]}")
250 printf(" Start Time: %s\n", Time.at(stats[:starttime]))
251 printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
252 printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000)
253 printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000)
256 if stats[:discovered]
257 printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
259 printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
263 if stats[:noresponsefrom].size > 0
264 puts("\nNo response from:\n")
266 stats[:noresponsefrom].each do |c|