summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp/endpoint.go
diff options
context:
space:
mode:
authorTamir Duberstein <tamird@google.com>2021-04-10 00:58:43 -0700
committergVisor bot <gvisor-bot@google.com>2021-04-10 01:00:41 -0700
commit2fea7d096b6224da50e09fa4bace7f3c203ed074 (patch)
tree34e3420bf4e5822d55a49cecf6f9092d3b30363c /pkg/tcpip/transport/tcp/endpoint.go
parentd1edabdca016b9d80295855a3ce6d2816486d65c (diff)
Don't store accepted endpoints in a channel
Use a linked list with cached length and capacity. The current channel is already composed with a mutex and condition variable, and is never used for its channel-like properties. Channels also require eager allocation equal to their capacity, which a linked list does not. PiperOrigin-RevId: 367766626
Diffstat (limited to 'pkg/tcpip/transport/tcp/endpoint.go')
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go90
1 files changed, 43 insertions, 47 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 5001d222e..9fbaf6f4b 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -15,6 +15,7 @@
package tcp
import (
+ "container/list"
"encoding/binary"
"fmt"
"io"
@@ -322,6 +323,15 @@ type EndpointInfo struct {
// marker interface.
func (*EndpointInfo) IsEndpointInfo() {}
+// +stateify savable
+type accepted struct {
+ // NB: this could be an endpointList, but ilist only permits endpoints to
+ // belong to one list at a time, and endpoints are already stored in the
+ // dispatcher's list.
+ endpoints list.List `state:".([]*endpoint)"`
+ cap int
+}
+
// endpoint represents a TCP endpoint. This struct serves as the interface
// between users of the endpoint and the protocol implementation; it is legal to
// have concurrent goroutines make calls into the endpoint, they are properly
@@ -337,7 +347,7 @@ func (*EndpointInfo) IsEndpointInfo() {}
// The following three mutexes can be acquired independent of e.mu but if
// acquired with e.mu then e.mu must be acquired first.
//
-// e.acceptMu -> protects acceptedChan.
+// e.acceptMu -> protects accepted.
// e.rcvListMu -> Protects the rcvList and associated fields.
// e.sndBufMu -> Protects the sndQueue and associated fields.
// e.lastErrorMu -> Protects the lastError field.
@@ -607,33 +617,26 @@ type endpoint struct {
// listener.
deferAccept time.Duration
- // pendingAccepted is a synchronization primitive used to track number
- // of connections that are queued up to be delivered to the accepted
- // channel. We use this to ensure that all goroutines blocked on writing
- // to the acceptedChan below terminate before we close acceptedChan.
+ // pendingAccepted tracks connections queued to be accepted. It is used to
+ // ensure such queued connections are terminated before the accepted queue is
+ // marked closed (by setting its capacity to zero).
pendingAccepted sync.WaitGroup `state:"nosave"`
- // acceptMu protects acceptedChan.
+ // acceptMu protects accepted.
acceptMu sync.Mutex `state:"nosave"`
// acceptCond is a condition variable that can be used to block on when
- // acceptedChan is full and an endpoint is ready to be delivered.
- //
- // This condition variable is required because just blocking on sending
- // to acceptedChan does not work in cases where endpoint.Listen is
- // called twice with different backlog values. In such cases the channel
- // is closed and a new one created. Any pending goroutines blocking on
- // the write to the channel will panic.
+ // accepted is full and an endpoint is ready to be delivered.
//
// We use this condition variable to block/unblock goroutines which
// tried to deliver an endpoint but couldn't because accept backlog was
// full ( See: endpoint.deliverAccepted ).
acceptCond *sync.Cond `state:"nosave"`
- // acceptedChan is used by a listening endpoint protocol goroutine to
+ // accepted is used by a listening endpoint protocol goroutine to
// send newly accepted connections to the endpoint so that they can be
// read by Accept() calls.
- acceptedChan chan *endpoint `state:".([]*endpoint)"`
+ accepted accepted
// The following are only used from the protocol goroutine, and
// therefore don't need locks to protect them.
@@ -962,7 +965,7 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
// Check if there's anything in the accepted channel.
if (mask & waiter.ReadableEvents) != 0 {
e.acceptMu.Lock()
- if len(e.acceptedChan) > 0 {
+ if e.accepted.endpoints.Len() != 0 {
result |= waiter.ReadableEvents
}
e.acceptMu.Unlock()
@@ -1145,22 +1148,22 @@ func (e *endpoint) closeNoShutdownLocked() {
// handshake but not yet been delivered to the application.
func (e *endpoint) closePendingAcceptableConnectionsLocked() {
e.acceptMu.Lock()
- if e.acceptedChan == nil {
- e.acceptMu.Unlock()
+ acceptedCopy := e.accepted
+ e.accepted = accepted{}
+ e.acceptMu.Unlock()
+
+ if acceptedCopy == (accepted{}) {
return
}
- close(e.acceptedChan)
- ch := e.acceptedChan
- e.acceptedChan = nil
+
e.acceptCond.Broadcast()
- e.acceptMu.Unlock()
// Reset all connections that are waiting to be accepted.
- for n := range ch {
- n.notifyProtocolGoroutine(notifyReset)
+ for n := acceptedCopy.endpoints.Front(); n != nil; n = n.Next() {
+ n.Value.(*endpoint).notifyProtocolGoroutine(notifyReset)
}
// Wait for reset of all endpoints that are still waiting to be delivered to
- // the now closed acceptedChan.
+ // the now closed accepted.
e.pendingAccepted.Wait()
}
@@ -2495,28 +2498,20 @@ func (e *endpoint) listen(backlog int) tcpip.Error {
if e.EndpointState() == StateListen && !e.closed {
e.acceptMu.Lock()
defer e.acceptMu.Unlock()
- if e.acceptedChan == nil {
+ if e.accepted == (accepted{}) {
// listen is called after shutdown.
- e.acceptedChan = make(chan *endpoint, backlog)
+ e.accepted.cap = backlog
e.shutdownFlags = 0
e.rcvListMu.Lock()
e.rcvClosed = false
e.rcvListMu.Unlock()
} else {
- // Adjust the size of the channel iff we can fix
+ // Adjust the size of the backlog iff we can fit
// existing pending connections into the new one.
- if len(e.acceptedChan) > backlog {
+ if e.accepted.endpoints.Len() > backlog {
return &tcpip.ErrInvalidEndpointState{}
}
- if cap(e.acceptedChan) == backlog {
- return nil
- }
- origChan := e.acceptedChan
- e.acceptedChan = make(chan *endpoint, backlog)
- close(origChan)
- for ep := range origChan {
- e.acceptedChan <- ep
- }
+ e.accepted.cap = backlog
}
// Notify any blocked goroutines that they can attempt to
@@ -2549,12 +2544,12 @@ func (e *endpoint) listen(backlog int) tcpip.Error {
e.isRegistered = true
e.setEndpointState(StateListen)
- // The channel may be non-nil when we're restoring the endpoint, and it
+ // The queue may be non-zero when we're restoring the endpoint, and it
// may be pre-populated with some previously accepted (but not Accepted)
// endpoints.
e.acceptMu.Lock()
- if e.acceptedChan == nil {
- e.acceptedChan = make(chan *endpoint, backlog)
+ if e.accepted == (accepted{}) {
+ e.accepted.cap = backlog
}
e.acceptMu.Unlock()
@@ -2591,15 +2586,16 @@ func (e *endpoint) Accept(peerAddr *tcpip.FullAddress) (tcpip.Endpoint, *waiter.
}
// Get the new accepted endpoint.
- e.acceptMu.Lock()
- defer e.acceptMu.Unlock()
var n *endpoint
- select {
- case n = <-e.acceptedChan:
- e.acceptCond.Signal()
- default:
+ e.acceptMu.Lock()
+ if element := e.accepted.endpoints.Front(); element != nil {
+ n = e.accepted.endpoints.Remove(element).(*endpoint)
+ }
+ e.acceptMu.Unlock()
+ if n == nil {
return nil, nil, &tcpip.ErrWouldBlock{}
}
+ e.acceptCond.Signal()
if peerAddr != nil {
*peerAddr = n.getRemoteAddress()
}