# Register the Result and Base constants so that each is loaded from the given
# path the first time it is referenced.
3 autoload :Result, 'mcollective/aggregate/result'
4 autoload :Base, 'mcollective/aggregate/base'
# Generate public reader and writer methods for these four attributes.
6 attr_accessor :ddl, :functions, :action, :failed
11 @action = ddl[:action]
17 # Creates instances of the Aggregate functions and stores them in the functions array.
18 # All aggregate call and summarize method calls operate on these functions as a batch.
# Walks every entry of @ddl[:aggregate] and tries to build one function
# instance per entry, appending it to @functions.
# NOTE(review): this listing does not show the enclosing `def` nor the
# `begin`/`rescue`/`else`/`end` lines — confirm the exact structure against
# the full file before changing anything here.
20 @ddl[:aggregate].each_with_index do |agg, i|
# The first element of the entry's :args names the output to aggregate.
21 output = agg[:args][0]
# Only outputs accepted by contains_output? get a function instance.
23 if contains_output?(output)
# The second element of :args, when present, carries the function's arguments.
24 arguments = agg[:args][1]
# NOTE(review): `delete` mutates the object stored in agg[:args][1], so :format
# is removed from the DDL data itself, not from a copy — verify that a second
# pass over the same ddl is not expected to see :format again.
25 format = (arguments.delete(:format) if arguments) || nil
# The loaded class is instantiated with the output name, the remaining
# arguments, the extracted format and the current action.
27 @functions << load_function(agg[:function]).new(output, arguments, format, @action)
# `e` is presumably bound by a rescue clause that is not shown here — TODO confirm.
29 Log.error("Cannot create aggregate function '#{output}'. #{e.to_s}")
# Record the failure with type :startup.
30 @failed << {:name => output, :type => :startup}
# Presumably the `else` branch of the contains_output? check — TODO confirm.
33 Log.error("Cannot create aggregate function '#{output}'. '#{output}' has not been specified as a valid ddl output.")
# Record the failure with type :create.
34 @failed << {:name => output, :type => :create}
39 # Check if the function param is defined as an output for the action in the ddl
# Tells whether +output+ is one of the keys of the :output section of @ddl.
#
# output - the output name to look up.
#
# Returns true when @ddl[:output] has +output+ as a key, false otherwise.
# A DDL without an :output section yields false rather than raising
# NoMethodError on nil.
def contains_output?(output)
  outputs = @ddl[:output]
  return false if outputs.nil?

  # key? asks the hash directly instead of building a temporary array of keys.
  outputs.key?(output)
end
44 # Call all the appropriate functions with the reply data received from RPC::Client
# Feeds one reply to every function currently held in @functions.
#
# Each function receives the value stored under its output name in
# reply[:data], together with the whole reply. A function that raises is
# logged, recorded in @failed with type :process_result and removed from
# @functions.
#
# reply - the reply to process; it is indexed with :data, and the result of
#         that is indexed with each function's output name.
#
# Returns @functions, i.e. the functions still registered after this reply.
def call_functions(reply)
  # Iterate over a snapshot: failing functions are deleted from @functions
  # inside the loop, and deleting from an array while Array#each walks it
  # makes the element following the deleted one get skipped.
  @functions.dup.each do |function|
    Log.debug("Calling aggregate function #{function} for result")

    begin
      function.process_result(reply[:data][function.output_name], reply)
    # NOTE(review): the rescue clause is not visible in this listing;
    # StandardError is assumed — confirm against the full file.
    rescue StandardError => e
      Log.error("Could not process aggregate function for '#{function.output_name}'. #{e.to_s}")
      @failed << {:name => function.output_name, :type => :process_result}
      @functions.delete(function)
    end
  end

  @functions
end
58 # Finalizes the functions, returning their result objects sorted by output
# Builds one summary entry per function in @functions.
# NOTE(review): the enclosing `def` and the lines between 60 and 64 (the call
# that produces each entry and the rescue clause) are not shown in this
# listing — confirm against the full file.
60 summary = @functions.map do |function|
# `e` is presumably bound by a rescue clause that is not shown here — TODO confirm.
64 Log.error("Could not summarize aggregate result for '#{function.output_name}'. #{e.to_s}")
# Record the failure with type :summarize.
65 @failed << {:name => function.output_name, :type => :summarize}
# Drop nil entries, then order the remaining ones by their result[:output] value.
70 summary.reject{|x| x.nil?}.sort do |x,y|
71 x.result[:output] <=> y.result[:output]
75 # Loads function from disk for use
# Resolves an aggregate function name to the constant of that name under
# Aggregate, asking PluginManager to load
# "MCollective::Aggregate::<Name>" first when the constant is not defined yet.
#
# function_name - converted with to_s and capitalized to form the constant
#                 name (e.g. :sum becomes "Sum").
#
# Returns the constant found directly under Aggregate.
# Raises RuntimeError when the function cannot be loaded or resolved.
def load_function(function_name)
  function_name = function_name.to_s.capitalize

  # Look only inside Aggregate itself (inherit = false). With the default
  # inherited lookup a name such as "string" matches the top-level ::String,
  # which skips the plugin load and returns a class that is not an aggregate
  # function.
  unless Aggregate.const_defined?(function_name, false)
    PluginManager.loadclass("MCollective::Aggregate::#{function_name}")
  end

  Aggregate.const_get(function_name, false)
# NOTE(review): the rescue clause is not visible in this listing. ScriptError
# is listed next to StandardError so that a missing or unparsable plugin file
# (LoadError / SyntaxError) is reported with the message below — confirm
# against the full file.
rescue StandardError, ScriptError
  raise "Aggregate function file '#{function_name.downcase}.rb' cannot be loaded"
end