Helpers for writing clients that can talk to agents, do discovery, and so forth.
(Not documented)
# File lib/mcollective/client.rb, line 6
def initialize(configfile)
  @config = Config.instance
  @config.loadconfig(configfile) unless @config.configured

  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]

  @security.initiated_by = :client
  @options = nil
  @subscriptions = {}

  @discoverer = Discovery.new(self)
  @connection.connect
end
Returns the configured main collective if no specific collective is specified in the options.
# File lib/mcollective/client.rb, line 23
def collective
  if @options[:collective].nil?
    @config.main_collective
  else
    @options[:collective]
  end
end
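The fallback can be tried in isolation. This is a minimal sketch, not MCollective code: `pick_collective` is a hypothetical helper, and the collective names are made up.

```ruby
# Hypothetical helper for illustration; in the client the default comes
# from the loaded Config and the options from the caller.
def pick_collective(options, main_collective)
  options[:collective].nil? ? main_collective : options[:collective]
end

default_choice  = pick_collective({}, "mcollective")
explicit_choice = pick_collective({:collective => "uk_collective"}, "mcollective")

puts default_choice   # falls back to the main collective
puts explicit_choice  # the option wins when it is set
```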
(Not documented)
# File lib/mcollective/client.rb, line 48
def createreq(msg, agent, filter ={})
  if msg.is_a?(Message)
    request = msg
    agent = request.agent
  else
    ttl = @options[:ttl] || @config.ttl
    request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
    request.reply_to = @options[:reply_to] if @options[:reply_to]
  end

  request.encode!
  subscribe(agent, :reply) unless request.reply_to
  request
end
Disconnects cleanly from the middleware.
# File lib/mcollective/client.rb, line 32
def disconnect
  Log.debug("Disconnecting from the middleware")
  @connection.disconnect
end
Performs a discovery of nodes matching the passed filter and returns an array of nodes.
An integer limit can be supplied; discovery is then cancelled as soon as it reaches the requested number of hosts.
# File lib/mcollective/client.rb, line 117
def discover(filter, timeout, limit=0)
  discovered = @discoverer.discover(filter, timeout, limit)
end
(Not documented)
# File lib/mcollective/client.rb, line 233
def discovered_req(body, agent, options=false)
  raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
end
Prints the stats returned from req and discovered_req in a readable format.
# File lib/mcollective/client.rb, line 238
def display_stats(stats, options=false, caption="stomp call summary")
  options = @options unless options

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts(" Nodes: #{stats[:responses]}")
    end

    printf(" Start Time: %s\n", Time.at(stats[:starttime]))
    printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000)

  else
    if stats[:discovered]
      printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
    else
      printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
    end
  end

  if stats[:noresponsefrom].size > 0
    puts("\nNo response from:\n")

    stats[:noresponsefrom].each do |c|
      puts if c % 4 == 1
      printf("%30s", c)
    end

    puts
  end
end
Blocking call that waits forever for a message to arrive.
If you give it a requestid, this means you have previously sent a request with that ID and now only want replies that match it. In that case the current connection ignores all messages not directed at it and keeps waiting until it finds a matching message.
# File lib/mcollective/client.rb, line 88
def receive(requestid = nil)
  reply = nil

  begin
    reply = @connection.receive
    reply.type = :reply
    reply.expected_msgid = requestid

    reply.decode!
    unless reply.requestid == requestid
      raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
    end
  rescue SecurityValidationFailed => e
    Log.warn("Ignoring a message that did not pass security validations")
    retry
  rescue MsgDoesNotMatchRequestID => e
    Log.debug("Ignoring a message for some other client : #{e.message}")
    retry
  end

  reply
end
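The way receive skips replies meant for other requests can be illustrated without a running middleware. The following is a minimal sketch under stated assumptions, not MCollective code: `Reply`, `FakeConnection`, and `receive_matching` are hypothetical names, the exception class is redefined locally, and a queued array stands in for the connector plugin. Security validation is left out.

```ruby
# Hypothetical stand-ins for illustration only; none of these are MCollective classes.
Reply = Struct.new(:requestid, :payload)

class MsgDoesNotMatchRequestID < RuntimeError; end

class FakeConnection
  def initialize(replies)
    @replies = replies.dup
  end

  # Hands out queued replies one at a time, where a real connector would block.
  def receive
    @replies.shift or raise "no more replies queued"
  end
end

# Keeps reading until a reply carries the request id we are waiting for.
def receive_matching(connection, requestid)
  reply = nil

  begin
    reply = connection.receive
    unless reply.requestid == requestid
      raise MsgDoesNotMatchRequestID, "reqid #{reply.requestid} does not match #{requestid}"
    end
  rescue MsgDoesNotMatchRequestID
    retry # discard the stray reply and read the next one
  end

  reply
end

connection = FakeConnection.new([
  Reply.new("abc", "for another client"),
  Reply.new("xyz", "for us")
])

matched = receive_matching(connection, "xyz")
puts matched.payload
```

The `retry` keyword re-runs the whole `begin` block, so each mismatched reply costs one extra read from the connection and nothing else.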
Sends a request and runs the passed block for each response:
  times = req("status", "mcollectived", options, client) {|resp|
    pp resp
  }
Returns a hash of times. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser.
# File lib/mcollective/client.rb, line 129
def req(body, agent=nil, options=false, waitfor=0, &block)
  if body.is_a?(Message)
    agent = body.agent
    waitfor = body.discovered_hosts.size || 0
    @options = body.options
  end

  @options = options if options
  threaded = @options[:threaded]
  timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
  request = createreq(body, agent, @options[:filter])
  publish_timeout = @options[:publish_timeout]
  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
  STDOUT.sync = true
  hosts_responded = 0

  begin
    if threaded
      hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
    else
      hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
    end
  rescue Interrupt => e
  ensure
    unsubscribe(agent, :reply)
  end

  return update_stat(stat, hosts_responded, request.requestid)
end
Sends a request and returns the generated request id. It doesn't wait for responses and doesn't execute any passed-in code blocks for responses.
# File lib/mcollective/client.rb, line 39
def sendreq(msg, agent, filter = {})
  request = createreq(msg, agent, filter)

  Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")

  request.publish
  request.requestid
end
Starts the request publishing routine.
# File lib/mcollective/client.rb, line 191
def start_publisher(request, publish_timeout)
  Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
  begin
    Timeout.timeout(publish_timeout) do
      Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
      request.publish
    end
  rescue Timeout::Error => e
    Log.warn("Could not publish all messages. Publishing timed out.")
  end
end
Starts the response receiver routine. Expected to return the number of received responses.
# File lib/mcollective/client.rb, line 205
def start_receiver(requestid, waitfor, timeout, &block)
  Log.debug("Starting response receiver with timeout of #{timeout}")
  hosts_responded = 0
  begin
    Timeout.timeout(timeout) do
      begin
        resp = receive(requestid)
        yield resp.payload
        hosts_responded += 1
      end while (waitfor == 0 || hosts_responded < waitfor)
    end
  rescue Timeout::Error => e
    Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
  end

  hosts_responded
end
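The two ways the receiver loop ends (enough replies, or the timeout) can be seen in a small standalone sketch. It assumes nothing from MCollective: `collect_replies` and the `source` lambda are hypothetical, and the lambda stands in for `receive` by answering three times and then blocking.

```ruby
require "timeout"

# Hypothetical helper, not part of MCollective: yields replies produced by
# +source+ until +waitfor+ replies have arrived or +timeout+ seconds pass.
def collect_replies(source, waitfor, timeout)
  received = 0

  begin
    Timeout.timeout(timeout) do
      begin
        yield source.call
        received += 1
      end while (waitfor == 0 || received < waitfor)
    end
  rescue Timeout::Error
    # Timed out: fall through and report what arrived so far.
  end

  received
end

# Answers immediately three times, then blocks like an idle connection.
replies = ["node1", "node2", "node3"]
source = lambda { replies.empty? ? sleep : replies.shift }

seen = []

# waitfor of 2: the loop stops by itself after two replies.
limited_count = collect_replies(source, 2, 1) { |r| seen << r }

# waitfor of 0: only the timeout ends the loop.
timed_count = collect_replies(source, 0, 0.2) { |r| seen << r }

puts "first call: #{limited_count}, second call: #{timed_count}, seen: #{seen.inspect}"
```

Because `begin ... end while` tests its condition after the body, at least one reply is always awaited, even when `waitfor` is 1.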
(Not documented)
# File lib/mcollective/client.rb, line 63
def subscribe(agent, type)
  unless @subscriptions.include?(agent)
    subscription = Util.make_subscriptions(agent, type, collective)
    Log.debug("Subscribing to #{type} target for agent #{agent}")

    Util.subscribe(subscription)
    @subscriptions[agent] = 1
  end
end
Starts the client receiver and publisher in threads. This is activated when the ‘threader_client’ configuration option is set.
# File lib/mcollective/client.rb, line 170
def threaded_req(request, publish_timeout, timeout, waitfor, &block)
  Log.debug("Starting threaded client")
  publisher = Thread.new do
    start_publisher(request, publish_timeout)
  end

  # When the client is threaded we add the publishing timeout to
  # the agent timeout so that the receiver doesn't time out before
  # publishing has finished in cases where publish_timeout >= timeout.
  total_timeout = publish_timeout + timeout
  hosts_responded = 0

  receiver = Thread.new do
    hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
  end

  receiver.join
  hosts_responded
end
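The publisher/receiver split and the combined timeout can be sketched with plain Ruby threads. This is an illustration under assumptions, not the client's implementation: a `Queue` stands in for the middleware, and the timings and reply strings are made up.

```ruby
require "thread"
require "timeout"

queue = Queue.new        # hypothetical stand-in for the middleware
publish_timeout = 0.5    # made-up timings, in seconds
timeout = 0.5
expected = 3

# Publisher thread: pushes its messages within the publish timeout.
publisher = Thread.new do
  Timeout.timeout(publish_timeout) do
    expected.times { |i| queue << "reply #{i}" }
  end
end

# The receiver gets both timeouts added together, so it cannot give up
# while the publisher is still allowed to be sending.
total_timeout = publish_timeout + timeout
hosts_responded = 0

receiver = Thread.new do
  begin
    Timeout.timeout(total_timeout) do
      expected.times do
        queue.pop
        hosts_responded += 1
      end
    end
  rescue Timeout::Error
    # Keep whatever count was reached before the timeout.
  end
end

receiver.join
puts "responses received: #{hosts_responded}"
```

As in the listing above, only the receiver thread is joined; the count it accumulated is the return value.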
(Not documented)
# File lib/mcollective/client.rb, line 73
def unsubscribe(agent, type)
  if @subscriptions.include?(agent)
    subscription = Util.make_subscriptions(agent, type, collective)
    Log.debug("Unsubscribing #{type} target for #{agent}")

    Util.unsubscribe(subscription)
    @subscriptions.delete(agent)
  end
end
Starts the client receiver and publisher unthreaded. This is the default client behaviour.
# File lib/mcollective/client.rb, line 162
def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
  start_publisher(request, publish_timeout)
  start_receiver(request.requestid, waitfor, timeout, &block)
end
(Not documented)
# File lib/mcollective/client.rb, line 223
def update_stat(stat, hosts_responded, requestid)
  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded
  stat[:noresponsefrom] = []
  stat[:requestid] = requestid

  @stats = stat
end
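As a worked example of the arithmetic in update_stat, the sketch below uses made-up timestamps instead of Time.now.to_f. The numbers and the variable `now` are assumptions for illustration; the summary line is modelled on the non-verbose format used by display_stats, which converts seconds to milliseconds.

```ruby
# Made-up values for illustration; the client reads the times from Time.now.to_f.
stat = {:starttime => 100.0, :discoverytime => 0.5, :blocktime => 0, :totaltime => 0}
now = 102.0
hosts_responded = 3

stat[:totaltime] = now - stat[:starttime]                   # 2.0 seconds overall
stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]  # 1.5 seconds spent waiting on agents
stat[:responses] = hosts_responded

# Seconds are multiplied by 1000 to print milliseconds.
summary = format("Finished processing %d hosts in %.2f ms", stat[:responses], stat[:blocktime] * 1000)
puts summary
```

Note that req initialises `:discoverytime` to 0, so for a hash built there the block time equals the total time.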