diff options
-rw-r--r-- | pkg/tcpip/transport/tcp/accept.go | 137 |
1 files changed, 61 insertions, 76 deletions
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index 7348bb7a9..62b8d9de9 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -444,77 +444,6 @@ func (e *endpoint) notifyAborted() { e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } -// handleSynSegment is called in its own goroutine once the listening endpoint -// receives a SYN segment. It is responsible for completing the handshake and -// queueing the new endpoint for acceptance. -// -// A limited number of these goroutines are allowed before TCP starts using SYN -// cookies to accept connections. -// -// +checklocks:e.mu -func (e *endpoint) handleSynSegment(ctx *listenContext, s *segment, opts header.TCPSynOptions) tcpip.Error { - defer s.decRef() - - h, err := ctx.startHandshake(s, opts, &waiter.Queue{}, e.owner) - if err != nil { - e.stack.Stats().TCP.FailedConnectionAttempts.Increment() - e.stats.FailedConnectionAttempts.Increment() - atomic.AddInt32(&e.synRcvdCount, -1) - return err - } - - go func() { - // Note that startHandshake returns a locked endpoint. The - // force call here just makes it so. - if err := h.complete(); err != nil { // +checklocksforce - e.stack.Stats().TCP.FailedConnectionAttempts.Increment() - e.stats.FailedConnectionAttempts.Increment() - ctx.cleanupFailedHandshake(h) - atomic.AddInt32(&e.synRcvdCount, -1) - return - } - ctx.cleanupCompletedHandshake(h) - h.ep.startAcceptedLoop() - e.stack.Stats().TCP.PassiveConnectionOpenings.Increment() - - // Deliver the endpoint to the accept queue. - e.mu.Lock() - e.pendingAccepted.Add(1) - e.mu.Unlock() - defer e.pendingAccepted.Done() - - // Drop the lock before notifying to avoid deadlock in user-specified - // callbacks. - delivered := func() bool { - e.acceptMu.Lock() - defer e.acceptMu.Unlock() - for { - if e.accepted == (accepted{}) { - // If the listener has transitioned out of the listen state (accepted - // is the zero value), the new endpoint is reset instead. - return false - } - if e.accepted.acceptQueueIsFullLocked() { - e.acceptCond.Wait() - continue - } - - e.accepted.endpoints.PushBack(h.ep) - atomic.AddInt32(&e.synRcvdCount, -1) - return true - } - }() - - if delivered { - e.waiterQueue.Notify(waiter.ReadableEvents) - } else { - h.ep.notifyProtocolGoroutine(notifyReset) - } - }() - - return nil -} - func (e *endpoint) synRcvdBacklogFull() bool { e.acceptMu.Lock() acceptedCap := e.accepted.cap @@ -581,10 +510,69 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) tcpip.Err opts := parseSynSegmentOptions(s) if !alwaysUseSynCookies && !e.synRcvdBacklogFull() { - s.incRef() atomic.AddInt32(&e.synRcvdCount, 1) - return e.handleSynSegment(ctx, s, opts) + + h, err := ctx.startHandshake(s, opts, &waiter.Queue{}, e.owner) + if err != nil { + e.stack.Stats().TCP.FailedConnectionAttempts.Increment() + e.stats.FailedConnectionAttempts.Increment() + atomic.AddInt32(&e.synRcvdCount, -1) + return err + } + + go func() { + // Note that startHandshake returns a locked endpoint. The force call + // here just makes it so. + if err := h.complete(); err != nil { // +checklocksforce + e.stack.Stats().TCP.FailedConnectionAttempts.Increment() + e.stats.FailedConnectionAttempts.Increment() + ctx.cleanupFailedHandshake(h) + atomic.AddInt32(&e.synRcvdCount, -1) + return + } + ctx.cleanupCompletedHandshake(h) + h.ep.startAcceptedLoop() + e.stack.Stats().TCP.PassiveConnectionOpenings.Increment() + + // Deliver the endpoint to the accept queue. + e.mu.Lock() + e.pendingAccepted.Add(1) + e.mu.Unlock() + defer e.pendingAccepted.Done() + + // Drop the lock before notifying to avoid deadlock in user-specified + // callbacks. + delivered := func() bool { + e.acceptMu.Lock() + defer e.acceptMu.Unlock() + for { + if e.accepted == (accepted{}) { + // If the listener has transitioned out of the listen state + // (accepted is the zero value), the new endpoint is reset + // instead. + return false + } + if e.accepted.acceptQueueIsFullLocked() { + e.acceptCond.Wait() + continue + } + + e.accepted.endpoints.PushBack(h.ep) + atomic.AddInt32(&e.synRcvdCount, -1) + return true + } + }() + + if delivered { + e.waiterQueue.Notify(waiter.ReadableEvents) + } else { + h.ep.notifyProtocolGoroutine(notifyReset) + } + }() + + return nil } + route, err := e.stack.FindRoute(s.nicID, s.dstAddr, s.srcAddr, s.netProto, false /* multicastLoop */) if err != nil { return err @@ -789,9 +777,6 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) { ctx := newListenContext(e.stack, e.protocol, e, rcvWnd, v6Only, e.NetProto) defer func() { - // Mark endpoint as closed. This will prevent goroutines running - // handleSynSegment() from attempting to queue new connections - // to the endpoint. e.setEndpointState(StateClose) // Close any endpoints in SYN-RCVD state. |