summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorVictor J. Orlikowski <vjo@duke.edu>2016-02-26 11:42:46 -0500
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-02-27 22:04:53 +0900
commit477f2ddd70d2a7709a04fe3062a52078139b9fa7 (patch)
treeceac3997a3792204e7356941e98e0d7a1b2654ff
parentf1f0ca2d1688d25924854a635becdcbfef95a5cd (diff)
Clean up socket close() handling
Also, temporarily work around a bug in eventlet's Queue.put() by wrapping the send_q with a semaphore. Signed-off-by: Victor J. Orlikowski <vjo@duke.edu> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/controller/controller.py62
1 files changed, 42 insertions, 20 deletions
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776d..da57a8ce 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
import traceback
import random
import ssl
-from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error as SocketError
+from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout
import warnings
import ryu.base.app_manager
@@ -41,8 +41,8 @@ from ryu.ofproto import ofproto_protocol
from ryu.ofproto import ofproto_v1_0
from ryu.ofproto import nx_match
-from ryu.controller import handler
from ryu.controller import ofp_event
+from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.lib.dpid import dpid_to_str
@@ -103,8 +103,15 @@ def _deactivate(method):
try:
method(self)
finally:
- self.send_active = False
- self.set_state(handler.DEAD_DISPATCHER)
+ try:
+ self.socket.shutdown(SHUT_RDWR)
+ except (EOFError, IOError):
+ pass
+
+ if not self.is_active:
+ self.socket.close()
+ if self.state is not DEAD_DISPATCHER:
+ self.set_state(DEAD_DISPATCHER)
return deactivate
@@ -117,19 +124,20 @@ class Datapath(ofproto_protocol.ProtocolDesc):
self.socket.settimeout(CONF.socket_timeout)
self.address = address
- self.send_active = True
+ self.is_active = True
self.close_requested = False
# The limit is arbitrary. We need to limit queue size to
# prevent it from eating memory up
self.send_q = hub.Queue(16)
+ self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
self.xid = random.randint(0, self.ofproto.MAX_XID)
self.id = None # datapath_id is unknown yet
self._ports = None
self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
- self.set_state(handler.HANDSHAKE_DISPATCHER)
+ self.set_state(HANDSHAKE_DISPATCHER)
def _get_ports(self):
if (self.ofproto_parser is not None and
@@ -167,19 +175,17 @@ class Datapath(ofproto_protocol.ProtocolDesc):
required_len = ofproto_common.OFP_HEADER_SIZE
count = 0
- while True:
+ while not self.close_requested:
ret = ""
try:
ret = self.socket.recv(required_len)
except SocketTimeout:
- if not self.close_requested:
- continue
- except SocketError:
- self.close_requested = True
+ continue
+ except (EOFError, IOError):
+ break
- if (len(ret) == 0) or (self.close_requested):
- self.socket.close()
+ if len(ret) == 0:
break
buf += ret
@@ -215,30 +221,45 @@ class Datapath(ofproto_protocol.ProtocolDesc):
count = 0
hub.sleep(0)
- @_deactivate
def _send_loop(self):
try:
- while self.send_active:
+ while True:
buf = self.send_q.get()
+ self._send_q_sem.release()
self.socket.sendall(buf)
+ except SocketTimeout:
+ LOG.debug("Socket timed out while sending data to switch at address %s",
+ self.address)
except IOError as ioe:
- LOG.debug("Socket error while sending data to switch at address %s: [%d] %s",
- self.address, ioe.errno, ioe.strerror)
+ # Convert ioe.errno to a string, just in case it was somehow set to None.
+ errno = "%s" % ioe.errno
+ LOG.debug("Socket error while sending data to switch at address %s: [%s] %s",
+ self.address, errno, ioe.strerror)
finally:
q = self.send_q
# first, clear self.send_q to prevent new references.
self.send_q = None
- # there might be threads currently blocking in send_q.put().
- # unblock them by draining the queue.
+ # Now, drain the send_q, releasing the associated semaphore for each entry.
+ # This should release all threads waiting to acquire the semaphore.
try:
while q.get(block=False):
- pass
+ self._send_q_sem.release()
except hub.QueueEmpty:
pass
+ # Finally, ensure the _recv_loop terminates.
+ self.close()
def send(self, buf):
+ msg_enqueued = False
+ self._send_q_sem.acquire()
if self.send_q:
self.send_q.put(buf)
+ msg_enqueued = True
+ else:
+ self._send_q_sem.release()
+ if not msg_enqueued:
+ LOG.debug('Datapath in process of terminating; send() to %s discarded.',
+ self.address)
def set_xid(self, msg):
self.xid += 1
@@ -266,6 +287,7 @@ class Datapath(ofproto_protocol.ProtocolDesc):
finally:
hub.kill(send_thr)
hub.joinall([send_thr])
+ self.is_active = False
#
# Utility methods for convenience