summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorIWASE Yusuke <iwase.yusuke0@gmail.com>2017-08-22 11:38:55 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2017-08-29 22:50:36 +0900
commit80fedbfeafd955f844ea7a34e420d3718bf28151 (patch)
tree59421fe28feb9cfc37c53f5d9f3e063d17886a9b
parent98a838e487af16004bbb582ef35d4eb52433d283 (diff)
BGPSpeaker/net_ctrl: Close activity for each session
Currently, a thread for processing outgoing message will be remained in "RpcSession" activity even if RPC session is closed by the remote peer, and garbages on memory will grow. This patch fixes to close "RpcSession" activity when RPC session closing. Signed-off-by: IWASE Yusuke <iwase.yusuke0@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/services/protocols/bgp/net_ctrl.py24
1 files changed, 17 insertions, 7 deletions
diff --git a/ryu/services/protocols/bgp/net_ctrl.py b/ryu/services/protocols/bgp/net_ctrl.py
index 7944ac20..92a8e71e 100644
--- a/ryu/services/protocols/bgp/net_ctrl.py
+++ b/ryu/services/protocols/bgp/net_ctrl.py
@@ -107,6 +107,8 @@ class RpcSession(Activity):
self._socket = sock
self._outgoing_msg_sink_iter = outgoing_msg_sink_iter
self.is_connected = True
+ self.green_in = None
+ self.green_out = None
def stop(self):
super(RpcSession, self).stop()
@@ -115,15 +117,15 @@ class RpcSession(Activity):
def _run(self):
# Process outgoing messages in new thread.
- green_out = self._spawn('net_ctrl._process_outgoing',
- self._process_outgoing_msg,
- self._outgoing_msg_sink_iter)
+ self.green_out = self._spawn('net_ctrl._process_outgoing',
+ self._process_outgoing_msg,
+ self._outgoing_msg_sink_iter)
# Process incoming messages in new thread.
- green_in = self._spawn('net_ctrl._process_incoming',
- self._process_incoming_msgs)
+ self.green_in = self._spawn('net_ctrl._process_incoming',
+ self._process_incoming_msgs)
LOG.info('RPC Session to %s started', self.peer_name)
- green_in.wait()
- green_out.wait()
+ self.green_in.wait()
+ self.green_out.wait()
def _next_msg_id(self):
this_id = self._next_msgid
@@ -202,6 +204,10 @@ class RpcSession(Activity):
LOG.error('Invalid message type: %r', msg)
self.pause(0)
+ # Stop outgoing connection.
+ if self.green_out:
+ self.green_out.kill()
+
def _process_outgoing_msg(self, sink_iter):
"""For every message we construct a corresponding RPC message to be
sent over the given socket inside given RPC session.
@@ -231,6 +237,10 @@ class RpcSession(Activity):
self._sendall(rpc_msg)
self.pause(0)
+ # Stop incoming connection.
+ if self.green_in:
+ self.green_in.kill()
+
def _recv(self):
return self._sock_wrap(self._socket.recv)(RPC_SOCK_BUFF_SIZE)