Container for a message, its headers, agent, collective and other metadata.
payload                  - the message body without headers etc., just the text
message                  - the original message received from the middleware
options[:base64]         - is the body base64 encoded?
options[:agent]          - the agent the message is for/from
options[:collective]     - the collective it is for/from
options[:headers]        - the message headers
options[:type]           - an indicator of the type of message: :message, :request, :direct_request or :reply
options[:request]        - if this is a reply, this should hold the message we are replying to
options[:filter]         - for requests, the filter to encode into the message
options[:options]        - the normal client options hash
options[:ttl]            - the maximum number of seconds this message can be valid for
options[:expected_msgid] - in the case of replies, the msgid expected in the replies
options[:requestid]      - a specific request id to use, else one will be generated
    # File lib/mcollective/message.rb, line 23
    def initialize(payload, message, options = {})
      options = {:base64 => false,
                 :agent => nil,
                 :headers => {},
                 :type => :message,
                 :request => nil,
                 :filter => Util.empty_filter,
                 :options => {},
                 :ttl => 60,
                 :expected_msgid => nil,
                 :requestid => nil,
                 :collective => nil}.merge(options)

      @payload = payload
      @message = message
      @requestid = options[:requestid]
      @discovered_hosts = nil
      @reply_to = nil

      @type = options[:type]
      @headers = options[:headers]
      @base64 = options[:base64]
      @filter = options[:filter]
      @expected_msgid = options[:expected_msgid]
      @options = options[:options]

      @ttl = @options[:ttl] || Config.instance.ttl
      @msgtime = 0

      @validated = false

      if options[:request]
        @request = options[:request]
        @agent = request.agent
        @collective = request.collective
        @type = :reply
      else
        @agent = options[:agent]
        @collective = options[:collective]
      end

      base64_decode!
    end
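The constructor builds its options by merging the caller's hash over a set of defaults. The following is a minimal sketch of that pattern only. `DEFAULTS` and `build_options` are hypothetical names, not part of MCollective, and the `:filter` default is left out because `Util.empty_filter` is not available outside the library.

```ruby
# Hypothetical stand-in for the defaults hash built inside initialize.
DEFAULTS = {
  :base64 => false,
  :agent => nil,
  :headers => {},
  :type => :message,
  :request => nil,
  :options => {},
  :ttl => 60,
  :expected_msgid => nil,
  :requestid => nil,
  :collective => nil
}.freeze

# Caller-supplied keys win; anything omitted falls back to the default.
def build_options(overrides = {})
  DEFAULTS.merge(overrides)
end

opts = build_options({:agent => "rpcutil", :type => :request})
puts opts[:agent]
puts opts[:type]
puts opts[:ttl]
```

In the listing itself, the TTL that ends up in `@ttl` is read from the nested client options hash (`options[:options][:ttl]`), falling back to `Config.instance.ttl`.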
(Not documented)
    # File lib/mcollective/message.rb, line 129
    def base64?
      @base64
    end
(Not documented)
    # File lib/mcollective/message.rb, line 115
    def base64_decode!
      return unless @base64

      @payload = SSL.base64_decode(@payload)
      @base64 = false
    end
(Not documented)
    # File lib/mcollective/message.rb, line 122
    def base64_encode!
      return if @base64

      @payload = SSL.base64_encode(@payload)
      @base64 = true
    end
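The three base64 methods above keep the payload and the `@base64` flag in step, so that encoding or decoding twice is harmless. A minimal sketch of that behavior follows. `Envelope` is a hypothetical class, and core Ruby's `Array#pack` / `String#unpack` with the `"m0"` directive are used here as an assumed substitute for `SSL.base64_encode` and `SSL.base64_decode`.

```ruby
# Hypothetical miniature of the payload/base64 flag pair.
class Envelope
  attr_reader :payload

  def initialize(payload, base64 = false)
    @payload = payload
    @base64 = base64
  end

  def base64?
    @base64
  end

  # No-op when the payload is already encoded.
  def base64_encode!
    return if @base64

    @payload = [@payload].pack("m0") # base64 without line feeds
    @base64 = true
  end

  # No-op when the payload is already decoded.
  def base64_decode!
    return unless @base64

    @payload = @payload.unpack("m0").first
    @base64 = false
  end
end

env = Envelope.new("hello")
env.base64_encode!
env.base64_encode! # second call changes nothing
puts env.payload
env.base64_decode!
puts env.payload
```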
(Not documented)
    # File lib/mcollective/message.rb, line 226
    def create_reqid
      # we gsub out the -s so that the format of the id does not
      # change from previous versions, these should just be more
      # unique than previous ones
      SSL.uuid.gsub("-", "")
    end
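The effect of stripping the dashes can be seen with any UUID source. This sketch assumes the standard library's `SecureRandom.uuid` in place of `SSL.uuid`, whose implementation is not shown here. `sketch_reqid` is a hypothetical name.

```ruby
require "securerandom"

# Hypothetical stand-in for create_reqid: SecureRandom.uuid replaces SSL.uuid.
def sketch_reqid
  SecureRandom.uuid.gsub("-", "")
end

id = sketch_reqid
puts id
puts id.length
```

A standard UUID string is 36 characters including four dashes, so the resulting id is 32 hexadecimal characters.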
(Not documented)
    # File lib/mcollective/message.rb, line 175
    def decode!
      raise "Cannot decode message type #{type}" unless [:request, :reply].include?(type)

      @payload = PluginManager["security_plugin"].decodemsg(self)

      if type == :request
        raise 'callerid in request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(payload[:callerid])
      end

      [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop|
        instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop)
      end
    end
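The final loop in `decode!` copies a fixed list of properties from the decoded payload into instance variables, leaving the existing values alone for keys the payload does not carry. A minimal sketch of just that step, with a hypothetical `DecodedMessage` class and a plain hash standing in for the output of the security plugin:

```ruby
# Hypothetical holder; only the property-copy step of decode! is modelled.
class DecodedMessage
  attr_reader :collective, :agent, :filter, :requestid, :ttl, :msgtime

  def initialize
    @collective = nil
    @agent = nil
    @filter = nil
    @requestid = nil
    @ttl = 60
    @msgtime = 0
  end

  # Overwrite only the properties present in the payload hash.
  def apply(payload)
    [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop|
      instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop)
    end
    self
  end
end

msg = DecodedMessage.new.apply({:agent => "rpcutil", :requestid => "abc123", :body => "ignored"})
puts msg.agent
puts msg.requestid
puts msg.ttl
```

Keys outside the list, such as `:body` in this example, are ignored, and `ttl` keeps its earlier value because the payload did not include it.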
(Not documented)
    # File lib/mcollective/message.rb, line 133
    def encode!
      case type
      when :reply
        raise "Cannot encode a reply message if no request has been associated with it" unless request
        raise 'callerid in original request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(request.payload[:callerid])

        @requestid = request.payload[:requestid]
        @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, request.payload[:callerid])
      when :request, :direct_request
        validate_compound_filter(@filter["compound"]) unless @filter["compound"].empty?

        @requestid = create_reqid unless @requestid
        @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl)
      else
        raise "Cannot encode #{type} messages"
      end
    end
In the case of reply messages we are expecting replies to a previously created message. This stores a hint to that previously sent message id, which other classes such as the security plugins can use to optimize their behavior, for example by ignoring messages not directed at us.
    # File lib/mcollective/message.rb, line 110
    def expected_msgid=(msgid)
      raise "Can only store the expected msgid for reply messages" unless @type == :reply
      @expected_msgid = msgid
    end
Publish a reply message by creating a target name and sending it.
    # File lib/mcollective/message.rb, line 213
    def publish
      # If we've been specificaly told about hosts that were discovered
      # use that information to do P2P calls if appropriate else just
      # send it as is.
      config = Config.instance
      if @discovered_hosts && config.direct_addressing && (@discovered_hosts.size <= config.direct_addressing_threshold)
        self.type = :direct_request
        Log.debug("Handling #{requestid} as a direct request")
      end

      PluginManager['connector_plugin'].publish(self)
    end
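The condition that switches a message to a direct request depends on three inputs: whether hosts were discovered, whether direct addressing is enabled, and how the host count compares to the configured threshold. The sketch below isolates that decision. `direct_request?` is a hypothetical helper, and the threshold values are made up for illustration.

```ruby
# Hypothetical helper mirroring the condition in publish.
def direct_request?(discovered_hosts, direct_addressing, threshold)
  return false unless discovered_hosts && direct_addressing

  discovered_hosts.size <= threshold
end

puts direct_request?(["node1", "node2"], true, 10)  # few hosts, enabled
puts direct_request?(["node1", "node2"], true, 1)   # over the threshold
puts direct_request?(["node1", "node2"], false, 10) # direct addressing off
puts direct_request?(nil, true, 10)                 # nothing discovered
```

Only the first call returns true. In every other case the message would be published as is.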
Sets the message type to one of the known types. In the case of :direct_request, the list of hosts to communicate with should have been set with discovered_hosts, else an exception will be raised. This is for extra security: we never want to accidentally send a direct request without a list of hosts, as it might result in a filterless broadcast being sent.
Additionally, you cannot set :direct_request if direct_addressing is not enabled. This forces a workflow that does not lead to a mistake when someone assumes direct_addressing is enabled when it is not.
    # File lib/mcollective/message.rb, line 76
    def type=(type)
      raise "Unknown message type #{type}" unless VALIDTYPES.include?(type)

      if type == :direct_request
        raise "Direct requests is not enabled using the direct_addressing config option" unless Config.instance.direct_addressing

        unless @discovered_hosts && !@discovered_hosts.empty?
          raise "Can only set type to :direct_request if discovered_hosts have been set"
        end

        # clear out the filter, custom discovery sources might interpret the filters
        # different than the remote mcollectived and in directed mode really the only
        # filter that matters is the agent filter
        @filter = Util.empty_filter
        @filter["agent"] << @agent
      end

      @type = type
    end
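The guards in `type=` can be exercised in isolation. This is a sketch under stated assumptions: `TypedMessage` is a hypothetical class, the `direct_addressing` config lookup is replaced by a constructor flag, the type list is taken from the options description on this page, and the filter is reduced to a plain hash rather than the structure returned by `Util.empty_filter`.

```ruby
# Hypothetical miniature of the type= guards.
class TypedMessage
  VALIDTYPES = [:message, :request, :direct_request, :reply]

  attr_reader :type, :filter
  attr_accessor :discovered_hosts

  def initialize(agent, direct_addressing)
    @agent = agent
    @direct_addressing = direct_addressing # stands in for Config.instance.direct_addressing
    @discovered_hosts = nil
    @filter = {"agent" => [], "identity" => ["node1"]}
    @type = :message
  end

  def type=(type)
    raise "Unknown message type #{type}" unless VALIDTYPES.include?(type)

    if type == :direct_request
      raise "Direct requests are not enabled" unless @direct_addressing

      unless @discovered_hosts && !@discovered_hosts.empty?
        raise "Can only set type to :direct_request if discovered_hosts have been set"
      end

      # Reset the filter so that only the agent filter remains.
      @filter = {"agent" => [@agent]}
    end

    @type = type
  end
end

msg = TypedMessage.new("rpcutil", true)
msg.discovered_hosts = ["node1", "node2"]
msg.type = :direct_request
puts msg.type
puts msg.filter.inspect
```

Setting the type before `discovered_hosts` is populated, or with the flag off, raises instead of silently producing a message with an empty filter.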
Perform validation against the message by checking filters and ttl
    # File lib/mcollective/message.rb, line 190
    def validate
      raise "Can only validate request messages" unless type == :request

      msg_age = Time.now.utc.to_i - msgtime

      if msg_age > ttl
        cid = ""
        cid += payload[:callerid] + "@" if payload.include?(:callerid)
        cid += payload[:senderid]

        if msg_age > ttl
          PluginManager["global_stats"].ttlexpired

          raise(MsgTTLExpired, "message #{requestid} from #{cid} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}")
        end
      end

      raise(NotTargettedAtUs, "Received message is not targetted to us") unless PluginManager["security_plugin"].validate_filter?(payload[:filter])

      @validated = true
    end
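The TTL part of `validate` compares the age of the message, in whole seconds, against its TTL. A minimal sketch of that arithmetic follows. `check_ttl` and `TTLExpired` are hypothetical names standing in for the inline check and `MsgTTLExpired`, and the current time is passed in so the example is deterministic.

```ruby
# Hypothetical error class standing in for MsgTTLExpired.
class TTLExpired < StandardError; end

# Returns the message age in seconds, or raises when it exceeds the TTL.
def check_ttl(msgtime, ttl, now = Time.now.utc.to_i)
  msg_age = now - msgtime

  raise(TTLExpired, "message created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}") if msg_age > ttl

  msg_age
end

puts check_ttl(1000, 60, 1030) # 30 seconds old, accepted
puts check_ttl(1000, 60, 1060) # exactly at the TTL, still accepted

begin
  check_ttl(1000, 60, 1061)    # one second past the TTL
rescue TTLExpired => e
  puts e.message
end
```

Because the comparison is strictly greater-than, a message whose age equals the TTL is still considered valid.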
(Not documented)
    # File lib/mcollective/message.rb, line 151
    def validate_compound_filter(compound_filter)
      compound_filter.each do |filter|
        filter.each do |statement|
          if statement["fstatement"]
            functionname = statement["fstatement"]["name"]
            pluginname = Data.pluginname(functionname)
            value = statement["fstatement"]["value"]

            ddl = DDL.new(pluginname, :data)

            # parses numbers and booleans entered as strings into proper
            # types of data so that DDL validation will pass
            statement["fstatement"]["params"] = Data.ddl_transform_input(ddl, statement["fstatement"]["params"])

            Data.ddl_validate(ddl, statement["fstatement"]["params"])

            unless value && Data.ddl_has_output?(ddl, value)
              DDL.validation_fail!(:PLMC41, "Data plugin '%{functionname}()' does not return a '%{value}' value", :error, {:functionname => functionname, :value => value})
            end
          end
        end
      end
    end