MCollective::RPC::Client

The main component of the Simple RPC client system. It wraps around MCollective::Client and brings in a lot of conventions and standard approaches.

Attributes

timeout[RW]

(Not documented)

verbose[RW]

(Not documented)

filter[RW]

(Not documented)

config[RW]

(Not documented)

progress[RW]

(Not documented)

ttl[RW]

(Not documented)

reply_to[RW]

(Not documented)

client[R]

(Not documented)

stats[R]

(Not documented)

ddl[R]

(Not documented)

agent[R]

(Not documented)

limit_targets[R]

(Not documented)

limit_method[R]

(Not documented)

output_format[R]

(Not documented)

batch_size[R]

(Not documented)

batch_sleep_time[R]

(Not documented)

batch_mode[R]

(Not documented)

discovery_options[R]

(Not documented)

discovery_method[R]

(Not documented)

default_discovery_method[R]

(Not documented)

limit_seed[R]

(Not documented)

Public Class Methods

new(agent, flags = {})

Creates a stub for a remote agent. You can pass in an options hash via :options in the flags, in which case it is used as given. Otherwise a default set of options is created, with filtering enabled, based on standard command line use.

  rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)

You would not typically call this directly. Instead you’d use MCollective::RPC#rpcclient, a wrapper around this constructor that can be used as a mixin.

     # File lib/mcollective/rpc/client.rb, line 20
 20:       def initialize(agent, flags = {})
 21:         if flags.include?(:options)
 22:           initial_options = flags[:options]
 23: 
 24:         elsif @@initial_options
 25:           initial_options = Marshal.load(@@initial_options)
 26: 
 27:         else
 28:           oparser = MCollective::Optionparser.new({:verbose => false, :progress_bar => true, :mcollective_limit_targets => false, :batch_size => nil, :batch_sleep_time => 1}, "filter")
 29: 
 30:           initial_options = oparser.parse do |parser, opts|
 31:             if block_given?
 32:               yield(parser, opts)
 33:             end
 34: 
 35:             Helpers.add_simplerpc_options(parser, opts)
 36:           end
 37: 
 38:           @@initial_options = Marshal.dump(initial_options)
 39:         end
 40: 
 41:         @initial_options = initial_options
 42: 
 43:         @config = initial_options[:config]
 44:         @client = MCollective::Client.new(@config)
 45:         @client.options = initial_options
 46: 
 47:         @stats = Stats.new
 48:         @agent = agent
 49:         @timeout = initial_options[:timeout] || 5
 50:         @verbose = initial_options[:verbose]
 51:         @filter = initial_options[:filter] || Util.empty_filter
 52:         @discovered_agents = nil
 53:         @progress = initial_options[:progress_bar]
 54:         @limit_targets = initial_options[:mcollective_limit_targets]
 55:         @limit_method = Config.instance.rpclimitmethod
 56:         @limit_seed = initial_options[:limit_seed] || nil
 57:         @output_format = initial_options[:output_format] || :console
 58:         @force_direct_request = false
 59:         @reply_to = initial_options[:reply_to]
 60:         @discovery_method = initial_options[:discovery_method]
 61:         if !@discovery_method
 62:           @discovery_method = Config.instance.default_discovery_method
 63:           @default_discovery_method = true
 64:         else
 65:           @default_discovery_method = false
 66:         end
 67:         @discovery_options = initial_options[:discovery_options] || []
 68:         @force_display_mode = initial_options[:force_display_mode] || false
 69: 
 70:         @batch_size = Integer(initial_options[:batch_size] || 0)
 71:         @batch_sleep_time = Float(initial_options[:batch_sleep_time] || 1)
 72:         @batch_mode = @batch_size > 0
 73: 
 74:         agent_filter agent
 75: 
 76:         @discovery_timeout = @initial_options.fetch(:disctimeout, nil)
 77: 
 78:         @collective = @client.collective
 79:         @ttl = initial_options[:ttl] || Config.instance.ttl
 80:         @publish_timeout = initial_options[:publish_timeout] || Config.instance.publish_timeout
 81:         @threaded = initial_options[:threaded] || Config.instance.threaded
 82: 
 83:         # if we can find a DDL for the service override
 84:         # the timeout of the client so we always magically
 85:         # wait appropriate amounts of time.
 86:         #
 87:         # We add the discovery timeout to the ddl supplied
 88:         # timeout as the discovery timeout tends to be tuned
 89:         # for local network conditions and fact source speed
 90:         # which would other wise not be accounted for and
 91:         # some results might get missed.
 92:         #
 93:         # We do this only if the timeout is the default 5
 94:         # seconds, so that users cli overrides will still
 95:         # get applied
 96:         #
 97:         # DDLs are required, failure to find a DDL is fatal
 98:         @ddl = DDL.new(agent)
 99:         @stats.ddl = @ddl
100:         @timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5
101: 
102:         # allows stderr and stdout to be overridden for testing
103:         # but also for web apps that might not want a bunch of stuff
104:         # generated to actual file handles
105:         if initial_options[:stderr]
106:           @stderr = initial_options[:stderr]
107:         else
108:           @stderr = STDERR
109:           @stderr.sync = true
110:         end
111: 
112:         if initial_options[:stdout]
113:           @stdout = initial_options[:stdout]
114:         else
115:           @stdout = STDOUT
116:           @stdout.sync = true
117:         end
118:       end
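
The constructor above stores the parsed options in @@initial_options as a Marshal dump and loads them again for later clients. A minimal sketch of what that round trip gives you, assuming an invented options hash (the keys mirror the listing, the values are made up): every load is an independent deep copy, so changing one client's filter does not leak into another.

```ruby
# Illustrative options hash; the values are invented for this sketch.
defaults = {:verbose => false, :filter => {"agent" => [], "identity" => []}}

# Serialize once, as the constructor does with @@initial_options.
cached = Marshal.dump(defaults)

# Each load yields a fresh, fully independent copy of the nested structure.
first  = Marshal.load(cached)
second = Marshal.load(cached)

# Mutating the nested filter of one copy leaves the others untouched.
first[:filter]["agent"] << "rpctest"

puts first[:filter]["agent"].inspect
puts second[:filter]["agent"].inspect
puts defaults[:filter]["agent"].inspect
```

A plain dup or clone of the hash would share the nested filter arrays, which is presumably why a serialized copy is cached instead.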

Public Instance Methods

agent_filter(agent)

Sets the agent filter

     # File lib/mcollective/rpc/client.rb, line 422
422:       def agent_filter(agent)
423:         @filter["agent"] = @filter["agent"] | [agent]
424:         @filter["agent"].compact!
425:         reset
426:       end
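
The filter setters in this class all follow the same union-then-compact pattern. A small standalone sketch of that pattern, using a plain hash in place of the client's @filter (the agent names are only examples):

```ruby
# A plain hash standing in for the client's @filter.
filter = {"agent" => ["rpctest"]}

# Array#| is a set union: adding an agent that is already present is a no-op.
filter["agent"] = filter["agent"] | ["rpctest"]
filter["agent"] = filter["agent"] | ["puppet"]

# compact! drops nil entries in place, e.g. when nil was passed as the agent.
filter["agent"] = filter["agent"] | [nil]
filter["agent"].compact!

puts filter["agent"].inspect
```

Because of the union, calling a filter method repeatedly with the same value does not grow the filter.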
aggregate_reply(reply, aggregate)

(Not documented)

     # File lib/mcollective/rpc/client.rb, line 687
687:       def aggregate_reply(reply, aggregate)
688:         return nil unless aggregate
689: 
690:         aggregate.call_functions(reply)
691:         return aggregate
692:       rescue Exception => e
693:         Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
694:         return nil
695:       end
batch_size=(limit)

Sets the batch size. Setting the size to 0 disables batch mode.

     # File lib/mcollective/rpc/client.rb, line 608
608:       def batch_size=(limit)
609:         raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing
610: 
611:         @batch_size = Integer(limit)
612:         @batch_mode = @batch_size > 0
613:       end
batch_sleep_time=(time)

(Not documented)

     # File lib/mcollective/rpc/client.rb, line 615
615:       def batch_sleep_time=(time)
616:         raise "Can only set batch sleep time if direct addressing is supported" unless Config.instance.direct_addressing
617: 
618:         @batch_sleep_time = Float(time)
619:       end
call_agent(action, args, opts, disc=:auto, &block)

Handles traditional calls to the remote agents, with full stats, block and non-block usage and everything else supported.

Other methods of calling the nodes can reuse this code by, for example, specifying custom options and discovery data.

     # File lib/mcollective/rpc/client.rb, line 844
844:       def call_agent(action, args, opts, disc=:auto, &block)
845:         # Handle fire and forget requests and make sure
846:         # the :process_results value is set appropriately
847:         #
848:         # specific reply-to requests should be treated like
849:         # fire and forget since the client will never get
850:         # the responses
851:         if args[:process_results] == false || @reply_to
852:           return fire_and_forget_request(action, args)
853:         else
854:           args[:process_results] = true
855:         end
856: 
857:         # Do discovery when no specific discovery array is given
858:         #
859:         # If an array is given set the force_direct_request hint that
860:         # will tell the message object to be a direct request one
861:         if disc == :auto
862:           discovered = discover
863:         else
864:           @force_direct_request = true if Config.instance.direct_addressing
865:           discovered = disc
866:         end
867: 
868:         req = new_request(action.to_s, args)
869: 
870:         message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
871:         message.discovered_hosts = discovered.clone
872: 
873:         results = []
874:         respcount = 0
875: 
876:         if discovered.size > 0
877:           message.type = :direct_request if @force_direct_request
878: 
879:           if @progress && !block_given?
880:             twirl = Progress.new
881:             @stdout.puts
882:             @stdout.print twirl.twirl(respcount, discovered.size)
883:           end
884: 
885:           aggregate = load_aggregate_functions(action, @ddl)
886: 
887:           @client.req(message) do |resp|
888:             respcount += 1
889: 
890:             if block_given?
891:               aggregate = process_results_with_block(action, resp, block, aggregate)
892:             else
893:               @stdout.print twirl.twirl(respcount, discovered.size) if @progress
894: 
895:               result, aggregate = process_results_without_block(resp, action, aggregate)
896: 
897:               results << result
898:             end
899:           end
900: 
901:           @stats.aggregate_summary = aggregate.summarize if aggregate
902:           @stats.aggregate_failures = aggregate.failed if aggregate
903:           @stats.client_stats = @client.stats
904:         else
905:           @stderr.print("\nNo request sent, we did not discover any nodes.")
906:         end
907: 
908:         @stats.finish_request
909: 
910:         RPC.stats(@stats)
911: 
912:         @stdout.print("\n\n") if @progress
913: 
914:         if block_given?
915:           return stats
916:         else
917:           return [results].flatten
918:         end
919:       end
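
The listing above returns different things depending on whether a block was given: stats when each reply is handed to the block, a flat array of results otherwise. A minimal sketch of that dual-mode shape, using a hypothetical collect_replies helper and a plain hash in place of the Stats object (neither is MCollective code):

```ruby
# Hypothetical stand-in for call_agent's reply handling; not MCollective code.
def collect_replies(replies)
  results = []
  respcount = 0

  replies.each do |reply|
    respcount += 1

    if block_given?
      # Block mode: hand each reply to the caller as it arrives.
      yield reply
    else
      # Non-block mode: accumulate replies to return at the end.
      results << reply
    end
  end

  # A plain hash stands in for the stats object returned in block mode.
  block_given? ? {:responses => respcount} : [results].flatten
end

as_array = collect_replies([{:sender => "a"}, {:sender => "b"}])

seen = []
as_stats = collect_replies([{:sender => "a"}, {:sender => "b"}]) { |r| seen << r[:sender] }

puts as_array.inspect
puts as_stats.inspect
```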
call_agent_batched(action, args, opts, batch_size, sleep_time, &block)

Calls an agent in a way very similar to call_agent, but with support for batching the queries to the network.

The result sets, stats, block handling and so on all behave exactly as you would expect from a normal call_agent.

This is used by method_missing and works only in direct addressing mode.

     # File lib/mcollective/rpc/client.rb, line 759
759:       def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
760:         raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
761:         raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
762: 
763:         batch_size = Integer(batch_size)
764:         sleep_time = Float(sleep_time)
765: 
766:         Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")
767: 
768:         @force_direct_request = true
769: 
770:         discovered = discover
771:         results = []
772:         respcount = 0
773: 
774:         if discovered.size > 0
775:           req = new_request(action.to_s, args)
776: 
777:           aggregate = load_aggregate_functions(action, @ddl)
778: 
779:           if @progress && !block_given?
780:             twirl = Progress.new
781:             @stdout.puts
782:             @stdout.print twirl.twirl(respcount, discovered.size)
783:           end
784: 
785:           @stats.requestid = nil
786: 
787:           discovered.in_groups_of(batch_size) do |hosts, last_batch|
788:             message = Message.new(req, nil, {:agent => @agent, :type => :direct_request, :collective => @collective, :filter => opts[:filter], :options => opts})
789: 
790:             # first time round we let the Message object create a request id
791:             # we then re-use it for future requests to keep auditing sane etc
792:             @stats.requestid = message.create_reqid unless @stats.requestid
793:             message.requestid = @stats.requestid
794: 
795:             message.discovered_hosts = hosts.clone.compact
796: 
797:             @client.req(message) do |resp|
798:               respcount += 1
799: 
800:               if block_given?
801:                 aggregate = process_results_with_block(action, resp, block, aggregate)
802:               else
803:                 @stdout.print twirl.twirl(respcount, discovered.size) if @progress
804: 
805:                 result, aggregate = process_results_without_block(resp, action, aggregate)
806: 
807:                 results << result
808:               end
809:             end
810: 
811:             @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
812:             @stats.responses += @client.stats[:responses]
813:             @stats.blocktime += @client.stats[:blocktime] + sleep_time
814:             @stats.totaltime += @client.stats[:totaltime]
815:             @stats.discoverytime += @client.stats[:discoverytime]
816: 
817:             sleep sleep_time unless last_batch
818:           end
819: 
820:           @stats.aggregate_summary = aggregate.summarize if aggregate
821:           @stats.aggregate_failures = aggregate.failed if aggregate
822:         else
823:           @stderr.print("\nNo request sent, we did not discover any nodes.")
824:         end
825: 
826:         @stats.finish_request
827: 
828:         RPC.stats(@stats)
829: 
830:         @stdout.print("\n") if @progress
831: 
832:         if block_given?
833:           return stats
834:         else
835:           return [results].flatten
836:         end
837:       end
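
The batching loop above can be sketched on its own. This is an approximation under stated assumptions: each_batch is a hypothetical helper, Array#each_slice stands in for the in_groups_of helper used in the listing (each_slice does not pad the last group, whereas the listing calls compact on each group), and the sleep is injectable so the example runs instantly.

```ruby
# Hypothetical helper, not part of MCollective.
def each_batch(hosts, batch_size, sleep_time, sleeper = method(:sleep))
  batches = hosts.each_slice(batch_size).to_a

  batches.each_with_index do |batch, index|
    yield batch

    # Pause between batches, but not after the final one.
    last_batch = index == batches.size - 1
    sleeper.call(sleep_time) unless last_batch
  end
end

pauses = []
sent = []

# Record the pauses instead of really sleeping.
each_batch(%w[a b c d e], 2, 1.5, lambda { |t| pauses << t }) { |batch| sent << batch }

puts sent.inspect
puts pauses.inspect
```

Five hosts in batches of two give three requests with two pauses in between.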
class_filter(klass)

Sets the class filter

     # File lib/mcollective/rpc/client.rb, line 398
398:       def class_filter(klass)
399:         @filter["cf_class"] = @filter["cf_class"] | [klass]
400:         @filter["cf_class"].compact!
401:         reset
402:       end
collective=(c)

Sets the collective we are communicating with

     # File lib/mcollective/rpc/client.rb, line 573
573:       def collective=(c)
574:         raise "Unknown collective #{c}" unless Config.instance.collectives.include?(c)
575: 
576:         @collective = c
577:         @client.options = options
578:         reset
579:       end
compound_filter(filter)

Set a compound filter

     # File lib/mcollective/rpc/client.rb, line 436
436:       def compound_filter(filter)
437:         @filter["compound"] = @filter["compound"] |  [Matcher.create_compound_callstack(filter)]
438:         reset
439:       end
custom_request(action, args, expected_agents, filter = {}, &block)

Constructs custom requests with custom filters and discovery data. The idea is that this would be used in web applications where you might be using a cached copy of data provided by a registration agent to figure out on your own which nodes will be responding and what your filter should be.

This will help you essentially short circuit the traditional cycle of:

mc discover / call / wait for discovered nodes

by doing discovery however you like, constructing a filter and a list of nodes you expect responses from.

Other than that it will work exactly like a normal call: blocks will behave the same way, stats will be handled the same way, and so on.

If you just want to contact one machine, for example with a client that already has other filter options set up, you can do:

  puppet.custom_request("runonce", {}, ["your.box.com"], {"identity" => "your.box.com"})

This will run the runonce action on just ‘your.box.com’. No discovery will be done, and after receiving just one response it will stop waiting for responses.

If direct_addressing is enabled in the config file you can provide an empty hash as a filter. This forces the request to be a directly addressed request, which technically does not need filters. If you try to use this mode with direct addressing disabled an exception will be raised.

     # File lib/mcollective/rpc/client.rb, line 296
296:       def custom_request(action, args, expected_agents, filter = {}, &block)
297:         validate_request(action, args)
298: 
299:         if filter == {} && !Config.instance.direct_addressing
300:           raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
301:         end
302: 
303:         @stats.reset
304: 
305:         custom_filter = Util.empty_filter
306:         custom_options = options.clone
307: 
308:         # merge the supplied filter with the standard empty one
309:         # we could just use the merge method but I want to be sure
310:         # we dont merge in stuff that isnt actually valid
311:         ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
312:           if filter.include?(ftype)
313:             custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
314:           end
315:         end
316: 
317:         # ensure that all filters at least restrict the call to the agent we're a proxy for
318:         custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
319:         custom_options[:filter] = custom_filter
320: 
321:         # Fake out the stats discovery would have put there
322:         @stats.discovered_agents([expected_agents].flatten)
323: 
324:         # Handle fire and forget requests
325:         #
326:         # If a specific reply-to was set then from the client perspective this should
327:         # be a fire and forget request too since no response will ever reach us - it
328:         # will go to the reply-to destination
329:         if args[:process_results] == false || @reply_to
330:           return fire_and_forget_request(action, args, custom_filter)
331:         end
332: 
333:         # Now do a call pretty much exactly like in method_missing except with our own
334:         # options and discovery magic
335:         if block_given?
336:           call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
337:             block.call(r)
338:           end
339:         else
340:           call_agent(action, args, custom_options, [expected_agents].flatten)
341:         end
342:       end
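
The filter merging step in the listing above can be tried in isolation. The sketch below is an approximation: empty_filter is a hypothetical stand-in for Util.empty_filter, assumed to return one empty array per filter type, and build_custom_filter is an invented name. It also shows that the lookup is done with string keys, so unknown keys and symbol keys are not merged.

```ruby
# Hypothetical stand-in for Util.empty_filter.
def empty_filter
  {"identity" => [], "fact" => [], "agent" => [], "cf_class" => [], "compound" => []}
end

# Invented helper mirroring the merge loop in custom_request.
def build_custom_filter(filter, agent)
  custom = empty_filter

  # Only copy across the known filter types, looked up by string key.
  custom.keys.each do |ftype|
    custom[ftype] = [filter[ftype], custom[ftype]].flatten if filter.include?(ftype)
  end

  # Always restrict the call to the agent this client is a proxy for.
  custom["agent"] << agent unless custom["agent"].include?(agent)
  custom
end

string_keys = build_custom_filter({"identity" => "your.box.com", "bogus" => 1}, "puppet")
symbol_keys = build_custom_filter({:identity => "your.box.com"}, "puppet")

puts string_keys.inspect
puts symbol_keys.inspect
```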
disconnect()

Disconnects cleanly from the middleware

     # File lib/mcollective/rpc/client.rb, line 121
121:       def disconnect
122:         @client.disconnect
123:       end
discover(flags={})

Does discovery based on the filters set. If a discovery was previously done its result is returned, otherwise a new discovery is done.

Alternatively, if identity filters are given and none of them are regular expressions, the provided data is used as the discovered data, avoiding discovery.

If direct_addressing is enabled, discovery can be forced by passing in an array of nodes with :nodes, or JSON data like that produced by the mcollective RPC JSON output with :json.

Will show a message indicating it is doing discovery if running verbose or if the :verbose flag is passed in.

Use reset to force a new discovery

     # File lib/mcollective/rpc/client.rb, line 468
468:       def discover(flags={})
469:         flags.keys.each do |key|
470:           raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
471:         end
472: 
473:         flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose
474: 
475:         verbose = false unless @output_format == :console
476: 
477:         # flags[:nodes] and flags[:hosts] are the same thing, we should never have
478:         # allowed :hosts as that was inconsistent with the established terminology
479:         flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)
480: 
481:         reset if flags[:nodes] || flags[:json]
482: 
483:         unless @discovered_agents
484:           # if either hosts or JSON is supplied try to figure out discovery data from there
485:           # if direct_addressing is not enabled this is a critical error as the user might
486:           # not have supplied filters so raise an exception
487:           if flags[:nodes] || flags[:json]
488:             raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing
489: 
490:             hosts = []
491: 
492:             if flags[:nodes]
493:               hosts = Helpers.extract_hosts_from_array(flags[:nodes])
494:             elsif flags[:json]
495:               hosts = Helpers.extract_hosts_from_json(flags[:json])
496:             end
497: 
498:             raise "Could not find any hosts in discovery data provided" if hosts.empty?
499: 
500:             @discovered_agents = hosts
501:             @force_direct_request = true
502: 
503:           else
504:             identity_filter_discovery_optimization
505:           end
506:         end
507: 
508:         # All else fails we do it the hard way using a traditional broadcast
509:         unless @discovered_agents
510:           @stats.time_discovery :start
511: 
512:           @client.options = options
513: 
514:           # if compound filters are used the only real option is to use the mc
515:           # discovery plugin since its the only capable of using data queries etc
516:           # and we do not want to degrade that experience just to allow compounds
517:           # on other discovery plugins the UX would be too bad raising complex sets
518:           # of errors etc.
519:           @client.discoverer.force_discovery_method_by_filter(options[:filter])
520: 
521:           if verbose
522:             actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter])
523: 
524:             if actual_timeout > 0
525:               @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
526:             else
527:               @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
528:             end
529:           end
530: 
531:           # if the requested limit is a pure number and not a percent
532:           # and if we're configured to use the first found hosts as the
533:           # limit method then pass in the limit thus minimizing the amount
534:           # of work we do in the discover phase and speeding it up significantly
535:           if @limit_method == :first and @limit_targets.is_a?(Fixnum)
536:             @discovered_agents = @client.discover(@filter, discovery_timeout, @limit_targets)
537:           else
538:             @discovered_agents = @client.discover(@filter, discovery_timeout)
539:           end
540: 
541:           @stderr.puts(@discovered_agents.size) if verbose
542: 
543:           @force_direct_request = @client.discoverer.force_direct_mode?
544: 
545:           @stats.time_discovery :end
546:         end
547: 
548:         @stats.discovered_agents(@discovered_agents)
549:         RPC.discovered(@discovered_agents)
550: 
551:         @discovered_agents
552:       end
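
The flag handling at the top of the listing above (reject unknown keys, then treat :hosts as an alias for :nodes) can be sketched as a standalone function. normalize_discover_flags is a hypothetical name, and unlike the listing it works on a copy rather than modifying the caller's hash.

```ruby
ALLOWED_DISCOVER_FLAGS = [:verbose, :hosts, :nodes, :json]

# Hypothetical helper mirroring the first lines of discover; not MCollective API.
def normalize_discover_flags(flags)
  flags = flags.dup

  flags.keys.each do |key|
    raise "Unknown option #{key} passed to discover" unless ALLOWED_DISCOVER_FLAGS.include?(key)
  end

  # :hosts is the legacy spelling of :nodes.
  flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)
  flags
end

puts normalize_discover_flags(:hosts => ["node1", "node2"], :verbose => true).inspect

begin
  normalize_discover_flags(:bogus => 1)
rescue RuntimeError => e
  puts e.message
end
```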
discovery_method=(method)

Sets the discovery method. If we change the method there are a number of steps to take:

 - set the new method
 - if discovery options were provided, reset those to the initially
   provided ones, else clear them as they might now apply to a
   different provider
 - update the client options so it knows there is a new discovery
   method in force
 - reset discovery data forcing a discover on the next request

The remaining item is the discovery timeout. We leave that as is, since it is the user-supplied timeout, set either via the initial options or specifically on the client.

     # File lib/mcollective/rpc/client.rb, line 377
377:       def discovery_method=(method)
378:         @default_discovery_method = false
379:         @discovery_method = method
380: 
381:         if @initial_options[:discovery_options]
382:           @discovery_options = @initial_options[:discovery_options]
383:         else
384:           @discovery_options.clear
385:         end
386: 
387:         @client.options = options
388: 
389:         reset
390:       end
discovery_options=(options)

(Not documented)

     # File lib/mcollective/rpc/client.rb, line 392
392:       def discovery_options=(options)
393:         @discovery_options = [options].flatten
394:         reset
395:       end
discovery_timeout()

(Not documented)

     # File lib/mcollective/rpc/client.rb, line 344
344:       def discovery_timeout
345:         return @discovery_timeout if @discovery_timeout
346:         return @client.discoverer.ddl.meta[:timeout]
347:       end
discovery_timeout=(timeout)

(Not documented)

     # File lib/mcollective/rpc/client.rb, line 349
349:       def discovery_timeout=(timeout)
350:         @discovery_timeout = Float(timeout)
351: 
352:         # we calculate the overall timeout from the DDL of the agent and
353:         # the supplied discovery timeout unless someone specifically
354:         # specifies a timeout to the constructor
355:         #
356:         # But if we also then specifically set a discovery_timeout on the
357:         # agent that has to override the supplied timeout so we then
358:         # calculate a correct timeout based on DDL timeout and the
359:         # supplied discovery timeout
360:         @timeout = @ddl.meta[:timeout] + discovery_timeout
361:       end
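
The timeout arithmetic described in the comments can be illustrated with a small sketch. effective_timeout is a hypothetical helper and the numbers are invented. It mirrors the constructor rule (only replace the timeout when it is still the default of 5 seconds); the setter above recomputes the sum unconditionally.

```ruby
DEFAULT_TIMEOUT = 5

# Hypothetical helper mirroring the constructor's rule; not MCollective API.
def effective_timeout(requested, ddl_timeout, discovery_timeout)
  # A non-default timeout is treated as a user override and kept as is.
  return requested unless requested == DEFAULT_TIMEOUT

  # Otherwise wait for the agent's DDL timeout plus the discovery time.
  ddl_timeout + discovery_timeout
end

puts effective_timeout(5, 20, 2)
puts effective_timeout(30, 20, 2)
```

One consequence of this rule, as written in the constructor, is that an explicitly requested timeout of exactly 5 seconds is indistinguishable from the default.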
fact_filter(fact, value=nil, operator="=")

Sets the fact filter

     # File lib/mcollective/rpc/client.rb, line 405
405:       def fact_filter(fact, value=nil, operator="=")
406:         return if fact.nil?
407:         return if fact == false
408: 
409:         if value.nil?
410:           parsed = Util.parse_fact_string(fact)
411:           @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
412:         else
413:           parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
414:           @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
415:         end
416: 
417:         @filter["fact"].compact!
418:         reset
419:       end
fire_and_forget_request(action, args, filter=nil)

For requests that do not care about results, just returns the request id and does not do any of the response processing.

We send the :process_results flag along to the nodes so they can make decisions based on that.

Should only be called via method_missing.

     # File lib/mcollective/rpc/client.rb, line 710
710:       def fire_and_forget_request(action, args, filter=nil)
711:         validate_request(action, args)
712: 
713:         identity_filter_discovery_optimization
714: 
715:         req = new_request(action.to_s, args)
716: 
717:         filter = options[:filter] unless filter
718: 
719:         message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options})
720:         message.reply_to = @reply_to if @reply_to
721: 
722:         if @force_direct_request || @client.discoverer.force_direct_mode?
723:           message.discovered_hosts = discover.clone
724:           message.type = :direct_request
725:         end
726: 
727:         client.sendreq(message, nil)
728:       end
help(template)

Returns help for an agent if a DDL was found

     # File lib/mcollective/rpc/client.rb, line 126
126:       def help(template)
127:         @ddl.help(template)
128:       end
identity_filter(identity)

Sets the identity filter

     # File lib/mcollective/rpc/client.rb, line 429
429:       def identity_filter(identity)
430:         @filter["identity"] = @filter["identity"] | [identity]
431:         @filter["identity"].compact!
432:         reset
433:       end
identity_filter_discovery_optimization()

If an identity filter is supplied and it is all strings, with no regular expressions, we can use it as discovery data. Technically the identity filter is then redundant if we are in direct addressing mode and we could empty it out, but this use case should only really be for a few -I’s on the CLI.

For safety we leave the filter in place for now; that way we can also support this enhancement in broadcast mode.

This is only needed for the ‘mc’ discovery method. Other methods might change the concept of identity to mean something else, so we should pass the full identity filter to them.

     # File lib/mcollective/rpc/client.rb, line 741
741:       def identity_filter_discovery_optimization
742:         if options[:filter]["identity"].size > 0 && @discovery_method == "mc"
743:           regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size
744: 
745:           if regex_filters == 0
746:             @discovered_agents = options[:filter]["identity"].clone
747:             @force_direct_request = true if Config.instance.direct_addressing
748:           end
749:         end
750:       end
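
The decision made in the listing above can be sketched as a pure function. identities_as_discovery is a hypothetical name. It approximates the listing's regular expression test with start_with?("/"), which is equivalent for single-line identity strings.

```ruby
# Hypothetical helper, not MCollective API. Returns the identities to use as
# discovery data, or nil when normal discovery is still required.
def identities_as_discovery(identities, discovery_method)
  return nil unless identities.size > 0 && discovery_method == "mc"

  # A leading slash marks a regular expression filter such as "/^web/".
  return nil if identities.any? { |i| i.start_with?("/") }

  identities.clone
end

puts identities_as_discovery(["web1.example.com", "web2.example.com"], "mc").inspect
puts identities_as_discovery(["/^web/"], "mc").inspect
puts identities_as_discovery(["web1.example.com"], "flatfile").inspect
```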
limit_method=(method)

Sets and sanity checks the limit_method variable, used to determine how to limit targets if limit_targets is set.

     # File lib/mcollective/rpc/client.rb, line 599
599:       def limit_method=(method)
600:         method = method.to_sym unless method.is_a?(Symbol)
601: 
602:         raise "Unknown limit method #{method} must be :random or :first" unless [:random, :first].include?(method)
603: 
604:         @limit_method = method
605:       end
limit_targets=(limit)

Sets and sanity checks the limit_targets variable used to restrict how many nodes we’ll target

     # File lib/mcollective/rpc/client.rb, line 583
583:       def limit_targets=(limit)
584:         if limit.is_a?(String)
585:           raise "Invalid limit specified: #{limit} valid limits are /^\d+%*$/" unless limit =~ /^\d+%*$/
586: 
587:           begin
588:             @limit_targets = Integer(limit)
589:           rescue
590:             @limit_targets = limit
591:           end
592:         else
593:           @limit_targets = Integer(limit)
594:         end
595:       end
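
A standalone sketch of the parsing rules in the listing above, using a hypothetical parse_limit function that returns the value instead of assigning @limit_targets: a numeric string becomes an Integer, a percentage stays a String, and anything else is rejected.

```ruby
# Hypothetical helper mirroring limit_targets=; not MCollective API.
def parse_limit(limit)
  if limit.is_a?(String)
    raise "Invalid limit specified: #{limit}" unless limit =~ /^\d+%*$/

    begin
      # "10" converts cleanly to an Integer.
      Integer(limit)
    rescue ArgumentError
      # "10%" cannot be converted, so the percentage string is kept.
      limit
    end
  else
    Integer(limit)
  end
end

puts parse_limit("10").inspect
puts parse_limit("10%").inspect
puts parse_limit(3).inspect
```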
load_aggregate_functions(action, ddl)

(Not documented)

     # File lib/mcollective/rpc/client.rb, line 676
676:       def load_aggregate_functions(action, ddl)
677:         return nil unless ddl
678:         return nil unless ddl.action_interface(action).keys.include?(:aggregate)
679: 
680:         return Aggregate.new(ddl.action_interface(action))
681: 
682:       rescue => e
683:         Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
684:         return nil
685:       end
method_missing(method_name, *args, &block)

Magic handler to invoke remote methods

Once the stub is created using the constructor or the RPC#rpcclient helper you can call remote actions easily:

  ret = rpc.echo(:msg => "hello world")

This will call the ‘echo’ action of the ‘rpctest’ agent and return the result as an array. The array is a simplified result set from the usual full MCollective::Client#req, with additional error codes and error text:

  {
    :sender => "remote.box.com",
    :statuscode => 0,
    :statusmsg => "OK",
    :data => "hello world"
  }

If :statuscode is 0 then everything went find, if it’s 1 then you supplied the correct arguments etc but the request could not be completed, you’ll find a human parsable reason in :statusmsg then.

Codes 2 to 5 maps directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError see below for a description of those, in each case :statusmsg would be the reason for failure.
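
To make the status codes concrete, here is a minimal sketch that interprets one simplified result. It assumes the result is a plain Hash shaped like the example above; STATUS_MEANINGS and describe_result are hypothetical names, not part of the client:

```ruby
# Meaning of each :statuscode, as described in the text above.
STATUS_MEANINGS = {
  0 => "OK",
  1 => "request could not be completed",
  2 => "UnknownRPCAction",
  3 => "MissingRPCData",
  4 => "InvalidRPCData",
  5 => "UnknownRPCError"
}.freeze

# Hypothetical helper: summarise one simplified result hash.
def describe_result(result)
  meaning = STATUS_MEANINGS.fetch(result[:statuscode], "unrecognised status code")
  "#{result[:sender]}: #{meaning} (#{result[:statusmsg]})"
end

sample = {:sender => "remote.box.com", :statuscode => 0, :statusmsg => "OK", :data => "hello world"}
puts describe_result(sample)
```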

To get access to the full result of the MCollective::Client#req calls you can pass in a block:

  rpc.echo(:msg => "hello world") do |resp|
     pp resp
  end

In this case resp will be the result from MCollective::Client#req. Instead of receiving simple text and codes as above, you’ll also need to handle the following exceptions:

  - UnknownRPCAction - There is no matching action on the agent
  - MissingRPCData - You did not supply all the needed parameters for the action
  - InvalidRPCData - The data you did supply did not pass validation
  - UnknownRPCError - Some other error prevented the agent from running

During calls a progress indicator shows how many results we’ve received against how many nodes were discovered. You can disable this by setting progress to false:

  rpc.progress = false

A second mode sends the SimpleRPC request and never handles the responses. It’s a bit like UDP: the request is sent with the filter attached and you only get back the request id, with no indication of the results.

You can invoke this using:

  puts rpc.echo(:process_results => false)

This will output just the request id.

Batched processing is supported:

  printrpc rpc.ping(:batch_size => 5)

This will do everything exactly as normal but communicate with only 5 agents at a time.

     # File lib/mcollective/rpc/client.rb, line 231
      def method_missing(method_name, *args, &block)
        # set args to an empty hash if nothings given
        args = args[0]
        args = {} if args.nil?

        action = method_name.to_s

        @stats.reset

        validate_request(action, args)

        # if a global batch size is set just use that else set it
        # in the case that it was passed as an argument
        batch_mode = args.include?(:batch_size) || @batch_mode
        batch_size = args.delete(:batch_size) || @batch_size
        batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time

        # if we were given a batch_size argument thats 0 and batch_mode was
        # determined to be on via global options etc this will allow a batch_size
        # of 0 to disable or batch_mode for this call only
        batch_mode = (batch_mode && Integer(batch_size) > 0)

        # Handle single target requests by doing discovery and picking
        # a random node.  Then do a custom request specifying a filter
        # that will only match the one node.
        if @limit_targets
          target_nodes = pick_nodes_from_discovered(@limit_targets)
          Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")

          custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
        elsif batch_mode
          call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
        else
          call_agent(action, args, options, :auto, &block)
        end
      end
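
The batch option handling in the listing above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical resolve_batch helper whose global_mode and global_size parameters stand in for @batch_mode and @batch_size; it is not part of the client:

```ruby
# Hypothetical standalone version of the batch option handling.
# Returns [batch_mode, batch_size] and removes :batch_size from args,
# so the option is not sent on as an action argument.
def resolve_batch(args, global_mode, global_size)
  batch_mode = args.include?(:batch_size) || global_mode
  batch_size = args.delete(:batch_size) || global_size

  # A batch size of 0 switches batching off for this call only.
  batch_mode = batch_mode && Integer(batch_size) > 0

  [batch_mode, batch_size]
end

args = {:msg => "hello world", :batch_size => 5}
p resolve_batch(args, false, nil)
p args
```

Because 0 is truthy in Ruby, a per-call :batch_size of 0 is not replaced by the global size, which is what lets it disable batching.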
new_request(action, data)

Creates a suitable request hash for the SimpleRPC agent.

You’d use this if you ever wanted to take care of sending requests on your own - perhaps via Client#sendreq if you didn’t care for responses.

In that case you can just do:

  msg = your_rpc.new_request("some_action", :foo => :bar)
  filter = your_rpc.filter

  your_rpc.client.sendreq(msg, msg[:agent], filter)

This will send a SimpleRPC request to the action some_action with the arguments :foo => :bar. It returns immediately and you will have no indication at all whether the request was received.

Clearly the use of this technique should be limited, and done only if your code requires such a thing.

     # File lib/mcollective/rpc/client.rb, line 149
      def new_request(action, data)
        callerid = PluginManager["security_plugin"].callerid

        raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)

        {:agent  => @agent,
         :action => action,
         :caller => callerid,
         :data   => data}
      end
options()

Provides a normal options hash like you would get from Optionparser

     # File lib/mcollective/rpc/client.rb, line 556
      def options
        {:disctimeout => discovery_timeout,
         :timeout => @timeout,
         :verbose => @verbose,
         :filter => @filter,
         :collective => @collective,
         :output_format => @output_format,
         :ttl => @ttl,
         :discovery_method => @discovery_method,
         :discovery_options => @discovery_options,
         :force_display_mode => @force_display_mode,
         :config => @config,
         :publish_timeout => @publish_timeout,
         :threaded => @threaded}
      end
pick_nodes_from_discovered(count)

Picks a number of nodes from the discovered nodes.

The count should be a string that is either a plain number or a percentage like 10%. A percentage that works out to less than one node selects one node.

Nodes are selected from the discovered list based on the rpclimitmethod configuration option, which can be either :first or anything else:

  - :first takes nodes from the start of the discovered list, a simple
    way to do a distance based selection
  - anything else picks nodes at random
  - if random selection is used and a seed is set (limit_seed), srand
    is seeded for the generator and reset afterwards

     # File lib/mcollective/rpc/client.rb, line 635
      def pick_nodes_from_discovered(count)
        if count =~ /%$/
          pct = Integer((discover.size * (count.to_f / 100)))
          pct == 0 ? count = 1 : count = pct
        else
          count = Integer(count)
        end

        return discover if discover.size <= count

        result = []

        if @limit_method == :first
          return discover[0, count]
        else
          # we delete from the discovered list because we want
          # to be sure there is no chance that the same node will
          # be randomly picked twice.  So we have to clone the
          # discovered list else this method will only ever work
          # once per discovery cycle and not actually return the
          # right nodes.
          haystack = discover.clone

          if @limit_seed
            haystack.sort!
            srand(@limit_seed)
          end

          count.times do
            rnd = rand(haystack.size)
            result << haystack.delete_at(rnd)
          end

          # Reset random number generator to fresh seed
          # As our seed from options is most likely short
          srand if @limit_seed
        end

        [result].flatten
      end
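
The selection logic can be illustrated without a running collective. The following is a minimal sketch, assuming a hypothetical pick_nodes helper that takes the node list as an argument. Unlike the listing above, it uses a private Random instance instead of reseeding the global generator with srand, which is a swapped-in technique rather than what the client does:

```ruby
# Hypothetical standalone sketch of the node selection.
def pick_nodes(nodes, count, limit_method: :random, seed: nil)
  if count.to_s.end_with?("%")
    pct = Integer(nodes.size * (count.to_f / 100))
    count = pct.zero? ? 1 : pct   # a percentage always selects at least one node
  else
    count = Integer(count)
  end

  return nodes if nodes.size <= count
  return nodes.first(count) if limit_method == :first

  # Work on a copy so the caller's list is untouched; sort it when seeded
  # so the same seed always sees the nodes in the same order.
  haystack = seed ? nodes.sort : nodes.dup
  rng = seed ? Random.new(seed) : Random.new

  # Delete each pick from the haystack so no node is chosen twice.
  Array.new(count) { haystack.delete_at(rng.rand(haystack.size)) }
end

nodes = %w[a b c d e f g h i j]
p pick_nodes(nodes, "50%", limit_method: :first)
p pick_nodes(nodes, "3", seed: 42)
```

With a seed, repeated calls over the same set of nodes select the same subset, which is the point of sorting before seeding.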
process_results_with_block(action, resp, block, aggregate)

Processes client requests by calling a block on each result. In this mode we do not do anything fancy with the result objects, and we raise exceptions if there are problems with the data.

     # File lib/mcollective/rpc/client.rb, line 944
      def process_results_with_block(action, resp, block, aggregate)
        @stats.node_responded(resp[:senderid])

        result = rpc_result_from_reply(@agent, action, resp)
        aggregate = aggregate_reply(result, aggregate) if aggregate

        if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
          @stats.ok if resp[:body][:statuscode] == 0
          @stats.fail if resp[:body][:statuscode] == 1
          @stats.time_block_execution :start

          case block.arity
            when 1
              block.call(resp)
            when 2
              block.call(resp, result)
          end

          @stats.time_block_execution :end
        else
          @stats.fail

          case resp[:body][:statuscode]
            when 2
              raise UnknownRPCAction, resp[:body][:statusmsg]
            when 3
              raise MissingRPCData, resp[:body][:statusmsg]
            when 4
              raise InvalidRPCData, resp[:body][:statusmsg]
            when 5
              raise UnknownRPCError, resp[:body][:statusmsg]
          end
        end

        return aggregate
      end
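
The listing above inspects the arity of the block to decide what to pass to it: a block with one parameter receives the raw reply, and a block with two parameters also receives the parsed result. The following is a minimal sketch of that dispatch, assuming a hypothetical yield_reply helper and plain hashes in place of real replies and Result objects:

```ruby
# Hypothetical helper: pass the raw reply to the block, plus the parsed
# result when the block declares two parameters. A block with any other
# number of parameters is not called.
def yield_reply(resp, result, &block)
  case block.arity
  when 1 then block.call(resp)
  when 2 then block.call(resp, result)
  end
end

resp = {:senderid => "remote.box.com", :body => {:statuscode => 0, :data => "hello world"}}
result = {:sender => "remote.box.com", :data => "hello world"}

yield_reply(resp, result) { |r| puts r[:senderid] }
yield_reply(resp, result) { |r, parsed| puts parsed[:data] }
```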
process_results_without_block(resp, action, aggregate)

Handles result sets that have no block associated. Records fails and oks in the stats object and returns the result, paired with the aggregate, to send to the caller.

     # File lib/mcollective/rpc/client.rb, line 924
      def process_results_without_block(resp, action, aggregate)
        @stats.node_responded(resp[:senderid])

        result = rpc_result_from_reply(@agent, action, resp)
        aggregate = aggregate_reply(result, aggregate) if aggregate

        if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
          @stats.ok if resp[:body][:statuscode] == 0
          @stats.fail if resp[:body][:statuscode] == 1
        else
          @stats.fail
        end

        [result, aggregate]
      end
reset()

Resets various internal parts of the class; most importantly, it clears out the cached discovery.

     # File lib/mcollective/rpc/client.rb, line 443
      def reset
        @discovered_agents = nil
      end
reset_filter()

Resets the filter to an empty one, then re-adds the agent filter for the current agent.

     # File lib/mcollective/rpc/client.rb, line 448
      def reset_filter
        @filter = Util.empty_filter
        agent_filter @agent
      end
rpc_result_from_reply(agent, action, reply)

Builds a Result for the given agent and action from a raw reply, carrying over the sender, status code, status message and data.

     # File lib/mcollective/rpc/client.rb, line 697
      def rpc_result_from_reply(agent, action, reply)
        Result.new(agent, action, {:sender => reply[:senderid], :statuscode => reply[:body][:statuscode],
                                   :statusmsg => reply[:body][:statusmsg], :data => reply[:body][:data]})
      end
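
The mapping from a raw reply to the simplified fields can be seen with plain hashes. This is a minimal sketch, assuming a hypothetical simplify_reply helper that returns a Hash rather than the Result object the client builds:

```ruby
# Hypothetical helper: flatten a raw reply into the simplified fields.
def simplify_reply(reply)
  body = reply[:body]

  {:sender     => reply[:senderid],
   :statuscode => body[:statuscode],
   :statusmsg  => body[:statusmsg],
   :data       => body[:data]}
end

reply = {:senderid => "remote.box.com",
         :body => {:statuscode => 0, :statusmsg => "OK", :data => "hello world"}}
p simplify_reply(reply)
```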
validate_request(action, args)

Fills in any defaults the DDL provides for arguments that were not supplied in the request, modifying the input arguments.

The modified arguments are then passed to the DDL for validation. An error is raised if no DDL was found for the agent.

     # File lib/mcollective/rpc/client.rb, line 165
      def validate_request(action, args)
        raise "No DDL found for agent %s cannot validate inputs" % @agent unless @ddl

        @ddl.set_default_input_arguments(action, args)
        @ddl.validate_rpc_request(action, args)
      end
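
The two steps, defaults first and validation second, can be sketched with a plain Hash standing in for the DDL. Everything here is an assumption made for illustration: the INPUTS structure, apply_defaults and validate are hypothetical and do not reflect the real DDL format or its validation rules:

```ruby
# Hypothetical input descriptions standing in for a DDL.
INPUTS = {
  :msg   => {:optional => false},
  :count => {:optional => true, :default => 1}
}.freeze

# Step 1: fill in defaults for arguments the caller left out (mutates args).
def apply_defaults(inputs, args)
  inputs.each do |name, spec|
    args[name] = spec[:default] if spec.key?(:default) && !args.key?(name)
  end
  args
end

# Step 2: reject requests that still lack a required argument.
def validate(inputs, args)
  missing = inputs.select { |name, spec| !spec[:optional] && !args.key?(name) }.keys
  raise ArgumentError, "missing arguments: #{missing.join(', ')}" unless missing.empty?
  true
end

args = {:msg => "hello world"}
apply_defaults(INPUTS, args)
validate(INPUTS, args)
p args
```

Applying defaults before validating means an argument with a default never fails a presence check.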

Generated with the Darkfish Rdoc Generator 1.1.6.