4334793ddea5cac1a7e6c13c2cad01d2c23bd84f
[packages/precise/mcollective.git] / lib / mcollective / vendor / systemu / lib / systemu.rb
1 # encoding: utf-8
2
3 require 'tmpdir'
4 require 'socket'
5 require 'fileutils'
6 require 'rbconfig'
7 require 'thread'
8
9 class Object
10   def systemu(*a, &b) SystemUniversal.new(*a, &b).systemu end
11 end
12
13 class SystemUniversal
14 #
15 # constants
16 #
17   SystemUniversal::VERSION = '2.5.2' unless SystemUniversal.send(:const_defined?, :VERSION)
18   def SystemUniversal.version() SystemUniversal::VERSION end
19   def version() SystemUniversal::VERSION end
20 #
21 # class methods
22 #
23
24   @host = Socket.gethostname
25   @ppid = Process.ppid
26   @pid = Process.pid
27   @turd = ENV['SYSTEMU_TURD']
28
29   c = begin; ::RbConfig::CONFIG; rescue NameError; ::Config::CONFIG; end
30   ruby = File.join(c['bindir'], c['ruby_install_name']) << c['EXEEXT']
31   @ruby = if system('%s -e 42' % ruby)
32     ruby
33   else
34     system('%s -e 42' % 'ruby') ? 'ruby' : warn('no ruby in PATH/CONFIG')
35   end
36
37   class << SystemUniversal
38     %w( host ppid pid ruby turd ).each{|a| attr_accessor a}
39
40     def quote(*words)
41       words.map{|word| word.inspect}.join(' ')
42     end
43   end
44
45 #
46 # instance methods
47 #
48
49   def initialize argv, opts = {}, &block
50     getopt = getopts opts
51
52     @argv = argv
53     @block = block
54
55     @stdin = getopt[ ['stdin', 'in', '0', 0] ]
56     @stdout = getopt[ ['stdout', 'out', '1', 1] ]
57     @stderr = getopt[ ['stderr', 'err', '2', 2] ]
58     @env = getopt[ 'env' ]
59     @cwd = getopt[ 'cwd' ]
60
61     @host = getopt[ 'host', self.class.host ]
62     @ppid = getopt[ 'ppid', self.class.ppid ]
63     @pid = getopt[ 'pid', self.class.pid ]
64     @ruby = getopt[ 'ruby', self.class.ruby ]
65   end
66
67   def systemu
68     tmpdir do |tmp|
69       c = child_setup tmp
70       status = nil
71
72       begin
73         thread = nil
74
75         quietly{
76           IO.popen "#{ quote(@ruby) } #{ quote(c['program']) }", 'r+' do |pipe|
77             line = pipe.gets
78             case line
79               when %r/^pid: \d+$/
80                 cid = Integer line[%r/\d+/]
81               else
82                 begin
83                   buf = pipe.read
84                   buf = "#{ line }#{ buf }"
85                   e = Marshal.load buf
86                   raise unless Exception === e
87                   raise e
88                 rescue
89                   raise "systemu: Error - process interrupted!\n#{ buf }\n"
90                 end
91             end
92             thread = new_thread cid, @block if @block
93             pipe.read rescue nil
94           end
95         }
96         status = $?
97       ensure
98         if thread
99           begin
100             class << status
101               attr 'thread'
102             end
103             status.instance_eval{ @thread = thread }
104           rescue
105             42
106           end
107         end
108       end
109
110       if @stdout or @stderr
111         open(c['stdout']){|f| relay f => @stdout} if @stdout
112         open(c['stderr']){|f| relay f => @stderr} if @stderr
113         status
114       else
115         [status, IO.read(c['stdout']), IO.read(c['stderr'])]
116       end
117     end
118   end
119
120   def quote *args, &block
121     SystemUniversal.quote(*args, &block)
122   end
123
124   def new_thread cid, block
125     q = Queue.new
126     Thread.new(cid) do |cid|
127       current = Thread.current
128       current.abort_on_exception = true
129       q.push current
130       block.call cid
131     end
132     q.pop
133   end
134
135   def child_setup tmp
136     stdin = File.expand_path(File.join(tmp, 'stdin'))
137     stdout = File.expand_path(File.join(tmp, 'stdout'))
138     stderr = File.expand_path(File.join(tmp, 'stderr'))
139     program = File.expand_path(File.join(tmp, 'program'))
140     config = File.expand_path(File.join(tmp, 'config'))
141
142     if @stdin
143       open(stdin, 'w'){|f| relay @stdin => f}
144     else
145       FileUtils.touch stdin
146     end
147     FileUtils.touch stdout
148     FileUtils.touch stderr
149
150     c = {}
151     c['argv'] = @argv
152     c['env'] = @env
153     c['cwd'] = @cwd
154     c['stdin'] = stdin
155     c['stdout'] = stdout
156     c['stderr'] = stderr
157     c['program'] = program
158     open(config, 'w'){|f| Marshal.dump(c, f)}
159
160     open(program, 'w'){|f| f.write child_program(config)}
161
162     c
163   end
164
165   def quietly
166     v = $VERBOSE
167     $VERBOSE = nil
168     yield
169   ensure
170     $VERBOSE = v
171   end
172
173   def child_program config
174     <<-program
175       # encoding: utf-8
176
177       PIPE = STDOUT.dup
178       begin
179         config = Marshal.load(IO.read('#{ config }'))
180
181         argv = config['argv']
182         env = config['env']
183         cwd = config['cwd']
184         stdin = config['stdin']
185         stdout = config['stdout']
186         stderr = config['stderr']
187
188         Dir.chdir cwd if cwd
189         env.each{|k,v| ENV[k.to_s] = v.to_s} if env
190
191         STDIN.reopen stdin
192         STDOUT.reopen stdout
193         STDERR.reopen stderr
194
195         PIPE.puts "pid: \#{ Process.pid }"
196         PIPE.flush                        ### the process is ready yo!
197         PIPE.close
198
199         exec *argv
200       rescue Exception => e
201         PIPE.write Marshal.dump(e) rescue nil
202         exit 42
203       end
204     program
205   end
206
207   def relay srcdst
208     src, dst, ignored = srcdst.to_a.first
209     if src.respond_to? 'read'
210       while((buf = src.read(8192))); dst << buf; end
211     else
212       if src.respond_to?(:each_line)
213         src.each_line{|buf| dst << buf}
214       else
215         src.each{|buf| dst << buf}
216       end
217     end
218   end
219
220   def tmpdir d = Dir.tmpdir, max = 42, &b
221     i = -1 and loop{
222       i += 1
223
224       tmp = File.join d, "systemu_#{ @host }_#{ @ppid }_#{ @pid }_#{ rand }_#{ i += 1 }"
225
226       begin
227         Dir.mkdir tmp
228       rescue Errno::EEXIST
229         raise if i >= max
230         next
231       end
232
233       break(
234         if b
235           begin
236             b.call tmp
237           ensure
238             FileUtils.rm_rf tmp unless SystemU.turd
239           end
240         else
241           tmp
242         end
243       )
244     }
245   end
246
247   def getopts opts = {}
248     lambda do |*args|
249       keys, default, ignored = args
250       catch(:opt) do
251         [keys].flatten.each do |key|
252           [key, key.to_s, key.to_s.intern].each do |key|
253             throw :opt, opts[key] if opts.has_key?(key)
254           end
255         end
256         default
257       end
258     end
259   end
260 end
261
262 # some monkeypatching for JRuby
263 if defined? JRUBY_VERSION
264   require 'jruby'
265   java_import org.jruby.RubyProcess
266
267   class SystemUniversal
268     def systemu
269       split_argv = JRuby::PathHelper.smart_split_command @argv
270       process = java.lang.Runtime.runtime.exec split_argv.to_java(:string)
271
272       stdout, stderr = [process.input_stream, process.error_stream].map do |stream|
273         StreamReader.new(stream)
274       end
275
276       exit_code = process.wait_for
277       field = process.get_class.get_declared_field("pid")
278       field.set_accessible(true)
279       pid = field.get(process)
280       [
281         RubyProcess::RubyStatus.new_process_status(JRuby.runtime, exit_code, pid),
282         stdout.join,
283         stderr.join
284       ]
285     end
286
287     class StreamReader
288       def initialize(stream)
289         @data = ""
290         @thread = Thread.new do
291           reader = java.io.BufferedReader.new java.io.InputStreamReader.new(stream)
292
293           while line = reader.read_line
294             @data << line << "\n"
295           end
296         end
297       end
298
299       def join
300         @thread.join
301         @data
302       end
303     end
304   end
305 end
306
307
308
309 SystemU = SystemUniversal unless defined? SystemU
310 Systemu = SystemUniversal unless defined? Systemu
311
312
313
314
315
316
317
318
319
320
321
322
323
324 if $0 == __FILE__
325 #
326 # date
327 #
328   date = %q( ruby -e"  t = Time.now; STDOUT.puts t; STDERR.puts t  " )
329
330   status, stdout, stderr = systemu date
331   p [status, stdout, stderr]
332
333   status = systemu date, 1=>(stdout = '')
334   p [status, stdout]
335
336   status = systemu date, 2=>(stderr = '')
337   p [status, stderr]
338 #
339 # sleep
340 #
341   sleep = %q( ruby -e"  p(sleep(1))  " )
342   status, stdout, stderr = systemu sleep
343   p [status, stdout, stderr]
344
345   sleep = %q( ruby -e"  p(sleep(42))  " )
346   status, stdout, stderr = systemu(sleep){|cid| Process.kill 9, cid}
347   p [status, stdout, stderr]
348 #
349 # env
350 #
351   env = %q( ruby -e"  p ENV['A']  " )
352   status, stdout, stderr = systemu env, :env => {'A' => 42}
353   p [status, stdout, stderr]
354 #
355 # cwd
356 #
357   env = %q( ruby -e"  p Dir.pwd  " )
358   status, stdout, stderr = systemu env, :cwd => Dir.tmpdir
359   p [status, stdout, stderr]
360 end