Class | MCollective::Message |
In: |
lib/mcollective/message.rb
|
Parent: | Object |
container for a message, its headers, agent, collective and other meta data
VALIDTYPES | = | [:message, :request, :direct_request, :reply] |
agent | [RW] | |
collective | [RW] | |
discovered_hosts | [RW] | |
expected_msgid | [R] | |
filter | [RW] | |
headers | [RW] | |
message | [R] | |
msgtime | [R] | |
options | [RW] | |
payload | [R] | |
reply_to | [R] | |
request | [R] | |
requestid | [RW] | |
ttl | [RW] | |
type | [R] | |
validated | [R] |
payload - the message body without headers etc, just the text message - the original message received from the middleware options[:base64] - if the body base64 encoded? options[:agent] - the agent the message is for/from options[:collective] - the collective its for/from options[:headers] - the message headers options[:type] - an indicator about the type of message, :message, :request, :direct_request or :reply options[:request] - if this is a reply this should old the message we are replying to options[:filter] - for requests, the filter to encode into the message options[:options] - the normal client options hash options[:ttl] - the maximum amount of seconds this message can be valid for options[:expected_msgid] - in the case of replies this is the msgid it is expecting in the replies options[:requestid] - specific request id to use else one will be generated
# File lib/mcollective/message.rb, line 23 23: def initialize(payload, message, options = {}) 24: options = {:base64 => false, 25: :agent => nil, 26: :headers => {}, 27: :type => :message, 28: :request => nil, 29: :filter => Util.empty_filter, 30: :options => {}, 31: :ttl => 60, 32: :expected_msgid => nil, 33: :requestid => nil, 34: :collective => nil}.merge(options) 35: 36: @payload = payload 37: @message = message 38: @requestid = options[:requestid] 39: @discovered_hosts = nil 40: @reply_to = nil 41: 42: @type = options[:type] 43: @headers = options[:headers] 44: @base64 = options[:base64] 45: @filter = options[:filter] 46: @expected_msgid = options[:expected_msgid] 47: @options = options[:options] 48: 49: @ttl = @options[:ttl] || Config.instance.ttl 50: @msgtime = 0 51: 52: @validated = false 53: 54: if options[:request] 55: @request = options[:request] 56: @agent = request.agent 57: @collective = request.collective 58: @type = :reply 59: else 60: @agent = options[:agent] 61: @collective = options[:collective] 62: end 63: 64: base64_decode! 65: end
# File lib/mcollective/message.rb, line 115 115: def base64_decode! 116: return unless @base64 117: 118: @payload = SSL.base64_decode(@payload) 119: @base64 = false 120: end
# File lib/mcollective/message.rb, line 122 122: def base64_encode! 123: return if @base64 124: 125: @payload = SSL.base64_encode(@payload) 126: @base64 = true 127: end
# File lib/mcollective/message.rb, line 226 226: def create_reqid 227: # we gsub out the -s so that the format of the id does not 228: # change from previous versions, these should just be more 229: # unique than previous ones 230: SSL.uuid.gsub("-", "") 231: end
# File lib/mcollective/message.rb, line 175 175: def decode! 176: raise "Cannot decode message type #{type}" unless [:request, :reply].include?(type) 177: 178: @payload = PluginManager["security_plugin"].decodemsg(self) 179: 180: if type == :request 181: raise 'callerid in request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(payload[:callerid]) 182: end 183: 184: [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop| 185: instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop) 186: end 187: end
# File lib/mcollective/message.rb, line 133 133: def encode! 134: case type 135: when :reply 136: raise "Cannot encode a reply message if no request has been associated with it" unless request 137: raise 'callerid in original request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(request.payload[:callerid]) 138: 139: @requestid = request.payload[:requestid] 140: @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, request.payload[:callerid]) 141: when :request, :direct_request 142: validate_compound_filter(@filter["compound"]) unless @filter["compound"].empty? 143: 144: @requestid = create_reqid unless @requestid 145: @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl) 146: else 147: raise "Cannot encode #{type} messages" 148: end 149: end
in the case of reply messages we are expecting replies to a previously created message. This stores a hint to that previously sent message id and can be used by other classes like the security plugins as a means of optimizing their behavior like by ignoring messages not directed at us.
# File lib/mcollective/message.rb, line 110 110: def expected_msgid=(msgid) 111: raise "Can only store the expected msgid for reply messages" unless @type == :reply 112: @expected_msgid = msgid 113: end
publish a reply message by creating a target name and sending it
# File lib/mcollective/message.rb, line 213 213: def publish 214: # If we've been specificaly told about hosts that were discovered 215: # use that information to do P2P calls if appropriate else just 216: # send it as is. 217: config = Config.instance 218: if @discovered_hosts && config.direct_addressing && (@discovered_hosts.size <= config.direct_addressing_threshold) 219: self.type = :direct_request 220: Log.debug("Handling #{requestid} as a direct request") 221: end 222: 223: PluginManager['connector_plugin'].publish(self) 224: end
Sets a custom reply-to target for requests. The connector plugin should inspect this when constructing requests and set this header ensuring replies will go to the custom target otherwise the connector should just do what it usually does
# File lib/mcollective/message.rb, line 99 99: def reply_to=(target) 100: raise "Custom reply targets can only be set on requests" unless [:request, :direct_request].include?(@type) 101: 102: @reply_to = target 103: end
Sets the message type to one of the known types. In the case of :direct_request the list of hosts to communicate with should have been set with discovered_hosts else an exception will be raised. This is for extra security, we never accidentally want to send a direct request without a list of hosts or something weird like that as it might result in a filterless broadcast being sent.
Additionally you simply cannot set :direct_request if direct_addressing was not enabled this is to force a workflow that doesnt not yield in a mistake when someone might assume direct_addressing is enabled when its not.
# File lib/mcollective/message.rb, line 76 76: def type=(type) 77: raise "Unknown message type #{type}" unless VALIDTYPES.include?(type) 78: 79: if type == :direct_request 80: raise "Direct requests is not enabled using the direct_addressing config option" unless Config.instance.direct_addressing 81: 82: unless @discovered_hosts && !@discovered_hosts.empty? 83: raise "Can only set type to :direct_request if discovered_hosts have been set" 84: end 85: 86: # clear out the filter, custom discovery sources might interpret the filters 87: # different than the remote mcollectived and in directed mode really the only 88: # filter that matters is the agent filter 89: @filter = Util.empty_filter 90: @filter["agent"] << @agent 91: end 92: 93: @type = type 94: end
Perform validation against the message by checking filters and ttl
# File lib/mcollective/message.rb, line 190 190: def validate 191: raise "Can only validate request messages" unless type == :request 192: 193: msg_age = Time.now.utc.to_i - msgtime 194: 195: if msg_age > ttl 196: cid = "" 197: cid += payload[:callerid] + "@" if payload.include?(:callerid) 198: cid += payload[:senderid] 199: 200: if msg_age > ttl 201: PluginManager["global_stats"].ttlexpired 202: 203: raise(MsgTTLExpired, "message #{requestid} from #{cid} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}") 204: end 205: end 206: 207: raise(NotTargettedAtUs, "Received message is not targetted to us") unless PluginManager["security_plugin"].validate_filter?(payload[:filter]) 208: 209: @validated = true 210: end
# File lib/mcollective/message.rb, line 151 151: def validate_compound_filter(compound_filter) 152: compound_filter.each do |filter| 153: filter.each do |statement| 154: if statement["fstatement"] 155: functionname = statement["fstatement"]["name"] 156: pluginname = Data.pluginname(functionname) 157: value = statement["fstatement"]["value"] 158: 159: ddl = DDL.new(pluginname, :data) 160: 161: # parses numbers and booleans entered as strings into proper 162: # types of data so that DDL validation will pass 163: statement["fstatement"]["params"] = Data.ddl_transform_input(ddl, statement["fstatement"]["params"]) 164: 165: Data.ddl_validate(ddl, statement["fstatement"]["params"]) 166: 167: unless value && Data.ddl_has_output?(ddl, value) 168: DDL.validation_fail!(:PLMC41, "Data plugin '%{functionname}()' does not return a '%{value}' value", :error, {:functionname => functionname, :value => value}) 169: end 170: end 171: end 172: end 173: end