X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;ds=sidebyside;f=plugins%2Fmcollective%2Fconnector%2Frabbitmq.rb;fp=plugins%2Fmcollective%2Fconnector%2Frabbitmq.rb;h=2bb3b1c95dcfea70a8702d9e23181669ab9d89eb;hb=b87d2f4e68281062df1913440ca5753ae63314a9;hp=0000000000000000000000000000000000000000;hpb=ab0ea530b8ac956091f17b104ab2311336cfc250;p=packages%2Fprecise%2Fmcollective.git diff --git a/plugins/mcollective/connector/rabbitmq.rb b/plugins/mcollective/connector/rabbitmq.rb new file mode 100644 index 0000000..2bb3b1c --- /dev/null +++ b/plugins/mcollective/connector/rabbitmq.rb @@ -0,0 +1,306 @@ +require 'stomp' + +module MCollective + module Connector + class Rabbitmq hosts} + + # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of + # these can be guessed, the documentation isn't clear + connection[:initial_reconnect_delay] = Float(get_option("rabbitmq.initial_reconnect_delay", 0.01)) + connection[:max_reconnect_delay] = Float(get_option("rabbitmq.max_reconnect_delay", 30.0)) + connection[:use_exponential_back_off] = get_bool_option("rabbitmq.use_exponential_back_off", true) + connection[:back_off_multiplier] = Integer(get_option("rabbitmq.back_off_multiplier", 2)) + connection[:max_reconnect_attempts] = Integer(get_option("rabbitmq.max_reconnect_attempts", 0)) + connection[:randomize] = get_bool_option("rabbitmq.randomize", false) + connection[:backup] = get_bool_option("rabbitmq.backup", false) + connection[:timeout] = Integer(get_option("rabbitmq.timeout", -1)) + connection[:connect_timeout] = Integer(get_option("rabbitmq.connect_timeout", 30)) + connection[:reliable] = true + + # RabbitMQ and Stomp supports vhosts, this sets it in a way compatible with RabbitMQ and + # force the version to 1.0, 1.1 support will be added in future + connection[:connect_headers] = {"accept-version" => '1.0', "host" => get_option("rabbitmq.vhost", "/")} + + connection[:logger] = EventLogger.new + + @connection = connector.new(connection) + rescue Exception => e + raise("Could not connect to RabbitMQ Server: #{e}") + end + end + + # Sets the SSL paramaters for a specific connection + def ssl_parameters(poolnum, fallback) + params = {:cert_file => get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false), + :key_file => get_option("rabbitmq.pool.#{poolnum}.ssl.key", false), + :ts_files => get_option("rabbitmq.pool.#{poolnum}.ssl.ca", false)} + + raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files] + + raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file]) + raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file]) + + params[:ts_files].split(",").each do |ca| + raise "Cannot find CA file #{ca}" unless File.exist?(ca) + end + + begin + Stomp::SSLParams.new(params) + rescue NameError + raise "Stomp gem >= 1.2.2 is needed" + end + + rescue Exception => e + if fallback + Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}") + return true + else + Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}") + raise(e) + end + end + + # Receives a message from the RabbitMQ connection + def receive + Log.debug("Waiting for a message from RabbitMQ") + + # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection + # handling it sets the connection to closed. If we happen to be receiving at just + # that time we will get an exception warning about the closed connection so handling + # that here with a sleep and a retry. + begin + msg = @connection.receive + rescue ::Stomp::Error::NoCurrentConnection + sleep 1 + retry + end + + raise "Received a processing error from RabbitMQ: '%s'" % msg.body.chomp if msg.body =~ /Processing error/ + + Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers) + end + + # Sends a message to the RabbitMQ connection + def publish(msg) + msg.base64_encode! if @base64 + + if msg.type == :direct_request + msg.discovered_hosts.each do |node| + target = target_for(msg, node) + + Log.debug("Sending a direct message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'") + + @connection.publish(target[:name], msg.payload, target[:headers]) + end + else + target = target_for(msg) + + Log.debug("Sending a broadcast message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'") + + @connection.publish(target[:name], msg.payload, target[:headers]) + end + end + + def target_for(msg, node=nil) + if msg.type == :reply + target = {:name => msg.request.headers["reply-to"], :headers => {}, :id => ""} + + elsif [:request, :direct_request].include?(msg.type) + target = make_target(msg.agent, msg.type, msg.collective, node) + + else + raise "Don't now how to create a target for message type #{msg.type}" + + end + + return target + end + + def make_target(agent, type, collective, node=nil) + raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type) + raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective) + + target = {:name => "", :headers => {}, :id => nil} + + case type + when :reply # receiving replies on a temp queue + target[:name] = "/temp-queue/mcollective_reply_%s" % agent + target[:id] = "mcollective_%s_replies" % agent + + when :broadcast, :request # publishing a request to all nodes with an agent + target[:name] = "/exchange/%s_broadcast/%s" % [collective, agent] + target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent + target[:id] = "%s_broadcast_%s" % [collective, agent] + + when :direct_request # a request to a specific node + raise "Directed requests need to have a node identity" unless node + + target[:name] = "/exchange/%s_directed/%s" % [ collective, node] + target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent + + when :directed # subscribing to directed messages + target[:name] = "/exchange/%s_directed/%s" % [ collective, @config.identity ] + target[:id] = "%s_directed_to_identity" % @config.identity + end + + target + end + + # Subscribe to a topic or queue + def subscribe(agent, type, collective) + return if type == :reply + + source = make_target(agent, type, collective) + + unless @subscriptions.include?(source[:id]) + Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}") + @connection.subscribe(source[:name], source[:headers], source[:id]) + @subscriptions << source[:id] + end + rescue ::Stomp::Error::DuplicateSubscription + Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring") + end + + # Subscribe to a topic or queue + def unsubscribe(agent, type, collective) + return if type == :reply + + source = make_target(agent, type, collective) + + Log.debug("Unsubscribing from #{source[:name]}") + @connection.unsubscribe(source[:name], source[:headers], source[:id]) + @subscriptions.delete(source[:id]) + end + + # Disconnects from the RabbitMQ connection + def disconnect + Log.debug("Disconnecting from RabbitMQ") + @connection.disconnect + @connection = nil + end + + # looks in the environment first then in the config file + # for a specific option, accepts an optional default. + # + # raises an exception when it cant find a value anywhere + def get_env_or_option(env, opt, default=nil) + return ENV[env] if ENV.include?(env) + return @config.pluginconf[opt] if @config.pluginconf.include?(opt) + return default if default + + raise("No #{env} environment or plugin.#{opt} configuration option given") + end + + # looks for a config option, accepts an optional default + # + # raises an exception when it cant find a value anywhere + def get_option(opt, default=nil) + return @config.pluginconf[opt] if @config.pluginconf.include?(opt) + return default unless default.nil? + + raise("No plugin.#{opt} configuration option given") + end + + # gets a boolean option from the config, supports y/n/true/false/1/0 + def get_bool_option(opt, default) + return default unless @config.pluginconf.include?(opt) + + val = @config.pluginconf[opt] + + if val =~ /^1|yes|true/ + return true + elsif val =~ /^0|no|false/ + return false + else + return default + end + end + end + end +end + +# vi:tabstop=4:expandtab:ai