Update version according to OSCI-856
[packages/precise/mcollective.git] / lib / mcollective / rpc / client.rb
1 module MCollective
2   module RPC
3     # The main component of the Simple RPC client system, this wraps around MCollective::Client
4     # and just brings in a lot of convention and standard approached.
5     class Client
6       attr_accessor :timeout, :verbose, :filter, :config, :progress, :ttl, :reply_to
7       attr_reader :client, :stats, :ddl, :agent, :limit_targets, :limit_method, :output_format, :batch_size, :batch_sleep_time, :batch_mode
8       attr_reader :discovery_options, :discovery_method, :default_discovery_method, :limit_seed
9
10       @@initial_options = nil
11
12       # Creates a stub for a remote agent, you can pass in an options array in the flags
13       # which will then be used else it will just create a default options array with
14       # filtering enabled based on the standard command line use.
15       #
16       #   rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)
17       #
18       # You typically would not call this directly you'd use MCollective::RPC#rpcclient instead
19       # which is a wrapper around this that can be used as a Mixin
20       def initialize(agent, flags = {})
21         if flags.include?(:options)
22           initial_options = flags[:options]
23
24         elsif @@initial_options
25           initial_options = Marshal.load(@@initial_options)
26
27         else
28           oparser = MCollective::Optionparser.new({:verbose => false, :progress_bar => true, :mcollective_limit_targets => false, :batch_size => nil, :batch_sleep_time => 1}, "filter")
29
30           initial_options = oparser.parse do |parser, opts|
31             if block_given?
32               yield(parser, opts)
33             end
34
35             Helpers.add_simplerpc_options(parser, opts)
36           end
37
38           @@initial_options = Marshal.dump(initial_options)
39         end
40
41         @initial_options = initial_options
42
43         @config = initial_options[:config]
44         @client = MCollective::Client.new(@config)
45         @client.options = initial_options
46
47         @stats = Stats.new
48         @agent = agent
49         @timeout = initial_options[:timeout] || 5
50         @verbose = initial_options[:verbose]
51         @filter = initial_options[:filter] || Util.empty_filter
52         @discovered_agents = nil
53         @progress = initial_options[:progress_bar]
54         @limit_targets = initial_options[:mcollective_limit_targets]
55         @limit_method = Config.instance.rpclimitmethod
56         @limit_seed = initial_options[:limit_seed] || nil
57         @output_format = initial_options[:output_format] || :console
58         @force_direct_request = false
59         @reply_to = initial_options[:reply_to]
60         @discovery_method = initial_options[:discovery_method]
61         if !@discovery_method
62           @discovery_method = Config.instance.default_discovery_method
63           @default_discovery_method = true
64         else
65           @default_discovery_method = false
66         end
67         @discovery_options = initial_options[:discovery_options] || []
68         @force_display_mode = initial_options[:force_display_mode] || false
69
70         @batch_size = Integer(initial_options[:batch_size] || 0)
71         @batch_sleep_time = Float(initial_options[:batch_sleep_time] || 1)
72         @batch_mode = @batch_size > 0
73
74         agent_filter agent
75
76         @discovery_timeout = @initial_options.fetch(:disctimeout, nil)
77
78         @collective = @client.collective
79         @ttl = initial_options[:ttl] || Config.instance.ttl
80         @publish_timeout = initial_options[:publish_timeout] || Config.instance.publish_timeout
81         @threaded = initial_options[:threaded] || Config.instance.threaded
82
83         # if we can find a DDL for the service override
84         # the timeout of the client so we always magically
85         # wait appropriate amounts of time.
86         #
87         # We add the discovery timeout to the ddl supplied
88         # timeout as the discovery timeout tends to be tuned
89         # for local network conditions and fact source speed
90         # which would other wise not be accounted for and
91         # some results might get missed.
92         #
93         # We do this only if the timeout is the default 5
94         # seconds, so that users cli overrides will still
95         # get applied
96         #
97         # DDLs are required, failure to find a DDL is fatal
98         @ddl = DDL.new(agent)
99         @stats.ddl = @ddl
100         @timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5
101
102         # allows stderr and stdout to be overridden for testing
103         # but also for web apps that might not want a bunch of stuff
104         # generated to actual file handles
105         if initial_options[:stderr]
106           @stderr = initial_options[:stderr]
107         else
108           @stderr = STDERR
109           @stderr.sync = true
110         end
111
112         if initial_options[:stdout]
113           @stdout = initial_options[:stdout]
114         else
115           @stdout = STDOUT
116           @stdout.sync = true
117         end
118       end
119
120       # Disconnects cleanly from the middleware
121       def disconnect
122         @client.disconnect
123       end
124
125       # Returns help for an agent if a DDL was found
126       def help(template)
127         @ddl.help(template)
128       end
129
130       # Creates a suitable request hash for the SimpleRPC agent.
131       #
132       # You'd use this if you ever wanted to take care of sending
133       # requests on your own - perhaps via Client#sendreq if you
134       # didn't care for responses.
135       #
136       # In that case you can just do:
137       #
138       #   msg = your_rpc.new_request("some_action", :foo => :bar)
139       #   filter = your_rpc.filter
140       #
141       #   your_rpc.client.sendreq(msg, msg[:agent], filter)
142       #
143       # This will send a SimpleRPC request to the action some_action
144       # with arguments :foo = :bar, it will return immediately and
145       # you will have no indication at all if the request was receieved or not
146       #
147       # Clearly the use of this technique should be limited and done only
148       # if your code requires such a thing
149       def new_request(action, data)
150         callerid = PluginManager["security_plugin"].callerid
151
152         raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)
153
154         {:agent  => @agent,
155          :action => action,
156          :caller => callerid,
157          :data   => data}
158       end
159
160       # For the provided arguments and action the input arguments get
161       # modified by supplying any defaults provided in the DDL for arguments
162       # that were not supplied in the request
163       #
164       # We then pass the modified arguments to the DDL for validation
165       def validate_request(action, args)
166         raise "No DDL found for agent %s cannot validate inputs" % @agent unless @ddl
167
168         @ddl.set_default_input_arguments(action, args)
169         @ddl.validate_rpc_request(action, args)
170       end
171
172       # Magic handler to invoke remote methods
173       #
174       # Once the stub is created using the constructor or the RPC#rpcclient helper you can
175       # call remote actions easily:
176       #
177       #   ret = rpc.echo(:msg => "hello world")
178       #
179       # This will call the 'echo' action of the 'rpctest' agent and return the result as an array,
180       # the array will be a simplified result set from the usual full MCollective::Client#req with
181       # additional error codes and error text:
182       #
183       # {
184       #   :sender => "remote.box.com",
185       #   :statuscode => 0,
186       #   :statusmsg => "OK",
187       #   :data => "hello world"
188       # }
189       #
190       # If :statuscode is 0 then everything went find, if it's 1 then you supplied the correct arguments etc
191       # but the request could not be completed, you'll find a human parsable reason in :statusmsg then.
192       #
193       # Codes 2 to 5 maps directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError
194       # see below for a description of those, in each case :statusmsg would be the reason for failure.
195       #
196       # To get access to the full result of the MCollective::Client#req calls you can pass in a block:
197       #
198       #   rpc.echo(:msg => "hello world") do |resp|
199       #      pp resp
200       #   end
201       #
202       # In this case resp will the result from MCollective::Client#req.  Instead of returning simple
203       # text and codes as above you'll also need to handle the following exceptions:
204       #
205       # UnknownRPCAction - There is no matching action on the agent
206       # MissingRPCData - You did not supply all the needed parameters for the action
207       # InvalidRPCData - The data you did supply did not pass validation
208       # UnknownRPCError - Some other error prevented the agent from running
209       #
210       # During calls a progress indicator will be shown of how many results we've received against
211       # how many nodes were discovered, you can disable this by setting progress to false:
212       #
213       #   rpc.progress = false
214       #
215       # This supports a 2nd mode where it will send the SimpleRPC request and never handle the
216       # responses.  It's a bit like UDP, it sends the request with the filter attached and you
217       # only get back the requestid, you have no indication about results.
218       #
219       # You can invoke this using:
220       #
221       #   puts rpc.echo(:process_results => false)
222       #
223       # This will output just the request id.
224       #
225       # Batched processing is supported:
226       #
227       #   printrpc rpc.ping(:batch_size => 5)
228       #
229       # This will do everything exactly as normal but communicate to only 5
230       # agents at a time
231       def method_missing(method_name, *args, &block)
232         # set args to an empty hash if nothings given
233         args = args[0]
234         args = {} if args.nil?
235
236         action = method_name.to_s
237
238         @stats.reset
239
240         validate_request(action, args)
241
242         # if a global batch size is set just use that else set it
243         # in the case that it was passed as an argument
244         batch_mode = args.include?(:batch_size) || @batch_mode
245         batch_size = args.delete(:batch_size) || @batch_size
246         batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time
247
248         # if we were given a batch_size argument thats 0 and batch_mode was
249         # determined to be on via global options etc this will allow a batch_size
250         # of 0 to disable or batch_mode for this call only
251         batch_mode = (batch_mode && Integer(batch_size) > 0)
252
253         # Handle single target requests by doing discovery and picking
254         # a random node.  Then do a custom request specifying a filter
255         # that will only match the one node.
256         if @limit_targets
257           target_nodes = pick_nodes_from_discovered(@limit_targets)
258           Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")
259
260           custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
261         elsif batch_mode
262           call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
263         else
264           call_agent(action, args, options, :auto, &block)
265         end
266       end
267
268       # Constructs custom requests with custom filters and discovery data
269       # the idea is that this would be used in web applications where you
270       # might be using a cached copy of data provided by a registration agent
271       # to figure out on your own what nodes will be responding and what your
272       # filter would be.
273       #
274       # This will help you essentially short circuit the traditional cycle of:
275       #
276       # mc discover / call / wait for discovered nodes
277       #
278       # by doing discovery however you like, contructing a filter and a list of
279       # nodes you expect responses from.
280       #
281       # Other than that it will work exactly like a normal call, blocks will behave
282       # the same way, stats will be handled the same way etcetc
283       #
284       # If you just wanted to contact one machine for example with a client that
285       # already has other filter options setup you can do:
286       #
287       # puppet.custom_request("runonce", {}, ["your.box.com"], {:identity => "your.box.com"})
288       #
289       # This will do runonce action on just 'your.box.com', no discovery will be
290       # done and after receiving just one response it will stop waiting for responses
291       #
292       # If direct_addressing is enabled in the config file you can provide an empty
293       # hash as a filter, this will force that request to be a directly addressed
294       # request which technically does not need filters.  If you try to use this
295       # mode with direct addressing disabled an exception will be raise
296       def custom_request(action, args, expected_agents, filter = {}, &block)
297         validate_request(action, args)
298
299         if filter == {} && !Config.instance.direct_addressing
300           raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
301         end
302
303         @stats.reset
304
305         custom_filter = Util.empty_filter
306         custom_options = options.clone
307
308         # merge the supplied filter with the standard empty one
309         # we could just use the merge method but I want to be sure
310         # we dont merge in stuff that isnt actually valid
311         ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
312           if filter.include?(ftype)
313             custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
314           end
315         end
316
317         # ensure that all filters at least restrict the call to the agent we're a proxy for
318         custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
319         custom_options[:filter] = custom_filter
320
321         # Fake out the stats discovery would have put there
322         @stats.discovered_agents([expected_agents].flatten)
323
324         # Handle fire and forget requests
325         #
326         # If a specific reply-to was set then from the client perspective this should
327         # be a fire and forget request too since no response will ever reach us - it
328         # will go to the reply-to destination
329         if args[:process_results] == false || @reply_to
330           return fire_and_forget_request(action, args, custom_filter)
331         end
332
333         # Now do a call pretty much exactly like in method_missing except with our own
334         # options and discovery magic
335         if block_given?
336           call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
337             block.call(r)
338           end
339         else
340           call_agent(action, args, custom_options, [expected_agents].flatten)
341         end
342       end
343
344       def discovery_timeout
345         return @discovery_timeout if @discovery_timeout
346         return @client.discoverer.ddl.meta[:timeout]
347       end
348
349       def discovery_timeout=(timeout)
350         @discovery_timeout = Float(timeout)
351
352         # we calculate the overall timeout from the DDL of the agent and
353         # the supplied discovery timeout unless someone specifically
354         # specifies a timeout to the constructor
355         #
356         # But if we also then specifically set a discovery_timeout on the
357         # agent that has to override the supplied timeout so we then
358         # calculate a correct timeout based on DDL timeout and the
359         # supplied discovery timeout
360         @timeout = @ddl.meta[:timeout] + discovery_timeout
361       end
362
363       # Sets the discovery method.  If we change the method there are a
364       # number of steps to take:
365       #
366       #  - set the new method
367       #  - if discovery options were provided, re-set those to initially
368       #    provided ones else clear them as they might now apply to a
369       #    different provider
370       #  - update the client options so it knows there is a new discovery
371       #    method in force
372       #  - reset discovery data forcing a discover on the next request
373       #
374       # The remaining item is the discovery timeout, we leave that as is
375       # since that is the user supplied timeout either via initial options
376       # or via specifically setting it on the client.
377       def discovery_method=(method)
378         @default_discovery_method = false
379         @discovery_method = method
380
381         if @initial_options[:discovery_options]
382           @discovery_options = @initial_options[:discovery_options]
383         else
384           @discovery_options.clear
385         end
386
387         @client.options = options
388
389         reset
390       end
391
392       def discovery_options=(options)
393         @discovery_options = [options].flatten
394         reset
395       end
396
397       # Sets the class filter
398       def class_filter(klass)
399         @filter["cf_class"] = @filter["cf_class"] | [klass]
400         @filter["cf_class"].compact!
401         reset
402       end
403
404       # Sets the fact filter
405       def fact_filter(fact, value=nil, operator="=")
406         return if fact.nil?
407         return if fact == false
408
409         if value.nil?
410           parsed = Util.parse_fact_string(fact)
411           @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
412         else
413           parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
414           @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
415         end
416
417         @filter["fact"].compact!
418         reset
419       end
420
421       # Sets the agent filter
422       def agent_filter(agent)
423         @filter["agent"] = @filter["agent"] | [agent]
424         @filter["agent"].compact!
425         reset
426       end
427
428       # Sets the identity filter
429       def identity_filter(identity)
430         @filter["identity"] = @filter["identity"] | [identity]
431         @filter["identity"].compact!
432         reset
433       end
434
435       # Set a compound filter
436       def compound_filter(filter)
437         @filter["compound"] = @filter["compound"] |  [Matcher.create_compound_callstack(filter)]
438         reset
439       end
440
441       # Resets various internal parts of the class, most importantly it clears
442       # out the cached discovery
443       def reset
444         @discovered_agents = nil
445       end
446
447       # Reet the filter to an empty one
448       def reset_filter
449         @filter = Util.empty_filter
450         agent_filter @agent
451       end
452
453       # Does discovery based on the filters set, if a discovery was
454       # previously done return that else do a new discovery.
455       #
456       # Alternatively if identity filters are given and none of them are
457       # regular expressions then just use the provided data as discovered
458       # data, avoiding discovery
459       #
460       # Discovery can be forced if direct_addressing is enabled by passing
461       # in an array of nodes with :nodes or JSON data like those produced
462       # by mcollective RPC JSON output using :json
463       #
464       # Will show a message indicating its doing discovery if running
465       # verbose or if the :verbose flag is passed in.
466       #
467       # Use reset to force a new discovery
468       def discover(flags={})
469         flags.keys.each do |key|
470           raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
471         end
472
473         flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose
474
475         verbose = false unless @output_format == :console
476
477         # flags[:nodes] and flags[:hosts] are the same thing, we should never have
478         # allowed :hosts as that was inconsistent with the established terminology
479         flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)
480
481         reset if flags[:nodes] || flags[:json]
482
483         unless @discovered_agents
484           # if either hosts or JSON is supplied try to figure out discovery data from there
485           # if direct_addressing is not enabled this is a critical error as the user might
486           # not have supplied filters so raise an exception
487           if flags[:nodes] || flags[:json]
488             raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing
489
490             hosts = []
491
492             if flags[:nodes]
493               hosts = Helpers.extract_hosts_from_array(flags[:nodes])
494             elsif flags[:json]
495               hosts = Helpers.extract_hosts_from_json(flags[:json])
496             end
497
498             raise "Could not find any hosts in discovery data provided" if hosts.empty?
499
500             @discovered_agents = hosts
501             @force_direct_request = true
502
503           else
504             identity_filter_discovery_optimization
505           end
506         end
507
508         # All else fails we do it the hard way using a traditional broadcast
509         unless @discovered_agents
510           @stats.time_discovery :start
511
512           @client.options = options
513
514           # if compound filters are used the only real option is to use the mc
515           # discovery plugin since its the only capable of using data queries etc
516           # and we do not want to degrade that experience just to allow compounds
517           # on other discovery plugins the UX would be too bad raising complex sets
518           # of errors etc.
519           @client.discoverer.force_discovery_method_by_filter(options[:filter])
520
521           if verbose
522             actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter])
523
524             if actual_timeout > 0
525               @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
526             else
527               @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
528             end
529           end
530
531           # if the requested limit is a pure number and not a percent
532           # and if we're configured to use the first found hosts as the
533           # limit method then pass in the limit thus minimizing the amount
534           # of work we do in the discover phase and speeding it up significantly
535           if @limit_method == :first and @limit_targets.is_a?(Fixnum)
536             @discovered_agents = @client.discover(@filter, discovery_timeout, @limit_targets)
537           else
538             @discovered_agents = @client.discover(@filter, discovery_timeout)
539           end
540
541           @stderr.puts(@discovered_agents.size) if verbose
542
543           @force_direct_request = @client.discoverer.force_direct_mode?
544
545           @stats.time_discovery :end
546         end
547
548         @stats.discovered_agents(@discovered_agents)
549         RPC.discovered(@discovered_agents)
550
551         @discovered_agents
552       end
553
554       # Provides a normal options hash like you would get from
555       # Optionparser
556       def options
557         {:disctimeout => discovery_timeout,
558          :timeout => @timeout,
559          :verbose => @verbose,
560          :filter => @filter,
561          :collective => @collective,
562          :output_format => @output_format,
563          :ttl => @ttl,
564          :discovery_method => @discovery_method,
565          :discovery_options => @discovery_options,
566          :force_display_mode => @force_display_mode,
567          :config => @config,
568          :publish_timeout => @publish_timeout,
569          :threaded => @threaded}
570       end
571
572       # Sets the collective we are communicating with
573       def collective=(c)
574         raise "Unknown collective #{c}" unless Config.instance.collectives.include?(c)
575
576         @collective = c
577         @client.options = options
578         reset
579       end
580
581       # Sets and sanity checks the limit_targets variable
582       # used to restrict how many nodes we'll target
583       def limit_targets=(limit)
584         if limit.is_a?(String)
585           raise "Invalid limit specified: #{limit} valid limits are /^\d+%*$/" unless limit =~ /^\d+%*$/
586
587           begin
588             @limit_targets = Integer(limit)
589           rescue
590             @limit_targets = limit
591           end
592         else
593           @limit_targets = Integer(limit)
594         end
595       end
596
597       # Sets and sanity check the limit_method variable
598       # used to determine how to limit targets if limit_targets is set
599       def limit_method=(method)
600         method = method.to_sym unless method.is_a?(Symbol)
601
602         raise "Unknown limit method #{method} must be :random or :first" unless [:random, :first].include?(method)
603
604         @limit_method = method
605       end
606
607       # Sets the batch size, if the size is set to 0 that will disable batch mode
608       def batch_size=(limit)
609         raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing
610
611         @batch_size = Integer(limit)
612         @batch_mode = @batch_size > 0
613       end
614
615       def batch_sleep_time=(time)
616         raise "Can only set batch sleep time if direct addressing is supported" unless Config.instance.direct_addressing
617
618         @batch_sleep_time = Float(time)
619       end
620
621       # Pick a number of nodes from the discovered nodes
622       #
623       # The count should be a string that can be either
624       # just a number or a percentage like 10%
625       #
626       # It will select nodes from the discovered list based
627       # on the rpclimitmethod configuration option which can
628       # be either :first or anything else
629       #
630       #   - :first would be a simple way to do a distance based
631       #     selection
632       #   - anything else will just pick one at random
633       #   - if random chosen, and batch-seed set, then set srand
634       #     for the generator, and reset afterwards
635       def pick_nodes_from_discovered(count)
636         if count =~ /%$/
637           pct = Integer((discover.size * (count.to_f / 100)))
638           pct == 0 ? count = 1 : count = pct
639         else
640           count = Integer(count)
641         end
642
643         return discover if discover.size <= count
644
645         result = []
646
647         if @limit_method == :first
648           return discover[0, count]
649         else
650           # we delete from the discovered list because we want
651           # to be sure there is no chance that the same node will
652           # be randomly picked twice.  So we have to clone the
653           # discovered list else this method will only ever work
654           # once per discovery cycle and not actually return the
655           # right nodes.
656           haystack = discover.clone
657
658           if @limit_seed
659             haystack.sort!
660             srand(@limit_seed)
661           end
662
663           count.times do
664             rnd = rand(haystack.size)
665             result << haystack.delete_at(rnd)
666           end
667
668           # Reset random number generator to fresh seed
669           # As our seed from options is most likely short
670           srand if @limit_seed
671         end
672
673         [result].flatten
674       end
675
676       def load_aggregate_functions(action, ddl)
677         return nil unless ddl
678         return nil unless ddl.action_interface(action).keys.include?(:aggregate)
679
680         return Aggregate.new(ddl.action_interface(action))
681
682       rescue => e
683         Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
684         return nil
685       end
686
687       def aggregate_reply(reply, aggregate)
688         return nil unless aggregate
689
690         aggregate.call_functions(reply)
691         return aggregate
692       rescue Exception => e
693         Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
694         return nil
695       end
696
697       def rpc_result_from_reply(agent, action, reply)
698         Result.new(agent, action, {:sender => reply[:senderid], :statuscode => reply[:body][:statuscode],
699                                    :statusmsg => reply[:body][:statusmsg], :data => reply[:body][:data]})
700       end
701
702       # for requests that do not care for results just
703       # return the request id and don't do any of the
704       # response processing.
705       #
706       # We send the :process_results flag with to the
707       # nodes so they can make decisions based on that.
708       #
709       # Should only be called via method_missing
710       def fire_and_forget_request(action, args, filter=nil)
711         validate_request(action, args)
712
713         identity_filter_discovery_optimization
714
715         req = new_request(action.to_s, args)
716
717         filter = options[:filter] unless filter
718
719         message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options})
720         message.reply_to = @reply_to if @reply_to
721
722         if @force_direct_request || @client.discoverer.force_direct_mode?
723           message.discovered_hosts = discover.clone
724           message.type = :direct_request
725         end
726
727         client.sendreq(message, nil)
728       end
729
730       # if an identity filter is supplied and it is all strings no regex we can use that
731       # as discovery data, technically the identity filter is then redundant if we are
732       # in direct addressing mode and we could empty it out but this use case should
733       # only really be for a few -I's on the CLI
734       #
735       # For safety we leave the filter in place for now, that way we can support this
736       # enhancement also in broadcast mode.
737       #
738       # This is only needed for the 'mc' discovery method, other methods might change
739       # the concept of identity to mean something else so we should pass the full
740       # identity filter to them
741       def identity_filter_discovery_optimization
742         if options[:filter]["identity"].size > 0 && @discovery_method == "mc"
743           regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size
744
745           if regex_filters == 0
746             @discovered_agents = options[:filter]["identity"].clone
747             @force_direct_request = true if Config.instance.direct_addressing
748           end
749         end
750       end
751
752       # Calls an agent in a way very similar to call_agent but it supports batching
753       # the queries to the network.
754       #
755       # The result sets, stats, block handling etc is all exactly like you would expect
756       # from normal call_agent.
757       #
758       # This is used by method_missing and works only with direct addressing mode
759       def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
760         raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
761         raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
762
763         batch_size = Integer(batch_size)
764         sleep_time = Float(sleep_time)
765
766         Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")
767
768         @force_direct_request = true
769
770         discovered = discover
771         results = []
772         respcount = 0
773
774         if discovered.size > 0
775           req = new_request(action.to_s, args)
776
777           aggregate = load_aggregate_functions(action, @ddl)
778
779           if @progress && !block_given?
780             twirl = Progress.new
781             @stdout.puts
782             @stdout.print twirl.twirl(respcount, discovered.size)
783           end
784
785           @stats.requestid = nil
786
787           discovered.in_groups_of(batch_size) do |hosts, last_batch|
788             message = Message.new(req, nil, {:agent => @agent, :type => :direct_request, :collective => @collective, :filter => opts[:filter], :options => opts})
789
790             # first time round we let the Message object create a request id
791             # we then re-use it for future requests to keep auditing sane etc
792             @stats.requestid = message.create_reqid unless @stats.requestid
793             message.requestid = @stats.requestid
794
795             message.discovered_hosts = hosts.clone.compact
796
797             @client.req(message) do |resp|
798               respcount += 1
799
800               if block_given?
801                 aggregate = process_results_with_block(action, resp, block, aggregate)
802               else
803                 @stdout.print twirl.twirl(respcount, discovered.size) if @progress
804
805                 result, aggregate = process_results_without_block(resp, action, aggregate)
806
807                 results << result
808               end
809             end
810
811             @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
812             @stats.responses += @client.stats[:responses]
813             @stats.blocktime += @client.stats[:blocktime] + sleep_time
814             @stats.totaltime += @client.stats[:totaltime]
815             @stats.discoverytime += @client.stats[:discoverytime]
816
817             sleep sleep_time unless last_batch
818           end
819
820           @stats.aggregate_summary = aggregate.summarize if aggregate
821           @stats.aggregate_failures = aggregate.failed if aggregate
822         else
823           @stderr.print("\nNo request sent, we did not discover any nodes.")
824         end
825
826         @stats.finish_request
827
828         RPC.stats(@stats)
829
830         @stdout.print("\n") if @progress
831
832         if block_given?
833           return stats
834         else
835           return [results].flatten
836         end
837       end
838
839       # Handles traditional calls to the remote agents with full stats
840       # blocks, non blocks and everything else supported.
841       #
842       # Other methods of calling the nodes can reuse this code by
843       # for example specifying custom options and discovery data
844       def call_agent(action, args, opts, disc=:auto, &block)
845         # Handle fire and forget requests and make sure
846         # the :process_results value is set appropriately
847         #
848         # specific reply-to requests should be treated like
849         # fire and forget since the client will never get
850         # the responses
851         if args[:process_results] == false || @reply_to
852           return fire_and_forget_request(action, args)
853         else
854           args[:process_results] = true
855         end
856
857         # Do discovery when no specific discovery array is given
858         #
859         # If an array is given set the force_direct_request hint that
860         # will tell the message object to be a direct request one
861         if disc == :auto
862           discovered = discover
863         else
864           @force_direct_request = true if Config.instance.direct_addressing
865           discovered = disc
866         end
867
868         req = new_request(action.to_s, args)
869
870         message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
871         message.discovered_hosts = discovered.clone
872
873         results = []
874         respcount = 0
875
876         if discovered.size > 0
877           message.type = :direct_request if @force_direct_request
878
879           if @progress && !block_given?
880             twirl = Progress.new
881             @stdout.puts
882             @stdout.print twirl.twirl(respcount, discovered.size)
883           end
884
885           aggregate = load_aggregate_functions(action, @ddl)
886
887           @client.req(message) do |resp|
888             respcount += 1
889
890             if block_given?
891               aggregate = process_results_with_block(action, resp, block, aggregate)
892             else
893               @stdout.print twirl.twirl(respcount, discovered.size) if @progress
894
895               result, aggregate = process_results_without_block(resp, action, aggregate)
896
897               results << result
898             end
899           end
900
901           @stats.aggregate_summary = aggregate.summarize if aggregate
902           @stats.aggregate_failures = aggregate.failed if aggregate
903           @stats.client_stats = @client.stats
904         else
905           @stderr.print("\nNo request sent, we did not discover any nodes.")
906         end
907
908         @stats.finish_request
909
910         RPC.stats(@stats)
911
912         @stdout.print("\n\n") if @progress
913
914         if block_given?
915           return stats
916         else
917           return [results].flatten
918         end
919       end
920
921       # Handles result sets that has no block associated, sets fails and ok
922       # in the stats object and return a hash of the response to send to the
923       # caller
924       def process_results_without_block(resp, action, aggregate)
925         @stats.node_responded(resp[:senderid])
926
927         result = rpc_result_from_reply(@agent, action, resp)
928         aggregate = aggregate_reply(result, aggregate) if aggregate
929
930         if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
931           @stats.ok if resp[:body][:statuscode] == 0
932           @stats.fail if resp[:body][:statuscode] == 1
933         else
934           @stats.fail
935         end
936
937         [result, aggregate]
938       end
939
940       # process client requests by calling a block on each result
941       # in this mode we do not do anything fancy with the result
942       # objects and we raise exceptions if there are problems with
943       # the data
944       def process_results_with_block(action, resp, block, aggregate)
945         @stats.node_responded(resp[:senderid])
946
947         result = rpc_result_from_reply(@agent, action, resp)
948         aggregate = aggregate_reply(result, aggregate) if aggregate
949
950         if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
951           @stats.ok if resp[:body][:statuscode] == 0
952           @stats.fail if resp[:body][:statuscode] == 1
953           @stats.time_block_execution :start
954
955           case block.arity
956             when 1
957               block.call(resp)
958             when 2
959               block.call(resp, result)
960           end
961
962           @stats.time_block_execution :end
963         else
964           @stats.fail
965
966           case resp[:body][:statuscode]
967             when 2
968               raise UnknownRPCAction, resp[:body][:statusmsg]
969             when 3
970               raise MissingRPCData, resp[:body][:statusmsg]
971             when 4
972               raise InvalidRPCData, resp[:body][:statusmsg]
973             when 5
974               raise UnknownRPCError, resp[:body][:statusmsg]
975           end
976         end
977
978         return aggregate
979       end
980     end
981   end
982 end