diff options
author | Victor J. Orlikowski <vjo@duke.edu> | 2016-02-26 11:42:46 -0500 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-02-27 22:04:53 +0900 |
commit | 477f2ddd70d2a7709a04fe3062a52078139b9fa7 (patch) | |
tree | ceac3997a3792204e7356941e98e0d7a1b2654ff | |
parent | f1f0ca2d1688d25924854a635becdcbfef95a5cd (diff) |
Clean up socket close() handling
Also, temporarily work around a bug in eventlet's Queue.put() by
wrapping the send_q with a semaphore.
Signed-off-by: Victor J. Orlikowski <vjo@duke.edu>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/controller/controller.py | 62 |
1 files changed, 42 insertions, 20 deletions
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py index 25b8776d..da57a8ce 100644 --- a/ryu/controller/controller.py +++ b/ryu/controller/controller.py @@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer import traceback import random import ssl -from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error as SocketError +from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout import warnings import ryu.base.app_manager @@ -41,8 +41,8 @@ from ryu.ofproto import ofproto_protocol from ryu.ofproto import ofproto_v1_0 from ryu.ofproto import nx_match -from ryu.controller import handler from ryu.controller import ofp_event +from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER from ryu.lib.dpid import dpid_to_str @@ -103,8 +103,15 @@ def _deactivate(method): try: method(self) finally: - self.send_active = False - self.set_state(handler.DEAD_DISPATCHER) + try: + self.socket.shutdown(SHUT_RDWR) + except (EOFError, IOError): + pass + + if not self.is_active: + self.socket.close() + if self.state is not DEAD_DISPATCHER: + self.set_state(DEAD_DISPATCHER) return deactivate @@ -117,19 +124,20 @@ class Datapath(ofproto_protocol.ProtocolDesc): self.socket.settimeout(CONF.socket_timeout) self.address = address - self.send_active = True + self.is_active = True self.close_requested = False # The limit is arbitrary. We need to limit queue size to # prevent it from eating memory up self.send_q = hub.Queue(16) + self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize) self.xid = random.randint(0, self.ofproto.MAX_XID) self.id = None # datapath_id is unknown yet self._ports = None self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10 self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event') - self.set_state(handler.HANDSHAKE_DISPATCHER) + self.set_state(HANDSHAKE_DISPATCHER) def _get_ports(self): if (self.ofproto_parser is not None and @@ -167,19 +175,17 @@ class Datapath(ofproto_protocol.ProtocolDesc): required_len = ofproto_common.OFP_HEADER_SIZE count = 0 - while True: + while not self.close_requested: ret = "" try: ret = self.socket.recv(required_len) except SocketTimeout: - if not self.close_requested: - continue - except SocketError: - self.close_requested = True + continue + except (EOFError, IOError): + break - if (len(ret) == 0) or (self.close_requested): - self.socket.close() + if len(ret) == 0: break buf += ret @@ -215,30 +221,45 @@ class Datapath(ofproto_protocol.ProtocolDesc): count = 0 hub.sleep(0) - @_deactivate def _send_loop(self): try: - while self.send_active: + while True: buf = self.send_q.get() + self._send_q_sem.release() self.socket.sendall(buf) + except SocketTimeout: + LOG.debug("Socket timed out while sending data to switch at address %s", + self.address) except IOError as ioe: - LOG.debug("Socket error while sending data to switch at address %s: [%d] %s", - self.address, ioe.errno, ioe.strerror) + # Convert ioe.errno to a string, just in case it was somehow set to None. + errno = "%s" % ioe.errno + LOG.debug("Socket error while sending data to switch at address %s: [%s] %s", + self.address, errno, ioe.strerror) finally: q = self.send_q # first, clear self.send_q to prevent new references. self.send_q = None - # there might be threads currently blocking in send_q.put(). - # unblock them by draining the queue. + # Now, drain the send_q, releasing the associated semaphore for each entry. + # This should release all threads waiting to acquire the semaphore. try: while q.get(block=False): - pass + self._send_q_sem.release() except hub.QueueEmpty: pass + # Finally, ensure the _recv_loop terminates. + self.close() def send(self, buf): + msg_enqueued = False + self._send_q_sem.acquire() if self.send_q: self.send_q.put(buf) + msg_enqueued = True + else: + self._send_q_sem.release() + if not msg_enqueued: + LOG.debug('Datapath in process of terminating; send() to %s discarded.', + self.address) def set_xid(self, msg): self.xid += 1 @@ -266,6 +287,7 @@ class Datapath(ofproto_protocol.ProtocolDesc): finally: hub.kill(send_thr) hub.joinall([send_thr]) + self.is_active = False # # Utility methods for convenience |