5 from random import Random
6 from socket import error as SocketError
13 from hashlib import md5, sha1
14 except ImportError: # pragma NO COVER
16 from sha import sha as sha1
18 from eventlet import semaphore
19 from eventlet import wsgi
20 from eventlet.green import socket
21 from eventlet.support import get_errno, six
23 # Python 2's utf8 decoding is more lenient than we'd like
24 # In order to pass autobahn's testsuite we need stricter validation
26 for _mod in ('wsaccel.utf8validator', 'autobahn.utf8validator'):
27 # autobahn has it's own python-based validator. in newest versions
28 # this prefers to use wsaccel, a cython based implementation, if available.
29 # wsaccel may also be installed w/out autobahn, or with a earlier version.
31 utf8validator = __import__(_mod, {}, {}, [''])
37 ACCEPTABLE_CLIENT_ERRORS = set((errno.ECONNRESET, errno.EPIPE))
39 __all__ = ["WebSocketWSGI", "WebSocket"]
40 PROTOCOL_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
41 VALID_CLOSE_STATUS = (range(1000, 1004)
43 # 3000-3999: reserved for use by libraries, frameworks,
46 # 4000-4999: reserved for private use and thus can't
51 class BadRequest(Exception):
52 def __init__(self, status='400 Bad Request', body=None, headers=None):
53 super(Exception, self).__init__()
56 self.headers = headers
59 class WebSocketWSGI(object):
60 """Wraps a websocket handler function in a WSGI application.
64 @websocket.WebSocketWSGI
66 from_browser = ws.wait()
67 ws.send("from server")
69 The single argument to the function will be an instance of
70 :class:`WebSocket`. To close the socket, simply return from the
71 function. Note that the server will log the websocket request at
75 def __init__(self, handler):
76 self.handler = handler
77 self.protocol_version = None
78 self.support_legacy_versions = True
79 self.supported_protocols = []
80 self.origin_checker = None
85 supported_protocols=None,
87 support_legacy_versions=False):
88 def decorator(handler):
90 inst.support_legacy_versions = support_legacy_versions
91 inst.origin_checker = origin_checker
92 if supported_protocols:
93 inst.supported_protocols = supported_protocols
97 return decorator(handler)
99 def __call__(self, environ, start_response):
100 http_connection_parts = [
102 for part in environ.get('HTTP_CONNECTION', '').lower().split(',')]
103 if not ('upgrade' in http_connection_parts and
104 environ.get('HTTP_UPGRADE', '').lower() == 'websocket'):
105 # need to check a few more things here for true compliance
106 start_response('400 Bad Request', [('Connection', 'close')])
110 if 'HTTP_SEC_WEBSOCKET_VERSION' in environ:
111 ws = self._handle_hybi_request(environ)
112 elif self.support_legacy_versions:
113 ws = self._handle_legacy_request(environ)
116 except BadRequest as e:
119 headers = e.headers or []
120 start_response(status,
121 [('Connection', 'close'), ] + headers)
126 except socket.error as e:
127 if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS:
129 # Make sure we send the closing frame
130 ws._send_closing_frame(True)
131 # use this undocumented feature of eventlet.wsgi to ensure that it
132 # doesn't barf on the fact that we didn't call start_response
133 return wsgi.ALREADY_HANDLED
135 def _handle_legacy_request(self, environ):
136 sock = environ['eventlet.input'].get_socket()
138 if 'HTTP_SEC_WEBSOCKET_KEY1' in environ:
139 self.protocol_version = 76
140 if 'HTTP_SEC_WEBSOCKET_KEY2' not in environ:
143 self.protocol_version = 75
145 if self.protocol_version == 76:
146 key1 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY1'])
147 key2 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY2'])
148 # There's no content-length header in the request, but it has 8
150 environ['wsgi.input'].content_length = 8
151 key3 = environ['wsgi.input'].read(8)
152 key = struct.pack(">II", key1, key2) + key3
153 response = md5(key).digest()
155 # Start building the response
157 if environ.get('wsgi.url_scheme') == 'https':
159 location = '%s://%s%s%s' % (
161 environ.get('HTTP_HOST'),
162 environ.get('SCRIPT_NAME'),
163 environ.get('PATH_INFO')
165 qs = environ.get('QUERY_STRING')
168 if self.protocol_version == 75:
169 handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
170 "Upgrade: WebSocket\r\n"
171 "Connection: Upgrade\r\n"
172 "WebSocket-Origin: %s\r\n"
173 "WebSocket-Location: %s\r\n\r\n" % (
174 environ.get('HTTP_ORIGIN'),
176 elif self.protocol_version == 76:
177 handshake_reply = ("HTTP/1.1 101 WebSocket Protocol Handshake\r\n"
178 "Upgrade: WebSocket\r\n"
179 "Connection: Upgrade\r\n"
180 "Sec-WebSocket-Origin: %s\r\n"
181 "Sec-WebSocket-Protocol: %s\r\n"
182 "Sec-WebSocket-Location: %s\r\n"
184 environ.get('HTTP_ORIGIN'),
185 environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'default'),
188 else: # pragma NO COVER
189 raise ValueError("Unknown WebSocket protocol version.")
190 sock.sendall(handshake_reply)
191 return WebSocket(sock, environ, self.protocol_version)
193 def _handle_hybi_request(self, environ):
194 sock = environ['eventlet.input'].get_socket()
195 hybi_version = environ['HTTP_SEC_WEBSOCKET_VERSION']
196 if hybi_version not in ('8', '13', ):
197 raise BadRequest(status='426 Upgrade Required',
198 headers=[('Sec-WebSocket-Version', '8, 13')])
199 self.protocol_version = int(hybi_version)
200 if 'HTTP_SEC_WEBSOCKET_KEY' not in environ:
203 origin = environ.get(
205 (environ.get('HTTP_SEC_WEBSOCKET_ORIGIN', '')
206 if self.protocol_version <= 8 else ''))
207 if self.origin_checker is not None:
208 if not self.origin_checker(environ.get('HTTP_HOST'), origin):
209 raise BadRequest(status='403 Forbidden')
210 protocols = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', None)
211 negotiated_protocol = None
213 for p in (i.strip() for i in protocols.split(',')):
214 if p in self.supported_protocols:
215 negotiated_protocol = p
217 # extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS', None)
219 # extensions = [i.strip() for i in extensions.split(',')]
221 key = environ['HTTP_SEC_WEBSOCKET_KEY']
222 response = base64.b64encode(sha1(key + PROTOCOL_GUID).digest())
223 handshake_reply = ["HTTP/1.1 101 Switching Protocols",
224 "Upgrade: websocket",
225 "Connection: Upgrade",
226 "Sec-WebSocket-Accept: %s" % (response, )]
227 if negotiated_protocol:
228 handshake_reply.append("Sec-WebSocket-Protocol: %s"
229 % (negotiated_protocol, ))
230 sock.sendall('\r\n'.join(handshake_reply) + '\r\n\r\n')
231 return RFC6455WebSocket(sock, environ, self.protocol_version,
232 protocol=negotiated_protocol)
234 def _extract_number(self, value):
236 Utility function which, given a string like 'g98sd 5[]221@1', will
237 return 9852211. Used to parse the Sec-WebSocket-Key headers.
242 if char in string.digits:
246 return int(out) / spaces
249 class WebSocket(object):
250 """A websocket object that handles the details of
251 serialization/deserialization to the socket.
253 The primary way to interact with a :class:`WebSocket` object is to
254 call :meth:`send` and :meth:`wait` in order to pass messages back
255 and forth with the browser. Also available are the following
259 The path value of the request. This is the same as the WSGI PATH_INFO variable, but more convenient.
261 The value of the Websocket-Protocol header.
263 The value of the 'Origin' header.
265 The full WSGI environment for this request.
269 def __init__(self, sock, environ, version=76):
271 :param socket: The eventlet socket
272 :type socket: :class:`eventlet.greenio.GreenSocket`
273 :param environ: The wsgi environment
274 :param version: The WebSocket spec version to follow (default is 76)
277 self.origin = environ.get('HTTP_ORIGIN')
278 self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL')
279 self.path = environ.get('PATH_INFO')
280 self.environ = environ
281 self.version = version
282 self.websocket_closed = False
284 self._msgs = collections.deque()
285 self._sendlock = semaphore.Semaphore()
288 def _pack_message(message):
289 """Pack the message inside ``00`` and ``FF``
291 As per the dataframing section (5.3) for the websocket spec
293 if isinstance(message, six.text_type):
294 message = message.encode('utf-8')
295 elif not isinstance(message, six.binary_type):
296 message = b'%s' % (message,)
297 packed = b"\x00%s\xFF" % message
300 def _parse_messages(self):
301 """ Parses for messages in the buffer *buf*. It is assumed that
302 the buffer contains the start character for a message, but that it
303 may contain only part of the rest of the message.
305 Returns an array of messages, and the buffer remainder that
306 didn't contain any full messages."""
311 frame_type = ord(buf[0])
314 end_idx = buf.find("\xFF")
315 if end_idx == -1: # pragma NO COVER
317 msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
318 buf = buf[end_idx + 1:]
319 elif frame_type == 255:
321 assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf
322 self.websocket_closed = True
325 raise ValueError("Don't understand how to parse this type of message: %r" % buf)
329 def send(self, message):
330 """Send a message to the browser.
332 *message* should be convertable to a string; unicode objects should be
333 encodable as utf-8. Raises socket.error with errno of 32
334 (broken pipe) if the socket has already been closed by the client."""
335 packed = self._pack_message(message)
336 # if two greenthreads are trying to send at the same time
337 # on the same socket, sendlock prevents interleaving and corruption
338 self._sendlock.acquire()
340 self.socket.sendall(packed)
342 self._sendlock.release()
345 """Waits for and deserializes messages.
347 Returns a single message; the oldest not yet processed. If the client
348 has already closed the connection, returns None. This is different
349 from normal socket behavior because the empty string is a valid
350 websocket message."""
351 while not self._msgs:
352 # Websocket might be closed already.
353 if self.websocket_closed:
355 # no parsed messages, must mean buf needs more data
356 delta = self.socket.recv(8096)
360 msgs = self._parse_messages()
361 self._msgs.extend(msgs)
362 return self._msgs.popleft()
364 def _send_closing_frame(self, ignore_send_errors=False):
365 """Sends the closing frame to the client, if required."""
366 if self.version == 76 and not self.websocket_closed:
368 self.socket.sendall(b"\xff\x00")
370 # Sometimes, like when the remote side cuts off the connection,
371 # we don't care about this.
372 if not ignore_send_errors: # pragma NO COVER
374 self.websocket_closed = True
377 """Forcibly close the websocket; generally it is preferable to
378 return from the handler method."""
379 self._send_closing_frame()
380 self.socket.shutdown(True)
384 class ConnectionClosedError(Exception):
388 class FailedConnectionError(Exception):
389 def __init__(self, status, message):
390 super(FailedConnectionError, self).__init__(status, message)
391 self.message = message
395 class ProtocolError(ValueError):
399 class RFC6455WebSocket(WebSocket):
400 def __init__(self, sock, environ, version=13, protocol=None, client=False):
401 super(RFC6455WebSocket, self).__init__(sock, environ, version)
402 self.iterator = self._iter_frames()
404 self.protocol = protocol
406 class UTF8Decoder(object):
409 self.validator = utf8validator.Utf8Validator()
411 self.validator = None
412 decoderclass = codecs.getincrementaldecoder('utf8')
413 self.decoder = decoderclass()
417 self.validator.reset()
420 def decode(self, data, final=False):
422 valid, eocp, c_i, t_i = self.validator.validate(data)
424 raise ValueError('Data is not valid unicode')
425 return self.decoder.decode(data, final)
427 def _get_bytes(self, numbytes):
429 while len(data) < numbytes:
430 d = self.socket.recv(numbytes - len(data))
432 raise ConnectionClosedError()
436 class Message(object):
437 def __init__(self, opcode, decoder=None):
438 self.decoder = decoder
440 self.finished = False
443 def push(self, data, final=False):
445 data = self.decoder.decode(data, final=final)
446 self.finished = final
447 self.data.append(data)
450 return ''.join(self.data)
453 def _apply_mask(data, mask, length=None, offset=0):
457 return ''.join(chr(ord(data[i]) ^ mask[(offset + i) % 4]) for i in cnt)
459 def _handle_control_frame(self, opcode, data):
460 if opcode == 8: # connection close
464 status = struct.unpack_from('!H', data)[0]
465 if not status or status not in VALID_CLOSE_STATUS:
466 raise FailedConnectionError(
468 "Unexpected close status code.")
470 data = self.UTF8Decoder().decode(data[2:], True)
471 except (UnicodeDecodeError, ValueError):
472 raise FailedConnectionError(
474 "Close message data should be valid UTF-8.")
477 self.close(close_data=(status, ''))
478 raise ConnectionClosedError()
479 elif opcode == 9: # ping
480 self.send(data, control_code=0xA)
481 elif opcode == 0xA: # pong
484 raise FailedConnectionError(
485 1002, "Unknown control frame received.")
487 def _iter_frames(self):
488 fragmented_message = None
491 message = self._recv_frame(message=fragmented_message)
492 if message.opcode & 8:
493 self._handle_control_frame(
494 message.opcode, message.getvalue())
496 if fragmented_message and message is not fragmented_message:
497 raise RuntimeError('Unexpected message change.')
498 fragmented_message = message
500 data = fragmented_message.getvalue()
501 fragmented_message = None
503 except FailedConnectionError:
504 exc_typ, exc_val, exc_tb = sys.exc_info()
505 self.close(close_data=(exc_val.status, exc_val.message))
506 except ConnectionClosedError:
509 self.close(close_data=(1011, 'Internal Server Error'))
512 def _recv_frame(self, message=None):
513 recv = self._get_bytes
515 a, b = struct.unpack('!BB', header)
516 finished = a >> 7 == 1
520 raise FailedConnectionError(
522 "RSV1, RSV2, RSV3: MUST be 0 unless an extension is"
523 " negotiated that defines meanings for non-zero values.")
525 if opcode not in (0, 1, 2, 8, 9, 0xA):
526 raise FailedConnectionError(1002, "Unknown opcode received.")
527 masked = b & 128 == 128
528 if not masked and not self.client:
529 raise FailedConnectionError(1002, "A client MUST mask all frames"
530 " that it sends to the server")
534 raise FailedConnectionError(1002, "Control frames must not"
537 raise FailedConnectionError(
539 "All control frames MUST have a payload length of 125"
541 elif opcode and message:
542 raise FailedConnectionError(
544 "Received a non-continuation opcode within"
545 " fragmented message.")
546 elif not opcode and not message:
547 raise FailedConnectionError(
549 "Received continuation opcode with no previous"
550 " fragments received.")
552 length = struct.unpack('!H', recv(2))[0]
554 length = struct.unpack('!Q', recv(8))[0]
556 mask = struct.unpack('!BBBB', recv(4))
558 if not message or opcode & 8:
559 decoder = self.UTF8Decoder() if opcode == 1 else None
560 message = self.Message(opcode, decoder=decoder)
562 message.push('', final=finished)
564 while received < length:
565 d = self.socket.recv(length - received)
567 raise ConnectionClosedError()
570 d = self._apply_mask(d, mask, length=dlen, offset=received)
571 received = received + dlen
573 message.push(d, final=finished)
574 except (UnicodeDecodeError, ValueError):
575 raise FailedConnectionError(
576 1007, "Text data must be valid utf-8")
580 def _pack_message(message, masked=False,
581 continuation=False, final=True, control_code=None):
583 if isinstance(message, six.text_type):
584 message = message.encode('utf-8')
586 length = len(message)
588 # no point masking empty data
591 if control_code not in (8, 9, 0xA):
592 raise ProtocolError('Unknown control opcode.')
593 if continuation or not final:
594 raise ProtocolError('Control frame cannot be a fragment.')
596 raise ProtocolError('Control frame data too large (>125).')
597 header = struct.pack('!B', control_code | 1 << 7)
599 opcode = 0 if continuation else (1 if is_text else 2)
600 header = struct.pack('!B', opcode | (1 << 7 if final else 0))
601 lengthdata = 1 << 7 if masked else 0
603 lengthdata = struct.pack('!BQ', lengthdata | 127, length)
605 lengthdata = struct.pack('!BH', lengthdata | 126, length)
607 lengthdata = struct.pack('!B', lengthdata | length)
609 # NOTE: RFC6455 states:
610 # A server MUST NOT mask any frames that it sends to the client
611 rand = Random(time.time())
612 mask = map(rand.getrandbits, (8, ) * 4)
613 message = RFC6455WebSocket._apply_mask(message, mask, length)
614 maskdata = struct.pack('!BBBB', *mask)
617 return ''.join((header, lengthdata, maskdata, message))
620 for i in self.iterator:
623 def _send(self, frame):
624 self._sendlock.acquire()
626 self.socket.sendall(frame)
628 self._sendlock.release()
630 def send(self, message, **kw):
631 kw['masked'] = self.client
632 payload = self._pack_message(message, **kw)
635 def _send_closing_frame(self, ignore_send_errors=False, close_data=None):
636 if self.version in (8, 13) and not self.websocket_closed:
637 if close_data is not None:
638 status, msg = close_data
639 if isinstance(msg, six.text_type):
640 msg = msg.encode('utf-8')
641 data = struct.pack('!H', status) + msg
645 self.send(data, control_code=8)
647 # Sometimes, like when the remote side cuts off the connection,
648 # we don't care about this.
649 if not ignore_send_errors: # pragma NO COVER
651 self.websocket_closed = True
653 def close(self, close_data=None):
654 """Forcibly close the websocket; generally it is preferable to
655 return from the handler method."""
656 self._send_closing_frame(close_data=close_data)
657 self.socket.shutdown(socket.SHUT_WR)