f9b5e56a99e4d86f705b74f1bfd4fc6b935b7968
[packages/precise/mcollective.git] / lib / mcollective / message.rb
1 module MCollective
2   # container for a message, its headers, agent, collective and other meta data
3   class Message
4     attr_reader :message, :request, :validated, :msgtime, :payload, :type, :expected_msgid, :reply_to
5     attr_accessor :headers, :agent, :collective, :filter
6     attr_accessor :requestid, :discovered_hosts, :options, :ttl
7
8     VALIDTYPES = [:message, :request, :direct_request, :reply]
9
10     # payload                  - the message body without headers etc, just the text
11     # message                  - the original message received from the middleware
12     # options[:base64]         - if the body base64 encoded?
13     # options[:agent]          - the agent the message is for/from
14     # options[:collective]     - the collective its for/from
15     # options[:headers]        - the message headers
16     # options[:type]           - an indicator about the type of message, :message, :request, :direct_request or :reply
17     # options[:request]        - if this is a reply this should old the message we are replying to
18     # options[:filter]         - for requests, the filter to encode into the message
19     # options[:options]        - the normal client options hash
20     # options[:ttl]            - the maximum amount of seconds this message can be valid for
21     # options[:expected_msgid] - in the case of replies this is the msgid it is expecting in the replies
22     # options[:requestid]      - specific request id to use else one will be generated
23     def initialize(payload, message, options = {})
24       options = {:base64 => false,
25                  :agent => nil,
26                  :headers => {},
27                  :type => :message,
28                  :request => nil,
29                  :filter => Util.empty_filter,
30                  :options => {},
31                  :ttl => 60,
32                  :expected_msgid => nil,
33                  :requestid => nil,
34                  :collective => nil}.merge(options)
35
36       @payload = payload
37       @message = message
38       @requestid = options[:requestid]
39       @discovered_hosts = nil
40       @reply_to = nil
41
42       @type = options[:type]
43       @headers = options[:headers]
44       @base64 = options[:base64]
45       @filter = options[:filter]
46       @expected_msgid = options[:expected_msgid]
47       @options = options[:options]
48
49       @ttl = @options[:ttl] || Config.instance.ttl
50       @msgtime = 0
51
52       @validated = false
53
54       if options[:request]
55         @request = options[:request]
56         @agent = request.agent
57         @collective = request.collective
58         @type = :reply
59       else
60         @agent = options[:agent]
61         @collective = options[:collective]
62       end
63
64       base64_decode!
65     end
66
67     # Sets the message type to one of the known types.  In the case of :direct_request
68     # the list of hosts to communicate with should have been set with #discovered_hosts
69     # else an exception will be raised.  This is for extra security, we never accidentally
70     # want to send a direct request without a list of hosts or something weird like that
71     # as it might result in a filterless broadcast being sent.
72     #
73     # Additionally you simply cannot set :direct_request if direct_addressing was not enabled
74     # this is to force a workflow that doesnt not yield in a mistake when someone might assume
75     # direct_addressing is enabled when its not.
76     def type=(type)
77       raise "Unknown message type #{type}" unless VALIDTYPES.include?(type)
78
79       if type == :direct_request
80         raise "Direct requests is not enabled using the direct_addressing config option" unless Config.instance.direct_addressing
81
82         unless @discovered_hosts && !@discovered_hosts.empty?
83           raise "Can only set type to :direct_request if discovered_hosts have been set"
84         end
85
86         # clear out the filter, custom discovery sources might interpret the filters
87         # different than the remote mcollectived and in directed mode really the only
88         # filter that matters is the agent filter
89         @filter = Util.empty_filter
90         @filter["agent"] << @agent
91       end
92
93       @type = type
94     end
95
96     # Sets a custom reply-to target for requests.  The connector plugin should inspect this
97     # when constructing requests and set this header ensuring replies will go to the custom target
98     # otherwise the connector should just do what it usually does
99     def reply_to=(target)
100       raise "Custom reply targets can only be set on requests" unless [:request, :direct_request].include?(@type)
101
102       @reply_to = target
103     end
104
105     # in the case of reply messages we are expecting replies to a previously
106     # created message.  This stores a hint to that previously sent message id
107     # and can be used by other classes like the security plugins as a means
108     # of optimizing their behavior like by ignoring messages not directed
109     # at us.
110     def expected_msgid=(msgid)
111       raise "Can only store the expected msgid for reply messages" unless @type == :reply
112       @expected_msgid = msgid
113     end
114
115     def base64_decode!
116       return unless @base64
117
118       @payload = SSL.base64_decode(@payload)
119       @base64 = false
120     end
121
122     def base64_encode!
123       return if @base64
124
125       @payload = SSL.base64_encode(@payload)
126       @base64 = true
127     end
128
129     def base64?
130       @base64
131     end
132
133     def encode!
134       case type
135         when :reply
136           raise "Cannot encode a reply message if no request has been associated with it" unless request
137           raise 'callerid in original request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(request.payload[:callerid])
138
139           @requestid = request.payload[:requestid]
140           @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, request.payload[:callerid])
141         when :request, :direct_request
142           validate_compount_filter(@filter["compound"]) unless @filter["compound"].empty?
143
144           @requestid = create_reqid unless @requestid
145           @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl)
146         else
147           raise "Cannot encode #{type} messages"
148       end
149     end
150
151     def validate_compount_filter(compound_filter)
152       compound_filter.each do |filter|
153         filter.each do |statement|
154           if statement["fstatement"]
155             functionname = statement["fstatement"]["name"]
156             pluginname = Data.pluginname(functionname)
157             value = statement["fstatement"]["value"]
158
159             begin
160               ddl = DDL.new(pluginname, :data)
161             rescue
162               raise DDLValidationError, "Could not find DDL for data plugin #{pluginname}, cannot use #{functionname}() in discovery"
163             end
164
165             # parses numbers and booleans entered as strings into proper
166             # types of data so that DDL validation will pass
167             statement["fstatement"]["params"] = Data.ddl_transform_input(ddl, statement["fstatement"]["params"])
168
169             Data.ddl_validate(ddl, statement["fstatement"]["params"])
170
171             unless value && Data.ddl_has_output?(ddl, value)
172               raise DDLValidationError, "#{functionname}() does not return a #{value} value"
173             end
174           end
175         end
176       end
177     end
178
179     def decode!
180       raise "Cannot decode message type #{type}" unless [:request, :reply].include?(type)
181
182       @payload = PluginManager["security_plugin"].decodemsg(self)
183
184       if type == :request
185         raise 'callerid in request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(payload[:callerid])
186       end
187
188       [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop|
189         instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop)
190       end
191     end
192
193     # Perform validation against the message by checking filters and ttl
194     def validate
195       raise "Can only validate request messages" unless type == :request
196
197       msg_age = Time.now.utc.to_i - msgtime
198
199       if msg_age > ttl
200         cid = ""
201         cid += payload[:callerid] + "@" if payload.include?(:callerid)
202         cid += payload[:senderid]
203
204         if msg_age > ttl
205           PluginManager["global_stats"].ttlexpired
206
207           raise(MsgTTLExpired, "message #{requestid} from #{cid} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}")
208         end
209       end
210
211       raise(NotTargettedAtUs, "Received message is not targetted to us") unless PluginManager["security_plugin"].validate_filter?(payload[:filter])
212
213       @validated = true
214     end
215
216     # publish a reply message by creating a target name and sending it
217     def publish
218       Timeout.timeout(2) do
219         # If we've been specificaly told about hosts that were discovered
220         # use that information to do P2P calls if appropriate else just
221         # send it as is.
222         if @discovered_hosts && Config.instance.direct_addressing
223           if @discovered_hosts.size <= Config.instance.direct_addressing_threshold
224             self.type = :direct_request
225             Log.debug("Handling #{requestid} as a direct request")
226           end
227
228           PluginManager["connector_plugin"].publish(self)
229         else
230           PluginManager["connector_plugin"].publish(self)
231         end
232       end
233     end
234
235     def create_reqid
236       # we gsub out the -s so that the format of the id does not
237       # change from previous versions, these should just be more
238       # unique than previous ones
239       SSL.uuid.gsub("-", "")
240     end
241   end
242 end