Update version according to OSCI-856
[packages/precise/mcollective.git] / plugins / mcollective / connector / rabbitmq.rb
1 require 'stomp'
2
3 module MCollective
4   module Connector
5     class Rabbitmq<Base
6       attr_reader :connection
7
8       class EventLogger
9         def on_connecting(params=nil)
10           Log.info("TCP Connection attempt %d to %s" % [params[:cur_conattempts], stomp_url(params)])
11         rescue
12         end
13
14         def on_connected(params=nil)
15           Log.info("Conncted to #{stomp_url(params)}")
16         rescue
17         end
18
19         def on_disconnect(params=nil)
20           Log.info("Disconnected from #{stomp_url(params)}")
21         rescue
22         end
23
24         def on_connectfail(params=nil)
25           Log.info("TCP Connection to #{stomp_url(params)} failed on attempt #{params[:cur_conattempts]}")
26         rescue
27         end
28
29         def on_miscerr(params, errstr)
30           Log.error("Unexpected error on connection #{stomp_url(params)}: #{errstr}")
31         rescue
32         end
33
34         def on_ssl_connecting(params)
35           Log.info("Estblishing SSL session with #{stomp_url(params)}")
36         rescue
37         end
38
39         def on_ssl_connected(params)
40           Log.info("SSL session established with #{stomp_url(params)}")
41         rescue
42         end
43
44         def on_ssl_connectfail(params)
45           Log.error("SSL session creation with #{stomp_url(params)} failed: #{params[:ssl_exception]}")
46         end
47
48         # Stomp 1.1+ - heart beat read (receive) failed.
49         def on_hbread_fail(params, ticker_data)
50           Log.error("Heartbeat read failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
51         rescue Exception => e
52         end
53
54         # Stomp 1.1+ - heart beat send (transmit) failed.
55         def on_hbwrite_fail(params, ticker_data)
56           Log.error("Heartbeat write failed from '%s': %s" % [stomp_url(params), ticker_data.inspect])
57         rescue Exception => e
58         end
59
60         # Log heart beat fires
61         def on_hbfire(params, srind, curt)
62           case srind
63             when "receive_fire"
64               Log.debug("Received heartbeat from %s: %s, %s" % [stomp_url(params), srind, curt])
65             when "send_fire"
66               Log.debug("Publishing heartbeat to %s: %s, %s" % [stomp_url(params), srind, curt])
67           end
68         rescue Exception => e
69         end
70
71         def stomp_url(params)
72           "%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]]
73         end
74       end
75
76       def initialize
77         @config = Config.instance
78         @subscriptions = []
79         @base64 = false
80       end
81
82       # Connects to the RabbitMQ middleware
83       def connect(connector = ::Stomp::Connection)
84         if @connection
85           Log.debug("Already connection, not re-initializing connection")
86           return
87         end
88
89         begin
90           @base64 = get_bool_option("rabbitmq.base64", "false")
91
92           pools = @config.pluginconf["rabbitmq.pool.size"].to_i
93           hosts = []
94
95           1.upto(pools) do |poolnum|
96             host = {}
97
98             host[:host] = get_option("rabbitmq.pool.#{poolnum}.host")
99             host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 61613).to_i
100             host[:login] = get_env_or_option("STOMP_USER", "rabbitmq.pool.#{poolnum}.user")
101             host[:passcode] = get_env_or_option("STOMP_PASSWORD", "rabbitmq.pool.#{poolnum}.password")
102             host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", "false")
103             host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", "false")) if host[:ssl]
104
105             Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
106             hosts << host
107           end
108
109           raise "No hosts found for the RabbitMQ connection pool" if hosts.size == 0
110
111           connection = {:hosts => hosts}
112
113           # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of
114           # these can be guessed, the documentation isn't clear
115           connection[:initial_reconnect_delay] = Float(get_option("rabbitmq.initial_reconnect_delay", 0.01))
116           connection[:max_reconnect_delay] = Float(get_option("rabbitmq.max_reconnect_delay", 30.0))
117           connection[:use_exponential_back_off] = get_bool_option("rabbitmq.use_exponential_back_off", "true")
118           connection[:back_off_multiplier] = Integer(get_option("rabbitmq.back_off_multiplier", 2))
119           connection[:max_reconnect_attempts] = Integer(get_option("rabbitmq.max_reconnect_attempts", 0))
120           connection[:randomize] = get_bool_option("rabbitmq.randomize", "false")
121           connection[:backup] = get_bool_option("rabbitmq.backup", "false")
122
123           connection[:timeout] = Integer(get_option("rabbitmq.timeout", -1))
124           connection[:connect_timeout] = Integer(get_option("rabbitmq.connect_timeout", 30))
125           connection[:reliable] = true
126           connection[:max_hbrlck_fails] = Integer(get_option("rabbitmq.max_hbrlck_fails", 2))
127           connection[:max_hbread_fails] = Integer(get_option("rabbitmq.max_hbread_fails", 2))
128
129           connection[:connect_headers] = connection_headers
130
131           connection[:logger] = EventLogger.new
132
133           @connection = connector.new(connection)
134         rescue Exception => e
135           raise("Could not connect to RabbitMQ Server: #{e}")
136         end
137       end
138
139       def connection_headers
140         headers = {:"accept-version" => "1.0"}
141
142         heartbeat_interval = Integer(get_option("rabbitmq.heartbeat_interval", 0))
143         stomp_1_0_fallback = get_bool_option("rabbitmq.stomp_1_0_fallback", true)
144
145         headers[:host] = get_option("rabbitmq.vhost", "/")
146
147         if heartbeat_interval > 0
148           unless Util.versioncmp(stomp_version, "1.2.10") >= 0
149             raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
150           end
151
152           if heartbeat_interval < 30
153             Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s")
154             heartbeat_interval = 30
155           end
156
157           heartbeat_interval = heartbeat_interval * 1000
158           headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]
159
160           if stomp_1_0_fallback
161             headers[:"accept-version"] = "1.1,1.0"
162           else
163             headers[:"accept-version"] = "1.1"
164           end
165         else
166           Log.warn("Connecting without STOMP 1.1 heartbeats, consider setting plugin.rabbitmq.heartbeat_interval")
167         end
168
169         headers
170       end
171
172       def stomp_version
173         ::Stomp::Version::STRING
174       end
175
176       # Sets the SSL paramaters for a specific connection
177       def ssl_parameters(poolnum, fallback)
178         params = {:cert_file => get_cert_file(poolnum),
179                   :key_file  => get_key_file(poolnum),
180                   :ts_files  => get_option("rabbitmq.pool.#{poolnum}.ssl.ca", false)}
181
182         raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]
183
184         raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
185         raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])
186
187         params[:ts_files].split(",").each do |ca|
188           raise "Cannot find CA file #{ca}" unless File.exist?(ca)
189         end
190
191         begin
192           Stomp::SSLParams.new(params)
193         rescue NameError
194           raise "Stomp gem >= 1.2.2 is needed"
195         end
196
197       rescue Exception => e
198         if fallback
199           Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
200           return true
201         else
202           Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
203           raise(e)
204         end
205       end
206
207       # Returns the name of the private key file used by RabbitMQ
208       # Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_KEY exists,
209       # where X is the RabbitMQ pool number.
210       # If the environment variable doesn't exist, it will try and load the value from the config.
211       def get_key_file(poolnum)
212         ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_KEY" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.key", false)
213       end
214
215       # Returns the name of the certificate file used by RabbitMQ
216       # Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_CERT exists,
217       # where X is the RabbitMQ pool number.
218       # If the environment variable doesn't exist, it will try and load the value from the config.
219       def get_cert_file(poolnum)
220         ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_CERT" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false)
221       end
222
223       # Receives a message from the RabbitMQ connection
224       def receive
225         Log.debug("Waiting for a message from RabbitMQ")
226
227         # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
228         # handling it sets the connection to closed.  If we happen to be receiving at just
229         # that time we will get an exception warning about the closed connection so handling
230         # that here with a sleep and a retry.
231         begin
232           msg = @connection.receive
233         rescue ::Stomp::Error::NoCurrentConnection
234           sleep 1
235           retry
236         end
237
238         raise "Received a processing error from RabbitMQ: '%s'" % msg.body.chomp if msg.body =~ /Processing error/
239
240         Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
241       end
242
243       # Sends a message to the RabbitMQ connection
244       def publish(msg)
245         msg.base64_encode! if @base64
246
247         if msg.type == :direct_request
248           msg.discovered_hosts.each do |node|
249             target = target_for(msg, node)
250
251             Log.debug("Sending a direct message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
252
253             @connection.publish(target[:name], msg.payload, target[:headers])
254           end
255         else
256           target = target_for(msg)
257
258           Log.debug("Sending a broadcast message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
259
260           @connection.publish(target[:name], msg.payload, target[:headers])
261         end
262       end
263
264       def target_for(msg, node=nil)
265         if msg.type == :reply
266           target = {:name => msg.request.headers["reply-to"], :headers => {}, :id => ""}
267
268         elsif [:request, :direct_request].include?(msg.type)
269           target = make_target(msg.agent, msg.type, msg.collective, msg.reply_to, node)
270
271         else
272           raise "Don't now how to create a target for message type #{msg.type}"
273
274         end
275
276         # marks messages as valid for ttl + 10 seconds, we do this here
277         # rather than in make_target as this should only be set on publish
278         target[:headers]["expiration"] = ((msg.ttl + 10) * 1000).to_s
279
280         return target
281       end
282
283       def make_target(agent, type, collective, reply_to=nil, node=nil)
284         raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
285         raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)
286
287         target = {:name => "", :headers => {}, :id => nil}
288
289         if get_bool_option("rabbitmq.use_reply_exchange", false)
290           reply_path = "/exchange/mcollective_reply/%s_%s" % [ @config.identity, $$ ]
291         else
292           reply_path = "/temp-queue/mcollective_reply_%s" % agent
293         end
294         case type
295           when :reply # receiving replies on a temp queue
296             target[:name] = reply_path
297             target[:id] = "mcollective_%s_replies" % agent
298
299           when :broadcast, :request # publishing a request to all nodes with an agent
300             target[:name] = "/exchange/%s_broadcast/%s" % [collective, agent]
301             if reply_to
302               target[:headers]["reply-to"] = reply_to
303             else
304               target[:headers]["reply-to"] = reply_path
305             end
306             target[:id] = "%s_broadcast_%s" % [collective, agent]
307
308           when :direct_request # a request to a specific node
309             raise "Directed requests need to have a node identity" unless node
310
311             target[:name] = "/exchange/%s_directed/%s" % [ collective, node]
312             target[:headers]["reply-to"] = reply_path
313
314           when :directed # subscribing to directed messages
315             target[:name] = "/exchange/%s_directed/%s" % [ collective, @config.identity ]
316             target[:id] = "%s_%s_directed_to_identity" % [ collective, @config.identity ]
317         end
318
319         target
320       end
321
322       # Subscribe to a topic or queue
323       def subscribe(agent, type, collective)
324         return if type == :reply
325
326         source = make_target(agent, type, collective)
327
328         unless @subscriptions.include?(source[:id])
329           Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}")
330           @connection.subscribe(source[:name], source[:headers], source[:id])
331           @subscriptions << source[:id]
332         end
333       rescue ::Stomp::Error::DuplicateSubscription
334         Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring")
335       end
336
337       # Subscribe to a topic or queue
338       def unsubscribe(agent, type, collective)
339         return if type == :reply
340
341         source = make_target(agent, type, collective)
342
343         Log.debug("Unsubscribing from #{source[:name]}")
344         @connection.unsubscribe(source[:name], source[:headers], source[:id])
345         @subscriptions.delete(source[:id])
346       end
347
348       # Disconnects from the RabbitMQ connection
349       def disconnect
350         Log.debug("Disconnecting from RabbitMQ")
351         @connection.disconnect
352         @connection = nil
353       end
354
355       # looks in the environment first then in the config file
356       # for a specific option, accepts an optional default.
357       #
358       # raises an exception when it cant find a value anywhere
359       def get_env_or_option(env, opt, default=nil)
360         return ENV[env] if ENV.include?(env)
361         return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
362         return default if default
363
364         raise("No #{env} environment or plugin.#{opt} configuration option given")
365       end
366
367       # looks for a config option, accepts an optional default
368       #
369       # raises an exception when it cant find a value anywhere
370       def get_option(opt, default=nil)
371         return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
372         return default unless default.nil?
373
374         raise("No plugin.#{opt} configuration option given")
375       end
376
377       # looks up a boolean value in the config
378       def get_bool_option(val, default)
379         Util.str_to_bool(@config.pluginconf.fetch(val, default))
380       end
381     end
382   end
383 end
384
385 # vi:tabstop=4:expandtab:ai