diff options
author | Satoshi Kobayashi <satoshi-k@stratosphere.co.jp> | 2014-05-19 01:51:52 +0000 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2014-05-19 14:36:28 +0900 |
commit | 7598ef056193f02b4cedc452774a0795d6df6c29 (patch) | |
tree | 76b9a0b3fe9d833821b5e615bc57003e5aa98575 | |
parent | eb0055f0ce1bff85ecfbe63ac32cebd03436a97c (diff) |
New API for WebSocket support
- API
- @websocket decorator
- WSGIApplication#websocketmanager()
- bugfix
- Even if a connection is cut, it continues remaining
- remove restriction
- Two or more connections can be accepted
Signed-off-by: Satoshi Kobayashi <satoshi-k@stratosphere.co.jp>
Reviewed-by: YAMADA Hideki <yamada.hideki@po.ntts.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/app/simple_switch_websocket_13.py | 30 | ||||
-rw-r--r-- | ryu/app/wsgi.py | 79 |
2 files changed, 81 insertions, 28 deletions
diff --git a/ryu/app/simple_switch_websocket_13.py b/ryu/app/simple_switch_websocket_13.py index fa68fc5a..a6fd989b 100644 --- a/ryu/app/simple_switch_websocket_13.py +++ b/ryu/app/simple_switch_websocket_13.py @@ -25,10 +25,10 @@ $ wsdump.py ws://127.0.0.1:8080/simpleswitch/ws """ import json -from webob import Response +from webob import Response from ryu.app import simple_switch_13 -from ryu.app.wsgi import route, ControllerBase, WSGIApplication +from ryu.app.wsgi import route, websocket, ControllerBase, WSGIApplication from ryu.controller import ofp_event from ryu.controller.handler import set_ev_cls from ryu.lib import hub @@ -47,21 +47,19 @@ class SimpleSwitchWebSocket13(simple_switch_13.SimpleSwitch13): def __init__(self, *args, **kwargs): super(SimpleSwitchWebSocket13, self).__init__(*args, **kwargs) - self.ws_send_queue = hub.Queue() - self.ws_lock = hub.BoundedSemaphore() - wsgi = kwargs['wsgi'] wsgi.register( SimpleSwitchWebSocketController, data={simple_switch_instance_name: self}, ) + self._ws_manager = wsgi.websocketmanager @set_ev_cls(ofp_event.EventOFPPacketIn) def _packet_in_handler(self, ev): super(SimpleSwitchWebSocket13, self)._packet_in_handler(ev) pkt = packet.Packet(ev.msg.data) - self.ws_send_queue.put(str(pkt)) + self._ws_manager.broadcast(str(pkt)) class SimpleSwitchWebSocketController(ControllerBase): @@ -70,22 +68,12 @@ class SimpleSwitchWebSocketController(ControllerBase): req, link, data, **config) self.simpl_switch_spp = data[simple_switch_instance_name] + @websocket('simpleswitch', url) def _websocket_handler(self, ws): simple_switch = self.simpl_switch_spp simple_switch.logger.debug('WebSocket connected: %s', ws) while True: - data = simple_switch.ws_send_queue.get() - ws.send(unicode(json.dumps(data))) - - @route('simpleswitch', url) - def websocket(self, req, **kwargs): - simple_switch = self.simpl_switch_spp - if simple_switch.ws_lock.acquire(blocking=False): - try: - self.websocket_handshake(req, self._websocket_handler) - return - finally: - simple_switch.logger.debug('WebSocket disconnected') - simple_switch.ws_lock.release() - else: - return Response(status=503) + msg = ws.wait() + if msg is None: + break + simple_switch.logger.debug('WebSocket disconnected') diff --git a/ryu/app/wsgi.py b/ryu/app/wsgi.py index 320f6b77..406fbab9 100644 --- a/ryu/app/wsgi.py +++ b/ryu/app/wsgi.py @@ -15,15 +15,15 @@ # limitations under the License. import inspect +from types import MethodType -from ryu import cfg import webob.dec - +from webob.response import Response +from ryu import cfg from ryu.lib import hub from routes import Mapper from routes.util import URLGenerator - CONF = cfg.CONF CONF.register_cli_opts([ cfg.StrOpt('wsapi-host', default='', help='webapp listen host'), @@ -46,12 +46,59 @@ def route(name, path, methods=None, requirements=None): return _route +class WebSocketRegistrationWrapper(object): + + def __init__(self, func, controller): + self._controller = controller + self._controller_method = MethodType(func, controller) + + def __call__(self, ws): + wsgi_application = self._controller.parent + ws_manager = wsgi_application.websocketmanager + ws_manager.add_connection(ws) + try: + self._controller_method(ws) + finally: + ws_manager.delete_connection(ws) + + +class _AlreadyHandledResponse(Response): + # XXX: Eventlet API should not be used directly. + from eventlet.wsgi import ALREADY_HANDLED + _ALREADY_HANDLED = ALREADY_HANDLED + + def __call__(self, environ, start_response): + return self._ALREADY_HANDLED + + +def websocket(name, path): + def _websocket(controller_func): + def __websocket(self, req, **kwargs): + wrapper = WebSocketRegistrationWrapper(controller_func, self) + ws_wsgi = hub.WebSocketWSGI(wrapper) + ws_wsgi(req.environ, req.start_response) + # XXX: In order to prevent the writing to a already closed socket. + # This issue is caused by combined use: + # - webob.dec.wsgify() + # - eventlet.wsgi.HttpProtocol.handle_one_response() + return _AlreadyHandledResponse() + __websocket.routing_info = { + 'name': name, + 'path': path, + 'methods': None, + 'requirements': None, + } + return __websocket + return _websocket + + class ControllerBase(object): special_vars = ['action', 'controller'] def __init__(self, req, link, data, **config): self.req = req self.link = link + self.parent = None for name, value in config.items(): setattr(self, name, value) @@ -67,10 +114,6 @@ class ControllerBase(object): return getattr(self, action)(req, **kwargs) - def websocket_handshake(self, req, handler): - ws_wsgi = hub.WebSocketWSGI(handler) - return ws_wsgi(req.environ, req.start_response) - class wsgify_hack(webob.dec.wsgify): def __call__(self, environ, start_response): @@ -78,11 +121,28 @@ class wsgify_hack(webob.dec.wsgify): return super(wsgify_hack, self).__call__(environ, start_response) +class WebSocketManager(object): + + def __init__(self): + self._connections = [] + + def add_connection(self, ws): + self._connections.append(ws) + + def delete_connection(self, ws): + self._connections.remove(ws) + + def broadcast(self, msg): + for connection in self._connections: + connection.send(msg) + + class WSGIApplication(object): def __init__(self, **config): self.config = config self.mapper = Mapper() self.registory = {} + self._wsmanager = WebSocketManager() super(WSGIApplication, self).__init__() # XXX: Switch how to call the API of Routes for every version match_argspec = inspect.getargspec(self.mapper.match) @@ -119,6 +179,7 @@ class WSGIApplication(object): data = self.registory[name] controller = match['controller'](req, link, data, **self.config) + controller.parent = self return controller(req) def register(self, controller, data=None): @@ -142,6 +203,10 @@ class WSGIApplication(object): if data: self.registory[controller.__name__] = data + @property + def websocketmanager(self): + return self._wsmanager + class WSGIServer(hub.WSGIServer): def __init__(self, application, **config): |