summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorYAMAMOTO Takashi <yamamoto@valinux.co.jp>2013-04-25 16:05:49 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2013-05-01 22:18:39 +0900
commita01972e18c8b7fe09bae45b43af8d234df96661a (patch)
tree3f6be91168ea196fed8c400e46c96db6326a193c
parent8fcbebb84993d1605782b44eee33027b4eb06dab (diff)
add a threading hub module
this provides gevent-like api using eventlet. Signed-off-by: YAMAMOTO Takashi <yamamoto@valinux.co.jp> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/lib/hub.py118
-rw-r--r--ryu/tests/unit/lib/test_hub.py201
2 files changed, 319 insertions, 0 deletions
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
new file mode 100644
index 00000000..02dae4d8
--- /dev/null
+++ b/ryu/lib/hub.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+
+
+# we don't bother to use cfg.py because monkey patch needs to be
+# called very early. instead, we use an environment variable to
+# select the type of hub.
+HUB_TYPE = os.getenv('RYU_HUB_TYPE', 'eventlet')
+
+LOG = logging.getLogger('ryu.lib.hub')
+
+if HUB_TYPE == 'eventlet':
+ import eventlet
+ import eventlet.event
+ import eventlet.queue
+ import eventlet.timeout
+ import eventlet.wsgi
+ import ssl
+ import traceback
+
+ getcurrent = eventlet.getcurrent
+ patch = eventlet.monkey_patch
+ sleep = eventlet.sleep
+
+ def spawn(*args, **kwargs):
+ def _launch(func, *args, **kwargs):
+ # mimic gevent's default raise_error=False behaviour
+ # by not propergating an exception to the joiner.
+ try:
+ func(*args, **kwargs)
+ except:
+ LOG.error('hub: uncaught exception: %s',
+ traceback.format_exc())
+
+ return eventlet.spawn(_launch, *args, **kwargs)
+
+ def kill(thread):
+ thread.kill()
+
+ def joinall(threads):
+ for t in threads:
+ t.wait()
+
+ Queue = eventlet.queue.Queue
+ QueueEmpty = eventlet.queue.Empty
+
+ class StreamServer(object):
+ def __init__(self, listen_info, handle=None, backlog=None,
+ spawn='default', **ssl_args):
+ assert backlog is None
+ assert spawn == 'default'
+ self.server = eventlet.listen(listen_info)
+ if ssl_args:
+ def wrap_and_handle(sock, addr):
+ ssl_args.setdefault('server_side', True)
+ handle(ssl.wrap_socket(sock, **ssl_args), addr)
+
+ self.handle = wrap_and_handle
+ else:
+ self.handle = handle
+
+ def serve_forever(self):
+ while True:
+ sock, addr = self.server.accept()
+ spawn(self.handle, sock, addr)
+
+ class WSGIServer(StreamServer):
+ def serve_forever(self):
+ eventlet.wsgi.server(self.server, self.handle)
+
+ Timeout = eventlet.timeout.Timeout
+
+ class Event(object):
+ def __init__(self):
+ self._ev = eventlet.event.Event()
+ self._cond = False
+
+ def _wait(self, timeout=None):
+ while not self._cond:
+ self._ev.wait()
+
+ def _broadcast(self):
+ self._ev.send()
+ # because eventlet Event doesn't allow mutiple send() on an event,
+ # re-create the underlying event.
+ # note: _ev.reset() is obsolete.
+ self._ev = eventlet.event.Event()
+
+ def set(self):
+ self._cond = True
+ self._broadcast()
+
+ def clear(self):
+ self._cond = False
+
+ def wait(self, timeout=None):
+ if timeout is None:
+ self._wait()
+ with Timeout(timeout):
+ self._wait()
diff --git a/ryu/tests/unit/lib/test_hub.py b/ryu/tests/unit/lib/test_hub.py
new file mode 100644
index 00000000..1b8166c3
--- /dev/null
+++ b/ryu/tests/unit/lib/test_hub.py
@@ -0,0 +1,201 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import unittest
+from nose.tools import raises
+
+from ryu.lib import hub
+hub.patch()
+
+
+class MyException(BaseException):
+ pass
+
+
+class Test_hub(unittest.TestCase):
+ """ Test case for ryu.lib.hub
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ # we want to test timeout first because the rest of tests rely on it.
+ # thus test_0_ prefix.
+
+ @raises(hub.Timeout)
+ def test_0_timeout1(self):
+ with hub.Timeout(0.1):
+ hub.sleep(1)
+
+ @raises(MyException)
+ def test_0_timeout2(self):
+ with hub.Timeout(0.1, MyException):
+ hub.sleep(1)
+
+ def test_0_timeout3(self):
+ with hub.Timeout(1):
+ hub.sleep(0.1)
+ # sleep some more to ensure timer cancelation
+ hub.sleep(2)
+
+ def test_spawn_event1(self):
+ def _child(ev, result):
+ hub.sleep(1)
+ result.append(1)
+ ev.set()
+
+ ev = hub.Event()
+ result = []
+ with hub.Timeout(2):
+ hub.spawn(_child, ev, result)
+ ev.wait()
+ assert len(result) == 1
+
+ def test_spawn_event2(self):
+ def _child(ev, result):
+ hub.sleep(1)
+ result.append(1)
+ ev.set()
+
+ ev = hub.Event()
+ result = []
+ with hub.Timeout(2):
+ hub.spawn(_child, ev, result)
+ try:
+ ev.wait(timeout=0.5)
+ raise BaseException("should timed out")
+ except hub.Timeout:
+ pass
+ assert len(result) == 0
+
+ def test_spawn_event3(self):
+ def _child(ev, ev2, result):
+ ev2.wait()
+ hub.sleep(0.5)
+ result.append(1)
+ ev.set()
+
+ ev = hub.Event()
+ ev2 = hub.Event()
+ result = []
+ with hub.Timeout(2):
+ hub.spawn(_child, ev, ev2, result)
+ hub.spawn(_child, ev, ev2, result)
+ hub.sleep(0.5)
+ ev2.set() # this should wake up the above created two threads
+ ev.wait(timeout=1)
+ assert len(result) == 2
+
+ def test_spawn_select1(self):
+ import select
+ import socket
+
+ def _child(s1):
+ hub.sleep(0.5)
+ s1.send("hoge")
+
+ s1, s2 = socket.socketpair()
+ with hub.Timeout(1):
+ hub.spawn(_child, s1)
+ select.select([s2.fileno()], [], [])
+ select.select([s2.fileno()], [], []) # return immediately
+
+ @raises(MyException)
+ def test_select1(self):
+ import select
+ import socket
+
+ s1, s2 = socket.socketpair()
+ with hub.Timeout(1, MyException):
+ select.select([s2.fileno()], [], [])
+
+ def test_select2(self):
+ import select
+
+ with hub.Timeout(1, MyException):
+ select.select([], [], [], 0) # timeout immediately
+
+ def test_select3(self):
+ import select
+ import socket
+
+ s1, s2 = socket.socketpair()
+ with hub.Timeout(1, MyException):
+ list = [s1.fileno(), s2.fileno()]
+ rlist, wlist, xlist = select.select(list, list, list)
+ assert not s1.fileno() in rlist
+ assert not s2.fileno() in rlist
+ # the following two assertions are commented out because one of
+ # them fails with eventlet-patched select.
+ # assert s1.fileno() in wlist
+ # assert s2.fileno() in wlist
+ # note: eventlet-patched select returns at most one file.
+ assert (s1.fileno() in wlist) or (s2.fileno() in wlist)
+ assert not s1.fileno() in xlist
+ assert not s2.fileno() in xlist
+
+ def test_spawn_joinall(self):
+ def _child(ev2, result):
+ ev2.wait()
+ hub.sleep(0.5)
+ result.append(1)
+ raise BaseException("this exception should not be propagated")
+
+ ev2 = hub.Event()
+ threads = []
+ result = []
+ with hub.Timeout(2):
+ threads.append(hub.spawn(_child, ev2, result))
+ threads.append(hub.spawn(_child, ev2, result))
+ hub.sleep(0.5)
+ ev2.set() # this should wake up the above created two threads
+ hub.joinall(threads)
+ assert len(result) == 2
+
+ def test_spawn_kill_joinall(self):
+ def _child(ev2, result):
+ ev2.wait()
+ result.append(1)
+
+ ev2 = hub.Event()
+ threads = []
+ result = []
+ with hub.Timeout(2):
+ threads.append(hub.spawn(_child, ev2, result))
+ threads.append(hub.spawn(_child, ev2, result))
+ hub.sleep(0.5)
+ for t in threads:
+ hub.kill(t)
+ hub.joinall(threads)
+ assert len(result) == 0
+
+ def test_event1(self):
+ ev = hub.Event()
+ ev.set()
+ with hub.Timeout(1):
+ ev.wait() # should return immediately
+
+ def test_event2(self):
+ ev = hub.Event()
+ # allow multiple sets unlike eventlet Event
+ ev.set()
+ ev.set()