MCollective::Client

Helpers for writing clients that can talk to agents, perform discovery, and so forth.

Attributes

options[RW]

(Not documented)

stats[RW]

(Not documented)

discoverer[RW]

(Not documented)

Public Class Methods

new(configfile)

Loads the configuration from configfile unless it is already configured, fetches the connector and security plugins, creates the discoverer and connects to the middleware.

    # File lib/mcollective/client.rb, line 6
    def initialize(configfile)
      @config = Config.instance
      @config.loadconfig(configfile) unless @config.configured

      @connection = PluginManager["connector_plugin"]
      @security = PluginManager["security_plugin"]

      @security.initiated_by = :client
      @options = nil
      @subscriptions = {}

      @discoverer = Discovery.new(self)
      @connection.connect
    end

Public Instance Methods

collective()

Returns the collective given in the options, or the configured main collective if the options do not specify one.

    # File lib/mcollective/client.rb, line 23
    def collective
      if @options[:collective].nil?
        @config.main_collective
      else
        @options[:collective]
      end
    end

createreq(msg, agent, filter ={})

Builds a request Message for the given agent, or reuses msg if it is already a Message. The request is encoded, and the client subscribes to the agent's reply target unless the request has a reply_to set.

    # File lib/mcollective/client.rb, line 48
    def createreq(msg, agent, filter ={})
      if msg.is_a?(Message)
        request = msg
        agent = request.agent
      else
        ttl = @options[:ttl] || @config.ttl
        request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
        request.reply_to = @options[:reply_to] if @options[:reply_to]
      end

      request.encode!
      subscribe(agent, :reply) unless request.reply_to
      request
    end

disconnect()

Disconnects cleanly from the middleware

    # File lib/mcollective/client.rb, line 32
    def disconnect
      Log.debug("Disconnecting from the middleware")
      @connection.disconnect
    end

discover(filter, timeout, limit=0)

Performs a discovery of nodes matching the passed filter and returns an array of nodes.

An integer limit can be supplied. Discovery is then cancelled as soon as it reaches the requested number of hosts.

    # File lib/mcollective/client.rb, line 117
    def discover(filter, timeout, limit=0)
      discovered = @discoverer.discover(filter, timeout, limit)
    end

discovered_req(body, agent, options=false)

Removed. Always raises an error asking you to port your agent and client to the SimpleRPC framework.

    # File lib/mcollective/client.rb, line 233
    def discovered_req(body, agent, options=false)
      raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
    end

display_stats(stats, options=false, caption="stomp call summary")

Prints the stats returned from req and discovered_req in a readable format.

    # File lib/mcollective/client.rb, line 238
    def display_stats(stats, options=false, caption="stomp call summary")
      options = @options unless options

      if options[:verbose]
        puts("\n---- #{caption} ----")

        if stats[:discovered]
          puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
        else
          puts("           Nodes: #{stats[:responses]}")
        end

        printf("      Start Time: %s\n", Time.at(stats[:starttime]))
        printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
        printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
        printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)

      else
        if stats[:discovered]
          printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
        else
          printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
        end
      end

      if stats[:noresponsefrom].size > 0
        puts("\nNo response from:\n")

        # Start a new line after every fourth host. The index is used
        # for the check because the entries are printed as strings.
        stats[:noresponsefrom].each_with_index do |c, i|
          puts if i > 0 && i % 4 == 0
          printf("%30s", c)
        end

        puts
      end
    end
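
The "No response from" list is printed in 30-character, right-aligned columns, and the modulo-4 check suggests four hosts per line. A minimal sketch of that layout follows. `format_no_response` is a hypothetical helper written for illustration, not part of MCollective, and it returns a string instead of printing.

```ruby
# Hypothetical helper, not part of MCollective: lays host names out in
# right-aligned 30-character columns, per_line hosts on each line.
def format_no_response(hosts, per_line = 4)
  lines = hosts.each_slice(per_line).map do |group|
    group.map { |host| format("%30s", host) }.join
  end
  lines.join("\n")
end

puts format_no_response(%w[web1 web2 web3 web4 db1])
```

Building the string first keeps the column logic separate from the output, which makes it easy to check without capturing STDOUT.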

receive(requestid = nil)

Blocking call that waits forever for a message to arrive.

If you give it a requestid, this means you have previously sent a request with that ID and now want only replies that match it. In that case the current connection ignores all messages not directed at it and keeps waiting until it finds a matching message.

    # File lib/mcollective/client.rb, line 88
    def receive(requestid = nil)
      reply = nil

      begin
        reply = @connection.receive
        reply.type = :reply
        reply.expected_msgid = requestid

        reply.decode!
        unless reply.requestid == requestid
          raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
        end
      rescue SecurityValidationFailed => e
        Log.warn("Ignoring a message that did not pass security validations")
        retry
      rescue MsgDoesNotMatchRequestID => e
        Log.debug("Ignoring a message for some other client : #{e.message}")
        retry
      end

      reply
    end
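
The filtering in receive relies on Ruby's rescue and retry: a reply with the wrong request id raises, the rescue clause discards it, and retry reruns the begin block to fetch the next message. The sketch below isolates that pattern. `Reply`, `QueueConnection` and `receive_matching` are hypothetical stand-ins, not MCollective classes, and the security validation and decoding steps are left out.

```ruby
# Hypothetical stand-ins for illustration; not the MCollective API.
class MsgDoesNotMatchRequestID < StandardError; end

Reply = Struct.new(:requestid, :payload)

class QueueConnection
  def initialize(replies)
    @replies = replies.dup
  end

  # Returns the next queued reply. Raises when the queue is empty so the
  # sketch cannot block forever the way a real connector would.
  def receive
    @replies.shift || raise(IndexError, "no more replies queued")
  end
end

def receive_matching(connection, requestid)
  begin
    reply = connection.receive
    unless reply.requestid == requestid
      raise MsgDoesNotMatchRequestID, "reqid #{reply.requestid} does not match #{requestid}"
    end
  rescue MsgDoesNotMatchRequestID
    retry # drop the unrelated reply and fetch the next one
  end

  reply
end
```

Because retry restarts the whole begin block, each unrelated message costs one extra call to the connection and nothing else.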

req(body, agent=nil, options=false, waitfor=0, &block)

Sends a request and runs the passed block for each response.

    times = req("status", "mcollectived", options) {|resp|
      pp resp
    }

It returns a hash of times. Timeouts for discovery and the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser.

    # File lib/mcollective/client.rb, line 129
    def req(body, agent=nil, options=false, waitfor=0, &block)
      if body.is_a?(Message)
        agent = body.agent
        waitfor = body.discovered_hosts.size || 0
        @options = body.options
      end

      @options = options if options
      threaded = @options[:threaded]
      timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
      request = createreq(body, agent, @options[:filter])
      publish_timeout = @options[:publish_timeout]
      stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
      STDOUT.sync = true
      hosts_responded = 0

      begin
        if threaded
          hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
        else
          hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
        end
      rescue Interrupt => e
      ensure
        unsubscribe(agent, :reply)
      end

      return update_stat(stat, hosts_responded, request.requestid)
    end

sendreq(msg, agent, filter = {})

Sends a request and returns the generated request id. It does not wait for responses and does not execute any passed-in code blocks for responses.

    # File lib/mcollective/client.rb, line 39
    def sendreq(msg, agent, filter = {})
      request = createreq(msg, agent, filter)

      Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")

      request.publish
      request.requestid
    end

start_publisher(request, publish_timeout)

Starts the request publishing routine

    # File lib/mcollective/client.rb, line 191
    def start_publisher(request, publish_timeout)
      Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
      begin
        Timeout.timeout(publish_timeout) do
          Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
          request.publish
        end
      rescue Timeout::Error => e
        Log.warn("Could not publish all messages. Publishing timed out.")
      end
    end

start_receiver(requestid, waitfor, timeout, &block)

Starts the response receiver routine. Expected to return the number of received responses.

    # File lib/mcollective/client.rb, line 205
    def start_receiver(requestid, waitfor, timeout, &block)
      Log.debug("Starting response receiver with timeout of #{timeout}")
      hosts_responded = 0
      begin
        Timeout.timeout(timeout) do
          begin
            resp = receive(requestid)
            yield resp.payload
            hosts_responded += 1
          end while (waitfor == 0 || hosts_responded < waitfor)
        end
      rescue Timeout::Error => e
        Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
      end

      hosts_responded
    end
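
The receiver loop has two exits: it stops once waitfor responses have arrived, or, when waitfor is 0, it keeps receiving until the timeout fires. A minimal sketch of that control flow follows. `collect_responses` and its `source` argument (any object that responds to `call` and returns the next response) are hypothetical names used only for illustration.

```ruby
require "timeout"

# Hypothetical helper, not part of MCollective. Yields each response from
# source and returns how many were received before the loop ended.
def collect_responses(source, waitfor, timeout)
  received = 0

  begin
    Timeout.timeout(timeout) do
      loop do
        yield source.call
        received += 1
        # waitfor == 0 means "no fixed count": only the timeout ends the loop.
        break unless waitfor == 0 || received < waitfor
      end
    end
  rescue Timeout::Error
    # Timing out is expected when waitfor is 0 or hosts are slow to answer.
  end

  received
end
```

Returning the count after the rescue means a timeout still reports the responses that did arrive, which matches how the count is used for the stats.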

subscribe(agent, type)

Subscribes to the given target type for an agent in the current collective, unless a subscription for that agent is already recorded.

    # File lib/mcollective/client.rb, line 63
    def subscribe(agent, type)
      unless @subscriptions.include?(agent)
        subscription = Util.make_subscriptions(agent, type, collective)
        Log.debug("Subscribing to #{type} target for agent #{agent}")

        Util.subscribe(subscription)
        @subscriptions[agent] = 1
      end
    end

threaded_req(request, publish_timeout, timeout, waitfor, &block)

Starts the client receiver and publisher in threads. This is activated when the :threaded option is set, as checked in req.

    # File lib/mcollective/client.rb, line 170
    def threaded_req(request, publish_timeout, timeout, waitfor, &block)
      Log.debug("Starting threaded client")
      publisher = Thread.new do
        start_publisher(request, publish_timeout)
      end

      # When the client is threaded we add the publishing timeout to
      # the agent timeout so that the receiver doesn't time out before
      # publishing has finished in cases where publish_timeout >= timeout.
      total_timeout = publish_timeout + timeout
      hosts_responded = 0

      receiver = Thread.new do
        hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
      end

      receiver.join
      hosts_responded
    end
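
The threaded variant runs publishing and receiving concurrently, so the receiver's deadline has to cover the publishing window as well. The sketch below shows the same shape with a plain Ruby Queue standing in for the middleware. `threaded_exchange` is a hypothetical helper, and unlike the method above it also joins the publisher thread so that the sketch leaves no thread running.

```ruby
require "timeout"

# Hypothetical helper, not part of MCollective. A Queue plays the role of
# the middleware between a publisher thread and a receiver thread.
def threaded_exchange(messages, publish_timeout, timeout)
  queue = Queue.new

  publisher = Thread.new do
    begin
      Timeout.timeout(publish_timeout) { messages.each { |m| queue << m } }
    rescue Timeout::Error
      # Publishing ran out of time; the receiver gets what was sent so far.
    end
  end

  # The receiver waits for the publishing window plus the reply window.
  total_timeout = publish_timeout + timeout
  received = []

  receiver = Thread.new do
    begin
      Timeout.timeout(total_timeout) do
        received << queue.pop while received.size < messages.size
      end
    rescue Timeout::Error
      # Not every message arrived in time; return the partial result.
    end
  end

  receiver.join
  publisher.join
  received
end
```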

unsubscribe(agent, type)

Unsubscribes from the given target type for an agent and forgets the subscription, if one is recorded for that agent.

    # File lib/mcollective/client.rb, line 73
    def unsubscribe(agent, type)
      if @subscriptions.include?(agent)
        subscription = Util.make_subscriptions(agent, type, collective)
        Log.debug("Unsubscribing #{type} target for #{agent}")

        Util.unsubscribe(subscription)
        @subscriptions.delete(agent)
      end
    end
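
Both subscribe and unsubscribe are guarded by the @subscriptions hash, which is keyed by agent name only. Repeated calls for the same agent are therefore no-ops, even when the type differs. A minimal sketch of that bookkeeping follows. `SubscriptionTracker` is a hypothetical class that records calls in an array where the real code would talk to the connector through Util.

```ruby
# Hypothetical class for illustration; not part of MCollective.
class SubscriptionTracker
  attr_reader :calls

  def initialize
    @subscriptions = {}
    @calls = [] # stands in for the Util.subscribe / Util.unsubscribe calls
  end

  def subscribe(agent, type)
    return if @subscriptions.include?(agent)

    @calls << [:subscribe, agent, type]
    @subscriptions[agent] = 1
  end

  def unsubscribe(agent, type)
    return unless @subscriptions.include?(agent)

    @calls << [:unsubscribe, agent, type]
    @subscriptions.delete(agent)
  end
end
```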

unthreaded_req(request, publish_timeout, timeout, waitfor, &block)

Starts the client receiver and publisher unthreaded. This is the default client behaviour.

    # File lib/mcollective/client.rb, line 162
    def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
      start_publisher(request, publish_timeout)
      start_receiver(request.requestid, waitfor, timeout, &block)
    end

update_stat(stat, hosts_responded, requestid)

Fills in the total time, block time, response count and request id in the stat hash, resets the list of non-responding nodes, and stores the result in @stats.

    # File lib/mcollective/client.rb, line 223
    def update_stat(stat, hosts_responded, requestid)
      stat[:totaltime] = Time.now.to_f - stat[:starttime]
      stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
      stat[:responses] = hosts_responded
      stat[:noresponsefrom] = []
      stat[:requestid] = requestid

      @stats = stat
    end
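
The timing fields are simple differences in seconds: total time is now minus the start time, and block time is the total minus the time spent on discovery. The sketch below reproduces that arithmetic. `finalize_stat` is a hypothetical function, and its `now` parameter is an addition made here so the calculation can be checked with fixed numbers.

```ruby
# Hypothetical function for illustration; not part of MCollective.
# All times are in seconds, as produced by Time.now.to_f.
def finalize_stat(stat, hosts_responded, requestid, now = Time.now.to_f)
  stat = stat.dup # leave the caller's hash untouched in this sketch
  stat[:totaltime] = now - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded
  stat[:noresponsefrom] = []
  stat[:requestid] = requestid
  stat
end

stat = finalize_stat({:starttime => 100.0, :discoverytime => 0.5}, 3, "abc", 102.0)
printf("Total: %.2fms, Agent: %.2fms\n", stat[:totaltime] * 1000, stat[:blocktime] * 1000)
```

With a start time of 100.0, a discovery time of 0.5 and a current time of 102.0, the total time is 2.0 seconds and the block time is 1.5 seconds.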
