Added python-eventlet 0.15.2 for Ubuntu 14.04
[packages/trusty/python-eventlet.git] / eventlet / eventlet / websocket.py
1 import base64
2 import codecs
3 import collections
4 import errno
5 from random import Random
6 from socket import error as SocketError
7 import string
8 import struct
9 import sys
10 import time
11
12 try:
13     from hashlib import md5, sha1
14 except ImportError:  # pragma NO COVER
15     from md5 import md5
16     from sha import sha as sha1
17
18 from eventlet import semaphore
19 from eventlet import wsgi
20 from eventlet.green import socket
21 from eventlet.support import get_errno, six
22
23 # Python 2's utf8 decoding is more lenient than we'd like
24 # In order to pass autobahn's testsuite we need stricter validation
25 # if available...
26 for _mod in ('wsaccel.utf8validator', 'autobahn.utf8validator'):
27     # autobahn has it's own python-based validator. in newest versions
28     # this prefers to use wsaccel, a cython based implementation, if available.
29     # wsaccel may also be installed w/out autobahn, or with a earlier version.
30     try:
31         utf8validator = __import__(_mod, {}, {}, [''])
32     except ImportError:
33         utf8validator = None
34     else:
35         break
36
37 ACCEPTABLE_CLIENT_ERRORS = set((errno.ECONNRESET, errno.EPIPE))
38
39 __all__ = ["WebSocketWSGI", "WebSocket"]
40 PROTOCOL_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
41 VALID_CLOSE_STATUS = (range(1000, 1004)
42                       + range(1007, 1012)
43                       # 3000-3999: reserved for use by libraries, frameworks,
44                       # and applications
45                       + range(3000, 4000)
46                       # 4000-4999: reserved for private use and thus can't
47                       # be registered
48                       + range(4000, 5000))
49
50
51 class BadRequest(Exception):
52     def __init__(self, status='400 Bad Request', body=None, headers=None):
53         super(Exception, self).__init__()
54         self.status = status
55         self.body = body
56         self.headers = headers
57
58
59 class WebSocketWSGI(object):
60     """Wraps a websocket handler function in a WSGI application.
61
62     Use it like this::
63
64       @websocket.WebSocketWSGI
65       def my_handler(ws):
66           from_browser = ws.wait()
67           ws.send("from server")
68
69     The single argument to the function will be an instance of
70     :class:`WebSocket`.  To close the socket, simply return from the
71     function.  Note that the server will log the websocket request at
72     the time of closure.
73     """
74
75     def __init__(self, handler):
76         self.handler = handler
77         self.protocol_version = None
78         self.support_legacy_versions = True
79         self.supported_protocols = []
80         self.origin_checker = None
81
82     @classmethod
83     def configured(cls,
84                    handler=None,
85                    supported_protocols=None,
86                    origin_checker=None,
87                    support_legacy_versions=False):
88         def decorator(handler):
89             inst = cls(handler)
90             inst.support_legacy_versions = support_legacy_versions
91             inst.origin_checker = origin_checker
92             if supported_protocols:
93                 inst.supported_protocols = supported_protocols
94             return inst
95         if handler is None:
96             return decorator
97         return decorator(handler)
98
99     def __call__(self, environ, start_response):
100         http_connection_parts = [
101             part.strip()
102             for part in environ.get('HTTP_CONNECTION', '').lower().split(',')]
103         if not ('upgrade' in http_connection_parts and
104                 environ.get('HTTP_UPGRADE', '').lower() == 'websocket'):
105             # need to check a few more things here for true compliance
106             start_response('400 Bad Request', [('Connection', 'close')])
107             return []
108
109         try:
110             if 'HTTP_SEC_WEBSOCKET_VERSION' in environ:
111                 ws = self._handle_hybi_request(environ)
112             elif self.support_legacy_versions:
113                 ws = self._handle_legacy_request(environ)
114             else:
115                 raise BadRequest()
116         except BadRequest as e:
117             status = e.status
118             body = e.body or ''
119             headers = e.headers or []
120             start_response(status,
121                            [('Connection', 'close'), ] + headers)
122             return [body]
123
124         try:
125             self.handler(ws)
126         except socket.error as e:
127             if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS:
128                 raise
129         # Make sure we send the closing frame
130         ws._send_closing_frame(True)
131         # use this undocumented feature of eventlet.wsgi to ensure that it
132         # doesn't barf on the fact that we didn't call start_response
133         return wsgi.ALREADY_HANDLED
134
135     def _handle_legacy_request(self, environ):
136         sock = environ['eventlet.input'].get_socket()
137
138         if 'HTTP_SEC_WEBSOCKET_KEY1' in environ:
139             self.protocol_version = 76
140             if 'HTTP_SEC_WEBSOCKET_KEY2' not in environ:
141                 raise BadRequest()
142         else:
143             self.protocol_version = 75
144
145         if self.protocol_version == 76:
146             key1 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY1'])
147             key2 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY2'])
148             # There's no content-length header in the request, but it has 8
149             # bytes of data.
150             environ['wsgi.input'].content_length = 8
151             key3 = environ['wsgi.input'].read(8)
152             key = struct.pack(">II", key1, key2) + key3
153             response = md5(key).digest()
154
155         # Start building the response
156         scheme = 'ws'
157         if environ.get('wsgi.url_scheme') == 'https':
158             scheme = 'wss'
159         location = '%s://%s%s%s' % (
160             scheme,
161             environ.get('HTTP_HOST'),
162             environ.get('SCRIPT_NAME'),
163             environ.get('PATH_INFO')
164         )
165         qs = environ.get('QUERY_STRING')
166         if qs is not None:
167             location += '?' + qs
168         if self.protocol_version == 75:
169             handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
170                                "Upgrade: WebSocket\r\n"
171                                "Connection: Upgrade\r\n"
172                                "WebSocket-Origin: %s\r\n"
173                                "WebSocket-Location: %s\r\n\r\n" % (
174                                    environ.get('HTTP_ORIGIN'),
175                                    location))
176         elif self.protocol_version == 76:
177             handshake_reply = ("HTTP/1.1 101 WebSocket Protocol Handshake\r\n"
178                                "Upgrade: WebSocket\r\n"
179                                "Connection: Upgrade\r\n"
180                                "Sec-WebSocket-Origin: %s\r\n"
181                                "Sec-WebSocket-Protocol: %s\r\n"
182                                "Sec-WebSocket-Location: %s\r\n"
183                                "\r\n%s" % (
184                                    environ.get('HTTP_ORIGIN'),
185                                    environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'default'),
186                                    location,
187                                    response))
188         else:  # pragma NO COVER
189             raise ValueError("Unknown WebSocket protocol version.")
190         sock.sendall(handshake_reply)
191         return WebSocket(sock, environ, self.protocol_version)
192
193     def _handle_hybi_request(self, environ):
194         sock = environ['eventlet.input'].get_socket()
195         hybi_version = environ['HTTP_SEC_WEBSOCKET_VERSION']
196         if hybi_version not in ('8', '13', ):
197             raise BadRequest(status='426 Upgrade Required',
198                              headers=[('Sec-WebSocket-Version', '8, 13')])
199         self.protocol_version = int(hybi_version)
200         if 'HTTP_SEC_WEBSOCKET_KEY' not in environ:
201             # That's bad.
202             raise BadRequest()
203         origin = environ.get(
204             'HTTP_ORIGIN',
205             (environ.get('HTTP_SEC_WEBSOCKET_ORIGIN', '')
206              if self.protocol_version <= 8 else ''))
207         if self.origin_checker is not None:
208             if not self.origin_checker(environ.get('HTTP_HOST'), origin):
209                 raise BadRequest(status='403 Forbidden')
210         protocols = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', None)
211         negotiated_protocol = None
212         if protocols:
213             for p in (i.strip() for i in protocols.split(',')):
214                 if p in self.supported_protocols:
215                     negotiated_protocol = p
216                     break
217         # extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS', None)
218         # if extensions:
219         #    extensions = [i.strip() for i in extensions.split(',')]
220
221         key = environ['HTTP_SEC_WEBSOCKET_KEY']
222         response = base64.b64encode(sha1(key + PROTOCOL_GUID).digest())
223         handshake_reply = ["HTTP/1.1 101 Switching Protocols",
224                            "Upgrade: websocket",
225                            "Connection: Upgrade",
226                            "Sec-WebSocket-Accept: %s" % (response, )]
227         if negotiated_protocol:
228             handshake_reply.append("Sec-WebSocket-Protocol: %s"
229                                    % (negotiated_protocol, ))
230         sock.sendall('\r\n'.join(handshake_reply) + '\r\n\r\n')
231         return RFC6455WebSocket(sock, environ, self.protocol_version,
232                                 protocol=negotiated_protocol)
233
234     def _extract_number(self, value):
235         """
236         Utility function which, given a string like 'g98sd  5[]221@1', will
237         return 9852211. Used to parse the Sec-WebSocket-Key headers.
238         """
239         out = ""
240         spaces = 0
241         for char in value:
242             if char in string.digits:
243                 out += char
244             elif char == " ":
245                 spaces += 1
246         return int(out) / spaces
247
248
249 class WebSocket(object):
250     """A websocket object that handles the details of
251     serialization/deserialization to the socket.
252
253     The primary way to interact with a :class:`WebSocket` object is to
254     call :meth:`send` and :meth:`wait` in order to pass messages back
255     and forth with the browser.  Also available are the following
256     properties:
257
258     path
259         The path value of the request.  This is the same as the WSGI PATH_INFO variable, but more convenient.
260     protocol
261         The value of the Websocket-Protocol header.
262     origin
263         The value of the 'Origin' header.
264     environ
265         The full WSGI environment for this request.
266
267     """
268
269     def __init__(self, sock, environ, version=76):
270         """
271         :param socket: The eventlet socket
272         :type socket: :class:`eventlet.greenio.GreenSocket`
273         :param environ: The wsgi environment
274         :param version: The WebSocket spec version to follow (default is 76)
275         """
276         self.socket = sock
277         self.origin = environ.get('HTTP_ORIGIN')
278         self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL')
279         self.path = environ.get('PATH_INFO')
280         self.environ = environ
281         self.version = version
282         self.websocket_closed = False
283         self._buf = ""
284         self._msgs = collections.deque()
285         self._sendlock = semaphore.Semaphore()
286
287     @staticmethod
288     def _pack_message(message):
289         """Pack the message inside ``00`` and ``FF``
290
291         As per the dataframing section (5.3) for the websocket spec
292         """
293         if isinstance(message, six.text_type):
294             message = message.encode('utf-8')
295         elif not isinstance(message, six.binary_type):
296             message = b'%s' % (message,)
297         packed = b"\x00%s\xFF" % message
298         return packed
299
300     def _parse_messages(self):
301         """ Parses for messages in the buffer *buf*.  It is assumed that
302         the buffer contains the start character for a message, but that it
303         may contain only part of the rest of the message.
304
305         Returns an array of messages, and the buffer remainder that
306         didn't contain any full messages."""
307         msgs = []
308         end_idx = 0
309         buf = self._buf
310         while buf:
311             frame_type = ord(buf[0])
312             if frame_type == 0:
313                 # Normal message.
314                 end_idx = buf.find("\xFF")
315                 if end_idx == -1:  # pragma NO COVER
316                     break
317                 msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
318                 buf = buf[end_idx + 1:]
319             elif frame_type == 255:
320                 # Closing handshake.
321                 assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf
322                 self.websocket_closed = True
323                 break
324             else:
325                 raise ValueError("Don't understand how to parse this type of message: %r" % buf)
326         self._buf = buf
327         return msgs
328
329     def send(self, message):
330         """Send a message to the browser.
331
332         *message* should be convertable to a string; unicode objects should be
333         encodable as utf-8.  Raises socket.error with errno of 32
334         (broken pipe) if the socket has already been closed by the client."""
335         packed = self._pack_message(message)
336         # if two greenthreads are trying to send at the same time
337         # on the same socket, sendlock prevents interleaving and corruption
338         self._sendlock.acquire()
339         try:
340             self.socket.sendall(packed)
341         finally:
342             self._sendlock.release()
343
344     def wait(self):
345         """Waits for and deserializes messages.
346
347         Returns a single message; the oldest not yet processed. If the client
348         has already closed the connection, returns None.  This is different
349         from normal socket behavior because the empty string is a valid
350         websocket message."""
351         while not self._msgs:
352             # Websocket might be closed already.
353             if self.websocket_closed:
354                 return None
355             # no parsed messages, must mean buf needs more data
356             delta = self.socket.recv(8096)
357             if delta == '':
358                 return None
359             self._buf += delta
360             msgs = self._parse_messages()
361             self._msgs.extend(msgs)
362         return self._msgs.popleft()
363
364     def _send_closing_frame(self, ignore_send_errors=False):
365         """Sends the closing frame to the client, if required."""
366         if self.version == 76 and not self.websocket_closed:
367             try:
368                 self.socket.sendall(b"\xff\x00")
369             except SocketError:
370                 # Sometimes, like when the remote side cuts off the connection,
371                 # we don't care about this.
372                 if not ignore_send_errors:  # pragma NO COVER
373                     raise
374             self.websocket_closed = True
375
376     def close(self):
377         """Forcibly close the websocket; generally it is preferable to
378         return from the handler method."""
379         self._send_closing_frame()
380         self.socket.shutdown(True)
381         self.socket.close()
382
383
384 class ConnectionClosedError(Exception):
385     pass
386
387
388 class FailedConnectionError(Exception):
389     def __init__(self, status, message):
390         super(FailedConnectionError, self).__init__(status, message)
391         self.message = message
392         self.status = status
393
394
395 class ProtocolError(ValueError):
396     pass
397
398
399 class RFC6455WebSocket(WebSocket):
400     def __init__(self, sock, environ, version=13, protocol=None, client=False):
401         super(RFC6455WebSocket, self).__init__(sock, environ, version)
402         self.iterator = self._iter_frames()
403         self.client = client
404         self.protocol = protocol
405
406     class UTF8Decoder(object):
407         def __init__(self):
408             if utf8validator:
409                 self.validator = utf8validator.Utf8Validator()
410             else:
411                 self.validator = None
412             decoderclass = codecs.getincrementaldecoder('utf8')
413             self.decoder = decoderclass()
414
415         def reset(self):
416             if self.validator:
417                 self.validator.reset()
418             self.decoder.reset()
419
420         def decode(self, data, final=False):
421             if self.validator:
422                 valid, eocp, c_i, t_i = self.validator.validate(data)
423                 if not valid:
424                     raise ValueError('Data is not valid unicode')
425             return self.decoder.decode(data, final)
426
427     def _get_bytes(self, numbytes):
428         data = ''
429         while len(data) < numbytes:
430             d = self.socket.recv(numbytes - len(data))
431             if not d:
432                 raise ConnectionClosedError()
433             data = data + d
434         return data
435
436     class Message(object):
437         def __init__(self, opcode, decoder=None):
438             self.decoder = decoder
439             self.data = []
440             self.finished = False
441             self.opcode = opcode
442
443         def push(self, data, final=False):
444             if self.decoder:
445                 data = self.decoder.decode(data, final=final)
446             self.finished = final
447             self.data.append(data)
448
449         def getvalue(self):
450             return ''.join(self.data)
451
452     @staticmethod
453     def _apply_mask(data, mask, length=None, offset=0):
454         if length is None:
455             length = len(data)
456         cnt = range(length)
457         return ''.join(chr(ord(data[i]) ^ mask[(offset + i) % 4]) for i in cnt)
458
459     def _handle_control_frame(self, opcode, data):
460         if opcode == 8:  # connection close
461             if not data:
462                 status = 1000
463             elif len(data) > 1:
464                 status = struct.unpack_from('!H', data)[0]
465                 if not status or status not in VALID_CLOSE_STATUS:
466                     raise FailedConnectionError(
467                         1002,
468                         "Unexpected close status code.")
469                 try:
470                     data = self.UTF8Decoder().decode(data[2:], True)
471                 except (UnicodeDecodeError, ValueError):
472                     raise FailedConnectionError(
473                         1002,
474                         "Close message data should be valid UTF-8.")
475             else:
476                 status = 1002
477             self.close(close_data=(status, ''))
478             raise ConnectionClosedError()
479         elif opcode == 9:  # ping
480             self.send(data, control_code=0xA)
481         elif opcode == 0xA:  # pong
482             pass
483         else:
484             raise FailedConnectionError(
485                 1002, "Unknown control frame received.")
486
487     def _iter_frames(self):
488         fragmented_message = None
489         try:
490             while True:
491                 message = self._recv_frame(message=fragmented_message)
492                 if message.opcode & 8:
493                     self._handle_control_frame(
494                         message.opcode, message.getvalue())
495                     continue
496                 if fragmented_message and message is not fragmented_message:
497                     raise RuntimeError('Unexpected message change.')
498                 fragmented_message = message
499                 if message.finished:
500                     data = fragmented_message.getvalue()
501                     fragmented_message = None
502                     yield data
503         except FailedConnectionError:
504             exc_typ, exc_val, exc_tb = sys.exc_info()
505             self.close(close_data=(exc_val.status, exc_val.message))
506         except ConnectionClosedError:
507             return
508         except Exception:
509             self.close(close_data=(1011, 'Internal Server Error'))
510             raise
511
512     def _recv_frame(self, message=None):
513         recv = self._get_bytes
514         header = recv(2)
515         a, b = struct.unpack('!BB', header)
516         finished = a >> 7 == 1
517         rsv123 = a >> 4 & 7
518         if rsv123:
519             # must be zero
520             raise FailedConnectionError(
521                 1002,
522                 "RSV1, RSV2, RSV3: MUST be 0 unless an extension is"
523                 " negotiated that defines meanings for non-zero values.")
524         opcode = a & 15
525         if opcode not in (0, 1, 2, 8, 9, 0xA):
526             raise FailedConnectionError(1002, "Unknown opcode received.")
527         masked = b & 128 == 128
528         if not masked and not self.client:
529             raise FailedConnectionError(1002, "A client MUST mask all frames"
530                                         " that it sends to the server")
531         length = b & 127
532         if opcode & 8:
533             if not finished:
534                 raise FailedConnectionError(1002, "Control frames must not"
535                                             " be fragmented.")
536             if length > 125:
537                 raise FailedConnectionError(
538                     1002,
539                     "All control frames MUST have a payload length of 125"
540                     " bytes or less")
541         elif opcode and message:
542             raise FailedConnectionError(
543                 1002,
544                 "Received a non-continuation opcode within"
545                 " fragmented message.")
546         elif not opcode and not message:
547             raise FailedConnectionError(
548                 1002,
549                 "Received continuation opcode with no previous"
550                 " fragments received.")
551         if length == 126:
552             length = struct.unpack('!H', recv(2))[0]
553         elif length == 127:
554             length = struct.unpack('!Q', recv(8))[0]
555         if masked:
556             mask = struct.unpack('!BBBB', recv(4))
557         received = 0
558         if not message or opcode & 8:
559             decoder = self.UTF8Decoder() if opcode == 1 else None
560             message = self.Message(opcode, decoder=decoder)
561         if not length:
562             message.push('', final=finished)
563         else:
564             while received < length:
565                 d = self.socket.recv(length - received)
566                 if not d:
567                     raise ConnectionClosedError()
568                 dlen = len(d)
569                 if masked:
570                     d = self._apply_mask(d, mask, length=dlen, offset=received)
571                 received = received + dlen
572                 try:
573                     message.push(d, final=finished)
574                 except (UnicodeDecodeError, ValueError):
575                     raise FailedConnectionError(
576                         1007, "Text data must be valid utf-8")
577         return message
578
579     @staticmethod
580     def _pack_message(message, masked=False,
581                       continuation=False, final=True, control_code=None):
582         is_text = False
583         if isinstance(message, six.text_type):
584             message = message.encode('utf-8')
585             is_text = True
586         length = len(message)
587         if not length:
588             # no point masking empty data
589             masked = False
590         if control_code:
591             if control_code not in (8, 9, 0xA):
592                 raise ProtocolError('Unknown control opcode.')
593             if continuation or not final:
594                 raise ProtocolError('Control frame cannot be a fragment.')
595             if length > 125:
596                 raise ProtocolError('Control frame data too large (>125).')
597             header = struct.pack('!B', control_code | 1 << 7)
598         else:
599             opcode = 0 if continuation else (1 if is_text else 2)
600             header = struct.pack('!B', opcode | (1 << 7 if final else 0))
601         lengthdata = 1 << 7 if masked else 0
602         if length > 65535:
603             lengthdata = struct.pack('!BQ', lengthdata | 127, length)
604         elif length > 125:
605             lengthdata = struct.pack('!BH', lengthdata | 126, length)
606         else:
607             lengthdata = struct.pack('!B', lengthdata | length)
608         if masked:
609             # NOTE: RFC6455 states:
610             # A server MUST NOT mask any frames that it sends to the client
611             rand = Random(time.time())
612             mask = map(rand.getrandbits, (8, ) * 4)
613             message = RFC6455WebSocket._apply_mask(message, mask, length)
614             maskdata = struct.pack('!BBBB', *mask)
615         else:
616             maskdata = ''
617         return ''.join((header, lengthdata, maskdata, message))
618
619     def wait(self):
620         for i in self.iterator:
621             return i
622
623     def _send(self, frame):
624         self._sendlock.acquire()
625         try:
626             self.socket.sendall(frame)
627         finally:
628             self._sendlock.release()
629
630     def send(self, message, **kw):
631         kw['masked'] = self.client
632         payload = self._pack_message(message, **kw)
633         self._send(payload)
634
635     def _send_closing_frame(self, ignore_send_errors=False, close_data=None):
636         if self.version in (8, 13) and not self.websocket_closed:
637             if close_data is not None:
638                 status, msg = close_data
639                 if isinstance(msg, six.text_type):
640                     msg = msg.encode('utf-8')
641                 data = struct.pack('!H', status) + msg
642             else:
643                 data = ''
644             try:
645                 self.send(data, control_code=8)
646             except SocketError:
647                 # Sometimes, like when the remote side cuts off the connection,
648                 # we don't care about this.
649                 if not ignore_send_errors:  # pragma NO COVER
650                     raise
651             self.websocket_closed = True
652
653     def close(self, close_data=None):
654         """Forcibly close the websocket; generally it is preferable to
655         return from the handler method."""
656         self._send_closing_frame(close_data=close_data)
657         self.socket.shutdown(socket.SHUT_WR)
658         self.socket.close()