diff options
-rw-r--r-- | ryu/services/protocols/bgp/net_ctrl.py | 30 |
1 files changed, 17 insertions, 13 deletions
diff --git a/ryu/services/protocols/bgp/net_ctrl.py b/ryu/services/protocols/bgp/net_ctrl.py index 925df518..342e2efe 100644 --- a/ryu/services/protocols/bgp/net_ctrl.py +++ b/ryu/services/protocols/bgp/net_ctrl.py @@ -102,12 +102,12 @@ class RpcSession(Activity): self._next_msgid = 0 self._socket = sock self._outgoing_msg_sink_iter = outgoing_msg_sink_iter + self.peer_name = str(self._socket.getpeername()) + self.is_connected = True def stop(self): super(RpcSession, self).stop() - LOG.critical( - 'RPC Session to %s stopped', str(self._socket.getpeername()) - ) + LOG.info('RPC Session to %s stopped', self.peer_name) def _run(self): # Process outgoing messages in new thread. @@ -117,9 +117,7 @@ class RpcSession(Activity): # Process incoming messages in new thread. green_in = self._spawn('net_ctrl._process_incoming', self._process_incoming_msgs) - LOG.critical( - 'RPC Session to %s started', str(self._socket.getpeername()) - ) + LOG.info('RPC Session to %s started', self.peer_name) green_in.wait() green_out.wait() @@ -166,11 +164,13 @@ class RpcSession(Activity): LOG.debug('NetworkController started processing incoming messages') assert self._socket - while True: + while self.is_connected: # Wait for request/response/notification from peer. msg_buff = self._recv() if len(msg_buff) == 0: - LOG.info('Peer %r disconnected.', self._socket) + LOG.info('Peer %s disconnected.', self.peer_name) + self.is_connected = False + self._socket.close() break messages = self.feed_and_get_messages(msg_buff) for msg in messages: @@ -197,16 +197,17 @@ class RpcSession(Activity): it loops forever. """ LOG.debug('NetworkController processing outgoing request list.') - # TODO(Team): handle un-expected exception breaking the loop in - # graceful manner. Discuss this with other component developers. # TODO(PH): We should try not to sent routes from bgp peer that is not # in established state. from ryu.services.protocols.bgp.model import ( FlexinetOutgoingRoute) - while True: + while self.is_connected: # sink iter is Sink instance and next is blocking so this isn't # active wait. for outgoing_msg in sink_iter: + if not self.is_connected: + self._socket.close() + return if isinstance(outgoing_msg, FlexinetOutgoingRoute): rpc_msg = _create_prefix_notification(outgoing_msg, self) else: @@ -349,8 +350,11 @@ class _NetworkController(FlexinetPeer, Activity): self._rpc_session.start() def send_rpc_notification(self, method, params): - if (self.started and self._rpc_session is not None and - self._rpc_session.started): + if not self.started or self._rpc_session is None: + return + elif not self._rpc_session.is_connected: + self._rpc_session = None + elif self._rpc_session.started: return self._rpc_session.send_notification(method, params) |