diff options
-rw-r--r-- | ryu/services/protocols/bgp/net_ctrl.py | 36 |
1 files changed, 24 insertions, 12 deletions
diff --git a/ryu/services/protocols/bgp/net_ctrl.py b/ryu/services/protocols/bgp/net_ctrl.py index 400032c8..972d0b98 100644 --- a/ryu/services/protocols/bgp/net_ctrl.py +++ b/ryu/services/protocols/bgp/net_ctrl.py @@ -94,19 +94,21 @@ class RpcSession(Activity): and utilities that use these. It also cares about socket communication w/ RPC peer. """ + NAME_FMT = 'RpcSession%s' def __init__(self, sock, outgoing_msg_sink_iter): - super(RpcSession, self).__init__("RpcSession(%s)" % sock) + self.peer_name = str(sock.getpeername()) + super(RpcSession, self).__init__(self.NAME_FMT % self.peer_name) self._packer = msgpack.Packer(encoding='utf-8') self._unpacker = msgpack.Unpacker(encoding='utf-8') self._next_msgid = 0 self._socket = sock self._outgoing_msg_sink_iter = outgoing_msg_sink_iter - self.peer_name = str(self._socket.getpeername()) self.is_connected = True def stop(self): super(RpcSession, self).stop() + self.is_connected = False LOG.info('RPC Session to %s stopped', self.peer_name) def _run(self): @@ -330,7 +332,8 @@ class _NetworkController(FlexinetPeer, Activity): # Outstanding requests, i.e. requests for which we are yet to receive # response from peer. We currently do not have any requests going out. self._outstanding_reqs = {} - self._rpc_session = None + # Dictionary for Peer name to RPC session. + self._rpc_sessions = {} def _run(self, *args, **kwargs): """Runs RPC server. @@ -352,19 +355,28 @@ class _NetworkController(FlexinetPeer, Activity): def _start_rpc_session(self, sock): """Starts a new RPC session with given connection. """ - if self._rpc_session and self._rpc_session.started: - self._rpc_session.stop() + session_name = RpcSession.NAME_FMT % str(sock.getpeername()) + self._stop_child_activities(session_name) - self._rpc_session = RpcSession(sock, self) - self._rpc_session.start() + rpc_session = RpcSession(sock, self) + self._spawn_activity(rpc_session) + + def _send_rpc_notification_to_session(self, session, method, params): + if not session.is_connected: + # Stops disconnected RPC session. + self._stop_child_activities(session.name) + return + + return session.send_notification(method, params) def send_rpc_notification(self, method, params): - if not self.started or self._rpc_session is None: + if not self.started: return - elif not self._rpc_session.is_connected: - self._rpc_session = None - elif self._rpc_session.started: - return self._rpc_session.send_notification(method, params) + + for session in list(self._child_activity_map.values()): + if not isinstance(session, RpcSession): + continue + self._send_rpc_notification_to_session(session, method, params) def _handle_response(response): |