Added mcollective 2.3.1 package
[packages/trusty/mcollective.git] / lib / mcollective / rpc / client.rb
1 module MCollective
2   module RPC
3     # The main component of the Simple RPC client system, this wraps around MCollective::Client
4     # and just brings in a lot of convention and standard approached.
5     class Client
6       attr_accessor :timeout, :verbose, :filter, :config, :progress, :ttl, :reply_to
7       attr_reader :client, :stats, :ddl, :agent, :limit_targets, :limit_method, :output_format, :batch_size, :batch_sleep_time, :batch_mode
8       attr_reader :discovery_options, :discovery_method, :limit_seed
9
10       @@initial_options = nil
11
12       # Creates a stub for a remote agent, you can pass in an options array in the flags
13       # which will then be used else it will just create a default options array with
14       # filtering enabled based on the standard command line use.
15       #
16       #   rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)
17       #
18       # You typically would not call this directly you'd use MCollective::RPC#rpcclient instead
19       # which is a wrapper around this that can be used as a Mixin
20       def initialize(agent, flags = {})
21         if flags.include?(:options)
22           initial_options = flags[:options]
23
24         elsif @@initial_options
25           initial_options = Marshal.load(@@initial_options)
26
27         else
28           oparser = MCollective::Optionparser.new({:verbose => false, :progress_bar => true, :mcollective_limit_targets => false, :batch_size => nil, :batch_sleep_time => 1}, "filter")
29
30           initial_options = oparser.parse do |parser, opts|
31             if block_given?
32               yield(parser, opts)
33             end
34
35             Helpers.add_simplerpc_options(parser, opts)
36           end
37
38           @@initial_options = Marshal.dump(initial_options)
39         end
40
41         @initial_options = initial_options
42
43         @config = initial_options[:config]
44         @client = MCollective::Client.new(@config)
45         @client.options = initial_options
46
47         @stats = Stats.new
48         @agent = agent
49         @timeout = initial_options[:timeout] || 5
50         @verbose = initial_options[:verbose]
51         @filter = initial_options[:filter] || Util.empty_filter
52         @discovered_agents = nil
53         @progress = initial_options[:progress_bar]
54         @limit_targets = initial_options[:mcollective_limit_targets]
55         @limit_method = Config.instance.rpclimitmethod
56         @limit_seed = initial_options[:limit_seed] || nil
57         @output_format = initial_options[:output_format] || :console
58         @force_direct_request = false
59         @reply_to = initial_options[:reply_to]
60         @discovery_method = initial_options[:discovery_method] || Config.instance.default_discovery_method
61         @discovery_options = initial_options[:discovery_options] || []
62         @force_display_mode = initial_options[:force_display_mode] || false
63
64         @batch_size = Integer(initial_options[:batch_size] || 0)
65         @batch_sleep_time = Float(initial_options[:batch_sleep_time] || 1)
66         @batch_mode = @batch_size > 0
67
68         agent_filter agent
69
70         @discovery_timeout = @initial_options.fetch(:disctimeout, nil)
71
72         @collective = @client.collective
73         @ttl = initial_options[:ttl] || Config.instance.ttl
74
75         # if we can find a DDL for the service override
76         # the timeout of the client so we always magically
77         # wait appropriate amounts of time.
78         #
79         # We add the discovery timeout to the ddl supplied
80         # timeout as the discovery timeout tends to be tuned
81         # for local network conditions and fact source speed
82         # which would other wise not be accounted for and
83         # some results might get missed.
84         #
85         # We do this only if the timeout is the default 5
86         # seconds, so that users cli overrides will still
87         # get applied
88         #
89         # DDLs are required, failure to find a DDL is fatal
90         @ddl = DDL.new(agent)
91         @stats.ddl = @ddl
92         @timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5
93
94         # allows stderr and stdout to be overridden for testing
95         # but also for web apps that might not want a bunch of stuff
96         # generated to actual file handles
97         if initial_options[:stderr]
98           @stderr = initial_options[:stderr]
99         else
100           @stderr = STDERR
101           @stderr.sync = true
102         end
103
104         if initial_options[:stdout]
105           @stdout = initial_options[:stdout]
106         else
107           @stdout = STDOUT
108           @stdout.sync = true
109         end
110       end
111
112       # Disconnects cleanly from the middleware
113       def disconnect
114         @client.disconnect
115       end
116
117       # Returns help for an agent if a DDL was found
118       def help(template)
119         @ddl.help(template)
120       end
121
122       # Creates a suitable request hash for the SimpleRPC agent.
123       #
124       # You'd use this if you ever wanted to take care of sending
125       # requests on your own - perhaps via Client#sendreq if you
126       # didn't care for responses.
127       #
128       # In that case you can just do:
129       #
130       #   msg = your_rpc.new_request("some_action", :foo => :bar)
131       #   filter = your_rpc.filter
132       #
133       #   your_rpc.client.sendreq(msg, msg[:agent], filter)
134       #
135       # This will send a SimpleRPC request to the action some_action
136       # with arguments :foo = :bar, it will return immediately and
137       # you will have no indication at all if the request was receieved or not
138       #
139       # Clearly the use of this technique should be limited and done only
140       # if your code requires such a thing
141       def new_request(action, data)
142         callerid = PluginManager["security_plugin"].callerid
143
144         raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)
145
146         {:agent  => @agent,
147          :action => action,
148          :caller => callerid,
149          :data   => data}
150       end
151
152       # For the provided arguments and action the input arguments get
153       # modified by supplying any defaults provided in the DDL for arguments
154       # that were not supplied in the request
155       #
156       # We then pass the modified arguments to the DDL for validation
157       def validate_request(action, args)
158         raise "No DDL found for agent %s cannot validate inputs" % @agent unless @ddl
159
160         @ddl.set_default_input_arguments(action, args)
161         @ddl.validate_rpc_request(action, args)
162       end
163
164       # Magic handler to invoke remote methods
165       #
166       # Once the stub is created using the constructor or the RPC#rpcclient helper you can
167       # call remote actions easily:
168       #
169       #   ret = rpc.echo(:msg => "hello world")
170       #
171       # This will call the 'echo' action of the 'rpctest' agent and return the result as an array,
172       # the array will be a simplified result set from the usual full MCollective::Client#req with
173       # additional error codes and error text:
174       #
175       # {
176       #   :sender => "remote.box.com",
177       #   :statuscode => 0,
178       #   :statusmsg => "OK",
179       #   :data => "hello world"
180       # }
181       #
182       # If :statuscode is 0 then everything went find, if it's 1 then you supplied the correct arguments etc
183       # but the request could not be completed, you'll find a human parsable reason in :statusmsg then.
184       #
185       # Codes 2 to 5 maps directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError
186       # see below for a description of those, in each case :statusmsg would be the reason for failure.
187       #
188       # To get access to the full result of the MCollective::Client#req calls you can pass in a block:
189       #
190       #   rpc.echo(:msg => "hello world") do |resp|
191       #      pp resp
192       #   end
193       #
194       # In this case resp will the result from MCollective::Client#req.  Instead of returning simple
195       # text and codes as above you'll also need to handle the following exceptions:
196       #
197       # UnknownRPCAction - There is no matching action on the agent
198       # MissingRPCData - You did not supply all the needed parameters for the action
199       # InvalidRPCData - The data you did supply did not pass validation
200       # UnknownRPCError - Some other error prevented the agent from running
201       #
202       # During calls a progress indicator will be shown of how many results we've received against
203       # how many nodes were discovered, you can disable this by setting progress to false:
204       #
205       #   rpc.progress = false
206       #
207       # This supports a 2nd mode where it will send the SimpleRPC request and never handle the
208       # responses.  It's a bit like UDP, it sends the request with the filter attached and you
209       # only get back the requestid, you have no indication about results.
210       #
211       # You can invoke this using:
212       #
213       #   puts rpc.echo(:process_results => false)
214       #
215       # This will output just the request id.
216       #
217       # Batched processing is supported:
218       #
219       #   printrpc rpc.ping(:batch_size => 5)
220       #
221       # This will do everything exactly as normal but communicate to only 5
222       # agents at a time
223       def method_missing(method_name, *args, &block)
224         # set args to an empty hash if nothings given
225         args = args[0]
226         args = {} if args.nil?
227
228         action = method_name.to_s
229
230         @stats.reset
231
232         validate_request(action, args)
233
234         # if a global batch size is set just use that else set it
235         # in the case that it was passed as an argument
236         batch_mode = args.include?(:batch_size) || @batch_mode
237         batch_size = args.delete(:batch_size) || @batch_size
238         batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time
239
240         # if we were given a batch_size argument thats 0 and batch_mode was
241         # determined to be on via global options etc this will allow a batch_size
242         # of 0 to disable or batch_mode for this call only
243         batch_mode = (batch_mode && Integer(batch_size) > 0)
244
245         # Handle single target requests by doing discovery and picking
246         # a random node.  Then do a custom request specifying a filter
247         # that will only match the one node.
248         if @limit_targets
249           target_nodes = pick_nodes_from_discovered(@limit_targets)
250           Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")
251
252           custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
253         elsif batch_mode
254           call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
255         else
256           call_agent(action, args, options, :auto, &block)
257         end
258       end
259
260       # Constructs custom requests with custom filters and discovery data
261       # the idea is that this would be used in web applications where you
262       # might be using a cached copy of data provided by a registration agent
263       # to figure out on your own what nodes will be responding and what your
264       # filter would be.
265       #
266       # This will help you essentially short circuit the traditional cycle of:
267       #
268       # mc discover / call / wait for discovered nodes
269       #
270       # by doing discovery however you like, contructing a filter and a list of
271       # nodes you expect responses from.
272       #
273       # Other than that it will work exactly like a normal call, blocks will behave
274       # the same way, stats will be handled the same way etcetc
275       #
276       # If you just wanted to contact one machine for example with a client that
277       # already has other filter options setup you can do:
278       #
279       # puppet.custom_request("runonce", {}, ["your.box.com"], {:identity => "your.box.com"})
280       #
281       # This will do runonce action on just 'your.box.com', no discovery will be
282       # done and after receiving just one response it will stop waiting for responses
283       #
284       # If direct_addressing is enabled in the config file you can provide an empty
285       # hash as a filter, this will force that request to be a directly addressed
286       # request which technically does not need filters.  If you try to use this
287       # mode with direct addressing disabled an exception will be raise
288       def custom_request(action, args, expected_agents, filter = {}, &block)
289         validate_request(action, args)
290
291         if filter == {} && !Config.instance.direct_addressing
292           raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
293         end
294
295         @stats.reset
296
297         custom_filter = Util.empty_filter
298         custom_options = options.clone
299
300         # merge the supplied filter with the standard empty one
301         # we could just use the merge method but I want to be sure
302         # we dont merge in stuff that isnt actually valid
303         ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
304           if filter.include?(ftype)
305             custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
306           end
307         end
308
309         # ensure that all filters at least restrict the call to the agent we're a proxy for
310         custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
311         custom_options[:filter] = custom_filter
312
313         # Fake out the stats discovery would have put there
314         @stats.discovered_agents([expected_agents].flatten)
315
316         # Handle fire and forget requests
317         #
318         # If a specific reply-to was set then from the client perspective this should
319         # be a fire and forget request too since no response will ever reach us - it
320         # will go to the reply-to destination
321         if args[:process_results] == false || @reply_to
322           return fire_and_forget_request(action, args, custom_filter)
323         end
324
325         # Now do a call pretty much exactly like in method_missing except with our own
326         # options and discovery magic
327         if block_given?
328           call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
329             block.call(r)
330           end
331         else
332           call_agent(action, args, custom_options, [expected_agents].flatten)
333         end
334       end
335
336       def discovery_timeout
337         return @discovery_timeout if @discovery_timeout
338         return @client.discoverer.ddl.meta[:timeout]
339       end
340
341       def discovery_timeout=(timeout)
342         @discovery_timeout = Float(timeout)
343
344         # we calculate the overall timeout from the DDL of the agent and
345         # the supplied discovery timeout unless someone specifically
346         # specifies a timeout to the constructor
347         #
348         # But if we also then specifically set a discovery_timeout on the
349         # agent that has to override the supplied timeout so we then
350         # calculate a correct timeout based on DDL timeout and the
351         # supplied discovery timeout
352         @timeout = @ddl.meta[:timeout] + discovery_timeout
353       end
354
355       # Sets the discovery method.  If we change the method there are a
356       # number of steps to take:
357       #
358       #  - set the new method
359       #  - if discovery options were provided, re-set those to initially
360       #    provided ones else clear them as they might now apply to a
361       #    different provider
362       #  - update the client options so it knows there is a new discovery
363       #    method in force
364       #  - reset discovery data forcing a discover on the next request
365       #
366       # The remaining item is the discovery timeout, we leave that as is
367       # since that is the user supplied timeout either via initial options
368       # or via specifically setting it on the client.
369       def discovery_method=(method)
370         @discovery_method = method
371
372         if @initial_options[:discovery_options]
373           @discovery_options = @initial_options[:discovery_options]
374         else
375           @discovery_options.clear
376         end
377
378         @client.options = options
379
380         reset
381       end
382
383       def discovery_options=(options)
384         @discovery_options = [options].flatten
385         reset
386       end
387
388       # Sets the class filter
389       def class_filter(klass)
390         @filter["cf_class"] << klass
391         @filter["cf_class"].compact!
392         reset
393       end
394
395       # Sets the fact filter
396       def fact_filter(fact, value=nil, operator="=")
397         return if fact.nil?
398         return if fact == false
399
400         if value.nil?
401           parsed = Util.parse_fact_string(fact)
402           @filter["fact"] << parsed unless parsed == false
403         else
404           parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
405           @filter["fact"] << parsed unless parsed == false
406         end
407
408         @filter["fact"].compact!
409         reset
410       end
411
412       # Sets the agent filter
413       def agent_filter(agent)
414         @filter["agent"] << agent
415         @filter["agent"].compact!
416         reset
417       end
418
419       # Sets the identity filter
420       def identity_filter(identity)
421         @filter["identity"] << identity
422         @filter["identity"].compact!
423         reset
424       end
425
426       # Set a compound filter
427       def compound_filter(filter)
428         @filter["compound"] <<  Matcher.create_compound_callstack(filter)
429         reset
430       end
431
432       # Resets various internal parts of the class, most importantly it clears
433       # out the cached discovery
434       def reset
435         @discovered_agents = nil
436       end
437
438       # Reet the filter to an empty one
439       def reset_filter
440         @filter = Util.empty_filter
441         agent_filter @agent
442       end
443
444       # Does discovery based on the filters set, if a discovery was
445       # previously done return that else do a new discovery.
446       #
447       # Alternatively if identity filters are given and none of them are
448       # regular expressions then just use the provided data as discovered
449       # data, avoiding discovery
450       #
451       # Discovery can be forced if direct_addressing is enabled by passing
452       # in an array of nodes with :nodes or JSON data like those produced
453       # by mcollective RPC JSON output using :json
454       #
455       # Will show a message indicating its doing discovery if running
456       # verbose or if the :verbose flag is passed in.
457       #
458       # Use reset to force a new discovery
459       def discover(flags={})
460         flags.keys.each do |key|
461           raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
462         end
463
464         flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose
465
466         verbose = false unless @output_format == :console
467
468         # flags[:nodes] and flags[:hosts] are the same thing, we should never have
469         # allowed :hosts as that was inconsistent with the established terminology
470         flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)
471
472         reset if flags[:nodes] || flags[:json]
473
474         unless @discovered_agents
475           # if either hosts or JSON is supplied try to figure out discovery data from there
476           # if direct_addressing is not enabled this is a critical error as the user might
477           # not have supplied filters so raise an exception
478           if flags[:nodes] || flags[:json]
479             raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing
480
481             hosts = []
482
483             if flags[:nodes]
484               hosts = Helpers.extract_hosts_from_array(flags[:nodes])
485             elsif flags[:json]
486               hosts = Helpers.extract_hosts_from_json(flags[:json])
487             end
488
489             raise "Could not find any hosts in discovery data provided" if hosts.empty?
490
491             @discovered_agents = hosts
492             @force_direct_request = true
493
494           # if an identity filter is supplied and it is all strings no regex we can use that
495           # as discovery data, technically the identity filter is then redundant if we are
496           # in direct addressing mode and we could empty it out but this use case should
497           # only really be for a few -I's on the CLI
498           #
499           # For safety we leave the filter in place for now, that way we can support this
500           # enhancement also in broadcast mode.
501           #
502           # This is only needed for the 'mc' discovery method, other methods might change
503           # the concept of identity to mean something else so we should pass the full
504           # identity filter to them
505           elsif options[:filter]["identity"].size > 0 && @discovery_method == "mc"
506             regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size
507
508             if regex_filters == 0
509               @discovered_agents = options[:filter]["identity"].clone
510               @force_direct_request = true if Config.instance.direct_addressing
511             end
512           end
513         end
514
515         # All else fails we do it the hard way using a traditional broadcast
516         unless @discovered_agents
517           @stats.time_discovery :start
518
519           @client.options = options
520
521           # if compound filters are used the only real option is to use the mc
522           # discovery plugin since its the only capable of using data queries etc
523           # and we do not want to degrade that experience just to allow compounds
524           # on other discovery plugins the UX would be too bad raising complex sets
525           # of errors etc.
526           @client.discoverer.force_discovery_method_by_filter(options[:filter])
527
528           if verbose
529             actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter])
530
531             if actual_timeout > 0
532               @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
533             else
534               @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
535             end
536           end
537
538           # if the requested limit is a pure number and not a percent
539           # and if we're configured to use the first found hosts as the
540           # limit method then pass in the limit thus minimizing the amount
541           # of work we do in the discover phase and speeding it up significantly
542           if @limit_method == :first and @limit_targets.is_a?(Fixnum)
543             @discovered_agents = @client.discover(@filter, discovery_timeout, @limit_targets)
544           else
545             @discovered_agents = @client.discover(@filter, discovery_timeout)
546           end
547
548           @stderr.puts(@discovered_agents.size) if verbose
549
550           @force_direct_request = @client.discoverer.force_direct_mode?
551
552           @stats.time_discovery :end
553         end
554
555         @stats.discovered_agents(@discovered_agents)
556         RPC.discovered(@discovered_agents)
557
558         @discovered_agents
559       end
560
561       # Provides a normal options hash like you would get from
562       # Optionparser
563       def options
564         {:disctimeout => discovery_timeout,
565          :timeout => @timeout,
566          :verbose => @verbose,
567          :filter => @filter,
568          :collective => @collective,
569          :output_format => @output_format,
570          :ttl => @ttl,
571          :discovery_method => @discovery_method,
572          :discovery_options => @discovery_options,
573          :force_display_mode => @force_display_mode,
574          :config => @config}
575       end
576
577       # Sets the collective we are communicating with
578       def collective=(c)
579         raise "Unknown collective #{c}" unless Config.instance.collectives.include?(c)
580
581         @collective = c
582         @client.options = options
583         reset
584       end
585
586       # Sets and sanity checks the limit_targets variable
587       # used to restrict how many nodes we'll target
588       def limit_targets=(limit)
589         if limit.is_a?(String)
590           raise "Invalid limit specified: #{limit} valid limits are /^\d+%*$/" unless limit =~ /^\d+%*$/
591
592           begin
593             @limit_targets = Integer(limit)
594           rescue
595             @limit_targets = limit
596           end
597         else
598           @limit_targets = Integer(limit)
599         end
600       end
601
602       # Sets and sanity check the limit_method variable
603       # used to determine how to limit targets if limit_targets is set
604       def limit_method=(method)
605         method = method.to_sym unless method.is_a?(Symbol)
606
607         raise "Unknown limit method #{method} must be :random or :first" unless [:random, :first].include?(method)
608
609         @limit_method = method
610       end
611
612       # Sets the batch size, if the size is set to 0 that will disable batch mode
613       def batch_size=(limit)
614         raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing
615
616         @batch_size = Integer(limit)
617         @batch_mode = @batch_size > 0
618       end
619
620       def batch_sleep_time=(time)
621         raise "Can only set batch sleep time if direct addressing is supported" unless Config.instance.direct_addressing
622
623         @batch_sleep_time = Float(time)
624       end
625
626       # Pick a number of nodes from the discovered nodes
627       #
628       # The count should be a string that can be either
629       # just a number or a percentage like 10%
630       #
631       # It will select nodes from the discovered list based
632       # on the rpclimitmethod configuration option which can
633       # be either :first or anything else
634       #
635       #   - :first would be a simple way to do a distance based
636       #     selection
637       #   - anything else will just pick one at random
638       #   - if random chosen, and batch-seed set, then set srand
639       #     for the generator, and reset afterwards
640       def pick_nodes_from_discovered(count)
641         if count =~ /%$/
642           pct = Integer((discover.size * (count.to_f / 100)))
643           pct == 0 ? count = 1 : count = pct
644         else
645           count = Integer(count)
646         end
647
648         return discover if discover.size <= count
649
650         result = []
651
652         if @limit_method == :first
653           return discover[0, count]
654         else
655           # we delete from the discovered list because we want
656           # to be sure there is no chance that the same node will
657           # be randomly picked twice.  So we have to clone the
658           # discovered list else this method will only ever work
659           # once per discovery cycle and not actually return the
660           # right nodes.
661           haystack = discover.clone
662
663           if @limit_seed
664             haystack.sort!
665             srand(@limit_seed)
666           end
667
668           count.times do
669             rnd = rand(haystack.size)
670             result << haystack.delete_at(rnd)
671           end
672
673           # Reset random number generator to fresh seed
674           # As our seed from options is most likely short
675           srand if @limit_seed
676         end
677
678         [result].flatten
679       end
680
681       def load_aggregate_functions(action, ddl)
682         return nil unless ddl
683         return nil unless ddl.action_interface(action).keys.include?(:aggregate)
684
685         return Aggregate.new(ddl.action_interface(action))
686
687       rescue => e
688         Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
689         return nil
690       end
691
692       def aggregate_reply(reply, aggregate)
693         return nil unless aggregate
694
695         aggregate.call_functions(reply)
696         return aggregate
697       rescue Exception => e
698         Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
699         return nil
700       end
701
702       def rpc_result_from_reply(agent, action, reply)
703         Result.new(agent, action, {:sender => reply[:senderid], :statuscode => reply[:body][:statuscode],
704                                    :statusmsg => reply[:body][:statusmsg], :data => reply[:body][:data]})
705       end
706
707       # for requests that do not care for results just
708       # return the request id and don't do any of the
709       # response processing.
710       #
711       # We send the :process_results flag with to the
712       # nodes so they can make decisions based on that.
713       #
714       # Should only be called via method_missing
715       def fire_and_forget_request(action, args, filter=nil)
716         validate_request(action, args)
717
718         req = new_request(action.to_s, args)
719
720         filter = options[:filter] unless filter
721
722         message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options})
723         message.reply_to = @reply_to if @reply_to
724
725         return @client.sendreq(message, nil)
726       end
727
728       # Calls an agent in a way very similar to call_agent but it supports batching
729       # the queries to the network.
730       #
731       # The result sets, stats, block handling etc is all exactly like you would expect
732       # from normal call_agent.
733       #
734       # This is used by method_missing and works only with direct addressing mode
735       def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
736         raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
737         raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
738
739         batch_size = Integer(batch_size)
740         sleep_time = Float(sleep_time)
741
742         Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")
743
744         @force_direct_request = true
745
746         discovered = discover
747         results = []
748         respcount = 0
749
750         if discovered.size > 0
751           req = new_request(action.to_s, args)
752
753           aggregate = load_aggregate_functions(action, @ddl)
754
755           if @progress && !block_given?
756             twirl = Progress.new
757             @stdout.puts
758             @stdout.print twirl.twirl(respcount, discovered.size)
759           end
760
761           @stats.requestid = nil
762
763           discovered.in_groups_of(batch_size) do |hosts, last_batch|
764             message = Message.new(req, nil, {:agent => @agent, :type => :direct_request, :collective => @collective, :filter => opts[:filter], :options => opts})
765
766             # first time round we let the Message object create a request id
767             # we then re-use it for future requests to keep auditing sane etc
768             @stats.requestid = message.create_reqid unless @stats.requestid
769             message.requestid = @stats.requestid
770
771             message.discovered_hosts = hosts.clone.compact
772
773             @client.req(message) do |resp|
774               respcount += 1
775
776               if block_given?
777                 aggregate = process_results_with_block(action, resp, block, aggregate)
778               else
779                 @stdout.print twirl.twirl(respcount, discovered.size) if @progress
780
781                 result, aggregate = process_results_without_block(resp, action, aggregate)
782
783                 results << result
784               end
785             end
786
787             @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
788             @stats.responses += @client.stats[:responses]
789             @stats.blocktime += @client.stats[:blocktime] + sleep_time
790             @stats.totaltime += @client.stats[:totaltime]
791             @stats.discoverytime += @client.stats[:discoverytime]
792
793             sleep sleep_time unless last_batch
794           end
795
796           @stats.aggregate_summary = aggregate.summarize if aggregate
797           @stats.aggregate_failures = aggregate.failed if aggregate
798         else
799           @stderr.print("\nNo request sent, we did not discover any nodes.")
800         end
801
802         @stats.finish_request
803
804         RPC.stats(@stats)
805
806         @stdout.print("\n") if @progress
807
808         if block_given?
809           return stats
810         else
811           return [results].flatten
812         end
813       end
814
815       # Handles traditional calls to the remote agents with full stats
816       # blocks, non blocks and everything else supported.
817       #
818       # Other methods of calling the nodes can reuse this code by
819       # for example specifying custom options and discovery data
820       def call_agent(action, args, opts, disc=:auto, &block)
821         # Handle fire and forget requests and make sure
822         # the :process_results value is set appropriately
823         #
824         # specific reply-to requests should be treated like
825         # fire and forget since the client will never get
826         # the responses
827         if args[:process_results] == false || @reply_to
828           return fire_and_forget_request(action, args)
829         else
830           args[:process_results] = true
831         end
832
833         # Do discovery when no specific discovery array is given
834         #
835         # If an array is given set the force_direct_request hint that
836         # will tell the message object to be a direct request one
837         if disc == :auto
838           discovered = discover
839         else
840           @force_direct_request = true if Config.instance.direct_addressing
841           discovered = disc
842         end
843
844         req = new_request(action.to_s, args)
845
846         message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
847         message.discovered_hosts = discovered.clone
848
849         results = []
850         respcount = 0
851
852         if discovered.size > 0
853           message.type = :direct_request if @force_direct_request
854
855           if @progress && !block_given?
856             twirl = Progress.new
857             @stdout.puts
858             @stdout.print twirl.twirl(respcount, discovered.size)
859           end
860
861           aggregate = load_aggregate_functions(action, @ddl)
862
863           @client.req(message) do |resp|
864             respcount += 1
865
866             if block_given?
867               aggregate = process_results_with_block(action, resp, block, aggregate)
868             else
869               @stdout.print twirl.twirl(respcount, discovered.size) if @progress
870
871               result, aggregate = process_results_without_block(resp, action, aggregate)
872
873               results << result
874             end
875           end
876
877           @stats.aggregate_summary = aggregate.summarize if aggregate
878           @stats.aggregate_failures = aggregate.failed if aggregate
879           @stats.client_stats = @client.stats
880         else
881           @stderr.print("\nNo request sent, we did not discover any nodes.")
882         end
883
884         @stats.finish_request
885
886         RPC.stats(@stats)
887
888         @stdout.print("\n\n") if @progress
889
890         if block_given?
891           return stats
892         else
893           return [results].flatten
894         end
895       end
896
897       # Handles result sets that has no block associated, sets fails and ok
898       # in the stats object and return a hash of the response to send to the
899       # caller
900       def process_results_without_block(resp, action, aggregate)
901         @stats.node_responded(resp[:senderid])
902
903         result = rpc_result_from_reply(@agent, action, resp)
904         aggregate = aggregate_reply(result, aggregate) if aggregate
905
906         if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
907           @stats.ok if resp[:body][:statuscode] == 0
908           @stats.fail if resp[:body][:statuscode] == 1
909         else
910           @stats.fail
911         end
912
913         [result, aggregate]
914       end
915
916       # process client requests by calling a block on each result
917       # in this mode we do not do anything fancy with the result
918       # objects and we raise exceptions if there are problems with
919       # the data
920       def process_results_with_block(action, resp, block, aggregate)
921         @stats.node_responded(resp[:senderid])
922
923         result = rpc_result_from_reply(@agent, action, resp)
924         aggregate = aggregate_reply(result, aggregate) if aggregate
925
926         if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
927           @stats.ok if resp[:body][:statuscode] == 0
928           @stats.fail if resp[:body][:statuscode] == 1
929           @stats.time_block_execution :start
930
931           case block.arity
932             when 1
933               block.call(resp)
934             when 2
935               block.call(resp, result)
936           end
937
938           @stats.time_block_execution :end
939         else
940           @stats.fail
941
942           case resp[:body][:statuscode]
943             when 2
944               raise UnknownRPCAction, resp[:body][:statusmsg]
945             when 3
946               raise MissingRPCData, resp[:body][:statusmsg]
947             when 4
948               raise InvalidRPCData, resp[:body][:statusmsg]
949             when 5
950               raise UnknownRPCError, resp[:body][:statusmsg]
951           end
952         end
953
954         return aggregate
955       end
956     end
957   end
958 end