diff options
authorFUJITA Tomonori <>2015-06-23 05:33:56 +0900
committerFUJITA Tomonori <>2015-06-23 14:55:51 +0900
commitf65ecadfaaba12d494610c33c0408fd64ed0f737 (patch)
parent4ee2f054d22363227aca2686a47a416731fa40bc (diff)
eventlet has already supported WebSocket(RFC6455) in stable releases (>= 0.15)
Signed-off-by: Satoshi Kobayashi <> Signed-off-by: FUJITA Tomonori <>
4 files changed, 2 insertions, 658 deletions
diff --git a/ryu/contrib/_eventlet/ b/ryu/contrib/_eventlet/
deleted file mode 100644
index e69de29b..00000000
--- a/ryu/contrib/_eventlet/
+++ /dev/null
diff --git a/ryu/contrib/_eventlet/ b/ryu/contrib/_eventlet/
deleted file mode 100644
index 4a07c8d8..00000000
--- a/ryu/contrib/_eventlet/
+++ /dev/null
@@ -1,656 +0,0 @@
-import base64
-import codecs
-import collections
-import errno
-from random import Random
-import string
-import struct
-import sys
-import time
-from socket import error as SocketError
- from hashlib import md5, sha1
-except ImportError: #pragma NO COVER
- from md5 import md5
- from sha import sha as sha1
-import eventlet
-from eventlet import semaphore
-from eventlet import wsgi
-from import socket
-from import get_errno
-# Python 2's utf8 decoding is more lenient than we'd like
-# In order to pass autobahn's testsuite we need stricter validation
-# if available...
-for _mod in ('wsaccel.utf8validator', 'autobahn.utf8validator'):
- # autobahn has it's own python-based validator. in newest versions
- # this prefers to use wsaccel, a cython based implementation, if available.
- # wsaccel may also be installed w/out autobahn, or with a earlier version.
- try:
- utf8validator = __import__(_mod, {}, {}, [''])
- except ImportError:
- utf8validator = None
- else:
- break
-__all__ = ["WebSocketWSGI", "WebSocket"]
-PROTOCOL_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
-VALID_CLOSE_STATUS = (range(1000, 1004)
- + range(1007, 1012)
- # 3000-3999: reserved for use by libraries, frameworks,
- # and applications
- + range(3000, 4000)
- # 4000-4999: reserved for private use and thus can't
- # be registered
- + range(4000, 5000))
-class BadRequest(Exception):
- def __init__(self, status='400 Bad Request', body=None, headers=None):
- super(Exception, self).__init__()
- self.status = status
- self.body = body
- self.headers = headers
-class WebSocketWSGI(object):
- """Wraps a websocket handler function in a WSGI application.
- Use it like this::
- @websocket.WebSocketWSGI
- def my_handler(ws):
- from_browser = ws.wait()
- ws.send("from server")
- The single argument to the function will be an instance of
- :class:`WebSocket`. To close the socket, simply return from the
- function. Note that the server will log the websocket request at
- the time of closure.
- """
- def __init__(self, handler):
- self.handler = handler
- self.protocol_version = None
- self.support_legacy_versions = True
- self.supported_protocols = []
- self.origin_checker = None
- @classmethod
- def configured(cls,
- handler=None,
- supported_protocols=None,
- origin_checker=None,
- support_legacy_versions=False):
- def decorator(handler):
- inst = cls(handler)
- inst.support_legacy_versions = support_legacy_versions
- inst.origin_checker = origin_checker
- if supported_protocols:
- inst.supported_protocols = supported_protocols
- return inst
- if handler is None:
- return decorator
- return decorator(handler)
- def __call__(self, environ, start_response):
- http_connection_parts = [
- part.strip()
- for part in environ.get('HTTP_CONNECTION', '').lower().split(',')]
- if not ('upgrade' in http_connection_parts and
- environ.get('HTTP_UPGRADE', '').lower() == 'websocket'):
- # need to check a few more things here for true compliance
- start_response('400 Bad Request', [('Connection', 'close')])
- return []
- try:
- ws = self._handle_hybi_request(environ)
- elif self.support_legacy_versions:
- ws = self._handle_legacy_request(environ)
- else:
- raise BadRequest()
- except BadRequest as e:
- status = e.status
- body = e.body or ''
- headers = e.headers or []
- start_response(status,
- [('Connection', 'close'), ] + headers)
- return [body]
- try:
- self.handler(ws)
- except socket.error as e:
- if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS:
- raise
- # Make sure we send the closing frame
- ws._send_closing_frame(True)
- # use this undocumented feature of eventlet.wsgi to ensure that it
- # doesn't barf on the fact that we didn't call start_response
- return wsgi.ALREADY_HANDLED
- def _handle_legacy_request(self, environ):
- sock = environ['eventlet.input'].get_socket()
- if 'HTTP_SEC_WEBSOCKET_KEY1' in environ:
- self.protocol_version = 76
- if 'HTTP_SEC_WEBSOCKET_KEY2' not in environ:
- raise BadRequest()
- else:
- self.protocol_version = 75
- if self.protocol_version == 76:
- key1 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY1'])
- key2 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY2'])
- # There's no content-length header in the request, but it has 8
- # bytes of data.
- environ['wsgi.input'].content_length = 8
- key3 = environ['wsgi.input'].read(8)
- key = struct.pack(">II", key1, key2) + key3
- response = md5(key).digest()
- # Start building the response
- scheme = 'ws'
- if environ.get('wsgi.url_scheme') == 'https':
- scheme = 'wss'
- location = '%s://%s%s%s' % (
- scheme,
- environ.get('HTTP_HOST'),
- environ.get('SCRIPT_NAME'),
- environ.get('PATH_INFO')
- )
- qs = environ.get('QUERY_STRING')
- if qs is not None:
- location += '?' + qs
- if self.protocol_version == 75:
- handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
- "Upgrade: WebSocket\r\n"
- "Connection: Upgrade\r\n"
- "WebSocket-Origin: %s\r\n"
- "WebSocket-Location: %s\r\n\r\n" % (
- environ.get('HTTP_ORIGIN'),
- location))
- elif self.protocol_version == 76:
- handshake_reply = ("HTTP/1.1 101 WebSocket Protocol Handshake\r\n"
- "Upgrade: WebSocket\r\n"
- "Connection: Upgrade\r\n"
- "Sec-WebSocket-Origin: %s\r\n"
- "Sec-WebSocket-Protocol: %s\r\n"
- "Sec-WebSocket-Location: %s\r\n"
- "\r\n%s" % (
- environ.get('HTTP_ORIGIN'),
- environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'default'),
- location,
- response))
- else: #pragma NO COVER
- raise ValueError("Unknown WebSocket protocol version.")
- sock.sendall(handshake_reply)
- return WebSocket(sock, environ, self.protocol_version)
- def _handle_hybi_request(self, environ):
- sock = environ['eventlet.input'].get_socket()
- hybi_version = environ['HTTP_SEC_WEBSOCKET_VERSION']
- if hybi_version not in ('8', '13', ):
- raise BadRequest(status='426 Upgrade Required',
- headers=[('Sec-WebSocket-Version', '8, 13')])
- self.protocol_version = int(hybi_version)
- if 'HTTP_SEC_WEBSOCKET_KEY' not in environ:
- # That's bad.
- raise BadRequest()
- origin = environ.get(
- (environ.get('HTTP_SEC_WEBSOCKET_ORIGIN', '')
- if self.protocol_version <= 8 else ''))
- if self.origin_checker is not None:
- if not self.origin_checker(environ.get('HTTP_HOST'), origin):
- raise BadRequest(status='403 Forbidden')
- protocols = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', None)
- negotiated_protocol = None
- if protocols:
- for p in (i.strip() for i in protocols.split(',')):
- if p in self.supported_protocols:
- negotiated_protocol = p
- break
- #extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS', None)
- #if extensions:
- # extensions = [i.strip() for i in extensions.split(',')]
- key = environ['HTTP_SEC_WEBSOCKET_KEY']
- response = base64.b64encode(sha1(key + PROTOCOL_GUID).digest())
- handshake_reply = ["HTTP/1.1 101 Switching Protocols",
- "Upgrade: websocket",
- "Connection: Upgrade",
- "Sec-WebSocket-Accept: %s" % (response, )]
- if negotiated_protocol:
- handshake_reply.append("Sec-WebSocket-Protocol: %s"
- % (negotiated_protocol, ))
- sock.sendall('\r\n'.join(handshake_reply) + '\r\n\r\n')
- return RFC6455WebSocket(sock, environ, self.protocol_version,
- protocol=negotiated_protocol)
- def _extract_number(self, value):
- """
- Utility function which, given a string like 'g98sd 5[]221@1', will
- return 9852211. Used to parse the Sec-WebSocket-Key headers.
- """
- out = ""
- spaces = 0
- for char in value:
- if char in string.digits:
- out += char
- elif char == " ":
- spaces += 1
- return int(out) / spaces
-class WebSocket(object):
- """A websocket object that handles the details of
- serialization/deserialization to the socket.
- The primary way to interact with a :class:`WebSocket` object is to
- call :meth:`send` and :meth:`wait` in order to pass messages back
- and forth with the browser. Also available are the following
- properties:
- path
- The path value of the request. This is the same as the WSGI PATH_INFO variable, but more convenient.
- protocol
- The value of the Websocket-Protocol header.
- origin
- The value of the 'Origin' header.
- environ
- The full WSGI environment for this request.
- """
- def __init__(self, sock, environ, version=76):
- """
- :param socket: The eventlet socket
- :type socket: :class:`eventlet.greenio.GreenSocket`
- :param environ: The wsgi environment
- :param version: The WebSocket spec version to follow (default is 76)
- """
- self.socket = sock
- self.origin = environ.get('HTTP_ORIGIN')
- self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL')
- self.path = environ.get('PATH_INFO')
- self.environ = environ
- self.version = version
- self.websocket_closed = False
- self._buf = ""
- self._msgs = collections.deque()
- self._sendlock = semaphore.Semaphore()
- @staticmethod
- def _pack_message(message):
- """Pack the message inside ``00`` and ``FF``
- As per the dataframing section (5.3) for the websocket spec
- """
- if isinstance(message, unicode):
- message = message.encode('utf-8')
- elif not isinstance(message, str):
- message = str(message)
- packed = "\x00%s\xFF" % message
- return packed
- def _parse_messages(self):
- """ Parses for messages in the buffer *buf*. It is assumed that
- the buffer contains the start character for a message, but that it
- may contain only part of the rest of the message.
- Returns an array of messages, and the buffer remainder that
- didn't contain any full messages."""
- msgs = []
- end_idx = 0
- buf = self._buf
- while buf:
- frame_type = ord(buf[0])
- if frame_type == 0:
- # Normal message.
- end_idx = buf.find("\xFF")
- if end_idx == -1: #pragma NO COVER
- break
- msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
- buf = buf[end_idx+1:]
- elif frame_type == 255:
- # Closing handshake.
- assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf
- self.websocket_closed = True
- break
- else:
- raise ValueError("Don't understand how to parse this type of message: %r" % buf)
- self._buf = buf
- return msgs
- def send(self, message):
- """Send a message to the browser.
- *message* should be convertable to a string; unicode objects should be
- encodable as utf-8. Raises socket.error with errno of 32
- (broken pipe) if the socket has already been closed by the client."""
- packed = self._pack_message(message)
- # if two greenthreads are trying to send at the same time
- # on the same socket, sendlock prevents interleaving and corruption
- self._sendlock.acquire()
- try:
- self.socket.sendall(packed)
- finally:
- self._sendlock.release()
- def wait(self):
- """Waits for and deserializes messages.
- Returns a single message; the oldest not yet processed. If the client
- has already closed the connection, returns None. This is different
- from normal socket behavior because the empty string is a valid
- websocket message."""
- while not self._msgs:
- # Websocket might be closed already.
- if self.websocket_closed:
- return None
- # no parsed messages, must mean buf needs more data
- delta = self.socket.recv(8096)
- if delta == '':
- return None
- self._buf += delta
- msgs = self._parse_messages()
- self._msgs.extend(msgs)
- return self._msgs.popleft()
- def _send_closing_frame(self, ignore_send_errors=False):
- """Sends the closing frame to the client, if required."""
- if self.version == 76 and not self.websocket_closed:
- try:
- self.socket.sendall("\xff\x00")
- except SocketError:
- # Sometimes, like when the remote side cuts off the connection,
- # we don't care about this.
- if not ignore_send_errors: #pragma NO COVER
- raise
- self.websocket_closed = True
- def close(self):
- """Forcibly close the websocket; generally it is preferable to
- return from the handler method."""
- self._send_closing_frame()
- self.socket.shutdown(True)
- self.socket.close()
-class ConnectionClosedError(Exception):
- pass
-class FailedConnectionError(Exception):
- def __init__(self, status, message):
- super(FailedConnectionError, self).__init__(status, message)
- self.message = message
- self.status = status
-class ProtocolError(ValueError):
- pass
-class RFC6455WebSocket(WebSocket):
- def __init__(self, sock, environ, version=13, protocol=None, client=False):
- super(RFC6455WebSocket, self).__init__(sock, environ, version)
- self.iterator = self._iter_frames()
- self.client = client
- self.protocol = protocol
- class UTF8Decoder(object):
- def __init__(self):
- if utf8validator:
- self.validator = utf8validator.Utf8Validator()
- else:
- self.validator = None
- decoderclass = codecs.getincrementaldecoder('utf8')
- self.decoder = decoderclass()
- def reset(self):
- if self.validator:
- self.validator.reset()
- self.decoder.reset()
- def decode(self, data, final=False):
- if self.validator:
- valid, eocp, c_i, t_i = self.validator.validate(data)
- if not valid:
- raise ValueError('Data is not valid unicode')
- return self.decoder.decode(data, final)
- def _get_bytes(self, numbytes):
- data = ''
- while len(data) < numbytes:
- d = self.socket.recv(numbytes - len(data))
- if not d:
- raise ConnectionClosedError()
- data = data + d
- return data
- class Message(object):
- def __init__(self, opcode, decoder=None):
- self.decoder = decoder
- = []
- self.finished = False
- self.opcode = opcode
- def push(self, data, final=False):
- if self.decoder:
- data = self.decoder.decode(data, final=final)
- self.finished = final
- def getvalue(self):
- return ''.join(
- @staticmethod
- def _apply_mask(data, mask, length=None, offset=0):
- if length is None:
- length = len(data)
- cnt = range(length)
- return ''.join(chr(ord(data[i]) ^ mask[(offset + i) % 4]) for i in cnt)
- def _handle_control_frame(self, opcode, data):
- if opcode == 8: # connection close
- if not data:
- status = 1000
- elif len(data) > 1:
- status = struct.unpack_from('!H', data)[0]
- if not status or status not in VALID_CLOSE_STATUS:
- raise FailedConnectionError(
- 1002,
- "Unexpected close status code.")
- try:
- data = self.UTF8Decoder().decode(data[2:], True)
- except (UnicodeDecodeError, ValueError):
- raise FailedConnectionError(
- 1002,
- "Close message data should be valid UTF-8.")
- else:
- status = 1002
- self.close(close_data=(status, ''))
- raise ConnectionClosedError()
- elif opcode == 9: # ping
- self.send(data, control_code=0xA)
- elif opcode == 0xA: # pong
- pass
- else:
- raise FailedConnectionError(
- 1002, "Unknown control frame received.")
- def _iter_frames(self):
- fragmented_message = None
- try:
- while True:
- message = self._recv_frame(message=fragmented_message)
- if message.opcode & 8:
- self._handle_control_frame(
- message.opcode, message.getvalue())
- continue
- if fragmented_message and message is not fragmented_message:
- raise RuntimeError('Unexpected message change.')
- fragmented_message = message
- if message.finished:
- data = fragmented_message.getvalue()
- fragmented_message = None
- yield data
- except FailedConnectionError:
- exc_typ, exc_val, exc_tb = sys.exc_info()
- self.close(close_data=(exc_val.status, exc_val.message))
- except ConnectionClosedError:
- return
- except Exception:
- self.close(close_data=(1011, 'Internal Server Error'))
- raise
- def _recv_frame(self, message=None):
- recv = self._get_bytes
- header = recv(2)
- a, b = struct.unpack('!BB', header)
- finished = a >> 7 == 1
- rsv123 = a >> 4 & 7
- if rsv123:
- # must be zero
- raise FailedConnectionError(
- 1002,
- "RSV1, RSV2, RSV3: MUST be 0 unless an extension is"
- " negotiated that defines meanings for non-zero values.")
- opcode = a & 15
- if opcode not in (0, 1, 2, 8, 9, 0xA):
- raise FailedConnectionError(1002, "Unknown opcode received.")
- masked = b & 128 == 128
- if not masked and not self.client:
- raise FailedConnectionError(1002, "A client MUST mask all frames"
- " that it sends to the server")
- length = b & 127
- if opcode & 8:
- if not finished:
- raise FailedConnectionError(1002, "Control frames must not"
- " be fragmented.")
- if length > 125:
- raise FailedConnectionError(
- 1002,
- "All control frames MUST have a payload length of 125"
- " bytes or less")
- elif opcode and message:
- raise FailedConnectionError(
- 1002,
- "Received a non-continuation opcode within"
- " fragmented message.")
- elif not opcode and not message:
- raise FailedConnectionError(
- 1002,
- "Received continuation opcode with no previous"
- " fragments received.")
- if length == 126:
- length = struct.unpack('!H', recv(2))[0]
- elif length == 127:
- length = struct.unpack('!Q', recv(8))[0]
- if masked:
- mask = struct.unpack('!BBBB', recv(4))
- received = 0
- if not message or opcode & 8:
- decoder = self.UTF8Decoder() if opcode == 1 else None
- message = self.Message(opcode, decoder=decoder)
- if not length:
- message.push('', final=finished)
- else:
- while received < length:
- d = self.socket.recv(length - received)
- if not d:
- raise ConnectionClosedError()
- dlen = len(d)
- if masked:
- d = self._apply_mask(d, mask, length=dlen, offset=received)
- received = received + dlen
- try:
- message.push(d, final=finished)
- except (UnicodeDecodeError, ValueError):
- raise FailedConnectionError(
- 1007, "Text data must be valid utf-8")
- return message
- @staticmethod
- def _pack_message(message, masked=False,
- continuation=False, final=True, control_code=None):
- is_text = False
- if isinstance(message, unicode):
- message = message.encode('utf-8')
- is_text = True
- length = len(message)
- if not length:
- # no point masking empty data
- masked = False
- if control_code:
- if control_code not in (8, 9, 0xA):
- raise ProtocolError('Unknown control opcode.')
- if continuation or not final:
- raise ProtocolError('Control frame cannot be a fragment.')
- if length > 125:
- raise ProtocolError('Control frame data too large (>125).')
- header = struct.pack('!B', control_code | 1 << 7)
- else:
- opcode = 0 if continuation else (1 if is_text else 2)
- header = struct.pack('!B', opcode | (1 << 7 if final else 0))
- lengthdata = 1 << 7 if masked else 0
- if length > 65535:
- lengthdata = struct.pack('!BQ', lengthdata | 127, length)
- elif length > 125:
- lengthdata = struct.pack('!BH', lengthdata | 126, length)
- else:
- lengthdata = struct.pack('!B', lengthdata | length)
- if masked:
- # NOTE: RFC6455 states:
- # A server MUST NOT mask any frames that it sends to the client
- rand = Random(time.time())
- mask = map(rand.getrandbits, (8, ) * 4)
- message = RFC6455WebSocket._apply_mask(message, mask, length)
- maskdata = struct.pack('!BBBB', *mask)
- else:
- maskdata = ''
- return ''.join((header, lengthdata, maskdata, message))
- def wait(self):
- for i in self.iterator:
- return i
- def _send(self, frame):
- self._sendlock.acquire()
- try:
- self.socket.sendall(frame)
- finally:
- self._sendlock.release()
- def send(self, message, **kw):
- kw['masked'] = self.client
- payload = self._pack_message(message, **kw)
- self._send(payload)
- def _send_closing_frame(self, ignore_send_errors=False, close_data=None):
- if self.version in (8, 13) and not self.websocket_closed:
- if close_data is not None:
- status, msg = close_data
- if isinstance(msg, unicode):
- msg = msg.encode('utf-8')
- data = struct.pack('!H', status) + msg
- else:
- data = ''
- try:
- self.send(data, control_code=8)
- except SocketError:
- # Sometimes, like when the remote side cuts off the connection,
- # we don't care about this.
- if not ignore_send_errors: # pragma NO COVER
- raise
- self.websocket_closed = True
- def close(self, close_data=None):
- """Forcibly close the websocket; generally it is preferable to
- return from the handler method."""
- self._send_closing_frame(close_data=close_data)
- self.socket.shutdown(socket.SHUT_WR)
- self.socket.close()
diff --git a/ryu/lib/ b/ryu/lib/
index 836b9242..56211479 100644
--- a/ryu/lib/
+++ b/ryu/lib/
@@ -32,7 +32,7 @@ if HUB_TYPE == 'eventlet':
import eventlet.semaphore
import eventlet.timeout
import eventlet.wsgi
- from ryu.contrib._eventlet import websocket
+ from eventlet import websocket
import greenlet
import ssl
import socket
diff --git a/tools/pip-requires b/tools/pip-requires
index 96eba22c..567600f1 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -1,4 +1,4 @@
msgpack-python>=0.3.0 # RPC library, BGP speaker(net_cntl)