bf097040a9d6fa8ac09f6216c5f8a285266a5230
[packages/precise/mcollective.git] / spec / unit / plugins / mcollective / connector / rabbitmq_spec.rb
1 #!/usr/bin/env rspec
2
3 require 'spec_helper'
4
5 MCollective::PluginManager.clear
6
7 require File.dirname(__FILE__) + '/../../../../../plugins/mcollective/connector/rabbitmq.rb'
8
# Define the stomp error class here as it does not exist in all
# versions of the stomp gem and we do not want to tie the tests
# to the stomp gem.  The definition is guarded so that a class
# already present under Stomp::Error is left untouched.
module Stomp
  module Error
    unless constants.map { |c| c.to_s }.include?("DuplicateSubscription")
      class DuplicateSubscription < RuntimeError; end
    end
  end
end
17
18 module MCollective
19   module Connector
20     describe Rabbitmq do
21       before do
22         unless ::Stomp::Error.constants.map{|c| c.to_s}.include?("NoCurrentConnection")
23           class ::Stomp::Error::NoCurrentConnection < RuntimeError ; end
24         end
25
26         @config = mock
27         @config.stubs(:configured).returns(true)
28         @config.stubs(:identity).returns("rspec")
29         @config.stubs(:collectives).returns(["mcollective"])
30
31         logger = mock
32         logger.stubs(:log)
33         logger.stubs(:start)
34         Log.configure(logger)
35
36         Config.stubs(:instance).returns(@config)
37
38         @msg = mock
39         @msg.stubs(:base64_encode!)
40         @msg.stubs(:payload).returns("msg")
41         @msg.stubs(:agent).returns("agent")
42         @msg.stubs(:type).returns(:reply)
43         @msg.stubs(:collective).returns("mcollective")
44
45         @subscription = mock
46         @subscription.stubs("<<").returns(true)
47         @subscription.stubs("include?").returns(false)
48         @subscription.stubs("delete").returns(false)
49
50         @connection = mock
51         @connection.stubs(:subscribe).returns(true)
52         @connection.stubs(:unsubscribe).returns(true)
53
54         @c = Rabbitmq.new
55         @c.instance_variable_set("@subscriptions", @subscription)
56         @c.instance_variable_set("@connection", @connection)
57       end
58
59       describe "#initialize" do
60         it "should set the @config variable" do
61           c = Rabbitmq.new
62           c.instance_variable_get("@config").should == @config
63         end
64
65         it "should set @subscriptions to an empty list" do
66           c = Rabbitmq.new
67           c.instance_variable_get("@subscriptions").should == []
68         end
69       end
70
71       describe "#connect" do
72         it "should not try to reconnect if already connected" do
73           Log.expects(:debug).with("Already connection, not re-initializing connection").once
74           @c.connect
75         end
76
77         it "should support new style config" do
78           pluginconf = {"rabbitmq.pool.size" => "2",
79                         "rabbitmq.pool.1.host" => "host1",
80                         "rabbitmq.pool.1.port" => "6163",
81                         "rabbitmq.pool.1.user" => "user1",
82                         "rabbitmq.pool.1.password" => "password1",
83                         "rabbitmq.pool.1.ssl" => "false",
84                         "rabbitmq.pool.2.host" => "host2",
85                         "rabbitmq.pool.2.port" => "6164",
86                         "rabbitmq.pool.2.user" => "user2",
87                         "rabbitmq.pool.2.password" => "password2",
88                         "rabbitmq.pool.2.ssl" => "true",
89                         "rabbitmq.pool.2.ssl.fallback" => "true",
90                         "rabbitmq.initial_reconnect_delay" => "0.02",
91                         "rabbitmq.max_reconnect_delay" => "40",
92                         "rabbitmq.use_exponential_back_off" => "false",
93                         "rabbitmq.back_off_multiplier" => "3",
94                         "rabbitmq.max_reconnect_attempts" => "5",
95                         "rabbitmq.randomize" => "true",
96                         "rabbitmq.backup" => "true",
97                         "rabbitmq.timeout" => "1",
98                         "rabbitmq.vhost" => "mcollective",
99                         "rabbitmq.connect_timeout" => "5"}
100
101
102           ENV.delete("STOMP_USER")
103           ENV.delete("STOMP_PASSWORD")
104
105           @config.expects(:pluginconf).returns(pluginconf).at_least_once
106
107           Rabbitmq::EventLogger.expects(:new).returns("logger")
108
109           connector = mock
110           connector.expects(:new).with(:backup => true,
111                                        :back_off_multiplier => 3,
112                                        :max_reconnect_delay => 40.0,
113                                        :timeout => 1,
114                                        :connect_timeout => 5,
115                                        :use_exponential_back_off => false,
116                                        :max_reconnect_attempts => 5,
117                                        :initial_reconnect_delay => 0.02,
118                                        :randomize => true,
119                                        :reliable => true,
120                                        :logger => "logger",
121                                        :connect_headers => {'accept-version' => '1.0', 'host' => 'mcollective'},
122                                        :hosts => [{:passcode => 'password1',
123                                                    :host => 'host1',
124                                                    :port => 6163,
125                                                    :ssl => false,
126                                                    :login => 'user1'},
127                                                   {:passcode => 'password2',
128                                                    :host => 'host2',
129                                                    :port => 6164,
130                                                    :ssl => true,
131                                                    :login => 'user2'}
132           ])
133
134           @c.expects(:ssl_parameters).with(2, true).returns(true)
135
136           @c.instance_variable_set("@connection", nil)
137           @c.connect(connector)
138         end
139       end
140
141       describe "#ssl_paramaters" do
142         it "should ensure all settings are provided" do
143           pluginconf = {"rabbitmq.pool.1.host" => "host1",
144                         "rabbitmq.pool.1.port" => "6164",
145                         "rabbitmq.pool.1.user" => "user1",
146                         "rabbitmq.pool.1.password" => "password1",
147                         "rabbitmq.pool.1.ssl" => "true",
148                         "rabbitmq.pool.1.ssl.cert" => "rspec"}
149
150           @config.expects(:pluginconf).returns(pluginconf).at_least_once
151
152           expect { @c.ssl_parameters(1, false) }.to raise_error("cert, key and ca has to be supplied for verified SSL mode")
153         end
154
155         it "should verify the ssl files exist" do
156           pluginconf = {"rabbitmq.pool.1.host" => "host1",
157                         "rabbitmq.pool.1.port" => "6164",
158                         "rabbitmq.pool.1.user" => "user1",
159                         "rabbitmq.pool.1.password" => "password1",
160                         "rabbitmq.pool.1.ssl" => "true",
161                         "rabbitmq.pool.1.ssl.cert" => "rspec.cert",
162                         "rabbitmq.pool.1.ssl.key" => "rspec.key",
163                         "rabbitmq.pool.1.ssl.ca" => "rspec1.ca,rspec2.ca"}
164
165           @config.expects(:pluginconf).returns(pluginconf).at_least_once
166
167           File.expects(:exist?).with("rspec.cert").twice.returns(true)
168           File.expects(:exist?).with("rspec.key").twice.returns(true)
169           File.expects(:exist?).with("rspec1.ca").twice.returns(true)
170           File.expects(:exist?).with("rspec2.ca").twice.returns(false)
171
172           expect { @c.ssl_parameters(1, false) }.to raise_error("Cannot find CA file rspec2.ca")
173
174           @c.ssl_parameters(1, true).should == true
175         end
176
177         it "should support fallback mode when there are errors" do
178           pluginconf = {"rabbitmq.pool.1.host" => "host1",
179                         "rabbitmq.pool.1.port" => "6164",
180                         "rabbitmq.pool.1.user" => "user1",
181                         "rabbitmq.pool.1.password" => "password1",
182                         "rabbitmq.pool.1.ssl" => "true"}
183
184           @config.expects(:pluginconf).returns(pluginconf).at_least_once
185
186           @c.ssl_parameters(1, true).should == true
187         end
188
189         it "should fail if fallback isnt enabled" do
190           pluginconf = {"rabbitmq.pool.1.host" => "host1",
191                         "rabbitmq.pool.1.port" => "6164",
192                         "rabbitmq.pool.1.user" => "user1",
193                         "rabbitmq.pool.1.password" => "password1",
194                         "rabbitmq.pool.1.ssl" => "true"}
195
196           @config.expects(:pluginconf).returns(pluginconf).at_least_once
197
198           expect { @c.ssl_parameters(1, false) }.to raise_error
199         end
200       end
201
202       describe "#receive" do
203         it "should receive from the middleware" do
204           payload = mock
205           payload.stubs(:body).returns("msg")
206           payload.stubs(:headers).returns("headers")
207
208           @connection.expects(:receive).returns(payload)
209
210           Message.expects(:new).with("msg", payload, :base64 => true, :headers => "headers").returns("message")
211           @c.instance_variable_set("@base64", true)
212
213           received = @c.receive
214           received.should == "message"
215         end
216
217         it "should sleep and retry if recieving while disconnected" do
218           payload = mock
219           payload.stubs(:body).returns("msg")
220           payload.stubs(:headers).returns("headers")
221
222           Message.stubs(:new).returns("rspec")
223           @connection.expects(:receive).raises(::Stomp::Error::NoCurrentConnection).returns(payload).twice
224           @c.expects(:sleep).with(1)
225
226           @c.receive.should == "rspec"
227         end
228       end
229
230       describe "#publish" do
231         before do
232           @connection.stubs(:publish).with("test", "msg", {}).returns(true)
233         end
234
235         it "should base64 encode a message if configured to do so" do
236           @c.instance_variable_set("@base64", true)
237           @c.expects(:target_for).returns({:name => "test", :headers => {}})
238           @connection.expects(:publish).with("test", "msg", {})
239           @msg.expects(:base64_encode!)
240
241           @c.publish(@msg)
242         end
243
244         it "should not base64 encode if not configured to do so" do
245           @c.instance_variable_set("@base64", false)
246           @c.expects(:target_for).returns({:name => "test", :headers => {}})
247           @connection.expects(:publish).with("test", "msg", {})
248           @msg.expects(:base64_encode!).never
249
250           @c.publish(@msg)
251         end
252
253         it "should publish the correct message to the correct target with msgheaders" do
254           @connection.expects(:publish).with("test", "msg", {}).once
255           @c.expects(:target_for).returns({:name => "test", :headers => {}})
256
257           @c.publish(@msg)
258         end
259
260         it "should publish direct messages based on discovered_hosts" do
261           msg = mock
262           msg.stubs(:base64_encode!)
263           msg.stubs(:payload).returns("msg")
264           msg.stubs(:agent).returns("agent")
265           msg.stubs(:collective).returns("mcollective")
266           msg.stubs(:type).returns(:direct_request)
267           msg.stubs(:reply_to).returns("/topic/mcollective")
268           msg.expects(:discovered_hosts).returns(["one", "two"])
269
270           @connection.expects(:publish).with('/exchange/mcollective_directed/one', 'msg', {'reply-to' => '/temp-queue/mcollective_reply_agent'})
271           @connection.expects(:publish).with('/exchange/mcollective_directed/two', 'msg', {'reply-to' => '/temp-queue/mcollective_reply_agent'})
272
273           @c.publish(msg)
274         end
275       end
276
277       describe "#subscribe" do
278         it "should handle duplicate subscription errors" do
279           @connection.expects(:subscribe).raises(::Stomp::Error::DuplicateSubscription)
280           Log.expects(:error).with(regexp_matches(/already had a matching subscription, ignoring/))
281           @c.subscribe("test", :broadcast, "mcollective")
282         end
283
284         it "should use the make_target correctly" do
285           @c.expects("make_target").with("test", :broadcast, "mcollective").returns({:name => "test", :headers => {}})
286           @c.subscribe("test", :broadcast, "mcollective")
287         end
288
289         it "should check for existing subscriptions" do
290           @c.expects("make_target").with("test", :broadcast, "mcollective").returns({:name => "test", :headers => {}, :id => "rspec"})
291           @subscription.expects("include?").with("rspec").returns(false)
292           @connection.expects(:subscribe).never
293
294           @c.subscribe("test", :broadcast, "mcollective")
295         end
296
297         it "should subscribe to the middleware" do
298           @c.expects("make_target").with("test", :broadcast, "mcollective").returns({:name => "test", :headers => {}, :id => "rspec"})
299           @connection.expects(:subscribe).with("test", {}, "rspec")
300           @c.subscribe("test", :broadcast, "mcollective")
301         end
302
303         it "should add to the list of subscriptions" do
304           @c.expects("make_target").with("test", :broadcast, "mcollective").returns({:name => "test", :headers => {}, :id => "rspec"})
305           @subscription.expects("<<").with("rspec")
306           @c.subscribe("test", :broadcast, "mcollective")
307         end
308       end
309
310       describe "#unsubscribe" do
311         it "should use make_target correctly" do
312           @c.expects("make_target").with("test", :broadcast, "mcollective").returns({:name => "test", :headers => {}})
313           @c.unsubscribe("test", :broadcast, "mcollective")
314         end
315
316         it "should unsubscribe from the target" do
317           @c.expects("make_target").with("test", :broadcast, "mcollective").returns({:name => "test", :headers => {}, :id => "rspec"})
318           @connection.expects(:unsubscribe).with("test", {}, "rspec").once
319
320           @c.unsubscribe("test", :broadcast, "mcollective")
321         end
322
323         it "should delete the source from subscriptions" do
324           @c.expects("make_target").with("test", :broadcast, "mcollective").returns({:name => "test", :headers => {}, :id => "rspec"})
325           @subscription.expects(:delete).with("rspec").once
326
327           @c.unsubscribe("test", :broadcast, "mcollective")
328         end
329       end
330
331       describe "#target_for" do
332         it "should create reply targets based on reply-to headers in requests" do
333           message = mock
334           message.expects(:type).returns(:reply)
335
336           request = mock
337           request.expects(:headers).returns({"reply-to" => "foo"})
338
339           message.expects(:request).returns(request)
340
341           @c.target_for(message).should == {:name => "foo", :headers => {}, :id => ""}
342         end
343
344         it "should create new request targets" do
345           message = mock
346           message.expects(:type).returns(:request).times(3)
347           message.expects(:agent).returns("rspecagent")
348           message.expects(:collective).returns("mcollective")
349           message.expects(:reply_to).returns("/topic/rspec")
350
351           @c.expects(:make_target).with("rspecagent", :request, "mcollective", "/topic/rspec", nil)
352           @c.target_for(message)
353         end
354
355         it "should support direct requests" do
356           message = mock
357           message.expects(:type).returns(:direct_request).times(3)
358           message.expects(:agent).returns("rspecagent")
359           message.expects(:collective).returns("mcollective")
360           message.expects(:reply_to).returns("/topic/rspec")
361
362           @c.expects(:make_target).with("rspecagent", :direct_request, "mcollective", "/topic/rspec", nil)
363           @c.target_for(message)
364         end
365
366         it "should fail for unknown message types" do
367           message = mock
368           message.stubs(:type).returns(:fail)
369
370           expect {
371             @c.target_for(message)
372           }.to raise_error("Don't now how to create a target for message type fail")
373         end
374       end
375
376       describe "#disconnect" do
377         it "should disconnect from the stomp connection" do
378           @connection.expects(:disconnect)
379           @c.disconnect
380           @c.connection.should == nil
381         end
382       end
383
384       describe "#make_target" do
385         it "should create correct targets" do
386           @c.make_target("test", :reply, "mcollective").should == {:name => "/temp-queue/mcollective_reply_test", :headers => {}, :id => "mcollective_test_replies"}
387           @c.make_target("test", :broadcast, "mcollective").should == {:name => "/exchange/mcollective_broadcast/test", :headers => {"reply-to"=>"/temp-queue/mcollective_reply_test"}, :id => "mcollective_broadcast_test"}
388           @c.make_target("test", :request, "mcollective").should == {:name => "/exchange/mcollective_broadcast/test", :headers => {"reply-to"=>"/temp-queue/mcollective_reply_test"}, :id => "mcollective_broadcast_test"}
389           @c.make_target("test", :direct_request, "mcollective", nil, "rspec").should == {:headers=>{"reply-to"=>"/temp-queue/mcollective_reply_test"}, :name=>"/exchange/mcollective_directed/rspec", :id => nil}
390           @c.make_target("test", :directed, "mcollective").should == {:name => "/exchange/mcollective_directed/rspec", :headers=>{}, :id => "rspec_directed_to_identity"}
391           @c.make_target("test", :request, "mcollective", "/topic/rspec", "rspec").should == {:headers=>{"reply-to"=>"/topic/rspec"}, :name=>"/exchange/mcollective_broadcast/test", :id => "mcollective_broadcast_test"}
392         end
393
394         it "should raise an error for unknown collectives" do
395           expect {
396             @c.make_target("test", :broadcast, "foo")
397           }.to raise_error("Unknown collective 'foo' known collectives are 'mcollective'")
398         end
399
400         it "should raise an error for unknown types" do
401           expect {
402             @c.make_target("test", :test, "mcollective")
403           }.to raise_error("Unknown target type test")
404         end
405       end
406
407
408       describe "#get_env_or_option" do
409         it "should return the environment variable if set" do
410           ENV["test"] = "rspec_env_test"
411
412           @c.get_env_or_option("test", nil, nil).should == "rspec_env_test"
413
414           ENV.delete("test")
415         end
416
417         it "should return the config option if set" do
418           @config.expects(:pluginconf).returns({"test" => "rspec_test"}).twice
419           @c.get_env_or_option("test", "test", "test").should == "rspec_test"
420         end
421
422         it "should return default if nothing else matched" do
423           @config.expects(:pluginconf).returns({}).once
424           @c.get_env_or_option("test", "test", "test").should == "test"
425         end
426
427         it "should raise an error if no default is supplied" do
428           @config.expects(:pluginconf).returns({}).once
429
430           expect {
431             @c.get_env_or_option("test", "test")
432           }.to raise_error("No test environment or plugin.test configuration option given")
433         end
434       end
435
436       describe "#get_option" do
437         it "should return the config option if set" do
438           @config.expects(:pluginconf).returns({"test" => "rspec_test"}).twice
439           @c.get_option("test").should == "rspec_test"
440         end
441
442         it "should return default option was not found" do
443           @config.expects(:pluginconf).returns({}).once
444           @c.get_option("test", "test").should == "test"
445         end
446
447         it "should raise an error if no default is supplied" do
448           @config.expects(:pluginconf).returns({}).once
449
450           expect {
451             @c.get_option("test")
452           }.to raise_error("No plugin.test configuration option given")
453         end
454       end
455
456       describe "#get_bool_option" do
457         it "should return the default if option isnt set" do
458           @config.expects(:pluginconf).returns({}).once
459           @c.get_bool_option("test", "default").should == "default"
460         end
461
462         ["1", "yes", "true"].each do |boolean|
463           it "should map options to true correctly" do
464             @config.expects(:pluginconf).returns({"test" => boolean}).twice
465             @c.get_bool_option("test", "default").should == true
466           end
467         end
468
469         ["0", "no", "false"].each do |boolean|
470           it "should map options to false correctly" do
471             @config.expects(:pluginconf).returns({"test" => boolean}).twice
472             @c.get_bool_option("test", "default").should == false
473           end
474         end
475
476         it "should return default for non boolean options" do
477           @config.expects(:pluginconf).returns({"test" => "foo"}).twice
478           @c.get_bool_option("test", "default").should == "default"
479         end
480       end
481     end
482   end
483 end