#
# plugin.activemq.priority = 4
#
+ # This plugin supports Stomp protocol 1.1 when combined with the stomp gem version
+ # 1.2.10 or newer. To enable network heartbeats which will help keep the connection
+ # alive over NAT connections and aggresive session tracking firewalls you can set:
+ #
+ # plugin.activemq.heartbeat_interval = 30
+ #
+ # which will cause a heartbeat to be sent on 30 second intervals and one to be expected
+ # from the broker every 30 seconds. The shortest supported period is 30 seconds, if
+ # you set it lower it will get forced to 30 seconds.
+ #
+ # After 2 failures to receive a heartbeat the connection will be reset via the normal
+ # failover mechanism.
+ #
+ # By default if heartbeat_interval is set it will request Stomp 1.1 but support fallback
+ # to 1.0, but you can enable strict Stomp 1.1 only operation
+ #
+ # plugin.activemq.stomp_1_0_fallback = 0
class Activemq<Base
attr_reader :connection
Log.error("SSL session creation with #{stomp_url(params)} failed: #{params[:ssl_exception]}")
end
+ # Stomp 1.1+ - heart beat read (receive) failed.
+ def on_hbread_fail(params, ticker_data)
+ Log.error("Heartbeat read failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
+ rescue Exception => e
+ end
+
+ # Stomp 1.1+ - heart beat send (transmit) failed.
+ def on_hbwrite_fail(params, ticker_data)
+ Log.error("Heartbeat write failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
+ rescue Exception => e
+ end
+
+ # Log heart beat fires
+ def on_hbfire(params, srind, curt)
+ case srind
+ when "receive_fire"
+ Log.debug("Received heartbeat from %s: %s, %s" % [stomp_url(params), srind, curt])
+ when "send_fire"
+ Log.debug("Publishing heartbeat to %s: %s, %s" % [stomp_url(params), srind, curt])
+ end
+ rescue Exception => e
+ end
+
def stomp_url(params)
"%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]]
end
end
begin
- @base64 = get_bool_option("activemq.base64", false)
+ @base64 = get_bool_option("activemq.base64", "false")
@msgpriority = get_option("activemq.priority", 0).to_i
pools = @config.pluginconf["activemq.pool.size"].to_i
host[:port] = get_option("activemq.pool.#{poolnum}.port", 61613).to_i
host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user")
host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password")
- host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", false)
+ host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", "false")
- host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", false)) if host[:ssl]
+ host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", "false")) if host[:ssl]
Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
hosts << host
# these can be guessed, the documentation isn't clear
connection[:initial_reconnect_delay] = Float(get_option("activemq.initial_reconnect_delay", 0.01))
connection[:max_reconnect_delay] = Float(get_option("activemq.max_reconnect_delay", 30.0))
- connection[:use_exponential_back_off] = get_bool_option("activemq.use_exponential_back_off", true)
+ connection[:use_exponential_back_off] = get_bool_option("activemq.use_exponential_back_off", "true")
connection[:back_off_multiplier] = Integer(get_option("activemq.back_off_multiplier", 2))
connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0))
- connection[:randomize] = get_bool_option("activemq.randomize", false)
- connection[:backup] = get_bool_option("activemq.backup", false)
+ connection[:randomize] = get_bool_option("activemq.randomize", "false")
+ connection[:backup] = get_bool_option("activemq.backup", "false")
connection[:timeout] = Integer(get_option("activemq.timeout", -1))
connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30))
connection[:reliable] = true
+ connection[:connect_headers] = connection_headers
+ connection[:max_hbrlck_fails] = Integer(get_option("activemq.max_hbrlck_fails", 2))
+ connection[:max_hbread_fails] = Integer(get_option("activemq.max_hbread_fails", 2))
connection[:logger] = EventLogger.new
end
end
+ def stomp_version
+ ::Stomp::Version::STRING
+ end
+
+ def connection_headers
+ headers = {:"accept-version" => "1.0"}
+
+ heartbeat_interval = Integer(get_option("activemq.heartbeat_interval", 0))
+ stomp_1_0_fallback = get_bool_option("activemq.stomp_1_0_fallback", true)
+
+ headers[:host] = get_option("activemq.vhost", "mcollective")
+
+ if heartbeat_interval > 0
+ unless Util.versioncmp(stomp_version, "1.2.10") >= 0
+ raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
+ end
+
+ if heartbeat_interval < 30
+ Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s")
+ heartbeat_interval = 30
+ end
+
+ heartbeat_interval = heartbeat_interval * 1000
+ headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]
+
+ if stomp_1_0_fallback
+ headers[:"accept-version"] = "1.1,1.0"
+ else
+ headers[:"accept-version"] = "1.1"
+ end
+ else
+ Log.warn("Connecting without STOMP 1.1 heartbeats, if you are using ActiveMQ 5.8 or newer consider setting plugin.activemq.heartbeat_interval")
+ end
+
+ headers
+ end
+
# Sets the SSL paramaters for a specific connection
def ssl_parameters(poolnum, fallback)
- params = {:cert_file => get_option("activemq.pool.#{poolnum}.ssl.cert", false),
- :key_file => get_option("activemq.pool.#{poolnum}.ssl.key", false),
+ params = {:cert_file => get_cert_file(poolnum),
+ :key_file => get_key_file(poolnum),
:ts_files => get_option("activemq.pool.#{poolnum}.ssl.ca", false)}
raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]
end
end
+ # Returns the name of the private key file used by ActiveMQ
+ # Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_KEY exists,
+ # where X is the ActiveMQ pool number.
+ # If the environment variable doesn't exist, it will try and load the value from the config.
+ def get_key_file(poolnum)
+ ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_KEY" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.key", false)
+ end
+
+ # Returns the name of the certficate file used by ActiveMQ
+ # Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_CERT exists,
+ # where X is the ActiveMQ pool number.
+ # If the environment variable doesn't exist, it will try and load the value from the config.
+ def get_cert_file(poolnum)
+ ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_CERT" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.cert", false)
+ end
+
# Receives a message from the ActiveMQ connection
def receive
Log.debug("Waiting for a message from ActiveMQ")
def headers_for(msg, identity=nil)
headers = {}
+
headers = {"priority" => @msgpriority} if @msgpriority > 0
+ headers["timestamp"] = (Time.now.utc.to_i * 1000).to_s
+
+ # set the expires header based on the TTL, we build a small additional
+ # timeout of 10 seconds in here to allow for network latency etc
+ headers["expires"] = ((Time.now.utc.to_i + msg.ttl + 10) * 1000).to_s
+
if [:request, :direct_request].include?(msg.type)
target = make_target(msg.agent, :reply, msg.collective)
raise("No plugin.#{opt} configuration option given")
end
- # gets a boolean option from the config, supports y/n/true/false/1/0
- def get_bool_option(opt, default)
- return default unless @config.pluginconf.include?(opt)
-
- val = @config.pluginconf[opt]
-
- if val =~ /^1|yes|true/
- return true
- elsif val =~ /^0|no|false/
- return false
- else
- return default
- end
+ # looks up a boolean value in the config
+ def get_bool_option(val, default)
+ Util.str_to_bool(@config.pluginconf.fetch(val, default))
end
end
end