summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp/connect.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/transport/tcp/connect.go')
-rw-r--r--pkg/tcpip/transport/tcp/connect.go43
1 files changed, 38 insertions, 5 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 9aaabe0b1..d9f87c793 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -443,7 +443,8 @@ func (h *handshake) execute() *tcpip.Error {
return tcpip.ErrAborted
}
if n&notifyDrain != 0 {
- for s := h.ep.segmentQueue.dequeue(); s != nil; s = h.ep.segmentQueue.dequeue() {
+ for !h.ep.segmentQueue.empty() {
+ s := h.ep.segmentQueue.dequeue()
err := h.handleSegment(s)
s.decRef()
if err != nil {
@@ -813,15 +814,13 @@ func (e *endpoint) handleSegments() *tcpip.Error {
// protocolMainLoop is the main loop of the TCP protocol. It runs in its own
// goroutine and is responsible for sending segments and handling received
// segments.
-func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
+func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
var closeTimer *time.Timer
var closeWaker sleep.Waker
defer func() {
// e.mu is expected to be hold upon entering this section.
- e.completeWorkerLocked()
-
if e.snd != nil {
e.snd.resendTimer.cleanup()
}
@@ -830,6 +829,8 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
closeTimer.Stop()
}
+ e.completeWorkerLocked()
+
if e.drainDone != nil {
close(e.drainDone)
}
@@ -840,7 +841,7 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
}()
- if !passive {
+ if handshake {
// This is an active connection, so we must initiate the 3-way
// handshake, and then inform potential waiters about its
// completion.
@@ -945,6 +946,17 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
closeWaker.Assert()
})
}
+
+ if n&notifyDrain != 0 {
+ for !e.segmentQueue.empty() {
+ if err := e.handleSegments(); err != nil {
+ return err
+ }
+ }
+ close(e.drainDone)
+ <-e.undrain
+ }
+
return nil
},
},
@@ -956,6 +968,27 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
s.AddWaker(funcs[i].w, i)
}
+ // The following assertions and notifications are needed for restored
+ // endpoints. Fresh newly created endpoints have empty states and should
+ // not invoke any.
+ e.segmentQueue.mu.Lock()
+ if !e.segmentQueue.list.Empty() {
+ e.newSegmentWaker.Assert()
+ }
+ e.segmentQueue.mu.Unlock()
+
+ e.rcvListMu.Lock()
+ if !e.rcvList.Empty() {
+ e.waiterQueue.Notify(waiter.EventIn)
+ }
+ e.rcvListMu.Unlock()
+
+ e.mu.RLock()
+ if e.workerCleanup {
+ e.notifyProtocolGoroutine(notifyClose)
+ }
+ e.mu.RUnlock()
+
// Main loop. Handle segments until both send and receive ends of the
// connection have completed.
for !e.rcv.closed || !e.snd.closed || e.snd.sndUna != e.snd.sndNxtList {