From 7265edc23677e4b837139dc6d5084cc7677dcc50 Mon Sep 17 00:00:00 2001
From: YAMAMOTO Takashi <yamamoto@valinux.co.jp>
Date: Wed, 26 Mar 2014 13:04:20 +0900
Subject: ryu.app.ofctl: implement reception of reply message

an example:
    msg = parser.OFPEchoRequest(datapath=datapath)
    result = ofctl.api.send_msg(self, msg, reply_cls=parser.OFPEchoReply)
    assert isinstance(result, parser.OFPEchoReply)

Signed-off-by: YAMAMOTO Takashi <yamamoto@valinux.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
---
 ryu/app/ofctl/api.py     |  5 +++--
 ryu/app/ofctl/event.py   |  3 ++-
 ryu/app/ofctl/service.py | 37 ++++++++++++++++++++++++++++++++++++-
 3 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/ryu/app/ofctl/api.py b/ryu/app/ofctl/api.py
index e49a001f..476543eb 100644
--- a/ryu/app/ofctl/api.py
+++ b/ryu/app/ofctl/api.py
@@ -29,11 +29,12 @@ def get_datapath(app, dpid):
     return app.send_request(event.GetDatapathRequest(dpid=dpid))()
 
 
-def send_msg(app, msg):
+def send_msg(app, msg, reply_cls=None):
     """
     Send an openflow message.
     """
-    return app.send_request(event.SendMsgRequest(msg=msg))()
+    return app.send_request(event.SendMsgRequest(msg=msg,
+                                                 reply_cls=reply_cls))()
 
 
 app_manager.require_app('ryu.app.ofctl.service')
diff --git a/ryu/app/ofctl/event.py b/ryu/app/ofctl/event.py
index 99208337..9dfefcdf 100644
--- a/ryu/app/ofctl/event.py
+++ b/ryu/app/ofctl/event.py
@@ -40,9 +40,10 @@ class GetDatapathRequest(_RequestBase):
 # send msg
 
 class SendMsgRequest(_RequestBase):
-    def __init__(self, msg):
+    def __init__(self, msg, reply_cls=None):
         super(SendMsgRequest, self).__init__()
         self.msg = msg
+        self.reply_cls = reply_cls
 
 
 # generic reply
diff --git a/ryu/app/ofctl/service.py b/ryu/app/ofctl/service.py
index 3e24170b..272a430f 100644
--- a/ryu/app/ofctl/service.py
+++ b/ryu/app/ofctl/service.py
@@ -39,6 +39,27 @@ class OfctlService(app_manager.RyuApp):
         super(OfctlService, self).__init__(*args, **kwargs)
         self.name = 'ofctl_service'
         self._switches = {}
+        self._observing_events = {}
+
+    def _observe_msg(self, msg_cls):
+        assert msg_cls is not None
+        ev_cls = ofp_event.ofp_msg_to_ev_cls(msg_cls)
+        self._observing_events.setdefault(ev_cls, 0)
+        if self._observing_events[ev_cls] == 0:
+            self.logger.debug('ofctl: start observing %s' % (ev_cls,))
+            self.register_handler(ev_cls, self._handle_reply)
+            self.observe_event(ev_cls)
+        self._observing_events[ev_cls] += 1
+
+    def _unobserve_msg(self, msg_cls):
+        assert msg_cls is not None
+        ev_cls = ofp_event.ofp_msg_to_ev_cls(msg_cls)
+        assert self._observing_events[ev_cls] > 0
+        self._observing_events[ev_cls] -= 1
+        if self._observing_events[ev_cls] == 0:
+            self.unregister_handler(ev_cls, self._handle_reply)
+            self.unobserve_event(ev_cls)
+            self.logger.debug('ofctl: stop observing %s' % (ev_cls,))
 
     @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
     def _switch_features_handler(self, ev):
@@ -80,6 +101,8 @@ class OfctlService(app_manager.RyuApp):
 
     @set_ev_cls(event.SendMsgRequest, MAIN_DISPATCHER)
     def _handle_send_msg(self, req):
+        if not req.reply_cls is None:
+            self._observe_msg(req.reply_cls)
         msg = req.msg
         datapath = msg.datapath
         datapath.set_xid(msg)
@@ -112,11 +135,13 @@ class OfctlService(app_manager.RyuApp):
         except KeyError:
             result = None
         req = si.xids.pop(xid)
+        if not req.reply_cls is None:
+            self._unobserve_msg(req.reply_cls)
         rep = event.Reply(result=result)
         self.reply_to_request(req, rep)
 
     @set_ev_cls(ofp_event.EventOFPErrorMsg, MAIN_DISPATCHER)
-    def _handle_error(self, ev):
+    def _handle_reply(self, ev):
         msg = ev.msg
         datapath = msg.datapath
         try:
@@ -124,6 +149,16 @@ class OfctlService(app_manager.RyuApp):
         except KeyError:
             self.logger.error('unknown dpid %s' % (datapath.id,))
             return
+        try:
+            req = si.xids[msg.xid]
+        except KeyError:
+            self.logger.error('unknown error xid %s' % (msg.xid,))
+            return
+        if ((not isinstance(ev, ofp_event.EventOFPErrorMsg)) and
+           (req.reply_cls is None or not isinstance(ev.msg, req.reply_cls))):
+            self.logger.error('unexpected reply %s for xid %s' %
+                              (ev, msg.xid,))
+            return
         try:
             si.results[msg.xid] = ev.msg
         except KeyError:
-- 
cgit v1.2.3