Update version according to OSCI-856
[packages/precise/mcollective.git] / lib / mcollective / client.rb
1 module MCollective
2   # Helpers for writing clients that can talk to agents, do discovery and so forth
3   class Client
4     attr_accessor :options, :stats, :discoverer
5
6     def initialize(configfile)
7       @config = Config.instance
8       @config.loadconfig(configfile) unless @config.configured
9
10       @connection = PluginManager["connector_plugin"]
11       @security = PluginManager["security_plugin"]
12
13       @security.initiated_by = :client
14       @options = nil
15       @subscriptions = {}
16
17       @discoverer = Discovery.new(self)
18       @connection.connect
19     end
20
21     # Returns the configured main collective if no
22     # specific collective is specified as options
23     def collective
24       if @options[:collective].nil?
25         @config.main_collective
26       else
27         @options[:collective]
28       end
29     end
30
31     # Disconnects cleanly from the middleware
32     def disconnect
33       Log.debug("Disconnecting from the middleware")
34       @connection.disconnect
35     end
36
37     # Sends a request and returns the generated request id, doesn't wait for
38     # responses and doesn't execute any passed in code blocks for responses
39     def sendreq(msg, agent, filter = {})
40       request = createreq(msg, agent, filter)
41
42       Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
43
44       request.publish
45       request.requestid
46     end
47
48     def createreq(msg, agent, filter ={})
49       if msg.is_a?(Message)
50         request = msg
51         agent = request.agent
52       else
53         ttl = @options[:ttl] || @config.ttl
54         request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
55         request.reply_to = @options[:reply_to] if @options[:reply_to]
56       end
57
58       request.encode!
59       subscribe(agent, :reply) unless request.reply_to
60       request
61     end
62
63     def subscribe(agent, type)
64       unless @subscriptions.include?(agent)
65         subscription = Util.make_subscriptions(agent, type, collective)
66         Log.debug("Subscribing to #{type} target for agent #{agent}")
67
68         Util.subscribe(subscription)
69         @subscriptions[agent] = 1
70       end
71     end
72
73     def unsubscribe(agent, type)
74       if @subscriptions.include?(agent)
75         subscription = Util.make_subscriptions(agent, type, collective)
76         Log.debug("Unsubscribing #{type} target for #{agent}")
77
78         Util.unsubscribe(subscription)
79         @subscriptions.delete(agent)
80       end
81     end
82     # Blocking call that waits for ever for a message to arrive.
83     #
84     # If you give it a requestid this means you've previously send a request
85     # with that ID and now you just want replies that matches that id, in that
86     # case the current connection will just ignore all messages not directed at it
87     # and keep waiting for more till it finds a matching message.
88     def receive(requestid = nil)
89       reply = nil
90
91       begin
92         reply = @connection.receive
93         reply.type = :reply
94         reply.expected_msgid = requestid
95
96         reply.decode!
97         unless reply.requestid == requestid
98           raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
99         end
100       rescue SecurityValidationFailed => e
101         Log.warn("Ignoring a message that did not pass security validations")
102         retry
103       rescue MsgDoesNotMatchRequestID => e
104         Log.debug("Ignoring a message for some other client : #{e.message}")
105         retry
106       end
107
108       reply
109     end
110
111     # Performs a discovery of nodes matching the filter passed
112     # returns an array of nodes
113     #
114     # An integer limit can be supplied this will have the effect
115     # of the discovery being cancelled soon as it reached the
116     # requested limit of hosts
117     def discover(filter, timeout, limit=0)
118       discovered = @discoverer.discover(filter, timeout, limit)
119     end
120
121     # Send a request, performs the passed block for each response
122     #
123     # times = req("status", "mcollectived", options, client) {|resp|
124     #   pp resp
125     # }
126     #
127     # It returns a hash of times and timeouts for discovery and total run is taken from the options
128     # hash which in turn is generally built using MCollective::Optionparser
129     def req(body, agent=nil, options=false, waitfor=0, &block)
130       if body.is_a?(Message)
131         agent = body.agent
132         waitfor = body.discovered_hosts.size || 0
133         @options = body.options
134       end
135
136       @options = options if options
137       threaded = @options[:threaded]
138       timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
139       request = createreq(body, agent, @options[:filter])
140       publish_timeout = @options[:publish_timeout]
141       stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
142       STDOUT.sync = true
143       hosts_responded = 0
144
145
146       begin
147         if threaded
148           hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
149         else
150           hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
151         end
152       rescue Interrupt => e
153       ensure
154         unsubscribe(agent, :reply)
155       end
156
157       return update_stat(stat, hosts_responded, request.requestid)
158     end
159
160     # Starts the client receiver and publisher unthreaded.
161     # This is the default client behaviour.
162     def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
163       start_publisher(request, publish_timeout)
164       start_receiver(request.requestid, waitfor, timeout, &block)
165     end
166
167     # Starts the client receiver and publisher in threads.
168     # This is activated when the 'threader_client' configuration
169     # option is set.
170     def threaded_req(request, publish_timeout, timeout, waitfor, &block)
171       Log.debug("Starting threaded client")
172       publisher = Thread.new do
173         start_publisher(request, publish_timeout)
174       end
175
176       # When the client is threaded we add the publishing timeout to
177       # the agent timeout so that the receiver doesn't time out before
178       # publishing has finished in cases where publish_timeout >= timeout.
179       total_timeout = publish_timeout + timeout
180       hosts_responded = 0
181
182       receiver = Thread.new do
183         hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
184       end
185
186       receiver.join
187       hosts_responded
188     end
189
190     # Starts the request publishing routine
191     def start_publisher(request, publish_timeout)
192       Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
193       begin
194         Timeout.timeout(publish_timeout) do
195           Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
196           request.publish
197         end
198       rescue Timeout::Error => e
199         Log.warn("Could not publish all messages. Publishing timed out.")
200       end
201     end
202
203     # Starts the response receiver routine
204     # Expected to return the amount of received responses.
205     def start_receiver(requestid, waitfor, timeout, &block)
206       Log.debug("Starting response receiver with timeout of #{timeout}")
207       hosts_responded = 0
208       begin
209         Timeout.timeout(timeout) do
210           begin
211             resp = receive(requestid)
212             yield resp.payload
213             hosts_responded += 1
214           end while (waitfor == 0 || hosts_responded < waitfor)
215         end
216       rescue Timeout::Error => e
217         Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
218       end
219
220       hosts_responded
221     end
222
223     def update_stat(stat, hosts_responded, requestid)
224       stat[:totaltime] = Time.now.to_f - stat[:starttime]
225       stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
226       stat[:responses] = hosts_responded
227       stat[:noresponsefrom] = []
228       stat[:requestid] = requestid
229
230       @stats = stat
231     end
232
233     def discovered_req(body, agent, options=false)
234       raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
235     end
236
237     # Prints out the stats returns from req and discovered_req in a nice way
238     def display_stats(stats, options=false, caption="stomp call summary")
239       options = @options unless options
240
241       if options[:verbose]
242         puts("\n---- #{caption} ----")
243
244         if stats[:discovered]
245           puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
246         else
247           puts("           Nodes: #{stats[:responses]}")
248         end
249
250         printf("      Start Time: %s\n", Time.at(stats[:starttime]))
251         printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
252         printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
253         printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)
254
255       else
256         if stats[:discovered]
257           printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
258         else
259           printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
260         end
261       end
262
263       if stats[:noresponsefrom].size > 0
264         puts("\nNo response from:\n")
265
266         stats[:noresponsefrom].each do |c|
267           puts if c % 4 == 1
268           printf("%30s", c)
269         end
270
271         puts
272       end
273     end
274   end
275 end