Update code from https://github.com/dmi-try/marionette-collective
[packages/precise/mcollective.git] / plugins / mcollective / connector / activemq.rb
1 require 'stomp'
2
3 module MCollective
4   module Connector
5     # Handles sending and receiving messages over the Stomp protocol for ActiveMQ
6     # servers specifically, we take advantages of ActiveMQ specific features and
7     # enhancements to the Stomp protocol.  For best results in a clustered environment
8     # use ActiveMQ 5.5.0 at least.
9     #
10     # This plugin takes an entirely different approach to dealing with ActiveMQ
11     # from the more generic stomp connector.
12     #
13     #  - Agents use /topic/<collective>.<agent>.agent
14     #  - Replies use temp-topics so they are private and transient.
15     #  - Point to Point messages using topics are supported by subscribing to
16     #    /queue/<collective>.nodes with a selector "mc_identity = 'identity'
17     #
18     # The use of temp-topics for the replies is a huge improvement over the old style.
19     # In the old way all clients got replies for all clients that were active at that
20     # time, this would mean that they would need to decrypt, validate etc in order to
21     # determine if they need to ignore the message, this was computationally expensive
22     # and on large busy networks the messages were being sent all over the show cross
23     # broker boundaries.
24     #
25     # The new way means the messages go point2point back to only whoever requested the
26     # message, they only get their own replies and this is ap private channel that
27     # casual observers cannot just snoop into.
28     #
29     # This plugin supports 1.1.6 and newer of the Stomp rubygem.
30     #
31     #    connector = activemq
32     #    plugin.activemq.pool.size = 2
33     #
34     #    plugin.activemq.pool.1.host = stomp1.your.net
35     #    plugin.activemq.pool.1.port = 61613
36     #    plugin.activemq.pool.1.user = you
37     #    plugin.activemq.pool.1.password = secret
38     #    plugin.activemq.pool.1.ssl = true
39     #    plugin.activemq.pool.1.ssl.cert = /path/to/your.cert
40     #    plugin.activemq.pool.1.ssl.key = /path/to/your.key
41     #    plugin.activemq.pool.1.ssl.ca = /path/to/your.ca
42     #    plugin.activemq.pool.1.ssl.fallback = true
43     #
44     #    plugin.activemq.pool.2.host = stomp2.your.net
45     #    plugin.activemq.pool.2.port = 61613
46     #    plugin.activemq.pool.2.user = you
47     #    plugin.activemq.pool.2.password = secret
48     #    plugin.activemq.pool.2.ssl = false
49     #
50     # Using this method you can supply just STOMP_USER and STOMP_PASSWORD.  The port will
51     # default to 61613 if not specified.
52     #
53     # The ssl options are only usable in version of the Stomp gem newer than 1.2.2 where these
54     # will imply full SSL validation will be done and you'll only be able to connect to a
55     # ActiveMQ server that has a cert signed by the same CA.  If you only set ssl = true
56     # and do not supply the cert, key and ca properties or if you have an older gem it
57     # will fall back to unverified mode only if ssl.fallback is true
58     #
59     # In addition you can set the following options for the rubygem:
60     #
61     #     plugin.activemq.initial_reconnect_delay = 0.01
62     #     plugin.activemq.max_reconnect_delay = 30.0
63     #     plugin.activemq.use_exponential_back_off = true
64     #     plugin.activemq.back_off_multiplier = 2
65     #     plugin.activemq.max_reconnect_attempts = 0
66     #     plugin.activemq.randomize = false
67     #     plugin.activemq.timeout = -1
68     #
69     # You can set the initial connetion timeout - this is when your stomp server is simply
70     # unreachable - after which it would failover to the next in the pool:
71     #
72     #     plugin.activemq.connect_timeout = 30
73     #
74     # ActiveMQ JMS message priorities can be set:
75     #
76     #     plugin.activemq.priority = 4
77     #
78     class Activemq<Base
79       attr_reader :connection
80
81       # Older stomp gems do not have these error classes, in order to be able to
82       # handle these exceptions if they are present and still support older gems
83       # we're assigning the constants to a dummy exception that will never be thrown
84       # by us.  End result is that the code catching these exceptions become noops on
85       # older gems but on newer ones they become usable and handle those new errors
86       # intelligently
87       class DummyError<RuntimeError; end
88
89       ::Stomp::Error = DummyError unless defined?(::Stomp::Error)
90       ::Stomp::Error::NoCurrentConnection = DummyError unless defined?(::Stomp::Error::NoCurrentConnection)
91       ::Stomp::Error::DuplicateSubscription = DummyError unless defined?(::Stomp::Error::DuplicateSubscription)
92
93       # Class for Stomp 1.1.9 callback based logging
94       class EventLogger
95         def on_connecting(params=nil)
96           Log.info("TCP Connection attempt %d to %s" % [params[:cur_conattempts], stomp_url(params)])
97         rescue
98         end
99
100         def on_connected(params=nil)
101           Log.info("Conncted to #{stomp_url(params)}")
102         rescue
103         end
104
105         def on_disconnect(params=nil)
106           Log.info("Disconnected from #{stomp_url(params)}")
107         rescue
108         end
109
110         def on_connectfail(params=nil)
111           Log.info("TCP Connection to #{stomp_url(params)} failed on attempt #{params[:cur_conattempts]}")
112         rescue
113         end
114
115         def on_miscerr(params, errstr)
116           Log.error("Unexpected error on connection #{stomp_url(params)}: #{errstr}")
117         rescue
118         end
119
120         def on_ssl_connecting(params)
121           Log.info("Estblishing SSL session with #{stomp_url(params)}")
122         rescue
123         end
124
125         def on_ssl_connected(params)
126           Log.info("SSL session established with #{stomp_url(params)}")
127         rescue
128         end
129
130         def on_ssl_connectfail(params)
131           Log.error("SSL session creation with #{stomp_url(params)} failed: #{params[:ssl_exception]}")
132         end
133
134         def stomp_url(params)
135           "%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]]
136         end
137       end
138
139       def initialize
140         @config = Config.instance
141         @subscriptions = []
142         @msgpriority = 0
143         @base64 = false
144       end
145
146       # Connects to the ActiveMQ middleware
147       def connect(connector = ::Stomp::Connection)
148         if @connection
149           Log.debug("Already connection, not re-initializing connection")
150           return
151         end
152
153         begin
154           @base64 = get_bool_option("activemq.base64", false)
155           @msgpriority = get_option("activemq.priority", 0).to_i
156
157           pools = @config.pluginconf["activemq.pool.size"].to_i
158           hosts = []
159
160           1.upto(pools) do |poolnum|
161             host = {}
162
163             host[:host] = get_option("activemq.pool.#{poolnum}.host")
164             host[:port] = get_option("activemq.pool.#{poolnum}.port", 61613).to_i
165             host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user")
166             host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password")
167             host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", false)
168
169             host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", false)) if host[:ssl]
170
171             Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
172             hosts << host
173           end
174
175           raise "No hosts found for the ActiveMQ connection pool" if hosts.size == 0
176
177           connection = {:hosts => hosts}
178
179           # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of
180           # these can be guessed, the documentation isn't clear
181           connection[:initial_reconnect_delay] = Float(get_option("activemq.initial_reconnect_delay", 0.01))
182           connection[:max_reconnect_delay] = Float(get_option("activemq.max_reconnect_delay", 30.0))
183           connection[:use_exponential_back_off] = get_bool_option("activemq.use_exponential_back_off", true)
184           connection[:back_off_multiplier] = Integer(get_option("activemq.back_off_multiplier", 2))
185           connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0))
186           connection[:randomize] = get_bool_option("activemq.randomize", false)
187           connection[:backup] = get_bool_option("activemq.backup", false)
188           connection[:timeout] = Integer(get_option("activemq.timeout", -1))
189           connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30))
190           connection[:reliable] = true
191
192           connection[:logger] = EventLogger.new
193
194           @connection = connector.new(connection)
195         rescue Exception => e
196           raise("Could not connect to ActiveMQ Server: #{e}")
197         end
198       end
199
200       # Sets the SSL paramaters for a specific connection
201       def ssl_parameters(poolnum, fallback)
202         params = {:cert_file => get_option("activemq.pool.#{poolnum}.ssl.cert", false),
203                   :key_file  => get_option("activemq.pool.#{poolnum}.ssl.key", false),
204                   :ts_files  => get_option("activemq.pool.#{poolnum}.ssl.ca", false)}
205
206         raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]
207
208         raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
209         raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])
210
211         params[:ts_files].split(",").each do |ca|
212           raise "Cannot find CA file #{ca}" unless File.exist?(ca)
213         end
214
215         begin
216           Stomp::SSLParams.new(params)
217         rescue NameError
218           raise "Stomp gem >= 1.2.2 is needed"
219         end
220
221       rescue Exception => e
222         if fallback
223           Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
224           return true
225         else
226           Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
227           raise(e)
228         end
229       end
230
231       # Receives a message from the ActiveMQ connection
232       def receive
233         Log.debug("Waiting for a message from ActiveMQ")
234
235         # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
236         # handling it sets the connection to closed.  If we happen to be receiving at just
237         # that time we will get an exception warning about the closed connection so handling
238         # that here with a sleep and a retry.
239         begin
240           msg = @connection.receive
241         rescue ::Stomp::Error::NoCurrentConnection
242           sleep 1
243           retry
244         end
245
246         Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
247       end
248
249       # Sends a message to the ActiveMQ connection
250       def publish(msg)
251         msg.base64_encode! if @base64
252
253         target = target_for(msg)
254
255         if msg.type == :direct_request
256           msg.discovered_hosts.each do |node|
257             target[:headers] = headers_for(msg, node)
258
259             Log.debug("Sending a direct message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
260
261             @connection.publish(target[:name], msg.payload, target[:headers])
262           end
263         else
264           target[:headers].merge!(headers_for(msg))
265
266           Log.debug("Sending a broadcast message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
267
268           @connection.publish(target[:name], msg.payload, target[:headers])
269         end
270       end
271
272       # Subscribe to a topic or queue
273       def subscribe(agent, type, collective)
274         source = make_target(agent, type, collective)
275
276         unless @subscriptions.include?(source[:id])
277           Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}")
278           @connection.subscribe(source[:name], source[:headers], source[:id])
279           @subscriptions << source[:id]
280         end
281       rescue ::Stomp::Error::DuplicateSubscription
282         Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring")
283       end
284
285       # Subscribe to a topic or queue
286       def unsubscribe(agent, type, collective)
287         source = make_target(agent, type, collective)
288
289         Log.debug("Unsubscribing from #{source[:name]}")
290         @connection.unsubscribe(source[:name], source[:headers], source[:id])
291         @subscriptions.delete(source[:id])
292       end
293
294       def target_for(msg)
295         if msg.type == :reply
296           target = {:name => msg.request.headers["reply-to"], :headers => {}}
297         elsif [:request, :direct_request].include?(msg.type)
298           target = make_target(msg.agent, msg.type, msg.collective)
299         else
300           raise "Don't now how to create a target for message type #{msg.type}"
301         end
302
303         return target
304       end
305
306       # Disconnects from the ActiveMQ connection
307       def disconnect
308         Log.debug("Disconnecting from ActiveMQ")
309         @connection.disconnect
310         @connection = nil
311       end
312
313       def headers_for(msg, identity=nil)
314         headers = {}
315         headers = {"priority" => @msgpriority} if @msgpriority > 0
316
317         if [:request, :direct_request].include?(msg.type)
318           target = make_target(msg.agent, :reply, msg.collective)
319
320           if msg.reply_to
321             headers["reply-to"] = msg.reply_to
322           else
323             headers["reply-to"] = target[:name]
324           end
325
326           headers["mc_identity"] = identity if msg.type == :direct_request
327         end
328
329         return headers
330       end
331
332       def make_target(agent, type, collective)
333         raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
334         raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)
335
336         target = {:name => nil, :headers => {}}
337
338         case type
339           when :reply
340             target[:name] = ["/queue/" + collective, :reply, "#{Config.instance.identity}_#{$$}"].join(".")
341
342           when :broadcast
343             target[:name] = ["/topic/" + collective, agent, :agent].join(".")
344
345           when :request
346             target[:name] = ["/topic/" + collective, agent, :agent].join(".")
347
348           when :direct_request
349             target[:name] = ["/queue/" + collective, :nodes].join(".")
350
351           when :directed
352             target[:name] = ["/queue/" + collective, :nodes].join(".")
353             target[:headers]["selector"] = "mc_identity = '#{@config.identity}'"
354             target[:id] = "%s_directed_to_identity" % collective
355         end
356
357         target[:id] = target[:name] unless target[:id]
358
359         target
360       end
361
362       # looks in the environment first then in the config file
363       # for a specific option, accepts an optional default.
364       #
365       # raises an exception when it cant find a value anywhere
366       def get_env_or_option(env, opt, default=nil)
367         return ENV[env] if ENV.include?(env)
368         return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
369         return default if default
370
371         raise("No #{env} environment or plugin.#{opt} configuration option given")
372       end
373
374       # looks for a config option, accepts an optional default
375       #
376       # raises an exception when it cant find a value anywhere
377       def get_option(opt, default=nil)
378         return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
379         return default unless default.nil?
380
381         raise("No plugin.#{opt} configuration option given")
382       end
383
384       # gets a boolean option from the config, supports y/n/true/false/1/0
385       def get_bool_option(opt, default)
386         return default unless @config.pluginconf.include?(opt)
387
388         val = @config.pluginconf[opt]
389
390         if val =~ /^1|yes|true/
391           return true
392         elsif val =~ /^0|no|false/
393           return false
394         else
395           return default
396         end
397       end
398     end
399   end
400 end
401
402 # vi:tabstop=4:expandtab:ai