X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=plugins%2Fmcollective%2Fconnector%2Factivemq.rb;fp=plugins%2Fmcollective%2Fconnector%2Factivemq.rb;h=cb14647ef1b8c09602180f49a443fd772e086934;hb=b87d2f4e68281062df1913440ca5753ae63314a9;hp=0000000000000000000000000000000000000000;hpb=ab0ea530b8ac956091f17b104ab2311336cfc250;p=packages%2Fprecise%2Fmcollective.git diff --git a/plugins/mcollective/connector/activemq.rb b/plugins/mcollective/connector/activemq.rb new file mode 100644 index 0000000..cb14647 --- /dev/null +++ b/plugins/mcollective/connector/activemq.rb @@ -0,0 +1,402 @@ +require 'stomp' + +module MCollective + module Connector + # Handles sending and receiving messages over the Stomp protocol for ActiveMQ + # servers specifically, we take advantages of ActiveMQ specific features and + # enhancements to the Stomp protocol. For best results in a clustered environment + # use ActiveMQ 5.5.0 at least. + # + # This plugin takes an entirely different approach to dealing with ActiveMQ + # from the more generic stomp connector. + # + # - Agents use /topic/..agent + # - Replies use temp-topics so they are private and transient. + # - Point to Point messages using topics are supported by subscribing to + # /queue/.nodes with a selector "mc_identity = 'identity' + # + # The use of temp-topics for the replies is a huge improvement over the old style. + # In the old way all clients got replies for all clients that were active at that + # time, this would mean that they would need to decrypt, validate etc in order to + # determine if they need to ignore the message, this was computationally expensive + # and on large busy networks the messages were being sent all over the show cross + # broker boundaries. + # + # The new way means the messages go point2point back to only whoever requested the + # message, they only get their own replies and this is ap private channel that + # casual observers cannot just snoop into. + # + # This plugin supports 1.1.6 and newer of the Stomp rubygem. + # + # connector = activemq + # plugin.activemq.pool.size = 2 + # + # plugin.activemq.pool.1.host = stomp1.your.net + # plugin.activemq.pool.1.port = 6163 + # plugin.activemq.pool.1.user = you + # plugin.activemq.pool.1.password = secret + # plugin.activemq.pool.1.ssl = true + # plugin.activemq.pool.1.ssl.cert = /path/to/your.cert + # plugin.activemq.pool.1.ssl.key = /path/to/your.key + # plugin.activemq.pool.1.ssl.ca = /path/to/your.ca + # plugin.activemq.pool.1.ssl.fallback = true + # + # plugin.activemq.pool.2.host = stomp2.your.net + # plugin.activemq.pool.2.port = 6163 + # plugin.activemq.pool.2.user = you + # plugin.activemq.pool.2.password = secret + # plugin.activemq.pool.2.ssl = false + # + # Using this method you can supply just STOMP_USER and STOMP_PASSWORD. The port will + # default to 61613 if not specified. + # + # The ssl options are only usable in version of the Stomp gem newer than 1.2.2 where these + # will imply full SSL validation will be done and you'll only be able to connect to a + # ActiveMQ server that has a cert signed by the same CA. If you only set ssl = true + # and do not supply the cert, key and ca properties or if you have an older gem it + # will fall back to unverified mode only if ssl.fallback is true + # + # In addition you can set the following options for the rubygem: + # + # plugin.activemq.initial_reconnect_delay = 0.01 + # plugin.activemq.max_reconnect_delay = 30.0 + # plugin.activemq.use_exponential_back_off = true + # plugin.activemq.back_off_multiplier = 2 + # plugin.activemq.max_reconnect_attempts = 0 + # plugin.activemq.randomize = false + # plugin.activemq.timeout = -1 + # + # You can set the initial connetion timeout - this is when your stomp server is simply + # unreachable - after which it would failover to the next in the pool: + # + # plugin.activemq.connect_timeout = 30 + # + # ActiveMQ JMS message priorities can be set: + # + # plugin.activemq.priority = 4 + # + class Activemq hosts} + + # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of + # these can be guessed, the documentation isn't clear + connection[:initial_reconnect_delay] = Float(get_option("activemq.initial_reconnect_delay", 0.01)) + connection[:max_reconnect_delay] = Float(get_option("activemq.max_reconnect_delay", 30.0)) + connection[:use_exponential_back_off] = get_bool_option("activemq.use_exponential_back_off", true) + connection[:back_off_multiplier] = Integer(get_option("activemq.back_off_multiplier", 2)) + connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0)) + connection[:randomize] = get_bool_option("activemq.randomize", false) + connection[:backup] = get_bool_option("activemq.backup", false) + connection[:timeout] = Integer(get_option("activemq.timeout", -1)) + connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30)) + connection[:reliable] = true + + connection[:logger] = EventLogger.new + + @connection = connector.new(connection) + rescue Exception => e + raise("Could not connect to ActiveMQ Server: #{e}") + end + end + + # Sets the SSL paramaters for a specific connection + def ssl_parameters(poolnum, fallback) + params = {:cert_file => get_option("activemq.pool.#{poolnum}.ssl.cert", false), + :key_file => get_option("activemq.pool.#{poolnum}.ssl.key", false), + :ts_files => get_option("activemq.pool.#{poolnum}.ssl.ca", false)} + + raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files] + + raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file]) + raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file]) + + params[:ts_files].split(",").each do |ca| + raise "Cannot find CA file #{ca}" unless File.exist?(ca) + end + + begin + Stomp::SSLParams.new(params) + rescue NameError + raise "Stomp gem >= 1.2.2 is needed" + end + + rescue Exception => e + if fallback + Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}") + return true + else + Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}") + raise(e) + end + end + + # Receives a message from the ActiveMQ connection + def receive + Log.debug("Waiting for a message from ActiveMQ") + + # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection + # handling it sets the connection to closed. If we happen to be receiving at just + # that time we will get an exception warning about the closed connection so handling + # that here with a sleep and a retry. + begin + msg = @connection.receive + rescue ::Stomp::Error::NoCurrentConnection + sleep 1 + retry + end + + Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers) + end + + # Sends a message to the ActiveMQ connection + def publish(msg) + msg.base64_encode! if @base64 + + target = target_for(msg) + + if msg.type == :direct_request + msg.discovered_hosts.each do |node| + target[:headers] = headers_for(msg, node) + + Log.debug("Sending a direct message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'") + + @connection.publish(target[:name], msg.payload, target[:headers]) + end + else + target[:headers].merge!(headers_for(msg)) + + Log.debug("Sending a broadcast message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'") + + @connection.publish(target[:name], msg.payload, target[:headers]) + end + end + + # Subscribe to a topic or queue + def subscribe(agent, type, collective) + source = make_target(agent, type, collective) + + unless @subscriptions.include?(source[:id]) + Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}") + @connection.subscribe(source[:name], source[:headers], source[:id]) + @subscriptions << source[:id] + end + rescue ::Stomp::Error::DuplicateSubscription + Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring") + end + + # Subscribe to a topic or queue + def unsubscribe(agent, type, collective) + source = make_target(agent, type, collective) + + Log.debug("Unsubscribing from #{source[:name]}") + @connection.unsubscribe(source[:name], source[:headers], source[:id]) + @subscriptions.delete(source[:id]) + end + + def target_for(msg) + if msg.type == :reply + target = {:name => msg.request.headers["reply-to"], :headers => {}} + elsif [:request, :direct_request].include?(msg.type) + target = make_target(msg.agent, msg.type, msg.collective) + else + raise "Don't now how to create a target for message type #{msg.type}" + end + + return target + end + + # Disconnects from the ActiveMQ connection + def disconnect + Log.debug("Disconnecting from ActiveMQ") + @connection.disconnect + @connection = nil + end + + def headers_for(msg, identity=nil) + headers = {} + headers = {"priority" => @msgpriority} if @msgpriority > 0 + + if [:request, :direct_request].include?(msg.type) + target = make_target(msg.agent, :reply, msg.collective) + + if msg.reply_to + headers["reply-to"] = msg.reply_to + else + headers["reply-to"] = target[:name] + end + + headers["mc_identity"] = identity if msg.type == :direct_request + end + + return headers + end + + def make_target(agent, type, collective) + raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type) + raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective) + + target = {:name => nil, :headers => {}} + + case type + when :reply + target[:name] = ["/queue/" + collective, :reply, "#{Config.instance.identity}_#{$$}"].join(".") + + when :broadcast + target[:name] = ["/topic/" + collective, agent, :agent].join(".") + + when :request + target[:name] = ["/topic/" + collective, agent, :agent].join(".") + + when :direct_request + target[:name] = ["/queue/" + collective, :nodes].join(".") + + when :directed + target[:name] = ["/queue/" + collective, :nodes].join(".") + target[:headers]["selector"] = "mc_identity = '#{@config.identity}'" + target[:id] = "%s_directed_to_identity" % collective + end + + target[:id] = target[:name] unless target[:id] + + target + end + + # looks in the environment first then in the config file + # for a specific option, accepts an optional default. + # + # raises an exception when it cant find a value anywhere + def get_env_or_option(env, opt, default=nil) + return ENV[env] if ENV.include?(env) + return @config.pluginconf[opt] if @config.pluginconf.include?(opt) + return default if default + + raise("No #{env} environment or plugin.#{opt} configuration option given") + end + + # looks for a config option, accepts an optional default + # + # raises an exception when it cant find a value anywhere + def get_option(opt, default=nil) + return @config.pluginconf[opt] if @config.pluginconf.include?(opt) + return default unless default.nil? + + raise("No plugin.#{opt} configuration option given") + end + + # gets a boolean option from the config, supports y/n/true/false/1/0 + def get_bool_option(opt, default) + return default unless @config.pluginconf.include?(opt) + + val = @config.pluginconf[opt] + + if val =~ /^1|yes|true/ + return true + elsif val =~ /^0|no|false/ + return false + else + return default + end + end + end + end +end + +# vi:tabstop=4:expandtab:ai