Updated mcollective.init according to OSCI-658
[packages/precise/mcollective.git] / lib / mcollective / runner.rb
diff --git a/lib/mcollective/runner.rb b/lib/mcollective/runner.rb
new file mode 100644 (file)
index 0000000..19e6d1d
--- /dev/null
@@ -0,0 +1,137 @@
+module MCollective
+  # The main runner for the daemon, supports running in the foreground
+  # and the background, keeps detailed stats and provides hooks to access
+  # all this information
+  class Runner
+    include Translatable
+
+    def initialize(configfile)
+      @config = Config.instance
+      @config.loadconfig(configfile) unless @config.configured
+      @config.mode = :server
+
+      @stats = PluginManager["global_stats"]
+
+      @security = PluginManager["security_plugin"]
+      @security.initiated_by = :node
+
+      @connection = PluginManager["connector_plugin"]
+      @connection.connect
+
+      @agents = Agents.new
+
+      unless Util.windows?
+        Signal.trap("USR1") do
+          log_code(:PLMC2, "Reloading all agents after receiving USR1 signal", :info)
+          @agents.loadagents
+        end
+
+        Signal.trap("USR2") do
+          log_code(:PLMC3, "Cycling logging level due to USR2 signal", :info)
+
+          Log.cycle_level
+        end
+      else
+        Util.setup_windows_sleeper
+      end
+    end
+
+    # Starts the main loop, before calling this you should initialize the MCollective::Config singleton.
+    def run
+      Data.load_data_sources
+
+      Util.subscribe(Util.make_subscriptions("mcollective", :broadcast))
+      Util.subscribe(Util.make_subscriptions("mcollective", :directed)) if @config.direct_addressing
+
+      # Start the registration plugin if interval isn't 0
+      begin
+        PluginManager["registration_plugin"].run(@connection) unless @config.registerinterval == 0
+      rescue Exception => e
+        logexception(:PLMC4, "Failed to start registration plugin: %{error}", :error, e)
+      end
+
+      loop do
+        begin
+          request = receive
+
+          unless request.agent == "mcollective"
+            agentmsg(request)
+          else
+            log_code(:PLMC5, "Received a control message, possibly via 'mco controller' but this has been deprecated", :error)
+          end
+        rescue SignalException => e
+          logexception(:PLMC7, "Exiting after signal: %{error}", :warn, e)
+          @connection.disconnect
+          raise
+
+        rescue MsgTTLExpired => e
+          logexception(:PLMC9, "Expired Message: %{error}", :warn, e)
+
+        rescue NotTargettedAtUs => e
+          log_code(:PLMC6, "Message does not pass filters, ignoring", :debug)
+
+        rescue Exception => e
+          logexception(:PLMC10, "Failed to handle message: %{error}", :warn, e, true)
+        end
+      end
+    end
+
+    private
+    # Deals with messages directed to agents
+    def agentmsg(request)
+      log_code(:PLMC8, "Handling message for agent '%{agent}' on collective '%{collective} with requestid '%{requestid}'", :debug, :agent => request.agent, :collective => request.collective, :requestid => request.requestid)
+
+      @agents.dispatch(request, @connection) do |reply_message|
+        reply(reply_message, request) if reply_message
+      end
+    end
+
+    # Deals with messages sent to our control topic
+    def controlmsg(request)
+      Log.debug("Handling message for mcollectived controller")
+
+      begin
+        case request.payload[:body]
+        when /^stats$/
+          reply(@stats.to_hash, request)
+
+        when /^reload_agent (.+)$/
+          reply("reloaded #{$1} agent", request) if @agents.loadagent($1)
+
+        when /^reload_agents$/
+
+          reply("reloaded all agents", request) if @agents.loadagents
+
+        else
+          Log.error("Received an unknown message to the controller")
+
+        end
+      rescue Exception => e
+        Log.error("Failed to handle control message: #{e}")
+      end
+    end
+
+    # Receive a message from the connection handler
+    def receive
+      request = @connection.receive
+      request.type = :request
+
+      @stats.received
+
+      request.decode!
+      request.validate
+
+      request
+    end
+
+    # Sends a reply to a specific target topic
+    def reply(msg, request)
+      msg = Message.new(msg, nil, :request => request)
+      msg.encode!
+      msg.publish
+
+      @stats.sent
+    end
+  end
+end
+