Updated mcollective.init according to OSCI-658
[packages/precise/mcollective.git] / lib / mcollective / rpc / client.rb
diff --git a/lib/mcollective/rpc/client.rb b/lib/mcollective/rpc/client.rb
new file mode 100644 (file)
index 0000000..21efd74
--- /dev/null
@@ -0,0 +1,958 @@
+module MCollective
+  module RPC
+    # The main component of the Simple RPC client system, this wraps around MCollective::Client
+    # and just brings in a lot of convention and standard approached.
+    class Client
+      attr_accessor :timeout, :verbose, :filter, :config, :progress, :ttl, :reply_to
+      attr_reader :client, :stats, :ddl, :agent, :limit_targets, :limit_method, :output_format, :batch_size, :batch_sleep_time, :batch_mode
+      attr_reader :discovery_options, :discovery_method, :limit_seed
+
+      @@initial_options = nil
+
+      # Creates a stub for a remote agent, you can pass in an options array in the flags
+      # which will then be used else it will just create a default options array with
+      # filtering enabled based on the standard command line use.
+      #
+      #   rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)
+      #
+      # You typically would not call this directly you'd use MCollective::RPC#rpcclient instead
+      # which is a wrapper around this that can be used as a Mixin
+      def initialize(agent, flags = {})
+        if flags.include?(:options)
+          initial_options = flags[:options]
+
+        elsif @@initial_options
+          initial_options = Marshal.load(@@initial_options)
+
+        else
+          oparser = MCollective::Optionparser.new({:verbose => false, :progress_bar => true, :mcollective_limit_targets => false, :batch_size => nil, :batch_sleep_time => 1}, "filter")
+
+          initial_options = oparser.parse do |parser, opts|
+            if block_given?
+              yield(parser, opts)
+            end
+
+            Helpers.add_simplerpc_options(parser, opts)
+          end
+
+          @@initial_options = Marshal.dump(initial_options)
+        end
+
+        @initial_options = initial_options
+
+        @config = initial_options[:config]
+        @client = MCollective::Client.new(@config)
+        @client.options = initial_options
+
+        @stats = Stats.new
+        @agent = agent
+        @timeout = initial_options[:timeout] || 5
+        @verbose = initial_options[:verbose]
+        @filter = initial_options[:filter] || Util.empty_filter
+        @discovered_agents = nil
+        @progress = initial_options[:progress_bar]
+        @limit_targets = initial_options[:mcollective_limit_targets]
+        @limit_method = Config.instance.rpclimitmethod
+        @limit_seed = initial_options[:limit_seed] || nil
+        @output_format = initial_options[:output_format] || :console
+        @force_direct_request = false
+        @reply_to = initial_options[:reply_to]
+        @discovery_method = initial_options[:discovery_method] || Config.instance.default_discovery_method
+        @discovery_options = initial_options[:discovery_options] || []
+        @force_display_mode = initial_options[:force_display_mode] || false
+
+        @batch_size = Integer(initial_options[:batch_size] || 0)
+        @batch_sleep_time = Float(initial_options[:batch_sleep_time] || 1)
+        @batch_mode = @batch_size > 0
+
+        agent_filter agent
+
+        @discovery_timeout = @initial_options.fetch(:disctimeout, nil)
+
+        @collective = @client.collective
+        @ttl = initial_options[:ttl] || Config.instance.ttl
+
+        # if we can find a DDL for the service override
+        # the timeout of the client so we always magically
+        # wait appropriate amounts of time.
+        #
+        # We add the discovery timeout to the ddl supplied
+        # timeout as the discovery timeout tends to be tuned
+        # for local network conditions and fact source speed
+        # which would other wise not be accounted for and
+        # some results might get missed.
+        #
+        # We do this only if the timeout is the default 5
+        # seconds, so that users cli overrides will still
+        # get applied
+        #
+        # DDLs are required, failure to find a DDL is fatal
+        @ddl = DDL.new(agent)
+        @stats.ddl = @ddl
+        @timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5
+
+        # allows stderr and stdout to be overridden for testing
+        # but also for web apps that might not want a bunch of stuff
+        # generated to actual file handles
+        if initial_options[:stderr]
+          @stderr = initial_options[:stderr]
+        else
+          @stderr = STDERR
+          @stderr.sync = true
+        end
+
+        if initial_options[:stdout]
+          @stdout = initial_options[:stdout]
+        else
+          @stdout = STDOUT
+          @stdout.sync = true
+        end
+      end
+
+      # Disconnects cleanly from the middleware
+      def disconnect
+        @client.disconnect
+      end
+
+      # Returns help for an agent if a DDL was found
+      def help(template)
+        @ddl.help(template)
+      end
+
+      # Creates a suitable request hash for the SimpleRPC agent.
+      #
+      # You'd use this if you ever wanted to take care of sending
+      # requests on your own - perhaps via Client#sendreq if you
+      # didn't care for responses.
+      #
+      # In that case you can just do:
+      #
+      #   msg = your_rpc.new_request("some_action", :foo => :bar)
+      #   filter = your_rpc.filter
+      #
+      #   your_rpc.client.sendreq(msg, msg[:agent], filter)
+      #
+      # This will send a SimpleRPC request to the action some_action
+      # with arguments :foo = :bar, it will return immediately and
+      # you will have no indication at all if the request was receieved or not
+      #
+      # Clearly the use of this technique should be limited and done only
+      # if your code requires such a thing
+      def new_request(action, data)
+        callerid = PluginManager["security_plugin"].callerid
+
+        raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)
+
+        {:agent  => @agent,
+         :action => action,
+         :caller => callerid,
+         :data   => data}
+      end
+
+      # For the provided arguments and action the input arguments get
+      # modified by supplying any defaults provided in the DDL for arguments
+      # that were not supplied in the request
+      #
+      # We then pass the modified arguments to the DDL for validation
+      def validate_request(action, args)
+        raise "No DDL found for agent %s cannot validate inputs" % @agent unless @ddl
+
+        @ddl.set_default_input_arguments(action, args)
+        @ddl.validate_rpc_request(action, args)
+      end
+
+      # Magic handler to invoke remote methods
+      #
+      # Once the stub is created using the constructor or the RPC#rpcclient helper you can
+      # call remote actions easily:
+      #
+      #   ret = rpc.echo(:msg => "hello world")
+      #
+      # This will call the 'echo' action of the 'rpctest' agent and return the result as an array,
+      # the array will be a simplified result set from the usual full MCollective::Client#req with
+      # additional error codes and error text:
+      #
+      # {
+      #   :sender => "remote.box.com",
+      #   :statuscode => 0,
+      #   :statusmsg => "OK",
+      #   :data => "hello world"
+      # }
+      #
+      # If :statuscode is 0 then everything went find, if it's 1 then you supplied the correct arguments etc
+      # but the request could not be completed, you'll find a human parsable reason in :statusmsg then.
+      #
+      # Codes 2 to 5 maps directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError
+      # see below for a description of those, in each case :statusmsg would be the reason for failure.
+      #
+      # To get access to the full result of the MCollective::Client#req calls you can pass in a block:
+      #
+      #   rpc.echo(:msg => "hello world") do |resp|
+      #      pp resp
+      #   end
+      #
+      # In this case resp will the result from MCollective::Client#req.  Instead of returning simple
+      # text and codes as above you'll also need to handle the following exceptions:
+      #
+      # UnknownRPCAction - There is no matching action on the agent
+      # MissingRPCData - You did not supply all the needed parameters for the action
+      # InvalidRPCData - The data you did supply did not pass validation
+      # UnknownRPCError - Some other error prevented the agent from running
+      #
+      # During calls a progress indicator will be shown of how many results we've received against
+      # how many nodes were discovered, you can disable this by setting progress to false:
+      #
+      #   rpc.progress = false
+      #
+      # This supports a 2nd mode where it will send the SimpleRPC request and never handle the
+      # responses.  It's a bit like UDP, it sends the request with the filter attached and you
+      # only get back the requestid, you have no indication about results.
+      #
+      # You can invoke this using:
+      #
+      #   puts rpc.echo(:process_results => false)
+      #
+      # This will output just the request id.
+      #
+      # Batched processing is supported:
+      #
+      #   printrpc rpc.ping(:batch_size => 5)
+      #
+      # This will do everything exactly as normal but communicate to only 5
+      # agents at a time
+      def method_missing(method_name, *args, &block)
+        # set args to an empty hash if nothings given
+        args = args[0]
+        args = {} if args.nil?
+
+        action = method_name.to_s
+
+        @stats.reset
+
+        validate_request(action, args)
+
+        # if a global batch size is set just use that else set it
+        # in the case that it was passed as an argument
+        batch_mode = args.include?(:batch_size) || @batch_mode
+        batch_size = args.delete(:batch_size) || @batch_size
+        batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time
+
+        # if we were given a batch_size argument thats 0 and batch_mode was
+        # determined to be on via global options etc this will allow a batch_size
+        # of 0 to disable or batch_mode for this call only
+        batch_mode = (batch_mode && Integer(batch_size) > 0)
+
+        # Handle single target requests by doing discovery and picking
+        # a random node.  Then do a custom request specifying a filter
+        # that will only match the one node.
+        if @limit_targets
+          target_nodes = pick_nodes_from_discovered(@limit_targets)
+          Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")
+
+          custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
+        elsif batch_mode
+          call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
+        else
+          call_agent(action, args, options, :auto, &block)
+        end
+      end
+
+      # Constructs custom requests with custom filters and discovery data
+      # the idea is that this would be used in web applications where you
+      # might be using a cached copy of data provided by a registration agent
+      # to figure out on your own what nodes will be responding and what your
+      # filter would be.
+      #
+      # This will help you essentially short circuit the traditional cycle of:
+      #
+      # mc discover / call / wait for discovered nodes
+      #
+      # by doing discovery however you like, contructing a filter and a list of
+      # nodes you expect responses from.
+      #
+      # Other than that it will work exactly like a normal call, blocks will behave
+      # the same way, stats will be handled the same way etcetc
+      #
+      # If you just wanted to contact one machine for example with a client that
+      # already has other filter options setup you can do:
+      #
+      # puppet.custom_request("runonce", {}, ["your.box.com"], {:identity => "your.box.com"})
+      #
+      # This will do runonce action on just 'your.box.com', no discovery will be
+      # done and after receiving just one response it will stop waiting for responses
+      #
+      # If direct_addressing is enabled in the config file you can provide an empty
+      # hash as a filter, this will force that request to be a directly addressed
+      # request which technically does not need filters.  If you try to use this
+      # mode with direct addressing disabled an exception will be raise
+      def custom_request(action, args, expected_agents, filter = {}, &block)
+        validate_request(action, args)
+
+        if filter == {} && !Config.instance.direct_addressing
+          raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
+        end
+
+        @stats.reset
+
+        custom_filter = Util.empty_filter
+        custom_options = options.clone
+
+        # merge the supplied filter with the standard empty one
+        # we could just use the merge method but I want to be sure
+        # we dont merge in stuff that isnt actually valid
+        ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
+          if filter.include?(ftype)
+            custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
+          end
+        end
+
+        # ensure that all filters at least restrict the call to the agent we're a proxy for
+        custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
+        custom_options[:filter] = custom_filter
+
+        # Fake out the stats discovery would have put there
+        @stats.discovered_agents([expected_agents].flatten)
+
+        # Handle fire and forget requests
+        #
+        # If a specific reply-to was set then from the client perspective this should
+        # be a fire and forget request too since no response will ever reach us - it
+        # will go to the reply-to destination
+        if args[:process_results] == false || @reply_to
+          return fire_and_forget_request(action, args, custom_filter)
+        end
+
+        # Now do a call pretty much exactly like in method_missing except with our own
+        # options and discovery magic
+        if block_given?
+          call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
+            block.call(r)
+          end
+        else
+          call_agent(action, args, custom_options, [expected_agents].flatten)
+        end
+      end
+
+      def discovery_timeout
+        return @discovery_timeout if @discovery_timeout
+        return @client.discoverer.ddl.meta[:timeout]
+      end
+
+      def discovery_timeout=(timeout)
+        @discovery_timeout = Float(timeout)
+
+        # we calculate the overall timeout from the DDL of the agent and
+        # the supplied discovery timeout unless someone specifically
+        # specifies a timeout to the constructor
+        #
+        # But if we also then specifically set a discovery_timeout on the
+        # agent that has to override the supplied timeout so we then
+        # calculate a correct timeout based on DDL timeout and the
+        # supplied discovery timeout
+        @timeout = @ddl.meta[:timeout] + discovery_timeout
+      end
+
+      # Sets the discovery method.  If we change the method there are a
+      # number of steps to take:
+      #
+      #  - set the new method
+      #  - if discovery options were provided, re-set those to initially
+      #    provided ones else clear them as they might now apply to a
+      #    different provider
+      #  - update the client options so it knows there is a new discovery
+      #    method in force
+      #  - reset discovery data forcing a discover on the next request
+      #
+      # The remaining item is the discovery timeout, we leave that as is
+      # since that is the user supplied timeout either via initial options
+      # or via specifically setting it on the client.
+      def discovery_method=(method)
+        @discovery_method = method
+
+        if @initial_options[:discovery_options]
+          @discovery_options = @initial_options[:discovery_options]
+        else
+          @discovery_options.clear
+        end
+
+        @client.options = options
+
+        reset
+      end
+
+      def discovery_options=(options)
+        @discovery_options = [options].flatten
+        reset
+      end
+
+      # Sets the class filter
+      def class_filter(klass)
+        @filter["cf_class"] << klass
+        @filter["cf_class"].compact!
+        reset
+      end
+
+      # Sets the fact filter
+      def fact_filter(fact, value=nil, operator="=")
+        return if fact.nil?
+        return if fact == false
+
+        if value.nil?
+          parsed = Util.parse_fact_string(fact)
+          @filter["fact"] << parsed unless parsed == false
+        else
+          parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
+          @filter["fact"] << parsed unless parsed == false
+        end
+
+        @filter["fact"].compact!
+        reset
+      end
+
+      # Sets the agent filter
+      def agent_filter(agent)
+        @filter["agent"] << agent
+        @filter["agent"].compact!
+        reset
+      end
+
+      # Sets the identity filter
+      def identity_filter(identity)
+        @filter["identity"] << identity
+        @filter["identity"].compact!
+        reset
+      end
+
+      # Set a compound filter
+      def compound_filter(filter)
+        @filter["compound"] <<  Matcher.create_compound_callstack(filter)
+        reset
+      end
+
+      # Resets various internal parts of the class, most importantly it clears
+      # out the cached discovery
+      def reset
+        @discovered_agents = nil
+      end
+
+      # Reet the filter to an empty one
+      def reset_filter
+        @filter = Util.empty_filter
+        agent_filter @agent
+      end
+
+      # Does discovery based on the filters set, if a discovery was
+      # previously done return that else do a new discovery.
+      #
+      # Alternatively if identity filters are given and none of them are
+      # regular expressions then just use the provided data as discovered
+      # data, avoiding discovery
+      #
+      # Discovery can be forced if direct_addressing is enabled by passing
+      # in an array of nodes with :nodes or JSON data like those produced
+      # by mcollective RPC JSON output using :json
+      #
+      # Will show a message indicating its doing discovery if running
+      # verbose or if the :verbose flag is passed in.
+      #
+      # Use reset to force a new discovery
+      def discover(flags={})
+        flags.keys.each do |key|
+          raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
+        end
+
+        flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose
+
+        verbose = false unless @output_format == :console
+
+        # flags[:nodes] and flags[:hosts] are the same thing, we should never have
+        # allowed :hosts as that was inconsistent with the established terminology
+        flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)
+
+        reset if flags[:nodes] || flags[:json]
+
+        unless @discovered_agents
+          # if either hosts or JSON is supplied try to figure out discovery data from there
+          # if direct_addressing is not enabled this is a critical error as the user might
+          # not have supplied filters so raise an exception
+          if flags[:nodes] || flags[:json]
+            raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing
+
+            hosts = []
+
+            if flags[:nodes]
+              hosts = Helpers.extract_hosts_from_array(flags[:nodes])
+            elsif flags[:json]
+              hosts = Helpers.extract_hosts_from_json(flags[:json])
+            end
+
+            raise "Could not find any hosts in discovery data provided" if hosts.empty?
+
+            @discovered_agents = hosts
+            @force_direct_request = true
+
+          # if an identity filter is supplied and it is all strings no regex we can use that
+          # as discovery data, technically the identity filter is then redundant if we are
+          # in direct addressing mode and we could empty it out but this use case should
+          # only really be for a few -I's on the CLI
+          #
+          # For safety we leave the filter in place for now, that way we can support this
+          # enhancement also in broadcast mode.
+          #
+          # This is only needed for the 'mc' discovery method, other methods might change
+          # the concept of identity to mean something else so we should pass the full
+          # identity filter to them
+          elsif options[:filter]["identity"].size > 0 && @discovery_method == "mc"
+            regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size
+
+            if regex_filters == 0
+              @discovered_agents = options[:filter]["identity"].clone
+              @force_direct_request = true if Config.instance.direct_addressing
+            end
+          end
+        end
+
+        # All else fails we do it the hard way using a traditional broadcast
+        unless @discovered_agents
+          @stats.time_discovery :start
+
+          @client.options = options
+
+          # if compound filters are used the only real option is to use the mc
+          # discovery plugin since its the only capable of using data queries etc
+          # and we do not want to degrade that experience just to allow compounds
+          # on other discovery plugins the UX would be too bad raising complex sets
+          # of errors etc.
+          @client.discoverer.force_discovery_method_by_filter(options[:filter])
+
+          if verbose
+            actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter])
+
+            if actual_timeout > 0
+              @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
+            else
+              @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
+            end
+          end
+
+          # if the requested limit is a pure number and not a percent
+          # and if we're configured to use the first found hosts as the
+          # limit method then pass in the limit thus minimizing the amount
+          # of work we do in the discover phase and speeding it up significantly
+          if @limit_method == :first and @limit_targets.is_a?(Fixnum)
+            @discovered_agents = @client.discover(@filter, discovery_timeout, @limit_targets)
+          else
+            @discovered_agents = @client.discover(@filter, discovery_timeout)
+          end
+
+          @stderr.puts(@discovered_agents.size) if verbose
+
+          @force_direct_request = @client.discoverer.force_direct_mode?
+
+          @stats.time_discovery :end
+        end
+
+        @stats.discovered_agents(@discovered_agents)
+        RPC.discovered(@discovered_agents)
+
+        @discovered_agents
+      end
+
+      # Provides a normal options hash like you would get from
+      # Optionparser
+      def options
+        {:disctimeout => discovery_timeout,
+         :timeout => @timeout,
+         :verbose => @verbose,
+         :filter => @filter,
+         :collective => @collective,
+         :output_format => @output_format,
+         :ttl => @ttl,
+         :discovery_method => @discovery_method,
+         :discovery_options => @discovery_options,
+         :force_display_mode => @force_display_mode,
+         :config => @config}
+      end
+
+      # Sets the collective we are communicating with
+      def collective=(c)
+        raise "Unknown collective #{c}" unless Config.instance.collectives.include?(c)
+
+        @collective = c
+        @client.options = options
+        reset
+      end
+
+      # Sets and sanity checks the limit_targets variable
+      # used to restrict how many nodes we'll target
+      def limit_targets=(limit)
+        if limit.is_a?(String)
+          raise "Invalid limit specified: #{limit} valid limits are /^\d+%*$/" unless limit =~ /^\d+%*$/
+
+          begin
+            @limit_targets = Integer(limit)
+          rescue
+            @limit_targets = limit
+          end
+        else
+          @limit_targets = Integer(limit)
+        end
+      end
+
+      # Sets and sanity check the limit_method variable
+      # used to determine how to limit targets if limit_targets is set
+      def limit_method=(method)
+        method = method.to_sym unless method.is_a?(Symbol)
+
+        raise "Unknown limit method #{method} must be :random or :first" unless [:random, :first].include?(method)
+
+        @limit_method = method
+      end
+
+      # Sets the batch size, if the size is set to 0 that will disable batch mode
+      def batch_size=(limit)
+        raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing
+
+        @batch_size = Integer(limit)
+        @batch_mode = @batch_size > 0
+      end
+
+      def batch_sleep_time=(time)
+        raise "Can only set batch sleep time if direct addressing is supported" unless Config.instance.direct_addressing
+
+        @batch_sleep_time = Float(time)
+      end
+
+      # Pick a number of nodes from the discovered nodes
+      #
+      # The count should be a string that can be either
+      # just a number or a percentage like 10%
+      #
+      # It will select nodes from the discovered list based
+      # on the rpclimitmethod configuration option which can
+      # be either :first or anything else
+      #
+      #   - :first would be a simple way to do a distance based
+      #     selection
+      #   - anything else will just pick one at random
+      #   - if random chosen, and batch-seed set, then set srand
+      #     for the generator, and reset afterwards
+      def pick_nodes_from_discovered(count)
+        if count =~ /%$/
+          pct = Integer((discover.size * (count.to_f / 100)))
+          pct == 0 ? count = 1 : count = pct
+        else
+          count = Integer(count)
+        end
+
+        return discover if discover.size <= count
+
+        result = []
+
+        if @limit_method == :first
+          return discover[0, count]
+        else
+          # we delete from the discovered list because we want
+          # to be sure there is no chance that the same node will
+          # be randomly picked twice.  So we have to clone the
+          # discovered list else this method will only ever work
+          # once per discovery cycle and not actually return the
+          # right nodes.
+          haystack = discover.clone
+
+          if @limit_seed
+            haystack.sort!
+            srand(@limit_seed)
+          end
+
+          count.times do
+            rnd = rand(haystack.size)
+            result << haystack.delete_at(rnd)
+          end
+
+          # Reset random number generator to fresh seed
+          # As our seed from options is most likely short
+          srand if @limit_seed
+        end
+
+        [result].flatten
+      end
+
+      def load_aggregate_functions(action, ddl)
+        return nil unless ddl
+        return nil unless ddl.action_interface(action).keys.include?(:aggregate)
+
+        return Aggregate.new(ddl.action_interface(action))
+
+      rescue => e
+        Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
+        return nil
+      end
+
+      def aggregate_reply(reply, aggregate)
+        return nil unless aggregate
+
+        aggregate.call_functions(reply)
+        return aggregate
+      rescue Exception => e
+        Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
+        return nil
+      end
+
+      def rpc_result_from_reply(agent, action, reply)
+        Result.new(agent, action, {:sender => reply[:senderid], :statuscode => reply[:body][:statuscode],
+                                   :statusmsg => reply[:body][:statusmsg], :data => reply[:body][:data]})
+      end
+
+      # for requests that do not care for results just
+      # return the request id and don't do any of the
+      # response processing.
+      #
+      # We send the :process_results flag with to the
+      # nodes so they can make decisions based on that.
+      #
+      # Should only be called via method_missing
+      def fire_and_forget_request(action, args, filter=nil)
+        validate_request(action, args)
+
+        req = new_request(action.to_s, args)
+
+        filter = options[:filter] unless filter
+
+        message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options})
+        message.reply_to = @reply_to if @reply_to
+
+        return @client.sendreq(message, nil)
+      end
+
+      # Calls an agent in a way very similar to call_agent but it supports batching
+      # the queries to the network.
+      #
+      # The result sets, stats, block handling etc is all exactly like you would expect
+      # from normal call_agent.
+      #
+      # This is used by method_missing and works only with direct addressing mode
+      def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
+        raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
+        raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
+
+        batch_size = Integer(batch_size)
+        sleep_time = Float(sleep_time)
+
+        Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")
+
+        @force_direct_request = true
+
+        discovered = discover
+        results = []
+        respcount = 0
+
+        if discovered.size > 0
+          req = new_request(action.to_s, args)
+
+          aggregate = load_aggregate_functions(action, @ddl)
+
+          if @progress && !block_given?
+            twirl = Progress.new
+            @stdout.puts
+            @stdout.print twirl.twirl(respcount, discovered.size)
+          end
+
+          @stats.requestid = nil
+
+          discovered.in_groups_of(batch_size) do |hosts, last_batch|
+            message = Message.new(req, nil, {:agent => @agent, :type => :direct_request, :collective => @collective, :filter => opts[:filter], :options => opts})
+
+            # first time round we let the Message object create a request id
+            # we then re-use it for future requests to keep auditing sane etc
+            @stats.requestid = message.create_reqid unless @stats.requestid
+            message.requestid = @stats.requestid
+
+            message.discovered_hosts = hosts.clone.compact
+
+            @client.req(message) do |resp|
+              respcount += 1
+
+              if block_given?
+                aggregate = process_results_with_block(action, resp, block, aggregate)
+              else
+                @stdout.print twirl.twirl(respcount, discovered.size) if @progress
+
+                result, aggregate = process_results_without_block(resp, action, aggregate)
+
+                results << result
+              end
+            end
+
+            @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
+            @stats.responses += @client.stats[:responses]
+            @stats.blocktime += @client.stats[:blocktime] + sleep_time
+            @stats.totaltime += @client.stats[:totaltime]
+            @stats.discoverytime += @client.stats[:discoverytime]
+
+            sleep sleep_time unless last_batch
+          end
+
+          @stats.aggregate_summary = aggregate.summarize if aggregate
+          @stats.aggregate_failures = aggregate.failed if aggregate
+        else
+          @stderr.print("\nNo request sent, we did not discover any nodes.")
+        end
+
+        @stats.finish_request
+
+        RPC.stats(@stats)
+
+        @stdout.print("\n") if @progress
+
+        if block_given?
+          return stats
+        else
+          return [results].flatten
+        end
+      end
+
+      # Handles traditional calls to the remote agents with full stats
+      # blocks, non blocks and everything else supported.
+      #
+      # Other methods of calling the nodes can reuse this code by
+      # for example specifying custom options and discovery data
+      def call_agent(action, args, opts, disc=:auto, &block)
+        # Handle fire and forget requests and make sure
+        # the :process_results value is set appropriately
+        #
+        # specific reply-to requests should be treated like
+        # fire and forget since the client will never get
+        # the responses
+        if args[:process_results] == false || @reply_to
+          return fire_and_forget_request(action, args)
+        else
+          args[:process_results] = true
+        end
+
+        # Do discovery when no specific discovery array is given
+        #
+        # If an array is given set the force_direct_request hint that
+        # will tell the message object to be a direct request one
+        if disc == :auto
+          discovered = discover
+        else
+          @force_direct_request = true if Config.instance.direct_addressing
+          discovered = disc
+        end
+
+        req = new_request(action.to_s, args)
+
+        message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
+        message.discovered_hosts = discovered.clone
+
+        results = []
+        respcount = 0
+
+        if discovered.size > 0
+          message.type = :direct_request if @force_direct_request
+
+          if @progress && !block_given?
+            twirl = Progress.new
+            @stdout.puts
+            @stdout.print twirl.twirl(respcount, discovered.size)
+          end
+
+          aggregate = load_aggregate_functions(action, @ddl)
+
+          @client.req(message) do |resp|
+            respcount += 1
+
+            if block_given?
+              aggregate = process_results_with_block(action, resp, block, aggregate)
+            else
+              @stdout.print twirl.twirl(respcount, discovered.size) if @progress
+
+              result, aggregate = process_results_without_block(resp, action, aggregate)
+
+              results << result
+            end
+          end
+
+          @stats.aggregate_summary = aggregate.summarize if aggregate
+          @stats.aggregate_failures = aggregate.failed if aggregate
+          @stats.client_stats = @client.stats
+        else
+          @stderr.print("\nNo request sent, we did not discover any nodes.")
+        end
+
+        @stats.finish_request
+
+        RPC.stats(@stats)
+
+        @stdout.print("\n\n") if @progress
+
+        if block_given?
+          return stats
+        else
+          return [results].flatten
+        end
+      end
+
+      # Handles result sets that has no block associated, sets fails and ok
+      # in the stats object and return a hash of the response to send to the
+      # caller
+      def process_results_without_block(resp, action, aggregate)
+        @stats.node_responded(resp[:senderid])
+
+        result = rpc_result_from_reply(@agent, action, resp)
+        aggregate = aggregate_reply(result, aggregate) if aggregate
+
+        if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
+          @stats.ok if resp[:body][:statuscode] == 0
+          @stats.fail if resp[:body][:statuscode] == 1
+        else
+          @stats.fail
+        end
+
+        [result, aggregate]
+      end
+
+      # process client requests by calling a block on each result
+      # in this mode we do not do anything fancy with the result
+      # objects and we raise exceptions if there are problems with
+      # the data
+      def process_results_with_block(action, resp, block, aggregate)
+        @stats.node_responded(resp[:senderid])
+
+        result = rpc_result_from_reply(@agent, action, resp)
+        aggregate = aggregate_reply(result, aggregate) if aggregate
+
+        if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
+          @stats.ok if resp[:body][:statuscode] == 0
+          @stats.fail if resp[:body][:statuscode] == 1
+          @stats.time_block_execution :start
+
+          case block.arity
+            when 1
+              block.call(resp)
+            when 2
+              block.call(resp, result)
+          end
+
+          @stats.time_block_execution :end
+        else
+          @stats.fail
+
+          case resp[:body][:statuscode]
+            when 2
+              raise UnknownRPCAction, resp[:body][:statusmsg]
+            when 3
+              raise MissingRPCData, resp[:body][:statusmsg]
+            when 4
+              raise InvalidRPCData, resp[:body][:statusmsg]
+            when 5
+              raise UnknownRPCError, resp[:body][:statusmsg]
+          end
+        end
+
+        return aggregate
+      end
+    end
+  end
+end