diff options
author | Bhasker Hariharan <bhaskerh@google.com> | 2019-05-30 10:47:11 -0700 |
---|---|---|
committer | Shentubot <shentubot@google.com> | 2019-05-30 12:08:41 -0700 |
commit | ae26b2c425d53aa8720238183d1c156a45904311 (patch) | |
tree | e7eff51fb10a29493586d4193acb3b1047ce1d57 /pkg/tcpip/transport/tcp/accept.go | |
parent | 8d25cd0b40694d1911724816d72b34d0717878d6 (diff) |
Fixes to TCP listen behavior.
Netstack listen loop can get stuck if cookies are in-use and the app is slow to
accept incoming connections. Further we continue to complete handshake for a
connection even if the backlog is full. This creates a problem when a lots of
connections come in rapidly and we end up with lots of completed connections
just hanging around to be delivered.
These fixes change netstack behaviour to mirror what linux does as described
here in the following article
http://veithen.io/2014/01/01/how-tcp-backlog-works-in-linux.html
Now when cookies are not in-use Netstack will silently drop the ACK to a SYN-ACK
and not complete the handshake if the backlog is full. This will result in the
connection staying in a half-complete state. Eventually the sender will
retransmit the ACK and if backlog has space we will transition to a connected
state and deliver the endpoint.
Similarly when cookies are in use we do not try and create an endpoint unless
there is space in the accept queue to accept the newly created endpoint. If
there is no space then we again silently drop the ACK as we can just recreate it
when the ACK is retransmitted by the peer.
We also now use the backlog to cap the size of the SYN-RCVD queue for a given
endpoint. So at any time there can be N connections in the backlog and N in a
SYN-RCVD state if the application is not accepting connections. Any new SYNs
will be dropped.
This CL also fixes another small bug where we mark a new endpoint which has not
completed handshake as connected. We should wait till handshake successfully
completes before marking it connected.
Updates #236
PiperOrigin-RevId: 250717817
Diffstat (limited to 'pkg/tcpip/transport/tcp/accept.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/accept.go | 161 |
1 files changed, 118 insertions, 43 deletions
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index e506d7133..d4b860975 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -19,6 +19,7 @@ import ( "encoding/binary" "hash" "io" + "log" "sync" "time" @@ -87,9 +88,10 @@ var synRcvdCount struct { // and must not be accessed or have its methods called concurrently as they // may mutate the stored objects. type listenContext struct { - stack *stack.Stack - rcvWnd seqnum.Size - nonce [2][sha1.BlockSize]byte + stack *stack.Stack + rcvWnd seqnum.Size + nonce [2][sha1.BlockSize]byte + listenEP *endpoint hasherMu sync.Mutex hasher hash.Hash @@ -107,15 +109,16 @@ func timeStamp() uint32 { // threshold, and fails otherwise. func incSynRcvdCount() bool { synRcvdCount.Lock() - defer synRcvdCount.Unlock() if synRcvdCount.value >= SynRcvdCountThreshold { + synRcvdCount.Unlock() return false } synRcvdCount.pending.Add(1) synRcvdCount.value++ + synRcvdCount.Unlock() return true } @@ -124,20 +127,21 @@ func incSynRcvdCount() bool { // succeeded. func decSynRcvdCount() { synRcvdCount.Lock() - defer synRcvdCount.Unlock() synRcvdCount.value-- synRcvdCount.pending.Done() + synRcvdCount.Unlock() } // newListenContext creates a new listen context. -func newListenContext(stack *stack.Stack, rcvWnd seqnum.Size, v6only bool, netProto tcpip.NetworkProtocolNumber) *listenContext { +func newListenContext(stack *stack.Stack, listenEP *endpoint, rcvWnd seqnum.Size, v6only bool, netProto tcpip.NetworkProtocolNumber) *listenContext { l := &listenContext{ stack: stack, rcvWnd: rcvWnd, hasher: sha1.New(), v6only: v6only, netProto: netProto, + listenEP: listenEP, } rand.Read(l.nonce[0][:]) @@ -195,9 +199,9 @@ func (l *listenContext) isCookieValid(id stack.TransportEndpointID, cookie seqnu return (v - l.cookieHash(id, cookieTS, 1)) & hashMask, true } -// createConnectedEndpoint creates a new connected endpoint, with the connection -// parameters given by the arguments. -func (l *listenContext) createConnectedEndpoint(s *segment, iss seqnum.Value, irs seqnum.Value, rcvdSynOpts *header.TCPSynOptions) (*endpoint, *tcpip.Error) { +// createConnectingEndpoint creates a new endpoint in a connecting state, with +// the connection parameters given by the arguments. +func (l *listenContext) createConnectingEndpoint(s *segment, iss seqnum.Value, irs seqnum.Value, rcvdSynOpts *header.TCPSynOptions) (*endpoint, *tcpip.Error) { // Create a new endpoint. netProto := l.netProto if netProto == 0 { @@ -223,7 +227,7 @@ func (l *listenContext) createConnectedEndpoint(s *segment, iss seqnum.Value, ir } n.isRegistered = true - n.state = stateConnected + n.state = stateConnecting // Create sender and receiver. // @@ -241,7 +245,7 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head // Create new endpoint. irs := s.sequenceNumber cookie := l.createCookie(s.id, irs, encodeMSS(opts.MSS)) - ep, err := l.createConnectedEndpoint(s, cookie, irs, opts) + ep, err := l.createConnectingEndpoint(s, cookie, irs, opts) if err != nil { return nil, err } @@ -249,12 +253,15 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head // Perform the 3-way handshake. h := newHandshake(ep, l.rcvWnd) - h.resetToSynRcvd(cookie, irs, opts) + h.resetToSynRcvd(cookie, irs, opts, l.listenEP) if err := h.execute(); err != nil { + ep.stack.Stats().TCP.FailedConnectionAttempts.Increment() ep.Close() return nil, err } + ep.state = stateConnected + // Update the receive window scaling. We can't do it before the // handshake because it's possible that the peer doesn't support window // scaling. @@ -268,13 +275,14 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head // instead. func (e *endpoint) deliverAccepted(n *endpoint) { e.mu.RLock() - if e.state == stateListen { + state := e.state + e.mu.RUnlock() + if state == stateListen { e.acceptedChan <- n e.waiterQueue.Notify(waiter.EventIn) } else { n.Close() } - e.mu.RUnlock() } // handleSynSegment is called in its own goroutine once the listening endpoint @@ -285,16 +293,36 @@ func (e *endpoint) deliverAccepted(n *endpoint) { // cookies to accept connections. func (e *endpoint) handleSynSegment(ctx *listenContext, s *segment, opts *header.TCPSynOptions) { defer decSynRcvdCount() + defer e.decSynRcvdCount() defer s.decRef() n, err := ctx.createEndpointAndPerformHandshake(s, opts) if err != nil { + e.stack.Stats().TCP.FailedConnectionAttempts.Increment() return } e.deliverAccepted(n) } +func (e *endpoint) incSynRcvdCount() bool { + e.mu.Lock() + log.Printf("l: %d, c: %d, e.synRcvdCount: %d", len(e.acceptedChan), cap(e.acceptedChan), e.synRcvdCount) + if l, c := len(e.acceptedChan), cap(e.acceptedChan); l == c && e.synRcvdCount >= c { + e.mu.Unlock() + return false + } + e.synRcvdCount++ + e.mu.Unlock() + return true +} + +func (e *endpoint) decSynRcvdCount() { + e.mu.Lock() + e.synRcvdCount-- + e.mu.Unlock() +} + // handleListenSegment is called when a listening endpoint receives a segment // and needs to handle it. func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { @@ -302,9 +330,20 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { case header.TCPFlagSyn: opts := parseSynSegmentOptions(s) if incSynRcvdCount() { - s.incRef() - go e.handleSynSegment(ctx, s, &opts) // S/R-SAFE: synRcvdCount is the barrier. + // Drop the SYN if the listen endpoint's accept queue is + // overflowing. + if e.incSynRcvdCount() { + log.Printf("processing syn packet") + s.incRef() + go e.handleSynSegment(ctx, s, &opts) // S/R-SAFE: synRcvdCount is the barrier. + return + } + log.Printf("dropping syn packet") + e.stack.Stats().TCP.ListenOverflowSynDrop.Increment() + e.stack.Stats().DroppedPackets.Increment() + return } else { + // TODO(bhaskerh): Increment syncookie sent stat. cookie := ctx.createCookie(s.id, s.sequenceNumber, encodeMSS(opts.MSS)) // Send SYN with window scaling because we currently // dont't encode this information in the cookie. @@ -318,36 +357,72 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { TSEcr: opts.TSVal, } sendSynTCP(&s.route, s.id, header.TCPFlagSyn|header.TCPFlagAck, cookie, s.sequenceNumber+1, ctx.rcvWnd, synOpts) + e.stack.Stats().TCP.ListenOverflowSynCookieSent.Increment() } case header.TCPFlagAck: - if data, ok := ctx.isCookieValid(s.id, s.ackNumber-1, s.sequenceNumber-1); ok && int(data) < len(mssTable) { - // Create newly accepted endpoint and deliver it. - rcvdSynOptions := &header.TCPSynOptions{ - MSS: mssTable[data], - // Disable Window scaling as original SYN is - // lost. - WS: -1, - } - // When syn cookies are in use we enable timestamp only - // if the ack specifies the timestamp option assuming - // that the other end did in fact negotiate the - // timestamp option in the original SYN. - if s.parsedOptions.TS { - rcvdSynOptions.TS = true - rcvdSynOptions.TSVal = s.parsedOptions.TSVal - rcvdSynOptions.TSEcr = s.parsedOptions.TSEcr - } - n, err := ctx.createConnectedEndpoint(s, s.ackNumber-1, s.sequenceNumber-1, rcvdSynOptions) - if err == nil { - // clear the tsOffset for the newly created - // endpoint as the Timestamp was already - // randomly offset when the original SYN-ACK was - // sent above. - n.tsOffset = 0 - e.deliverAccepted(n) - } + if len(e.acceptedChan) == cap(e.acceptedChan) { + // Silently drop the ack as the application can't accept + // the connection at this point. The ack will be + // retransmitted by the sender anyway and we can + // complete the connection at the time of retransmit if + // the backlog has space. + e.stack.Stats().TCP.ListenOverflowAckDrop.Increment() + e.stack.Stats().DroppedPackets.Increment() + return + } + + // Validate the cookie. + data, ok := ctx.isCookieValid(s.id, s.ackNumber-1, s.sequenceNumber-1) + if !ok || int(data) >= len(mssTable) { + e.stack.Stats().TCP.ListenOverflowInvalidSynCookieRcvd.Increment() + e.stack.Stats().DroppedPackets.Increment() + return } + e.stack.Stats().TCP.ListenOverflowSynCookieRcvd.Increment() + // Create newly accepted endpoint and deliver it. + rcvdSynOptions := &header.TCPSynOptions{ + MSS: mssTable[data], + // Disable Window scaling as original SYN is + // lost. + WS: -1, + } + + // When syn cookies are in use we enable timestamp only + // if the ack specifies the timestamp option assuming + // that the other end did in fact negotiate the + // timestamp option in the original SYN. + if s.parsedOptions.TS { + rcvdSynOptions.TS = true + rcvdSynOptions.TSVal = s.parsedOptions.TSVal + rcvdSynOptions.TSEcr = s.parsedOptions.TSEcr + } + + n, err := ctx.createConnectingEndpoint(s, s.ackNumber-1, s.sequenceNumber-1, rcvdSynOptions) + if err != nil { + e.stack.Stats().TCP.FailedConnectionAttempts.Increment() + return + } + + // clear the tsOffset for the newly created + // endpoint as the Timestamp was already + // randomly offset when the original SYN-ACK was + // sent above. + n.tsOffset = 0 + + // Switch state to connected. + n.state = stateConnected + + // Do the delivery in a separate goroutine so + // that we don't block the listen loop in case + // the application is slow to accept or stops + // accepting. + // + // NOTE: This won't result in an unbounded + // number of goroutines as we do check before + // entering here that there was at least some + // space available in the backlog. + go e.deliverAccepted(n) } } @@ -377,7 +452,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error { v6only := e.v6only e.mu.Unlock() - ctx := newListenContext(e.stack, rcvWnd, v6only, e.netProto) + ctx := newListenContext(e.stack, e, rcvWnd, v6only, e.netProto) s := sleep.Sleeper{} s.AddWaker(&e.notificationWaker, wakerForNotification) |