Added mcollective 2.3.1 package
[packages/trusty/mcollective.git] / lib / mcollective / aggregate.rb
1 module MCollective
2   class Aggregate
3     autoload :Result, 'mcollective/aggregate/result'
4     autoload :Base, 'mcollective/aggregate/base'
5
6     attr_accessor :ddl, :functions, :action, :failed
7
8     def initialize(ddl)
9       @functions = []
10       @ddl = ddl
11       @action = ddl[:action]
12       @failed = []
13
14       create_functions
15     end
16
17     # Creates instances of the Aggregate functions and stores them in the function array.
18     # All aggregate call and summarize method calls operate on these function as a batch.
19     def create_functions
20       @ddl[:aggregate].each_with_index do |agg, i|
21         output = agg[:args][0]
22
23         if contains_output?(output)
24           arguments = agg[:args][1]
25           format = (arguments.delete(:format) if arguments) || nil
26           begin
27             @functions << load_function(agg[:function]).new(output, arguments, format, @action)
28           rescue Exception => e
29             Log.error("Cannot create aggregate function '#{output}'. #{e.to_s}")
30             @failed << {:name => output, :type => :startup}
31           end
32         else
33           Log.error("Cannot create aggregate function '#{output}'. '#{output}' has not been specified as a valid ddl output.")
34           @failed << {:name => output, :type => :create}
35         end
36       end
37     end
38
39     # Check if the function param is defined as an output for the action in the ddl
40     def contains_output?(output)
41       @ddl[:output].keys.include?(output)
42     end
43
44     # Call all the appropriate functions with the reply data received from RPC::Client
45     def call_functions(reply)
46       @functions.each do |function|
47         Log.debug("Calling aggregate function #{function} for result")
48         begin
49           function.process_result(reply[:data][function.output_name], reply)
50         rescue Exception => e
51           Log.error("Could not process aggregate function for '#{function.output_name}'. #{e.to_s}")
52           @failed << {:name => function.output_name, :type => :process_result}
53           @functions.delete(function)
54         end
55       end
56     end
57
58     # Finalizes the function returning a result object
59     def summarize
60       summary = @functions.map do |function|
61         begin
62           function.summarize
63         rescue Exception => e
64           Log.error("Could not summarize aggregate result for '#{function.output_name}'. #{e.to_s}")
65           @failed << {:name => function.output_name, :type => :summarize}
66           nil
67         end
68       end
69
70       summary.reject{|x| x.nil?}.sort do |x,y|
71         x.result[:output] <=> y.result[:output]
72       end
73     end
74
75     # Loads function from disk for use
76     def load_function(function_name)
77       function_name = function_name.to_s.capitalize
78
79       PluginManager.loadclass("MCollective::Aggregate::#{function_name}") unless Aggregate.const_defined?(function_name)
80       Aggregate.const_get(function_name)
81     rescue Exception
82       raise "Aggregate function file '#{function_name.downcase}.rb' cannot be loaded"
83     end
84   end
85 end