diff options
Diffstat (limited to 'tests/unit/lib/test_hub.py')
-rw-r--r-- | tests/unit/lib/test_hub.py | 239 |
1 files changed, 239 insertions, 0 deletions
diff --git a/tests/unit/lib/test_hub.py b/tests/unit/lib/test_hub.py new file mode 100644 index 00000000..d8b75996 --- /dev/null +++ b/tests/unit/lib/test_hub.py @@ -0,0 +1,239 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import unittest +from nose.tools import raises + +from ryu.lib import hub +hub.patch() + + +class MyException(BaseException): + pass + + +class Test_hub(unittest.TestCase): + """ Test case for ryu.lib.hub + """ + + def setUp(self): + pass + + def tearDown(self): + pass + + # we want to test timeout first because the rest of tests rely on it. + # thus test_0_ prefix. + + @raises(hub.Timeout) + def test_0_timeout1(self): + with hub.Timeout(0.1): + hub.sleep(1) + + @raises(MyException) + def test_0_timeout2(self): + with hub.Timeout(0.1, MyException): + hub.sleep(1) + + def test_0_timeout3(self): + with hub.Timeout(1): + hub.sleep(0.1) + # sleep some more to ensure timer cancelation + hub.sleep(2) + + def test_spawn_event1(self): + def _child(ev, result): + hub.sleep(1) + result.append(1) + ev.set() + + ev = hub.Event() + result = [] + with hub.Timeout(2): + hub.spawn(_child, ev, result) + ev.wait() + assert len(result) == 1 + + def test_spawn_event2(self): + def _child(ev, result): + hub.sleep(1) + result.append(1) + ev.set() + + ev = hub.Event() + result = [] + with hub.Timeout(2): + t = hub.spawn(_child, ev, result) + ev.wait(timeout=0.5) + assert len(result) == 0 + ev.wait() + assert len(result) == 1 + + def test_spawn_event3(self): + def _child(ev, ev2, result): + ev2.wait() + hub.sleep(0.5) + result.append(1) + ev.set() + + ev = hub.Event() + ev2 = hub.Event() + result = [] + with hub.Timeout(2): + hub.spawn(_child, ev, ev2, result) + hub.spawn(_child, ev, ev2, result) + hub.sleep(0.5) + ev2.set() # this should wake up the above created two threads + ev.wait(timeout=1) + assert len(result) == 2 + + def test_spawn_select1(self): + import select + import socket + + def _child(s1): + hub.sleep(0.5) + s1.send(b"hoge") + + s1, s2 = socket.socketpair() + with hub.Timeout(1): + hub.spawn(_child, s1) + select.select([s2.fileno()], [], []) + select.select([s2.fileno()], [], []) # return immediately + + @raises(MyException) + def test_select1(self): + import select + import socket + + s1, s2 = socket.socketpair() + with hub.Timeout(1, MyException): + select.select([s2.fileno()], [], []) + + def test_select2(self): + import select + + with hub.Timeout(1, MyException): + select.select([], [], [], 0) # timeout immediately + + def test_select3(self): + import select + import socket + + s1, s2 = socket.socketpair() + with hub.Timeout(1, MyException): + list = [s1.fileno(), s2.fileno()] + rlist, wlist, xlist = select.select(list, list, list) + assert not s1.fileno() in rlist + assert not s2.fileno() in rlist + # the following two assertions are commented out because one of + # them fails with eventlet-patched select. + # assert s1.fileno() in wlist + # assert s2.fileno() in wlist + # note: eventlet-patched select returns at most one file. + assert (s1.fileno() in wlist) or (s2.fileno() in wlist) + assert not s1.fileno() in xlist + assert not s2.fileno() in xlist + + def test_spawn_joinall(self): + def _child(ev2, result): + ev2.wait() + hub.sleep(0.5) + result.append(1) + raise BaseException("this exception should not be propagated") + + ev2 = hub.Event() + threads = [] + result = [] + with hub.Timeout(2): + threads.append(hub.spawn(_child, ev2, result)) + threads.append(hub.spawn(_child, ev2, result)) + hub.sleep(0.5) + ev2.set() # this should wake up the above created two threads + hub.joinall(threads) + assert len(result) == 2 + + def test_spawn_kill_joinall(self): + def _child(ev2, result): + ev2.wait() + result.append(1) + + ev2 = hub.Event() + threads = [] + result = [] + with hub.Timeout(2): + threads.append(hub.spawn(_child, ev2, result)) + threads.append(hub.spawn(_child, ev2, result)) + hub.sleep(0.5) + for t in threads: + hub.kill(t) + hub.joinall(threads) + assert len(result) == 0 + + def test_spawn_kill_nowait_joinall(self): + # XXX this test relies on the scheduling behaviour. + # the intention here is, killing threads before they get active. + + def _child(result): + result.append(1) + + threads = [] + result = [] + with hub.Timeout(2): + threads.append(hub.spawn(_child, result)) + for t in threads: + hub.kill(t) + hub.joinall(threads) + assert len(result) == 0 + + def test_spawn_kill_die_joinall(self): + def _child(result): + result.append(1) + + threads = [] + result = [] + with hub.Timeout(2): + threads.append(hub.spawn(_child, result)) + threads.append(hub.spawn(_child, result)) + hub.sleep(0.5) + for t in threads: + hub.kill(t) + hub.joinall(threads) + assert len(result) == 2 + + def test_spawn_exception_joinall(self): + def _child(): + raise Exception("hoge") + + threads = [] + with hub.Timeout(2): + threads.append(hub.spawn(_child)) + threads.append(hub.spawn(_child)) + hub.sleep(0.5) + hub.joinall(threads) + + def test_event1(self): + ev = hub.Event() + ev.set() + with hub.Timeout(1): + ev.wait() # should return immediately + + def test_event2(self): + ev = hub.Event() + # allow multiple sets unlike eventlet Event + ev.set() + ev.set() |