2 # A collection of agents, loads them, reloads them and dispatches messages to them.
3 # It uses the PluginManager to store, load and manage instances of plugins.
5 def initialize(agents = {})
6 @config = Config.instance
7 raise ("Configuration has not been loaded, can't load agents") unless @config.configured
16 @@agents.each_key do |agent|
17 PluginManager.delete "#{agent}_agent"
18 Util.unsubscribe(Util.make_subscriptions(agent, :broadcast))
24 # Loads all agents from disk
26 Log.debug("Reloading all agents from disk")
30 @config.libdir.each do |libdir|
31 agentdir = "#{libdir}/mcollective/agent"
32 next unless File.directory?(agentdir)
34 Dir.new(agentdir).grep(/\.rb$/).each do |agent|
35 agentname = File.basename(agent, ".rb")
36 loadagent(agentname) unless PluginManager.include?("#{agentname}_agent")
41 # Loads a specified agent from disk if available
42 def loadagent(agentname)
43 agentfile = findagentfile(agentname)
44 return false unless agentfile
45 classname = class_for_agent(agentname)
47 PluginManager.delete("#{agentname}_agent")
50 single_instance = ["registration", "discovery"].include?(agentname)
52 PluginManager.loadclass(classname)
54 if activate_agent?(agentname)
55 PluginManager << {:type => "#{agentname}_agent", :class => classname, :single_instance => single_instance}
57 # Attempt to instantiate the agent once so any validation and hooks get run
58 # this does a basic sanity check on the agent as a whole, if this fails it
59 # will be removed from the plugin list
60 PluginManager["#{agentname}_agent"]
62 Util.subscribe(Util.make_subscriptions(agentname, :broadcast)) unless @@agents.include?(agentname)
64 @@agents[agentname] = {:file => agentfile}
67 Log.debug("Not activating agent #{agentname} due to agent policy in activate? method")
71 Log.error("Loading agent #{agentname} failed: #{e}")
72 PluginManager.delete("#{agentname}_agent")
77 # Builds a class name string given a Agent name
78 def class_for_agent(agent)
79 "MCollective::Agent::#{agent.capitalize}"
82 # Checks if a plugin should be activated by
83 # calling #activate? on it if it responds to
84 # that method else always activate it
85 def activate_agent?(agent)
86 klass = Kernel.const_get("MCollective").const_get("Agent").const_get(agent.capitalize)
88 if klass.respond_to?("activate?")
89 return klass.activate?
91 Log.debug("#{klass} does not have an activate? method, activating as default")
95 Log.warn("Agent activation check for #{agent} failed: #{e.class}: #{e}")
99 # searches the libdirs for agents
100 def findagentfile(agentname)
101 @config.libdir.each do |libdir|
102 agentfile = File.join([libdir, "mcollective", "agent", "#{agentname}.rb"])
103 if File.exist?(agentfile)
104 Log.debug("Found #{agentname} at #{agentfile}")
111 # Determines if we have an agent with a certain name
112 def include?(agentname)
113 PluginManager.include?("#{agentname}_agent")
116 # Dispatches a message to an agent, accepts a block that will get run if there are
117 # any replies to process from the agent
118 def dispatch(request, connection)
119 Log.debug("Dispatching a message to agent #{request.agent}")
123 agent = PluginManager["#{request.agent}_agent"]
125 Timeout::timeout(agent.timeout) do
126 replies = agent.handlemsg(request.payload, connection)
128 # Agents can decide if they wish to reply or not,
129 # returning nil will mean nothing goes back to the
131 unless replies == nil
135 rescue Timeout::Error => e
136 Log.warn("Timeout while handling message for #{request.agent}")
137 rescue Exception => e
138 Log.error("Execution of #{request.agent} failed: #{e}")
139 Log.error(e.backtrace.join("\n\t\t"))
144 # Get a list of agents that we have