Update version according to OSCI-856
[packages/precise/mcollective.git] / plugins / mcollective / connector / activemq.rb
1 require 'stomp'
2
3 module MCollective
4   module Connector
5     # Handles sending and receiving messages over the Stomp protocol for ActiveMQ
6     # servers specifically, we take advantages of ActiveMQ specific features and
7     # enhancements to the Stomp protocol.  For best results in a clustered environment
8     # use ActiveMQ 5.5.0 at least.
9     #
10     # This plugin takes an entirely different approach to dealing with ActiveMQ
11     # from the more generic stomp connector.
12     #
13     #  - Agents use /topic/<collective>.<agent>.agent
14     #  - Replies use temp-topics so they are private and transient.
15     #  - Point to Point messages using topics are supported by subscribing to
16     #    /queue/<collective>.nodes with a selector "mc_identity = 'identity'
17     #
18     # The use of temp-topics for the replies is a huge improvement over the old style.
19     # In the old way all clients got replies for all clients that were active at that
20     # time, this would mean that they would need to decrypt, validate etc in order to
21     # determine if they need to ignore the message, this was computationally expensive
22     # and on large busy networks the messages were being sent all over the show cross
23     # broker boundaries.
24     #
25     # The new way means the messages go point2point back to only whoever requested the
26     # message, they only get their own replies and this is ap private channel that
27     # casual observers cannot just snoop into.
28     #
29     # This plugin supports 1.1.6 and newer of the Stomp rubygem.
30     #
31     #    connector = activemq
32     #    plugin.activemq.pool.size = 2
33     #
34     #    plugin.activemq.pool.1.host = stomp1.your.net
35     #    plugin.activemq.pool.1.port = 61613
36     #    plugin.activemq.pool.1.user = you
37     #    plugin.activemq.pool.1.password = secret
38     #    plugin.activemq.pool.1.ssl = true
39     #    plugin.activemq.pool.1.ssl.cert = /path/to/your.cert
40     #    plugin.activemq.pool.1.ssl.key = /path/to/your.key
41     #    plugin.activemq.pool.1.ssl.ca = /path/to/your.ca
42     #    plugin.activemq.pool.1.ssl.fallback = true
43     #
44     #    plugin.activemq.pool.2.host = stomp2.your.net
45     #    plugin.activemq.pool.2.port = 61613
46     #    plugin.activemq.pool.2.user = you
47     #    plugin.activemq.pool.2.password = secret
48     #    plugin.activemq.pool.2.ssl = false
49     #
50     # Using this method you can supply just STOMP_USER and STOMP_PASSWORD.  The port will
51     # default to 61613 if not specified.
52     #
53     # The ssl options are only usable in version of the Stomp gem newer than 1.2.2 where these
54     # will imply full SSL validation will be done and you'll only be able to connect to a
55     # ActiveMQ server that has a cert signed by the same CA.  If you only set ssl = true
56     # and do not supply the cert, key and ca properties or if you have an older gem it
57     # will fall back to unverified mode only if ssl.fallback is true
58     #
59     # In addition you can set the following options for the rubygem:
60     #
61     #     plugin.activemq.initial_reconnect_delay = 0.01
62     #     plugin.activemq.max_reconnect_delay = 30.0
63     #     plugin.activemq.use_exponential_back_off = true
64     #     plugin.activemq.back_off_multiplier = 2
65     #     plugin.activemq.max_reconnect_attempts = 0
66     #     plugin.activemq.randomize = false
67     #     plugin.activemq.timeout = -1
68     #
69     # You can set the initial connetion timeout - this is when your stomp server is simply
70     # unreachable - after which it would failover to the next in the pool:
71     #
72     #     plugin.activemq.connect_timeout = 30
73     #
74     # ActiveMQ JMS message priorities can be set:
75     #
76     #     plugin.activemq.priority = 4
77     #
78     # This plugin supports Stomp protocol 1.1 when combined with the stomp gem version
79     # 1.2.10 or newer.  To enable network heartbeats which will help keep the connection
80     # alive over NAT connections and aggresive session tracking firewalls you can set:
81     #
82     #     plugin.activemq.heartbeat_interval = 30
83     #
84     # which will cause a heartbeat to be sent on 30 second intervals and one to be expected
85     # from the broker every 30 seconds.  The shortest supported period is 30 seconds, if
86     # you set it lower it will get forced to 30 seconds.
87     #
88     # After 2 failures to receive a heartbeat the connection will be reset via the normal
89     # failover mechanism.
90     #
91     # By default if heartbeat_interval is set it will request Stomp 1.1 but support fallback
92     # to 1.0, but you can enable strict Stomp 1.1 only operation
93     #
94     #     plugin.activemq.stomp_1_0_fallback = 0
95     class Activemq<Base
96       attr_reader :connection
97
98       # Older stomp gems do not have these error classes, in order to be able to
99       # handle these exceptions if they are present and still support older gems
100       # we're assigning the constants to a dummy exception that will never be thrown
101       # by us.  End result is that the code catching these exceptions become noops on
102       # older gems but on newer ones they become usable and handle those new errors
103       # intelligently
104       class DummyError<RuntimeError; end
105
106       ::Stomp::Error = DummyError unless defined?(::Stomp::Error)
107       ::Stomp::Error::NoCurrentConnection = DummyError unless defined?(::Stomp::Error::NoCurrentConnection)
108       ::Stomp::Error::DuplicateSubscription = DummyError unless defined?(::Stomp::Error::DuplicateSubscription)
109
110       # Class for Stomp 1.1.9 callback based logging
111       class EventLogger
112         def on_connecting(params=nil)
113           Log.info("TCP Connection attempt %d to %s" % [params[:cur_conattempts], stomp_url(params)])
114         rescue
115         end
116
117         def on_connected(params=nil)
118           Log.info("Conncted to #{stomp_url(params)}")
119         rescue
120         end
121
122         def on_disconnect(params=nil)
123           Log.info("Disconnected from #{stomp_url(params)}")
124         rescue
125         end
126
127         def on_connectfail(params=nil)
128           Log.info("TCP Connection to #{stomp_url(params)} failed on attempt #{params[:cur_conattempts]}")
129         rescue
130         end
131
132         def on_miscerr(params, errstr)
133           Log.error("Unexpected error on connection #{stomp_url(params)}: #{errstr}")
134         rescue
135         end
136
137         def on_ssl_connecting(params)
138           Log.info("Estblishing SSL session with #{stomp_url(params)}")
139         rescue
140         end
141
142         def on_ssl_connected(params)
143           Log.info("SSL session established with #{stomp_url(params)}")
144         rescue
145         end
146
147         def on_ssl_connectfail(params)
148           Log.error("SSL session creation with #{stomp_url(params)} failed: #{params[:ssl_exception]}")
149         end
150
151         # Stomp 1.1+ - heart beat read (receive) failed.
152         def on_hbread_fail(params, ticker_data)
153           Log.error("Heartbeat read failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
154         rescue Exception => e
155         end
156
157         # Stomp 1.1+ - heart beat send (transmit) failed.
158         def on_hbwrite_fail(params, ticker_data)
159           Log.error("Heartbeat write failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
160         rescue Exception => e
161         end
162
163         # Log heart beat fires
164         def on_hbfire(params, srind, curt)
165           case srind
166             when "receive_fire"
167               Log.debug("Received heartbeat from %s: %s, %s" % [stomp_url(params), srind, curt])
168             when "send_fire"
169               Log.debug("Publishing heartbeat to %s: %s, %s" % [stomp_url(params), srind, curt])
170           end
171         rescue Exception => e
172         end
173
174         def stomp_url(params)
175           "%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]]
176         end
177       end
178
179       def initialize
180         @config = Config.instance
181         @subscriptions = []
182         @msgpriority = 0
183         @base64 = false
184       end
185
186       # Connects to the ActiveMQ middleware
187       def connect(connector = ::Stomp::Connection)
188         if @connection
189           Log.debug("Already connection, not re-initializing connection")
190           return
191         end
192
193         begin
194           @base64 = get_bool_option("activemq.base64", "false")
195           @msgpriority = get_option("activemq.priority", 0).to_i
196
197           pools = @config.pluginconf["activemq.pool.size"].to_i
198           hosts = []
199
200           1.upto(pools) do |poolnum|
201             host = {}
202
203             host[:host] = get_option("activemq.pool.#{poolnum}.host")
204             host[:port] = get_option("activemq.pool.#{poolnum}.port", 61613).to_i
205             host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user")
206             host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password")
207             host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", "false")
208
209             host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", "false")) if host[:ssl]
210
211             Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
212             hosts << host
213           end
214
215           raise "No hosts found for the ActiveMQ connection pool" if hosts.size == 0
216
217           connection = {:hosts => hosts}
218
219           # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of
220           # these can be guessed, the documentation isn't clear
221           connection[:initial_reconnect_delay] = Float(get_option("activemq.initial_reconnect_delay", 0.01))
222           connection[:max_reconnect_delay] = Float(get_option("activemq.max_reconnect_delay", 30.0))
223           connection[:use_exponential_back_off] = get_bool_option("activemq.use_exponential_back_off", "true")
224           connection[:back_off_multiplier] = Integer(get_option("activemq.back_off_multiplier", 2))
225           connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0))
226           connection[:randomize] = get_bool_option("activemq.randomize", "false")
227           connection[:backup] = get_bool_option("activemq.backup", "false")
228           connection[:timeout] = Integer(get_option("activemq.timeout", -1))
229           connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30))
230           connection[:reliable] = true
231           connection[:connect_headers] = connection_headers
232           connection[:max_hbrlck_fails] = Integer(get_option("activemq.max_hbrlck_fails", 2))
233           connection[:max_hbread_fails] = Integer(get_option("activemq.max_hbread_fails", 2))
234
235           connection[:logger] = EventLogger.new
236
237           @connection = connector.new(connection)
238         rescue Exception => e
239           raise("Could not connect to ActiveMQ Server: #{e}")
240         end
241       end
242
243       def stomp_version
244         ::Stomp::Version::STRING
245       end
246
247       def connection_headers
248         headers = {:"accept-version" => "1.0"}
249
250         heartbeat_interval = Integer(get_option("activemq.heartbeat_interval", 0))
251         stomp_1_0_fallback = get_bool_option("activemq.stomp_1_0_fallback", true)
252
253         headers[:host] = get_option("activemq.vhost", "mcollective")
254
255         if heartbeat_interval > 0
256           unless Util.versioncmp(stomp_version, "1.2.10") >= 0
257             raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
258           end
259
260           if heartbeat_interval < 30
261             Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s")
262             heartbeat_interval = 30
263           end
264
265           heartbeat_interval = heartbeat_interval * 1000
266           headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]
267
268           if stomp_1_0_fallback
269             headers[:"accept-version"] = "1.1,1.0"
270           else
271             headers[:"accept-version"] = "1.1"
272           end
273         else
274           Log.warn("Connecting without STOMP 1.1 heartbeats, if you are using ActiveMQ 5.8 or newer consider setting plugin.activemq.heartbeat_interval")
275         end
276
277         headers
278       end
279
280       # Sets the SSL paramaters for a specific connection
281       def ssl_parameters(poolnum, fallback)
282         params = {:cert_file => get_cert_file(poolnum),
283                   :key_file => get_key_file(poolnum),
284                   :ts_files  => get_option("activemq.pool.#{poolnum}.ssl.ca", false)}
285
286         raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]
287
288         raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
289         raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])
290
291         params[:ts_files].split(",").each do |ca|
292           raise "Cannot find CA file #{ca}" unless File.exist?(ca)
293         end
294
295         begin
296           Stomp::SSLParams.new(params)
297         rescue NameError
298           raise "Stomp gem >= 1.2.2 is needed"
299         end
300
301       rescue Exception => e
302         if fallback
303           Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
304           return true
305         else
306           Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
307           raise(e)
308         end
309       end
310
311       # Returns the name of the private key file used by ActiveMQ
312       # Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_KEY exists,
313       # where X is the ActiveMQ pool number.
314       # If the environment variable doesn't exist, it will try and load the value from the config.
315       def get_key_file(poolnum)
316         ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_KEY" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.key", false)
317       end
318
319       # Returns the name of the certficate file used by ActiveMQ
320       # Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_CERT exists,
321       # where X is the ActiveMQ pool number.
322       # If the environment variable doesn't exist, it will try and load the value from the config.
323       def get_cert_file(poolnum)
324         ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_CERT" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.cert", false)
325       end
326
327       # Receives a message from the ActiveMQ connection
328       def receive
329         Log.debug("Waiting for a message from ActiveMQ")
330
331         # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
332         # handling it sets the connection to closed.  If we happen to be receiving at just
333         # that time we will get an exception warning about the closed connection so handling
334         # that here with a sleep and a retry.
335         begin
336           msg = @connection.receive
337         rescue ::Stomp::Error::NoCurrentConnection
338           sleep 1
339           retry
340         end
341
342         Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
343       end
344
345       # Sends a message to the ActiveMQ connection
346       def publish(msg)
347         msg.base64_encode! if @base64
348
349         target = target_for(msg)
350
351         if msg.type == :direct_request
352           msg.discovered_hosts.each do |node|
353             target[:headers] = headers_for(msg, node)
354
355             Log.debug("Sending a direct message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
356
357             @connection.publish(target[:name], msg.payload, target[:headers])
358           end
359         else
360           target[:headers].merge!(headers_for(msg))
361
362           Log.debug("Sending a broadcast message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
363
364           @connection.publish(target[:name], msg.payload, target[:headers])
365         end
366       end
367
368       # Subscribe to a topic or queue
369       def subscribe(agent, type, collective)
370         source = make_target(agent, type, collective)
371
372         unless @subscriptions.include?(source[:id])
373           Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}")
374           @connection.subscribe(source[:name], source[:headers], source[:id])
375           @subscriptions << source[:id]
376         end
377       rescue ::Stomp::Error::DuplicateSubscription
378         Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring")
379       end
380
381       # Subscribe to a topic or queue
382       def unsubscribe(agent, type, collective)
383         source = make_target(agent, type, collective)
384
385         Log.debug("Unsubscribing from #{source[:name]}")
386         @connection.unsubscribe(source[:name], source[:headers], source[:id])
387         @subscriptions.delete(source[:id])
388       end
389
390       def target_for(msg)
391         if msg.type == :reply
392           target = {:name => msg.request.headers["reply-to"], :headers => {}}
393         elsif [:request, :direct_request].include?(msg.type)
394           target = make_target(msg.agent, msg.type, msg.collective)
395         else
396           raise "Don't now how to create a target for message type #{msg.type}"
397         end
398
399         return target
400       end
401
402       # Disconnects from the ActiveMQ connection
403       def disconnect
404         Log.debug("Disconnecting from ActiveMQ")
405         @connection.disconnect
406         @connection = nil
407       end
408
409       def headers_for(msg, identity=nil)
410         headers = {}
411
412         headers = {"priority" => @msgpriority} if @msgpriority > 0
413
414         headers["timestamp"] = (Time.now.utc.to_i * 1000).to_s
415
416         # set the expires header based on the TTL, we build a small additional
417         # timeout of 10 seconds in here to allow for network latency etc
418         headers["expires"] = ((Time.now.utc.to_i + msg.ttl + 10) * 1000).to_s
419
420         if [:request, :direct_request].include?(msg.type)
421           target = make_target(msg.agent, :reply, msg.collective)
422
423           if msg.reply_to
424             headers["reply-to"] = msg.reply_to
425           else
426             headers["reply-to"] = target[:name]
427           end
428
429           headers["mc_identity"] = identity if msg.type == :direct_request
430         end
431
432         return headers
433       end
434
435       def make_target(agent, type, collective)
436         raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
437         raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)
438
439         target = {:name => nil, :headers => {}}
440
441         case type
442           when :reply
443             target[:name] = ["/queue/" + collective, :reply, "#{Config.instance.identity}_#{$$}"].join(".")
444
445           when :broadcast
446             target[:name] = ["/topic/" + collective, agent, :agent].join(".")
447
448           when :request
449             target[:name] = ["/topic/" + collective, agent, :agent].join(".")
450
451           when :direct_request
452             target[:name] = ["/queue/" + collective, :nodes].join(".")
453
454           when :directed
455             target[:name] = ["/queue/" + collective, :nodes].join(".")
456             target[:headers]["selector"] = "mc_identity = '#{@config.identity}'"
457             target[:id] = "%s_directed_to_identity" % collective
458         end
459
460         target[:id] = target[:name] unless target[:id]
461
462         target
463       end
464
465       # looks in the environment first then in the config file
466       # for a specific option, accepts an optional default.
467       #
468       # raises an exception when it cant find a value anywhere
469       def get_env_or_option(env, opt, default=nil)
470         return ENV[env] if ENV.include?(env)
471         return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
472         return default if default
473
474         raise("No #{env} environment or plugin.#{opt} configuration option given")
475       end
476
477       # looks for a config option, accepts an optional default
478       #
479       # raises an exception when it cant find a value anywhere
480       def get_option(opt, default=nil)
481         return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
482         return default unless default.nil?
483
484         raise("No plugin.#{opt} configuration option given")
485       end
486
487       # looks up a boolean value in the config
488       def get_bool_option(val, default)
489         Util.str_to_bool(@config.pluginconf.fetch(val, default))
490       end
491     end
492   end
493 end
494
495 # vi:tabstop=4:expandtab:ai