summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp/connect.go
diff options
context:
space:
mode:
authorFabricio Voznika <fvoznika@google.com>2018-05-29 14:23:17 -0700
committerShentubot <shentubot@google.com>2018-05-29 14:24:07 -0700
commitc5dc873e441706e8aaff7389e26c862f1386c6a8 (patch)
treeb29bb76081379a2aec47fe3ea3d666423a4de45b /pkg/tcpip/transport/tcp/connect.go
parenta8b90a7158d4197428639c912d97f3bdbaf63f5a (diff)
Automated rollback of changelist 196886839
PiperOrigin-RevId: 198457660 Change-Id: I6ea5cf0b4cfe2b5ba455325a7e5299880e5a088a
Diffstat (limited to 'pkg/tcpip/transport/tcp/connect.go')
-rw-r--r--pkg/tcpip/transport/tcp/connect.go84
1 files changed, 45 insertions, 39 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 66904856c..5115dabe6 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -690,7 +690,7 @@ func (e *endpoint) sendRaw(data buffer.View, flags byte, seq, ack seqnum.Value,
return err
}
-func (e *endpoint) handleWrite() *tcpip.Error {
+func (e *endpoint) handleWrite() bool {
// Move packets from send queue to send list. The queue is accessible
// from other goroutines and protected by the send mutex, while the send
// list is only accessible from the handler goroutine, so it needs no
@@ -714,42 +714,47 @@ func (e *endpoint) handleWrite() *tcpip.Error {
// Push out any new packets.
e.snd.sendData()
- return nil
+ return true
}
-func (e *endpoint) handleClose() *tcpip.Error {
+func (e *endpoint) handleClose() bool {
// Drain the send queue.
e.handleWrite()
// Mark send side as closed.
e.snd.closed = true
- return nil
+ return true
}
-// resetConnectionLocked sends a RST segment and puts the endpoint in an error
-// state with the given error code. This method must only be called from the
-// protocol goroutine.
-func (e *endpoint) resetConnectionLocked(err *tcpip.Error) {
+// resetConnection sends a RST segment and puts the endpoint in an error state
+// with the given error code.
+// This method must only be called from the protocol goroutine.
+func (e *endpoint) resetConnection(err *tcpip.Error) {
e.sendRaw(nil, flagAck|flagRst, e.snd.sndUna, e.rcv.rcvNxt, 0)
+ e.mu.Lock()
e.state = stateError
e.hardError = err
+ e.mu.Unlock()
}
-// completeWorkerLocked is called by the worker goroutine when it's about to
-// exit. It marks the worker as completed and performs cleanup work if requested
-// by Close().
-func (e *endpoint) completeWorkerLocked() {
+// completeWorker is called by the worker goroutine when it's about to exit. It
+// marks the worker as completed and performs cleanup work if requested by
+// Close().
+func (e *endpoint) completeWorker() {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
e.workerRunning = false
if e.workerCleanup {
- e.cleanupLocked()
+ e.cleanup()
}
}
// handleSegments pulls segments from the queue and processes them. It returns
-// no error if the protocol loop should continue, an error otherwise.
-func (e *endpoint) handleSegments() *tcpip.Error {
+// true if the protocol loop should continue, false otherwise.
+func (e *endpoint) handleSegments() bool {
checkRequeue := true
for i := 0; i < maxSegmentsPerWake; i++ {
s := e.segmentQueue.dequeue()
@@ -770,7 +775,11 @@ func (e *endpoint) handleSegments() *tcpip.Error {
// validated by checking their SEQ-fields." So
// we only process it if it's acceptable.
s.decRef()
- return tcpip.ErrConnectionReset
+ e.mu.Lock()
+ e.state = stateError
+ e.hardError = tcpip.ErrConnectionReset
+ e.mu.Unlock()
+ return false
}
} else if s.flagIsSet(flagAck) {
// Patch the window size in the segment according to the
@@ -807,7 +816,7 @@ func (e *endpoint) handleSegments() *tcpip.Error {
e.snd.sendAck()
}
- return nil
+ return true
}
// protocolMainLoop is the main loop of the TCP protocol. It runs in its own
@@ -818,10 +827,9 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
var closeWaker sleep.Waker
defer func() {
- // e.mu is expected to be held upon entering this section.
// When the protocol loop exits we should wake up our waiters.
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
- e.completeWorkerLocked()
+ e.completeWorker()
if e.snd != nil {
e.snd.resendTimer.cleanup()
@@ -830,12 +838,6 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
if closeTimer != nil {
closeTimer.Stop()
}
-
- if e.drainDone != nil {
- close(e.drainDone)
- }
-
- e.mu.Unlock()
}()
if !passive {
@@ -854,7 +856,12 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
e.mu.Lock()
e.state = stateError
e.hardError = err
- // Lock released in deferred statement.
+ drained := e.drainDone != nil
+ e.mu.Unlock()
+ if drained {
+ close(e.drainDone)
+ <-e.undrain
+ }
return err
}
@@ -885,7 +892,7 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
// wakes up.
funcs := []struct {
w *sleep.Waker
- f func() *tcpip.Error
+ f func() bool
}{
{
w: &e.sndWaker,
@@ -901,22 +908,24 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
},
{
w: &closeWaker,
- f: func() *tcpip.Error {
- return tcpip.ErrConnectionAborted
+ f: func() bool {
+ e.resetConnection(tcpip.ErrConnectionAborted)
+ return false
},
},
{
w: &e.snd.resendWaker,
- f: func() *tcpip.Error {
+ f: func() bool {
if !e.snd.retransmitTimerExpired() {
- return tcpip.ErrTimeout
+ e.resetConnection(tcpip.ErrTimeout)
+ return false
}
- return nil
+ return true
},
},
{
w: &e.notificationWaker,
- f: func() *tcpip.Error {
+ f: func() bool {
n := e.fetchNotifications()
if n&notifyNonZeroReceiveWindow != 0 {
e.rcv.nonZeroWindow()
@@ -943,7 +952,7 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
closeWaker.Assert()
})
}
- return nil
+ return true
},
},
}
@@ -960,10 +969,7 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
e.workMu.Unlock()
v, _ := s.Fetch(true)
e.workMu.Lock()
- if err := funcs[v].f(); err != nil {
- e.mu.Lock()
- e.resetConnectionLocked(err)
- // Lock released in deferred statement.
+ if !funcs[v].f() {
return nil
}
}
@@ -971,7 +977,7 @@ func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
// Mark endpoint as closed.
e.mu.Lock()
e.state = stateClosed
- // Lock released in deferred statement.
+ e.mu.Unlock()
return nil
}