Log.error("SSL session creation with #{stomp_url(params)} failed: #{params[:ssl_exception]}")
end
+ # Stomp 1.1+ - heart beat read (receive) failed.
+ def on_hbread_fail(params, ticker_data)
+ Log.error("Heartbeat read failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
+ rescue Exception => e
+ end
+
+ # Stomp 1.1+ - heart beat send (transmit) failed.
+ def on_hbwrite_fail(params, ticker_data)
+ Log.error("Heartbeat write failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
+ rescue Exception => e
+ end
+
+ # Log heart beat fires
+ def on_hbfire(params, srind, curt)
+ case srind
+ when "receive_fire"
+ Log.debug("Received heartbeat from %s: %s, %s" % [stomp_url(params), srind, curt])
+ when "send_fire"
+ Log.debug("Publishing heartbeat to %s: %s, %s" % [stomp_url(params), srind, curt])
+ end
+ rescue Exception => e
+ end
+
def stomp_url(params)
"%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]]
end
end
begin
- @base64 = get_bool_option("rabbitmq.base64", false)
+ @base64 = get_bool_option("rabbitmq.base64", "false")
pools = @config.pluginconf["rabbitmq.pool.size"].to_i
hosts = []
host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 61613).to_i
host[:login] = get_env_or_option("STOMP_USER", "rabbitmq.pool.#{poolnum}.user")
host[:passcode] = get_env_or_option("STOMP_PASSWORD", "rabbitmq.pool.#{poolnum}.password")
- host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", false)
-
- host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", false)) if host[:ssl]
+ host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", "false")
+ host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", "false")) if host[:ssl]
Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
hosts << host
# these can be guessed, the documentation isn't clear
connection[:initial_reconnect_delay] = Float(get_option("rabbitmq.initial_reconnect_delay", 0.01))
connection[:max_reconnect_delay] = Float(get_option("rabbitmq.max_reconnect_delay", 30.0))
- connection[:use_exponential_back_off] = get_bool_option("rabbitmq.use_exponential_back_off", true)
+ connection[:use_exponential_back_off] = get_bool_option("rabbitmq.use_exponential_back_off", "true")
connection[:back_off_multiplier] = Integer(get_option("rabbitmq.back_off_multiplier", 2))
connection[:max_reconnect_attempts] = Integer(get_option("rabbitmq.max_reconnect_attempts", 0))
- connection[:randomize] = get_bool_option("rabbitmq.randomize", false)
- connection[:backup] = get_bool_option("rabbitmq.backup", false)
+ connection[:randomize] = get_bool_option("rabbitmq.randomize", "false")
+ connection[:backup] = get_bool_option("rabbitmq.backup", "false")
+
connection[:timeout] = Integer(get_option("rabbitmq.timeout", -1))
connection[:connect_timeout] = Integer(get_option("rabbitmq.connect_timeout", 30))
connection[:reliable] = true
+ connection[:max_hbrlck_fails] = Integer(get_option("rabbitmq.max_hbrlck_fails", 2))
+ connection[:max_hbread_fails] = Integer(get_option("rabbitmq.max_hbread_fails", 2))
- # RabbitMQ and Stomp supports vhosts, this sets it in a way compatible with RabbitMQ and
- # force the version to 1.0, 1.1 support will be added in future
- connection[:connect_headers] = {"accept-version" => '1.0', "host" => get_option("rabbitmq.vhost", "/")}
+ connection[:connect_headers] = connection_headers
connection[:logger] = EventLogger.new
end
end
+ def connection_headers
+ headers = {:"accept-version" => "1.0"}
+
+ heartbeat_interval = Integer(get_option("rabbitmq.heartbeat_interval", 0))
+ stomp_1_0_fallback = get_bool_option("rabbitmq.stomp_1_0_fallback", true)
+
+ headers[:host] = get_option("rabbitmq.vhost", "/")
+
+ if heartbeat_interval > 0
+ unless Util.versioncmp(stomp_version, "1.2.10") >= 0
+ raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
+ end
+
+ if heartbeat_interval < 30
+ Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s")
+ heartbeat_interval = 30
+ end
+
+ heartbeat_interval = heartbeat_interval * 1000
+ headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]
+
+ if stomp_1_0_fallback
+ headers[:"accept-version"] = "1.1,1.0"
+ else
+ headers[:"accept-version"] = "1.1"
+ end
+ else
+ Log.warn("Connecting without STOMP 1.1 heartbeats, consider setting plugin.rabbitmq.heartbeat_interval")
+ end
+
+ headers
+ end
+
+ def stomp_version
+ ::Stomp::Version::STRING
+ end
+
# Sets the SSL paramaters for a specific connection
def ssl_parameters(poolnum, fallback)
- params = {:cert_file => get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false),
- :key_file => get_option("rabbitmq.pool.#{poolnum}.ssl.key", false),
+ params = {:cert_file => get_cert_file(poolnum),
+ :key_file => get_key_file(poolnum),
:ts_files => get_option("rabbitmq.pool.#{poolnum}.ssl.ca", false)}
raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]
end
end
+ # Returns the name of the private key file used by RabbitMQ
+ # Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_KEY exists,
+ # where X is the RabbitMQ pool number.
+ # If the environment variable doesn't exist, it will try and load the value from the config.
+ def get_key_file(poolnum)
+ ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_KEY" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.key", false)
+ end
+
+ # Returns the name of the certificate file used by RabbitMQ
+ # Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_CERT exists,
+ # where X is the RabbitMQ pool number.
+ # If the environment variable doesn't exist, it will try and load the value from the config.
+ def get_cert_file(poolnum)
+ ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_CERT" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false)
+ end
+
# Receives a message from the RabbitMQ connection
def receive
Log.debug("Waiting for a message from RabbitMQ")
end
+ # marks messages as valid for ttl + 10 seconds, we do this here
+ # rather than in make_target as this should only be set on publish
+ target[:headers]["expiration"] = ((msg.ttl + 10) * 1000).to_s
+
return target
end
target = {:name => "", :headers => {}, :id => nil}
+ if get_bool_option("rabbitmq.use_reply_exchange", false)
+ reply_path = "/exchange/mcollective_reply/%s_%s" % [ @config.identity, $$ ]
+ else
+ reply_path = "/temp-queue/mcollective_reply_%s" % agent
+ end
case type
when :reply # receiving replies on a temp queue
- target[:name] = "/temp-queue/mcollective_reply_%s" % agent
+ target[:name] = reply_path
target[:id] = "mcollective_%s_replies" % agent
when :broadcast, :request # publishing a request to all nodes with an agent
if reply_to
target[:headers]["reply-to"] = reply_to
else
- target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent
+ target[:headers]["reply-to"] = reply_path
end
target[:id] = "%s_broadcast_%s" % [collective, agent]
raise "Directed requests need to have a node identity" unless node
target[:name] = "/exchange/%s_directed/%s" % [ collective, node]
- target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent
+ target[:headers]["reply-to"] = reply_path
when :directed # subscribing to directed messages
target[:name] = "/exchange/%s_directed/%s" % [ collective, @config.identity ]
- target[:id] = "%s_directed_to_identity" % @config.identity
+ target[:id] = "%s_%s_directed_to_identity" % [ collective, @config.identity ]
end
target
raise("No plugin.#{opt} configuration option given")
end
- # gets a boolean option from the config, supports y/n/true/false/1/0
- def get_bool_option(opt, default)
- return default unless @config.pluginconf.include?(opt)
-
- val = @config.pluginconf[opt]
-
- if val =~ /^1|yes|true/
- return true
- elsif val =~ /^0|no|false/
- return false
- else
- return default
- end
+ # looks up a boolean value in the config
+ def get_bool_option(val, default)
+ Util.str_to_bool(@config.pluginconf.fetch(val, default))
end
end
end