Update version according to OSCI-856
[packages/precise/mcollective.git] / plugins / mcollective / connector / activemq.rb
index eadeb17127d5b498142a296d5325007919530c5d..dee1654ebad301077aa3afaaaf00b4f25f37b3d1 100644 (file)
@@ -75,6 +75,23 @@ module MCollective
     #
     #     plugin.activemq.priority = 4
     #
+    # This plugin supports Stomp protocol 1.1 when combined with the stomp gem version
+    # 1.2.10 or newer.  To enable network heartbeats which will help keep the connection
+    # alive over NAT connections and aggresive session tracking firewalls you can set:
+    #
+    #     plugin.activemq.heartbeat_interval = 30
+    #
+    # which will cause a heartbeat to be sent on 30 second intervals and one to be expected
+    # from the broker every 30 seconds.  The shortest supported period is 30 seconds, if
+    # you set it lower it will get forced to 30 seconds.
+    #
+    # After 2 failures to receive a heartbeat the connection will be reset via the normal
+    # failover mechanism.
+    #
+    # By default if heartbeat_interval is set it will request Stomp 1.1 but support fallback
+    # to 1.0, but you can enable strict Stomp 1.1 only operation
+    #
+    #     plugin.activemq.stomp_1_0_fallback = 0
     class Activemq<Base
       attr_reader :connection
 
@@ -131,6 +148,29 @@ module MCollective
           Log.error("SSL session creation with #{stomp_url(params)} failed: #{params[:ssl_exception]}")
         end
 
+        # Stomp 1.1+ - heart beat read (receive) failed.
+        def on_hbread_fail(params, ticker_data)
+          Log.error("Heartbeat read failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
+        rescue Exception => e
+        end
+
+        # Stomp 1.1+ - heart beat send (transmit) failed.
+        def on_hbwrite_fail(params, ticker_data)
+          Log.error("Heartbeat write failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
+        rescue Exception => e
+        end
+
+        # Log heart beat fires
+        def on_hbfire(params, srind, curt)
+          case srind
+            when "receive_fire"
+              Log.debug("Received heartbeat from %s: %s, %s" % [stomp_url(params), srind, curt])
+            when "send_fire"
+              Log.debug("Publishing heartbeat to %s: %s, %s" % [stomp_url(params), srind, curt])
+          end
+        rescue Exception => e
+        end
+
         def stomp_url(params)
           "%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]]
         end
@@ -151,7 +191,7 @@ module MCollective
         end
 
         begin
-          @base64 = get_bool_option("activemq.base64", false)
+          @base64 = get_bool_option("activemq.base64", "false")
           @msgpriority = get_option("activemq.priority", 0).to_i
 
           pools = @config.pluginconf["activemq.pool.size"].to_i
@@ -164,9 +204,9 @@ module MCollective
             host[:port] = get_option("activemq.pool.#{poolnum}.port", 61613).to_i
             host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user")
             host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password")
-            host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", false)
+            host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", "false")
 
-            host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", false)) if host[:ssl]
+            host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", "false")) if host[:ssl]
 
             Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
             hosts << host
@@ -180,14 +220,17 @@ module MCollective
           # these can be guessed, the documentation isn't clear
           connection[:initial_reconnect_delay] = Float(get_option("activemq.initial_reconnect_delay", 0.01))
           connection[:max_reconnect_delay] = Float(get_option("activemq.max_reconnect_delay", 30.0))
-          connection[:use_exponential_back_off] = get_bool_option("activemq.use_exponential_back_off", true)
+          connection[:use_exponential_back_off] = get_bool_option("activemq.use_exponential_back_off", "true")
           connection[:back_off_multiplier] = Integer(get_option("activemq.back_off_multiplier", 2))
           connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0))
-          connection[:randomize] = get_bool_option("activemq.randomize", false)
-          connection[:backup] = get_bool_option("activemq.backup", false)
+          connection[:randomize] = get_bool_option("activemq.randomize", "false")
+          connection[:backup] = get_bool_option("activemq.backup", "false")
           connection[:timeout] = Integer(get_option("activemq.timeout", -1))
           connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30))
           connection[:reliable] = true
