Added mcollective 2.3.1 package
[packages/trusty/mcollective.git] / lib / mcollective / client.rb
1 module MCollective
2   # Helpers for writing clients that can talk to agents, do discovery and so forth
3   class Client
4     attr_accessor :options, :stats, :discoverer
5
6     def initialize(configfile)
7       @config = Config.instance
8       @config.loadconfig(configfile) unless @config.configured
9
10       @connection = PluginManager["connector_plugin"]
11       @security = PluginManager["security_plugin"]
12
13       @security.initiated_by = :client
14       @options = nil
15       @subscriptions = {}
16
17       @discoverer = Discovery.new(self)
18       @connection.connect
19     end
20
21     # Returns the configured main collective if no
22     # specific collective is specified as options
23     def collective
24       if @options[:collective].nil?
25         @config.main_collective
26       else
27         @options[:collective]
28       end
29     end
30
31     # Disconnects cleanly from the middleware
32     def disconnect
33       Log.debug("Disconnecting from the middleware")
34       @connection.disconnect
35     end
36
37     # Sends a request and returns the generated request id, doesn't wait for
38     # responses and doesn't execute any passed in code blocks for responses
39     def sendreq(msg, agent, filter = {})
40       if msg.is_a?(Message)
41         request = msg
42         agent = request.agent
43       else
44         ttl = @options[:ttl] || @config.ttl
45         request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
46         request.reply_to = @options[:reply_to] if @options[:reply_to]
47       end
48
49       request.encode!
50
51       Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
52
53       subscribe(agent, :reply) unless request.reply_to
54
55       request.publish
56
57       request.requestid
58     end
59
60     def subscribe(agent, type)
61       unless @subscriptions.include?(agent)
62         subscription = Util.make_subscriptions(agent, type, collective)
63         Log.debug("Subscribing to #{type} target for agent #{agent}")
64
65         Util.subscribe(subscription)
66         @subscriptions[agent] = 1
67       end
68     end
69
70     def unsubscribe(agent, type)
71       if @subscriptions.include?(agent)
72         subscription = Util.make_subscriptions(agent, type, collective)
73         Log.debug("Unsubscribing #{type} target for #{agent}")
74
75         Util.unsubscribe(subscription)
76         @subscriptions.delete(agent)
77       end
78     end
79     # Blocking call that waits for ever for a message to arrive.
80     #
81     # If you give it a requestid this means you've previously send a request
82     # with that ID and now you just want replies that matches that id, in that
83     # case the current connection will just ignore all messages not directed at it
84     # and keep waiting for more till it finds a matching message.
85     def receive(requestid = nil)
86       reply = nil
87
88       begin
89         reply = @connection.receive
90         reply.type = :reply
91         reply.expected_msgid = requestid
92
93         reply.decode!
94
95         reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")
96
97         raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid
98       rescue SecurityValidationFailed => e
99         Log.warn("Ignoring a message that did not pass security validations")
100         retry
101       rescue MsgDoesNotMatchRequestID => e
102         Log.debug("Ignoring a message for some other client")
103         retry
104       end
105
106       reply
107     end
108
109     # Performs a discovery of nodes matching the filter passed
110     # returns an array of nodes
111     #
112     # An integer limit can be supplied this will have the effect
113     # of the discovery being cancelled soon as it reached the
114     # requested limit of hosts
115     def discover(filter, timeout, limit=0)
116       discovered = @discoverer.discover(filter, timeout, limit)
117     end
118
119     # Send a request, performs the passed block for each response
120     #
121     # times = req("status", "mcollectived", options, client) {|resp|
122     #   pp resp
123     # }
124     #
125     # It returns a hash of times and timeouts for discovery and total run is taken from the options
126     # hash which in turn is generally built using MCollective::Optionparser
127     def req(body, agent=nil, options=false, waitfor=0)
128       if body.is_a?(Message)
129         agent = body.agent
130         waitfor = body.discovered_hosts.size || 0
131         @options = body.options
132       end
133
134       @options = options if options
135
136       stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
137
138       timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
139
140       STDOUT.sync = true
141
142       hosts_responded = 0
143       reqid = nil
144
145       begin
146         Log.debug("Publishing request to agent %s with timeout %d" % [agent, timeout])
147
148         Timeout.timeout(timeout) do
149           reqid = sendreq(body, agent, @options[:filter])
150
151           loop do
152             resp = receive(reqid)
153
154             hosts_responded += 1
155
156             yield(resp.payload)
157
158             break if (waitfor != 0 && hosts_responded >= waitfor)
159           end
160         end
161       rescue Interrupt => e
162       rescue Timeout::Error => e
163       ensure
164         unsubscribe(agent, :reply)
165       end
166
167       stat[:totaltime] = Time.now.to_f - stat[:starttime]
168       stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
169       stat[:responses] = hosts_responded
170       stat[:noresponsefrom] = []
171       stat[:requestid] = reqid
172
173       @stats = stat
174       return stat
175     end
176
177     def discovered_req(body, agent, options=false)
178       raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
179     end
180
181     # Prints out the stats returns from req and discovered_req in a nice way
182     def display_stats(stats, options=false, caption="stomp call summary")
183       options = @options unless options
184
185       if options[:verbose]
186         puts("\n---- #{caption} ----")
187
188         if stats[:discovered]
189           puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
190         else
191           puts("           Nodes: #{stats[:responses]}")
192         end
193
194         printf("      Start Time: %s\n", Time.at(stats[:starttime]))
195         printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
196         printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
197         printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)
198
199       else
200         if stats[:discovered]
201           printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
202         else
203           printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
204         end
205       end
206
207       if stats[:noresponsefrom].size > 0
208         puts("\nNo response from:\n")
209
210         stats[:noresponsefrom].each do |c|
211           puts if c % 4 == 1
212           printf("%30s", c)
213         end
214
215         puts
216       end
217     end
218   end
219 end