Updated mcollective.init according to OSCI-658
[packages/precise/mcollective.git] / lib / mcollective / message.rb
diff --git a/lib/mcollective/message.rb b/lib/mcollective/message.rb
new file mode 100644 (file)
index 0000000..f9b5e56
--- /dev/null
@@ -0,0 +1,242 @@
+module MCollective
+  # container for a message, its headers, agent, collective and other meta data
+  class Message
+    attr_reader :message, :request, :validated, :msgtime, :payload, :type, :expected_msgid, :reply_to
+    attr_accessor :headers, :agent, :collective, :filter
+    attr_accessor :requestid, :discovered_hosts, :options, :ttl
+
+    VALIDTYPES = [:message, :request, :direct_request, :reply]
+
+    # payload                  - the message body without headers etc, just the text
+    # message                  - the original message received from the middleware
+    # options[:base64]         - if the body base64 encoded?
+    # options[:agent]          - the agent the message is for/from
+    # options[:collective]     - the collective its for/from
+    # options[:headers]        - the message headers
+    # options[:type]           - an indicator about the type of message, :message, :request, :direct_request or :reply
+    # options[:request]        - if this is a reply this should old the message we are replying to
+    # options[:filter]         - for requests, the filter to encode into the message
+    # options[:options]        - the normal client options hash
+    # options[:ttl]            - the maximum amount of seconds this message can be valid for
+    # options[:expected_msgid] - in the case of replies this is the msgid it is expecting in the replies
+    # options[:requestid]      - specific request id to use else one will be generated
+    def initialize(payload, message, options = {})
+      options = {:base64 => false,
+                 :agent => nil,
+                 :headers => {},
+                 :type => :message,
+                 :request => nil,
+                 :filter => Util.empty_filter,
+                 :options => {},
+                 :ttl => 60,
+                 :expected_msgid => nil,
+                 :requestid => nil,
+                 :collective => nil}.merge(options)
+
+      @payload = payload
+      @message = message
+      @requestid = options[:requestid]
+      @discovered_hosts = nil
+      @reply_to = nil
+
+      @type = options[:type]
+      @headers = options[:headers]
+      @base64 = options[:base64]
+      @filter = options[:filter]
+      @expected_msgid = options[:expected_msgid]
+      @options = options[:options]
+
+      @ttl = @options[:ttl] || Config.instance.ttl
+      @msgtime = 0
+
+      @validated = false
+
+      if options[:request]
+        @request = options[:request]
+        @agent = request.agent
+        @collective = request.collective
+        @type = :reply
+      else
+        @agent = options[:agent]
+        @collective = options[:collective]
+      end
+
+      base64_decode!
+    end
+
+    # Sets the message type to one of the known types.  In the case of :direct_request
+    # the list of hosts to communicate with should have been set with #discovered_hosts
+    # else an exception will be raised.  This is for extra security, we never accidentally
+    # want to send a direct request without a list of hosts or something weird like that
+    # as it might result in a filterless broadcast being sent.
+    #
+    # Additionally you simply cannot set :direct_request if direct_addressing was not enabled
+    # this is to force a workflow that doesnt not yield in a mistake when someone might assume
+    # direct_addressing is enabled when its not.
+    def type=(type)
+      raise "Unknown message type #{type}" unless VALIDTYPES.include?(type)
+
+      if type == :direct_request
+        raise "Direct requests is not enabled using the direct_addressing config option" unless Config.instance.direct_addressing
+
+        unless @discovered_hosts && !@discovered_hosts.empty?
+          raise "Can only set type to :direct_request if discovered_hosts have been set"
+        end
+
+        # clear out the filter, custom discovery sources might interpret the filters
+        # different than the remote mcollectived and in directed mode really the only
+        # filter that matters is the agent filter
+        @filter = Util.empty_filter
+        @filter["agent"] << @agent
+      end
+
+      @type = type
+    end
+
+    # Sets a custom reply-to target for requests.  The connector plugin should inspect this
+    # when constructing requests and set this header ensuring replies will go to the custom target
+    # otherwise the connector should just do what it usually does
+    def reply_to=(target)
+      raise "Custom reply targets can only be set on requests" unless [:request, :direct_request].include?(@type)
+
+      @reply_to = target
+    end
+
+    # in the case of reply messages we are expecting replies to a previously
+    # created message.  This stores a hint to that previously sent message id
+    # and can be used by other classes like the security plugins as a means
+    # of optimizing their behavior like by ignoring messages not directed
+    # at us.
+    def expected_msgid=(msgid)
+      raise "Can only store the expected msgid for reply messages" unless @type == :reply
+      @expected_msgid = msgid
+    end
+
+    def base64_decode!
+      return unless @base64
+
+      @payload = SSL.base64_decode(@payload)
+      @base64 = false
+    end
+
+    def base64_encode!
+      return if @base64
+
+      @payload = SSL.base64_encode(@payload)
+      @base64 = true
+    end
+
+    def base64?
+      @base64
+    end
+
+    def encode!
+      case type
+        when :reply
+          raise "Cannot encode a reply message if no request has been associated with it" unless request
+          raise 'callerid in original request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(request.payload[:callerid])
+
+          @requestid = request.payload[:requestid]
+          @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, request.payload[:callerid])
+        when :request, :direct_request
+          validate_compount_filter(@filter["compound"]) unless @filter["compound"].empty?
+
+          @requestid = create_reqid unless @requestid
+          @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl)
+        else
+          raise "Cannot encode #{type} messages"
+      end
+    end
+
+    def validate_compount_filter(compound_filter)
+      compound_filter.each do |filter|
+        filter.each do |statement|
+          if statement["fstatement"]
+            functionname = statement["fstatement"]["name"]
+            pluginname = Data.pluginname(functionname)
+            value = statement["fstatement"]["value"]
+
+            begin
+              ddl = DDL.new(pluginname, :data)
+            rescue
+              raise DDLValidationError, "Could not find DDL for data plugin #{pluginname}, cannot use #{functionname}() in discovery"
+            end
+
+            # parses numbers and booleans entered as strings into proper
+            # types of data so that DDL validation will pass
+            statement["fstatement"]["params"] = Data.ddl_transform_input(ddl, statement["fstatement"]["params"])
+
+            Data.ddl_validate(ddl, statement["fstatement"]["params"])
+
+            unless value && Data.ddl_has_output?(ddl, value)
+              raise DDLValidationError, "#{functionname}() does not return a #{value} value"
+            end
+          end
+        end
+      end
+    end
+
+    def decode!
+      raise "Cannot decode message type #{type}" unless [:request, :reply].include?(type)
+
+      @payload = PluginManager["security_plugin"].decodemsg(self)
+
+      if type == :request
+        raise 'callerid in request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(payload[:callerid])
+      end
+
+      [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop|
+        instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop)
+      end
+    end
+
+    # Perform validation against the message by checking filters and ttl
+    def validate
+      raise "Can only validate request messages" unless type == :request
+
+      msg_age = Time.now.utc.to_i - msgtime
+
+      if msg_age > ttl
+        cid = ""
+        cid += payload[:callerid] + "@" if payload.include?(:callerid)
+        cid += payload[:senderid]
+
+        if msg_age > ttl
+          PluginManager["global_stats"].ttlexpired
+
+          raise(MsgTTLExpired, "message #{requestid} from #{cid} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}")
+        end
+      end
+
+      raise(NotTargettedAtUs, "Received message is not targetted to us") unless PluginManager["security_plugin"].validate_filter?(payload[:filter])
+
+      @validated = true
+    end
+
+    # publish a reply message by creating a target name and sending it
+    def publish
+      Timeout.timeout(2) do
+        # If we've been specificaly told about hosts that were discovered
+        # use that information to do P2P calls if appropriate else just
+        # send it as is.
+        if @discovered_hosts && Config.instance.direct_addressing
+          if @discovered_hosts.size <= Config.instance.direct_addressing_threshold
+            self.type = :direct_request
+            Log.debug("Handling #{requestid} as a direct request")
+          end
+
+          PluginManager["connector_plugin"].publish(self)
+        else
+          PluginManager["connector_plugin"].publish(self)
+        end
+      end
+    end
+
+    def create_reqid
+      # we gsub out the -s so that the format of the id does not
+      # change from previous versions, these should just be more
+      # unique than previous ones
+      SSL.uuid.gsub("-", "")
+    end
+  end
+end