summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp/accept.go
diff options
context:
space:
mode:
authorBhasker Hariharan <bhaskerh@google.com>2019-05-30 10:47:11 -0700
committerShentubot <shentubot@google.com>2019-05-30 12:08:41 -0700
commitae26b2c425d53aa8720238183d1c156a45904311 (patch)
treee7eff51fb10a29493586d4193acb3b1047ce1d57 /pkg/tcpip/transport/tcp/accept.go
parent8d25cd0b40694d1911724816d72b34d0717878d6 (diff)
Fixes to TCP listen behavior.
Netstack listen loop can get stuck if cookies are in-use and the app is slow to accept incoming connections. Further we continue to complete handshake for a connection even if the backlog is full. This creates a problem when a lots of connections come in rapidly and we end up with lots of completed connections just hanging around to be delivered. These fixes change netstack behaviour to mirror what linux does as described here in the following article http://veithen.io/2014/01/01/how-tcp-backlog-works-in-linux.html Now when cookies are not in-use Netstack will silently drop the ACK to a SYN-ACK and not complete the handshake if the backlog is full. This will result in the connection staying in a half-complete state. Eventually the sender will retransmit the ACK and if backlog has space we will transition to a connected state and deliver the endpoint. Similarly when cookies are in use we do not try and create an endpoint unless there is space in the accept queue to accept the newly created endpoint. If there is no space then we again silently drop the ACK as we can just recreate it when the ACK is retransmitted by the peer. We also now use the backlog to cap the size of the SYN-RCVD queue for a given endpoint. So at any time there can be N connections in the backlog and N in a SYN-RCVD state if the application is not accepting connections. Any new SYNs will be dropped. This CL also fixes another small bug where we mark a new endpoint which has not completed handshake as connected. We should wait till handshake successfully completes before marking it connected. Updates #236 PiperOrigin-RevId: 250717817
Diffstat (limited to 'pkg/tcpip/transport/tcp/accept.go')
-rw-r--r--pkg/tcpip/transport/tcp/accept.go161
1 files changed, 118 insertions, 43 deletions
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index e506d7133..d4b860975 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -19,6 +19,7 @@ import (
"encoding/binary"
"hash"
"io"
+ "log"
"sync"
"time"
@@ -87,9 +88,10 @@ var synRcvdCount struct {
// and must not be accessed or have its methods called concurrently as they
// may mutate the stored objects.
type listenContext struct {
- stack *stack.Stack
- rcvWnd seqnum.Size
- nonce [2][sha1.BlockSize]byte
+ stack *stack.Stack
+ rcvWnd seqnum.Size
+ nonce [2][sha1.BlockSize]byte
+ listenEP *endpoint
hasherMu sync.Mutex
hasher hash.Hash
@@ -107,15 +109,16 @@ func timeStamp() uint32 {
// threshold, and fails otherwise.
func incSynRcvdCount() bool {
synRcvdCount.Lock()
- defer synRcvdCount.Unlock()
if synRcvdCount.value >= SynRcvdCountThreshold {
+ synRcvdCount.Unlock()
return false
}
synRcvdCount.pending.Add(1)
synRcvdCount.value++
+ synRcvdCount.Unlock()
return true
}
@@ -124,20 +127,21 @@ func incSynRcvdCount() bool {
// succeeded.
func decSynRcvdCount() {
synRcvdCount.Lock()
- defer synRcvdCount.Unlock()
synRcvdCount.value--
synRcvdCount.pending.Done()
+ synRcvdCount.Unlock()
}
// newListenContext creates a new listen context.
-func newListenContext(stack *stack.Stack, rcvWnd seqnum.Size, v6only bool, netProto tcpip.NetworkProtocolNumber) *listenContext {
+func newListenContext(stack *stack.Stack, listenEP *endpoint, rcvWnd seqnum.Size, v6only bool, netProto tcpip.NetworkProtocolNumber) *listenContext {
l := &listenContext{
stack: stack,
rcvWnd: rcvWnd,
hasher: sha1.New(),
v6only: v6only,
netProto: netProto,
+ listenEP: listenEP,
}
rand.Read(l.nonce[0][:])
@@ -195,9 +199,9 @@ func (l *listenContext) isCookieValid(id stack.TransportEndpointID, cookie seqnu
return (v - l.cookieHash(id, cookieTS, 1)) & hashMask, true
}
-// createConnectedEndpoint creates a new connected endpoint, with the connection
-// parameters given by the arguments.
-func (l *listenContext) createConnectedEndpoint(s *segment, iss seqnum.Value, irs seqnum.Value, rcvdSynOpts *header.TCPSynOptions) (*endpoint, *tcpip.Error) {
+// createConnectingEndpoint creates a new endpoint in a connecting state, with
+// the connection parameters given by the arguments.
+func (l *listenContext) createConnectingEndpoint(s *segment, iss seqnum.Value, irs seqnum.Value, rcvdSynOpts *header.TCPSynOptions) (*endpoint, *tcpip.Error) {
// Create a new endpoint.
netProto := l.netProto
if netProto == 0 {
@@ -223,7 +227,7 @@ func (l *listenContext) createConnectedEndpoint(s *segment, iss seqnum.Value, ir
}
n.isRegistered = true
- n.state = stateConnected
+ n.state = stateConnecting
// Create sender and receiver.
//
@@ -241,7 +245,7 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head
// Create new endpoint.
irs := s.sequenceNumber
cookie := l.createCookie(s.id, irs, encodeMSS(opts.MSS))
- ep, err := l.createConnectedEndpoint(s, cookie, irs, opts)
+ ep, err := l.createConnectingEndpoint(s, cookie, irs, opts)
if err != nil {
return nil, err
}
@@ -249,12 +253,15 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head
// Perform the 3-way handshake.
h := newHandshake(ep, l.rcvWnd)
- h.resetToSynRcvd(cookie, irs, opts)
+ h.resetToSynRcvd(cookie, irs, opts, l.listenEP)
if err := h.execute(); err != nil {
+ ep.stack.Stats().TCP.FailedConnectionAttempts.Increment()
ep.Close()
return nil, err
}
+ ep.state = stateConnected
+
// Update the receive window scaling. We can't do it before the
// handshake because it's possible that the peer doesn't support window
// scaling.
@@ -268,13 +275,14 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head
// instead.
func (e *endpoint) deliverAccepted(n *endpoint) {
e.mu.RLock()
- if e.state == stateListen {
+ state := e.state
+ e.mu.RUnlock()
+ if state == stateListen {
e.acceptedChan <- n
e.waiterQueue.Notify(waiter.EventIn)
} else {
n.Close()
}
- e.mu.RUnlock()
}
// handleSynSegment is called in its own goroutine once the listening endpoint
@@ -285,16 +293,36 @@ func (e *endpoint) deliverAccepted(n *endpoint) {
// cookies to accept connections.
func (e *endpoint) handleSynSegment(ctx *listenContext, s *segment, opts *header.TCPSynOptions) {
defer decSynRcvdCount()
+ defer e.decSynRcvdCount()
defer s.decRef()
n, err := ctx.createEndpointAndPerformHandshake(s, opts)
if err != nil {
+ e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
return
}
e.deliverAccepted(n)
}
+func (e *endpoint) incSynRcvdCount() bool {
+ e.mu.Lock()
+ log.Printf("l: %d, c: %d, e.synRcvdCount: %d", len(e.acceptedChan), cap(e.acceptedChan), e.synRcvdCount)
+ if l, c := len(e.acceptedChan), cap(e.acceptedChan); l == c && e.synRcvdCount >= c {
+ e.mu.Unlock()
+ return false
+ }
+ e.synRcvdCount++
+ e.mu.Unlock()
+ return true
+}
+
+func (e *endpoint) decSynRcvdCount() {
+ e.mu.Lock()
+ e.synRcvdCount--
+ e.mu.Unlock()
+}
+
// handleListenSegment is called when a listening endpoint receives a segment
// and needs to handle it.
func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) {
@@ -302,9 +330,20 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) {
case header.TCPFlagSyn:
opts := parseSynSegmentOptions(s)
if incSynRcvdCount() {
- s.incRef()
- go e.handleSynSegment(ctx, s, &opts) // S/R-SAFE: synRcvdCount is the barrier.
+ // Drop the SYN if the listen endpoint's accept queue is
+ // overflowing.
+ if e.incSynRcvdCount() {
+ log.Printf("processing syn packet")
+ s.incRef()
+ go e.handleSynSegment(ctx, s, &opts) // S/R-SAFE: synRcvdCount is the barrier.
+ return
+ }
+ log.Printf("dropping syn packet")
+ e.stack.Stats().TCP.ListenOverflowSynDrop.Increment()
+ e.stack.Stats().DroppedPackets.Increment()
+ return
} else {
+ // TODO(bhaskerh): Increment syncookie sent stat.
cookie := ctx.createCookie(s.id, s.sequenceNumber, encodeMSS(opts.MSS))
// Send SYN with window scaling because we currently
// dont't encode this information in the cookie.
@@ -318,36 +357,72 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) {
TSEcr: opts.TSVal,
}
sendSynTCP(&s.route, s.id, header.TCPFlagSyn|header.TCPFlagAck, cookie, s.sequenceNumber+1, ctx.rcvWnd, synOpts)
+ e.stack.Stats().TCP.ListenOverflowSynCookieSent.Increment()
}
case header.TCPFlagAck:
- if data, ok := ctx.isCookieValid(s.id, s.ackNumber-1, s.sequenceNumber-1); ok && int(data) < len(mssTable) {
- // Create newly accepted endpoint and deliver it.
- rcvdSynOptions := &header.TCPSynOptions{
- MSS: mssTable[data],
- // Disable Window scaling as original SYN is
- // lost.
- WS: -1,
- }
- // When syn cookies are in use we enable timestamp only
- // if the ack specifies the timestamp option assuming
- // that the other end did in fact negotiate the
- // timestamp option in the original SYN.
- if s.parsedOptions.TS {
- rcvdSynOptions.TS = true
- rcvdSynOptions.TSVal = s.parsedOptions.TSVal
- rcvdSynOptions.TSEcr = s.parsedOptions.TSEcr
- }
- n, err := ctx.createConnectedEndpoint(s, s.ackNumber-1, s.sequenceNumber-1, rcvdSynOptions)
- if err == nil {
- // clear the tsOffset for the newly created
- // endpoint as the Timestamp was already
- // randomly offset when the original SYN-ACK was
- // sent above.
- n.tsOffset = 0
- e.deliverAccepted(n)
- }
+ if len(e.acceptedChan) == cap(e.acceptedChan) {
+ // Silently drop the ack as the application can't accept
+ // the connection at this point. The ack will be
+ // retransmitted by the sender anyway and we can
+ // complete the connection at the time of retransmit if
+ // the backlog has space.
+ e.stack.Stats().TCP.ListenOverflowAckDrop.Increment()
+ e.stack.Stats().DroppedPackets.Increment()
+ return
+ }
+
+ // Validate the cookie.
+ data, ok := ctx.isCookieValid(s.id, s.ackNumber-1, s.sequenceNumber-1)
+ if !ok || int(data) >= len(mssTable) {
+ e.stack.Stats().TCP.ListenOverflowInvalidSynCookieRcvd.Increment()
+ e.stack.Stats().DroppedPackets.Increment()
+ return
}
+ e.stack.Stats().TCP.ListenOverflowSynCookieRcvd.Increment()
+ // Create newly accepted endpoint and deliver it.
+ rcvdSynOptions := &header.TCPSynOptions{
+ MSS: mssTable[data],
+ // Disable Window scaling as original SYN is
+ // lost.
+ WS: -1,
+ }
+
+ // When syn cookies are in use we enable timestamp only
+ // if the ack specifies the timestamp option assuming
+ // that the other end did in fact negotiate the
+ // timestamp option in the original SYN.
+ if s.parsedOptions.TS {
+ rcvdSynOptions.TS = true
+ rcvdSynOptions.TSVal = s.parsedOptions.TSVal
+ rcvdSynOptions.TSEcr = s.parsedOptions.TSEcr
+ }
+
+ n, err := ctx.createConnectingEndpoint(s, s.ackNumber-1, s.sequenceNumber-1, rcvdSynOptions)
+ if err != nil {
+ e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
+ return
+ }
+
+ // clear the tsOffset for the newly created
+ // endpoint as the Timestamp was already
+ // randomly offset when the original SYN-ACK was
+ // sent above.
+ n.tsOffset = 0
+
+ // Switch state to connected.
+ n.state = stateConnected
+
+ // Do the delivery in a separate goroutine so
+ // that we don't block the listen loop in case
+ // the application is slow to accept or stops
+ // accepting.
+ //
+ // NOTE: This won't result in an unbounded
+ // number of goroutines as we do check before
+ // entering here that there was at least some
+ // space available in the backlog.
+ go e.deliverAccepted(n)
}
}
@@ -377,7 +452,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error {
v6only := e.v6only
e.mu.Unlock()
- ctx := newListenContext(e.stack, rcvWnd, v6only, e.netProto)
+ ctx := newListenContext(e.stack, e, rcvWnd, v6only, e.netProto)
s := sleep.Sleeper{}
s.AddWaker(&e.notificationWaker, wakerForNotification)