d2cf043d40c3bd5409440534842133b730b4b1ea
[packages/precise/mcollective.git] / lib / mcollective / rpc / agent.rb
1 module MCollective
2   module RPC
3     # A wrapper around the traditional agent, it takes care of a lot of the tedious setup
4     # you would do for each agent allowing you to just create methods following a naming
5     # standard leaving the heavy lifting up to this clas.
6     #
7     # See http://marionette-collective.org/simplerpc/agents.html
8     #
9     # It only really makes sense to use this with a Simple RPC client on the other end, basic
10     # usage would be:
11     #
12     #    module MCollective
13     #      module Agent
14     #        class Helloworld<RPC::Agent
15     #          action "hello" do
16     #            reply[:msg] = "Hello #{request[:name]}"
17     #          end
18     #
19     #          action "foo" do
20     #            implemented_by "/some/script.sh"
21     #          end
22     #        end
23     #      end
24     #    end
25     #
26     # If you wish to implement the logic for an action using an external script use the
27     # implemented_by method that will cause your script to be run with 2 arguments.
28     #
29     # The first argument is a file containing JSON with the request and the 2nd argument
30     # is where the script should save its output as a JSON hash.
31     #
32     # We also currently have the validation code in here, this will be moved to plugins soon.
33     class Agent
34       include Translatable
35       extend Translatable
36
37       attr_accessor :reply, :request, :agent_name
38       attr_reader :logger, :config, :timeout, :ddl, :meta
39
40       def initialize
41         @agent_name = self.class.to_s.split("::").last.downcase
42
43         load_ddl
44
45         @logger = Log.instance
46         @config = Config.instance
47
48         # if we have a global authorization provider enable it
49         # plugins can still override it per plugin
50         self.class.authorized_by(@config.rpcauthprovider) if @config.rpcauthorization
51
52         startup_hook
53       end
54
55       def load_ddl
56         @ddl = DDL.new(@agent_name, :agent)
57         @meta = @ddl.meta
58         @timeout = @meta[:timeout] || 10
59
60       rescue Exception => e
61         DDL.validation_fail!(:PLMC24, "Failed to load DDL for the '%{agent}' agent, DDLs are required: %{error_class}: %{error}", :error, :agent => @agent_name, :error_class => e.class, :error => e.to_s)
62       end
63
64       def handlemsg(msg, connection)
65         @request = RPC::Request.new(msg, @ddl)
66         @reply = RPC::Reply.new(@request.action, @ddl)
67
68         begin
69           # Incoming requests need to be validated against the DDL thus reusing
70           # all the work users put into creating DDLs and creating a consistant
71           # quality of input validation everywhere with the a simple once off
72           # investment of writing a DDL
73           @request.validate!
74
75           # Calls the authorization plugin if any is defined
76           # if this raises an exception we wil just skip processing this
77           # message
78           authorization_hook(@request) if respond_to?("authorization_hook")
79
80           # Audits the request, currently continues processing the message
81           # we should make this a configurable so that an audit failure means
82           # a message wont be processed by this node depending on config
83           audit_request(@request, connection)
84
85           before_processing_hook(msg, connection)
86
87           if respond_to?("#{@request.action}_action")
88             send("#{@request.action}_action")
89           else
90             log_code(:PLMC36, "Unknown action '%{action}' for agent '%{agent}'", :warn, :action => @request.action, :agent => @request.agent)
91             raise UnknownRPCAction, "Unknown action '#{@request.action}' for agent '#{@request.agent}'"
92           end
93         rescue RPCAborted => e
94           @reply.fail e.to_s, 1
95
96         rescue UnknownRPCAction => e
97           @reply.fail e.to_s, 2
98
99         rescue MissingRPCData => e
100           @reply.fail e.to_s, 3
101
102         rescue InvalidRPCData, DDLValidationError => e
103           @reply.fail e.to_s, 4
104
105         rescue UnknownRPCError => e
106           Log.error("%s#%s failed: %s: %s" % [@agent_name, @request.action, e.class, e.to_s])
107           Log.error(e.backtrace.join("\n\t"))
108           @reply.fail e.to_s, 5
109
110         rescue Exception => e
111           Log.error("%s#%s failed: %s: %s" % [@agent_name, @request.action, e.class, e.to_s])
112           Log.error(e.backtrace.join("\n\t"))
113           @reply.fail e.to_s, 5
114
115         end
116
117         after_processing_hook
118
119         if @request.should_respond?
120           return @reply.to_hash
121         else
122           log_code(:PLMC35, "Client did not request a response, surpressing reply", :debug)
123           return nil
124         end
125       end
126
127       # By default RPC Agents support a toggle in the configuration that
128       # can enable and disable them based on the agent name
129       #
130       # Example an agent called Foo can have:
131       #
132       # plugin.foo.activate_agent = false
133       #
134       # and this will prevent the agent from loading on this particular
135       # machine.
136       #
137       # Agents can use the activate_when helper to override this for example:
138       #
139       # activate_when do
140       #    File.exist?("/usr/bin/puppet")
141       # end
142       def self.activate?
143         agent_name = self.to_s.split("::").last.downcase
144
145         log_code(:PLMC37, "Starting default activation checks for the '%{agent}' agent", :debug, :agent => agent_name)
146
147         should_activate = Config.instance.pluginconf["#{agent_name}.activate_agent"]
148
149         if should_activate
150           log_code(:PLMC38, "Found plugin configuration '%{agent}.activate_agent' with value '%{should_activate}'", :debug, :agent => agent_name, :should_activate => should_activate)
151
152           unless should_activate =~ /^1|y|true$/
153             return false
154           end
155         end
156
157         return true
158       end
159
160       # Returns an array of actions this agent support
161       def self.actions
162         public_instance_methods.sort.grep(/_action$/).map do |method|
163           $1 if method =~ /(.+)_action$/
164         end
165       end
166
167       private
168       # Runs a command via the MC::Shell wrapper, options are as per MC::Shell
169       #
170       # The simplest use is:
171       #
172       #   out = ""
173       #   err = ""
174       #   status = run("echo 1", :stdout => out, :stderr => err)
175       #
176       #   reply[:out] = out
177       #   reply[:error] = err
178       #   reply[:exitstatus] = status
179       #
180       # This can be simplified as:
181       #
182       #   reply[:exitstatus] = run("echo 1", :stdout => :out, :stderr => :error)
183       #
184       # You can set a command specific environment and cwd:
185       #
186       #   run("echo 1", :cwd => "/tmp", :environment => {"FOO" => "BAR"})
187       #
188       # This will run 'echo 1' from /tmp with FOO=BAR in addition to a setting forcing
189       # LC_ALL = C.  To prevent LC_ALL from being set either set it specifically or:
190       #
191       #   run("echo 1", :cwd => "/tmp", :environment => nil)
192       #
193       # Exceptions here will be handled by the usual agent exception handler or any
194       # specific one you create, if you dont it will just fall through and be sent
195       # to the client.
196       #
197       # If the shell handler fails to return a Process::Status instance for exit
198       # status this method will return -1 as the exit status
199       def run(command, options={})
200         shellopts = {}
201
202         # force stderr and stdout to be strings as the library
203         # will append data to them if given using the << method.
204         #
205         # if the data pased to :stderr or :stdin is a Symbol
206         # add that into the reply hash with that Symbol
207         [:stderr, :stdout].each do |k|
208           if options.include?(k)
209             if options[k].is_a?(Symbol)
210               reply[ options[k] ] = ""
211               shellopts[k] = reply[ options[k] ]
212             else
213               if options[k].respond_to?("<<")
214                 shellopts[k] = options[k]
215               else
216                 reply.fail! "#{k} should support << while calling run(#{command})"
217               end
218             end
219           end
220         end
221
222         [:stdin, :cwd, :environment].each do |k|
223           if options.include?(k)
224             shellopts[k] = options[k]
225           end
226         end
227
228         shell = Shell.new(command, shellopts)
229
230         shell.runcommand
231
232         if options[:chomp]
233           shellopts[:stdout].chomp! if shellopts[:stdout].is_a?(String)
234           shellopts[:stderr].chomp! if shellopts[:stderr].is_a?(String)
235         end
236
237         shell.status.exitstatus rescue -1
238       end
239
240       # Registers meta data for the introspection hash
241       def self.metadata(data)
242         agent = File.basename(caller.first).split(":").first
243
244         log_code(:PLMC34, "setting meta data in agents have been deprecated, DDL files are now being used for this information. Please update the '%{agent}' agent", :warn,  :agent => agent)
245       end
246
247       # Creates the needed activate? class in a manner similar to the other
248       # helpers like action, authorized_by etc
249       #
250       # activate_when do
251       #    File.exist?("/usr/bin/puppet")
252       # end
253       def self.activate_when(&block)
254         (class << self; self; end).instance_eval do
255           define_method("activate?", &block)
256         end
257       end
258
259       # Creates a new action with the block passed and sets some defaults
260       #
261       # action "status" do
262       #    # logic here to restart service
263       # end
264       def self.action(name, &block)
265         raise "Need to pass a body for the action" unless block_given?
266
267         self.module_eval { define_method("#{name}_action", &block) }
268       end
269
270       # Helper that creates a method on the class that will call your authorization
271       # plugin.  If your plugin raises an exception that will abort the request
272       def self.authorized_by(plugin)
273         plugin = plugin.to_s.capitalize
274
275         # turns foo_bar into FooBar
276         plugin = plugin.to_s.split("_").map {|v| v.capitalize}.join
277         pluginname = "MCollective::Util::#{plugin}"
278
279         PluginManager.loadclass(pluginname) unless MCollective::Util.constants.include?(plugin)
280
281         class_eval("
282                       def authorization_hook(request)
283                    #{pluginname}.authorize(request)
284                       end
285                    ")
286       end
287
288       # Validates a data member, if validation is a regex then it will try to match it
289       # else it supports testing object types only:
290       #
291       # validate :msg, String
292       # validate :msg, /^[\w\s]+$/
293       #
294       # There are also some special helper validators:
295       #
296       # validate :command, :shellsafe
297       # validate :command, :ipv6address
298       # validate :command, :ipv4address
299       # validate :command, :boolean
300       # validate :command, ["start", "stop"]
301       #
302       # It will raise appropriate exceptions that the RPC system understand
303       def validate(key, validation)
304         raise MissingRPCData, "please supply a #{key} argument" unless @request.include?(key)
305
306         Validator.validate(@request[key], validation)
307       rescue ValidatorError => e
308         raise InvalidRPCData, "Input %s did not pass validation: %s" % [ key, e.message ]
309       end
310
311       # convenience wrapper around Util#shellescape
312       def shellescape(str)
313         Util.shellescape(str)
314       end
315
316       # handles external actions
317       def implemented_by(command, type=:json)
318         runner = ActionRunner.new(command, request, type)
319
320         res = runner.run
321
322         reply.fail! "Did not receive data from #{command}" unless res.include?(:data)
323         reply.fail! "Reply data from #{command} is not a Hash" unless res[:data].is_a?(Hash)
324
325         reply.data.merge!(res[:data])
326
327         if res[:exitstatus] > 0
328           reply.fail "Failed to run #{command}: #{res[:stderr]}", res[:exitstatus]
329         end
330       rescue Exception => e
331         Log.warn("Unhandled #{e.class} exception during #{request.agent}##{request.action}: #{e}")
332         reply.fail! "Unexpected failure calling #{command}: #{e.class}: #{e}"
333       end
334
335       # Called at the end of the RPC::Agent standard initialize method
336       # use this to adjust meta parameters, timeouts and any setup you
337       # need to do.
338       #
339       # This will not be called right when the daemon starts up, we use
340       # lazy loading and initialization so it will only be called the first
341       # time a request for this agent arrives.
342       def startup_hook
343       end
344
345       # Called just after a message was received from the middleware before
346       # it gets passed to the handlers.  @request and @reply will already be
347       # set, the msg passed is the message as received from the normal
348       # mcollective runner and the connection is the actual connector.
349       def before_processing_hook(msg, connection)
350       end
351
352       # Called at the end of processing just before the response gets sent
353       # to the middleware.
354       #
355       # This gets run outside of the main exception handling block of the agent
356       # so you should handle any exceptions you could raise yourself.  The reason
357       # it is outside of the block is so you'll have access to even status codes
358       # set by the exception handlers.  If you do raise an exception it will just
359       # be passed onto the runner and processing will fail.
360       def after_processing_hook
361       end
362
363       # Gets called right after a request was received and calls audit plugins
364       #
365       # Agents can disable auditing by just overriding this method with a noop one
366       # this might be useful for agents that gets a lot of requests or simply if you
367       # do not care for the auditing in a specific agent.
368       def audit_request(msg, connection)
369         PluginManager["rpcaudit_plugin"].audit_request(msg, connection) if @config.rpcaudit
370       rescue Exception => e
371         logexception(:PLMC39, "Audit failed with an error, processing the request will continue.", :warn, e)
372       end
373     end
374   end
375 end