summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorYAMADA Hideki <yamada.hideki@po.ntts.co.jp>2014-05-14 19:11:00 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2014-05-16 05:03:11 +0900
commit28339dd774572d3fdd48904438551c8d70ae6cf3 (patch)
tree38ee6249e4905f98916bc7dd71882c604bd55e9f
parent73e16c15ccaf45420da25dfec3b05a8488cb0b46 (diff)
Import eventlet.websocket from master branch
Because eventlet 0.14 doen't support RFC 6455. Signed-off-by: YAMADA Hideki <yamada.hideki@po.ntts.co.jp> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/contrib/_eventlet/__init__.py0
-rw-r--r--ryu/contrib/_eventlet/websocket.py656
-rw-r--r--ryu/lib/hub.py3
3 files changed, 659 insertions, 0 deletions
diff --git a/ryu/contrib/_eventlet/__init__.py b/ryu/contrib/_eventlet/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/ryu/contrib/_eventlet/__init__.py
diff --git a/ryu/contrib/_eventlet/websocket.py b/ryu/contrib/_eventlet/websocket.py
new file mode 100644
index 00000000..4a07c8d8
--- /dev/null
+++ b/ryu/contrib/_eventlet/websocket.py
@@ -0,0 +1,656 @@
+import base64
+import codecs
+import collections
+import errno
+from random import Random
+import string
+import struct
+import sys
+import time
+from socket import error as SocketError
+
+try:
+ from hashlib import md5, sha1
+except ImportError: #pragma NO COVER
+ from md5 import md5
+ from sha import sha as sha1
+
+import eventlet
+from eventlet import semaphore
+from eventlet import wsgi
+from eventlet.green import socket
+from eventlet.support import get_errno
+
+# Python 2's utf8 decoding is more lenient than we'd like
+# In order to pass autobahn's testsuite we need stricter validation
+# if available...
+for _mod in ('wsaccel.utf8validator', 'autobahn.utf8validator'):
+ # autobahn has it's own python-based validator. in newest versions
+ # this prefers to use wsaccel, a cython based implementation, if available.
+ # wsaccel may also be installed w/out autobahn, or with a earlier version.
+ try:
+ utf8validator = __import__(_mod, {}, {}, [''])
+ except ImportError:
+ utf8validator = None
+ else:
+ break
+
+ACCEPTABLE_CLIENT_ERRORS = set((errno.ECONNRESET, errno.EPIPE))
+
+__all__ = ["WebSocketWSGI", "WebSocket"]
+PROTOCOL_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
+VALID_CLOSE_STATUS = (range(1000, 1004)
+ + range(1007, 1012)
+ # 3000-3999: reserved for use by libraries, frameworks,
+ # and applications
+ + range(3000, 4000)
+ # 4000-4999: reserved for private use and thus can't
+ # be registered
+ + range(4000, 5000))
+
+
+class BadRequest(Exception):
+ def __init__(self, status='400 Bad Request', body=None, headers=None):
+ super(Exception, self).__init__()
+ self.status = status
+ self.body = body
+ self.headers = headers
+
+
+class WebSocketWSGI(object):
+ """Wraps a websocket handler function in a WSGI application.
+
+ Use it like this::
+
+ @websocket.WebSocketWSGI
+ def my_handler(ws):
+ from_browser = ws.wait()
+ ws.send("from server")
+
+ The single argument to the function will be an instance of
+ :class:`WebSocket`. To close the socket, simply return from the
+ function. Note that the server will log the websocket request at
+ the time of closure.
+ """
+ def __init__(self, handler):
+ self.handler = handler
+ self.protocol_version = None
+ self.support_legacy_versions = True
+ self.supported_protocols = []
+ self.origin_checker = None
+
+ @classmethod
+ def configured(cls,
+ handler=None,
+ supported_protocols=None,
+ origin_checker=None,
+ support_legacy_versions=False):
+ def decorator(handler):
+ inst = cls(handler)
+ inst.support_legacy_versions = support_legacy_versions
+ inst.origin_checker = origin_checker
+ if supported_protocols:
+ inst.supported_protocols = supported_protocols
+ return inst
+ if handler is None:
+ return decorator
+ return decorator(handler)
+
+ def __call__(self, environ, start_response):
+ http_connection_parts = [
+ part.strip()
+ for part in environ.get('HTTP_CONNECTION', '').lower().split(',')]
+ if not ('upgrade' in http_connection_parts and
+ environ.get('HTTP_UPGRADE', '').lower() == 'websocket'):
+ # need to check a few more things here for true compliance
+ start_response('400 Bad Request', [('Connection', 'close')])
+ return []
+
+ try:
+ if 'HTTP_SEC_WEBSOCKET_VERSION' in environ:
+ ws = self._handle_hybi_request(environ)
+ elif self.support_legacy_versions:
+ ws = self._handle_legacy_request(environ)
+ else:
+ raise BadRequest()
+ except BadRequest as e:
+ status = e.status
+ body = e.body or ''
+ headers = e.headers or []
+ start_response(status,
+ [('Connection', 'close'), ] + headers)
+ return [body]
+
+ try:
+ self.handler(ws)
+ except socket.error as e:
+ if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS:
+ raise
+ # Make sure we send the closing frame
+ ws._send_closing_frame(True)
+ # use this undocumented feature of eventlet.wsgi to ensure that it
+ # doesn't barf on the fact that we didn't call start_response
+ return wsgi.ALREADY_HANDLED
+
+ def _handle_legacy_request(self, environ):
+ sock = environ['eventlet.input'].get_socket()
+
+ if 'HTTP_SEC_WEBSOCKET_KEY1' in environ:
+ self.protocol_version = 76
+ if 'HTTP_SEC_WEBSOCKET_KEY2' not in environ:
+ raise BadRequest()
+ else:
+ self.protocol_version = 75
+
+ if self.protocol_version == 76:
+ key1 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY1'])
+ key2 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY2'])
+ # There's no content-length header in the request, but it has 8
+ # bytes of data.
+ environ['wsgi.input'].content_length = 8
+ key3 = environ['wsgi.input'].read(8)
+ key = struct.pack(">II", key1, key2) + key3
+ response = md5(key).digest()
+
+ # Start building the response
+ scheme = 'ws'
+ if environ.get('wsgi.url_scheme') == 'https':
+ scheme = 'wss'
+ location = '%s://%s%s%s' % (
+ scheme,
+ environ.get('HTTP_HOST'),
+ environ.get('SCRIPT_NAME'),
+ environ.get('PATH_INFO')
+ )
+ qs = environ.get('QUERY_STRING')
+ if qs is not None:
+ location += '?' + qs
+ if self.protocol_version == 75:
+ handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
+ "Upgrade: WebSocket\r\n"
+ "Connection: Upgrade\r\n"
+ "WebSocket-Origin: %s\r\n"
+ "WebSocket-Location: %s\r\n\r\n" % (
+ environ.get('HTTP_ORIGIN'),
+ location))
+ elif self.protocol_version == 76:
+ handshake_reply = ("HTTP/1.1 101 WebSocket Protocol Handshake\r\n"
+ "Upgrade: WebSocket\r\n"
+ "Connection: Upgrade\r\n"
+ "Sec-WebSocket-Origin: %s\r\n"
+ "Sec-WebSocket-Protocol: %s\r\n"
+ "Sec-WebSocket-Location: %s\r\n"
+ "\r\n%s" % (
+ environ.get('HTTP_ORIGIN'),
+ environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'default'),
+ location,
+ response))
+ else: #pragma NO COVER
+ raise ValueError("Unknown WebSocket protocol version.")
+ sock.sendall(handshake_reply)
+ return WebSocket(sock, environ, self.protocol_version)
+
+ def _handle_hybi_request(self, environ):
+ sock = environ['eventlet.input'].get_socket()
+ hybi_version = environ['HTTP_SEC_WEBSOCKET_VERSION']
+ if hybi_version not in ('8', '13', ):
+ raise BadRequest(status='426 Upgrade Required',
+ headers=[('Sec-WebSocket-Version', '8, 13')])
+ self.protocol_version = int(hybi_version)
+ if 'HTTP_SEC_WEBSOCKET_KEY' not in environ:
+ # That's bad.
+ raise BadRequest()
+ origin = environ.get(
+ 'HTTP_ORIGIN',
+ (environ.get('HTTP_SEC_WEBSOCKET_ORIGIN', '')
+ if self.protocol_version <= 8 else ''))
+ if self.origin_checker is not None:
+ if not self.origin_checker(environ.get('HTTP_HOST'), origin):
+ raise BadRequest(status='403 Forbidden')
+ protocols = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', None)
+ negotiated_protocol = None
+ if protocols:
+ for p in (i.strip() for i in protocols.split(',')):
+ if p in self.supported_protocols:
+ negotiated_protocol = p
+ break
+ #extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS', None)
+ #if extensions:
+ # extensions = [i.strip() for i in extensions.split(',')]
+
+ key = environ['HTTP_SEC_WEBSOCKET_KEY']
+ response = base64.b64encode(sha1(key + PROTOCOL_GUID).digest())
+ handshake_reply = ["HTTP/1.1 101 Switching Protocols",
+ "Upgrade: websocket",
+ "Connection: Upgrade",
+ "Sec-WebSocket-Accept: %s" % (response, )]
+ if negotiated_protocol:
+ handshake_reply.append("Sec-WebSocket-Protocol: %s"
+ % (negotiated_protocol, ))
+ sock.sendall('\r\n'.join(handshake_reply) + '\r\n\r\n')
+ return RFC6455WebSocket(sock, environ, self.protocol_version,
+ protocol=negotiated_protocol)
+
+ def _extract_number(self, value):
+ """
+ Utility function which, given a string like 'g98sd 5[]221@1', will
+ return 9852211. Used to parse the Sec-WebSocket-Key headers.
+ """
+ out = ""
+ spaces = 0
+ for char in value:
+ if char in string.digits:
+ out += char
+ elif char == " ":
+ spaces += 1
+ return int(out) / spaces
+
+class WebSocket(object):
+ """A websocket object that handles the details of
+ serialization/deserialization to the socket.
+
+ The primary way to interact with a :class:`WebSocket` object is to
+ call :meth:`send` and :meth:`wait` in order to pass messages back
+ and forth with the browser. Also available are the following
+ properties:
+
+ path
+ The path value of the request. This is the same as the WSGI PATH_INFO variable, but more convenient.
+ protocol
+ The value of the Websocket-Protocol header.
+ origin
+ The value of the 'Origin' header.
+ environ
+ The full WSGI environment for this request.
+
+ """
+ def __init__(self, sock, environ, version=76):
+ """
+ :param socket: The eventlet socket
+ :type socket: :class:`eventlet.greenio.GreenSocket`
+ :param environ: The wsgi environment
+ :param version: The WebSocket spec version to follow (default is 76)
+ """
+ self.socket = sock
+ self.origin = environ.get('HTTP_ORIGIN')
+ self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL')
+ self.path = environ.get('PATH_INFO')
+ self.environ = environ
+ self.version = version
+ self.websocket_closed = False
+ self._buf = ""
+ self._msgs = collections.deque()
+ self._sendlock = semaphore.Semaphore()
+
+ @staticmethod
+ def _pack_message(message):
+ """Pack the message inside ``00`` and ``FF``
+
+ As per the dataframing section (5.3) for the websocket spec
+ """
+ if isinstance(message, unicode):
+ message = message.encode('utf-8')
+ elif not isinstance(message, str):
+ message = str(message)
+ packed = "\x00%s\xFF" % message
+ return packed
+
+ def _parse_messages(self):
+ """ Parses for messages in the buffer *buf*. It is assumed that
+ the buffer contains the start character for a message, but that it
+ may contain only part of the rest of the message.
+
+ Returns an array of messages, and the buffer remainder that
+ didn't contain any full messages."""
+ msgs = []
+ end_idx = 0
+ buf = self._buf
+ while buf:
+ frame_type = ord(buf[0])
+ if frame_type == 0:
+ # Normal message.
+ end_idx = buf.find("\xFF")
+ if end_idx == -1: #pragma NO COVER
+ break
+ msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
+ buf = buf[end_idx+1:]
+ elif frame_type == 255:
+ # Closing handshake.
+ assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf
+ self.websocket_closed = True
+ break
+ else:
+ raise ValueError("Don't understand how to parse this type of message: %r" % buf)
+ self._buf = buf
+ return msgs
+
+ def send(self, message):
+ """Send a message to the browser.
+
+ *message* should be convertable to a string; unicode objects should be
+ encodable as utf-8. Raises socket.error with errno of 32
+ (broken pipe) if the socket has already been closed by the client."""
+ packed = self._pack_message(message)
+ # if two greenthreads are trying to send at the same time
+ # on the same socket, sendlock prevents interleaving and corruption
+ self._sendlock.acquire()
+ try:
+ self.socket.sendall(packed)
+ finally:
+ self._sendlock.release()
+
+ def wait(self):
+ """Waits for and deserializes messages.
+
+ Returns a single message; the oldest not yet processed. If the client
+ has already closed the connection, returns None. This is different
+ from normal socket behavior because the empty string is a valid
+ websocket message."""
+ while not self._msgs:
+ # Websocket might be closed already.
+ if self.websocket_closed:
+ return None
+ # no parsed messages, must mean buf needs more data
+ delta = self.socket.recv(8096)
+ if delta == '':
+ return None
+ self._buf += delta
+ msgs = self._parse_messages()
+ self._msgs.extend(msgs)
+ return self._msgs.popleft()
+
+ def _send_closing_frame(self, ignore_send_errors=False):
+ """Sends the closing frame to the client, if required."""
+ if self.version == 76 and not self.websocket_closed:
+ try:
+ self.socket.sendall("\xff\x00")
+ except SocketError:
+ # Sometimes, like when the remote side cuts off the connection,
+ # we don't care about this.
+ if not ignore_send_errors: #pragma NO COVER
+ raise
+ self.websocket_closed = True
+
+ def close(self):
+ """Forcibly close the websocket; generally it is preferable to
+ return from the handler method."""
+ self._send_closing_frame()
+ self.socket.shutdown(True)
+ self.socket.close()
+
+
+class ConnectionClosedError(Exception):
+ pass
+
+
+class FailedConnectionError(Exception):
+ def __init__(self, status, message):
+ super(FailedConnectionError, self).__init__(status, message)
+ self.message = message
+ self.status = status
+
+
+class ProtocolError(ValueError):
+ pass
+
+
+class RFC6455WebSocket(WebSocket):
+ def __init__(self, sock, environ, version=13, protocol=None, client=False):
+ super(RFC6455WebSocket, self).__init__(sock, environ, version)
+ self.iterator = self._iter_frames()
+ self.client = client
+ self.protocol = protocol
+
+ class UTF8Decoder(object):
+ def __init__(self):
+ if utf8validator:
+ self.validator = utf8validator.Utf8Validator()
+ else:
+ self.validator = None
+ decoderclass = codecs.getincrementaldecoder('utf8')
+ self.decoder = decoderclass()
+
+ def reset(self):
+ if self.validator:
+ self.validator.reset()
+ self.decoder.reset()
+
+ def decode(self, data, final=False):
+ if self.validator:
+ valid, eocp, c_i, t_i = self.validator.validate(data)
+ if not valid:
+ raise ValueError('Data is not valid unicode')
+ return self.decoder.decode(data, final)
+
+ def _get_bytes(self, numbytes):
+ data = ''
+ while len(data) < numbytes:
+ d = self.socket.recv(numbytes - len(data))
+ if not d:
+ raise ConnectionClosedError()
+ data = data + d
+ return data
+
+ class Message(object):
+ def __init__(self, opcode, decoder=None):
+ self.decoder = decoder
+ self.data = []
+ self.finished = False
+ self.opcode = opcode
+
+ def push(self, data, final=False):
+ if self.decoder:
+ data = self.decoder.decode(data, final=final)
+ self.finished = final
+ self.data.append(data)
+
+ def getvalue(self):
+ return ''.join(self.data)
+
+ @staticmethod
+ def _apply_mask(data, mask, length=None, offset=0):
+ if length is None:
+ length = len(data)
+ cnt = range(length)
+ return ''.join(chr(ord(data[i]) ^ mask[(offset + i) % 4]) for i in cnt)
+
+ def _handle_control_frame(self, opcode, data):
+ if opcode == 8: # connection close
+ if not data:
+ status = 1000
+ elif len(data) > 1:
+ status = struct.unpack_from('!H', data)[0]
+ if not status or status not in VALID_CLOSE_STATUS:
+ raise FailedConnectionError(
+ 1002,
+ "Unexpected close status code.")
+ try:
+ data = self.UTF8Decoder().decode(data[2:], True)
+ except (UnicodeDecodeError, ValueError):
+ raise FailedConnectionError(
+ 1002,
+ "Close message data should be valid UTF-8.")
+ else:
+ status = 1002
+ self.close(close_data=(status, ''))
+ raise ConnectionClosedError()
+ elif opcode == 9: # ping
+ self.send(data, control_code=0xA)
+ elif opcode == 0xA: # pong
+ pass
+ else:
+ raise FailedConnectionError(
+ 1002, "Unknown control frame received.")
+
+ def _iter_frames(self):
+ fragmented_message = None
+ try:
+ while True:
+ message = self._recv_frame(message=fragmented_message)
+ if message.opcode & 8:
+ self._handle_control_frame(
+ message.opcode, message.getvalue())
+ continue
+ if fragmented_message and message is not fragmented_message:
+ raise RuntimeError('Unexpected message change.')
+ fragmented_message = message
+ if message.finished:
+ data = fragmented_message.getvalue()
+ fragmented_message = None
+ yield data
+ except FailedConnectionError:
+ exc_typ, exc_val, exc_tb = sys.exc_info()
+ self.close(close_data=(exc_val.status, exc_val.message))
+ except ConnectionClosedError:
+ return
+ except Exception:
+ self.close(close_data=(1011, 'Internal Server Error'))
+ raise
+
+ def _recv_frame(self, message=None):
+ recv = self._get_bytes
+ header = recv(2)
+ a, b = struct.unpack('!BB', header)
+ finished = a >> 7 == 1
+ rsv123 = a >> 4 & 7
+ if rsv123:
+ # must be zero
+ raise FailedConnectionError(
+ 1002,
+ "RSV1, RSV2, RSV3: MUST be 0 unless an extension is"
+ " negotiated that defines meanings for non-zero values.")
+ opcode = a & 15
+ if opcode not in (0, 1, 2, 8, 9, 0xA):
+ raise FailedConnectionError(1002, "Unknown opcode received.")
+ masked = b & 128 == 128
+ if not masked and not self.client:
+ raise FailedConnectionError(1002, "A client MUST mask all frames"
+ " that it sends to the server")
+ length = b & 127
+ if opcode & 8:
+ if not finished:
+ raise FailedConnectionError(1002, "Control frames must not"
+ " be fragmented.")
+ if length > 125:
+ raise FailedConnectionError(
+ 1002,
+ "All control frames MUST have a payload length of 125"
+ " bytes or less")
+ elif opcode and message:
+ raise FailedConnectionError(
+ 1002,
+ "Received a non-continuation opcode within"
+ " fragmented message.")
+ elif not opcode and not message:
+ raise FailedConnectionError(
+ 1002,
+ "Received continuation opcode with no previous"
+ " fragments received.")
+ if length == 126:
+ length = struct.unpack('!H', recv(2))[0]
+ elif length == 127:
+ length = struct.unpack('!Q', recv(8))[0]
+ if masked:
+ mask = struct.unpack('!BBBB', recv(4))
+ received = 0
+ if not message or opcode & 8:
+ decoder = self.UTF8Decoder() if opcode == 1 else None
+ message = self.Message(opcode, decoder=decoder)
+ if not length:
+ message.push('', final=finished)
+ else:
+ while received < length:
+ d = self.socket.recv(length - received)
+ if not d:
+ raise ConnectionClosedError()
+ dlen = len(d)
+ if masked:
+ d = self._apply_mask(d, mask, length=dlen, offset=received)
+ received = received + dlen
+ try:
+ message.push(d, final=finished)
+ except (UnicodeDecodeError, ValueError):
+ raise FailedConnectionError(
+ 1007, "Text data must be valid utf-8")
+ return message
+
+ @staticmethod
+ def _pack_message(message, masked=False,
+ continuation=False, final=True, control_code=None):
+ is_text = False
+ if isinstance(message, unicode):
+ message = message.encode('utf-8')
+ is_text = True
+ length = len(message)
+ if not length:
+ # no point masking empty data
+ masked = False
+ if control_code:
+ if control_code not in (8, 9, 0xA):
+ raise ProtocolError('Unknown control opcode.')
+ if continuation or not final:
+ raise ProtocolError('Control frame cannot be a fragment.')
+ if length > 125:
+ raise ProtocolError('Control frame data too large (>125).')
+ header = struct.pack('!B', control_code | 1 << 7)
+ else:
+ opcode = 0 if continuation else (1 if is_text else 2)
+ header = struct.pack('!B', opcode | (1 << 7 if final else 0))
+ lengthdata = 1 << 7 if masked else 0
+ if length > 65535:
+ lengthdata = struct.pack('!BQ', lengthdata | 127, length)
+ elif length > 125:
+ lengthdata = struct.pack('!BH', lengthdata | 126, length)
+ else:
+ lengthdata = struct.pack('!B', lengthdata | length)
+ if masked:
+ # NOTE: RFC6455 states:
+ # A server MUST NOT mask any frames that it sends to the client
+ rand = Random(time.time())
+ mask = map(rand.getrandbits, (8, ) * 4)
+ message = RFC6455WebSocket._apply_mask(message, mask, length)
+ maskdata = struct.pack('!BBBB', *mask)
+ else:
+ maskdata = ''
+ return ''.join((header, lengthdata, maskdata, message))
+
+ def wait(self):
+ for i in self.iterator:
+ return i
+
+ def _send(self, frame):
+ self._sendlock.acquire()
+ try:
+ self.socket.sendall(frame)
+ finally:
+ self._sendlock.release()
+
+ def send(self, message, **kw):
+ kw['masked'] = self.client
+ payload = self._pack_message(message, **kw)
+ self._send(payload)
+
+ def _send_closing_frame(self, ignore_send_errors=False, close_data=None):
+ if self.version in (8, 13) and not self.websocket_closed:
+ if close_data is not None:
+ status, msg = close_data
+ if isinstance(msg, unicode):
+ msg = msg.encode('utf-8')
+ data = struct.pack('!H', status) + msg
+ else:
+ data = ''
+ try:
+ self.send(data, control_code=8)
+ except SocketError:
+ # Sometimes, like when the remote side cuts off the connection,
+ # we don't care about this.
+ if not ignore_send_errors: # pragma NO COVER
+ raise
+ self.websocket_closed = True
+
+ def close(self, close_data=None):
+ """Forcibly close the websocket; generally it is preferable to
+ return from the handler method."""
+ self._send_closing_frame(close_data=close_data)
+ self.socket.shutdown(socket.SHUT_WR)
+ self.socket.close()
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index c68f19be..3d87809b 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -32,6 +32,7 @@ if HUB_TYPE == 'eventlet':
import eventlet.semaphore
import eventlet.timeout
import eventlet.wsgi
+ from ryu.contrib._eventlet import websocket
import greenlet
import ssl
import socket
@@ -123,6 +124,8 @@ if HUB_TYPE == 'eventlet':
def serve_forever(self):
eventlet.wsgi.server(self.server, self.handle)
+ WebSocketWSGI = websocket.WebSocketWSGI
+
Timeout = eventlet.timeout.Timeout
class Event(object):