diff options
-rw-r--r-- | ryu/lib/hub.py | 22 | ||||
-rw-r--r-- | ryu/services/protocols/bgp/api/core.py | 6 | ||||
-rw-r--r-- | ryu/services/protocols/bgp/base.py | 20 | ||||
-rw-r--r-- | ryu/services/protocols/bgp/utils/evtlet.py | 59 |
4 files changed, 42 insertions, 65 deletions
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py index e9726c0c..95199e75 100644 --- a/ryu/lib/hub.py +++ b/ryu/lib/hub.py @@ -39,6 +39,8 @@ if HUB_TYPE == 'eventlet': getcurrent = eventlet.getcurrent patch = eventlet.monkey_patch sleep = eventlet.sleep + listen = eventlet.listen + connect = eventlet.connect def spawn(*args, **kwargs): def _launch(func, *args, **kwargs): @@ -57,6 +59,23 @@ if HUB_TYPE == 'eventlet': return eventlet.spawn(_launch, *args, **kwargs) + def spawn_after(seconds, *args, **kwargs): + def _launch(func, *args, **kwargs): + # mimic gevent's default raise_error=False behaviour + # by not propergating an exception to the joiner. + try: + func(*args, **kwargs) + except greenlet.GreenletExit: + pass + except: + # log uncaught exception. + # note: this is an intentional divergence from gevent + # behaviour. gevent silently ignores such exceptions. + LOG.error('hub: uncaught exception: %s', + traceback.format_exc()) + + return eventlet.spawn_after(seconds, _launch, *args, **kwargs) + def kill(thread): thread.kill() @@ -119,6 +138,9 @@ if HUB_TYPE == 'eventlet': # note: _ev.reset() is obsolete. self._ev = eventlet.event.Event() + def is_set(self): + return self._cond + def set(self): self._cond = True self._broadcast() diff --git a/ryu/services/protocols/bgp/api/core.py b/ryu/services/protocols/bgp/api/core.py index d8ee1444..e272eef9 100644 --- a/ryu/services/protocols/bgp/api/core.py +++ b/ryu/services/protocols/bgp/api/core.py @@ -16,7 +16,7 @@ """ Defines APIs related to Core/CoreManager. """ -import eventlet +from ryu.lib import hub from ryu.services.protocols.bgp.api.base import register from ryu.services.protocols.bgp.core_manager import CORE_MANAGER @@ -39,7 +39,7 @@ def start(**kwargs): waiter = kwargs.pop('waiter') common_config = CommonConf(**kwargs) - eventlet.spawn(CORE_MANAGER.start, *[], **{'common_conf': common_config, + hub.spawn(CORE_MANAGER.start, *[], **{'common_conf': common_config, 'waiter': waiter}) return True @@ -70,7 +70,7 @@ def reset_neighor(ip_address): # Disable neighbor to close existing session. neigh_conf.enabled = False # Yield here so that we give chance for neighbor to be disabled. - eventlet.sleep(NEIGHBOR_RESET_WAIT_TIME) + hub.sleep(NEIGHBOR_RESET_WAIT_TIME) # Enable neighbor, so that we have a new session with it. neigh_conf.enabled = True else: diff --git a/ryu/services/protocols/bgp/base.py b/ryu/services/protocols/bgp/base.py index 85795c5b..24bb289f 100644 --- a/ryu/services/protocols/bgp/base.py +++ b/ryu/services/protocols/bgp/base.py @@ -16,13 +16,13 @@ Defines some base class related to managing green threads. """ import abc -import eventlet import logging import time import traceback import weakref -from eventlet.timeout import Timeout +from ryu.lib import hub +from ryu.lib.hub import Timeout from ryu.lib.packet.bgp import RF_IPv4_UC from ryu.lib.packet.bgp import RF_IPv6_UC from ryu.lib.packet.bgp import RF_IPv4_VPN @@ -171,7 +171,7 @@ class Activity(object): self._validate_activity(activity) # Spawn a new greenthread for given activity - greenthread = eventlet.spawn(activity.start, *args, **kwargs) + greenthread = hub.spawn(activity.start, *args, **kwargs) self._child_thread_map[activity.name] = greenthread self._child_activity_map[activity.name] = activity return greenthread @@ -180,7 +180,7 @@ class Activity(object): self._validate_activity(activity) # Schedule to spawn a new greenthread after requested delay - greenthread = eventlet.spawn_after(seconds, activity.start, *args, + greenthread = hub.spawn_after(seconds, activity.start, *args, **kwargs) self._child_thread_map[activity.name] = greenthread self._child_activity_map[activity.name] = activity @@ -200,13 +200,13 @@ class Activity(object): def _spawn(self, name, callable_, *args, **kwargs): self._validate_callable(callable_) - greenthread = eventlet.spawn(callable_, *args, **kwargs) + greenthread = hub.spawn(callable_, *args, **kwargs) self._child_thread_map[name] = greenthread return greenthread def _spawn_after(self, name, seconds, callable_, *args, **kwargs): self._validate_callable(callable_) - greenthread = eventlet.spawn_after(seconds, callable_, *args, **kwargs) + greenthread = hub.spawn_after(seconds, callable_, *args, **kwargs) self._child_thread_map[name] = greenthread return greenthread @@ -244,12 +244,12 @@ class Activity(object): self.stop() def pause(self, seconds=0): - """Relinquishes eventlet hub for given number of seconds. + """Relinquishes hub for given number of seconds. In other words is puts to sleep to give other greeenthread a chance to run. """ - eventlet.sleep(seconds) + hub.sleep(seconds) def _stop_child_activities(self): """Stop all child activities spawn by this activity. @@ -317,7 +317,7 @@ class Activity(object): For each connection `server_factory` starts a new protocol. """ - server = eventlet.listen(loc_addr) + server = hub.listen(loc_addr) server_name = self.name + '_server@' + str(loc_addr) self._asso_socket_map[server_name] = server @@ -341,7 +341,7 @@ class Activity(object): LOG.debug('Connect TCP called for %s:%s' % (peer_addr[0], peer_addr[1])) with Timeout(time_out, False): - sock = eventlet.connect(peer_addr, bind=bind_address) + sock = hub.connect(peer_addr, bind=bind_address) if sock: # Connection name for pro-active connection is made up # of local end address + remote end address diff --git a/ryu/services/protocols/bgp/utils/evtlet.py b/ryu/services/protocols/bgp/utils/evtlet.py index 4dc8a943..4c2aa44b 100644 --- a/ryu/services/protocols/bgp/utils/evtlet.py +++ b/ryu/services/protocols/bgp/utils/evtlet.py @@ -16,8 +16,7 @@ """ Concurrent networking library - Eventlet, based utilities classes. """ -import eventlet -from eventlet import event +from ryu.lib import hub import logging LOG = logging.getLogger('utils.evtlet') @@ -28,58 +27,14 @@ class EventletIOFactory(object): @staticmethod def create_custom_event(): LOG.debug('Create CustomEvent called') - return CustomEvent() + return hub.Event() @staticmethod def create_looping_call(funct, *args, **kwargs): LOG.debug('create_looping_call called') return LoopingCall(funct, *args, **kwargs) - -class CustomEvent(object): - """Encapsulates eventlet event to provide a event which can recur. - - It has the same interface as threading.Event but works for eventlet. - """ - def __init__(self,): - self._event = event.Event() - self._is_set = False - - def is_set(self): - """Return true if and only if the internal flag is true.""" - return self._is_set - - def set(self): - """Set the internal flag to true. - - All threads waiting for it to become true are awakened. - Threads that call wait() once the flag is true will not block at all. - """ - if self._event and not self._event.ready(): - self._event.send() - self._is_set = True - - def clear(self): - """Reset the internal flag to false. - - Subsequently, threads calling wait() will block until set() is called - to set the internal flag to true again. - """ - if self._is_set: - self._is_set = False - self._event = event.Event() - - def wait(self): - """Block until the internal flag is true. - - If the internal flag is true on entry, return immediately. Otherwise, - block until another thread calls set() to set the flag to true, or - until the optional timeout occurs. - """ - if not self._is_set: - self._event.wait() - - +# TODO: improve Timer service and move it into framework class LoopingCall(object): """Call a function repeatedly. """ @@ -102,7 +57,7 @@ class LoopingCall(object): def __call__(self): if self._running: # Schedule next iteration of the call. - self._self_thread = eventlet.spawn_after(self._interval, self) + self._self_thread = hub.spawn_after(self._interval, self) self._funct(*self._args, **self._kwargs) def start(self, interval, now=True): @@ -117,9 +72,9 @@ class LoopingCall(object): self._running = True self._interval = interval if now: - self._self_thread = eventlet.spawn_after(0, self) + self._self_thread = hub.spawn_after(0, self) else: - self._self_thread = eventlet.spawn_after(self._interval, self) + self._self_thread = hub.spawn_after(self._interval, self) def stop(self): """Stop running scheduled function. @@ -137,4 +92,4 @@ class LoopingCall(object): self._self_thread.cancel() self._self_thread = None # Schedule a new call - self._self_thread = eventlet.spawn_after(self._interval, self) + self._self_thread = hub.spawn_after(self._interval, self) |