summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/app/simple_switch_websocket_13.py30
-rw-r--r--ryu/app/wsgi.py79
2 files changed, 81 insertions, 28 deletions
diff --git a/ryu/app/simple_switch_websocket_13.py b/ryu/app/simple_switch_websocket_13.py
index fa68fc5a..a6fd989b 100644
--- a/ryu/app/simple_switch_websocket_13.py
+++ b/ryu/app/simple_switch_websocket_13.py
@@ -25,10 +25,10 @@ $ wsdump.py ws://127.0.0.1:8080/simpleswitch/ws
"""
import json
-from webob import Response
+from webob import Response
from ryu.app import simple_switch_13
-from ryu.app.wsgi import route, ControllerBase, WSGIApplication
+from ryu.app.wsgi import route, websocket, ControllerBase, WSGIApplication
from ryu.controller import ofp_event
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub
@@ -47,21 +47,19 @@ class SimpleSwitchWebSocket13(simple_switch_13.SimpleSwitch13):
def __init__(self, *args, **kwargs):
super(SimpleSwitchWebSocket13, self).__init__(*args, **kwargs)
- self.ws_send_queue = hub.Queue()
- self.ws_lock = hub.BoundedSemaphore()
-
wsgi = kwargs['wsgi']
wsgi.register(
SimpleSwitchWebSocketController,
data={simple_switch_instance_name: self},
)
+ self._ws_manager = wsgi.websocketmanager
@set_ev_cls(ofp_event.EventOFPPacketIn)
def _packet_in_handler(self, ev):
super(SimpleSwitchWebSocket13, self)._packet_in_handler(ev)
pkt = packet.Packet(ev.msg.data)
- self.ws_send_queue.put(str(pkt))
+ self._ws_manager.broadcast(str(pkt))
class SimpleSwitchWebSocketController(ControllerBase):
@@ -70,22 +68,12 @@ class SimpleSwitchWebSocketController(ControllerBase):
req, link, data, **config)
self.simpl_switch_spp = data[simple_switch_instance_name]
+ @websocket('simpleswitch', url)
def _websocket_handler(self, ws):
simple_switch = self.simpl_switch_spp
simple_switch.logger.debug('WebSocket connected: %s', ws)
while True:
- data = simple_switch.ws_send_queue.get()
- ws.send(unicode(json.dumps(data)))
-
- @route('simpleswitch', url)
- def websocket(self, req, **kwargs):
- simple_switch = self.simpl_switch_spp
- if simple_switch.ws_lock.acquire(blocking=False):
- try:
- self.websocket_handshake(req, self._websocket_handler)
- return
- finally:
- simple_switch.logger.debug('WebSocket disconnected')
- simple_switch.ws_lock.release()
- else:
- return Response(status=503)
+ msg = ws.wait()
+ if msg is None:
+ break
+ simple_switch.logger.debug('WebSocket disconnected')
diff --git a/ryu/app/wsgi.py b/ryu/app/wsgi.py
index 320f6b77..406fbab9 100644
--- a/ryu/app/wsgi.py
+++ b/ryu/app/wsgi.py
@@ -15,15 +15,15 @@
# limitations under the License.
import inspect
+from types import MethodType
-from ryu import cfg
import webob.dec
-
+from webob.response import Response
+from ryu import cfg
from ryu.lib import hub
from routes import Mapper
from routes.util import URLGenerator
-
CONF = cfg.CONF
CONF.register_cli_opts([
cfg.StrOpt('wsapi-host', default='', help='webapp listen host'),
@@ -46,12 +46,59 @@ def route(name, path, methods=None, requirements=None):
return _route
+class WebSocketRegistrationWrapper(object):
+
+ def __init__(self, func, controller):
+ self._controller = controller
+ self._controller_method = MethodType(func, controller)
+
+ def __call__(self, ws):
+ wsgi_application = self._controller.parent
+ ws_manager = wsgi_application.websocketmanager
+ ws_manager.add_connection(ws)
+ try:
+ self._controller_method(ws)
+ finally:
+ ws_manager.delete_connection(ws)
+
+
+class _AlreadyHandledResponse(Response):
+ # XXX: Eventlet API should not be used directly.
+ from eventlet.wsgi import ALREADY_HANDLED
+ _ALREADY_HANDLED = ALREADY_HANDLED
+
+ def __call__(self, environ, start_response):
+ return self._ALREADY_HANDLED
+
+
+def websocket(name, path):
+ def _websocket(controller_func):
+ def __websocket(self, req, **kwargs):
+ wrapper = WebSocketRegistrationWrapper(controller_func, self)
+ ws_wsgi = hub.WebSocketWSGI(wrapper)
+ ws_wsgi(req.environ, req.start_response)
+ # XXX: In order to prevent the writing to a already closed socket.
+ # This issue is caused by combined use:
+ # - webob.dec.wsgify()
+ # - eventlet.wsgi.HttpProtocol.handle_one_response()
+ return _AlreadyHandledResponse()
+ __websocket.routing_info = {
+ 'name': name,
+ 'path': path,
+ 'methods': None,
+ 'requirements': None,
+ }
+ return __websocket
+ return _websocket
+
+
class ControllerBase(object):
special_vars = ['action', 'controller']
def __init__(self, req, link, data, **config):
self.req = req
self.link = link
+ self.parent = None
for name, value in config.items():
setattr(self, name, value)
@@ -67,10 +114,6 @@ class ControllerBase(object):
return getattr(self, action)(req, **kwargs)
- def websocket_handshake(self, req, handler):
- ws_wsgi = hub.WebSocketWSGI(handler)
- return ws_wsgi(req.environ, req.start_response)
-
class wsgify_hack(webob.dec.wsgify):
def __call__(self, environ, start_response):
@@ -78,11 +121,28 @@ class wsgify_hack(webob.dec.wsgify):
return super(wsgify_hack, self).__call__(environ, start_response)
+class WebSocketManager(object):
+
+ def __init__(self):
+ self._connections = []
+
+ def add_connection(self, ws):
+ self._connections.append(ws)
+
+ def delete_connection(self, ws):
+ self._connections.remove(ws)
+
+ def broadcast(self, msg):
+ for connection in self._connections:
+ connection.send(msg)
+
+
class WSGIApplication(object):
def __init__(self, **config):
self.config = config
self.mapper = Mapper()
self.registory = {}
+ self._wsmanager = WebSocketManager()
super(WSGIApplication, self).__init__()
# XXX: Switch how to call the API of Routes for every version
match_argspec = inspect.getargspec(self.mapper.match)
@@ -119,6 +179,7 @@ class WSGIApplication(object):
data = self.registory[name]
controller = match['controller'](req, link, data, **self.config)
+ controller.parent = self
return controller(req)
def register(self, controller, data=None):
@@ -142,6 +203,10 @@ class WSGIApplication(object):
if data:
self.registory[controller.__name__] = data
+ @property
+ def websocketmanager(self):
+ return self._wsmanager
+
class WSGIServer(hub.WSGIServer):
def __init__(self, application, **config):