diff options
-rw-r--r-- | ryu/controller/controller.py | 37 | ||||
-rw-r--r-- | ryu/controller/dpset.py | 3 |
2 files changed, 30 insertions, 10 deletions
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py index 577c6dac..10d8b453 100644 --- a/ryu/controller/controller.py +++ b/ryu/controller/controller.py @@ -57,7 +57,8 @@ CONF.register_cli_opts([ help='openflow ssl listen port'), cfg.StrOpt('ctl-privkey', default=None, help='controller private key'), cfg.StrOpt('ctl-cert', default=None, help='controller certificate'), - cfg.StrOpt('ca-certs', default=None, help='CA certificates') + cfg.StrOpt('ca-certs', default=None, help='CA certificates'), + cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to await completion of socket operations.') ]) @@ -102,7 +103,8 @@ def _deactivate(method): try: method(self) finally: - self.is_active = False + self.send_active = False + self.set_state(handler.DEAD_DISPATCHER) return deactivate @@ -112,8 +114,11 @@ class Datapath(ofproto_protocol.ProtocolDesc): self.socket = socket self.socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1) + self.socket.settimeout(CONF.socket_timeout) self.address = address - self.is_active = True + + self.send_active = True + self.close_requested = False # The limit is arbitrary. We need to limit queue size to # prevent it from eating memory up @@ -145,8 +150,9 @@ class Datapath(ofproto_protocol.ProtocolDesc): # To show warning when Datapath#ports is read ports = property(_get_ports, _set_ports) + @_deactivate def close(self): - self.set_state(handler.DEAD_DISPATCHER) + self.close_requested = True def set_state(self, state): self.state = state @@ -161,12 +167,22 @@ class Datapath(ofproto_protocol.ProtocolDesc): required_len = ofproto_common.OFP_HEADER_SIZE count = 0 - while self.is_active: - ret = self.socket.recv(required_len) - if len(ret) == 0: - self.is_active = False + while True: + ret = "" + + try: + ret = self.socket.recv(required_len) + except: + # Hit socket timeout; decide what to do. + if self.close_requested: + pass + else: + continue + + if (len(ret) == 0) or (self.close_requested): self.socket.close() break + buf += ret while len(buf) >= required_len: (version, msg_type, msg_len, xid) = ofproto_parser.header(buf) @@ -203,9 +219,12 @@ class Datapath(ofproto_protocol.ProtocolDesc): @_deactivate def _send_loop(self): try: - while self.is_active: + while self.send_active: buf = self.send_q.get() self.socket.sendall(buf) + except IOError as ioe: + LOG.debug("Socket error while sending data to switch at address %s: [%d] %s", + self.address, ioe.errno, ioe.strerror) finally: q = self.send_q # first, clear self.send_q to prevent new references. diff --git a/ryu/controller/dpset.py b/ryu/controller/dpset.py index 304658dc..4eca0467 100644 --- a/ryu/controller/dpset.py +++ b/ryu/controller/dpset.py @@ -117,6 +117,7 @@ class DPSet(app_manager.RyuApp): self.logger.warning('DPSET: Multiple connections from %s', dpid_to_str(dp.id)) self.logger.debug('DPSET: Forgetting datapath %s', self.dps[dp.id]) + (self.dps[dp.id]).close() self.logger.debug('DPSET: New datapath %s', dp) self.dps[dp.id] = dp if dp.id not in self.port_state: @@ -176,7 +177,7 @@ class DPSet(app_manager.RyuApp): @set_ev_cls(ofp_event.EventOFPStateChange, [handler.MAIN_DISPATCHER, handler.DEAD_DISPATCHER]) - def dispacher_change(self, ev): + def dispatcher_change(self, ev): datapath = ev.datapath assert datapath is not None if ev.state == handler.MAIN_DISPATCHER: |