19e6d1d9747e6ed975834ec684165ea432a65b8a
[packages/precise/mcollective.git] / lib / mcollective / runner.rb
1 module MCollective
2   # The main runner for the daemon, supports running in the foreground
3   # and the background, keeps detailed stats and provides hooks to access
4   # all this information
5   class Runner
6     include Translatable
7
8     def initialize(configfile)
9       @config = Config.instance
10       @config.loadconfig(configfile) unless @config.configured
11       @config.mode = :server
12
13       @stats = PluginManager["global_stats"]
14
15       @security = PluginManager["security_plugin"]
16       @security.initiated_by = :node
17
18       @connection = PluginManager["connector_plugin"]
19       @connection.connect
20
21       @agents = Agents.new
22
23       unless Util.windows?
24         Signal.trap("USR1") do
25           log_code(:PLMC2, "Reloading all agents after receiving USR1 signal", :info)
26           @agents.loadagents
27         end
28
29         Signal.trap("USR2") do
30           log_code(:PLMC3, "Cycling logging level due to USR2 signal", :info)
31
32           Log.cycle_level
33         end
34       else
35         Util.setup_windows_sleeper
36       end
37     end
38
39     # Starts the main loop, before calling this you should initialize the MCollective::Config singleton.
40     def run
41       Data.load_data_sources
42
43       Util.subscribe(Util.make_subscriptions("mcollective", :broadcast))
44       Util.subscribe(Util.make_subscriptions("mcollective", :directed)) if @config.direct_addressing
45
46       # Start the registration plugin if interval isn't 0
47       begin
48         PluginManager["registration_plugin"].run(@connection) unless @config.registerinterval == 0
49       rescue Exception => e
50         logexception(:PLMC4, "Failed to start registration plugin: %{error}", :error, e)
51       end
52
53       loop do
54         begin
55           request = receive
56
57           unless request.agent == "mcollective"
58             agentmsg(request)
59           else
60             log_code(:PLMC5, "Received a control message, possibly via 'mco controller' but this has been deprecated", :error)
61           end
62         rescue SignalException => e
63           logexception(:PLMC7, "Exiting after signal: %{error}", :warn, e)
64           @connection.disconnect
65           raise
66
67         rescue MsgTTLExpired => e
68           logexception(:PLMC9, "Expired Message: %{error}", :warn, e)
69
70         rescue NotTargettedAtUs => e
71           log_code(:PLMC6, "Message does not pass filters, ignoring", :debug)
72
73         rescue Exception => e
74           logexception(:PLMC10, "Failed to handle message: %{error}", :warn, e, true)
75         end
76       end
77     end
78
79     private
80     # Deals with messages directed to agents
81     def agentmsg(request)
82       log_code(:PLMC8, "Handling message for agent '%{agent}' on collective '%{collective} with requestid '%{requestid}'", :debug, :agent => request.agent, :collective => request.collective, :requestid => request.requestid)
83
84       @agents.dispatch(request, @connection) do |reply_message|
85         reply(reply_message, request) if reply_message
86       end
87     end
88
89     # Deals with messages sent to our control topic
90     def controlmsg(request)
91       Log.debug("Handling message for mcollectived controller")
92
93       begin
94         case request.payload[:body]
95         when /^stats$/
96           reply(@stats.to_hash, request)
97
98         when /^reload_agent (.+)$/
99           reply("reloaded #{$1} agent", request) if @agents.loadagent($1)
100
101         when /^reload_agents$/
102
103           reply("reloaded all agents", request) if @agents.loadagents
104
105         else
106           Log.error("Received an unknown message to the controller")
107
108         end
109       rescue Exception => e
110         Log.error("Failed to handle control message: #{e}")
111       end
112     end
113
114     # Receive a message from the connection handler
115     def receive
116       request = @connection.receive
117       request.type = :request
118
119       @stats.received
120
121       request.decode!
122       request.validate
123
124       request
125     end
126
127     # Sends a reply to a specific target topic
128     def reply(msg, request)
129       msg = Message.new(msg, nil, :request => request)
130       msg.encode!
131       msg.publish
132
133       @stats.sent
134     end
135   end
136 end
137