+          connection[:connect_headers] = connection_headers
+          connection[:max_hbrlck_fails] = Integer(get_option("activemq.max_hbrlck_fails", 2))
+          connection[:max_hbread_fails] = Integer(get_option("activemq.max_hbread_fails", 2))
 
           connection[:logger] = EventLogger.new
 
@@ -197,10 +240,47 @@ module MCollective
         end
       end
 
+      def stomp_version
+        ::Stomp::Version::STRING
+      end
+
+      def connection_headers
+        headers = {:"accept-version" => "1.0"}
+
+        heartbeat_interval = Integer(get_option("activemq.heartbeat_interval", 0))
+        stomp_1_0_fallback = get_bool_option("activemq.stomp_1_0_fallback", true)
+
+        headers[:host] = get_option("activemq.vhost", "mcollective")
+
+        if heartbeat_interval > 0
+          unless Util.versioncmp(stomp_version, "1.2.10") >= 0
+            raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
+          end
+
+          if heartbeat_interval < 30
+            Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s")
+            heartbeat_interval = 30
+          end
+
+          heartbeat_interval = heartbeat_interval * 1000
+          headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]
+
+          if stomp_1_0_fallback
+            headers[:"accept-version"] = "1.1,1.0"
+          else
+            headers[:"accept-version"] = "1.1"
+          end
+        else
+          Log.warn("Connecting without STOMP 1.1 heartbeats, if you are using ActiveMQ 5.8 or newer consider setting plugin.activemq.heartbeat_interval")
+        end
+
+        headers
+      end
+
       # Sets the SSL paramaters for a specific connection
       def ssl_parameters(poolnum, fallback)
-        params = {:cert_file => get_option("activemq.pool.#{poolnum}.ssl.cert", false),
-                  :key_file  => get_option("activemq.pool.#{poolnum}.ssl.key", false),
+        params = {:cert_file => get_cert_file(poolnum),
+                  :key_file => get_key_file(poolnum),
                   :ts_files  => get_option("activemq.pool.#{poolnum}.ssl.ca", false)}
 
         raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]
@@ -228,6 +308,22 @@ module MCollective
         end
       end
 
+      # Returns the name of the private key file used by ActiveMQ
+      # Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_KEY exists,
+      # where X is the ActiveMQ pool number.
+      # If the environment variable doesn't exist, it will try and load the value from the config.
+      def get_key_file(poolnum)
+        ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_KEY" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.key", false)
+      end
+
+      # Returns the name of the certficate file used by ActiveMQ
+      # Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_CERT exists,
+      # where X is the ActiveMQ pool number.
+      # If the environment variable doesn't exist, it will try and load the value from the config.
+      def get_cert_file(poolnum)
+        ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_CERT" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.cert", false)
+      end
+
       # Receives a message from the ActiveMQ connection
       def receive
         Log.debug("Waiting for a message from ActiveMQ")
@@ -312,8 +408,15 @@ module MCollective
 
       def headers_for(msg, identity=nil)
         headers = {}
+
         headers = {"priority" => @msgpriority} if @msgpriority > 0
 
+        headers["timestamp"] = (Time.now.utc.to_i * 1000).to_s
+
+        # set the expires header based on the TTL, we build a small additional
+        # timeout of 10 seconds in here to allow for network latency etc
+        headers["expires"] = ((Time.now.utc.to_i + msg.ttl + 10) * 1000).to_s
+
         if [:request, :direct_request].include?(msg.type)
           target = make_target(msg.agent, :reply, msg.collective)
 
@@ -381,19 +484,9 @@ module MCollective
         raise("No plugin.#{opt} configuration option given")
       end
 
-      # gets a boolean option from the config, supports y/n/true/false/1/0
-      def get_bool_option(opt, default)
-        return default unless @config.pluginconf.include?(opt)
-
-        val = @config.pluginconf[opt]
-
-        if val =~ /^1|yes|true/
-          return true
-        elsif val =~ /^0|no|false/
-          return false
-        else
-          return default
-        end
+      # looks up a boolean value in the config
+      def get_bool_option(val, default)
+        Util.str_to_bool(@config.pluginconf.fetch(val, default))
       end
     end
   end