X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=lib%2Fmcollective%2Frpc%2Fagent.rb;fp=lib%2Fmcollective%2Frpc%2Fagent.rb;h=d2cf043d40c3bd5409440534842133b730b4b1ea;hb=b87d2f4e68281062df1913440ca5753ae63314a9;hp=0000000000000000000000000000000000000000;hpb=ab0ea530b8ac956091f17b104ab2311336cfc250;p=packages%2Fprecise%2Fmcollective.git diff --git a/lib/mcollective/rpc/agent.rb b/lib/mcollective/rpc/agent.rb new file mode 100644 index 0000000..d2cf043 --- /dev/null +++ b/lib/mcollective/rpc/agent.rb @@ -0,0 +1,375 @@ +module MCollective + module RPC + # A wrapper around the traditional agent, it takes care of a lot of the tedious setup + # you would do for each agent allowing you to just create methods following a naming + # standard leaving the heavy lifting up to this clas. + # + # See http://marionette-collective.org/simplerpc/agents.html + # + # It only really makes sense to use this with a Simple RPC client on the other end, basic + # usage would be: + # + # module MCollective + # module Agent + # class Helloworld e + DDL.validation_fail!(:PLMC24, "Failed to load DDL for the '%{agent}' agent, DDLs are required: %{error_class}: %{error}", :error, :agent => @agent_name, :error_class => e.class, :error => e.to_s) + end + + def handlemsg(msg, connection) + @request = RPC::Request.new(msg, @ddl) + @reply = RPC::Reply.new(@request.action, @ddl) + + begin + # Incoming requests need to be validated against the DDL thus reusing + # all the work users put into creating DDLs and creating a consistant + # quality of input validation everywhere with the a simple once off + # investment of writing a DDL + @request.validate! + + # Calls the authorization plugin if any is defined + # if this raises an exception we wil just skip processing this + # message + authorization_hook(@request) if respond_to?("authorization_hook") + + # Audits the request, currently continues processing the message + # we should make this a configurable so that an audit failure means + # a message wont be processed by this node depending on config + audit_request(@request, connection) + + before_processing_hook(msg, connection) + + if respond_to?("#{@request.action}_action") + send("#{@request.action}_action") + else + log_code(:PLMC36, "Unknown action '%{action}' for agent '%{agent}'", :warn, :action => @request.action, :agent => @request.agent) + raise UnknownRPCAction, "Unknown action '#{@request.action}' for agent '#{@request.agent}'" + end + rescue RPCAborted => e + @reply.fail e.to_s, 1 + + rescue UnknownRPCAction => e + @reply.fail e.to_s, 2 + + rescue MissingRPCData => e + @reply.fail e.to_s, 3 + + rescue InvalidRPCData, DDLValidationError => e + @reply.fail e.to_s, 4 + + rescue UnknownRPCError => e + Log.error("%s#%s failed: %s: %s" % [@agent_name, @request.action, e.class, e.to_s]) + Log.error(e.backtrace.join("\n\t")) + @reply.fail e.to_s, 5 + + rescue Exception => e + Log.error("%s#%s failed: %s: %s" % [@agent_name, @request.action, e.class, e.to_s]) + Log.error(e.backtrace.join("\n\t")) + @reply.fail e.to_s, 5 + + end + + after_processing_hook + + if @request.should_respond? + return @reply.to_hash + else + log_code(:PLMC35, "Client did not request a response, surpressing reply", :debug) + return nil + end + end + + # By default RPC Agents support a toggle in the configuration that + # can enable and disable them based on the agent name + # + # Example an agent called Foo can have: + # + # plugin.foo.activate_agent = false + # + # and this will prevent the agent from loading on this particular + # machine. + # + # Agents can use the activate_when helper to override this for example: + # + # activate_when do + # File.exist?("/usr/bin/puppet") + # end + def self.activate? + agent_name = self.to_s.split("::").last.downcase + + log_code(:PLMC37, "Starting default activation checks for the '%{agent}' agent", :debug, :agent => agent_name) + + should_activate = Config.instance.pluginconf["#{agent_name}.activate_agent"] + + if should_activate + log_code(:PLMC38, "Found plugin configuration '%{agent}.activate_agent' with value '%{should_activate}'", :debug, :agent => agent_name, :should_activate => should_activate) + + unless should_activate =~ /^1|y|true$/ + return false + end + end + + return true + end + + # Returns an array of actions this agent support + def self.actions + public_instance_methods.sort.grep(/_action$/).map do |method| + $1 if method =~ /(.+)_action$/ + end + end + + private + # Runs a command via the MC::Shell wrapper, options are as per MC::Shell + # + # The simplest use is: + # + # out = "" + # err = "" + # status = run("echo 1", :stdout => out, :stderr => err) + # + # reply[:out] = out + # reply[:error] = err + # reply[:exitstatus] = status + # + # This can be simplified as: + # + # reply[:exitstatus] = run("echo 1", :stdout => :out, :stderr => :error) + # + # You can set a command specific environment and cwd: + # + # run("echo 1", :cwd => "/tmp", :environment => {"FOO" => "BAR"}) + # + # This will run 'echo 1' from /tmp with FOO=BAR in addition to a setting forcing + # LC_ALL = C. To prevent LC_ALL from being set either set it specifically or: + # + # run("echo 1", :cwd => "/tmp", :environment => nil) + # + # Exceptions here will be handled by the usual agent exception handler or any + # specific one you create, if you dont it will just fall through and be sent + # to the client. + # + # If the shell handler fails to return a Process::Status instance for exit + # status this method will return -1 as the exit status + def run(command, options={}) + shellopts = {} + + # force stderr and stdout to be strings as the library + # will append data to them if given using the << method. + # + # if the data pased to :stderr or :stdin is a Symbol + # add that into the reply hash with that Symbol + [:stderr, :stdout].each do |k| + if options.include?(k) + if options[k].is_a?(Symbol) + reply[ options[k] ] = "" + shellopts[k] = reply[ options[k] ] + else + if options[k].respond_to?("<<") + shellopts[k] = options[k] + else + reply.fail! "#{k} should support << while calling run(#{command})" + end + end + end + end + + [:stdin, :cwd, :environment].each do |k| + if options.include?(k) + shellopts[k] = options[k] + end + end + + shell = Shell.new(command, shellopts) + + shell.runcommand + + if options[:chomp] + shellopts[:stdout].chomp! if shellopts[:stdout].is_a?(String) + shellopts[:stderr].chomp! if shellopts[:stderr].is_a?(String) + end + + shell.status.exitstatus rescue -1 + end + + # Registers meta data for the introspection hash + def self.metadata(data) + agent = File.basename(caller.first).split(":").first + + log_code(:PLMC34, "setting meta data in agents have been deprecated, DDL files are now being used for this information. Please update the '%{agent}' agent", :warn, :agent => agent) + end + + # Creates the needed activate? class in a manner similar to the other + # helpers like action, authorized_by etc + # + # activate_when do + # File.exist?("/usr/bin/puppet") + # end + def self.activate_when(&block) + (class << self; self; end).instance_eval do + define_method("activate?", &block) + end + end + + # Creates a new action with the block passed and sets some defaults + # + # action "status" do + # # logic here to restart service + # end + def self.action(name, &block) + raise "Need to pass a body for the action" unless block_given? + + self.module_eval { define_method("#{name}_action", &block) } + end + + # Helper that creates a method on the class that will call your authorization + # plugin. If your plugin raises an exception that will abort the request + def self.authorized_by(plugin) + plugin = plugin.to_s.capitalize + + # turns foo_bar into FooBar + plugin = plugin.to_s.split("_").map {|v| v.capitalize}.join + pluginname = "MCollective::Util::#{plugin}" + + PluginManager.loadclass(pluginname) unless MCollective::Util.constants.include?(plugin) + + class_eval(" + def authorization_hook(request) + #{pluginname}.authorize(request) + end + ") + end + + # Validates a data member, if validation is a regex then it will try to match it + # else it supports testing object types only: + # + # validate :msg, String + # validate :msg, /^[\w\s]+$/ + # + # There are also some special helper validators: + # + # validate :command, :shellsafe + # validate :command, :ipv6address + # validate :command, :ipv4address + # validate :command, :boolean + # validate :command, ["start", "stop"] + # + # It will raise appropriate exceptions that the RPC system understand + def validate(key, validation) + raise MissingRPCData, "please supply a #{key} argument" unless @request.include?(key) + + Validator.validate(@request[key], validation) + rescue ValidatorError => e + raise InvalidRPCData, "Input %s did not pass validation: %s" % [ key, e.message ] + end + + # convenience wrapper around Util#shellescape + def shellescape(str) + Util.shellescape(str) + end + + # handles external actions + def implemented_by(command, type=:json) + runner = ActionRunner.new(command, request, type) + + res = runner.run + + reply.fail! "Did not receive data from #{command}" unless res.include?(:data) + reply.fail! "Reply data from #{command} is not a Hash" unless res[:data].is_a?(Hash) + + reply.data.merge!(res[:data]) + + if res[:exitstatus] > 0 + reply.fail "Failed to run #{command}: #{res[:stderr]}", res[:exitstatus] + end + rescue Exception => e + Log.warn("Unhandled #{e.class} exception during #{request.agent}##{request.action}: #{e}") + reply.fail! "Unexpected failure calling #{command}: #{e.class}: #{e}" + end + + # Called at the end of the RPC::Agent standard initialize method + # use this to adjust meta parameters, timeouts and any setup you + # need to do. + # + # This will not be called right when the daemon starts up, we use + # lazy loading and initialization so it will only be called the first + # time a request for this agent arrives. + def startup_hook + end + + # Called just after a message was received from the middleware before + # it gets passed to the handlers. @request and @reply will already be + # set, the msg passed is the message as received from the normal + # mcollective runner and the connection is the actual connector. + def before_processing_hook(msg, connection) + end + + # Called at the end of processing just before the response gets sent + # to the middleware. + # + # This gets run outside of the main exception handling block of the agent + # so you should handle any exceptions you could raise yourself. The reason + # it is outside of the block is so you'll have access to even status codes + # set by the exception handlers. If you do raise an exception it will just + # be passed onto the runner and processing will fail. + def after_processing_hook + end + + # Gets called right after a request was received and calls audit plugins + # + # Agents can disable auditing by just overriding this method with a noop one + # this might be useful for agents that gets a lot of requests or simply if you + # do not care for the auditing in a specific agent. + def audit_request(msg, connection) + PluginManager["rpcaudit_plugin"].audit_request(msg, connection) if @config.rpcaudit + rescue Exception => e + logexception(:PLMC39, "Audit failed with an error, processing the request will continue.", :warn, e) + end + end + end +end