From 2fea7d096b6224da50e09fa4bace7f3c203ed074 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Sat, 10 Apr 2021 00:58:43 -0700 Subject: Don't store accepted endpoints in a channel Use a linked list with cached length and capacity. The current channel is already composed with a mutex and condition variable, and is never used for its channel-like properties. Channels also require eager allocation equal to their capacity, which a linked list does not. PiperOrigin-RevId: 367766626 --- pkg/tcpip/transport/tcp/endpoint.go | 90 ++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 47 deletions(-) (limited to 'pkg/tcpip/transport/tcp/endpoint.go') diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 5001d222e..9fbaf6f4b 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -15,6 +15,7 @@ package tcp import ( + "container/list" "encoding/binary" "fmt" "io" @@ -322,6 +323,15 @@ type EndpointInfo struct { // marker interface. func (*EndpointInfo) IsEndpointInfo() {} +// +stateify savable +type accepted struct { + // NB: this could be an endpointList, but ilist only permits endpoints to + // belong to one list at a time, and endpoints are already stored in the + // dispatcher's list. + endpoints list.List `state:".([]*endpoint)"` + cap int +} + // endpoint represents a TCP endpoint. This struct serves as the interface // between users of the endpoint and the protocol implementation; it is legal to // have concurrent goroutines make calls into the endpoint, they are properly @@ -337,7 +347,7 @@ func (*EndpointInfo) IsEndpointInfo() {} // The following three mutexes can be acquired independent of e.mu but if // acquired with e.mu then e.mu must be acquired first. // -// e.acceptMu -> protects acceptedChan. +// e.acceptMu -> protects accepted. // e.rcvListMu -> Protects the rcvList and associated fields. // e.sndBufMu -> Protects the sndQueue and associated fields. // e.lastErrorMu -> Protects the lastError field. @@ -607,33 +617,26 @@ type endpoint struct { // listener. deferAccept time.Duration - // pendingAccepted is a synchronization primitive used to track number - // of connections that are queued up to be delivered to the accepted - // channel. We use this to ensure that all goroutines blocked on writing - // to the acceptedChan below terminate before we close acceptedChan. + // pendingAccepted tracks connections queued to be accepted. It is used to + // ensure such queued connections are terminated before the accepted queue is + // marked closed (by setting its capacity to zero). pendingAccepted sync.WaitGroup `state:"nosave"` - // acceptMu protects acceptedChan. + // acceptMu protects accepted. acceptMu sync.Mutex `state:"nosave"` // acceptCond is a condition variable that can be used to block on when - // acceptedChan is full and an endpoint is ready to be delivered. - // - // This condition variable is required because just blocking on sending - // to acceptedChan does not work in cases where endpoint.Listen is - // called twice with different backlog values. In such cases the channel - // is closed and a new one created. Any pending goroutines blocking on - // the write to the channel will panic. + // accepted is full and an endpoint is ready to be delivered. // // We use this condition variable to block/unblock goroutines which // tried to deliver an endpoint but couldn't because accept backlog was // full ( See: endpoint.deliverAccepted ). acceptCond *sync.Cond `state:"nosave"` - // acceptedChan is used by a listening endpoint protocol goroutine to + // accepted is used by a listening endpoint protocol goroutine to // send newly accepted connections to the endpoint so that they can be // read by Accept() calls. - acceptedChan chan *endpoint `state:".([]*endpoint)"` + accepted accepted // The following are only used from the protocol goroutine, and // therefore don't need locks to protect them. @@ -962,7 +965,7 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // Check if there's anything in the accepted channel. if (mask & waiter.ReadableEvents) != 0 { e.acceptMu.Lock() - if len(e.acceptedChan) > 0 { + if e.accepted.endpoints.Len() != 0 { result |= waiter.ReadableEvents } e.acceptMu.Unlock() @@ -1145,22 +1148,22 @@ func (e *endpoint) closeNoShutdownLocked() { // handshake but not yet been delivered to the application. func (e *endpoint) closePendingAcceptableConnectionsLocked() { e.acceptMu.Lock() - if e.acceptedChan == nil { - e.acceptMu.Unlock() + acceptedCopy := e.accepted + e.accepted = accepted{} + e.acceptMu.Unlock() + + if acceptedCopy == (accepted{}) { return } - close(e.acceptedChan) - ch := e.acceptedChan - e.acceptedChan = nil + e.acceptCond.Broadcast() - e.acceptMu.Unlock() // Reset all connections that are waiting to be accepted. - for n := range ch { - n.notifyProtocolGoroutine(notifyReset) + for n := acceptedCopy.endpoints.Front(); n != nil; n = n.Next() { + n.Value.(*endpoint).notifyProtocolGoroutine(notifyReset) } // Wait for reset of all endpoints that are still waiting to be delivered to - // the now closed acceptedChan. + // the now closed accepted. e.pendingAccepted.Wait() } @@ -2495,28 +2498,20 @@ func (e *endpoint) listen(backlog int) tcpip.Error { if e.EndpointState() == StateListen && !e.closed { e.acceptMu.Lock() defer e.acceptMu.Unlock() - if e.acceptedChan == nil { + if e.accepted == (accepted{}) { // listen is called after shutdown. - e.acceptedChan = make(chan *endpoint, backlog) + e.accepted.cap = backlog e.shutdownFlags = 0 e.rcvListMu.Lock() e.rcvClosed = false e.rcvListMu.Unlock() } else { - // Adjust the size of the channel iff we can fix + // Adjust the size of the backlog iff we can fit // existing pending connections into the new one. - if len(e.acceptedChan) > backlog { + if e.accepted.endpoints.Len() > backlog { return &tcpip.ErrInvalidEndpointState{} } - if cap(e.acceptedChan) == backlog { - return nil - } - origChan := e.acceptedChan - e.acceptedChan = make(chan *endpoint, backlog) - close(origChan) - for ep := range origChan { - e.acceptedChan <- ep - } + e.accepted.cap = backlog } // Notify any blocked goroutines that they can attempt to @@ -2549,12 +2544,12 @@ func (e *endpoint) listen(backlog int) tcpip.Error { e.isRegistered = true e.setEndpointState(StateListen) - // The channel may be non-nil when we're restoring the endpoint, and it + // The queue may be non-zero when we're restoring the endpoint, and it // may be pre-populated with some previously accepted (but not Accepted) // endpoints. e.acceptMu.Lock() - if e.acceptedChan == nil { - e.acceptedChan = make(chan *endpoint, backlog) + if e.accepted == (accepted{}) { + e.accepted.cap = backlog } e.acceptMu.Unlock() @@ -2591,15 +2586,16 @@ func (e *endpoint) Accept(peerAddr *tcpip.FullAddress) (tcpip.Endpoint, *waiter. } // Get the new accepted endpoint. - e.acceptMu.Lock() - defer e.acceptMu.Unlock() var n *endpoint - select { - case n = <-e.acceptedChan: - e.acceptCond.Signal() - default: + e.acceptMu.Lock() + if element := e.accepted.endpoints.Front(); element != nil { + n = e.accepted.endpoints.Remove(element).(*endpoint) + } + e.acceptMu.Unlock() + if n == nil { return nil, nil, &tcpip.ErrWouldBlock{} } + e.acceptCond.Signal() if peerAddr != nil { *peerAddr = n.getRemoteAddress() } -- cgit v1.2.3