diff options
author | Isaku Yamahata <yamahata@valinux.co.jp> | 2012-01-31 16:45:02 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2012-02-01 08:55:37 +0900 |
commit | f9bae8bb898dce7816f84fed196a49f9a03ed52a (patch) | |
tree | 4e5edfe1e803933583378cd97d0e7d5b7addb48b | |
parent | 4dd1118b7df7fa4c0b729d1fcf24f18284af6c6f (diff) |
controller/dispatcher: introduce events on event queue itself
This patch introduces event queue event(event on event queue).
And track dispatcher children in order to register callback handler
for all children of a given dispatcher.
Signed-off-by: Isaku Yamahata <yamahata@valinux.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/controller/dispatcher.py | 98 |
1 files changed, 91 insertions, 7 deletions
diff --git a/ryu/controller/dispatcher.py b/ryu/controller/dispatcher.py index 1768a136..629d6a15 100644 --- a/ryu/controller/dispatcher.py +++ b/ryu/controller/dispatcher.py @@ -1,5 +1,5 @@ # Copyright (C) 2011 Nippon Telegraph and Telephone Corporation. -# Copyright (C) 2011 Isaku Yamahata <yamahata at valinux co jp> +# Copyright (C) 2011, 2012 Isaku Yamahata <yamahata at valinux co jp> # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -18,6 +18,7 @@ import weakref from gevent.queue import Queue +from . import event LOG = logging.getLogger('ryu.controller.dispatcher') @@ -29,16 +30,47 @@ class EventQueue(object): # Otherwise, instances can't be freed. event_queues = weakref.WeakSet() - def __init__(self, name, dispatcher): - self.event_queues.add(self) + # weakref: break circular reference + # self._ev_q_weakref == weakref.ref(self) + _ev_q_weakref = None + + def set_ev_q(self): + self.__class__._ev_q_weakref = weakref.ref(self) + + def _get_ev_q(self): + ev_q = self._ev_q_weakref + if ev_q is not None: + ev_q = ev_q() + return ev_q + def _queue_q_ev(self, ev): + ev_q = self._get_ev_q() + if ev_q is not None: + ev_q.queue(ev) + + def __init__(self, name, dispatcher, aux=None): self.name = name - self.dispatcher = dispatcher + self.dispatcher = dispatcher.clone() self.is_dispatching = False self.ev_q = Queue() + self.aux = aux # for EventQueueCreate event + + self.event_queues.add(self) + self._queue_q_ev(EventQueueCreate(self, True)) + + def __del__(self): + # This can be called when python interpreter exiting. + # At that time, other object like EventQueueCreate can be + # already destructed. So we can't call it blindly. + ev_q = self._get_ev_q() + if ev_q is not None and self != ev_q: + self._queue_q_ev(EventQueueCreate(self, False)) def set_dispatcher(self, dispatcher): - self.dispatcher = dispatcher + old = self.dispatcher + new = dispatcher.clone() + self.dispatcher = new + self._queue_q_ev(EventDispatcherChange(self, old, new)) def queue_raw(self, ev): self.ev_q.put(ev) @@ -76,28 +108,51 @@ class EventDispatcher(object): event_dispatchers = weakref.WeakSet() def __init__(self, name): - self.event_dispatchers.add(self) - + # WeakSet: In order to let child to go away. + # We are interested only in alive children. + self.children = weakref.WeakSet() self.name = name self.events = {} self.all_handlers = [] + self.event_dispatchers.add(self) + + def clone(self): + cloned = EventDispatcher(self.name) + for ev_cls, h in self.events.items(): + cloned.events[ev_cls] = copy.copy(h) + cloned.all_handlers = copy.copy(self.all_handlers) + self.children.add(cloned) + return cloned + + def _foreach_children(self, call, *args, **kwargs): + for c in self.children: + call(c, *args, **kwargs) + def register_all_handler(self, all_handler): self.all_handlers.append(all_handler) + self._foreach_children(EventDispatcher.register_all_handler, + all_handler) def unregister_all_handler(self, all_handler): + self._foreach_children(EventDispatcher.unregister_all_handler, + all_handler) del self.all_handlers[all_handler] def register_handler(self, ev_cls, handler): assert callable(handler) self.events.setdefault(ev_cls, []) self.events[ev_cls].append(handler) + self._foreach_children(EventDispatcher.register_handler, + ev_cls, handler) def register_handlers(self, handlers): for ev_cls, h in handlers: self.register_handler(ev_cls, h) def unregister_handler(self, ev_cls, handler): + self._foreach_children(EventDispatcher.unregister_handler, + ev_cls, handler) del self.events[ev_cls][handler] def register_static(self, ev_cls): @@ -143,3 +198,32 @@ class EventDispatcher(object): ret = h(ev) if ret is False: break + + +class EventQueueBase(event.EventBase): + def __init__(self, ev_q): + super(EventQueueBase, self).__init__() + self.ev_q = ev_q + self.aux = ev_q.aux + + +class EventQueueCreate(EventQueueBase): + def __init__(self, ev_q, create): + super(EventQueueCreate, self).__init__(ev_q) + self.create = bool(create) # True: queue is created + # False: queue is destroyed + self.dispatcher = ev_q.dispatcher + + +class EventDispatcherChange(EventQueueBase): + def __init__(self, ev_q, old_dispatcher, new_dispatcher): + super(EventDispatcherChange, self).__init__(ev_q) + self.old_dispatcher = old_dispatcher + self.new_dispatcher = new_dispatcher + + +DISPATCHER_NAME_QUEUE_EV = 'queue_event' +QUEUE_EV_DISPATCHER = EventDispatcher(DISPATCHER_NAME_QUEUE_EV) +QUEUE_NAME_QEV_Q = 'queue_event' +QUEUE_EV_Q = EventQueue(QUEUE_NAME_QEV_Q, QUEUE_EV_DISPATCHER) +QUEUE_EV_Q.set_ev_q() |