Added mcollective 2.3.1 package
[packages/trusty/mcollective.git] / lib / mcollective / agents.rb
1 module MCollective
2   # A collection of agents, loads them, reloads them and dispatches messages to them.
3   # It uses the PluginManager to store, load and manage instances of plugins.
4   class Agents
5     def initialize(agents = {})
6       @config = Config.instance
7       raise ("Configuration has not been loaded, can't load agents") unless @config.configured
8
9       @@agents = agents
10
11       loadagents
12     end
13
14     # Deletes all agents
15     def clear!
16       @@agents.each_key do |agent|
17         PluginManager.delete "#{agent}_agent"
18         Util.unsubscribe(Util.make_subscriptions(agent, :broadcast))
19       end
20
21       @@agents = {}
22     end
23
24     # Loads all agents from disk
25     def loadagents
26       Log.debug("Reloading all agents from disk")
27
28       clear!
29
30       @config.libdir.each do |libdir|
31         agentdir = "#{libdir}/mcollective/agent"
32         next unless File.directory?(agentdir)
33
34         Dir.new(agentdir).grep(/\.rb$/).each do |agent|
35           agentname = File.basename(agent, ".rb")
36           loadagent(agentname) unless PluginManager.include?("#{agentname}_agent")
37         end
38       end
39     end
40
41     # Loads a specified agent from disk if available
42     def loadagent(agentname)
43       agentfile = findagentfile(agentname)
44       return false unless agentfile
45       classname = class_for_agent(agentname)
46
47       PluginManager.delete("#{agentname}_agent")
48
49       begin
50         single_instance = ["registration", "discovery"].include?(agentname)
51
52         PluginManager.loadclass(classname)
53
54         if activate_agent?(agentname)
55           PluginManager << {:type => "#{agentname}_agent", :class => classname, :single_instance => single_instance}
56
57           # Attempt to instantiate the agent once so any validation and hooks get run
58           # this does a basic sanity check on the agent as a whole, if this fails it
59           # will be removed from the plugin list
60           PluginManager["#{agentname}_agent"]
61
62           Util.subscribe(Util.make_subscriptions(agentname, :broadcast)) unless @@agents.include?(agentname)
63
64           @@agents[agentname] = {:file => agentfile}
65           return true
66         else
67           Log.debug("Not activating agent #{agentname} due to agent policy in activate? method")
68           return false
69         end
70       rescue Exception => e
71         Log.error("Loading agent #{agentname} failed: #{e}")
72         PluginManager.delete("#{agentname}_agent")
73         return false
74       end
75     end
76
77     # Builds a class name string given a Agent name
78     def class_for_agent(agent)
79       "MCollective::Agent::#{agent.capitalize}"
80     end
81
82     # Checks if a plugin should be activated by
83     # calling #activate? on it if it responds to
84     # that method else always activate it
85     def activate_agent?(agent)
86       klass = Kernel.const_get("MCollective").const_get("Agent").const_get(agent.capitalize)
87
88       if klass.respond_to?("activate?")
89         return klass.activate?
90       else
91         Log.debug("#{klass} does not have an activate? method, activating as default")
92         return true
93       end
94     rescue Exception => e
95       Log.warn("Agent activation check for #{agent} failed: #{e.class}: #{e}")
96       return false
97     end
98
99     # searches the libdirs for agents
100     def findagentfile(agentname)
101       @config.libdir.each do |libdir|
102         agentfile = File.join([libdir, "mcollective", "agent", "#{agentname}.rb"])
103         if File.exist?(agentfile)
104           Log.debug("Found #{agentname} at #{agentfile}")
105           return agentfile
106         end
107       end
108       return false
109     end
110
111     # Determines if we have an agent with a certain name
112     def include?(agentname)
113       PluginManager.include?("#{agentname}_agent")
114     end
115
116     # Dispatches a message to an agent, accepts a block that will get run if there are
117     # any replies to process from the agent
118     def dispatch(request, connection)
119       Log.debug("Dispatching a message to agent #{request.agent}")
120
121       Thread.new do
122         begin
123           agent = PluginManager["#{request.agent}_agent"]
124
125           Timeout::timeout(agent.timeout) do
126             replies = agent.handlemsg(request.payload, connection)
127
128             # Agents can decide if they wish to reply or not,
129             # returning nil will mean nothing goes back to the
130             # requestor
131             unless replies == nil
132               yield(replies)
133             end
134           end
135         rescue Timeout::Error => e
136           Log.warn("Timeout while handling message for #{request.agent}")
137         rescue Exception => e
138           Log.error("Execution of #{request.agent} failed: #{e}")
139           Log.error(e.backtrace.join("\n\t\t"))
140         end
141       end
142     end
143
144     # Get a list of agents that we have
145     def self.agentlist
146       @@agents.keys
147     end
148   end
149 end