X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=plugins%2Fmcollective%2Fconnector%2Frabbitmq.rb;fp=plugins%2Fmcollective%2Fconnector%2Frabbitmq.rb;h=f0d479e7a82c4fe75b1fcc05a96736a3ec2f858b;hb=d1f1649ba43c5cbc43c4beb2380096ba051d646a;hp=f3e2dbe94abbcb17d96e144890cb56689f7f7313;hpb=8a3fe7daeecccf43dd71c59371c5005400d35101;p=packages%2Fprecise%2Fmcollective.git diff --git a/plugins/mcollective/connector/rabbitmq.rb b/plugins/mcollective/connector/rabbitmq.rb index f3e2dbe..f0d479e 100644 --- a/plugins/mcollective/connector/rabbitmq.rb +++ b/plugins/mcollective/connector/rabbitmq.rb @@ -45,6 +45,29 @@ module MCollective Log.error("SSL session creation with #{stomp_url(params)} failed: #{params[:ssl_exception]}") end + # Stomp 1.1+ - heart beat read (receive) failed. + def on_hbread_fail(params, ticker_data) + Log.error("Heartbeat read failed from '%s': %s" % [stomp_url(params), ticker_data.inspect]) + rescue Exception => e + end + + # Stomp 1.1+ - heart beat send (transmit) failed. + def on_hbwrite_fail(params, ticker_data) + Log.error("Heartbeat write failed from '%s': %s" % [stomp_url(params), ticker_data.inspect]) + rescue Exception => e + end + + # Log heart beat fires + def on_hbfire(params, srind, curt) + case srind + when "receive_fire" + Log.debug("Received heartbeat from %s: %s, %s" % [stomp_url(params), srind, curt]) + when "send_fire" + Log.debug("Publishing heartbeat to %s: %s, %s" % [stomp_url(params), srind, curt]) + end + rescue Exception => e + end + def stomp_url(params) "%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]] end @@ -64,7 +87,7 @@ module MCollective end begin - @base64 = get_bool_option("rabbitmq.base64", false) + @base64 = get_bool_option("rabbitmq.base64", "false") pools = @config.pluginconf["rabbitmq.pool.size"].to_i hosts = [] @@ -76,9 +99,8 @@ module MCollective host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 61613).to_i host[:login] = get_env_or_option("STOMP_USER", "rabbitmq.pool.#{poolnum}.user") host[:passcode] = get_env_or_option("STOMP_PASSWORD", "rabbitmq.pool.#{poolnum}.password") - host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", false) - - host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", false)) if host[:ssl] + host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", "false") + host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", "false")) if host[:ssl] Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool") hosts << host @@ -92,18 +114,19 @@ module MCollective # these can be guessed, the documentation isn't clear connection[:initial_reconnect_delay] = Float(get_option("rabbitmq.initial_reconnect_delay", 0.01)) connection[:max_reconnect_delay] = Float(get_option("rabbitmq.max_reconnect_delay", 30.0)) - connection[:use_exponential_back_off] = get_bool_option("rabbitmq.use_exponential_back_off", true) + connection[:use_exponential_back_off] = get_bool_option("rabbitmq.use_exponential_back_off", "true") connection[:back_off_multiplier] = Integer(get_option("rabbitmq.back_off_multiplier", 2)) connection[:max_reconnect_attempts] = Integer(get_option("rabbitmq.max_reconnect_attempts", 0)) - connection[:randomize] = get_bool_option("rabbitmq.randomize", false) - connection[:backup] = get_bool_option("rabbitmq.backup", false) + connection[:randomize] = get_bool_option("rabbitmq.randomize", "false") + connection[:backup] = get_bool_option("rabbitmq.backup", "false") + connection[:timeout] = Integer(get_option("rabbitmq.timeout", -1)) connection[:connect_timeout] = Integer(get_option("rabbitmq.connect_timeout", 30)) connection[:reliable] = true + connection[:max_hbrlck_fails] = Integer(get_option("rabbitmq.max_hbrlck_fails", 2)) + connection[:max_hbread_fails] = Integer(get_option("rabbitmq.max_hbread_fails", 2)) - # RabbitMQ and Stomp supports vhosts, this sets it in a way compatible with RabbitMQ and - # force the version to 1.0, 1.1 support will be added in future - connection[:connect_headers] = {"accept-version" => '1.0', "host" => get_option("rabbitmq.vhost", "/")} + connection[:connect_headers] = connection_headers connection[:logger] = EventLogger.new @@ -113,10 +136,47 @@ module MCollective end end + def connection_headers + headers = {:"accept-version" => "1.0"} + + heartbeat_interval = Integer(get_option("rabbitmq.heartbeat_interval", 0)) + stomp_1_0_fallback = get_bool_option("rabbitmq.stomp_1_0_fallback", true) + + headers[:host] = get_option("rabbitmq.vhost", "/") + + if heartbeat_interval > 0 + unless Util.versioncmp(stomp_version, "1.2.10") >= 0 + raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem") + end + + if heartbeat_interval < 30 + Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s") + heartbeat_interval = 30 + end + + heartbeat_interval = heartbeat_interval * 1000 + headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500] + + if stomp_1_0_fallback + headers[:"accept-version"] = "1.1,1.0" + else + headers[:"accept-version"] = "1.1" + end + else + Log.warn("Connecting without STOMP 1.1 heartbeats, consider setting plugin.rabbitmq.heartbeat_interval") + end + + headers + end + + def stomp_version + ::Stomp::Version::STRING + end + # Sets the SSL paramaters for a specific connection def ssl_parameters(poolnum, fallback) - params = {:cert_file => get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false), - :key_file => get_option("rabbitmq.pool.#{poolnum}.ssl.key", false), + params = {:cert_file => get_cert_file(poolnum), + :key_file => get_key_file(poolnum), :ts_files => get_option("rabbitmq.pool.#{poolnum}.ssl.ca", false)} raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files] @@ -144,6 +204,22 @@ module MCollective end end + # Returns the name of the private key file used by RabbitMQ + # Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_KEY exists, + # where X is the RabbitMQ pool number. + # If the environment variable doesn't exist, it will try and load the value from the config. + def get_key_file(poolnum) + ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_KEY" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.key", false) + end + + # Returns the name of the certificate file used by RabbitMQ + # Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_CERT exists, + # where X is the RabbitMQ pool number. + # If the environment variable doesn't exist, it will try and load the value from the config. + def get_cert_file(poolnum) + ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_CERT" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false) + end + # Receives a message from the RabbitMQ connection def receive Log.debug("Waiting for a message from RabbitMQ") @@ -197,6 +273,10 @@ module MCollective end + # marks messages as valid for ttl + 10 seconds, we do this here + # rather than in make_target as this should only be set on publish + target[:headers]["expiration"] = ((msg.ttl + 10) * 1000).to_s + return target end @@ -206,9 +286,14 @@ module MCollective target = {:name => "", :headers => {}, :id => nil} + if get_bool_option("rabbitmq.use_reply_exchange", false) + reply_path = "/exchange/mcollective_reply/%s_%s" % [ @config.identity, $$ ] + else + reply_path = "/temp-queue/mcollective_reply_%s" % agent + end case type when :reply # receiving replies on a temp queue - target[:name] = "/temp-queue/mcollective_reply_%s" % agent + target[:name] = reply_path target[:id] = "mcollective_%s_replies" % agent when :broadcast, :request # publishing a request to all nodes with an agent @@ -216,7 +301,7 @@ module MCollective if reply_to target[:headers]["reply-to"] = reply_to else - target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent + target[:headers]["reply-to"] = reply_path end target[:id] = "%s_broadcast_%s" % [collective, agent] @@ -224,11 +309,11 @@ module MCollective raise "Directed requests need to have a node identity" unless node target[:name] = "/exchange/%s_directed/%s" % [ collective, node] - target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent + target[:headers]["reply-to"] = reply_path when :directed # subscribing to directed messages target[:name] = "/exchange/%s_directed/%s" % [ collective, @config.identity ] - target[:id] = "%s_directed_to_identity" % @config.identity + target[:id] = "%s_%s_directed_to_identity" % [ collective, @config.identity ] end target @@ -289,19 +374,9 @@ module MCollective raise("No plugin.#{opt} configuration option given") end - # gets a boolean option from the config, supports y/n/true/false/1/0 - def get_bool_option(opt, default) - return default unless @config.pluginconf.include?(opt) - - val = @config.pluginconf[opt] - - if val =~ /^1|yes|true/ - return true - elsif val =~ /^0|no|false/ - return false - else - return default - end + # looks up a boolean value in the config + def get_bool_option(val, default) + Util.str_to_bool(@config.pluginconf.fetch(val, default)) end end end