summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorIsaku Yamahata <yamahata@valinux.co.jp>2013-11-22 16:45:59 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2013-11-22 14:51:12 -0800
commite45f382e51c9a3d57a6d8c01cec9e70f7ca364fd (patch)
treead0a3946acae0e1e4efe058edf89ef306c79f023
parent2a10dfbd9c9be0eb241ed628675816ab9874d89b (diff)
base/app_manager: create/destroy RyuApp instances dynamically
allow RyuManager to create/destroy RyuApp instances dynamically and register/unregister event observer dynamically. Cc: yuta-hamada <yuta.hamada.z02@gimal.com> Signed-off-by: Isaku Yamahata <yamahata@valinux.co.jp> Signed-off-by: YAMAMOTO Takashi <yamamoto@valinux.co.jp> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/base/app_manager.py121
1 files changed, 92 insertions, 29 deletions
diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 883ca49c..1cd7e9f6 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -22,6 +22,7 @@ import sys
from ryu import utils
from ryu.controller.handler import register_instance, get_dependent_services
from ryu.controller.controller import Datapath
+from ryu.controller import event
from ryu.controller.event import EventRequestBase, EventReplyBase
from ryu.lib import hub
@@ -41,6 +42,10 @@ def register_app(app):
register_instance(app)
+def unregister_app(app):
+ SERVICE_BRICKS.pop(app.name)
+
+
class RyuApp(object):
"""
Base class for Ryu network application
@@ -65,12 +70,23 @@ class RyuApp(object):
self.replies = hub.Queue()
self.logger = logging.getLogger(self.name)
+ # prevent accidental creation of instances of this class outside RyuApp
+ class _EventThreadStop(event.EventBase):
+ pass
+ self._event_stop = _EventThreadStop()
+ self.is_active = True
+
def start(self):
"""
Hook that is called after startup initialization is done.
"""
self.threads.append(hub.spawn(self._event_loop))
+ def stop(self):
+ self.is_active = False
+ self._send_event(self._event_stop, None)
+ hub.joinall(self.threads)
+
def register_handler(self, ev_cls, handler):
assert callable(handler)
self.event_handlers.setdefault(ev_cls, [])
@@ -81,6 +97,14 @@ class RyuApp(object):
ev_cls_observers = self.observers.setdefault(ev_cls, {})
ev_cls_observers.setdefault(name, set()).update(states)
+ def unregister_observer(self, ev_cls, name):
+ observers = self.observers.get(ev_cls, {})
+ observers.pop(name)
+
+ def unregister_observer_all_event(self, name):
+ for observers in self.observers.values():
+ observers.pop(name, None)
+
def get_handlers(self, ev, state=None):
handlers = self.event_handlers.get(ev.__class__, [])
if state is None:
@@ -109,8 +133,10 @@ class RyuApp(object):
return self.replies.get()
def _event_loop(self):
- while True:
+ while self.is_active or not self.events.empty():
ev, state = self.events.get()
+ if ev == self._event_stop:
+ continue
handlers = self.get_handlers(ev, state)
for handler in handlers:
handler(ev)
@@ -210,26 +236,7 @@ class AppManager(object):
register_app(context)
return self.contexts
- def instantiate_apps(self, *args, **kwargs):
- for app_name, cls in self.applications_cls.items():
- # for now, only single instance of a given module
- # Do we need to support multiple instances?
- # Yes, maybe for slicing.
- LOG.info('instantiating app %s', app_name)
-
- if hasattr(cls, 'OFP_VERSIONS'):
- for k in Datapath.supported_ofp_version.keys():
- if not k in cls.OFP_VERSIONS:
- del Datapath.supported_ofp_version[k]
-
- assert len(Datapath.supported_ofp_version), \
- 'No OpenFlow version is available'
-
- assert app_name not in self.applications
- app = cls(*args, **kwargs)
- register_app(app)
- self.applications[app_name] = app
-
+ def _update_bricks(self):
for i in SERVICE_BRICKS.values():
for _k, m in inspect.getmembers(i, inspect.ismethod):
if not hasattr(m, 'observer'):
@@ -247,22 +254,78 @@ class AppManager(object):
brick.register_observer(m.ev_cls, i.name,
m.dispatchers)
+ @staticmethod
+ def _report_brick(name, app):
+ LOG.debug("BRICK %s" % name)
+ for ev_cls, list_ in app.observers.items():
+ LOG.debug(" PROVIDES %s TO %s" % (ev_cls.__name__, list_))
+ for ev_cls in app.event_handlers.keys():
+ LOG.debug(" CONSUMES %s" % (ev_cls.__name__,))
+
+ @staticmethod
+ def report_bricks():
for brick, i in SERVICE_BRICKS.items():
- LOG.debug("BRICK %s" % brick)
- for ev_cls, list in i.observers.items():
- LOG.debug(" PROVIDES %s TO %s" % (ev_cls.__name__, list))
- for ev_cls in i.event_handlers.keys():
- LOG.debug(" CONSUMES %s" % (ev_cls.__name__,))
+ AppManager._report_brick(brick, i)
+
+ def _instantiate(self, app_name, cls, *args, **kwargs):
+ # for now, only single instance of a given module
+ # Do we need to support multiple instances?
+ # Yes, maybe for slicing.
+ LOG.info('instantiating app %s of %s', app_name, cls.__name__)
+
+ if hasattr(cls, 'OFP_VERSIONS'):
+ for k in Datapath.supported_ofp_version.keys():
+ if not k in cls.OFP_VERSIONS:
+ del Datapath.supported_ofp_version[k]
+
+ assert len(Datapath.supported_ofp_version), \
+ 'No OpenFlow version is available'
+
+ if app_name is not None:
+ assert app_name not in self.applications
+ app = cls(*args, **kwargs)
+ register_app(app)
+ assert app.name not in self.applications
+ self.applications[app.name] = app
+ return app
+
+ def instantiate(self, cls, *args, **kwargs):
+ app = self._instantiate(None, cls, *args, **kwargs)
+ self._update_bricks()
+ self._report_brick(app.name, app)
+ return app
+
+ def instantiate_apps(self, *args, **kwargs):
+ for app_name, cls in self.applications_cls.items():
+ self._instantiate(app_name, cls, *args, **kwargs)
+
+ self._update_bricks()
+ self.report_bricks()
for app in self.applications.values():
app.start()
+ @staticmethod
+ def _close(app):
+ close_method = getattr(app, 'close', None)
+ if callable(close_method):
+ close_method()
+
+ def uninstantiate(self, name):
+ app = self.applications.pop(name)
+ unregister_app(app)
+ for app_ in SERVICE_BRICKS.values():
+ app_.unregister_observer_all_event(name)
+ app.stop()
+ self._close(app)
+ events = app.events
+ if not events.empty():
+ app.logger.debug('%s events remians %d', app.name, events.qsize())
+
def close(self):
def close_all(close_dict):
for app in close_dict.values():
- close_method = getattr(app, 'close', None)
- if callable(close_method):
- close_method()
+ self._close(app)
close_dict.clear()
close_all(self.applications)