From 3d71c627fa03a2da694ec88d690c633a15951fb0 Mon Sep 17 00:00:00 2001 From: Bhasker Hariharan Date: Thu, 13 Jun 2019 22:26:59 -0700 Subject: Add support for TCP receive buffer auto tuning. The implementation is similar to linux where we track the number of bytes consumed by the application to grow the receive buffer of a given TCP endpoint. This ensures that the advertised window grows at a reasonable rate to accomodate for the sender's rate and prevents large amounts of data being held in stack buffers if the application is not actively reading or not reading fast enough. The original paper that was used to implement the linux receive buffer auto- tuning is available @ https://public.lanl.gov/radiant/pubs/drs/lacsi2001.pdf NOTE: Linux does not implement DRS as defined in that paper, it's just a good reference to understand the solution space. Updates #230 PiperOrigin-RevId: 253168283 --- pkg/tcpip/transport/icmp/endpoint.go | 3 + pkg/tcpip/transport/raw/endpoint.go | 3 + pkg/tcpip/transport/tcp/accept.go | 14 +- pkg/tcpip/transport/tcp/connect.go | 48 +++- pkg/tcpip/transport/tcp/endpoint.go | 178 +++++++++++- pkg/tcpip/transport/tcp/endpoint_state.go | 20 ++ pkg/tcpip/transport/tcp/forwarder.go | 2 +- pkg/tcpip/transport/tcp/protocol.go | 36 ++- pkg/tcpip/transport/tcp/rcv.go | 78 +++++- pkg/tcpip/transport/tcp/sack.go | 12 +- pkg/tcpip/transport/tcp/snd.go | 14 +- pkg/tcpip/transport/tcp/tcp_test.go | 308 +++++++++++++++++++-- pkg/tcpip/transport/tcp/tcp_timestamp_test.go | 4 +- pkg/tcpip/transport/tcp/testing/context/context.go | 41 ++- pkg/tcpip/transport/udp/endpoint.go | 3 + 15 files changed, 682 insertions(+), 82 deletions(-) (limited to 'pkg/tcpip/transport') diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go index 0a74429b8..33cefd937 100644 --- a/pkg/tcpip/transport/icmp/endpoint.go +++ b/pkg/tcpip/transport/icmp/endpoint.go @@ -127,6 +127,9 @@ func (e *endpoint) Close() { e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) } +// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. +func (e *endpoint) ModerateRecvBuf(copied int) {} + // Read reads data from the endpoint. This method does not block if // there is no data pending. func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go index 5ea59d7ea..03f495e48 100644 --- a/pkg/tcpip/transport/raw/endpoint.go +++ b/pkg/tcpip/transport/raw/endpoint.go @@ -147,6 +147,9 @@ func (ep *endpoint) Close() { ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) } +// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. +func (ep *endpoint) ModerateRecvBuf(copied int) {} + // Read implements tcpip.Endpoint.Read. func (ep *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { ep.rcvMu.Lock() diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index 9b1ad6a28..52fd1bfa3 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -213,6 +213,7 @@ func (l *listenContext) createConnectingEndpoint(s *segment, iss seqnum.Value, i n.route = s.route.Clone() n.effectiveNetProtos = []tcpip.NetworkProtocolNumber{s.route.NetProto} n.rcvBufSize = int(l.rcvWnd) + n.amss = mssForRoute(&n.route) n.maybeEnableTimestamp(rcvdSynOpts) n.maybeEnableSACKPermitted(rcvdSynOpts) @@ -232,7 +233,11 @@ func (l *listenContext) createConnectingEndpoint(s *segment, iss seqnum.Value, i // The receiver at least temporarily has a zero receive window scale, // but the caller may change it (before starting the protocol loop). n.snd = newSender(n, iss, irs, s.window, rcvdSynOpts.MSS, rcvdSynOpts.WS) - n.rcv = newReceiver(n, irs, l.rcvWnd, 0) + n.rcv = newReceiver(n, irs, seqnum.Size(n.initialReceiveWindow()), 0, seqnum.Size(n.receiveBufferSize())) + // Bootstrap the auto tuning algorithm. Starting at zero will result in + // a large step function on the first window adjustment causing the + // window to grow to a really large value. + n.rcvAutoParams.prevCopied = n.initialReceiveWindow() return n, nil } @@ -249,7 +254,7 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head } // Perform the 3-way handshake. - h := newHandshake(ep, l.rcvWnd) + h := newHandshake(ep, seqnum.Size(ep.initialReceiveWindow())) h.resetToSynRcvd(cookie, irs, opts) if err := h.execute(); err != nil { @@ -359,16 +364,19 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { return } cookie := ctx.createCookie(s.id, s.sequenceNumber, encodeMSS(opts.MSS)) - // Send SYN with window scaling because we currently + + // Send SYN without window scaling because we currently // dont't encode this information in the cookie. // // Enable Timestamp option if the original syn did have // the timestamp option specified. + mss := mssForRoute(&s.route) synOpts := header.TCPSynOptions{ WS: -1, TS: opts.TS, TSVal: tcpTimeStamp(timeStampOffset()), TSEcr: opts.TSVal, + MSS: uint16(mss), } sendSynTCP(&s.route, s.id, header.TCPFlagSyn|header.TCPFlagAck, cookie, s.sequenceNumber+1, ctx.rcvWnd, synOpts) e.stack.Stats().TCP.ListenOverflowSynCookieSent.Increment() diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 84e3dd26c..00d2ae524 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -78,6 +78,9 @@ type handshake struct { // mss is the maximum segment size received from the peer. mss uint16 + // amss is the maximum segment size advertised by us to the peer. + amss uint16 + // sndWndScale is the send window scale, as defined in RFC 1323. A // negative value means no scaling is supported by the peer. sndWndScale int @@ -87,11 +90,24 @@ type handshake struct { } func newHandshake(ep *endpoint, rcvWnd seqnum.Size) handshake { + rcvWndScale := ep.rcvWndScaleForHandshake() + + // Round-down the rcvWnd to a multiple of wndScale. This ensures that the + // window offered in SYN won't be reduced due to the loss of precision if + // window scaling is enabled after the handshake. + rcvWnd = (rcvWnd >> uint8(rcvWndScale)) << uint8(rcvWndScale) + + // Ensure we can always accept at least 1 byte if the scale specified + // was too high for the provided rcvWnd. + if rcvWnd == 0 { + rcvWnd = 1 + } + h := handshake{ ep: ep, active: true, rcvWnd: rcvWnd, - rcvWndScale: FindWndScale(rcvWnd), + rcvWndScale: int(rcvWndScale), } h.resetState() return h @@ -224,7 +240,7 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error { h.ep.state = StateSynRecv h.ep.mu.Unlock() synOpts := header.TCPSynOptions{ - WS: h.rcvWndScale, + WS: int(h.effectiveRcvWndScale()), TS: rcvSynOpts.TS, TSVal: h.ep.timestamp(), TSEcr: h.ep.recentTS, @@ -233,6 +249,7 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error { // permits SACK. This is not explicitly defined in the RFC but // this is the behaviour implemented by Linux. SACKPermitted: rcvSynOpts.SACKPermitted, + MSS: h.ep.amss, } sendSynTCP(&s.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) @@ -277,6 +294,7 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error { TSVal: h.ep.timestamp(), TSEcr: h.ep.recentTS, SACKPermitted: h.ep.sackPermitted, + MSS: h.ep.amss, } sendSynTCP(&s.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) return nil @@ -419,12 +437,15 @@ func (h *handshake) execute() *tcpip.Error { // Send the initial SYN segment and loop until the handshake is // completed. + h.ep.amss = mssForRoute(&h.ep.route) + synOpts := header.TCPSynOptions{ WS: h.rcvWndScale, TS: true, TSVal: h.ep.timestamp(), TSEcr: h.ep.recentTS, SACKPermitted: bool(sackEnabled), + MSS: h.ep.amss, } // Execute is also called in a listen context so we want to make sure we @@ -433,6 +454,11 @@ func (h *handshake) execute() *tcpip.Error { if h.state == handshakeSynRcvd { synOpts.TS = h.ep.sendTSOk synOpts.SACKPermitted = h.ep.sackPermitted && bool(sackEnabled) + if h.sndWndScale < 0 { + // Disable window scaling if the peer did not send us + // the window scaling option. + synOpts.WS = -1 + } } sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) for h.state != handshakeCompleted { @@ -554,13 +580,6 @@ func makeSynOptions(opts header.TCPSynOptions) []byte { } func sendSynTCP(r *stack.Route, id stack.TransportEndpointID, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts header.TCPSynOptions) *tcpip.Error { - // The MSS in opts is automatically calculated as this function is - // called from many places and we don't want every call point being - // embedded with the MSS calculation. - if opts.MSS == 0 { - opts.MSS = uint16(r.MTU() - header.TCPMinimumSize) - } - options := makeSynOptions(opts) err := sendTCP(r, id, buffer.VectorisedView{}, r.DefaultTTL(), flags, seq, ack, rcvWnd, options, nil) putOptions(options) @@ -861,7 +880,8 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { // This is an active connection, so we must initiate the 3-way // handshake, and then inform potential waiters about its // completion. - h := newHandshake(e, seqnum.Size(e.receiveBufferAvailable())) + initialRcvWnd := e.initialReceiveWindow() + h := newHandshake(e, seqnum.Size(initialRcvWnd)) e.mu.Lock() h.ep.state = StateSynSent e.mu.Unlock() @@ -886,8 +906,14 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { // (indicated by a negative send window scale). e.snd = newSender(e, h.iss, h.ackNum-1, h.sndWnd, h.mss, h.sndWndScale) + rcvBufSize := seqnum.Size(e.receiveBufferSize()) e.rcvListMu.Lock() - e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale()) + e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale(), rcvBufSize) + // boot strap the auto tuning algorithm. Starting at zero will + // result in a large step function on the first proper causing + // the window to just go to a really large value after the first + // RTT itself. + e.rcvAutoParams.prevCopied = initialRcvWnd e.rcvListMu.Unlock() } diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 9614b2958..1aa1f12b4 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -132,6 +132,42 @@ type SACKInfo struct { NumBlocks int } +// rcvBufAutoTuneParams are used to hold state variables to compute +// the auto tuned recv buffer size. +// +// +stateify savable +type rcvBufAutoTuneParams struct { + // measureTime is the time at which the current measurement + // was started. + measureTime time.Time `state:".(unixTime)"` + + // copied is the number of bytes copied out of the receive + // buffers since this measure began. + copied int + + // prevCopied is the number of bytes copied out of the receive + // buffers in the previous RTT period. + prevCopied int + + // rtt is the non-smoothed minimum RTT as measured by observing the time + // between when a byte is first acknowledged and the receipt of data + // that is at least one window beyond the sequence number that was + // acknowledged. + rtt time.Duration + + // rttMeasureSeqNumber is the highest acceptable sequence number at the + // time this RTT measurement period began. + rttMeasureSeqNumber seqnum.Value + + // rttMeasureTime is the absolute time at which the current rtt + // measurement period began. + rttMeasureTime time.Time `state:".(unixTime)"` + + // disabled is true if an explicit receive buffer is set for the + // endpoint. + disabled bool +} + // endpoint represents a TCP endpoint. This struct serves as the interface // between users of the endpoint and the protocol implementation; it is legal to // have concurrent goroutines make calls into the endpoint, they are properly @@ -165,11 +201,12 @@ type endpoint struct { // to indicate to users that no more data is coming. // // rcvListMu can be taken after the endpoint mu below. - rcvListMu sync.Mutex `state:"nosave"` - rcvList segmentList `state:"wait"` - rcvClosed bool - rcvBufSize int - rcvBufUsed int + rcvListMu sync.Mutex `state:"nosave"` + rcvList segmentList `state:"wait"` + rcvClosed bool + rcvBufSize int + rcvBufUsed int + rcvAutoParams rcvBufAutoTuneParams // The following fields are protected by the mutex. mu sync.RWMutex `state:"nosave"` @@ -339,6 +376,9 @@ type endpoint struct { bindAddress tcpip.Address connectingAddress tcpip.Address + // amss is the advertised MSS to the peer by this endpoint. + amss uint16 + gso *stack.GSO } @@ -373,8 +413,8 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite netProto: netProto, waiterQueue: waiterQueue, state: StateInitial, - rcvBufSize: DefaultBufferSize, - sndBufSize: DefaultBufferSize, + rcvBufSize: DefaultReceiveBufferSize, + sndBufSize: DefaultSendBufferSize, sndMTU: int(math.MaxInt32), reuseAddr: true, keepalive: keepalive{ @@ -400,6 +440,11 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite e.cc = cs } + var mrb tcpip.ModerateReceiveBufferOption + if err := stack.TransportProtocolOption(ProtocolNumber, &mrb); err == nil { + e.rcvAutoParams.disabled = !bool(mrb) + } + if p := stack.GetTCPProbe(); p != nil { e.probe = p } @@ -408,6 +453,7 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite e.workMu.Init() e.workMu.Lock() e.tsOffset = timeStampOffset() + return e } @@ -551,6 +597,83 @@ func (e *endpoint) cleanupLocked() { tcpip.DeleteDanglingEndpoint(e) } +// initialReceiveWindow returns the initial receive window to advertise in the +// SYN/SYN-ACK. +func (e *endpoint) initialReceiveWindow() int { + rcvWnd := e.receiveBufferAvailable() + if rcvWnd > math.MaxUint16 { + rcvWnd = math.MaxUint16 + } + routeWnd := InitialCwnd * int(mssForRoute(&e.route)) * 2 + if rcvWnd > routeWnd { + rcvWnd = routeWnd + } + return rcvWnd +} + +// ModerateRecvBuf adjusts the receive buffer and the advertised window +// based on the number of bytes copied to user space. +func (e *endpoint) ModerateRecvBuf(copied int) { + e.rcvListMu.Lock() + if e.rcvAutoParams.disabled { + e.rcvListMu.Unlock() + return + } + now := time.Now() + if rtt := e.rcvAutoParams.rtt; rtt == 0 || now.Sub(e.rcvAutoParams.measureTime) < rtt { + e.rcvAutoParams.copied += copied + e.rcvListMu.Unlock() + return + } + prevRTTCopied := e.rcvAutoParams.copied + copied + prevCopied := e.rcvAutoParams.prevCopied + rcvWnd := 0 + if prevRTTCopied > prevCopied { + // The minimal receive window based on what was copied by the app + // in the immediate preceding RTT and some extra buffer for 16 + // segments to account for variations. + // We multiply by 2 to account for packet losses. + rcvWnd = prevRTTCopied*2 + 16*int(e.amss) + + // Scale for slow start based on bytes copied in this RTT vs previous. + grow := (rcvWnd * (prevRTTCopied - prevCopied)) / prevCopied + + // Multiply growth factor by 2 again to account for sender being + // in slow-start where the sender grows it's congestion window + // by 100% per RTT. + rcvWnd += grow * 2 + + // Make sure auto tuned buffer size can always receive upto 2x + // the initial window of 10 segments. + if minRcvWnd := int(e.amss) * InitialCwnd * 2; rcvWnd < minRcvWnd { + rcvWnd = minRcvWnd + } + + // Cap the auto tuned buffer size by the maximum permissible + // receive buffer size. + if max := e.maxReceiveBufferSize(); rcvWnd > max { + rcvWnd = max + } + + // We do not adjust downwards as that can cause the receiver to + // reject valid data that might already be in flight as the + // acceptable window will shrink. + if rcvWnd > e.rcvBufSize { + e.rcvBufSize = rcvWnd + e.notifyProtocolGoroutine(notifyReceiveWindowChanged) + } + + // We only update prevCopied when we grow the buffer because in cases + // where prevCopied > prevRTTCopied the existing buffer is already big + // enough to handle the current rate and we don't need to do any + // adjustments. + e.rcvAutoParams.prevCopied = prevRTTCopied + } + e.rcvAutoParams.measureTime = now + e.rcvAutoParams.copied = 0 + e.rcvListMu.Unlock() +} + // Read reads data from the endpoint. func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { e.mu.RLock() @@ -821,6 +944,7 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { wasZero := e.zeroReceiveWindow(scale) e.rcvBufSize = size + e.rcvAutoParams.disabled = true if wasZero && !e.zeroReceiveWindow(scale) { mask |= notifyNonZeroReceiveWindow } @@ -1657,6 +1781,33 @@ func (e *endpoint) receiveBufferSize() int { return size } +func (e *endpoint) maxReceiveBufferSize() int { + var rs ReceiveBufferSizeOption + if err := e.stack.TransportProtocolOption(ProtocolNumber, &rs); err != nil { + // As a fallback return the hardcoded max buffer size. + return MaxBufferSize + } + return rs.Max +} + +// rcvWndScaleForHandshake computes the receive window scale to offer to the +// peer when window scaling is enabled (true by default). If auto-tuning is +// disabled then the window scaling factor is based on the size of the +// receiveBuffer otherwise we use the max permissible receive buffer size to +// compute the scale. +func (e *endpoint) rcvWndScaleForHandshake() int { + bufSizeForScale := e.receiveBufferSize() + + e.rcvListMu.Lock() + autoTuningDisabled := e.rcvAutoParams.disabled + e.rcvListMu.Unlock() + if autoTuningDisabled { + return FindWndScale(seqnum.Size(bufSizeForScale)) + } + + return FindWndScale(seqnum.Size(e.maxReceiveBufferSize())) +} + // updateRecentTimestamp updates the recent timestamp using the algorithm // described in https://tools.ietf.org/html/rfc7323#section-4.3 func (e *endpoint) updateRecentTimestamp(tsVal uint32, maxSentAck seqnum.Value, segSeq seqnum.Value) { @@ -1749,6 +1900,13 @@ func (e *endpoint) completeState() stack.TCPEndpointState { s.RcvBufSize = e.rcvBufSize s.RcvBufUsed = e.rcvBufUsed s.RcvClosed = e.rcvClosed + s.RcvAutoParams.MeasureTime = e.rcvAutoParams.measureTime + s.RcvAutoParams.CopiedBytes = e.rcvAutoParams.copied + s.RcvAutoParams.PrevCopiedBytes = e.rcvAutoParams.prevCopied + s.RcvAutoParams.RTT = e.rcvAutoParams.rtt + s.RcvAutoParams.RTTMeasureSeqNumber = e.rcvAutoParams.rttMeasureSeqNumber + s.RcvAutoParams.RTTMeasureTime = e.rcvAutoParams.rttMeasureTime + s.RcvAutoParams.Disabled = e.rcvAutoParams.disabled e.rcvListMu.Unlock() // Endpoint TCP Option state. @@ -1802,13 +1960,13 @@ func (e *endpoint) completeState() stack.TCPEndpointState { RTTMeasureTime: e.snd.rttMeasureTime, Closed: e.snd.closed, RTO: e.snd.rto, - SRTTInited: e.snd.srttInited, MaxPayloadSize: e.snd.maxPayloadSize, SndWndScale: e.snd.sndWndScale, MaxSentAck: e.snd.maxSentAck, } e.snd.rtt.Lock() s.Sender.SRTT = e.snd.rtt.srtt + s.Sender.SRTTInited = e.snd.rtt.srttInited e.snd.rtt.Unlock() if cubic, ok := e.snd.cc.(*cubicState); ok { @@ -1856,3 +2014,7 @@ func (e *endpoint) State() uint32 { defer e.mu.Unlock() return uint32(e.state) } + +func mssForRoute(r *stack.Route) uint16 { + return uint16(r.MTU() - header.TCPMinimumSize) +} diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go index 58be61927..ec61a3886 100644 --- a/pkg/tcpip/transport/tcp/endpoint_state.go +++ b/pkg/tcpip/transport/tcp/endpoint_state.go @@ -360,3 +360,23 @@ func loadError(s string) *tcpip.Error { return e } + +// saveMeasureTime is invoked by stateify. +func (r *rcvBufAutoTuneParams) saveMeasureTime() unixTime { + return unixTime{r.measureTime.Unix(), r.measureTime.UnixNano()} +} + +// loadMeasureTime is invoked by stateify. +func (r *rcvBufAutoTuneParams) loadMeasureTime(unix unixTime) { + r.measureTime = time.Unix(unix.second, unix.nano) +} + +// saveRttMeasureTime is invoked by stateify. +func (r *rcvBufAutoTuneParams) saveRttMeasureTime() unixTime { + return unixTime{r.rttMeasureTime.Unix(), r.rttMeasureTime.UnixNano()} +} + +// loadRttMeasureTime is invoked by stateify. +func (r *rcvBufAutoTuneParams) loadRttMeasureTime(unix unixTime) { + r.rttMeasureTime = time.Unix(unix.second, unix.nano) +} diff --git a/pkg/tcpip/transport/tcp/forwarder.go b/pkg/tcpip/transport/tcp/forwarder.go index 2ce94aeb9..63666f0b3 100644 --- a/pkg/tcpip/transport/tcp/forwarder.go +++ b/pkg/tcpip/transport/tcp/forwarder.go @@ -47,7 +47,7 @@ type Forwarder struct { // If rcvWnd is set to zero, the default buffer size is used instead. func NewForwarder(s *stack.Stack, rcvWnd, maxInFlight int, handler func(*ForwarderRequest)) *Forwarder { if rcvWnd == 0 { - rcvWnd = DefaultBufferSize + rcvWnd = DefaultReceiveBufferSize } return &Forwarder{ maxInFlight: maxInFlight, diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go index 919e4ce24..ee04dcfcc 100644 --- a/pkg/tcpip/transport/tcp/protocol.go +++ b/pkg/tcpip/transport/tcp/protocol.go @@ -41,13 +41,18 @@ const ( ProtocolNumber = header.TCPProtocolNumber // MinBufferSize is the smallest size of a receive or send buffer. - minBufferSize = 4 << 10 // 4096 bytes. + MinBufferSize = 4 << 10 // 4096 bytes. - // DefaultBufferSize is the default size of the receive and send buffers. - DefaultBufferSize = 1 << 20 // 1MB + // DefaultSendBufferSize is the default size of the send buffer for + // an endpoint. + DefaultSendBufferSize = 1 << 20 // 1MB - // MaxBufferSize is the largest size a receive and send buffer can grow to. - maxBufferSize = 4 << 20 // 4MB + // DefaultReceiveBufferSize is the default size of the receive buffer + // for an endpoint. + DefaultReceiveBufferSize = 1 << 20 // 1MB + + // MaxBufferSize is the largest size a receive/send buffer can grow to. + MaxBufferSize = 4 << 20 // 4MB // MaxUnprocessedSegments is the maximum number of unprocessed segments // that can be queued for a given endpoint. @@ -86,6 +91,7 @@ type protocol struct { recvBufferSize ReceiveBufferSizeOption congestionControl string availableCongestionControl []string + moderateReceiveBuffer bool } // Number returns the tcp protocol number. @@ -192,6 +198,13 @@ func (p *protocol) SetOption(option interface{}) *tcpip.Error { // linux returns ENOENT when an invalid congestion control // is specified. return tcpip.ErrNoSuchFile + + case tcpip.ModerateReceiveBufferOption: + p.mu.Lock() + p.moderateReceiveBuffer = bool(v) + p.mu.Unlock() + return nil + default: return tcpip.ErrUnknownProtocolOption } @@ -217,16 +230,25 @@ func (p *protocol) Option(option interface{}) *tcpip.Error { *v = p.recvBufferSize p.mu.Unlock() return nil + case *tcpip.CongestionControlOption: p.mu.Lock() *v = tcpip.CongestionControlOption(p.congestionControl) p.mu.Unlock() return nil + case *tcpip.AvailableCongestionControlOption: p.mu.Lock() *v = tcpip.AvailableCongestionControlOption(strings.Join(p.availableCongestionControl, " ")) p.mu.Unlock() return nil + + case *tcpip.ModerateReceiveBufferOption: + p.mu.Lock() + *v = tcpip.ModerateReceiveBufferOption(p.moderateReceiveBuffer) + p.mu.Unlock() + return nil + default: return tcpip.ErrUnknownProtocolOption } @@ -235,8 +257,8 @@ func (p *protocol) Option(option interface{}) *tcpip.Error { func init() { stack.RegisterTransportProtocolFactory(ProtocolName, func() stack.TransportProtocol { return &protocol{ - sendBufferSize: SendBufferSizeOption{minBufferSize, DefaultBufferSize, maxBufferSize}, - recvBufferSize: ReceiveBufferSizeOption{minBufferSize, DefaultBufferSize, maxBufferSize}, + sendBufferSize: SendBufferSizeOption{MinBufferSize, DefaultSendBufferSize, MaxBufferSize}, + recvBufferSize: ReceiveBufferSizeOption{MinBufferSize, DefaultReceiveBufferSize, MaxBufferSize}, congestionControl: ccReno, availableCongestionControl: []string{ccReno, ccCubic}, } diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go index 8d9de9bf9..e90f9a7d9 100644 --- a/pkg/tcpip/transport/tcp/rcv.go +++ b/pkg/tcpip/transport/tcp/rcv.go @@ -16,6 +16,7 @@ package tcp import ( "container/heap" + "time" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/seqnum" @@ -38,6 +39,9 @@ type receiver struct { // shrinking it. rcvAcc seqnum.Value + // rcvWnd is the non-scaled receive window last advertised to the peer. + rcvWnd seqnum.Size + rcvWndScale uint8 closed bool @@ -47,13 +51,14 @@ type receiver struct { pendingBufSize seqnum.Size } -func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8) *receiver { +func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8, pendingBufSize seqnum.Size) *receiver { return &receiver{ ep: ep, rcvNxt: irs + 1, rcvAcc: irs.Add(rcvWnd + 1), + rcvWnd: rcvWnd, rcvWndScale: rcvWndScale, - pendingBufSize: rcvWnd, + pendingBufSize: pendingBufSize, } } @@ -72,14 +77,16 @@ func (r *receiver) acceptable(segSeq seqnum.Value, segLen seqnum.Size) bool { // getSendParams returns the parameters needed by the sender when building // segments to send. func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { - // Calculate the window size based on the current buffer size. - n := r.ep.receiveBufferAvailable() - acc := r.rcvNxt.Add(seqnum.Size(n)) + // Calculate the window size based on the available buffer space. + receiveBufferAvailable := r.ep.receiveBufferAvailable() + acc := r.rcvNxt.Add(seqnum.Size(receiveBufferAvailable)) if r.rcvAcc.LessThan(acc) { r.rcvAcc = acc } - - return r.rcvNxt, r.rcvNxt.Size(r.rcvAcc) >> r.rcvWndScale + // Stash away the non-scaled receive window as we use it for measuring + // receiver's estimated RTT. + r.rcvWnd = r.rcvNxt.Size(r.rcvAcc) + return r.rcvNxt, r.rcvWnd >> r.rcvWndScale } // nonZeroWindow is called when the receive window grows from zero to nonzero; @@ -130,6 +137,21 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum // Update the segment that we're expecting to consume. r.rcvNxt = segSeq.Add(segLen) + // In cases of a misbehaving sender which could send more than the + // advertised window, we could end up in a situation where we get a + // segment that exceeds the window advertised. Instead of partially + // accepting the segment and discarding bytes beyond the advertised + // window, we accept the whole segment and make sure r.rcvAcc is moved + // forward to match r.rcvNxt to indicate that the window is now closed. + // + // In absence of this check the r.acceptable() check fails and accepts + // segments that should be dropped because rcvWnd is calculated as + // the size of the interval (rcvNxt, rcvAcc] which becomes extremely + // large if rcvAcc is ever less than rcvNxt. + if r.rcvAcc.LessThan(r.rcvNxt) { + r.rcvAcc = r.rcvNxt + } + // Trim SACK Blocks to remove any SACK information that covers // sequence numbers that have been consumed. TrimSACKBlockList(&r.ep.sack, r.rcvNxt) @@ -198,6 +220,39 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum return true } +// updateRTT updates the receiver RTT measurement based on the sequence number +// of the received segment. +func (r *receiver) updateRTT() { + // From: https://public.lanl.gov/radiant/pubs/drs/sc2001-poster.pdf + // + // A system that is only transmitting acknowledgements can still + // estimate the round-trip time by observing the time between when a byte + // is first acknowledged and the receipt of data that is at least one + // window beyond the sequence number that was acknowledged. + r.ep.rcvListMu.Lock() + if r.ep.rcvAutoParams.rttMeasureTime.IsZero() { + // New measurement. + r.ep.rcvAutoParams.rttMeasureTime = time.Now() + r.ep.rcvAutoParams.rttMeasureSeqNumber = r.rcvNxt.Add(r.rcvWnd) + r.ep.rcvListMu.Unlock() + return + } + if r.rcvNxt.LessThan(r.ep.rcvAutoParams.rttMeasureSeqNumber) { + r.ep.rcvListMu.Unlock() + return + } + rtt := time.Since(r.ep.rcvAutoParams.rttMeasureTime) + // We only store the minimum observed RTT here as this is only used in + // absence of a SRTT available from either timestamps or a sender + // measurement of RTT. + if r.ep.rcvAutoParams.rtt == 0 || rtt < r.ep.rcvAutoParams.rtt { + r.ep.rcvAutoParams.rtt = rtt + } + r.ep.rcvAutoParams.rttMeasureTime = time.Now() + r.ep.rcvAutoParams.rttMeasureSeqNumber = r.rcvNxt.Add(r.rcvWnd) + r.ep.rcvListMu.Unlock() +} + // handleRcvdSegment handles TCP segments directed at the connection managed by // r as they arrive. It is called by the protocol main loop. func (r *receiver) handleRcvdSegment(s *segment) { @@ -226,10 +281,9 @@ func (r *receiver) handleRcvdSegment(s *segment) { r.pendingBufUsed += s.logicalLen() s.incRef() heap.Push(&r.pendingRcvdSegments, s) + UpdateSACKBlocks(&r.ep.sack, segSeq, segSeq.Add(segLen), r.rcvNxt) } - UpdateSACKBlocks(&r.ep.sack, segSeq, segSeq.Add(segLen), r.rcvNxt) - // Immediately send an ack so that the peer knows it may // have to retransmit. r.ep.snd.sendAck() @@ -237,6 +291,12 @@ func (r *receiver) handleRcvdSegment(s *segment) { return } + // Since we consumed a segment update the receiver's RTT estimate + // if required. + if segLen > 0 { + r.updateRTT() + } + // By consuming the current segment, we may have filled a gap in the // sequence number domain that allows pending segments to be consumed // now. So try to do it. diff --git a/pkg/tcpip/transport/tcp/sack.go b/pkg/tcpip/transport/tcp/sack.go index 52c5d9867..7be86d68e 100644 --- a/pkg/tcpip/transport/tcp/sack.go +++ b/pkg/tcpip/transport/tcp/sack.go @@ -31,6 +31,13 @@ const ( // segment identified by segStart->segEnd. func UpdateSACKBlocks(sack *SACKInfo, segStart seqnum.Value, segEnd seqnum.Value, rcvNxt seqnum.Value) { newSB := header.SACKBlock{Start: segStart, End: segEnd} + + // Ignore any invalid SACK blocks or blocks that are before rcvNxt as + // those bytes have already been acked. + if newSB.End.LessThanEq(newSB.Start) || newSB.End.LessThan(rcvNxt) { + return + } + if sack.NumBlocks == 0 { sack.Blocks[0] = newSB sack.NumBlocks = 1 @@ -39,9 +46,8 @@ func UpdateSACKBlocks(sack *SACKInfo, segStart seqnum.Value, segEnd seqnum.Value var n = 0 for i := 0; i < sack.NumBlocks; i++ { start, end := sack.Blocks[i].Start, sack.Blocks[i].End - if end.LessThanEq(start) || start.LessThanEq(rcvNxt) { - // Discard any invalid blocks where end is before start - // and discard any sack blocks that are before rcvNxt as + if end.LessThanEq(rcvNxt) { + // Discard any sack blocks that are before rcvNxt as // those have already been acked. continue } diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 297861462..0fee7ab72 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -121,9 +121,8 @@ type sender struct { // rtt.srtt, rtt.rttvar, and rto are the "smoothed round-trip time", // "round-trip time variation" and "retransmit timeout", as defined in // section 2 of RFC 6298. - rtt rtt - rto time.Duration - srttInited bool + rtt rtt + rto time.Duration // maxPayloadSize is the maximum size of the payload of a given segment. // It is initialized on demand. @@ -150,8 +149,9 @@ type sender struct { type rtt struct { sync.Mutex `state:"nosave"` - srtt time.Duration - rttvar time.Duration + srtt time.Duration + rttvar time.Duration + srttInited bool } // fastRecovery holds information related to fast recovery from a packet loss. @@ -323,10 +323,10 @@ func (s *sender) sendAck() { // available. This is done in accordance with section 2 of RFC 6298. func (s *sender) updateRTO(rtt time.Duration) { s.rtt.Lock() - if !s.srttInited { + if !s.rtt.srttInited { s.rtt.rttvar = rtt / 2 s.rtt.srtt = rtt - s.srttInited = true + s.rtt.srttInited = true } else { diff := s.rtt.srtt - rtt if diff < 0 { diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index ef7ec759d..915a98047 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -1110,8 +1110,9 @@ func TestNonScaledWindowAccept(t *testing.T) { t.Fatalf("Listen failed: %v", err) } - // Do 3-way handshake. - c.PassiveConnect(100, 2, header.TCPSynOptions{MSS: defaultIPv4MSS}) + // Do 3-way handshake w/ window scaling disabled. The SYN-ACK to the SYN + // should not carry the window scaling option. + c.PassiveConnect(100, -1, header.TCPSynOptions{MSS: defaultIPv4MSS}) // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) @@ -1600,7 +1601,6 @@ func TestPassiveSendMSSLessThanMTU(t *testing.T) { // Set the buffer size to a deterministic size so that we can check the // window scaling option. const rcvBufferSize = 0x20000 - const wndScale = 2 if err := ep.SetSockOpt(tcpip.ReceiveBufferSizeOption(rcvBufferSize)); err != nil { t.Fatalf("SetSockOpt failed failed: %v", err) } @@ -1614,7 +1614,7 @@ func TestPassiveSendMSSLessThanMTU(t *testing.T) { } // Do 3-way handshake. - c.PassiveConnect(maxPayload, wndScale, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize}) + c.PassiveConnect(maxPayload, -1, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize}) // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) @@ -1713,7 +1713,7 @@ func TestForwarderSendMSSLessThanMTU(t *testing.T) { s.SetTransportProtocolHandler(tcp.ProtocolNumber, f.HandlePacket) // Do 3-way handshake. - c.PassiveConnect(maxPayload, 1, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize}) + c.PassiveConnect(maxPayload, -1, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize}) // Wait for connection to be available. select { @@ -2767,11 +2767,11 @@ func TestDefaultBufferSizes(t *testing.T) { } }() - checkSendBufferSize(t, ep, tcp.DefaultBufferSize) - checkRecvBufferSize(t, ep, tcp.DefaultBufferSize) + checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize) + checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize) // Change the default send buffer size. - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultBufferSize * 2, tcp.DefaultBufferSize * 20}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultSendBufferSize * 2, tcp.DefaultSendBufferSize * 20}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } @@ -2781,11 +2781,11 @@ func TestDefaultBufferSizes(t *testing.T) { t.Fatalf("NewEndpoint failed; %v", err) } - checkSendBufferSize(t, ep, tcp.DefaultBufferSize*2) - checkRecvBufferSize(t, ep, tcp.DefaultBufferSize) + checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize*2) + checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize) // Change the default receive buffer size. - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultBufferSize * 3, tcp.DefaultBufferSize * 30}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultReceiveBufferSize * 3, tcp.DefaultReceiveBufferSize * 30}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } @@ -2795,8 +2795,8 @@ func TestDefaultBufferSizes(t *testing.T) { t.Fatalf("NewEndpoint failed; %v", err) } - checkSendBufferSize(t, ep, tcp.DefaultBufferSize*2) - checkRecvBufferSize(t, ep, tcp.DefaultBufferSize*3) + checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize*2) + checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize*3) } func TestMinMaxBufferSizes(t *testing.T) { @@ -2810,11 +2810,11 @@ func TestMinMaxBufferSizes(t *testing.T) { defer ep.Close() // Change the min/max values for send/receive - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{200, tcp.DefaultBufferSize * 2, tcp.DefaultBufferSize * 20}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{200, tcp.DefaultReceiveBufferSize * 2, tcp.DefaultReceiveBufferSize * 20}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{300, tcp.DefaultBufferSize * 3, tcp.DefaultBufferSize * 30}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{300, tcp.DefaultSendBufferSize * 3, tcp.DefaultSendBufferSize * 30}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } @@ -2832,17 +2832,17 @@ func TestMinMaxBufferSizes(t *testing.T) { checkSendBufferSize(t, ep, 300) // Set values above the max. - if err := ep.SetSockOpt(tcpip.ReceiveBufferSizeOption(1 + tcp.DefaultBufferSize*20)); err != nil { + if err := ep.SetSockOpt(tcpip.ReceiveBufferSizeOption(1 + tcp.DefaultReceiveBufferSize*20)); err != nil { t.Fatalf("GetSockOpt failed: %v", err) } - checkRecvBufferSize(t, ep, tcp.DefaultBufferSize*20) + checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize*20) - if err := ep.SetSockOpt(tcpip.SendBufferSizeOption(1 + tcp.DefaultBufferSize*30)); err != nil { + if err := ep.SetSockOpt(tcpip.SendBufferSizeOption(1 + tcp.DefaultSendBufferSize*30)); err != nil { t.Fatalf("GetSockOpt failed: %v", err) } - checkSendBufferSize(t, ep, tcp.DefaultBufferSize*30) + checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize*30) } func makeStack() (*stack.Stack, *tcpip.Error) { @@ -3976,3 +3976,273 @@ func TestEndpointBindListenAcceptState(t *testing.T) { } } + +// This test verifies that the auto tuning does not grow the receive buffer if +// the application is not reading the data actively. +func TestReceiveBufferAutoTuningApplicationLimited(t *testing.T) { + const mtu = 1500 + const mss = mtu - header.IPv4MinimumSize - header.TCPMinimumSize + + c := context.New(t, mtu) + defer c.Cleanup() + + stk := c.Stack() + // Set lower limits for auto-tuning tests. This is required because the + // test stops the worker which can cause packets to be dropped because + // the segment queue holding unprocessed packets is limited to 500. + const receiveBufferSize = 80 << 10 // 80KB. + const maxReceiveBufferSize = receiveBufferSize * 10 + if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, receiveBufferSize, maxReceiveBufferSize}); err != nil { + t.Fatalf("SetTransportProtocolOption failed: %v", err) + } + + // Enable auto-tuning. + if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcpip.ModerateReceiveBufferOption(true)); err != nil { + t.Fatalf("SetTransportProtocolOption failed: %v", err) + } + // Change the expected window scale to match the value needed for the + // maximum buffer size defined above. + c.WindowScale = uint8(tcp.FindWndScale(maxReceiveBufferSize)) + + rawEP := c.CreateConnectedWithOptions(header.TCPSynOptions{TS: true, WS: 4}) + + // NOTE: The timestamp values in the sent packets are meaningless to the + // peer so we just increment the timestamp value by 1 every batch as we + // are not really using them for anything. Send a single byte to verify + // the advertised window. + tsVal := rawEP.TSVal + 1 + + // Introduce a 25ms latency by delaying the first byte. + latency := 25 * time.Millisecond + time.Sleep(latency) + rawEP.SendPacketWithTS([]byte{1}, tsVal) + + // Verify that the ACK has the expected window. + wantRcvWnd := receiveBufferSize + wantRcvWnd = (wantRcvWnd >> uint32(c.WindowScale)) + rawEP.VerifyACKRcvWnd(uint16(wantRcvWnd - 1)) + time.Sleep(25 * time.Millisecond) + + // Allocate a large enough payload for the test. + b := make([]byte, int(receiveBufferSize)*2) + offset := 0 + payloadSize := receiveBufferSize - 1 + worker := (c.EP).(interface { + StopWork() + ResumeWork() + }) + tsVal++ + + // Stop the worker goroutine. + worker.StopWork() + start := offset + end := offset + payloadSize + packetsSent := 0 + for ; start < end; start += mss { + rawEP.SendPacketWithTS(b[start:start+mss], tsVal) + packetsSent++ + } + // Resume the worker so that it only sees the packets once all of them + // are waiting to be read. + worker.ResumeWork() + + // Since we read no bytes the window should goto zero till the + // application reads some of the data. + // Discard all intermediate acks except the last one. + if packetsSent > 100 { + for i := 0; i < (packetsSent / 100); i++ { + _ = c.GetPacket() + } + } + rawEP.VerifyACKRcvWnd(0) + + time.Sleep(25 * time.Millisecond) + // Verify that sending more data when window is closed is dropped and + // not acked. + rawEP.SendPacketWithTS(b[start:start+mss], tsVal) + + // Verify that the stack sends us back an ACK with the sequence number + // of the last packet sent indicating it was dropped. + p := c.GetPacket() + checker.IPv4(t, p, checker.TCP( + checker.AckNum(uint32(rawEP.NextSeqNum)-uint32(mss)), + checker.Window(0), + )) + + // Now read all the data from the endpoint and verify that advertised + // window increases to the full available buffer size. + for { + _, _, err := c.EP.Read(nil) + if err == tcpip.ErrWouldBlock { + break + } + } + + // Verify that we receive a non-zero window update ACK. When running + // under thread santizer this test can end up sending more than 1 + // ack, 1 for the non-zero window + p = c.GetPacket() + checker.IPv4(t, p, checker.TCP( + checker.AckNum(uint32(rawEP.NextSeqNum)-uint32(mss)), + func(t *testing.T, h header.Transport) { + tcp, ok := h.(header.TCP) + if !ok { + return + } + if w := tcp.WindowSize(); w == 0 || w > uint16(wantRcvWnd) { + t.Errorf("expected a non-zero window: got %d, want <= wantRcvWnd", w, wantRcvWnd) + } + }, + )) +} + +// This test verifies that the auto tuning does not grow the receive buffer if +// the application is not reading the data actively. +func TestReceiveBufferAutoTuning(t *testing.T) { + const mtu = 1500 + const mss = mtu - header.IPv4MinimumSize - header.TCPMinimumSize + + c := context.New(t, mtu) + defer c.Cleanup() + + // Enable Auto-tuning. + stk := c.Stack() + // Set lower limits for auto-tuning tests. This is required because the + // test stops the worker which can cause packets to be dropped because + // the segment queue holding unprocessed packets is limited to 500. + const receiveBufferSize = 80 << 10 // 80KB. + const maxReceiveBufferSize = receiveBufferSize * 10 + if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, receiveBufferSize, maxReceiveBufferSize}); err != nil { + t.Fatalf("SetTransportProtocolOption failed: %v", err) + } + + // Enable auto-tuning. + if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcpip.ModerateReceiveBufferOption(true)); err != nil { + t.Fatalf("SetTransportProtocolOption failed: %v", err) + } + // Change the expected window scale to match the value needed for the + // maximum buffer size used by stack. + c.WindowScale = uint8(tcp.FindWndScale(maxReceiveBufferSize)) + + rawEP := c.CreateConnectedWithOptions(header.TCPSynOptions{TS: true, WS: 4}) + + wantRcvWnd := receiveBufferSize + scaleRcvWnd := func(rcvWnd int) uint16 { + return uint16(rcvWnd >> uint16(c.WindowScale)) + } + // Allocate a large array to send to the endpoint. + b := make([]byte, receiveBufferSize*48) + + // In every iteration we will send double the number of bytes sent in + // the previous iteration and read the same from the app. The received + // window should grow by at least 2x of bytes read by the app in every + // RTT. + offset := 0 + payloadSize := receiveBufferSize / 8 + worker := (c.EP).(interface { + StopWork() + ResumeWork() + }) + tsVal := rawEP.TSVal + // We are going to do our own computation of what the moderated receive + // buffer should be based on sent/copied data per RTT and verify that + // the advertised window by the stack matches our calculations. + prevCopied := 0 + done := false + latency := 1 * time.Millisecond + for i := 0; !done; i++ { + tsVal++ + + // Stop the worker goroutine. + worker.StopWork() + start := offset + end := offset + payloadSize + totalSent := 0 + packetsSent := 0 + for ; start < end; start += mss { + rawEP.SendPacketWithTS(b[start:start+mss], tsVal) + totalSent += mss + packetsSent++ + } + // Resume it so that it only sees the packets once all of them + // are waiting to be read. + worker.ResumeWork() + + // Give 1ms for the worker to process the packets. + time.Sleep(1 * time.Millisecond) + + // Verify that the advertised window on the ACK is reduced by + // the total bytes sent. + expectedWnd := wantRcvWnd - totalSent + if packetsSent > 100 { + for i := 0; i < (packetsSent / 100); i++ { + _ = c.GetPacket() + } + } + rawEP.VerifyACKRcvWnd(scaleRcvWnd(expectedWnd)) + + // Now read all the data from the endpoint and invoke the + // moderation API to allow for receive buffer auto-tuning + // to happen before we measure the new window. + totalCopied := 0 + for { + b, _, err := c.EP.Read(nil) + if err == tcpip.ErrWouldBlock { + break + } + totalCopied += len(b) + } + + // Invoke the moderation API. This is required for auto-tuning + // to happen. This method is normally expected to be invoked + // from a higher layer than tcpip.Endpoint. So we simulate + // copying to user-space by invoking it explicitly here. + c.EP.ModerateRecvBuf(totalCopied) + + // Now send a keep-alive packet to trigger an ACK so that we can + // measure the new window. + rawEP.NextSeqNum-- + rawEP.SendPacketWithTS(nil, tsVal) + rawEP.NextSeqNum++ + + if i == 0 { + // In the first iteration the receiver based RTT is not + // yet known as a result the moderation code should not + // increase the advertised window. + rawEP.VerifyACKRcvWnd(scaleRcvWnd(wantRcvWnd)) + prevCopied = totalCopied + } else { + rttCopied := totalCopied + if i == 1 { + // The moderation code accumulates copied bytes till + // RTT is established. So add in the bytes sent in + // the first iteration to the total bytes for this + // RTT. + rttCopied += prevCopied + // Now reset it to the initial value used by the + // auto tuning logic. + prevCopied = tcp.InitialCwnd * mss * 2 + } + newWnd := rttCopied<<1 + 16*mss + grow := (newWnd * (rttCopied - prevCopied)) / prevCopied + newWnd += (grow << 1) + if newWnd > maxReceiveBufferSize { + newWnd = maxReceiveBufferSize + done = true + } + rawEP.VerifyACKRcvWnd(scaleRcvWnd(newWnd)) + wantRcvWnd = newWnd + prevCopied = rttCopied + // Increase the latency after first two iterations to + // establish a low RTT value in the receiver since it + // only tracks the lowest value. This ensures that when + // ModerateRcvBuf is called the elapsed time is always > + // rtt. Without this the test is flaky due to delays due + // to scheduling/wakeup etc. + latency += 50 * time.Millisecond + } + time.Sleep(latency) + offset += payloadSize + payloadSize *= 2 + } +} diff --git a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go index ad300b90b..a641e953d 100644 --- a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go @@ -182,7 +182,7 @@ func TestTimeStampEnabledAccept(t *testing.T) { wndSize uint16 }{ {true, -1, 0xffff}, // When cookie is used window scaling is disabled. - {false, 5, 0x8000}, // 0x8000 * 2^5 = 1<<20 = 1MB window (the default). + {false, 5, 0x8000}, // DefaultReceiveBufferSize is 1MB >> 5. } for _, tc := range testCases { timeStampEnabledAccept(t, tc.cookieEnabled, tc.wndScale, tc.wndSize) @@ -239,7 +239,7 @@ func TestTimeStampDisabledAccept(t *testing.T) { wndSize uint16 }{ {true, -1, 0xffff}, // When cookie is used window scaling is disabled. - {false, 5, 0x8000}, // 0x8000 * 2^5 = 1<<20 = 1MB window (the default). + {false, 5, 0x8000}, // DefaultReceiveBufferSize is 1MB >> 5. } for _, tc := range testCases { timeStampDisabledAccept(t, tc.cookieEnabled, tc.wndScale, tc.wndSize) diff --git a/pkg/tcpip/transport/tcp/testing/context/context.go b/pkg/tcpip/transport/tcp/testing/context/context.go index 2cfeed224..630dd7925 100644 --- a/pkg/tcpip/transport/tcp/testing/context/context.go +++ b/pkg/tcpip/transport/tcp/testing/context/context.go @@ -72,12 +72,6 @@ const ( testInitialSequenceNumber = 789 ) -// defaultWindowScale value specified here depends on the tcp.DefaultBufferSize -// constant defined in the tcp/endpoint.go because the tcp.DefaultBufferSize is -// used in tcp.newHandshake to determine the window scale to use when sending a -// SYN/SYN-ACK. -var defaultWindowScale = tcp.FindWndScale(tcp.DefaultBufferSize) - // Headers is used to represent the TCP header fields when building a // new packet. type Headers struct { @@ -134,6 +128,10 @@ type Context struct { // TimeStampEnabled is true if ep is connected with the timestamp option // enabled. TimeStampEnabled bool + + // WindowScale is the expected window scale in SYN packets sent by + // the stack. + WindowScale uint8 } // New allocates and initializes a test context containing a new @@ -142,11 +140,11 @@ func New(t *testing.T, mtu uint32) *Context { s := stack.New([]string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{tcp.ProtocolName}, stack.Options{}) // Allow minimum send/receive buffer sizes to be 1 during tests. - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultBufferSize, tcp.DefaultBufferSize * 10}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultSendBufferSize, 10 * tcp.DefaultSendBufferSize}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } - if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultBufferSize, tcp.DefaultBufferSize * 10}); err != nil { + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultReceiveBufferSize, 10 * tcp.DefaultReceiveBufferSize}); err != nil { t.Fatalf("SetTransportProtocolOption failed: %v", err) } @@ -184,9 +182,10 @@ func New(t *testing.T, mtu uint32) *Context { }) return &Context{ - t: t, - s: s, - linkEP: linkEP, + t: t, + s: s, + linkEP: linkEP, + WindowScale: uint8(tcp.FindWndScale(tcp.DefaultReceiveBufferSize)), } } @@ -672,6 +671,21 @@ func (r *RawEndpoint) VerifyACKWithTS(tsVal uint32) { r.RecentTS = opts.TSVal } +// VerifyACKRcvWnd verifies that the window advertised by the incoming ACK +// matches the provided rcvWnd. +func (r *RawEndpoint) VerifyACKRcvWnd(rcvWnd uint16) { + ackPacket := r.C.GetPacket() + checker.IPv4(r.C.t, ackPacket, + checker.TCP( + checker.DstPort(r.SrcPort), + checker.TCPFlags(header.TCPFlagAck), + checker.SeqNum(uint32(r.AckNum)), + checker.AckNum(uint32(r.NextSeqNum)), + checker.Window(rcvWnd), + ), + ) +} + // VerifyACKNoSACK verifies that the ACK does not contain a SACK block. func (r *RawEndpoint) VerifyACKNoSACK() { r.VerifyACKHasSACK(nil) @@ -732,7 +746,7 @@ func (c *Context) CreateConnectedWithOptions(wantOptions header.TCPSynOptions) * checker.TCPSynOptions(header.TCPSynOptions{ MSS: mss, TS: true, - WS: defaultWindowScale, + WS: int(c.WindowScale), SACKPermitted: c.SACKEnabled(), }), ), @@ -747,6 +761,9 @@ func (c *Context) CreateConnectedWithOptions(wantOptions header.TCPSynOptions) * // Build options w/ tsVal to be sent in the SYN-ACK. synAckOptions := make([]byte, header.TCPOptionsMaximumSize) offset := 0 + if wantOptions.WS != -1 { + offset += header.EncodeWSOption(wantOptions.WS, synAckOptions[offset:]) + } if wantOptions.TS { offset += header.EncodeTSOption(wantOptions.TSVal, synOptions.TSVal, synAckOptions[offset:]) } diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index fbb1c32e7..99fdfb795 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -169,6 +169,9 @@ func (e *endpoint) Close() { e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) } +// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. +func (e *endpoint) ModerateRecvBuf(copied int) {} + // Read reads data from the endpoint. This method does not block if // there is no data pending. func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { -- cgit v1.2.3