2bb3b1c95dcfea70a8702d9e23181669ab9d89eb
[packages/precise/mcollective.git] / plugins / mcollective / connector / rabbitmq.rb
1 require 'stomp'
2
3 module MCollective
4   module Connector
5     class Rabbitmq<Base
6       attr_reader :connection
7
8       class EventLogger
9         def on_connecting(params=nil)
10           Log.info("TCP Connection attempt %d to %s" % [params[:cur_conattempts], stomp_url(params)])
11         rescue
12         end
13
14         def on_connected(params=nil)
15           Log.info("Conncted to #{stomp_url(params)}")
16         rescue
17         end
18
19         def on_disconnect(params=nil)
20           Log.info("Disconnected from #{stomp_url(params)}")
21         rescue
22         end
23
24         def on_connectfail(params=nil)
25           Log.info("TCP Connection to #{stomp_url(params)} failed on attempt #{params[:cur_conattempts]}")
26         rescue
27         end
28
29         def on_miscerr(params, errstr)
30           Log.error("Unexpected error on connection #{stomp_url(params)}: #{errstr}")
31         rescue
32         end
33
34         def on_ssl_connecting(params)
35           Log.info("Estblishing SSL session with #{stomp_url(params)}")
36         rescue
37         end
38
39         def on_ssl_connected(params)
40           Log.info("SSL session established with #{stomp_url(params)}")
41         rescue
42         end
43
44         def on_ssl_connectfail(params)
45           Log.error("SSL session creation with #{stomp_url(params)} failed: #{params[:ssl_exception]}")
46         end
47
48         def stomp_url(params)
49           "%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]]
50         end
51       end
52
53       def initialize
54         @config = Config.instance
55         @subscriptions = []
56         @base64 = false
57       end
58
59       # Connects to the RabbitMQ middleware
60       def connect(connector = ::Stomp::Connection)
61         if @connection
62           Log.debug("Already connection, not re-initializing connection")
63           return
64         end
65
66         begin
67           @base64 = get_bool_option("rabbitmq.base64", false)
68
69           pools = @config.pluginconf["rabbitmq.pool.size"].to_i
70           hosts = []
71
72           1.upto(pools) do |poolnum|
73             host = {}
74
75             host[:host] = get_option("rabbitmq.pool.#{poolnum}.host")
76             host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 6163).to_i
77             host[:login] = get_env_or_option("STOMP_USER", "rabbitmq.pool.#{poolnum}.user")
78             host[:passcode] = get_env_or_option("STOMP_PASSWORD", "rabbitmq.pool.#{poolnum}.password")
79             host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", false)
80
81             host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", false)) if host[:ssl]
82
83             Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
84             hosts << host
85           end
86
87           raise "No hosts found for the RabbitMQ connection pool" if hosts.size == 0
88
89           connection = {:hosts => hosts}
90
91           # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of
92           # these can be guessed, the documentation isn't clear
93           connection[:initial_reconnect_delay] = Float(get_option("rabbitmq.initial_reconnect_delay", 0.01))
94           connection[:max_reconnect_delay] = Float(get_option("rabbitmq.max_reconnect_delay", 30.0))
95           connection[:use_exponential_back_off] = get_bool_option("rabbitmq.use_exponential_back_off", true)
96           connection[:back_off_multiplier] = Integer(get_option("rabbitmq.back_off_multiplier", 2))
97           connection[:max_reconnect_attempts] = Integer(get_option("rabbitmq.max_reconnect_attempts", 0))
98           connection[:randomize] = get_bool_option("rabbitmq.randomize", false)
99           connection[:backup] = get_bool_option("rabbitmq.backup", false)
100           connection[:timeout] = Integer(get_option("rabbitmq.timeout", -1))
101           connection[:connect_timeout] = Integer(get_option("rabbitmq.connect_timeout", 30))
102           connection[:reliable] = true
103
104           # RabbitMQ and Stomp supports vhosts, this sets it in a way compatible with RabbitMQ and
105           # force the version to 1.0, 1.1 support will be added in future
106           connection[:connect_headers] = {"accept-version" => '1.0', "host" => get_option("rabbitmq.vhost", "/")}
107
108           connection[:logger] = EventLogger.new
109
110           @connection = connector.new(connection)
111         rescue Exception => e
112           raise("Could not connect to RabbitMQ Server: #{e}")
113         end
114       end
115
116       # Sets the SSL paramaters for a specific connection
117       def ssl_parameters(poolnum, fallback)
118         params = {:cert_file => get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false),
119                   :key_file  => get_option("rabbitmq.pool.#{poolnum}.ssl.key", false),
120                   :ts_files  => get_option("rabbitmq.pool.#{poolnum}.ssl.ca", false)}
121
122         raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]
123
124         raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
125         raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])
126
127         params[:ts_files].split(",").each do |ca|
128           raise "Cannot find CA file #{ca}" unless File.exist?(ca)
129         end
130
131         begin
132           Stomp::SSLParams.new(params)
133         rescue NameError
134           raise "Stomp gem >= 1.2.2 is needed"
135         end
136
137       rescue Exception => e
138         if fallback
139           Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
140           return true
141         else
142           Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
143           raise(e)
144         end
145       end
146
147       # Receives a message from the RabbitMQ connection
148       def receive
149         Log.debug("Waiting for a message from RabbitMQ")
150
151         # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
152         # handling it sets the connection to closed.  If we happen to be receiving at just
153         # that time we will get an exception warning about the closed connection so handling
154         # that here with a sleep and a retry.
155         begin
156           msg = @connection.receive
157         rescue ::Stomp::Error::NoCurrentConnection
158           sleep 1
159           retry
160         end
161
162         raise "Received a processing error from RabbitMQ: '%s'" % msg.body.chomp if msg.body =~ /Processing error/
163
164         Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
165       end
166
167       # Sends a message to the RabbitMQ connection
168       def publish(msg)
169         msg.base64_encode! if @base64
170
171         if msg.type == :direct_request
172           msg.discovered_hosts.each do |node|
173             target = target_for(msg, node)
174
175             Log.debug("Sending a direct message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
176
177             @connection.publish(target[:name], msg.payload, target[:headers])
178           end
179         else
180           target = target_for(msg)
181
182           Log.debug("Sending a broadcast message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
183
184           @connection.publish(target[:name], msg.payload, target[:headers])
185         end
186       end
187
188       def target_for(msg, node=nil)
189         if msg.type == :reply
190           target = {:name => msg.request.headers["reply-to"], :headers => {}, :id => ""}
191
192         elsif [:request, :direct_request].include?(msg.type)
193           target = make_target(msg.agent, msg.type, msg.collective, node)
194
195         else
196           raise "Don't now how to create a target for message type #{msg.type}"
197
198         end
199
200         return target
201       end
202
203       def make_target(agent, type, collective, node=nil)
204         raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
205         raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)
206
207         target = {:name => "", :headers => {}, :id => nil}
208
209         case type
210           when :reply # receiving replies on a temp queue
211             target[:name] = "/temp-queue/mcollective_reply_%s" % agent
212             target[:id] = "mcollective_%s_replies" % agent
213
214           when :broadcast, :request # publishing a request to all nodes with an agent
215             target[:name] = "/exchange/%s_broadcast/%s" % [collective, agent]
216             target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent
217             target[:id] = "%s_broadcast_%s" % [collective, agent]
218
219           when :direct_request # a request to a specific node
220             raise "Directed requests need to have a node identity" unless node
221
222             target[:name] = "/exchange/%s_directed/%s" % [ collective, node]
223             target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent
224
225           when :directed # subscribing to directed messages
226             target[:name] = "/exchange/%s_directed/%s" % [ collective, @config.identity ]
227             target[:id] = "%s_directed_to_identity" % @config.identity
228         end
229
230         target
231       end
232
233       # Subscribe to a topic or queue
234       def subscribe(agent, type, collective)
235         return if type == :reply
236
237         source = make_target(agent, type, collective)
238
239         unless @subscriptions.include?(source[:id])
240           Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}")
241           @connection.subscribe(source[:name], source[:headers], source[:id])
242           @subscriptions << source[:id]
243         end
244       rescue ::Stomp::Error::DuplicateSubscription
245         Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring")
246       end
247
248       # Subscribe to a topic or queue
249       def unsubscribe(agent, type, collective)
250         return if type == :reply
251
252         source = make_target(agent, type, collective)
253
254         Log.debug("Unsubscribing from #{source[:name]}")
255         @connection.unsubscribe(source[:name], source[:headers], source[:id])
256         @subscriptions.delete(source[:id])
257       end
258
259       # Disconnects from the RabbitMQ connection
260       def disconnect
261         Log.debug("Disconnecting from RabbitMQ")
262         @connection.disconnect
263         @connection = nil
264       end
265
266       # looks in the environment first then in the config file
267       # for a specific option, accepts an optional default.
268       #
269       # raises an exception when it cant find a value anywhere
270       def get_env_or_option(env, opt, default=nil)
271         return ENV[env] if ENV.include?(env)
272         return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
273         return default if default
274
275         raise("No #{env} environment or plugin.#{opt} configuration option given")
276       end
277
278       # looks for a config option, accepts an optional default
279       #
280       # raises an exception when it cant find a value anywhere
281       def get_option(opt, default=nil)
282         return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
283         return default unless default.nil?
284
285         raise("No plugin.#{opt} configuration option given")
286       end
287
288       # gets a boolean option from the config, supports y/n/true/false/1/0
289       def get_bool_option(opt, default)
290         return default unless @config.pluginconf.include?(opt)
291
292         val = @config.pluginconf[opt]
293
294         if val =~ /^1|yes|true/
295           return true
296         elsif val =~ /^0|no|false/
297           return false
298         else
299           return default
300         end
301       end
302     end
303   end
304 end
305
306 # vi:tabstop=4:expandtab:ai