summaryrefslogtreecommitdiffhomepage
path: root/tests/unit/lib/test_hub.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/lib/test_hub.py')
-rw-r--r--tests/unit/lib/test_hub.py239
1 files changed, 239 insertions, 0 deletions
diff --git a/tests/unit/lib/test_hub.py b/tests/unit/lib/test_hub.py
new file mode 100644
index 00000000..d8b75996
--- /dev/null
+++ b/tests/unit/lib/test_hub.py
@@ -0,0 +1,239 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import unittest
+from nose.tools import raises
+
+from ryu.lib import hub
+hub.patch()
+
+
+class MyException(BaseException):
+ pass
+
+
+class Test_hub(unittest.TestCase):
+ """ Test case for ryu.lib.hub
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ # we want to test timeout first because the rest of tests rely on it.
+ # thus test_0_ prefix.
+
+ @raises(hub.Timeout)
+ def test_0_timeout1(self):
+ with hub.Timeout(0.1):
+ hub.sleep(1)
+
+ @raises(MyException)
+ def test_0_timeout2(self):
+ with hub.Timeout(0.1, MyException):
+ hub.sleep(1)
+
+ def test_0_timeout3(self):
+ with hub.Timeout(1):
+ hub.sleep(0.1)
+ # sleep some more to ensure timer cancelation
+ hub.sleep(2)
+
+ def test_spawn_event1(self):
+ def _child(ev, result):
+ hub.sleep(1)
+ result.append(1)
+ ev.set()
+
+ ev = hub.Event()
+ result = []
+ with hub.Timeout(2):
+ hub.spawn(_child, ev, result)
+ ev.wait()
+ assert len(result) == 1
+
+ def test_spawn_event2(self):
+ def _child(ev, result):
+ hub.sleep(1)
+ result.append(1)
+ ev.set()
+
+ ev = hub.Event()
+ result = []
+ with hub.Timeout(2):
+ t = hub.spawn(_child, ev, result)
+ ev.wait(timeout=0.5)
+ assert len(result) == 0
+ ev.wait()
+ assert len(result) == 1
+
+ def test_spawn_event3(self):
+ def _child(ev, ev2, result):
+ ev2.wait()
+ hub.sleep(0.5)
+ result.append(1)
+ ev.set()
+
+ ev = hub.Event()
+ ev2 = hub.Event()
+ result = []
+ with hub.Timeout(2):
+ hub.spawn(_child, ev, ev2, result)
+ hub.spawn(_child, ev, ev2, result)
+ hub.sleep(0.5)
+ ev2.set() # this should wake up the above created two threads
+ ev.wait(timeout=1)
+ assert len(result) == 2
+
+ def test_spawn_select1(self):
+ import select
+ import socket
+
+ def _child(s1):
+ hub.sleep(0.5)
+ s1.send(b"hoge")
+
+ s1, s2 = socket.socketpair()
+ with hub.Timeout(1):
+ hub.spawn(_child, s1)
+ select.select([s2.fileno()], [], [])
+ select.select([s2.fileno()], [], []) # return immediately
+
+ @raises(MyException)
+ def test_select1(self):
+ import select
+ import socket
+
+ s1, s2 = socket.socketpair()
+ with hub.Timeout(1, MyException):
+ select.select([s2.fileno()], [], [])
+
+ def test_select2(self):
+ import select
+
+ with hub.Timeout(1, MyException):
+ select.select([], [], [], 0) # timeout immediately
+
+ def test_select3(self):
+ import select
+ import socket
+
+ s1, s2 = socket.socketpair()
+ with hub.Timeout(1, MyException):
+ list = [s1.fileno(), s2.fileno()]
+ rlist, wlist, xlist = select.select(list, list, list)
+ assert not s1.fileno() in rlist
+ assert not s2.fileno() in rlist
+ # the following two assertions are commented out because one of
+ # them fails with eventlet-patched select.
+ # assert s1.fileno() in wlist
+ # assert s2.fileno() in wlist
+ # note: eventlet-patched select returns at most one file.
+ assert (s1.fileno() in wlist) or (s2.fileno() in wlist)
+ assert not s1.fileno() in xlist
+ assert not s2.fileno() in xlist
+
+ def test_spawn_joinall(self):
+ def _child(ev2, result):
+ ev2.wait()
+ hub.sleep(0.5)
+ result.append(1)
+ raise BaseException("this exception should not be propagated")
+
+ ev2 = hub.Event()
+ threads = []
+ result = []
+ with hub.Timeout(2):
+ threads.append(hub.spawn(_child, ev2, result))
+ threads.append(hub.spawn(_child, ev2, result))
+ hub.sleep(0.5)
+ ev2.set() # this should wake up the above created two threads
+ hub.joinall(threads)
+ assert len(result) == 2
+
+ def test_spawn_kill_joinall(self):
+ def _child(ev2, result):
+ ev2.wait()
+ result.append(1)
+
+ ev2 = hub.Event()
+ threads = []
+ result = []
+ with hub.Timeout(2):
+ threads.append(hub.spawn(_child, ev2, result))
+ threads.append(hub.spawn(_child, ev2, result))
+ hub.sleep(0.5)
+ for t in threads:
+ hub.kill(t)
+ hub.joinall(threads)
+ assert len(result) == 0
+
+ def test_spawn_kill_nowait_joinall(self):
+ # XXX this test relies on the scheduling behaviour.
+ # the intention here is, killing threads before they get active.
+
+ def _child(result):
+ result.append(1)
+
+ threads = []
+ result = []
+ with hub.Timeout(2):
+ threads.append(hub.spawn(_child, result))
+ for t in threads:
+ hub.kill(t)
+ hub.joinall(threads)
+ assert len(result) == 0
+
+ def test_spawn_kill_die_joinall(self):
+ def _child(result):
+ result.append(1)
+
+ threads = []
+ result = []
+ with hub.Timeout(2):
+ threads.append(hub.spawn(_child, result))
+ threads.append(hub.spawn(_child, result))
+ hub.sleep(0.5)
+ for t in threads:
+ hub.kill(t)
+ hub.joinall(threads)
+ assert len(result) == 2
+
+ def test_spawn_exception_joinall(self):
+ def _child():
+ raise Exception("hoge")
+
+ threads = []
+ with hub.Timeout(2):
+ threads.append(hub.spawn(_child))
+ threads.append(hub.spawn(_child))
+ hub.sleep(0.5)
+ hub.joinall(threads)
+
+ def test_event1(self):
+ ev = hub.Event()
+ ev.set()
+ with hub.Timeout(1):
+ ev.wait() # should return immediately
+
+ def test_event2(self):
+ ev = hub.Event()
+ # allow multiple sets unlike eventlet Event
+ ev.set()
+ ev.set()