summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--pkg/tcpip/transport/tcp/accept.go137
1 files changed, 61 insertions, 76 deletions
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index 7348bb7a9..62b8d9de9 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -444,77 +444,6 @@ func (e *endpoint) notifyAborted() {
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
}
-// handleSynSegment is called in its own goroutine once the listening endpoint
-// receives a SYN segment. It is responsible for completing the handshake and
-// queueing the new endpoint for acceptance.
-//
-// A limited number of these goroutines are allowed before TCP starts using SYN
-// cookies to accept connections.
-//
-// +checklocks:e.mu
-func (e *endpoint) handleSynSegment(ctx *listenContext, s *segment, opts header.TCPSynOptions) tcpip.Error {
- defer s.decRef()
-
- h, err := ctx.startHandshake(s, opts, &waiter.Queue{}, e.owner)
- if err != nil {
- e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
- e.stats.FailedConnectionAttempts.Increment()
- atomic.AddInt32(&e.synRcvdCount, -1)
- return err
- }
-
- go func() {
- // Note that startHandshake returns a locked endpoint. The
- // force call here just makes it so.
- if err := h.complete(); err != nil { // +checklocksforce
- e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
- e.stats.FailedConnectionAttempts.Increment()
- ctx.cleanupFailedHandshake(h)
- atomic.AddInt32(&e.synRcvdCount, -1)
- return
- }
- ctx.cleanupCompletedHandshake(h)
- h.ep.startAcceptedLoop()
- e.stack.Stats().TCP.PassiveConnectionOpenings.Increment()
-
- // Deliver the endpoint to the accept queue.
- e.mu.Lock()
- e.pendingAccepted.Add(1)
- e.mu.Unlock()
- defer e.pendingAccepted.Done()
-
- // Drop the lock before notifying to avoid deadlock in user-specified
- // callbacks.
- delivered := func() bool {
- e.acceptMu.Lock()
- defer e.acceptMu.Unlock()
- for {
- if e.accepted == (accepted{}) {
- // If the listener has transitioned out of the listen state (accepted
- // is the zero value), the new endpoint is reset instead.
- return false
- }
- if e.accepted.acceptQueueIsFullLocked() {
- e.acceptCond.Wait()
- continue
- }
-
- e.accepted.endpoints.PushBack(h.ep)
- atomic.AddInt32(&e.synRcvdCount, -1)
- return true
- }
- }()
-
- if delivered {
- e.waiterQueue.Notify(waiter.ReadableEvents)
- } else {
- h.ep.notifyProtocolGoroutine(notifyReset)
- }
- }()
-
- return nil
-}
-
func (e *endpoint) synRcvdBacklogFull() bool {
e.acceptMu.Lock()
acceptedCap := e.accepted.cap
@@ -581,10 +510,69 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) tcpip.Err
opts := parseSynSegmentOptions(s)
if !alwaysUseSynCookies && !e.synRcvdBacklogFull() {
- s.incRef()
atomic.AddInt32(&e.synRcvdCount, 1)
- return e.handleSynSegment(ctx, s, opts)
+
+ h, err := ctx.startHandshake(s, opts, &waiter.Queue{}, e.owner)
+ if err != nil {
+ e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
+ e.stats.FailedConnectionAttempts.Increment()
+ atomic.AddInt32(&e.synRcvdCount, -1)
+ return err
+ }
+
+ go func() {
+ // Note that startHandshake returns a locked endpoint. The force call
+ // here just makes it so.
+ if err := h.complete(); err != nil { // +checklocksforce
+ e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
+ e.stats.FailedConnectionAttempts.Increment()
+ ctx.cleanupFailedHandshake(h)
+ atomic.AddInt32(&e.synRcvdCount, -1)
+ return
+ }
+ ctx.cleanupCompletedHandshake(h)
+ h.ep.startAcceptedLoop()
+ e.stack.Stats().TCP.PassiveConnectionOpenings.Increment()
+
+ // Deliver the endpoint to the accept queue.
+ e.mu.Lock()
+ e.pendingAccepted.Add(1)
+ e.mu.Unlock()
+ defer e.pendingAccepted.Done()
+
+ // Drop the lock before notifying to avoid deadlock in user-specified
+ // callbacks.
+ delivered := func() bool {
+ e.acceptMu.Lock()
+ defer e.acceptMu.Unlock()
+ for {
+ if e.accepted == (accepted{}) {
+ // If the listener has transitioned out of the listen state
+ // (accepted is the zero value), the new endpoint is reset
+ // instead.
+ return false
+ }
+ if e.accepted.acceptQueueIsFullLocked() {
+ e.acceptCond.Wait()
+ continue
+ }
+
+ e.accepted.endpoints.PushBack(h.ep)
+ atomic.AddInt32(&e.synRcvdCount, -1)
+ return true
+ }
+ }()
+
+ if delivered {
+ e.waiterQueue.Notify(waiter.ReadableEvents)
+ } else {
+ h.ep.notifyProtocolGoroutine(notifyReset)
+ }
+ }()
+
+ return nil
}
+
route, err := e.stack.FindRoute(s.nicID, s.dstAddr, s.srcAddr, s.netProto, false /* multicastLoop */)
if err != nil {
return err
@@ -789,9 +777,6 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) {
ctx := newListenContext(e.stack, e.protocol, e, rcvWnd, v6Only, e.NetProto)
defer func() {
- // Mark endpoint as closed. This will prevent goroutines running
- // handleSynSegment() from attempting to queue new connections
- // to the endpoint.
e.setEndpointState(StateClose)
// Close any endpoints in SYN-RCVD state.