diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/tcpip/transport/tcp/accept.go | 30 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 40 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint_state.go | 6 |
3 files changed, 37 insertions, 39 deletions
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index d41e07521..52d121907 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -15,6 +15,7 @@ package tcp import ( + "container/list" "crypto/sha1" "encoding/binary" "fmt" @@ -409,13 +410,24 @@ func (e *endpoint) notifyAborted() { func (e *endpoint) acceptQueueIsFull() bool { e.acceptMu.Lock() - full := e.accepted.acceptQueueIsFullLocked() + full := e.acceptQueue.isFull() e.acceptMu.Unlock() return full } -func (a *accepted) acceptQueueIsFullLocked() bool { - return a.endpoints.Len() == a.cap +// +stateify savable +type acceptQueue struct { + // NB: this could be an endpointList, but ilist only permits endpoints to + // belong to one list at a time, and endpoints are already stored in the + // dispatcher's list. + endpoints list.List `state:".([]*endpoint)"` + + // capacity is the maximum number of endpoints that can be in endpoints. + capacity int +} + +func (a *acceptQueue) isFull() bool { + return a.endpoints.Len() == a.capacity } // handleListenSegment is called when a listening endpoint receives a segment @@ -467,7 +479,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) tcpip.Err // listen backlog. But, the SYNRCVD connections count is always checked // against the listen backlog value for Linux parity reason. // https://github.com/torvalds/linux/blob/7acac4b3196/include/net/inet_connection_sock.h#L280 - if len(ctx.pendingEndpoints) == e.accepted.cap-1 { + if len(ctx.pendingEndpoints) == e.acceptQueue.capacity-1 { return true, nil } @@ -513,18 +525,18 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) tcpip.Err e.acceptMu.Lock() defer e.acceptMu.Unlock() for { - if e.accepted == (accepted{}) { + if e.acceptQueue == (acceptQueue{}) { // If the listener has transitioned out of the listen state // (accepted is the zero value), the new endpoint is reset // instead. return false } - if e.accepted.acceptQueueIsFullLocked() { + if e.acceptQueue.isFull() { e.acceptCond.Wait() continue } - e.accepted.endpoints.PushBack(h.ep) + e.acceptQueue.endpoints.PushBack(h.ep) return true } }() @@ -591,7 +603,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) tcpip.Err // if there is an error), to guarantee that we will keep our spot in the // queue even if another handshake from the syn queue completes. e.acceptMu.Lock() - if e.accepted.acceptQueueIsFullLocked() { + if e.acceptQueue.isFull() { // Silently drop the ack as the application can't accept // the connection at this point. The ack will be // retransmitted by the sender anyway and we can @@ -729,7 +741,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) tcpip.Err e.stack.Stats().TCP.PassiveConnectionOpenings.Increment() // Deliver the endpoint to the accept queue. - e.accepted.endpoints.PushBack(n) + e.acceptQueue.endpoints.PushBack(n) e.acceptMu.Unlock() e.waiterQueue.Notify(waiter.ReadableEvents) diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 9c5b1b016..ec7e98ec8 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -15,7 +15,6 @@ package tcp import ( - "container/list" "encoding/binary" "fmt" "io" @@ -317,18 +316,6 @@ type rcvQueueInfo struct { rcvQueue segmentList `state:"wait"` } -// +stateify savable -type accepted struct { - // NB: this could be an endpointList, but ilist only permits endpoints to - // belong to one list at a time, and endpoints are already stored in the - // dispatcher's list. - endpoints list.List `state:".([]*endpoint)"` - - // cap is the maximum number of endpoints that can be in the accepted endpoint - // list. - cap int -} - // endpoint represents a TCP endpoint. This struct serves as the interface // between users of the endpoint and the protocol implementation; it is legal to // have concurrent goroutines make calls into the endpoint, they are properly @@ -344,7 +331,7 @@ type accepted struct { // The following three mutexes can be acquired independent of e.mu but if // acquired with e.mu then e.mu must be acquired first. // -// e.acceptMu -> Protects e.accepted. +// e.acceptMu -> Protects e.acceptQueue. // e.rcvQueueMu -> Protects e.rcvQueue and associated fields. // e.sndQueueMu -> Protects the e.sndQueue and associated fields. // e.lastErrorMu -> Protects the lastError field. @@ -581,7 +568,7 @@ type endpoint struct { // send newly accepted connections to the endpoint so that they can be // read by Accept() calls. // +checklocks:acceptMu - accepted accepted + acceptQueue acceptQueue // The following are only used from the protocol goroutine, and // therefore don't need locks to protect them. @@ -911,7 +898,7 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // Check if there's anything in the accepted queue. if (mask & waiter.ReadableEvents) != 0 { e.acceptMu.Lock() - if e.accepted.endpoints.Len() != 0 { + if e.acceptQueue.endpoints.Len() != 0 { result |= waiter.ReadableEvents } e.acceptMu.Unlock() @@ -1094,11 +1081,11 @@ func (e *endpoint) closeNoShutdownLocked() { // handshake but not yet been delivered to the application. func (e *endpoint) closePendingAcceptableConnectionsLocked() { e.acceptMu.Lock() - acceptedCopy := e.accepted - e.accepted = accepted{} + acceptedCopy := e.acceptQueue + e.acceptQueue = acceptQueue{} e.acceptMu.Unlock() - if acceptedCopy == (accepted{}) { + if acceptedCopy == (acceptQueue{}) { return } @@ -2499,9 +2486,8 @@ func (e *endpoint) listen(backlog int) tcpip.Error { if e.EndpointState() == StateListen && !e.closed { e.acceptMu.Lock() defer e.acceptMu.Unlock() - if e.accepted == (accepted{}) { + if e.acceptQueue == (acceptQueue{}) { // listen is called after shutdown. - e.accepted.cap = backlog e.shutdownFlags = 0 e.rcvQueueInfo.rcvQueueMu.Lock() e.rcvQueueInfo.RcvClosed = false @@ -2509,11 +2495,11 @@ func (e *endpoint) listen(backlog int) tcpip.Error { } else { // Adjust the size of the backlog iff we can fit // existing pending connections into the new one. - if e.accepted.endpoints.Len() > backlog { + if e.acceptQueue.endpoints.Len() > backlog { return &tcpip.ErrInvalidEndpointState{} } - e.accepted.cap = backlog } + e.acceptQueue.capacity = backlog // Notify any blocked goroutines that they can attempt to // deliver endpoints again. @@ -2549,8 +2535,8 @@ func (e *endpoint) listen(backlog int) tcpip.Error { // may be pre-populated with some previously accepted (but not Accepted) // endpoints. e.acceptMu.Lock() - if e.accepted == (accepted{}) { - e.accepted.cap = backlog + if e.acceptQueue == (acceptQueue{}) { + e.acceptQueue.capacity = backlog } e.acceptMu.Unlock() @@ -2590,8 +2576,8 @@ func (e *endpoint) Accept(peerAddr *tcpip.FullAddress) (tcpip.Endpoint, *waiter. // Get the new accepted endpoint. var n *endpoint e.acceptMu.Lock() - if element := e.accepted.endpoints.Front(); element != nil { - n = e.accepted.endpoints.Remove(element).(*endpoint) + if element := e.acceptQueue.endpoints.Front(); element != nil { + n = e.acceptQueue.endpoints.Remove(element).(*endpoint) } e.acceptMu.Unlock() if n == nil { diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go index 381f4474d..94072a115 100644 --- a/pkg/tcpip/transport/tcp/endpoint_state.go +++ b/pkg/tcpip/transport/tcp/endpoint_state.go @@ -100,7 +100,7 @@ func (e *endpoint) beforeSave() { } // saveEndpoints is invoked by stateify. -func (a *accepted) saveEndpoints() []*endpoint { +func (a *acceptQueue) saveEndpoints() []*endpoint { acceptedEndpoints := make([]*endpoint, a.endpoints.Len()) for i, e := 0, a.endpoints.Front(); e != nil; i, e = i+1, e.Next() { acceptedEndpoints[i] = e.Value.(*endpoint) @@ -109,7 +109,7 @@ func (a *accepted) saveEndpoints() []*endpoint { } // loadEndpoints is invoked by stateify. -func (a *accepted) loadEndpoints(acceptedEndpoints []*endpoint) { +func (a *acceptQueue) loadEndpoints(acceptedEndpoints []*endpoint) { for _, ep := range acceptedEndpoints { a.endpoints.PushBack(ep) } @@ -252,7 +252,7 @@ func (e *endpoint) Resume(s *stack.Stack) { connectedLoading.Wait() bind() e.acceptMu.Lock() - backlog := e.accepted.cap + backlog := e.acceptQueue.capacity e.acceptMu.Unlock() if err := e.Listen(backlog); err != nil { panic("endpoint listening failed: " + err.String()) |