The main component of the Simple RPC client system, this wraps around MCollective::Client and just brings in a lot of convention and standard approached.
Creates a stub for a remote agent, you can pass in an options array in the flags which will then be used else it will just create a default options array with filtering enabled based on the standard command line use.
rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)
You typically would not call this directly you’d use MCollective::RPC#rpcclient instead which is a wrapper around this that can be used as a Mixin
# File lib/mcollective/rpc/client.rb, line 20 20: def initialize(agent, flags = {}) 21: if flags.include?(:options) 22: initial_options = flags[:options] 23: 24: elsif @@initial_options 25: initial_options = Marshal.load(@@initial_options) 26: 27: else 28: oparser = MCollective::Optionparser.new({:verbose => false, :progress_bar => true, :mcollective_limit_targets => false, :batch_size => nil, :batch_sleep_time => 1}, "filter") 29: 30: initial_options = oparser.parse do |parser, opts| 31: if block_given? 32: yield(parser, opts) 33: end 34: 35: Helpers.add_simplerpc_options(parser, opts) 36: end 37: 38: @@initial_options = Marshal.dump(initial_options) 39: end 40: 41: @initial_options = initial_options 42: 43: @config = initial_options[:config] 44: @client = MCollective::Client.new(@config) 45: @client.options = initial_options 46: 47: @stats = Stats.new 48: @agent = agent 49: @timeout = initial_options[:timeout] || 5 50: @verbose = initial_options[:verbose] 51: @filter = initial_options[:filter] || Util.empty_filter 52: @discovered_agents = nil 53: @progress = initial_options[:progress_bar] 54: @limit_targets = initial_options[:mcollective_limit_targets] 55: @limit_method = Config.instance.rpclimitmethod 56: @limit_seed = initial_options[:limit_seed] || nil 57: @output_format = initial_options[:output_format] || :console 58: @force_direct_request = false 59: @reply_to = initial_options[:reply_to] 60: @discovery_method = initial_options[:discovery_method] 61: if !@discovery_method 62: @discovery_method = Config.instance.default_discovery_method 63: @default_discovery_method = true 64: else 65: @default_discovery_method = false 66: end 67: @discovery_options = initial_options[:discovery_options] || [] 68: @force_display_mode = initial_options[:force_display_mode] || false 69: 70: @batch_size = Integer(initial_options[:batch_size] || 0) 71: @batch_sleep_time = Float(initial_options[:batch_sleep_time] || 1) 72: @batch_mode = @batch_size > 0 73: 74: agent_filter agent 75: 76: @discovery_timeout = @initial_options.fetch(:disctimeout, nil) 77: 78: @collective = @client.collective 79: @ttl = initial_options[:ttl] || Config.instance.ttl 80: @publish_timeout = initial_options[:publish_timeout] || Config.instance.publish_timeout 81: @threaded = initial_options[:threaded] || Config.instance.threaded 82: 83: # if we can find a DDL for the service override 84: # the timeout of the client so we always magically 85: # wait appropriate amounts of time. 86: # 87: # We add the discovery timeout to the ddl supplied 88: # timeout as the discovery timeout tends to be tuned 89: # for local network conditions and fact source speed 90: # which would other wise not be accounted for and 91: # some results might get missed. 92: # 93: # We do this only if the timeout is the default 5 94: # seconds, so that users cli overrides will still 95: # get applied 96: # 97: # DDLs are required, failure to find a DDL is fatal 98: @ddl = DDL.new(agent) 99: @stats.ddl = @ddl 100: @timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5 101: 102: # allows stderr and stdout to be overridden for testing 103: # but also for web apps that might not want a bunch of stuff 104: # generated to actual file handles 105: if initial_options[:stderr] 106: @stderr = initial_options[:stderr] 107: else 108: @stderr = STDERR 109: @stderr.sync = true 110: end 111: 112: if initial_options[:stdout] 113: @stdout = initial_options[:stdout] 114: else 115: @stdout = STDOUT 116: @stdout.sync = true 117: end 118: end
Sets the agent filter
# File lib/mcollective/rpc/client.rb, line 422 422: def agent_filter(agent) 423: @filter["agent"] = @filter["agent"] | [agent] 424: @filter["agent"].compact! 425: reset 426: end
(Not documented)
# File lib/mcollective/rpc/client.rb, line 687 687: def aggregate_reply(reply, aggregate) 688: return nil unless aggregate 689: 690: aggregate.call_functions(reply) 691: return aggregate 692: rescue Exception => e 693: Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class]) 694: return nil 695: end
Sets the batch size, if the size is set to 0 that will disable batch mode
# File lib/mcollective/rpc/client.rb, line 608 608: def batch_size=(limit) 609: raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing 610: 611: @batch_size = Integer(limit) 612: @batch_mode = @batch_size > 0 613: end
(Not documented)
# File lib/mcollective/rpc/client.rb, line 615 615: def batch_sleep_time=(time) 616: raise "Can only set batch sleep time if direct addressing is supported" unless Config.instance.direct_addressing 617: 618: @batch_sleep_time = Float(time) 619: end
Handles traditional calls to the remote agents with full stats blocks, non blocks and everything else supported.
Other methods of calling the nodes can reuse this code by for example specifying custom options and discovery data
# File lib/mcollective/rpc/client.rb, line 844 844: def call_agent(action, args, opts, disc=:auto, &block) 845: # Handle fire and forget requests and make sure 846: # the :process_results value is set appropriately 847: # 848: # specific reply-to requests should be treated like 849: # fire and forget since the client will never get 850: # the responses 851: if args[:process_results] == false || @reply_to 852: return fire_and_forget_request(action, args) 853: else 854: args[:process_results] = true 855: end 856: 857: # Do discovery when no specific discovery array is given 858: # 859: # If an array is given set the force_direct_request hint that 860: # will tell the message object to be a direct request one 861: if disc == :auto 862: discovered = discover 863: else 864: @force_direct_request = true if Config.instance.direct_addressing 865: discovered = disc 866: end 867: 868: req = new_request(action.to_s, args) 869: 870: message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts}) 871: message.discovered_hosts = discovered.clone 872: 873: results = [] 874: respcount = 0 875: 876: if discovered.size > 0 877: message.type = :direct_request if @force_direct_request 878: 879: if @progress && !block_given? 880: twirl = Progress.new 881: @stdout.puts 882: @stdout.print twirl.twirl(respcount, discovered.size) 883: end 884: 885: aggregate = load_aggregate_functions(action, @ddl) 886: 887: @client.req(message) do |resp| 888: respcount += 1 889: 890: if block_given? 891: aggregate = process_results_with_block(action, resp, block, aggregate) 892: else 893: @stdout.print twirl.twirl(respcount, discovered.size) if @progress 894: 895: result, aggregate = process_results_without_block(resp, action, aggregate) 896: 897: results << result 898: end 899: end 900: 901: @stats.aggregate_summary = aggregate.summarize if aggregate 902: @stats.aggregate_failures = aggregate.failed if aggregate 903: @stats.client_stats = @client.stats 904: else 905: @stderr.print("\nNo request sent, we did not discover any nodes.") 906: end 907: 908: @stats.finish_request 909: 910: RPC.stats(@stats) 911: 912: @stdout.print("\n\n") if @progress 913: 914: if block_given? 915: return stats 916: else 917: return [results].flatten 918: end 919: end
Calls an agent in a way very similar to call_agent but it supports batching the queries to the network.
The result sets, stats, block handling etc is all exactly like you would expect from normal call_agent.
This is used by method_missing and works only with direct addressing mode
# File lib/mcollective/rpc/client.rb, line 759 759: def call_agent_batched(action, args, opts, batch_size, sleep_time, &block) 760: raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing 761: raise "Cannot bypass result processing for batched requests" if args[:process_results] == false 762: 763: batch_size = Integer(batch_size) 764: sleep_time = Float(sleep_time) 765: 766: Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}") 767: 768: @force_direct_request = true 769: 770: discovered = discover 771: results = [] 772: respcount = 0 773: 774: if discovered.size > 0 775: req = new_request(action.to_s, args) 776: 777: aggregate = load_aggregate_functions(action, @ddl) 778: 779: if @progress && !block_given? 780: twirl = Progress.new 781: @stdout.puts 782: @stdout.print twirl.twirl(respcount, discovered.size) 783: end 784: 785: @stats.requestid = nil 786: 787: discovered.in_groups_of(batch_size) do |hosts, last_batch| 788: message = Message.new(req, nil, {:agent => @agent, :type => :direct_request, :collective => @collective, :filter => opts[:filter], :options => opts}) 789: 790: # first time round we let the Message object create a request id 791: # we then re-use it for future requests to keep auditing sane etc 792: @stats.requestid = message.create_reqid unless @stats.requestid 793: message.requestid = @stats.requestid 794: 795: message.discovered_hosts = hosts.clone.compact 796: 797: @client.req(message) do |resp| 798: respcount += 1 799: 800: if block_given? 801: aggregate = process_results_with_block(action, resp, block, aggregate) 802: else 803: @stdout.print twirl.twirl(respcount, discovered.size) if @progress 804: 805: result, aggregate = process_results_without_block(resp, action, aggregate) 806: 807: results << result 808: end 809: end 810: 811: @stats.noresponsefrom.concat @client.stats[:noresponsefrom] 812: @stats.responses += @client.stats[:responses] 813: @stats.blocktime += @client.stats[:blocktime] + sleep_time 814: @stats.totaltime += @client.stats[:totaltime] 815: @stats.discoverytime += @client.stats[:discoverytime] 816: 817: sleep sleep_time unless last_batch 818: end 819: 820: @stats.aggregate_summary = aggregate.summarize if aggregate 821: @stats.aggregate_failures = aggregate.failed if aggregate 822: else 823: @stderr.print("\nNo request sent, we did not discover any nodes.") 824: end 825: 826: @stats.finish_request 827: 828: RPC.stats(@stats) 829: 830: @stdout.print("\n") if @progress 831: 832: if block_given? 833: return stats 834: else 835: return [results].flatten 836: end 837: end
Sets the class filter
# File lib/mcollective/rpc/client.rb, line 398 398: def class_filter(klass) 399: @filter["cf_class"] = @filter["cf_class"] | [klass] 400: @filter["cf_class"].compact! 401: reset 402: end
Sets the collective we are communicating with
# File lib/mcollective/rpc/client.rb, line 573 573: def collective=(c) 574: raise "Unknown collective #{c}" unless Config.instance.collectives.include?(c) 575: 576: @collective = c 577: @client.options = options 578: reset 579: end
Set a compound filter
# File lib/mcollective/rpc/client.rb, line 436 436: def compound_filter(filter) 437: @filter["compound"] = @filter["compound"] | [Matcher.create_compound_callstack(filter)] 438: reset 439: end
Constructs custom requests with custom filters and discovery data the idea is that this would be used in web applications where you might be using a cached copy of data provided by a registration agent to figure out on your own what nodes will be responding and what your filter would be.
This will help you essentially short circuit the traditional cycle of:
mc discover / call / wait for discovered nodes
by doing discovery however you like, contructing a filter and a list of nodes you expect responses from.
Other than that it will work exactly like a normal call, blocks will behave the same way, stats will be handled the same way etcetc
If you just wanted to contact one machine for example with a client that already has other filter options setup you can do:
puppet.custom_request(“runonce”, {}, [“your.box.com“], {:identity => “your.box.com“})
This will do runonce action on just ‘your.box.com’, no discovery will be done and after receiving just one response it will stop waiting for responses
If direct_addressing is enabled in the config file you can provide an empty hash as a filter, this will force that request to be a directly addressed request which technically does not need filters. If you try to use this mode with direct addressing disabled an exception will be raise
# File lib/mcollective/rpc/client.rb, line 296 296: def custom_request(action, args, expected_agents, filter = {}, &block) 297: validate_request(action, args) 298: 299: if filter == {} && !Config.instance.direct_addressing 300: raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes" 301: end 302: 303: @stats.reset 304: 305: custom_filter = Util.empty_filter 306: custom_options = options.clone 307: 308: # merge the supplied filter with the standard empty one 309: # we could just use the merge method but I want to be sure 310: # we dont merge in stuff that isnt actually valid 311: ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype| 312: if filter.include?(ftype) 313: custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten 314: end 315: end 316: 317: # ensure that all filters at least restrict the call to the agent we're a proxy for 318: custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent) 319: custom_options[:filter] = custom_filter 320: 321: # Fake out the stats discovery would have put there 322: @stats.discovered_agents([expected_agents].flatten) 323: 324: # Handle fire and forget requests 325: # 326: # If a specific reply-to was set then from the client perspective this should 327: # be a fire and forget request too since no response will ever reach us - it 328: # will go to the reply-to destination 329: if args[:process_results] == false || @reply_to 330: return fire_and_forget_request(action, args, custom_filter) 331: end 332: 333: # Now do a call pretty much exactly like in method_missing except with our own 334: # options and discovery magic 335: if block_given? 336: call_agent(action, args, custom_options, [expected_agents].flatten) do |r| 337: block.call(r) 338: end 339: else 340: call_agent(action, args, custom_options, [expected_agents].flatten) 341: end 342: end
Disconnects cleanly from the middleware
# File lib/mcollective/rpc/client.rb, line 121 121: def disconnect 122: @client.disconnect 123: end
Does discovery based on the filters set, if a discovery was previously done return that else do a new discovery.
Alternatively if identity filters are given and none of them are regular expressions then just use the provided data as discovered data, avoiding discovery
Discovery can be forced if direct_addressing is enabled by passing in an array of nodes with :nodes or JSON data like those produced by mcollective RPC JSON output using :json
Will show a message indicating its doing discovery if running verbose or if the :verbose flag is passed in.
Use reset to force a new discovery
# File lib/mcollective/rpc/client.rb, line 468 468: def discover(flags={}) 469: flags.keys.each do |key| 470: raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key) 471: end 472: 473: flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose 474: 475: verbose = false unless @output_format == :console 476: 477: # flags[:nodes] and flags[:hosts] are the same thing, we should never have 478: # allowed :hosts as that was inconsistent with the established terminology 479: flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts) 480: 481: reset if flags[:nodes] || flags[:json] 482: 483: unless @discovered_agents 484: # if either hosts or JSON is supplied try to figure out discovery data from there 485: # if direct_addressing is not enabled this is a critical error as the user might 486: # not have supplied filters so raise an exception 487: if flags[:nodes] || flags[:json] 488: raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing 489: 490: hosts = [] 491: 492: if flags[:nodes] 493: hosts = Helpers.extract_hosts_from_array(flags[:nodes]) 494: elsif flags[:json] 495: hosts = Helpers.extract_hosts_from_json(flags[:json]) 496: end 497: 498: raise "Could not find any hosts in discovery data provided" if hosts.empty? 499: 500: @discovered_agents = hosts 501: @force_direct_request = true 502: 503: else 504: identity_filter_discovery_optimization 505: end 506: end 507: 508: # All else fails we do it the hard way using a traditional broadcast 509: unless @discovered_agents 510: @stats.time_discovery :start 511: 512: @client.options = options 513: 514: # if compound filters are used the only real option is to use the mc 515: # discovery plugin since its the only capable of using data queries etc 516: # and we do not want to degrade that experience just to allow compounds 517: # on other discovery plugins the UX would be too bad raising complex sets 518: # of errors etc. 519: @client.discoverer.force_discovery_method_by_filter(options[:filter]) 520: 521: if verbose 522: actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter]) 523: 524: if actual_timeout > 0 525: @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout]) 526: else 527: @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method]) 528: end 529: end 530: 531: # if the requested limit is a pure number and not a percent 532: # and if we're configured to use the first found hosts as the 533: # limit method then pass in the limit thus minimizing the amount 534: # of work we do in the discover phase and speeding it up significantly 535: if @limit_method == :first and @limit_targets.is_a?(Fixnum) 536: @discovered_agents = @client.discover(@filter, discovery_timeout, @limit_targets) 537: else 538: @discovered_agents = @client.discover(@filter, discovery_timeout) 539: end 540: 541: @stderr.puts(@discovered_agents.size) if verbose 542: 543: @force_direct_request = @client.discoverer.force_direct_mode? 544: 545: @stats.time_discovery :end 546: end 547: 548: @stats.discovered_agents(@discovered_agents) 549: RPC.discovered(@discovered_agents) 550: 551: @discovered_agents 552: end
Sets the discovery method. If we change the method there are a number of steps to take:
- set the new method - if discovery options were provided, re-set those to initially provided ones else clear them as they might now apply to a different provider - update the client options so it knows there is a new discovery method in force - reset discovery data forcing a discover on the next request
The remaining item is the discovery timeout, we leave that as is since that is the user supplied timeout either via initial options or via specifically setting it on the client.
# File lib/mcollective/rpc/client.rb, line 377 377: def discovery_method=(method) 378: @default_discovery_method = false 379: @discovery_method = method 380: 381: if @initial_options[:discovery_options] 382: @discovery_options = @initial_options[:discovery_options] 383: else 384: @discovery_options.clear 385: end 386: 387: @client.options = options 388: 389: reset 390: end
(Not documented)
# File lib/mcollective/rpc/client.rb, line 392 392: def discovery_options=(options) 393: @discovery_options = [options].flatten 394: reset 395: end
(Not documented)
# File lib/mcollective/rpc/client.rb, line 344 344: def discovery_timeout 345: return @discovery_timeout if @discovery_timeout 346: return @client.discoverer.ddl.meta[:timeout] 347: end
(Not documented)
# File lib/mcollective/rpc/client.rb, line 349 349: def discovery_timeout=(timeout) 350: @discovery_timeout = Float(timeout) 351: 352: # we calculate the overall timeout from the DDL of the agent and 353: # the supplied discovery timeout unless someone specifically 354: # specifies a timeout to the constructor 355: # 356: # But if we also then specifically set a discovery_timeout on the 357: # agent that has to override the supplied timeout so we then 358: # calculate a correct timeout based on DDL timeout and the 359: # supplied discovery timeout 360: @timeout = @ddl.meta[:timeout] + discovery_timeout 361: end
Sets the fact filter
# File lib/mcollective/rpc/client.rb, line 405 405: def fact_filter(fact, value=nil, operator="=") 406: return if fact.nil? 407: return if fact == false 408: 409: if value.nil? 410: parsed = Util.parse_fact_string(fact) 411: @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false 412: else 413: parsed = Util.parse_fact_string("#{fact}#{operator}#{value}") 414: @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false 415: end 416: 417: @filter["fact"].compact! 418: reset 419: end
for requests that do not care for results just return the request id and don’t do any of the response processing.
We send the :process_results flag with to the nodes so they can make decisions based on that.
Should only be called via method_missing
# File lib/mcollective/rpc/client.rb, line 710 710: def fire_and_forget_request(action, args, filter=nil) 711: validate_request(action, args) 712: 713: identity_filter_discovery_optimization 714: 715: req = new_request(action.to_s, args) 716: 717: filter = options[:filter] unless filter 718: 719: message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options}) 720: message.reply_to = @reply_to if @reply_to 721: 722: if @force_direct_request || @client.discoverer.force_direct_mode? 723: message.discovered_hosts = discover.clone 724: message.type = :direct_request 725: end 726: 727: client.sendreq(message, nil) 728: end
Returns help for an agent if a DDL was found
# File lib/mcollective/rpc/client.rb, line 126 126: def help(template) 127: @ddl.help(template) 128: end
Sets the identity filter
# File lib/mcollective/rpc/client.rb, line 429 429: def identity_filter(identity) 430: @filter["identity"] = @filter["identity"] | [identity] 431: @filter["identity"].compact! 432: reset 433: end
if an identity filter is supplied and it is all strings no regex we can use that as discovery data, technically the identity filter is then redundant if we are in direct addressing mode and we could empty it out but this use case should only really be for a few -I’s on the CLI
For safety we leave the filter in place for now, that way we can support this enhancement also in broadcast mode.
This is only needed for the ‘mc’ discovery method, other methods might change the concept of identity to mean something else so we should pass the full identity filter to them
# File lib/mcollective/rpc/client.rb, line 741 741: def identity_filter_discovery_optimization 742: if options[:filter]["identity"].size > 0 && @discovery_method == "mc" 743: regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size 744: 745: if regex_filters == 0 746: @discovered_agents = options[:filter]["identity"].clone 747: @force_direct_request = true if Config.instance.direct_addressing 748: end 749: end 750: end
Sets and sanity check the limit_method variable used to determine how to limit targets if limit_targets is set
# File lib/mcollective/rpc/client.rb, line 599 599: def limit_method=(method) 600: method = method.to_sym unless method.is_a?(Symbol) 601: 602: raise "Unknown limit method #{method} must be :random or :first" unless [:random, :first].include?(method) 603: 604: @limit_method = method 605: end
Sets and sanity checks the limit_targets variable used to restrict how many nodes we’ll target
# File lib/mcollective/rpc/client.rb, line 583 583: def limit_targets=(limit) 584: if limit.is_a?(String) 585: raise "Invalid limit specified: #{limit} valid limits are /^\d+%*$/" unless limit =~ /^\d+%*$/ 586: 587: begin 588: @limit_targets = Integer(limit) 589: rescue 590: @limit_targets = limit 591: end 592: else 593: @limit_targets = Integer(limit) 594: end 595: end
(Not documented)
# File lib/mcollective/rpc/client.rb, line 676 676: def load_aggregate_functions(action, ddl) 677: return nil unless ddl 678: return nil unless ddl.action_interface(action).keys.include?(:aggregate) 679: 680: return Aggregate.new(ddl.action_interface(action)) 681: 682: rescue => e 683: Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class]) 684: return nil 685: end
Magic handler to invoke remote methods
Once the stub is created using the constructor or the RPC#rpcclient helper you can call remote actions easily:
ret = rpc.echo(:msg => "hello world")
This will call the ‘echo’ action of the ‘rpctest’ agent and return the result as an array, the array will be a simplified result set from the usual full MCollective::Client#req with additional error codes and error text:
{
:sender => "remote.box.com", :statuscode => 0, :statusmsg => "OK", :data => "hello world"
}
If :statuscode is 0 then everything went find, if it’s 1 then you supplied the correct arguments etc but the request could not be completed, you’ll find a human parsable reason in :statusmsg then.
Codes 2 to 5 maps directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError see below for a description of those, in each case :statusmsg would be the reason for failure.
To get access to the full result of the MCollective::Client#req calls you can pass in a block:
rpc.echo(:msg => "hello world") do |resp| pp resp end
In this case resp will the result from MCollective::Client#req. Instead of returning simple text and codes as above you’ll also need to handle the following exceptions:
UnknownRPCAction - There is no matching action on the agent MissingRPCData - You did not supply all the needed parameters for the action InvalidRPCData - The data you did supply did not pass validation UnknownRPCError - Some other error prevented the agent from running
During calls a progress indicator will be shown of how many results we’ve received against how many nodes were discovered, you can disable this by setting progress to false:
rpc.progress = false
This supports a 2nd mode where it will send the SimpleRPC request and never handle the responses. It’s a bit like UDP, it sends the request with the filter attached and you only get back the requestid, you have no indication about results.
You can invoke this using:
puts rpc.echo(:process_results => false)
This will output just the request id.
Batched processing is supported:
printrpc rpc.ping(:batch_size => 5)
This will do everything exactly as normal but communicate to only 5 agents at a time
# File lib/mcollective/rpc/client.rb, line 231 231: def method_missing(method_name, *args, &block) 232: # set args to an empty hash if nothings given 233: args = args[0] 234: args = {} if args.nil? 235: 236: action = method_name.to_s 237: 238: @stats.reset 239: 240: validate_request(action, args) 241: 242: # if a global batch size is set just use that else set it 243: # in the case that it was passed as an argument 244: batch_mode = args.include?(:batch_size) || @batch_mode 245: batch_size = args.delete(:batch_size) || @batch_size 246: batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time 247: 248: # if we were given a batch_size argument thats 0 and batch_mode was 249: # determined to be on via global options etc this will allow a batch_size 250: # of 0 to disable or batch_mode for this call only 251: batch_mode = (batch_mode && Integer(batch_size) > 0) 252: 253: # Handle single target requests by doing discovery and picking 254: # a random node. Then do a custom request specifying a filter 255: # that will only match the one node. 256: if @limit_targets 257: target_nodes = pick_nodes_from_discovered(@limit_targets) 258: Log.debug("Picked #{target_nodes.join(',')} as limited target(s)") 259: 260: custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block) 261: elsif batch_mode 262: call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block) 263: else 264: call_agent(action, args, options, :auto, &block) 265: end 266: end
Creates a suitable request hash for the SimpleRPC agent.
You’d use this if you ever wanted to take care of sending requests on your own - perhaps via Client#sendreq if you didn’t care for responses.
In that case you can just do:
msg = your_rpc.new_request("some_action", :foo => :bar) filter = your_rpc.filter your_rpc.client.sendreq(msg, msg[:agent], filter)
This will send a SimpleRPC request to the action some_action with arguments :foo = :bar, it will return immediately and you will have no indication at all if the request was receieved or not
Clearly the use of this technique should be limited and done only if your code requires such a thing
# File lib/mcollective/rpc/client.rb, line 149 149: def new_request(action, data) 150: callerid = PluginManager["security_plugin"].callerid 151: 152: raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid) 153: 154: {:agent => @agent, 155: :action => action, 156: :caller => callerid, 157: :data => data} 158: end
Provides a normal options hash like you would get from Optionparser
# File lib/mcollective/rpc/client.rb, line 556 556: def options 557: {:disctimeout => discovery_timeout, 558: :timeout => @timeout, 559: :verbose => @verbose, 560: :filter => @filter, 561: :collective => @collective, 562: :output_format => @output_format, 563: :ttl => @ttl, 564: :discovery_method => @discovery_method, 565: :discovery_options => @discovery_options, 566: :force_display_mode => @force_display_mode, 567: :config => @config, 568: :publish_timeout => @publish_timeout, 569: :threaded => @threaded} 570: end
Pick a number of nodes from the discovered nodes
The count should be a string that can be either just a number or a percentage like 10%
It will select nodes from the discovered list based on the rpclimitmethod configuration option which can be either :first or anything else
- :first would be a simple way to do a distance based selection - anything else will just pick one at random - if random chosen, and batch-seed set, then set srand for the generator, and reset afterwards
# File lib/mcollective/rpc/client.rb, line 635 635: def pick_nodes_from_discovered(count) 636: if count =~ /%$/ 637: pct = Integer((discover.size * (count.to_f / 100))) 638: pct == 0 ? count = 1 : count = pct 639: else 640: count = Integer(count) 641: end 642: 643: return discover if discover.size <= count 644: 645: result = [] 646: 647: if @limit_method == :first 648: return discover[0, count] 649: else 650: # we delete from the discovered list because we want 651: # to be sure there is no chance that the same node will 652: # be randomly picked twice. So we have to clone the 653: # discovered list else this method will only ever work 654: # once per discovery cycle and not actually return the 655: # right nodes. 656: haystack = discover.clone 657: 658: if @limit_seed 659: haystack.sort! 660: srand(@limit_seed) 661: end 662: 663: count.times do 664: rnd = rand(haystack.size) 665: result << haystack.delete_at(rnd) 666: end 667: 668: # Reset random number generator to fresh seed 669: # As our seed from options is most likely short 670: srand if @limit_seed 671: end 672: 673: [result].flatten 674: end
process client requests by calling a block on each result in this mode we do not do anything fancy with the result objects and we raise exceptions if there are problems with the data
# File lib/mcollective/rpc/client.rb, line 944 944: def process_results_with_block(action, resp, block, aggregate) 945: @stats.node_responded(resp[:senderid]) 946: 947: result = rpc_result_from_reply(@agent, action, resp) 948: aggregate = aggregate_reply(result, aggregate) if aggregate 949: 950: if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1 951: @stats.ok if resp[:body][:statuscode] == 0 952: @stats.fail if resp[:body][:statuscode] == 1 953: @stats.time_block_execution :start 954: 955: case block.arity 956: when 1 957: block.call(resp) 958: when 2 959: block.call(resp, result) 960: end 961: 962: @stats.time_block_execution :end 963: else 964: @stats.fail 965: 966: case resp[:body][:statuscode] 967: when 2 968: raise UnknownRPCAction, resp[:body][:statusmsg] 969: when 3 970: raise MissingRPCData, resp[:body][:statusmsg] 971: when 4 972: raise InvalidRPCData, resp[:body][:statusmsg] 973: when 5 974: raise UnknownRPCError, resp[:body][:statusmsg] 975: end 976: end 977: 978: return aggregate 979: end
Handles result sets that has no block associated, sets fails and ok in the stats object and return a hash of the response to send to the caller
# File lib/mcollective/rpc/client.rb, line 924 924: def process_results_without_block(resp, action, aggregate) 925: @stats.node_responded(resp[:senderid]) 926: 927: result = rpc_result_from_reply(@agent, action, resp) 928: aggregate = aggregate_reply(result, aggregate) if aggregate 929: 930: if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1 931: @stats.ok if resp[:body][:statuscode] == 0 932: @stats.fail if resp[:body][:statuscode] == 1 933: else 934: @stats.fail 935: end 936: 937: [result, aggregate] 938: end
Resets various internal parts of the class, most importantly it clears out the cached discovery
# File lib/mcollective/rpc/client.rb, line 443 443: def reset 444: @discovered_agents = nil 445: end
Reet the filter to an empty one
# File lib/mcollective/rpc/client.rb, line 448 448: def reset_filter 449: @filter = Util.empty_filter 450: agent_filter @agent 451: end
(Not documented)
# File lib/mcollective/rpc/client.rb, line 697 697: def rpc_result_from_reply(agent, action, reply) 698: Result.new(agent, action, {:sender => reply[:senderid], :statuscode => reply[:body][:statuscode], 699: :statusmsg => reply[:body][:statusmsg], :data => reply[:body][:data]}) 700: end
For the provided arguments and action the input arguments get modified by supplying any defaults provided in the DDL for arguments that were not supplied in the request
We then pass the modified arguments to the DDL for validation
# File lib/mcollective/rpc/client.rb, line 165 165: def validate_request(action, args) 166: raise "No DDL found for agent %s cannot validate inputs" % @agent unless @ddl 167: 168: @ddl.set_default_input_arguments(action, args) 169: @ddl.validate_rpc_request(action, args) 170: end
Disabled; run with --debug to generate this.
Generated with the Darkfish Rdoc Generator 1.1.6.