diff options
author | Fabricio Voznika <fvoznika@google.com> | 2018-05-29 14:23:17 -0700 |
---|---|---|
committer | Shentubot <shentubot@google.com> | 2018-05-29 14:24:07 -0700 |
commit | c5dc873e441706e8aaff7389e26c862f1386c6a8 (patch) | |
tree | b29bb76081379a2aec47fe3ea3d666423a4de45b /pkg/tcpip/transport/tcp/connect.go | |
parent | a8b90a7158d4197428639c912d97f3bdbaf63f5a (diff) |
Automated rollback of changelist 196886839
PiperOrigin-RevId: 198457660
Change-Id: I6ea5cf0b4cfe2b5ba455325a7e5299880e5a088a
Diffstat (limited to 'pkg/tcpip/transport/tcp/connect.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 84 |
1 files changed, 45 insertions, 39 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 66904856c..5115dabe6 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -690,7 +690,7 @@ func (e *endpoint) sendRaw(data buffer.View, flags byte, seq, ack seqnum.Value, return err } -func (e *endpoint) handleWrite() *tcpip.Error { +func (e *endpoint) handleWrite() bool { // Move packets from send queue to send list. The queue is accessible // from other goroutines and protected by the send mutex, while the send // list is only accessible from the handler goroutine, so it needs no @@ -714,42 +714,47 @@ func (e *endpoint) handleWrite() *tcpip.Error { // Push out any new packets. e.snd.sendData() - return nil + return true } -func (e *endpoint) handleClose() *tcpip.Error { +func (e *endpoint) handleClose() bool { // Drain the send queue. e.handleWrite() // Mark send side as closed. e.snd.closed = true - return nil + return true } -// resetConnectionLocked sends a RST segment and puts the endpoint in an error -// state with the given error code. This method must only be called from the -// protocol goroutine. -func (e *endpoint) resetConnectionLocked(err *tcpip.Error) { +// resetConnection sends a RST segment and puts the endpoint in an error state +// with the given error code. +// This method must only be called from the protocol goroutine. +func (e *endpoint) resetConnection(err *tcpip.Error) { e.sendRaw(nil, flagAck|flagRst, e.snd.sndUna, e.rcv.rcvNxt, 0) + e.mu.Lock() e.state = stateError e.hardError = err + e.mu.Unlock() } -// completeWorkerLocked is called by the worker goroutine when it's about to -// exit. It marks the worker as completed and performs cleanup work if requested -// by Close(). -func (e *endpoint) completeWorkerLocked() { +// completeWorker is called by the worker goroutine when it's about to exit. It +// marks the worker as completed and performs cleanup work if requested by +// Close(). +func (e *endpoint) completeWorker() { + e.mu.Lock() + defer e.mu.Unlock() + e.workerRunning = false if e.workerCleanup { - e.cleanupLocked() + e.cleanup() } } // handleSegments pulls segments from the queue and processes them. It returns -// no error if the protocol loop should continue, an error otherwise. -func (e *endpoint) handleSegments() *tcpip.Error { +// true if the protocol loop should continue, false otherwise. +func (e *endpoint) handleSegments() bool { checkRequeue := true for i := 0; i < maxSegmentsPerWake; i++ { s := e.segmentQueue.dequeue() @@ -770,7 +775,11 @@ func (e *endpoint) handleSegments() *tcpip.Error { // validated by checking their SEQ-fields." So // we only process it if it's acceptable. s.decRef() - return tcpip.ErrConnectionReset + e.mu.Lock() + e.state = stateError + e.hardError = tcpip.ErrConnectionReset + e.mu.Unlock() + return false } } else if s.flagIsSet(flagAck) { // Patch the window size in the segment according to the @@ -807,7 +816,7 @@ func (e *endpoint) handleSegments() *tcpip.Error { e.snd.sendAck() } - return nil + return true } // protocolMainLoop is the main loop of the TCP protocol. It runs in its own @@ -818,10 +827,9 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error { var closeWaker sleep.Waker defer func() { - // e.mu is expected to be held upon entering this section. // When the protocol loop exits we should wake up our waiters. e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) - e.completeWorkerLocked() + e.completeWorker() if e.snd != nil { e.snd.resendTimer.cleanup() @@ -830,12 +838,6 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error { if closeTimer != nil { closeTimer.Stop() } - - if e.drainDone != nil { - close(e.drainDone) - } - - e.mu.Unlock() }() if !passive { @@ -854,7 +856,12 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error { e.mu.Lock() e.state = stateError e.hardError = err - // Lock released in deferred statement. + drained := e.drainDone != nil + e.mu.Unlock() + if drained { + close(e.drainDone) + <-e.undrain + } return err } @@ -885,7 +892,7 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error { // wakes up. funcs := []struct { w *sleep.Waker - f func() *tcpip.Error + f func() bool }{ { w: &e.sndWaker, @@ -901,22 +908,24 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error { }, { w: &closeWaker, - f: func() *tcpip.Error { - return tcpip.ErrConnectionAborted + f: func() bool { + e.resetConnection(tcpip.ErrConnectionAborted) + return false }, }, { w: &e.snd.resendWaker, - f: func() *tcpip.Error { + f: func() bool { if !e.snd.retransmitTimerExpired() { - return tcpip.ErrTimeout + e.resetConnection(tcpip.ErrTimeout) + return false } - return nil + return true }, }, { w: &e.notificationWaker, - f: func() *tcpip.Error { + f: func() bool { n := e.fetchNotifications() if n¬ifyNonZeroReceiveWindow != 0 { e.rcv.nonZeroWindow() @@ -943,7 +952,7 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error { closeWaker.Assert() }) } - return nil + return true }, }, } @@ -960,10 +969,7 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error { e.workMu.Unlock() v, _ := s.Fetch(true) e.workMu.Lock() - if err := funcs[v].f(); err != nil { - e.mu.Lock() - e.resetConnectionLocked(err) - // Lock released in deferred statement. + if !funcs[v].f() { return nil } } @@ -971,7 +977,7 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error { // Mark endpoint as closed. e.mu.Lock() e.state = stateClosed - // Lock released in deferred statement. + e.mu.Unlock() return nil } |