X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=plugins%2Fmcollective%2Fconnector%2Factivemq.rb;h=dee1654ebad301077aa3afaaaf00b4f25f37b3d1;hb=d1f1649ba43c5cbc43c4beb2380096ba051d646a;hp=cb14647ef1b8c09602180f49a443fd772e086934;hpb=b87d2f4e68281062df1913440ca5753ae63314a9;p=packages%2Fprecise%2Fmcollective.git diff --git a/plugins/mcollective/connector/activemq.rb b/plugins/mcollective/connector/activemq.rb index cb14647..dee1654 100644 --- a/plugins/mcollective/connector/activemq.rb +++ b/plugins/mcollective/connector/activemq.rb @@ -32,7 +32,7 @@ module MCollective # plugin.activemq.pool.size = 2 # # plugin.activemq.pool.1.host = stomp1.your.net - # plugin.activemq.pool.1.port = 6163 + # plugin.activemq.pool.1.port = 61613 # plugin.activemq.pool.1.user = you # plugin.activemq.pool.1.password = secret # plugin.activemq.pool.1.ssl = true @@ -42,7 +42,7 @@ module MCollective # plugin.activemq.pool.1.ssl.fallback = true # # plugin.activemq.pool.2.host = stomp2.your.net - # plugin.activemq.pool.2.port = 6163 + # plugin.activemq.pool.2.port = 61613 # plugin.activemq.pool.2.user = you # plugin.activemq.pool.2.password = secret # plugin.activemq.pool.2.ssl = false @@ -75,6 +75,23 @@ module MCollective # # plugin.activemq.priority = 4 # + # This plugin supports Stomp protocol 1.1 when combined with the stomp gem version + # 1.2.10 or newer. To enable network heartbeats which will help keep the connection + # alive over NAT connections and aggresive session tracking firewalls you can set: + # + # plugin.activemq.heartbeat_interval = 30 + # + # which will cause a heartbeat to be sent on 30 second intervals and one to be expected + # from the broker every 30 seconds. The shortest supported period is 30 seconds, if + # you set it lower it will get forced to 30 seconds. + # + # After 2 failures to receive a heartbeat the connection will be reset via the normal + # failover mechanism. + # + # By default if heartbeat_interval is set it will request Stomp 1.1 but support fallback + # to 1.0, but you can enable strict Stomp 1.1 only operation + # + # plugin.activemq.stomp_1_0_fallback = 0 class Activemq e + end + + # Stomp 1.1+ - heart beat send (transmit) failed. + def on_hbwrite_fail(params, ticker_data) + Log.error("Heartbeat write failed from '%s': %s" % [stomp_url(params), ticker_data.inspect]) + rescue Exception => e + end + + # Log heart beat fires + def on_hbfire(params, srind, curt) + case srind + when "receive_fire" + Log.debug("Received heartbeat from %s: %s, %s" % [stomp_url(params), srind, curt]) + when "send_fire" + Log.debug("Publishing heartbeat to %s: %s, %s" % [stomp_url(params), srind, curt]) + end + rescue Exception => e + end + def stomp_url(params) "%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]] end @@ -151,7 +191,7 @@ module MCollective end begin - @base64 = get_bool_option("activemq.base64", false) + @base64 = get_bool_option("activemq.base64", "false") @msgpriority = get_option("activemq.priority", 0).to_i pools = @config.pluginconf["activemq.pool.size"].to_i @@ -161,12 +201,12 @@ module MCollective host = {} host[:host] = get_option("activemq.pool.#{poolnum}.host") - host[:port] = get_option("activemq.pool.#{poolnum}.port", 6163).to_i + host[:port] = get_option("activemq.pool.#{poolnum}.port", 61613).to_i host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user") host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password") - host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", false) + host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", "false") - host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", false)) if host[:ssl] + host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", "false")) if host[:ssl] Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool") hosts << host @@ -180,14 +220,17 @@ module MCollective # these can be guessed, the documentation isn't clear connection[:initial_reconnect_delay] = Float(get_option("activemq.initial_reconnect_delay", 0.01)) connection[:max_reconnect_delay] = Float(get_option("activemq.max_reconnect_delay", 30.0)) - connection[:use_exponential_back_off] = get_bool_option("activemq.use_exponential_back_off", true) + connection[:use_exponential_back_off] = get_bool_option("activemq.use_exponential_back_off", "true") connection[:back_off_multiplier] = Integer(get_option("activemq.back_off_multiplier", 2)) connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0)) - connection[:randomize] = get_bool_option("activemq.randomize", false) - connection[:backup] = get_bool_option("activemq.backup", false) + connection[:randomize] = get_bool_option("activemq.randomize", "false") + connection[:backup] = get_bool_option("activemq.backup", "false") connection[:timeout] = Integer(get_option("activemq.timeout", -1)) connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30)) connection[:reliable] = true + connection[:connect_headers] = connection_headers + connection[:max_hbrlck_fails] = Integer(get_option("activemq.max_hbrlck_fails", 2)) + connection[:max_hbread_fails] = Integer(get_option("activemq.max_hbread_fails", 2)) connection[:logger] = EventLogger.new @@ -197,10 +240,47 @@ module MCollective end end + def stomp_version + ::Stomp::Version::STRING + end + + def connection_headers + headers = {:"accept-version" => "1.0"} + + heartbeat_interval = Integer(get_option("activemq.heartbeat_interval", 0)) + stomp_1_0_fallback = get_bool_option("activemq.stomp_1_0_fallback", true) + + headers[:host] = get_option("activemq.vhost", "mcollective") + + if heartbeat_interval > 0 + unless Util.versioncmp(stomp_version, "1.2.10") >= 0 + raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem") + end + + if heartbeat_interval < 30 + Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s") + heartbeat_interval = 30 + end + + heartbeat_interval = heartbeat_interval * 1000 + headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500] + + if stomp_1_0_fallback + headers[:"accept-version"] = "1.1,1.0" + else + headers[:"accept-version"] = "1.1" + end + else + Log.warn("Connecting without STOMP 1.1 heartbeats, if you are using ActiveMQ 5.8 or newer consider setting plugin.activemq.heartbeat_interval") + end + + headers + end + # Sets the SSL paramaters for a specific connection def ssl_parameters(poolnum, fallback) - params = {:cert_file => get_option("activemq.pool.#{poolnum}.ssl.cert", false), - :key_file => get_option("activemq.pool.#{poolnum}.ssl.key", false), + params = {:cert_file => get_cert_file(poolnum), + :key_file => get_key_file(poolnum), :ts_files => get_option("activemq.pool.#{poolnum}.ssl.ca", false)} raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files] @@ -228,6 +308,22 @@ module MCollective end end + # Returns the name of the private key file used by ActiveMQ + # Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_KEY exists, + # where X is the ActiveMQ pool number. + # If the environment variable doesn't exist, it will try and load the value from the config. + def get_key_file(poolnum) + ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_KEY" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.key", false) + end + + # Returns the name of the certficate file used by ActiveMQ + # Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_CERT exists, + # where X is the ActiveMQ pool number. + # If the environment variable doesn't exist, it will try and load the value from the config. + def get_cert_file(poolnum) + ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_CERT" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.cert", false) + end + # Receives a message from the ActiveMQ connection def receive Log.debug("Waiting for a message from ActiveMQ") @@ -312,8 +408,15 @@ module MCollective def headers_for(msg, identity=nil) headers = {} + headers = {"priority" => @msgpriority} if @msgpriority > 0 + headers["timestamp"] = (Time.now.utc.to_i * 1000).to_s + + # set the expires header based on the TTL, we build a small additional + # timeout of 10 seconds in here to allow for network latency etc + headers["expires"] = ((Time.now.utc.to_i + msg.ttl + 10) * 1000).to_s + if [:request, :direct_request].include?(msg.type) target = make_target(msg.agent, :reply, msg.collective) @@ -381,19 +484,9 @@ module MCollective raise("No plugin.#{opt} configuration option given") end - # gets a boolean option from the config, supports y/n/true/false/1/0 - def get_bool_option(opt, default) - return default unless @config.pluginconf.include?(opt) - - val = @config.pluginconf[opt] - - if val =~ /^1|yes|true/ - return true - elsif val =~ /^0|no|false/ - return false - else - return default - end + # looks up a boolean value in the config + def get_bool_option(val, default) + Util.str_to_bool(@config.pluginconf.fetch(val, default)) end end end