Update version according to OSCI-856
[packages/precise/mcollective.git] / plugins / mcollective / connector / rabbitmq.rb
index f3e2dbe94abbcb17d96e144890cb56689f7f7313..f0d479e7a82c4fe75b1fcc05a96736a3ec2f858b 100644 (file)
@@ -45,6 +45,29 @@ module MCollective
           Log.error("SSL session creation with #{stomp_url(params)} failed: #{params[:ssl_exception]}")
         end
 
+        # Stomp 1.1+ - heart beat read (receive) failed.
+        def on_hbread_fail(params, ticker_data)
+          Log.error("Heartbeat read failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
+        rescue Exception => e
+        end
+
+        # Stomp 1.1+ - heart beat send (transmit) failed.
+        def on_hbwrite_fail(params, ticker_data)
+          Log.error("Heartbeat write failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
+        rescue Exception => e
+        end
+
+        # Log heart beat fires
+        def on_hbfire(params, srind, curt)
+          case srind
+            when "receive_fire"
+              Log.debug("Received heartbeat from %s: %s, %s" % [stomp_url(params), srind, curt])
+            when "send_fire"
+              Log.debug("Publishing heartbeat to %s: %s, %s" % [stomp_url(params), srind, curt])
+          end
+        rescue Exception => e
+        end
+
         def stomp_url(params)
           "%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]]
         end
@@ -64,7 +87,7 @@ module MCollective
         end
 
         begin
-          @base64 = get_bool_option("rabbitmq.base64", false)
+          @base64 = get_bool_option("rabbitmq.base64", "false")
 
           pools = @config.pluginconf["rabbitmq.pool.size"].to_i
           hosts = []
@@ -76,9 +99,8 @@ module MCollective
             host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 61613).to_i
             host[:login] = get_env_or_option("STOMP_USER", "rabbitmq.pool.#{poolnum}.user")
             host[:passcode] = get_env_or_option("STOMP_PASSWORD", "rabbitmq.pool.#{poolnum}.password")
-            host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", false)
-
-            host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", false)) if host[:ssl]
+            host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", "false")
+            host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", "false")) if host[:ssl]
 
             Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
             hosts << host
@@ -92,18 +114,19 @@ module MCollective
           # these can be guessed, the documentation isn't clear
           connection[:initial_reconnect_delay] = Float(get_option("rabbitmq.initial_reconnect_delay", 0.01))
           connection[:max_reconnect_delay] = Float(get_option("rabbitmq.max_reconnect_delay", 30.0))
-          connection[:use_exponential_back_off] = get_bool_option("rabbitmq.use_exponential_back_off", true)
+          connection[:use_exponential_back_off] = get_bool_option("rabbitmq.use_exponential_back_off", "true")
           connection[:back_off_multiplier] = Integer(get_option("rabbitmq.back_off_multiplier", 2))
           connection[:max_reconnect_attempts] = Integer(get_option("rabbitmq.max_reconnect_attempts", 0))
-          connection[:randomize] = get_bool_option("rabbitmq.randomize", false)
-          connection[:backup] = get_bool_option("rabbitmq.backup", false)
+          connection[:randomize] = get_bool_option("rabbitmq.randomize", "false")
+          connection[:backup] = get_bool_option("rabbitmq.backup", "false")
+
           connection[:timeout] = Integer(get_option("rabbitmq.timeout", -1))
           connection[:connect_timeout] = Integer(get_option("rabbitmq.connect_timeout", 30))
           connection[:reliable] = true
+          connection[:max_hbrlck_fails] = Integer(get_option("rabbitmq.max_hbrlck_fails", 2))
+          connection[:max_hbread_fails] = Integer(get_option("rabbitmq.max_hbread_fails", 2))
 
-          # RabbitMQ and Stomp supports vhosts, this sets it in a way compatible with RabbitMQ and
-          # force the version to 1.0, 1.1 support will be added in future
-          connection[:connect_headers] = {"accept-version" => '1.0', "host" => get_option("rabbitmq.vhost", "/")}
+          connection[:connect_headers] = connection_headers
 
           connection[:logger] = EventLogger.new
 
@@ -113,10 +136,47 @@ module MCollective
         end
       end
 
+      def connection_headers
+        headers = {:"accept-version" => "1.0"}
+
+        heartbeat_interval = Integer(get_option("rabbitmq.heartbeat_interval", 0))
+        stomp_1_0_fallback = get_bool_option("rabbitmq.stomp_1_0_fallback", true)
+
+        headers[:host] = get_option("rabbitmq.vhost", "/")
+
+        if heartbeat_interval > 0
+          unless Util.versioncmp(stomp_version, "1.2.10") >= 0
+            raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
+          end
+
+          if heartbeat_interval < 30
+            Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s")
+            heartbeat_interval = 30
+          end
+
+          heartbeat_interval = heartbeat_interval * 1000
+          headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]
+
+          if stomp_1_0_fallback
+            headers[:"accept-version"] = "1.1,1.0"
+          else
+            headers[:"accept-version"] = "1.1"
+          end
+        else
+          Log.warn("Connecting without STOMP 1.1 heartbeats, consider setting plugin.rabbitmq.heartbeat_interval")
+        end
+
+        headers
+      end
+
+      def stomp_version
+        ::Stomp::Version::STRING
+      end
+
       # Sets the SSL paramaters for a specific connection
       def ssl_parameters(poolnum, fallback)
-        params = {:cert_file => get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false),
-                  :key_file  => get_option("rabbitmq.pool.#{poolnum}.ssl.key", false),
+        params = {:cert_file => get_cert_file(poolnum),
+                  :key_file  => get_key_file(poolnum),
                   :ts_files  => get_option("rabbitmq.pool.#{poolnum}.ssl.ca", false)}
 
         raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]
@@ -144,6 +204,22 @@ module MCollective
         end
       end
 
+      # Returns the name of the private key file used by RabbitMQ
+      # Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_KEY exists,
+      # where X is the RabbitMQ pool number.
+      # If the environment variable doesn't exist, it will try and load the value from the config.
+      def get_key_file(poolnum)
+        ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_KEY" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.key", false)
+      end
+
+      # Returns the name of the certificate file used by RabbitMQ
+      # Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_CERT exists,
+      # where X is the RabbitMQ pool number.
+      # If the environment variable doesn't exist, it will try and load the value from the config.
+      def get_cert_file(poolnum)
+        ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_CERT" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false)
+      end
+
       # Receives a message from the RabbitMQ connection
       def receive
         Log.debug("Waiting for a message from RabbitMQ")
@@ -197,6 +273,10 @@ module MCollective
 
         end
 
+        # marks messages as valid for ttl + 10 seconds, we do this here
+        # rather than in make_target as this should only be set on publish
+        target[:headers]["expiration"] = ((msg.ttl + 10) * 1000).to_s
+
         return target
       end
 
@@ -206,9 +286,14 @@ module MCollective
 
         target = {:name => "", :headers => {}, :id => nil}
 
+        if get_bool_option("rabbitmq.use_reply_exchange", false)
+          reply_path = "/exchange/mcollective_reply/%s_%s" % [ @config.identity, $$ ]
+        else
+          reply_path = "/temp-queue/mcollective_reply_%s" % agent
+        end
         case type
           when :reply # receiving replies on a temp queue
-            target[:name] = "/temp-queue/mcollective_reply_%s" % agent
+            target[:name] = reply_path
             target[:id] = "mcollective_%s_replies" % agent
 
           when :broadcast, :request # publishing a request to all nodes with an agent
@@ -216,7 +301,7 @@ module MCollective
             if reply_to
               target[:headers]["reply-to"] = reply_to
             else
-              target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent
+              target[:headers]["reply-to"] = reply_path
             end
             target[:id] = "%s_broadcast_%s" % [collective, agent]
 
@@ -224,11 +309,11 @@ module MCollective
             raise "Directed requests need to have a node identity" unless node
 
             target[:name] = "/exchange/%s_directed/%s" % [ collective, node]
-            target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent
+            target[:headers]["reply-to"] = reply_path
 
           when :directed # subscribing to directed messages
             target[:name] = "/exchange/%s_directed/%s" % [ collective, @config.identity ]
-            target[:id] = "%s_directed_to_identity" % @config.identity
+            target[:id] = "%s_%s_directed_to_identity" % [ collective, @config.identity ]
         end
 
         target
@@ -289,19 +374,9 @@ module MCollective
         raise("No plugin.#{opt} configuration option given")
       end
 
-      # gets a boolean option from the config, supports y/n/true/false/1/0
-      def get_bool_option(opt, default)
-        return default unless @config.pluginconf.include?(opt)
-
-        val = @config.pluginconf[opt]
-
-        if val =~ /^1|yes|true/
-          return true
-        elsif val =~ /^0|no|false/
-          return false
-        else
-          return default
-        end
+      # looks up a boolean value in the config
+      def get_bool_option(val, default)
+        Util.str_to_bool(@config.pluginconf.fetch(val, default))
       end
     end
   end