summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2014-04-18 02:02:16 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2014-04-21 20:55:35 +0900
commit3a4f092de50f9cc185c667dfeb17bc4c8762ae33 (patch)
treeae84e7d418c6a1a88c95289db9b11cb50f946c05
parent68c13b7fa15a9f51edeb53da3e5ecc2adcdef383 (diff)
bgp: use ryu.lib.hub instead of the direct use of eventlet
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/lib/hub.py22
-rw-r--r--ryu/services/protocols/bgp/api/core.py6
-rw-r--r--ryu/services/protocols/bgp/base.py20
-rw-r--r--ryu/services/protocols/bgp/utils/evtlet.py59
4 files changed, 42 insertions, 65 deletions
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index e9726c0c..95199e75 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -39,6 +39,8 @@ if HUB_TYPE == 'eventlet':
getcurrent = eventlet.getcurrent
patch = eventlet.monkey_patch
sleep = eventlet.sleep
+ listen = eventlet.listen
+ connect = eventlet.connect
def spawn(*args, **kwargs):
def _launch(func, *args, **kwargs):
@@ -57,6 +59,23 @@ if HUB_TYPE == 'eventlet':
return eventlet.spawn(_launch, *args, **kwargs)
+ def spawn_after(seconds, *args, **kwargs):
+ def _launch(func, *args, **kwargs):
+ # mimic gevent's default raise_error=False behaviour
+ # by not propergating an exception to the joiner.
+ try:
+ func(*args, **kwargs)
+ except greenlet.GreenletExit:
+ pass
+ except:
+ # log uncaught exception.
+ # note: this is an intentional divergence from gevent
+ # behaviour. gevent silently ignores such exceptions.
+ LOG.error('hub: uncaught exception: %s',
+ traceback.format_exc())
+
+ return eventlet.spawn_after(seconds, _launch, *args, **kwargs)
+
def kill(thread):
thread.kill()
@@ -119,6 +138,9 @@ if HUB_TYPE == 'eventlet':
# note: _ev.reset() is obsolete.
self._ev = eventlet.event.Event()
+ def is_set(self):
+ return self._cond
+
def set(self):
self._cond = True
self._broadcast()
diff --git a/ryu/services/protocols/bgp/api/core.py b/ryu/services/protocols/bgp/api/core.py
index d8ee1444..e272eef9 100644
--- a/ryu/services/protocols/bgp/api/core.py
+++ b/ryu/services/protocols/bgp/api/core.py
@@ -16,7 +16,7 @@
"""
Defines APIs related to Core/CoreManager.
"""
-import eventlet
+from ryu.lib import hub
from ryu.services.protocols.bgp.api.base import register
from ryu.services.protocols.bgp.core_manager import CORE_MANAGER
@@ -39,7 +39,7 @@ def start(**kwargs):
waiter = kwargs.pop('waiter')
common_config = CommonConf(**kwargs)
- eventlet.spawn(CORE_MANAGER.start, *[], **{'common_conf': common_config,
+ hub.spawn(CORE_MANAGER.start, *[], **{'common_conf': common_config,
'waiter': waiter})
return True
@@ -70,7 +70,7 @@ def reset_neighor(ip_address):
# Disable neighbor to close existing session.
neigh_conf.enabled = False
# Yield here so that we give chance for neighbor to be disabled.
- eventlet.sleep(NEIGHBOR_RESET_WAIT_TIME)
+ hub.sleep(NEIGHBOR_RESET_WAIT_TIME)
# Enable neighbor, so that we have a new session with it.
neigh_conf.enabled = True
else:
diff --git a/ryu/services/protocols/bgp/base.py b/ryu/services/protocols/bgp/base.py
index 85795c5b..24bb289f 100644
--- a/ryu/services/protocols/bgp/base.py
+++ b/ryu/services/protocols/bgp/base.py
@@ -16,13 +16,13 @@
Defines some base class related to managing green threads.
"""
import abc
-import eventlet
import logging
import time
import traceback
import weakref
-from eventlet.timeout import Timeout
+from ryu.lib import hub
+from ryu.lib.hub import Timeout
from ryu.lib.packet.bgp import RF_IPv4_UC
from ryu.lib.packet.bgp import RF_IPv6_UC
from ryu.lib.packet.bgp import RF_IPv4_VPN
@@ -171,7 +171,7 @@ class Activity(object):
self._validate_activity(activity)
# Spawn a new greenthread for given activity
- greenthread = eventlet.spawn(activity.start, *args, **kwargs)
+ greenthread = hub.spawn(activity.start, *args, **kwargs)
self._child_thread_map[activity.name] = greenthread
self._child_activity_map[activity.name] = activity
return greenthread
@@ -180,7 +180,7 @@ class Activity(object):
self._validate_activity(activity)
# Schedule to spawn a new greenthread after requested delay
- greenthread = eventlet.spawn_after(seconds, activity.start, *args,
+ greenthread = hub.spawn_after(seconds, activity.start, *args,
**kwargs)
self._child_thread_map[activity.name] = greenthread
self._child_activity_map[activity.name] = activity
@@ -200,13 +200,13 @@ class Activity(object):
def _spawn(self, name, callable_, *args, **kwargs):
self._validate_callable(callable_)
- greenthread = eventlet.spawn(callable_, *args, **kwargs)
+ greenthread = hub.spawn(callable_, *args, **kwargs)
self._child_thread_map[name] = greenthread
return greenthread
def _spawn_after(self, name, seconds, callable_, *args, **kwargs):
self._validate_callable(callable_)
- greenthread = eventlet.spawn_after(seconds, callable_, *args, **kwargs)
+ greenthread = hub.spawn_after(seconds, callable_, *args, **kwargs)
self._child_thread_map[name] = greenthread
return greenthread
@@ -244,12 +244,12 @@ class Activity(object):
self.stop()
def pause(self, seconds=0):
- """Relinquishes eventlet hub for given number of seconds.
+ """Relinquishes hub for given number of seconds.
In other words is puts to sleep to give other greeenthread a chance to
run.
"""
- eventlet.sleep(seconds)
+ hub.sleep(seconds)
def _stop_child_activities(self):
"""Stop all child activities spawn by this activity.
@@ -317,7 +317,7 @@ class Activity(object):
For each connection `server_factory` starts a new protocol.
"""
- server = eventlet.listen(loc_addr)
+ server = hub.listen(loc_addr)
server_name = self.name + '_server@' + str(loc_addr)
self._asso_socket_map[server_name] = server
@@ -341,7 +341,7 @@ class Activity(object):
LOG.debug('Connect TCP called for %s:%s' % (peer_addr[0],
peer_addr[1]))
with Timeout(time_out, False):
- sock = eventlet.connect(peer_addr, bind=bind_address)
+ sock = hub.connect(peer_addr, bind=bind_address)
if sock:
# Connection name for pro-active connection is made up
# of local end address + remote end address
diff --git a/ryu/services/protocols/bgp/utils/evtlet.py b/ryu/services/protocols/bgp/utils/evtlet.py
index 4dc8a943..4c2aa44b 100644
--- a/ryu/services/protocols/bgp/utils/evtlet.py
+++ b/ryu/services/protocols/bgp/utils/evtlet.py
@@ -16,8 +16,7 @@
"""
Concurrent networking library - Eventlet, based utilities classes.
"""
-import eventlet
-from eventlet import event
+from ryu.lib import hub
import logging
LOG = logging.getLogger('utils.evtlet')
@@ -28,58 +27,14 @@ class EventletIOFactory(object):
@staticmethod
def create_custom_event():
LOG.debug('Create CustomEvent called')
- return CustomEvent()
+ return hub.Event()
@staticmethod
def create_looping_call(funct, *args, **kwargs):
LOG.debug('create_looping_call called')
return LoopingCall(funct, *args, **kwargs)
-
-class CustomEvent(object):
- """Encapsulates eventlet event to provide a event which can recur.
-
- It has the same interface as threading.Event but works for eventlet.
- """
- def __init__(self,):
- self._event = event.Event()
- self._is_set = False
-
- def is_set(self):
- """Return true if and only if the internal flag is true."""
- return self._is_set
-
- def set(self):
- """Set the internal flag to true.
-
- All threads waiting for it to become true are awakened.
- Threads that call wait() once the flag is true will not block at all.
- """
- if self._event and not self._event.ready():
- self._event.send()
- self._is_set = True
-
- def clear(self):
- """Reset the internal flag to false.
-
- Subsequently, threads calling wait() will block until set() is called
- to set the internal flag to true again.
- """
- if self._is_set:
- self._is_set = False
- self._event = event.Event()
-
- def wait(self):
- """Block until the internal flag is true.
-
- If the internal flag is true on entry, return immediately. Otherwise,
- block until another thread calls set() to set the flag to true, or
- until the optional timeout occurs.
- """
- if not self._is_set:
- self._event.wait()
-
-
+# TODO: improve Timer service and move it into framework
class LoopingCall(object):
"""Call a function repeatedly.
"""
@@ -102,7 +57,7 @@ class LoopingCall(object):
def __call__(self):
if self._running:
# Schedule next iteration of the call.
- self._self_thread = eventlet.spawn_after(self._interval, self)
+ self._self_thread = hub.spawn_after(self._interval, self)
self._funct(*self._args, **self._kwargs)
def start(self, interval, now=True):
@@ -117,9 +72,9 @@ class LoopingCall(object):
self._running = True
self._interval = interval
if now:
- self._self_thread = eventlet.spawn_after(0, self)
+ self._self_thread = hub.spawn_after(0, self)
else:
- self._self_thread = eventlet.spawn_after(self._interval, self)
+ self._self_thread = hub.spawn_after(self._interval, self)
def stop(self):
"""Stop running scheduled function.
@@ -137,4 +92,4 @@ class LoopingCall(object):
self._self_thread.cancel()
self._self_thread = None
# Schedule a new call
- self._self_thread = eventlet.spawn_after(self._interval, self)
+ self._self_thread = hub.spawn_after(self._interval, self)