diff options
-rw-r--r-- | pkg/sentry/socket/epsocket/epsocket.go | 1 | ||||
-rw-r--r-- | pkg/tcpip/stack/stack.go | 43 | ||||
-rw-r--r-- | pkg/tcpip/stack/transport_test.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/tcpip.go | 11 | ||||
-rw-r--r-- | pkg/tcpip/transport/icmp/endpoint.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/transport/raw/endpoint.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/accept.go | 14 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 48 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 178 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint_state.go | 20 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/forwarder.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/protocol.go | 36 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rcv.go | 78 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/sack.go | 12 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 14 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_test.go | 308 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_timestamp_test.go | 4 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/testing/context/context.go | 41 | ||||
-rw-r--r-- | pkg/tcpip/transport/udp/endpoint.go | 3 |
19 files changed, 740 insertions, 82 deletions
diff --git a/pkg/sentry/socket/epsocket/epsocket.go b/pkg/sentry/socket/epsocket/epsocket.go index ce3b247d0..a42cf4caf 100644 --- a/pkg/sentry/socket/epsocket/epsocket.go +++ b/pkg/sentry/socket/epsocket/epsocket.go @@ -1719,6 +1719,7 @@ func (s *SocketOperations) coalescingRead(ctx context.Context, dst usermem.IOSeq // If we managed to copy something, we must deliver it. if copied > 0 { + s.Endpoint.ModerateRecvBuf(copied) return copied, nil } diff --git a/pkg/tcpip/stack/stack.go b/pkg/tcpip/stack/stack.go index 8ecc51a58..2d7f56ca9 100644 --- a/pkg/tcpip/stack/stack.go +++ b/pkg/tcpip/stack/stack.go @@ -225,6 +225,45 @@ type TCPSACKInfo struct { MaxSACKED seqnum.Value } +// RcvBufAutoTuneParams holds state related to TCP receive buffer auto-tuning. +type RcvBufAutoTuneParams struct { + // MeasureTime is the time at which the current measurement + // was started. + MeasureTime time.Time + + // CopiedBytes is the number of bytes copied to user space since + // this measure began. + CopiedBytes int + + // PrevCopiedBytes is the number of bytes copied to user space in + // the previous RTT period. + PrevCopiedBytes int + + // RcvBufSize is the auto tuned receive buffer size. + RcvBufSize int + + // RTT is the smoothed RTT as measured by observing the time between + // when a byte is first acknowledged and the receipt of data that is at + // least one window beyond the sequence number that was acknowledged. + RTT time.Duration + + // RTTVar is the "round-trip time variation" as defined in section 2 + // of RFC6298. + RTTVar time.Duration + + // RTTMeasureSeqNumber is the highest acceptable sequence number at the + // time this RTT measurement period began. + RTTMeasureSeqNumber seqnum.Value + + // RTTMeasureTime is the absolute time at which the current RTT + // measurement period began. + RTTMeasureTime time.Time + + // Disabled is true if an explicit receive buffer is set for the + // endpoint. + Disabled bool +} + // TCPEndpointState is a copy of the internal state of a TCP endpoint. type TCPEndpointState struct { // ID is a copy of the TransportEndpointID for the endpoint. @@ -240,6 +279,10 @@ type TCPEndpointState struct { // buffer for the endpoint. RcvBufUsed int + // RcvBufAutoTuneParams is used to hold state variables to compute + // the auto tuned receive buffer size. + RcvAutoParams RcvBufAutoTuneParams + // RcvClosed if true, indicates the endpoint has been closed for reading. RcvClosed bool diff --git a/pkg/tcpip/stack/transport_test.go b/pkg/tcpip/stack/transport_test.go index 10fc8f195..788ffcc8c 100644 --- a/pkg/tcpip/stack/transport_test.go +++ b/pkg/tcpip/stack/transport_test.go @@ -192,6 +192,9 @@ func (f *fakeTransportEndpoint) State() uint32 { return 0 } +func (f *fakeTransportEndpoint) ModerateRecvBuf(copied int) { +} + type fakeTransportGoodOption bool type fakeTransportBadOption bool diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go index 2f931a5a3..4aafb51ab 100644 --- a/pkg/tcpip/tcpip.go +++ b/pkg/tcpip/tcpip.go @@ -381,6 +381,13 @@ type Endpoint interface { // State returns a socket's lifecycle state. The returned value is // protocol-specific and is primarily used for diagnostics. State() uint32 + + // ModerateRecvBuf should be called everytime data is copied to the user + // space. This allows for dynamic tuning of recv buffer space for a + // given socket. + // + // NOTE: This method is a no-op for sockets other than TCP. + ModerateRecvBuf(copied int) } // WriteOptions contains options for Endpoint.Write. @@ -480,6 +487,10 @@ type CongestionControlOption string // control algorithms. type AvailableCongestionControlOption string +// ModerateReceiveBufferOption allows the caller to enable/disable TCP receive +// buffer moderation. +type ModerateReceiveBufferOption bool + // MulticastTTLOption is used by SetSockOpt/GetSockOpt to control the default // TTL value for multicast messages. The default is 1. type MulticastTTLOption uint8 diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go index 0a74429b8..33cefd937 100644 --- a/pkg/tcpip/transport/icmp/endpoint.go +++ b/pkg/tcpip/transport/icmp/endpoint.go @@ -127,6 +127,9 @@ func (e *endpoint) Close() { e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) } +// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. +func (e *endpoint) ModerateRecvBuf(copied int) {} + // Read reads data from the endpoint. This method does not block if // there is no data pending. func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go index 5ea59d7ea..03f495e48 100644 --- a/pkg/tcpip/transport/raw/endpoint.go +++ b/pkg/tcpip/transport/raw/endpoint.go @@ -147,6 +147,9 @@ func (ep *endpoint) Close() { ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) } +// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. +func (ep *endpoint) ModerateRecvBuf(copied int) {} + // Read implements tcpip.Endpoint.Read. func (ep *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { ep.rcvMu.Lock() diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index 9b1ad6a28..52fd1bfa3 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -213,6 +213,7 @@ func (l *listenContext) createConnectingEndpoint(s *segment, iss seqnum.Value, i n.route = s.route.Clone() n.effectiveNetProtos = []tcpip.NetworkProtocolNumber{s.route.NetProto} n.rcvBufSize = int(l.rcvWnd) + n.amss = mssForRoute(&n.route) n.maybeEnableTimestamp(rcvdSynOpts) n.maybeEnableSACKPermitted(rcvdSynOpts) @@ -232,7 +233,11 @@ func (l *listenContext) createConnectingEndpoint(s *segment, iss seqnum.Value, i // The receiver at least temporarily has a zero receive window scale, // but the caller may change it (before starting the protocol loop). n.snd = newSender(n, iss, irs, s.window, rcvdSynOpts.MSS, rcvdSynOpts.WS) - n.rcv = newReceiver(n, irs, l.rcvWnd, 0) + n.rcv = newReceiver(n, irs, seqnum.Size(n.initialReceiveWindow()), 0, seqnum.Size(n.receiveBufferSize())) + // Bootstrap the auto tuning algorithm. Starting at zero will result in + // a large step function on the first window adjustment causing the + // window to grow to a really large value. + n.rcvAutoParams.prevCopied = n.initialReceiveWindow() return n, nil } @@ -249,7 +254,7 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head } // Perform the 3-way handshake. - h := newHandshake(ep, l.rcvWnd) + h := newHandshake(ep, seqnum.Size(ep.initialReceiveWindow())) h.resetToSynRcvd(cookie, irs, opts) if err := h.execute(); err != nil { @@ -359,16 +364,19 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { return } cookie := ctx.createCookie(s.id, s.sequenceNumber, encodeMSS(opts.MSS)) - // Send SYN with window scaling because we currently + + // Send SYN without window scaling because we currently // dont't encode this information in the cookie. // // Enable Timestamp option if the original syn did have // the timestamp option specified. + mss := mssForRoute(&s.route) synOpts := header.TCPSynOptions{ WS: -1, TS: opts.TS, TSVal: tcpTimeStamp(timeStampOffset()), TSEcr: opts.TSVal, + MSS: uint16(mss), } sendSynTCP(&s.route, s.id, header.TCPFlagSyn|header.TCPFlagAck, cookie, s.sequenceNumber+1, ctx.rcvWnd, synOpts) e.stack.Stats().TCP.ListenOverflowSynCookieSent.Increment() diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 84e3dd26c..00d2ae524 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -78,6 +78,9 @@ type handshake struct { // mss is the maximum segment size received from the peer. mss uint16 + // amss is the maximum segment size advertised by us to the peer. + amss uint16 + // sndWndScale is the send window scale, as defined in RFC 1323. A // negative value means no scaling is supported by the peer. sndWndScale int @@ -87,11 +90,24 @@ type handshake struct { } func newHandshake(ep *endpoint, rcvWnd seqnum.Size) handshake { + rcvWndScale := ep.rcvWndScaleForHandshake() + + // Round-down the rcvWnd to a multiple of wndScale. This ensures that the + // window offered in SYN won't be reduced due to the loss of precision if + // window scaling is enabled after the handshake. + rcvWnd = (rcvWnd >> uint8(rcvWndScale)) << uint8(rcvWndScale) + + // Ensure we can always accept at least 1 byte if the scale specified + // was too high for the provided rcvWnd. + if rcvWnd == 0 { + rcvWnd = 1 + } + h := handshake{ ep: ep, active: true, rcvWnd: rcvWnd, - rcvWndScale: FindWndScale(rcvWnd), + rcvWndScale: int(rcvWndScale), } h.resetState() return h @@ -224,7 +240,7 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error { h.ep.state = StateSynRecv h.ep.mu.Unlock() synOpts := header.TCPSynOptions{ - WS: h.rcvWndScale, + WS: int(h.effectiveRcvWndScale()), TS: rcvSynOpts.TS, TSVal: h.ep.timestamp(), TSEcr: h.ep.recentTS, @@ -233,6 +249,7 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error { // permits SACK. This is not explicitly defined in the RFC but // this is the behaviour implemented by Linux. SACKPermitted: rcvSynOpts.SACKPermitted, + MSS: h.ep.amss, } sendSynTCP(&s.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) @@ -277,6 +294,7 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error { TSVal: h.ep.timestamp(), TSEcr: h.ep.recentTS, SACKPermitted: h.ep.sackPermitted, + MSS: h.ep.amss, } sendSynTCP(&s.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) return nil @@ -419,12 +437,15 @@ func (h *handshake) execute() *tcpip.Error { // Send the initial SYN segment and loop until the handshake is // completed. + h.ep.amss = mssForRoute(&h.ep.route) + synOpts := header.TCPSynOptions{ WS: h.rcvWndScale, TS: true, TSVal: h.ep.timestamp(), TSEcr: h.ep.recentTS, SACKPermitted: bool(sackEnabled), + MSS: h.ep.amss, } // Execute is also called in a listen context so we want to make sure we @@ -433,6 +454,11 @@ func (h *handshake) execute() *tcpip.Error { if h.state == handshakeSynRcvd { synOpts.TS = h.ep.sendTSOk synOpts.SACKPermitted = h.ep.sackPermitted && bool(sackEnabled) + if h.sndWndScale < 0 { + // Disable window scaling if the peer did not send us + // the window scaling option. + synOpts.WS = -1 + } } sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) for h.state != handshakeCompleted { @@ -554,13 +580,6 @@ func makeSynOptions(opts header.TCPSynOptions) []byte { } func sendSynTCP(r *stack.Route, id stack.TransportEndpointID, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts header.TCPSynOptions) *tcpip.Error { - // The MSS in opts is automatically calculated as this function is - // called from many places and we don't want every call point being - // embedded with the MSS calculation. - if opts.MSS == 0 { - opts.MSS = uint16(r.MTU() - header.TCPMinimumSize) - } - options := makeSynOptions(opts) err := sendTCP(r, id, buffer.VectorisedView{}, r.DefaultTTL(), flags, seq, ack, rcvWnd, options, nil) putOptions(options) @@ -861,7 +880,8 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { // This is an active connection, so we must initiate the 3-way // handshake, and then inform potential waiters about its // completion. - h := newHandshake(e, seqnum.Size(e.receiveBufferAvailable())) + initialRcvWnd := e.initialReceiveWindow() + h := newHandshake(e, seqnum.Size(initialRcvWnd)) e.mu.Lock() h.ep.state = StateSynSent e.mu.Unlock() @@ -886,8 +906,14 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { // (indicated by a negative send window scale). e.snd = newSender(e, h.iss, h.ackNum-1, h.sndWnd, h.mss, h.sndWndScale) + rcvBufSize := seqnum.Size(e.receiveBufferSize()) e.rcvListMu.Lock() - e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale()) + e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale(), rcvBufSize) + // boot strap the auto tuning algorithm. Starting at zero will + // result in a large step function on the first proper causing + // the window to just go to a really large value after the first + // RTT itself. + e.rcvAutoParams.prevCopied = initialRcvWnd e.rcvListMu.Unlock() } diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 9614b2958..1aa1f12b4 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -132,6 +132,42 @@ type SACKInfo struct { NumBlocks int } +// rcvBufAutoTuneParams are used to hold state variables to compute +// the auto tuned recv buffer size. +// +// +stateify savable +type rcvBufAutoTuneParams struct { + // measureTime is the time at which the current measurement + // was started. + measureTime time.Time `state:".(unixTime)"` + + // copied is the number of bytes copied out of the receive + // buffers since this measure began. + copied int + + // prevCopied is the number of bytes copied out of the receive + // buffers in the previous RTT period. + prevCopied int + + // rtt is the non-smoothed minimum RTT as measured by observing the time + // between when a byte is first acknowledged and the receipt of data + // that is at least one window beyond the sequence number that was + // acknowledged. + rtt time.Duration + + // rttMeasureSeqNumber is the highest acceptable sequence number at the + // time this RTT measurement period began. + rttMeasureSeqNumber seqnum.Value + + // rttMeasureTime is the absolute time at which the current rtt + // measurement period began. + rttMeasureTime time.Time `state:".(unixTime)"` + + // disabled is true if an explicit receive buffer is set for the + // endpoint. + disabled bool +} + // endpoint represents a TCP endpoint. This struct serves as the interface // between users of the endpoint and the protocol implementation; it is legal to // have concurrent goroutines make calls into the endpoint, they are properly @@ -165,11 +201,12 @@ type endpoint struct { // to indicate to users that no more data is coming. // // rcvListMu can be taken after the endpoint mu below. - rcvListMu sync.Mutex `state:"nosave"` - rcvList segmentList `state:"wait"` - rcvClosed bool - rcvBufSize int - rcvBufUsed int + rcvListMu sync.Mutex `state:"nosave"` + rcvList segmentList `state:"wait"` + rcvClosed bool + rcvBufSize int + rcvBufUsed int + rcvAutoParams rcvBufAutoTuneParams // The following fields are protected by the mutex. mu sync.RWMutex `state:"nosave"` @@ -339,6 +376,9 @@ type endpoint struct { bindAddress tcpip.Address connectingAddress tcpip.Address + // amss is the advertised MSS to the peer by this endpoint. + amss uint16 + gso *stack.GSO } @@ -373,8 +413,8 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite netProto: netProto, waiterQueue: waiterQueue, state: StateInitial, - rcvBufSize: DefaultBufferSize, - sndBufSize: DefaultBufferSize, + rcvBufSize: DefaultReceiveBufferSize, + sndBufSize: DefaultSendBufferSize, sndMTU: int(math.MaxInt32), reuseAddr: true, keepalive: keepalive{ @@ -400,6 +440,11 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite e.cc = cs } + var mrb tcpip.ModerateReceiveBufferOption + if err := stack.TransportProtocolOption(ProtocolNumber, &mrb); err == nil { + e.rcvAutoParams.disabled = !bool(mrb) + } + if p := stack.GetTCPProbe(); p != nil { e.probe = p } @@ -408,6 +453,7 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite e.workMu.Init() e.workMu.Lock() e.tsOffset = timeStampOffset() + return e } @@ -551,6 +597,83 @@ func (e *endpoint) cleanupLocked() { tcpip.DeleteDanglingEndpoint(e) } +// initialReceiveWindow returns the initial receive window to advertise in the +// SYN/SYN-ACK. +func (e *endpoint) initialReceiveWindow() int { + rcvWnd := e.receiveBufferAvailable() + if rcvWnd > math.MaxUint16 { + rcvWnd = math.MaxUint16 + } + routeWnd := InitialCwnd * int(mssForRoute(&e.route)) * 2 + if rcvWnd > routeWnd { + rcvWnd = routeWnd + } + return rcvWnd +} + +// ModerateRecvBuf adjusts the receive buffer and the advertised window +// based on the number of bytes copied to user space. +func (e *endpoint) ModerateRecvBuf(copied int) { + e.rcvListMu.Lock() + if e.rcvAutoParams.disabled { + e.rcvListMu.Unlock() + return + } + now := time.Now() + if rtt := e.rcvAutoParams.rtt; rtt == 0 || now.Sub(e.rcvAutoParams.measureTime) < rtt { + e.rcvAutoParams.copied += copied + e.rcvListMu.Unlock() + return + } + prevRTTCopied := e.rcvAutoParams.copied + copied + prevCopied := e.rcvAutoParams.prevCopied + rcvWnd := 0 + if prevRTTCopied > prevCopied { + // The minimal receive window based on what was copied by the app + // in the immediate preceding RTT and some extra buffer for 16 + // segments to account for variations. + // We multiply by 2 to account for packet losses. + rcvWnd = prevRTTCopied*2 + 16*int(e.amss) + + // Scale for slow start based on bytes copied in this RTT vs previous. + grow := (rcvWnd * (prevRTTCopied - prevCopied)) / prevCopied + + // Multiply growth factor by 2 again to account for sender being + // in slow-start where the sender grows it's congestion window + // by 100% per RTT. + rcvWnd += grow * 2 + + // Make sure auto tuned buffer size can always receive upto 2x + // the initial window of 10 segments. + if minRcvWnd := int(e.amss) * InitialCwnd * 2; rcvWnd < minRcvWnd { + rcvWnd = minRcvWnd + } + + // Cap the auto tuned buffer size by the maximum permissible + // receive buffer size. + if max := e.maxReceiveBufferSize(); rcvWnd > max { + rcvWnd = max + } + + // We do not adjust downwards as that can cause the receiver to + // reject valid data that might already be in flight as the + // acceptable window will shrink. + if rcvWnd > e.rcvBufSize { + e.rcvBufSize = rcvWnd + e.notifyProtocolGoroutine(notifyReceiveWindowChanged) + } + + // We only update prevCopied when we grow the buffer because in cases + // where prevCopied > prevRTTCopied the existing buffer is already big + // enough to handle the current rate and we don't need to do any + // adjustments. + e.rcvAutoParams.prevCopied = prevRTTCopied + } + e.rcvAutoParams.measureTime = now + e.rcvAutoParams.copied = 0 + e.rcvListMu.Unlock() +} + // Read reads data from the endpoint. func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { e.mu.RLock() @@ -821,6 +944,7 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { wasZero := e.zeroReceiveWindow(scale) e.rcvBufSize = size + e.rcvAutoParams.disabled = true if wasZero && !e.zeroReceiveWindow(scale) { mask |= notifyNonZeroReceiveWindow } @@ -1657,6 +1781,33 @@ func (e *endpoint) receiveBufferSize() int { return size } +func (e *endpoint) maxReceiveBufferSize() int { + var rs ReceiveBufferSizeOption + if err := e.stack.TransportProtocolOption(ProtocolNumber, &rs); err != nil { + // As a fallback return the hardcoded max buffer size. + return MaxBufferSize + } + return rs.Max +} + +// rcvWndScaleForHandshake computes the receive window scale to offer to the +// peer when window scaling is enabled (true by default). If auto-tuning is +// disabled then the window scaling factor is based on the size of the +// receiveBuffer otherwise we use the max permissible receive buffer size to +// compute the scale. +func (e *endpoint) rcvWndScaleForHandshake() int { + bufSizeForScale := e.receiveBufferSize() + + e.rcvListMu.Lock() + autoTuningDisabled := e.rcvAutoParams.disabled + e.rcvListMu.Unlock() + if autoTuningDisabled { + return FindWndScale(seqnum.Size(bufSizeForScale)) + } + + return FindWndScale(seqnum.Size(e.maxReceiveBufferSize())) +} + // updateRecentTimestamp updates the recent timestamp using the algorithm // described in https://tools.ietf.org/html/rfc7323#section-4.3 func (e *endpoint) updateRecentTimestamp(tsVal uint32, maxSentAck seqnum.Value, segSeq seqnum.Value) { @@ -1749,6 +1900,13 @@ func (e *endpoint) completeState() stack.TCPEndpointState { s.RcvBufSize = e.rcvBufSize s.RcvBufUsed = e.rcvBufUsed s.RcvClosed = e.rcvClosed + s.RcvAutoParams.MeasureTime = e.rcvAutoParams.measureTime + s.RcvAutoParams.CopiedBytes = e.rcvAutoParams.copied + s.RcvAutoParams.PrevCopiedBytes = e.rcvAutoParams.prevCopied + s.RcvAutoParams.RTT = e.rcvAutoParams.rtt + s.RcvAutoParams.RTTMeasureSeqNumber = e.rcvAutoParams.rttMeasureSeqNumber + s.RcvAutoParams.RTTMeasureTime = e.rcvAutoParams.rttMeasureTime + s.RcvAutoParams.Disabled = e.rcvAutoParams.disabled e.rcvListMu.Unlock() // Endpoint TCP Option state. @@ -1802,13 +1960,13 @@ func (e *endpoint) completeState() stack.TCPEndpointState { RTTMeasureTime: e.snd.rttMeasureTime, Closed: e.snd.closed, RTO: e.snd.rto, - SRTTInited: e.snd.srttInited, MaxPayloadSize: e.snd.maxPayloadSize, SndWndScale: e.snd.sndWndScale, MaxSentAck: e.snd.maxSentAck, } e.snd.rtt.Lock() s.Sender.SRTT = e.snd.rtt.srtt + s.Sender.SRTTInited = e.snd.rtt.srttInited e.snd.rtt.Unlock() if cubic, ok := e.snd.cc.(*cubicState); ok { @@ -1856,3 +2014,7 @@ func (e *endpoint) State() uint32 { defer e.mu.Unlock() return uint32(e.state) } + +func mssForRoute(r *stack.Route) uint16 { + return uint16(r.MTU() - header.TCPMinimumSize) +} diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go index 58be61927..ec61a3886 100644 --- a/pkg/tcpip/transport/tcp/endpoint_state.go +++ b/pkg/tcpip/transport/tcp/endpoint_state.go @@ -360,3 +360,23 @@ func loadError(s string) *tcpip.Error { return e } + +// saveMeasureTime is invoked by stateify. +func (r *rcvBufAutoTuneParams) saveMeasureTime() unixTime { + return unixTime{r.measureTime.Unix(), r.measureTime.UnixNano()} +} + +// loadMeasureTime is invoked by stateify. +func (r *rcvBufAutoTuneParams) loadMeasureTime(unix unixTime) { + r.measureTime = time.Unix(unix.second, unix.nano) +} + +// saveRttMeasureTime is invoked by stateify. +func (r *rcvBufAutoTuneParams) saveRttMeasureTime() unixTime { + return unixTime{r.rttMeasureTime.Unix(), r.rttMeasureTime.UnixNano()} +} + +// loadRttMeasureTime is invoked by stateify. +func (r *rcvBufAutoTuneParams) loadRttMeasureTime(unix unixTime) { + r.rttMeasureTime = time.Unix(unix.second, unix.nano) +} diff --git a/pkg/tcpip/transport/tcp/forwarder.go b/pkg/tcpip/transport/tcp/forwarder.go index 2ce94aeb9..63666f0b3 100644 --- a/pkg/tcpip/transport/tcp/forwarder.go +++ b/pkg/tcpip/transport/tcp/forwarder.go @@ -47,7 +47,7 @@ type Forwarder struct { // If rcvWnd is set to zero, the default buffer size is used instead. func NewForwarder(s *stack.Stack, rcvWnd, maxInFlight int, handler func(*ForwarderRequest)) *Forwarder { if rcvWnd == 0 { - rcvWnd = DefaultBufferSize + rcvWnd = DefaultReceiveBufferSize } return &Forwarder{ maxInFlight: maxInFlight, diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go index 919e4ce24..ee04dcfcc 100644 --- a/pkg/tcpip/transport/tcp/protocol.go +++ b/pkg/tcpip/transport/tcp/protocol.go @@ -41,13 +41,18 @@ const ( ProtocolNumber = header.TCPProtocolNumber // MinBufferSize is the smallest size of a receive or send buffer. - minBufferSize = 4 << 10 // 4096 bytes. + MinBufferSize = 4 << 10 // 4096 bytes. - // DefaultBufferSize is the default size of the receive and send buffers. - DefaultBufferSize = 1 << 20 // 1MB + // DefaultSendBufferSize is the default size of the send buffer for + // an endpoint. + DefaultSendBufferSize = 1 << 20 // 1MB - // MaxBufferSize is the largest size a receive and send buffer can grow to. - maxBufferSize = 4 << 20 // 4MB + // DefaultReceiveBufferSize is the default size of the receive buffer + // for an endpoint. + DefaultReceiveBufferSize = 1 << 20 // 1MB + + // MaxBufferSize is the largest size a receive/send buffer can grow to. + MaxBufferSize = 4 << 20 // 4MB // MaxUnprocessedSegments is the maximum number of unprocessed segments // that can be queued for a given endpoint. @@ -86,6 +91,7 @@ type protocol struct { recvBufferSize ReceiveBufferSizeOption congestionControl string availableCongestionControl []string + moderateReceiveBuffer bool } // Number returns the tcp protocol number. @@ -192,6 +198,13 @@ func (p *protocol) SetOption(option interface{}) *tcpip.Error { // linux returns ENOENT when an invalid congestion control // is specified. return tcpip.ErrNoSuchFile + + case tcpip.ModerateReceiveBufferOption: + p.mu.Lock() + p.moderateReceiveBuffer = bool(v) + p.mu.Unlock() + return nil + default: return tcpip.ErrUnknownProtocolOption } @@ -217,16 +230,25 @@ func (p *protocol) Option(option interface{}) *tcpip.Error { *v = p.recvBufferSize p.mu.Unlock() return nil + case *tcpip.CongestionControlOption: p.mu.Lock() *v = tcpip.CongestionControlOption(p.congestionControl) p.mu.Unlock() return nil + case *tcpip.AvailableCongestionControlOption: p.mu.Lock() *v = tcpip.AvailableCongestionControlOption(strings.Join(p.availableCongestionControl, " ")) p.mu.Unlock() return nil + + case *tcpip.ModerateReceiveBufferOption: + p.mu.Lock() + *v = tcpip.ModerateReceiveBufferOption(p.moderateReceiveBuffer) + p.mu.Unlock() + return nil + default: return tcpip.ErrUnknownProtocolOption } @@ -235,8 +257,8 @@ func (p *protocol) Option(option interface{}) *tcpip.Error { func init() { stack.RegisterTransportProtocolFactory(ProtocolName, func() stack.TransportProtocol { return &protocol{ - sendBufferSize: SendBufferSizeOption{minBufferSize, DefaultBufferSize, maxBufferSize}, - recvBufferSize: ReceiveBufferSizeOption{minBufferSize, DefaultBufferSize, maxBufferSize}, + sendBufferSize: SendBufferSizeOption{MinBufferSize, DefaultSendBufferSize, MaxBufferSize}, + recvBufferSize: ReceiveBufferSizeOption{MinBufferSize, DefaultReceiveBufferSize, MaxBufferSize}, congestionControl: ccReno, availableCongestionControl: []string{ccReno, ccCubic}, } diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go index 8d9de9bf9..e90f9a7d9 100644 --- a/pkg/tcpip/transport/tcp/rcv.go +++ b/pkg/tcpip/transport/tcp/rcv.go @@ -16,6 +16,7 @@ package tcp import ( "container/heap" + "time" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/seqnum" @@ -38,6 +39,9 @@ type receiver struct { // shrinking it. rcvAcc seqnum.Value + // rcvWnd is the non-scaled receive window last advertised to the peer. + rcvWnd seqnum.Size + rcvWndScale uint8 closed bool @@ -47,13 +51,14 @@ type receiver struct { pendingBufSize seqnum.Size } -func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8) *receiver { +func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8, pendingBufSize seqnum.Size) *receiver { return &receiver{ ep: ep, rcvNxt: irs + 1, rcvAcc: irs.Add(rcvWnd + 1), + rcvWnd: rcvWnd, rcvWndScale: rcvWndScale, - pendingBufSize: rcvWnd, + pendingBufSize: pendingBufSize, } } @@ -72,14 +77,16 @@ func (r *receiver) acceptable(segSeq seqnum.Value, segLen seqnum.Size) bool { // getSendParams returns the parameters needed by the sender when building // segments to send. func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { - // Calculate the window size based on the current buffer size. - n := r.ep.receiveBufferAvailable() - acc := r.rcvNxt.Add(seqnum.Size(n)) + // Calculate the window size based on the available buffer space. + receiveBufferAvailable := r.ep.receiveBufferAvailable() + acc := r.rcvNxt.Add(seqnum.Size(receiveBufferAvailable)) if r.rcvAcc.LessThan(acc) { r.rcvAcc = acc } - - return r.rcvNxt, r.rcvNxt.Size(r.rcvAcc) >> r.rcvWndScale + // Stash away the non-scaled receive window as we use it for measuring + // receiver's estimated RTT. + r.rcvWnd = r.rcvNxt.Size(r.rcvAcc) + return r.rcvNxt, r.rcvWnd >> r.rcvWndScale } // nonZeroWindow is called when the receive window grows from zero to nonzero; @@ -130,6 +137,21 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum // Update the segment that we're expecting to consume. r.rcvNxt = segSeq.Add(segLen) + // In cases of a misbehaving sender which could send more than the + // advertised window, we could end up in a situation where we get a + // segment that exceeds the window advertised. Instead of partially + // accepting the segment and discarding bytes beyond the advertised + // window, we accept the whole segment and make sure r.rcvAcc is moved + // forward to match r.rcvNxt to indicate that the window is now closed. + // + // In absence of this check the r.acceptable() check fails and accepts + // segments that should be dropped because rcvWnd is calculated as + // the size of the interval (rcvNxt, rcvAcc] which becomes extremely + // large if rcvAcc is ever less than rcvNxt. + if r.rcvAcc.LessThan(r.rcvNxt) { + r.rcvAcc = r.rcvNxt + } + // Trim SACK Blocks to remove any SACK information that covers // sequence numbers that have been consumed. TrimSACKBlockList(&r.ep.sack, r.rcvNxt) @@ -198,6 +220,39 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum return true } +// updateRTT updates the receiver RTT measurement based on the sequence number +// of the received segment. +func (r *receiver) updateRTT() { + // From: https://public.lanl.gov/radiant/pubs/drs/sc2001-poster.pdf + // + // A system that is only transmitting acknowledgements can still + // estimate the round-trip time by observing the time between when a byte + // is first acknowledged and the receipt of data that is at least one + // window beyond the sequence number that was acknowledged. + r.ep.rcvListMu.Lock() + if r.ep.rcvAutoParams.rttMeasureTime.IsZero() { + // New measurement. + r.ep.rcvAutoParams.rttMeasureTime = time.Now() + r.ep.rcvAutoParams.rttMeasureSeqNumber = r.rcvNxt.Add(r.rcvWnd) + r.ep.rcvListMu.Unlock() + return + } + if r.rcvNxt.LessThan(r.ep.rcvAutoParams.rttMeasureSeqNumber) { + r.ep.rcvListMu.Unlock() + return + } + rtt := time.Since(r.ep.rcvAutoParams.rttMeasureTime) + // We only store the minimum observed RTT here as this is only used in + // absence of a SRTT available from either timestamps or a sender + // measurement of RTT. + if r.ep.rcvAutoParams.rtt == 0 || rtt < r.ep.rcvAutoParams.rtt { + r.ep.rcvAutoParams.rtt = rtt + } + r.ep.rcvAutoParams.rttMeasureTime = time.Now() + r.ep.rcvAutoParams.rttMeasureSeqNumber = r.rcvNxt.Add(r.rcvWnd) + r.ep.rcvListMu.Unlock() +} + // handleRcvdSegment handles TCP segments directed at the connection managed by // r as they arrive. It is called by the protocol main loop. func (r *receiver) handleRcvdSegment(s *segment) { @@ -226,10 +281,9 @@ func (r *receiver) handleRcvdSegment(s *segment) { r.pendingBufUsed += s.logicalLen() s.incRef() heap.Push(&r.pendingRcvdSegments, s) + UpdateSACKBlocks(&r.ep.sack, segSeq, segSeq.Add(segLen), r.rcvNxt) } - UpdateSACKBlocks(&r.ep.sack, segSeq, segSeq.Add(segLen), r.rcvNxt) - // Immediately send an ack so that the peer knows it may // have to retransmit. r.ep.snd.sendAck() @@ -237,6 +291,12 @@ func (r *receiver) handleRcvdSegment(s *segment) { return } + // Since we consumed a segment update the receiver's RTT estimate + // if required. + if segLen > 0 { + r.updateRTT() + } + // By consuming the current segment, we may have filled a gap in the // sequence number domain that allows pending segments to be consumed // now. So try to do it. diff --git a/pkg/tcpip/transport/tcp/sack.go b/pkg/tcpip/transport/tcp/sack.go index 52c5d9867..7be86d68e 100644 --- a/pkg/tcpip/transport/tcp/sack.go +++ b/pkg/tcpip/transport/tcp/sack.go @@ -31,6 +31,13 @@ const ( // segment identified by segStart->segEnd. func UpdateSACKBlocks(sack *SACKInfo, segStart seqnum.Value, segEnd seqnum.Value, rcvNxt seqnum.Value) { newSB := header.SACKBlock{Start: segStart, End: segEnd} + + // Ignore any invalid SACK blocks or blocks that are before rcvNxt as + // those bytes have already been acked. + if newSB.End.LessThanEq(newSB.Start) || newSB.End.LessThan(rcvNxt) { + return + } + if sack.NumBlocks == 0 { sack.Blocks[0] = newSB sack.NumBlocks = 1 @@ -39,9 +46,8 @@ func UpdateSACKBlocks(sack *SACKInfo, segStart seqnum.Value, segEnd seqnum.Value var n = 0 for i := 0; i < sack.NumBlocks; i++ { start, end := sack.Blocks[i].Start, sack.Blocks[i].End - if end.LessThanEq(start) || start.LessThanEq(rcvNxt) { - // Discard any invalid blocks where end is before start - // and discard any sack blocks that are before rcvNxt as + if end.LessThanEq(rcvNxt) { + // Discard any sack blocks that are before rcvNxt as // those have already been acked. continue } diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 297861462..0fee7ab72 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -121,9 +121,8 @@ type sender struct { // rtt.srtt, rtt.rttvar, and rto are the "smoothed round-trip time", // "round-trip time variation" and "retransmit timeout", as defined in // section 2 of RFC 6298. - rtt rtt - rto time.Duration - srttInited bool + rtt rtt + rto time.Duration // maxPayloadSize is the maximum size of the payload of a given segment. // It is initialized on demand. @@ -150,8 +149,9 @@ type sender struct { type rtt struct { sync.Mutex `state:"nosave"` - srtt time.Duration - rttvar time.Duration + srtt time.Duration + rttvar time.Duration + srttInited bool } // fastRecovery holds information related to fast recovery from a packet loss. @@ -323,10 +323,10 @@ func (s *sender) sendAck() { // available. This is done in accordance with section 2 of RFC 6298. func (s *sender) updateRTO(rtt time.Duration) { s.rtt.Lock() - if !s.srttInited { + if !s.rtt.srttInited { s.rtt.rttvar = rtt / 2 s.rtt.srtt = rtt - s.srttInited = true + s.rtt.srttInited = true } else { diff := s.rtt.srtt - rtt if diff < 0 { diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index ef7ec759d..915a98047 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -1110,8 +1110,9 @@ func TestNonScaledWindowAccept(t *testing.T) { t.Fatalf("Listen failed: %v", err) } - // Do 3-way handshake. - c.PassiveConnect(100, 2, header.TCPSynOptions{MSS: defaultIPv4MSS}) + // Do 3-way handshake w/ window scaling disabled. The SYN-ACK to the SYN + // should not carry the window scaling option. + c.PassiveConnect(100, -1, header.TCPSynOptions{MSS: defaultIPv4MSS}) // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) @@ -1600,7 +1601,6 @@ func TestPassiveSendMSSLessThanMTU(t *testing.T) { // Set the buffer size to a deterministic size so that we can check the // window scaling option. const rcvBufferSize = 0x20000 - const wndScale = 2 if err := ep.SetSockOpt(tcpip.ReceiveBufferSizeOption(rcvBufferSize)); err != nil { t.Fatalf("SetSockOpt failed failed: %v", err) } @@ -1614,7 +1614,7 @@ func TestPassiveSendMSSLessThanMTU(t *testing.T) { } // Do 3-way handshake. - c.PassiveConnect(maxPayload, wndScale, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize}) + c.PassiveConnect(maxPayload, -1, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize}) // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) @@ -1713,7 +1713,7 @@ func TestForwarderSendMSSLessThanMTU(t *testing.T) { s.SetTransportProtocolHandler(tcp.ProtocolNumber, f.HandlePacket) // Do 3-way handshake. - c.PassiveConnect(maxPayload, 1, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize}) + c.PassiveConnect(maxPayload, -1, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize}) // Wait for connection to be available. select { @@ -2767,11 +2767,11 @@ func TestDefaultBufferSizes(t *testing.T) { } }() - checkSendBufferSize(t, ep, tcp.DefaultBufferSize) - checkRecvBufferSize(t, ep, tcp.DefaultBufferSize) + checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize) + checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize) // Change the default send buffer size. - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultBufferSize * 2, tcp.DefaultBufferSize * 20}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultSendBufferSize * 2, tcp.DefaultSendBufferSize * 20}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } @@ -2781,11 +2781,11 @@ func TestDefaultBufferSizes(t *testing.T) { t.Fatalf("NewEndpoint failed; %v", err) } - checkSendBufferSize(t, ep, tcp.DefaultBufferSize*2) - checkRecvBufferSize(t, ep, tcp.DefaultBufferSize) + checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize*2) + checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize) // Change the default receive buffer size. - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultBufferSize * 3, tcp.DefaultBufferSize * 30}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultReceiveBufferSize * 3, tcp.DefaultReceiveBufferSize * 30}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } @@ -2795,8 +2795,8 @@ func TestDefaultBufferSizes(t *testing.T) { t.Fatalf("NewEndpoint failed; %v", err) } - checkSendBufferSize(t, ep, tcp.DefaultBufferSize*2) - checkRecvBufferSize(t, ep, tcp.DefaultBufferSize*3) + checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize*2) + checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize*3) } func TestMinMaxBufferSizes(t *testing.T) { @@ -2810,11 +2810,11 @@ func TestMinMaxBufferSizes(t *testing.T) { defer ep.Close() // Change the min/max values for send/receive - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{200, tcp.DefaultBufferSize * 2, tcp.DefaultBufferSize * 20}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{200, tcp.DefaultReceiveBufferSize * 2, tcp.DefaultReceiveBufferSize * 20}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{300, tcp.DefaultBufferSize * 3, tcp.DefaultBufferSize * 30}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{300, tcp.DefaultSendBufferSize * 3, tcp.DefaultSendBufferSize * 30}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } @@ -2832,17 +2832,17 @@ func TestMinMaxBufferSizes(t *testing.T) { checkSendBufferSize(t, ep, 300) // Set values above the max. - if err := ep.SetSockOpt(tcpip.ReceiveBufferSizeOption(1 + tcp.DefaultBufferSize*20)); err != nil { + if err := ep.SetSockOpt(tcpip.ReceiveBufferSizeOption(1 + tcp.DefaultReceiveBufferSize*20)); err != nil { t.Fatalf("GetSockOpt failed: %v", err) } - checkRecvBufferSize(t, ep, tcp.DefaultBufferSize*20) + checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize*20) - if err := ep.SetSockOpt(tcpip.SendBufferSizeOption(1 + tcp.DefaultBufferSize*30)); err != nil { + if err := ep.SetSockOpt(tcpip.SendBufferSizeOption(1 + tcp.DefaultSendBufferSize*30)); err != nil { t.Fatalf("GetSockOpt failed: %v", err) } - checkSendBufferSize(t, ep, tcp.DefaultBufferSize*30) + checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize*30) } func makeStack() (*stack.Stack, *tcpip.Error) { @@ -3976,3 +3976,273 @@ func TestEndpointBindListenAcceptState(t *testing.T) { } } + +// This test verifies that the auto tuning does not grow the receive buffer if +// the application is not reading the data actively. +func TestReceiveBufferAutoTuningApplicationLimited(t *testing.T) { + const mtu = 1500 + const mss = mtu - header.IPv4MinimumSize - header.TCPMinimumSize + + c := context.New(t, mtu) + defer c.Cleanup() + + stk := c.Stack() + // Set lower limits for auto-tuning tests. This is required because the + // test stops the worker which can cause packets to be dropped because + // the segment queue holding unprocessed packets is limited to 500. + const receiveBufferSize = 80 << 10 // 80KB. + const maxReceiveBufferSize = receiveBufferSize * 10 + if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, receiveBufferSize, maxReceiveBufferSize}); err != nil { + t.Fatalf("SetTransportProtocolOption failed: %v", err) + } + + // Enable auto-tuning. + if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcpip.ModerateReceiveBufferOption(true)); err != nil { + t.Fatalf("SetTransportProtocolOption failed: %v", err) + } + // Change the expected window scale to match the value needed for the + // maximum buffer size defined above. + c.WindowScale = uint8(tcp.FindWndScale(maxReceiveBufferSize)) + + rawEP := c.CreateConnectedWithOptions(header.TCPSynOptions{TS: true, WS: 4}) + + // NOTE: The timestamp values in the sent packets are meaningless to the + // peer so we just increment the timestamp value by 1 every batch as we + // are not really using them for anything. Send a single byte to verify + // the advertised window. + tsVal := rawEP.TSVal + 1 + + // Introduce a 25ms latency by delaying the first byte. + latency := 25 * time.Millisecond + time.Sleep(latency) + rawEP.SendPacketWithTS([]byte{1}, tsVal) + + // Verify that the ACK has the expected window. + wantRcvWnd := receiveBufferSize + wantRcvWnd = (wantRcvWnd >> uint32(c.WindowScale)) + rawEP.VerifyACKRcvWnd(uint16(wantRcvWnd - 1)) + time.Sleep(25 * time.Millisecond) + + // Allocate a large enough payload for the test. + b := make([]byte, int(receiveBufferSize)*2) + offset := 0 + payloadSize := receiveBufferSize - 1 + worker := (c.EP).(interface { + StopWork() + ResumeWork() + }) + tsVal++ + + // Stop the worker goroutine. + worker.StopWork() + start := offset + end := offset + payloadSize + packetsSent := 0 + for ; start < end; start += mss { + rawEP.SendPacketWithTS(b[start:start+mss], tsVal) + packetsSent++ + } + // Resume the worker so that it only sees the packets once all of them + // are waiting to be read. + worker.ResumeWork() + + // Since we read no bytes the window should goto zero till the + // application reads some of the data. + // Discard all intermediate acks except the last one. + if packetsSent > 100 { + for i := 0; i < (packetsSent / 100); i++ { + _ = c.GetPacket() + } + } + rawEP.VerifyACKRcvWnd(0) + + time.Sleep(25 * time.Millisecond) + // Verify that sending more data when window is closed is dropped and + // not acked. + rawEP.SendPacketWithTS(b[start:start+mss], tsVal) + + // Verify that the stack sends us back an ACK with the sequence number + // of the last packet sent indicating it was dropped. + p := c.GetPacket() + checker.IPv4(t, p, checker.TCP( + checker.AckNum(uint32(rawEP.NextSeqNum)-uint32(mss)), + checker.Window(0), + )) + + // Now read all the data from the endpoint and verify that advertised + // window increases to the full available buffer size. + for { + _, _, err := c.EP.Read(nil) + if err == tcpip.ErrWouldBlock { + break + } + } + + // Verify that we receive a non-zero window update ACK. When running + // under thread santizer this test can end up sending more than 1 + // ack, 1 for the non-zero window + p = c.GetPacket() + checker.IPv4(t, p, checker.TCP( + checker.AckNum(uint32(rawEP.NextSeqNum)-uint32(mss)), + func(t *testing.T, h header.Transport) { + tcp, ok := h.(header.TCP) + if !ok { + return + } + if w := tcp.WindowSize(); w == 0 || w > uint16(wantRcvWnd) { + t.Errorf("expected a non-zero window: got %d, want <= wantRcvWnd", w, wantRcvWnd) + } + }, + )) +} + +// This test verifies that the auto tuning does not grow the receive buffer if +// the application is not reading the data actively. +func TestReceiveBufferAutoTuning(t *testing.T) { + const mtu = 1500 + const mss = mtu - header.IPv4MinimumSize - header.TCPMinimumSize + + c := context.New(t, mtu) + defer c.Cleanup() + + // Enable Auto-tuning. + stk := c.Stack() + // Set lower limits for auto-tuning tests. This is required because the + // test stops the worker which can cause packets to be dropped because + // the segment queue holding unprocessed packets is limited to 500. + const receiveBufferSize = 80 << 10 // 80KB. + const maxReceiveBufferSize = receiveBufferSize * 10 + if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, receiveBufferSize, maxReceiveBufferSize}); err != nil { + t.Fatalf("SetTransportProtocolOption failed: %v", err) + } + + // Enable auto-tuning. + if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcpip.ModerateReceiveBufferOption(true)); err != nil { + t.Fatalf("SetTransportProtocolOption failed: %v", err) + } + // Change the expected window scale to match the value needed for the + // maximum buffer size used by stack. + c.WindowScale = uint8(tcp.FindWndScale(maxReceiveBufferSize)) + + rawEP := c.CreateConnectedWithOptions(header.TCPSynOptions{TS: true, WS: 4}) + + wantRcvWnd := receiveBufferSize + scaleRcvWnd := func(rcvWnd int) uint16 { + return uint16(rcvWnd >> uint16(c.WindowScale)) + } + // Allocate a large array to send to the endpoint. + b := make([]byte, receiveBufferSize*48) + + // In every iteration we will send double the number of bytes sent in + // the previous iteration and read the same from the app. The received + // window should grow by at least 2x of bytes read by the app in every + // RTT. + offset := 0 + payloadSize := receiveBufferSize / 8 + worker := (c.EP).(interface { + StopWork() + ResumeWork() + }) + tsVal := rawEP.TSVal + // We are going to do our own computation of what the moderated receive + // buffer should be based on sent/copied data per RTT and verify that + // the advertised window by the stack matches our calculations. + prevCopied := 0 + done := false + latency := 1 * time.Millisecond + for i := 0; !done; i++ { + tsVal++ + + // Stop the worker goroutine. + worker.StopWork() + start := offset + end := offset + payloadSize + totalSent := 0 + packetsSent := 0 + for ; start < end; start += mss { + rawEP.SendPacketWithTS(b[start:start+mss], tsVal) + totalSent += mss + packetsSent++ + } + // Resume it so that it only sees the packets once all of them + // are waiting to be read. + worker.ResumeWork() + + // Give 1ms for the worker to process the packets. + time.Sleep(1 * time.Millisecond) + + // Verify that the advertised window on the ACK is reduced by + // the total bytes sent. + expectedWnd := wantRcvWnd - totalSent + if packetsSent > 100 { + for i := 0; i < (packetsSent / 100); i++ { + _ = c.GetPacket() + } + } + rawEP.VerifyACKRcvWnd(scaleRcvWnd(expectedWnd)) + + // Now read all the data from the endpoint and invoke the + // moderation API to allow for receive buffer auto-tuning + // to happen before we measure the new window. + totalCopied := 0 + for { + b, _, err := c.EP.Read(nil) + if err == tcpip.ErrWouldBlock { + break + } + totalCopied += len(b) + } + + // Invoke the moderation API. This is required for auto-tuning + // to happen. This method is normally expected to be invoked + // from a higher layer than tcpip.Endpoint. So we simulate + // copying to user-space by invoking it explicitly here. + c.EP.ModerateRecvBuf(totalCopied) + + // Now send a keep-alive packet to trigger an ACK so that we can + // measure the new window. + rawEP.NextSeqNum-- + rawEP.SendPacketWithTS(nil, tsVal) + rawEP.NextSeqNum++ + + if i == 0 { + // In the first iteration the receiver based RTT is not + // yet known as a result the moderation code should not + // increase the advertised window. + rawEP.VerifyACKRcvWnd(scaleRcvWnd(wantRcvWnd)) + prevCopied = totalCopied + } else { + rttCopied := totalCopied + if i == 1 { + // The moderation code accumulates copied bytes till + // RTT is established. So add in the bytes sent in + // the first iteration to the total bytes for this + // RTT. + rttCopied += prevCopied + // Now reset it to the initial value used by the + // auto tuning logic. + prevCopied = tcp.InitialCwnd * mss * 2 + } + newWnd := rttCopied<<1 + 16*mss + grow := (newWnd * (rttCopied - prevCopied)) / prevCopied + newWnd += (grow << 1) + if newWnd > maxReceiveBufferSize { + newWnd = maxReceiveBufferSize + done = true + } + rawEP.VerifyACKRcvWnd(scaleRcvWnd(newWnd)) + wantRcvWnd = newWnd + prevCopied = rttCopied + // Increase the latency after first two iterations to + // establish a low RTT value in the receiver since it + // only tracks the lowest value. This ensures that when + // ModerateRcvBuf is called the elapsed time is always > + // rtt. Without this the test is flaky due to delays due + // to scheduling/wakeup etc. + latency += 50 * time.Millisecond + } + time.Sleep(latency) + offset += payloadSize + payloadSize *= 2 + } +} diff --git a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go index ad300b90b..a641e953d 100644 --- a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go @@ -182,7 +182,7 @@ func TestTimeStampEnabledAccept(t *testing.T) { wndSize uint16 }{ {true, -1, 0xffff}, // When cookie is used window scaling is disabled. - {false, 5, 0x8000}, // 0x8000 * 2^5 = 1<<20 = 1MB window (the default). + {false, 5, 0x8000}, // DefaultReceiveBufferSize is 1MB >> 5. } for _, tc := range testCases { timeStampEnabledAccept(t, tc.cookieEnabled, tc.wndScale, tc.wndSize) @@ -239,7 +239,7 @@ func TestTimeStampDisabledAccept(t *testing.T) { wndSize uint16 }{ {true, -1, 0xffff}, // When cookie is used window scaling is disabled. - {false, 5, 0x8000}, // 0x8000 * 2^5 = 1<<20 = 1MB window (the default). + {false, 5, 0x8000}, // DefaultReceiveBufferSize is 1MB >> 5. } for _, tc := range testCases { timeStampDisabledAccept(t, tc.cookieEnabled, tc.wndScale, tc.wndSize) diff --git a/pkg/tcpip/transport/tcp/testing/context/context.go b/pkg/tcpip/transport/tcp/testing/context/context.go index 2cfeed224..630dd7925 100644 --- a/pkg/tcpip/transport/tcp/testing/context/context.go +++ b/pkg/tcpip/transport/tcp/testing/context/context.go @@ -72,12 +72,6 @@ const ( testInitialSequenceNumber = 789 ) -// defaultWindowScale value specified here depends on the tcp.DefaultBufferSize -// constant defined in the tcp/endpoint.go because the tcp.DefaultBufferSize is -// used in tcp.newHandshake to determine the window scale to use when sending a -// SYN/SYN-ACK. -var defaultWindowScale = tcp.FindWndScale(tcp.DefaultBufferSize) - // Headers is used to represent the TCP header fields when building a // new packet. type Headers struct { @@ -134,6 +128,10 @@ type Context struct { // TimeStampEnabled is true if ep is connected with the timestamp option // enabled. TimeStampEnabled bool + + // WindowScale is the expected window scale in SYN packets sent by + // the stack. + WindowScale uint8 } // New allocates and initializes a test context containing a new @@ -142,11 +140,11 @@ func New(t *testing.T, mtu uint32) *Context { s := stack.New([]string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{tcp.ProtocolName}, stack.Options{}) // Allow minimum send/receive buffer sizes to be 1 during tests. - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultBufferSize, tcp.DefaultBufferSize * 10}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultSendBufferSize, 10 * tcp.DefaultSendBufferSize}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultBufferSize, tcp.DefaultBufferSize * 10}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultReceiveBufferSize, 10 * tcp.DefaultReceiveBufferSize}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } @@ -184,9 +182,10 @@ func New(t *testing.T, mtu uint32) *Context { }) return &Context{ - t: t, - s: s, - linkEP: linkEP, + t: t, + s: s, + linkEP: linkEP, + WindowScale: uint8(tcp.FindWndScale(tcp.DefaultReceiveBufferSize)), } } @@ -672,6 +671,21 @@ func (r *RawEndpoint) VerifyACKWithTS(tsVal uint32) { r.RecentTS = opts.TSVal } +// VerifyACKRcvWnd verifies that the window advertised by the incoming ACK +// matches the provided rcvWnd. +func (r *RawEndpoint) VerifyACKRcvWnd(rcvWnd uint16) { + ackPacket := r.C.GetPacket() + checker.IPv4(r.C.t, ackPacket, + checker.TCP( + checker.DstPort(r.SrcPort), + checker.TCPFlags(header.TCPFlagAck), + checker.SeqNum(uint32(r.AckNum)), + checker.AckNum(uint32(r.NextSeqNum)), + checker.Window(rcvWnd), + ), + ) +} + // VerifyACKNoSACK verifies that the ACK does not contain a SACK block. func (r *RawEndpoint) VerifyACKNoSACK() { r.VerifyACKHasSACK(nil) @@ -732,7 +746,7 @@ func (c *Context) CreateConnectedWithOptions(wantOptions header.TCPSynOptions) * checker.TCPSynOptions(header.TCPSynOptions{ MSS: mss, TS: true, - WS: defaultWindowScale, + WS: int(c.WindowScale), SACKPermitted: c.SACKEnabled(), }), ), @@ -747,6 +761,9 @@ func (c *Context) CreateConnectedWithOptions(wantOptions header.TCPSynOptions) * // Build options w/ tsVal to be sent in the SYN-ACK. synAckOptions := make([]byte, header.TCPOptionsMaximumSize) offset := 0 + if wantOptions.WS != -1 { + offset += header.EncodeWSOption(wantOptions.WS, synAckOptions[offset:]) + } if wantOptions.TS { offset += header.EncodeTSOption(wantOptions.TSVal, synOptions.TSVal, synAckOptions[offset:]) } diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index fbb1c32e7..99fdfb795 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -169,6 +169,9 @@ func (e *endpoint) Close() { e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) } +// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. +func (e *endpoint) ModerateRecvBuf(copied int) {} + // Read reads data from the endpoint. This method does not block if // there is no data pending. func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { |