3 # The main component of the Simple RPC client system, this wraps around MCollective::Client
4 # and just brings in a lot of convention and standard approaches.
6 attr_accessor :timeout, :verbose, :filter, :config, :progress, :ttl, :reply_to
7 attr_reader :client, :stats, :ddl, :agent, :limit_targets, :limit_method, :output_format, :batch_size, :batch_sleep_time, :batch_mode
8 attr_reader :discovery_options, :discovery_method, :default_discovery_method, :limit_seed
10 @@initial_options = nil
12 # Creates a stub for a remote agent, you can pass in an options array in the flags
13 # which will then be used else it will just create a default options array with
14 # filtering enabled based on the standard command line use.
16 # rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)
18 # You typically would not call this directly you'd use MCollective::RPC#rpcclient instead
19 # which is a wrapper around this that can be used as a Mixin
20 def initialize(agent, flags = {})
21 if flags.include?(:options)
22 initial_options = flags[:options]
24 elsif @@initial_options
25 initial_options = Marshal.load(@@initial_options)
28 oparser = MCollective::Optionparser.new({:verbose => false, :progress_bar => true, :mcollective_limit_targets => false, :batch_size => nil, :batch_sleep_time => 1}, "filter")
30 initial_options = oparser.parse do |parser, opts|
35 Helpers.add_simplerpc_options(parser, opts)
38 @@initial_options = Marshal.dump(initial_options)
41 @initial_options = initial_options
43 @config = initial_options[:config]
44 @client = MCollective::Client.new(@config)
45 @client.options = initial_options
49 @timeout = initial_options[:timeout] || 5
50 @verbose = initial_options[:verbose]
51 @filter = initial_options[:filter] || Util.empty_filter
52 @discovered_agents = nil
53 @progress = initial_options[:progress_bar]
54 @limit_targets = initial_options[:mcollective_limit_targets]
55 @limit_method = Config.instance.rpclimitmethod
56 @limit_seed = initial_options[:limit_seed] || nil
57 @output_format = initial_options[:output_format] || :console
58 @force_direct_request = false
59 @reply_to = initial_options[:reply_to]
60 @discovery_method = initial_options[:discovery_method]
62 @discovery_method = Config.instance.default_discovery_method
63 @default_discovery_method = true
65 @default_discovery_method = false
67 @discovery_options = initial_options[:discovery_options] || []
68 @force_display_mode = initial_options[:force_display_mode] || false
70 @batch_size = Integer(initial_options[:batch_size] || 0)
71 @batch_sleep_time = Float(initial_options[:batch_sleep_time] || 1)
72 @batch_mode = @batch_size > 0
76 @discovery_timeout = @initial_options.fetch(:disctimeout, nil)
78 @collective = @client.collective
79 @ttl = initial_options[:ttl] || Config.instance.ttl
80 @publish_timeout = initial_options[:publish_timeout] || Config.instance.publish_timeout
81 @threaded = initial_options[:threaded] || Config.instance.threaded
83 # if we can find a DDL for the service override
84 # the timeout of the client so we always magically
85 # wait appropriate amounts of time.
87 # We add the discovery timeout to the ddl supplied
88 # timeout as the discovery timeout tends to be tuned
89 # for local network conditions and fact source speed
90 # which would otherwise not be accounted for and
91 # some results might get missed.
93 # We do this only if the timeout is the default 5
94 # seconds, so that users cli overrides will still
97 # DDLs are required, failure to find a DDL is fatal
100 @timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5
102 # allows stderr and stdout to be overridden for testing
103 # but also for web apps that might not want a bunch of stuff
104 # generated to actual file handles
105 if initial_options[:stderr]
106 @stderr = initial_options[:stderr]
112 if initial_options[:stdout]
113 @stdout = initial_options[:stdout]
120 # Disconnects cleanly from the middleware
125 # Returns help for an agent if a DDL was found
130 # Creates a suitable request hash for the SimpleRPC agent.
132 # You'd use this if you ever wanted to take care of sending
133 # requests on your own - perhaps via Client#sendreq if you
134 # didn't care for responses.
136 # In that case you can just do:
138 # msg = your_rpc.new_request("some_action", :foo => :bar)
139 # filter = your_rpc.filter
141 # your_rpc.client.sendreq(msg, msg[:agent], filter)
143 # This will send a SimpleRPC request to the action some_action
144 # with arguments :foo = :bar, it will return immediately and
145 # you will have no indication at all if the request was received or not
147 # Clearly the use of this technique should be limited and done only
148 # if your code requires such a thing
149 def new_request(action, data)
150 callerid = PluginManager["security_plugin"].callerid
152 raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)
160 # For the provided arguments and action the input arguments get
161 # modified by supplying any defaults provided in the DDL for arguments
162 # that were not supplied in the request
164 # We then pass the modified arguments to the DDL for validation
165 def validate_request(action, args)
166 raise "No DDL found for agent %s cannot validate inputs" % @agent unless @ddl
168 @ddl.set_default_input_arguments(action, args)
169 @ddl.validate_rpc_request(action, args)
172 # Magic handler to invoke remote methods
174 # Once the stub is created using the constructor or the RPC#rpcclient helper you can
175 # call remote actions easily:
177 # ret = rpc.echo(:msg => "hello world")
179 # This will call the 'echo' action of the 'rpctest' agent and return the result as an array,
180 # the array will be a simplified result set from the usual full MCollective::Client#req with
181 # additional error codes and error text:
184 # :sender => "remote.box.com",
186 # :statusmsg => "OK",
187 # :data => "hello world"
190 # If :statuscode is 0 then everything went fine, if it's 1 then you supplied the correct arguments etc
191 # but the request could not be completed, you'll find a human parsable reason in :statusmsg then.
193 # Codes 2 to 5 map directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError
194 # see below for a description of those, in each case :statusmsg would be the reason for failure.
196 # To get access to the full result of the MCollective::Client#req calls you can pass in a block:
198 # rpc.echo(:msg => "hello world") do |resp|
202 # In this case resp will be the result from MCollective::Client#req. Instead of returning simple
203 # text and codes as above you'll also need to handle the following exceptions:
205 # UnknownRPCAction - There is no matching action on the agent
206 # MissingRPCData - You did not supply all the needed parameters for the action
207 # InvalidRPCData - The data you did supply did not pass validation
208 # UnknownRPCError - Some other error prevented the agent from running
210 # During calls a progress indicator will be shown of how many results we've received against
211 # how many nodes were discovered, you can disable this by setting progress to false:
213 # rpc.progress = false
215 # This supports a 2nd mode where it will send the SimpleRPC request and never handle the
216 # responses. It's a bit like UDP, it sends the request with the filter attached and you
217 # only get back the requestid, you have no indication about results.
219 # You can invoke this using:
221 # puts rpc.echo(:process_results => false)
223 # This will output just the request id.
225 # Batched processing is supported:
227 # printrpc rpc.ping(:batch_size => 5)
229 # This will do everything exactly as normal but communicate to only 5
231 def method_missing(method_name, *args, &block)
232 # set args to an empty hash if nothing is given
234 args = {} if args.nil?
236 action = method_name.to_s
240 validate_request(action, args)
242 # if a global batch size is set just use that else set it
243 # in the case that it was passed as an argument
244 batch_mode = args.include?(:batch_size) || @batch_mode
245 batch_size = args.delete(:batch_size) || @batch_size
246 batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time
248 # if we were given a batch_size argument that's 0 and batch_mode was
249 # determined to be on via global options etc this will allow a batch_size
250 # of 0 to disable batch_mode for this call only
251 batch_mode = (batch_mode && Integer(batch_size) > 0)
253 # Handle single target requests by doing discovery and picking
254 # a random node. Then do a custom request specifying a filter
255 # that will only match the one node.
257 target_nodes = pick_nodes_from_discovered(@limit_targets)
258 Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")
260 custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
262 call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
264 call_agent(action, args, options, :auto, &block)
268 # Constructs custom requests with custom filters and discovery data
269 # the idea is that this would be used in web applications where you
270 # might be using a cached copy of data provided by a registration agent
271 # to figure out on your own what nodes will be responding and what your
274 # This will help you essentially short circuit the traditional cycle of:
276 # mc discover / call / wait for discovered nodes
278 # by doing discovery however you like, constructing a filter and a list of
279 # nodes you expect responses from.
281 # Other than that it will work exactly like a normal call, blocks will behave
282 # the same way, stats will be handled the same way, etc.
284 # If you just wanted to contact one machine for example with a client that
285 # already has other filter options setup you can do:
287 # puppet.custom_request("runonce", {}, ["your.box.com"], {:identity => "your.box.com"})
289 # This will do runonce action on just 'your.box.com', no discovery will be
290 # done and after receiving just one response it will stop waiting for responses
292 # If direct_addressing is enabled in the config file you can provide an empty
293 # hash as a filter, this will force that request to be a directly addressed
294 # request which technically does not need filters. If you try to use this
295 # mode with direct addressing disabled an exception will be raised
296 def custom_request(action, args, expected_agents, filter = {}, &block)
297 validate_request(action, args)
299 if filter == {} && !Config.instance.direct_addressing
300 raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
305 custom_filter = Util.empty_filter
306 custom_options = options.clone
308 # merge the supplied filter with the standard empty one
309 # we could just use the merge method but I want to be sure
310 # we don't merge in stuff that isn't actually valid
311 ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
312 if filter.include?(ftype)
313 custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
317 # ensure that all filters at least restrict the call to the agent we're a proxy for
318 custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
319 custom_options[:filter] = custom_filter
321 # Fake out the stats discovery would have put there
322 @stats.discovered_agents([expected_agents].flatten)
324 # Handle fire and forget requests
326 # If a specific reply-to was set then from the client perspective this should
327 # be a fire and forget request too since no response will ever reach us - it
328 # will go to the reply-to destination
329 if args[:process_results] == false || @reply_to
330 return fire_and_forget_request(action, args, custom_filter)
333 # Now do a call pretty much exactly like in method_missing except with our own
334 # options and discovery magic
336 call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
340 call_agent(action, args, custom_options, [expected_agents].flatten)
344 def discovery_timeout
345 return @discovery_timeout if @discovery_timeout
346 return @client.discoverer.ddl.meta[:timeout]
349 def discovery_timeout=(timeout)
350 @discovery_timeout = Float(timeout)
352 # we calculate the overall timeout from the DDL of the agent and
353 # the supplied discovery timeout unless someone specifically
354 # specifies a timeout to the constructor
356 # But if we also then specifically set a discovery_timeout on the
357 # agent that has to override the supplied timeout so we then
358 # calculate a correct timeout based on DDL timeout and the
359 # supplied discovery timeout
360 @timeout = @ddl.meta[:timeout] + discovery_timeout
363 # Sets the discovery method. If we change the method there are a
364 # number of steps to take:
366 # - set the new method
367 # - if discovery options were provided, re-set those to initially
368 # provided ones else clear them as they might now apply to a
370 # - update the client options so it knows there is a new discovery
372 # - reset discovery data forcing a discover on the next request
374 # The remaining item is the discovery timeout, we leave that as is
375 # since that is the user supplied timeout either via initial options
376 # or via specifically setting it on the client.
377 def discovery_method=(method)
378 @default_discovery_method = false
379 @discovery_method = method
381 if @initial_options[:discovery_options]
382 @discovery_options = @initial_options[:discovery_options]
384 @discovery_options.clear
387 @client.options = options
392 def discovery_options=(options)
393 @discovery_options = [options].flatten
397 # Sets the class filter
398 def class_filter(klass)
399 @filter["cf_class"] = @filter["cf_class"] | [klass]
400 @filter["cf_class"].compact!
404 # Sets the fact filter
405 def fact_filter(fact, value=nil, operator="=")
407 return if fact == false
410 parsed = Util.parse_fact_string(fact)
411 @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
413 parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
414 @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
417 @filter["fact"].compact!
421 # Sets the agent filter
422 def agent_filter(agent)
423 @filter["agent"] = @filter["agent"] | [agent]
424 @filter["agent"].compact!
428 # Sets the identity filter
429 def identity_filter(identity)
430 @filter["identity"] = @filter["identity"] | [identity]
431 @filter["identity"].compact!
435 # Set a compound filter
436 def compound_filter(filter)
437 @filter["compound"] = @filter["compound"] | [Matcher.create_compound_callstack(filter)]
441 # Resets various internal parts of the class, most importantly it clears
442 # out the cached discovery
444 @discovered_agents = nil
447 # Reset the filter to an empty one
449 @filter = Util.empty_filter
453 # Does discovery based on the filters set, if a discovery was
454 # previously done return that else do a new discovery.
456 # Alternatively if identity filters are given and none of them are
457 # regular expressions then just use the provided data as discovered
458 # data, avoiding discovery
460 # Discovery can be forced if direct_addressing is enabled by passing
461 # in an array of nodes with :nodes or JSON data like those produced
462 # by mcollective RPC JSON output using :json
464 # Will show a message indicating it's doing discovery if running
465 # verbose or if the :verbose flag is passed in.
467 # Use reset to force a new discovery
468 def discover(flags={})
469 flags.keys.each do |key|
470 raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
473 flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose
475 verbose = false unless @output_format == :console
477 # flags[:nodes] and flags[:hosts] are the same thing, we should never have
478 # allowed :hosts as that was inconsistent with the established terminology
479 flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)
481 reset if flags[:nodes] || flags[:json]
483 unless @discovered_agents
484 # if either hosts or JSON is supplied try to figure out discovery data from there
485 # if direct_addressing is not enabled this is a critical error as the user might
486 # not have supplied filters so raise an exception
487 if flags[:nodes] || flags[:json]
488 raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing
493 hosts = Helpers.extract_hosts_from_array(flags[:nodes])
495 hosts = Helpers.extract_hosts_from_json(flags[:json])
498 raise "Could not find any hosts in discovery data provided" if hosts.empty?
500 @discovered_agents = hosts
501 @force_direct_request = true
504 identity_filter_discovery_optimization
508 # All else fails we do it the hard way using a traditional broadcast
509 unless @discovered_agents
510 @stats.time_discovery :start
512 @client.options = options
514 # if compound filters are used the only real option is to use the mc
515 # discovery plugin since it's the only one capable of using data queries etc
516 # and we do not want to degrade that experience just to allow compounds
517 # on other discovery plugins the UX would be too bad raising complex sets
519 @client.discoverer.force_discovery_method_by_filter(options[:filter])
522 actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter])
524 if actual_timeout > 0
525 @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
527 @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
531 # if the requested limit is a pure number and not a percent
532 # and if we're configured to use the first found hosts as the
533 # limit method then pass in the limit thus minimizing the amount
534 # of work we do in the discover phase and speeding it up significantly
535 if @limit_method == :first and @limit_targets.is_a?(Fixnum)
536 @discovered_agents = @client.discover(@filter, discovery_timeout, @limit_targets)
538 @discovered_agents = @client.discover(@filter, discovery_timeout)
541 @stderr.puts(@discovered_agents.size) if verbose
543 @force_direct_request = @client.discoverer.force_direct_mode?
545 @stats.time_discovery :end
548 @stats.discovered_agents(@discovered_agents)
549 RPC.discovered(@discovered_agents)
554 # Provides a normal options hash like you would get from
557 {:disctimeout => discovery_timeout,
558 :timeout => @timeout,
559 :verbose => @verbose,
561 :collective => @collective,
562 :output_format => @output_format,
564 :discovery_method => @discovery_method,
565 :discovery_options => @discovery_options,
566 :force_display_mode => @force_display_mode,
568 :publish_timeout => @publish_timeout,
569 :threaded => @threaded}
572 # Sets the collective we are communicating with
574 raise "Unknown collective #{c}" unless Config.instance.collectives.include?(c)
577 @client.options = options
581 # Sets and sanity checks the limit_targets variable
582 # used to restrict how many nodes we'll target
583 def limit_targets=(limit)
584 if limit.is_a?(String)
585 raise "Invalid limit specified: #{limit} valid limits are /^\d+%*$/" unless limit =~ /^\d+%*$/
588 @limit_targets = Integer(limit)
590 @limit_targets = limit
593 @limit_targets = Integer(limit)
597 # Sets and sanity check the limit_method variable
598 # used to determine how to limit targets if limit_targets is set
599 def limit_method=(method)
600 method = method.to_sym unless method.is_a?(Symbol)
602 raise "Unknown limit method #{method} must be :random or :first" unless [:random, :first].include?(method)
604 @limit_method = method
607 # Sets the batch size, if the size is set to 0 that will disable batch mode
608 def batch_size=(limit)
609 raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing
611 @batch_size = Integer(limit)
612 @batch_mode = @batch_size > 0
615 def batch_sleep_time=(time)
616 raise "Can only set batch sleep time if direct addressing is supported" unless Config.instance.direct_addressing
618 @batch_sleep_time = Float(time)
621 # Pick a number of nodes from the discovered nodes
623 # The count should be a string that can be either
624 # just a number or a percentage like 10%
626 # It will select nodes from the discovered list based
627 # on the rpclimitmethod configuration option which can
628 # be either :first or anything else
630 # - :first would be a simple way to do a distance based
632 # - anything else will just pick one at random
633 # - if random chosen, and batch-seed set, then set srand
634 # for the generator, and reset afterwards
635 def pick_nodes_from_discovered(count)
637 pct = Integer((discover.size * (count.to_f / 100)))
638 pct == 0 ? count = 1 : count = pct
640 count = Integer(count)
643 return discover if discover.size <= count
647 if @limit_method == :first
648 return discover[0, count]
650 # we delete from the discovered list because we want
651 # to be sure there is no chance that the same node will
652 # be randomly picked twice. So we have to clone the
653 # discovered list else this method will only ever work
654 # once per discovery cycle and not actually return the
656 haystack = discover.clone
664 rnd = rand(haystack.size)
665 result << haystack.delete_at(rnd)
668 # Reset random number generator to fresh seed
669 # As our seed from options is most likely short
676 def load_aggregate_functions(action, ddl)
677 return nil unless ddl
678 return nil unless ddl.action_interface(action).keys.include?(:aggregate)
680 return Aggregate.new(ddl.action_interface(action))
683 Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
687 def aggregate_reply(reply, aggregate)
688 return nil unless aggregate
690 aggregate.call_functions(reply)
692 rescue Exception => e
693 Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
697 def rpc_result_from_reply(agent, action, reply)
698 Result.new(agent, action, {:sender => reply[:senderid], :statuscode => reply[:body][:statuscode],
699 :statusmsg => reply[:body][:statusmsg], :data => reply[:body][:data]})
702 # for requests that do not care for results just
703 # return the request id and don't do any of the
704 # response processing.
706 # We send the :process_results flag along to the
707 # nodes so they can make decisions based on that.
709 # Should only be called via method_missing
710 def fire_and_forget_request(action, args, filter=nil)
711 validate_request(action, args)
713 identity_filter_discovery_optimization
715 req = new_request(action.to_s, args)
717 filter = options[:filter] unless filter
719 message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options})
720 message.reply_to = @reply_to if @reply_to
722 if @force_direct_request || @client.discoverer.force_direct_mode?
723 message.discovered_hosts = discover.clone
724 message.type = :direct_request
727 client.sendreq(message, nil)
730 # if an identity filter is supplied and it is all strings no regex we can use that
731 # as discovery data, technically the identity filter is then redundant if we are
732 # in direct addressing mode and we could empty it out but this use case should
733 # only really be for a few -I's on the CLI
735 # For safety we leave the filter in place for now, that way we can support this
736 # enhancement also in broadcast mode.
738 # This is only needed for the 'mc' discovery method, other methods might change
739 # the concept of identity to mean something else so we should pass the full
740 # identity filter to them
741 def identity_filter_discovery_optimization
742 if options[:filter]["identity"].size > 0 && @discovery_method == "mc"
743 regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size
745 if regex_filters == 0
746 @discovered_agents = options[:filter]["identity"].clone
747 @force_direct_request = true if Config.instance.direct_addressing
752 # Calls an agent in a way very similar to call_agent but it supports batching
753 # the queries to the network.
755 # The result sets, stats, block handling etc is all exactly like you would expect
756 # from normal call_agent.
758 # This is used by method_missing and works only with direct addressing mode
759 def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
760 raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
761 raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
763 batch_size = Integer(batch_size)
764 sleep_time = Float(sleep_time)
766 Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")
768 @force_direct_request = true
770 discovered = discover
774 if discovered.size > 0
775 req = new_request(action.to_s, args)
777 aggregate = load_aggregate_functions(action, @ddl)
779 if @progress && !block_given?
782 @stdout.print twirl.twirl(respcount, discovered.size)
785 @stats.requestid = nil
787 discovered.in_groups_of(batch_size) do |hosts, last_batch|
788 message = Message.new(req, nil, {:agent => @agent, :type => :direct_request, :collective => @collective, :filter => opts[:filter], :options => opts})
790 # first time round we let the Message object create a request id
791 # we then re-use it for future requests to keep auditing sane etc
792 @stats.requestid = message.create_reqid unless @stats.requestid
793 message.requestid = @stats.requestid
795 message.discovered_hosts = hosts.clone.compact
797 @client.req(message) do |resp|
801 aggregate = process_results_with_block(action, resp, block, aggregate)
803 @stdout.print twirl.twirl(respcount, discovered.size) if @progress
805 result, aggregate = process_results_without_block(resp, action, aggregate)
811 @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
812 @stats.responses += @client.stats[:responses]
813 @stats.blocktime += @client.stats[:blocktime] + sleep_time
814 @stats.totaltime += @client.stats[:totaltime]
815 @stats.discoverytime += @client.stats[:discoverytime]
817 sleep sleep_time unless last_batch
820 @stats.aggregate_summary = aggregate.summarize if aggregate
821 @stats.aggregate_failures = aggregate.failed if aggregate
823 @stderr.print("\nNo request sent, we did not discover any nodes.")
826 @stats.finish_request
830 @stdout.print("\n") if @progress
835 return [results].flatten
839 # Handles traditional calls to the remote agents with full stats
840 # blocks, non blocks and everything else supported.
842 # Other methods of calling the nodes can reuse this code by
843 # for example specifying custom options and discovery data
844 def call_agent(action, args, opts, disc=:auto, &block)
845 # Handle fire and forget requests and make sure
846 # the :process_results value is set appropriately
848 # specific reply-to requests should be treated like
849 # fire and forget since the client will never get
851 if args[:process_results] == false || @reply_to
852 return fire_and_forget_request(action, args)
854 args[:process_results] = true
857 # Do discovery when no specific discovery array is given
859 # If an array is given set the force_direct_request hint that
860 # will tell the message object to be a direct request one
862 discovered = discover
864 @force_direct_request = true if Config.instance.direct_addressing
868 req = new_request(action.to_s, args)
870 message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
871 message.discovered_hosts = discovered.clone
876 if discovered.size > 0
877 message.type = :direct_request if @force_direct_request
879 if @progress && !block_given?
882 @stdout.print twirl.twirl(respcount, discovered.size)
885 aggregate = load_aggregate_functions(action, @ddl)
887 @client.req(message) do |resp|
891 aggregate = process_results_with_block(action, resp, block, aggregate)
893 @stdout.print twirl.twirl(respcount, discovered.size) if @progress
895 result, aggregate = process_results_without_block(resp, action, aggregate)
901 @stats.aggregate_summary = aggregate.summarize if aggregate
902 @stats.aggregate_failures = aggregate.failed if aggregate
903 @stats.client_stats = @client.stats
905 @stderr.print("\nNo request sent, we did not discover any nodes.")
908 @stats.finish_request
912 @stdout.print("\n\n") if @progress
917 return [results].flatten
921 # Handles result sets that have no block associated, sets fails and ok
922 # in the stats object and return a hash of the response to send to the
924 def process_results_without_block(resp, action, aggregate)
925 @stats.node_responded(resp[:senderid])
927 result = rpc_result_from_reply(@agent, action, resp)
928 aggregate = aggregate_reply(result, aggregate) if aggregate
930 if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
931 @stats.ok if resp[:body][:statuscode] == 0
932 @stats.fail if resp[:body][:statuscode] == 1
940 # process client requests by calling a block on each result
941 # in this mode we do not do anything fancy with the result
942 # objects and we raise exceptions if there are problems with
944 def process_results_with_block(action, resp, block, aggregate)
945 @stats.node_responded(resp[:senderid])
947 result = rpc_result_from_reply(@agent, action, resp)
948 aggregate = aggregate_reply(result, aggregate) if aggregate
950 if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
951 @stats.ok if resp[:body][:statuscode] == 0
952 @stats.fail if resp[:body][:statuscode] == 1
953 @stats.time_block_execution :start
959 block.call(resp, result)
962 @stats.time_block_execution :end
966 case resp[:body][:statuscode]
968 raise UnknownRPCAction, resp[:body][:statusmsg]
970 raise MissingRPCData, resp[:body][:statusmsg]
972 raise InvalidRPCData, resp[:body][:statusmsg]
974 raise UnknownRPCError, resp[:body][:statusmsg]