summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/services/protocols/bgp/net_ctrl.py36
1 files changed, 24 insertions, 12 deletions
diff --git a/ryu/services/protocols/bgp/net_ctrl.py b/ryu/services/protocols/bgp/net_ctrl.py
index 400032c8..972d0b98 100644
--- a/ryu/services/protocols/bgp/net_ctrl.py
+++ b/ryu/services/protocols/bgp/net_ctrl.py
@@ -94,19 +94,21 @@ class RpcSession(Activity):
and utilities that use these. It also cares about socket communication w/
RPC peer.
"""
+ NAME_FMT = 'RpcSession%s'
def __init__(self, sock, outgoing_msg_sink_iter):
- super(RpcSession, self).__init__("RpcSession(%s)" % sock)
+ self.peer_name = str(sock.getpeername())
+ super(RpcSession, self).__init__(self.NAME_FMT % self.peer_name)
self._packer = msgpack.Packer(encoding='utf-8')
self._unpacker = msgpack.Unpacker(encoding='utf-8')
self._next_msgid = 0
self._socket = sock
self._outgoing_msg_sink_iter = outgoing_msg_sink_iter
- self.peer_name = str(self._socket.getpeername())
self.is_connected = True
def stop(self):
super(RpcSession, self).stop()
+ self.is_connected = False
LOG.info('RPC Session to %s stopped', self.peer_name)
def _run(self):
@@ -330,7 +332,8 @@ class _NetworkController(FlexinetPeer, Activity):
# Outstanding requests, i.e. requests for which we are yet to receive
# response from peer. We currently do not have any requests going out.
self._outstanding_reqs = {}
- self._rpc_session = None
+ # Dictionary for Peer name to RPC session.
+ self._rpc_sessions = {}
def _run(self, *args, **kwargs):
"""Runs RPC server.
@@ -352,19 +355,28 @@ class _NetworkController(FlexinetPeer, Activity):
def _start_rpc_session(self, sock):
"""Starts a new RPC session with given connection.
"""
- if self._rpc_session and self._rpc_session.started:
- self._rpc_session.stop()
+ session_name = RpcSession.NAME_FMT % str(sock.getpeername())
+ self._stop_child_activities(session_name)
- self._rpc_session = RpcSession(sock, self)
- self._rpc_session.start()
+ rpc_session = RpcSession(sock, self)
+ self._spawn_activity(rpc_session)
+
+ def _send_rpc_notification_to_session(self, session, method, params):
+ if not session.is_connected:
+ # Stops disconnected RPC session.
+ self._stop_child_activities(session.name)
+ return
+
+ return session.send_notification(method, params)
def send_rpc_notification(self, method, params):
- if not self.started or self._rpc_session is None:
+ if not self.started:
return
- elif not self._rpc_session.is_connected:
- self._rpc_session = None
- elif self._rpc_session.started:
- return self._rpc_session.send_notification(method, params)
+
+ for session in list(self._child_activity_map.values()):
+ if not isinstance(session, RpcSession):
+ continue
+ self._send_rpc_notification_to_session(session, method, params)
def _handle_response(response):