diff options
author | Satoshi Kobayashi <satoshi-k@stratosphere.co.jp> | 2014-05-22 11:23:12 +0000 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2014-05-24 10:53:06 +0900 |
commit | 41a52adaf8506bbbc1f2be007717990ebff2bba1 (patch) | |
tree | d302fde4a4e19777138411f255d529bf25c2e7d3 | |
parent | aecc7b0af53c3d8c0136373b254c1b674bcf28ef (diff) |
wsgi: add ws_topology application
- Topology change is notified
- JSON-RPC/WebSocket
Signed-off-by: Satoshi Kobayashi <satoshi-k@stratosphere.co.jp>
Reviewed-by: YAMADA Hideki <yamada.hideki@po.ntts.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/app/ws_topology.py | 105 | ||||
-rw-r--r-- | ryu/app/wsgi.py | 34 |
2 files changed, 138 insertions, 1 deletions
diff --git a/ryu/app/ws_topology.py b/ryu/app/ws_topology.py new file mode 100644 index 00000000..5679c9b8 --- /dev/null +++ b/ryu/app/ws_topology.py @@ -0,0 +1,105 @@ +# Copyright (C) 2014 Stratosphere Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Usage example + +1. Run this application: +$ ryu-manager --verbose --observe-links ryu.app.ws_topology + +2. Connect to this application by WebSocket (use your favorite client): +$ wscat -c ws://localhost:8080/v1.0/topology/ws + +3. Join switches (use your favorite method): +$ sudo mn --controller=remote --topo linear,2 + +4. Topology change is notified: +< {"params": [{"ports": [{"hw_addr": "56:c7:08:12:bb:36", "name": "s1-eth1", "port_no": "00000001", "dpid": "0000000000000001"}, {"hw_addr": "de:b9:49:24:74:3f", "name": "s1-eth2", "port_no": "00000002", "dpid": "0000000000000001"}], "dpid": "0000000000000001"}], "jsonrpc": "2.0", "method": "event_switch_enter", "id": 1} +> {"id": 1, "jsonrpc": "2.0", "result": ""} + +< {"params": [{"ports": [{"hw_addr": "56:c7:08:12:bb:36", "name": "s1-eth1", "port_no": "00000001", "dpid": "0000000000000001"}, {"hw_addr": "de:b9:49:24:74:3f", "name": "s1-eth2", "port_no": "00000002", "dpid": "0000000000000001"}], "dpid": "0000000000000001"}], "jsonrpc": "2.0", "method": "event_switch_leave", "id": 2} +> {"id": 2, "jsonrpc": "2.0", "result": ""} +... +""" + +from tinyrpc.exc import InvalidReplyError + +from ryu.app.wsgi import ( + ControllerBase, + WSGIApplication, + websocket, + WebSocketRPCClient +) +from ryu.base import app_manager +from ryu.topology import event, switches +from ryu.controller.handler import set_ev_cls + + +class WebSocketTopology(app_manager.RyuApp): + _CONTEXTS = { + 'wsgi': WSGIApplication, + 'switches': switches.Switches, + } + + def __init__(self, *args, **kwargs): + super(WebSocketTopology, self).__init__(*args, **kwargs) + + self.rpc_clients = [] + + wsgi = kwargs['wsgi'] + wsgi.register(TopologyController, {'app': self}) + + @set_ev_cls(event.EventSwitchEnter) + def _event_switch_enter_handler(self, ev): + msg = ev.switch.to_dict() + self._rpc_broadcall('event_switch_enter', msg) + + @set_ev_cls(event.EventSwitchLeave) + def _event_switch_leave_handler(self, ev): + msg = ev.switch.to_dict() + self._rpc_broadcall('event_switch_leave', msg) + + @set_ev_cls(event.EventLinkAdd) + def _event_link_add_handler(self, ev): + msg = ev.link.to_dict() + self._rpc_broadcall('event_link_add', msg) + + @set_ev_cls(event.EventLinkDelete) + def _event_link_delete_handler(self, ev): + msg = ev.link.to_dict() + self._rpc_broadcall('event_link_delete', msg) + + def _rpc_broadcall(self, func_name, msg): + for rpc_client in self.rpc_clients: + # NOTE: Although broadcasting is desired, + # RPCClient#get_proxy(one_way=False) does not work well + rpc_server = rpc_client.get_proxy() + try: + getattr(rpc_server, func_name)(msg) + except InvalidReplyError as e: + self.logger.error(e) + + +class TopologyController(ControllerBase): + + def __init__(self, req, link, data, **config): + super(TopologyController, self).__init__(req, link, data, **config) + self.app = data['app'] + + @websocket('topology', '/v1.0/topology/ws') + def _websocket_handler(self, ws): + rpc_client = WebSocketRPCClient(ws) + self.app.rpc_clients.append(rpc_client) + rpc_client.serve_forever() diff --git a/ryu/app/wsgi.py b/ryu/app/wsgi.py index 68707319..94e67f6b 100644 --- a/ryu/app/wsgi.py +++ b/ryu/app/wsgi.py @@ -27,7 +27,8 @@ from tinyrpc.server import RPCServer from tinyrpc.dispatch import RPCDispatcher from tinyrpc.dispatch import public as rpc_public from tinyrpc.protocols.jsonrpc import JSONRPCProtocol -from tinyrpc.transports import ServerTransport +from tinyrpc.transports import ServerTransport, ClientTransport +from tinyrpc.client import RPCClient CONF = cfg.CONF CONF.register_cli_opts([ @@ -159,6 +160,37 @@ class WebSocketRPCServer(RPCServer): hub.spawn(func, *args, **kwargs) +class WebSocketClientTransport(ClientTransport): + + def __init__(self, ws, queue): + self.ws = ws + self.queue = queue + + def send_message(self, message, expect_reply=True): + self.ws.send(unicode(message)) + + if expect_reply: + return self.queue.get() + + +class WebSocketRPCClient(RPCClient): + + def __init__(self, ws): + self.ws = ws + self.queue = hub.Queue() + super(WebSocketRPCClient, self).__init__( + JSONRPCProtocol(), + WebSocketClientTransport(ws, self.queue), + ) + + def serve_forever(self): + while True: + msg = self.ws.wait() + if msg is None: + break + self.queue.put(msg) + + class wsgify_hack(webob.dec.wsgify): def __call__(self, environ, start_response): self.kwargs['start_response'] = start_response |