summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--pkg/sentry/socket/epsocket/epsocket.go1
-rw-r--r--pkg/tcpip/stack/stack.go43
-rw-r--r--pkg/tcpip/stack/transport_test.go3
-rw-r--r--pkg/tcpip/tcpip.go11
-rw-r--r--pkg/tcpip/transport/icmp/endpoint.go3
-rw-r--r--pkg/tcpip/transport/raw/endpoint.go3
-rw-r--r--pkg/tcpip/transport/tcp/accept.go14
-rw-r--r--pkg/tcpip/transport/tcp/connect.go48
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go178
-rw-r--r--pkg/tcpip/transport/tcp/endpoint_state.go20
-rw-r--r--pkg/tcpip/transport/tcp/forwarder.go2
-rw-r--r--pkg/tcpip/transport/tcp/protocol.go36
-rw-r--r--pkg/tcpip/transport/tcp/rcv.go78
-rw-r--r--pkg/tcpip/transport/tcp/sack.go12
-rw-r--r--pkg/tcpip/transport/tcp/snd.go14
-rw-r--r--pkg/tcpip/transport/tcp/tcp_test.go308
-rw-r--r--pkg/tcpip/transport/tcp/tcp_timestamp_test.go4
-rw-r--r--pkg/tcpip/transport/tcp/testing/context/context.go41
-rw-r--r--pkg/tcpip/transport/udp/endpoint.go3
19 files changed, 740 insertions, 82 deletions
diff --git a/pkg/sentry/socket/epsocket/epsocket.go b/pkg/sentry/socket/epsocket/epsocket.go
index ce3b247d0..a42cf4caf 100644
--- a/pkg/sentry/socket/epsocket/epsocket.go
+++ b/pkg/sentry/socket/epsocket/epsocket.go
@@ -1719,6 +1719,7 @@ func (s *SocketOperations) coalescingRead(ctx context.Context, dst usermem.IOSeq
// If we managed to copy something, we must deliver it.
if copied > 0 {
+ s.Endpoint.ModerateRecvBuf(copied)
return copied, nil
}
diff --git a/pkg/tcpip/stack/stack.go b/pkg/tcpip/stack/stack.go
index 8ecc51a58..2d7f56ca9 100644
--- a/pkg/tcpip/stack/stack.go
+++ b/pkg/tcpip/stack/stack.go
@@ -225,6 +225,45 @@ type TCPSACKInfo struct {
MaxSACKED seqnum.Value
}
+// RcvBufAutoTuneParams holds state related to TCP receive buffer auto-tuning.
+type RcvBufAutoTuneParams struct {
+ // MeasureTime is the time at which the current measurement
+ // was started.
+ MeasureTime time.Time
+
+ // CopiedBytes is the number of bytes copied to user space since
+ // this measure began.
+ CopiedBytes int
+
+ // PrevCopiedBytes is the number of bytes copied to user space in
+ // the previous RTT period.
+ PrevCopiedBytes int
+
+ // RcvBufSize is the auto tuned receive buffer size.
+ RcvBufSize int
+
+ // RTT is the smoothed RTT as measured by observing the time between
+ // when a byte is first acknowledged and the receipt of data that is at
+ // least one window beyond the sequence number that was acknowledged.
+ RTT time.Duration
+
+ // RTTVar is the "round-trip time variation" as defined in section 2
+ // of RFC6298.
+ RTTVar time.Duration
+
+ // RTTMeasureSeqNumber is the highest acceptable sequence number at the
+ // time this RTT measurement period began.
+ RTTMeasureSeqNumber seqnum.Value
+
+ // RTTMeasureTime is the absolute time at which the current RTT
+ // measurement period began.
+ RTTMeasureTime time.Time
+
+ // Disabled is true if an explicit receive buffer is set for the
+ // endpoint.
+ Disabled bool
+}
+
// TCPEndpointState is a copy of the internal state of a TCP endpoint.
type TCPEndpointState struct {
// ID is a copy of the TransportEndpointID for the endpoint.
@@ -240,6 +279,10 @@ type TCPEndpointState struct {
// buffer for the endpoint.
RcvBufUsed int
+ // RcvBufAutoTuneParams is used to hold state variables to compute
+ // the auto tuned receive buffer size.
+ RcvAutoParams RcvBufAutoTuneParams
+
// RcvClosed if true, indicates the endpoint has been closed for reading.
RcvClosed bool
diff --git a/pkg/tcpip/stack/transport_test.go b/pkg/tcpip/stack/transport_test.go
index 10fc8f195..788ffcc8c 100644
--- a/pkg/tcpip/stack/transport_test.go
+++ b/pkg/tcpip/stack/transport_test.go
@@ -192,6 +192,9 @@ func (f *fakeTransportEndpoint) State() uint32 {
return 0
}
+func (f *fakeTransportEndpoint) ModerateRecvBuf(copied int) {
+}
+
type fakeTransportGoodOption bool
type fakeTransportBadOption bool
diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go
index 2f931a5a3..4aafb51ab 100644
--- a/pkg/tcpip/tcpip.go
+++ b/pkg/tcpip/tcpip.go
@@ -381,6 +381,13 @@ type Endpoint interface {
// State returns a socket's lifecycle state. The returned value is
// protocol-specific and is primarily used for diagnostics.
State() uint32
+
+ // ModerateRecvBuf should be called everytime data is copied to the user
+ // space. This allows for dynamic tuning of recv buffer space for a
+ // given socket.
+ //
+ // NOTE: This method is a no-op for sockets other than TCP.
+ ModerateRecvBuf(copied int)
}
// WriteOptions contains options for Endpoint.Write.
@@ -480,6 +487,10 @@ type CongestionControlOption string
// control algorithms.
type AvailableCongestionControlOption string
+// ModerateReceiveBufferOption allows the caller to enable/disable TCP receive
+// buffer moderation.
+type ModerateReceiveBufferOption bool
+
// MulticastTTLOption is used by SetSockOpt/GetSockOpt to control the default
// TTL value for multicast messages. The default is 1.
type MulticastTTLOption uint8
diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go
index 0a74429b8..33cefd937 100644
--- a/pkg/tcpip/transport/icmp/endpoint.go
+++ b/pkg/tcpip/transport/icmp/endpoint.go
@@ -127,6 +127,9 @@ func (e *endpoint) Close() {
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
}
+// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
+func (e *endpoint) ModerateRecvBuf(copied int) {}
+
// Read reads data from the endpoint. This method does not block if
// there is no data pending.
func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) {
diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go
index 5ea59d7ea..03f495e48 100644
--- a/pkg/tcpip/transport/raw/endpoint.go
+++ b/pkg/tcpip/transport/raw/endpoint.go
@@ -147,6 +147,9 @@ func (ep *endpoint) Close() {
ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
}
+// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
+func (ep *endpoint) ModerateRecvBuf(copied int) {}
+
// Read implements tcpip.Endpoint.Read.
func (ep *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) {
ep.rcvMu.Lock()
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index 9b1ad6a28..52fd1bfa3 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -213,6 +213,7 @@ func (l *listenContext) createConnectingEndpoint(s *segment, iss seqnum.Value, i
n.route = s.route.Clone()
n.effectiveNetProtos = []tcpip.NetworkProtocolNumber{s.route.NetProto}
n.rcvBufSize = int(l.rcvWnd)
+ n.amss = mssForRoute(&n.route)
n.maybeEnableTimestamp(rcvdSynOpts)
n.maybeEnableSACKPermitted(rcvdSynOpts)
@@ -232,7 +233,11 @@ func (l *listenContext) createConnectingEndpoint(s *segment, iss seqnum.Value, i
// The receiver at least temporarily has a zero receive window scale,
// but the caller may change it (before starting the protocol loop).
n.snd = newSender(n, iss, irs, s.window, rcvdSynOpts.MSS, rcvdSynOpts.WS)
- n.rcv = newReceiver(n, irs, l.rcvWnd, 0)
+ n.rcv = newReceiver(n, irs, seqnum.Size(n.initialReceiveWindow()), 0, seqnum.Size(n.receiveBufferSize()))
+ // Bootstrap the auto tuning algorithm. Starting at zero will result in
+ // a large step function on the first window adjustment causing the
+ // window to grow to a really large value.
+ n.rcvAutoParams.prevCopied = n.initialReceiveWindow()
return n, nil
}
@@ -249,7 +254,7 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head
}
// Perform the 3-way handshake.
- h := newHandshake(ep, l.rcvWnd)
+ h := newHandshake(ep, seqnum.Size(ep.initialReceiveWindow()))
h.resetToSynRcvd(cookie, irs, opts)
if err := h.execute(); err != nil {
@@ -359,16 +364,19 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) {
return
}
cookie := ctx.createCookie(s.id, s.sequenceNumber, encodeMSS(opts.MSS))
- // Send SYN with window scaling because we currently
+
+ // Send SYN without window scaling because we currently
// dont't encode this information in the cookie.
//
// Enable Timestamp option if the original syn did have
// the timestamp option specified.
+ mss := mssForRoute(&s.route)
synOpts := header.TCPSynOptions{
WS: -1,
TS: opts.TS,
TSVal: tcpTimeStamp(timeStampOffset()),
TSEcr: opts.TSVal,
+ MSS: uint16(mss),
}
sendSynTCP(&s.route, s.id, header.TCPFlagSyn|header.TCPFlagAck, cookie, s.sequenceNumber+1, ctx.rcvWnd, synOpts)
e.stack.Stats().TCP.ListenOverflowSynCookieSent.Increment()
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 84e3dd26c..00d2ae524 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -78,6 +78,9 @@ type handshake struct {
// mss is the maximum segment size received from the peer.
mss uint16
+ // amss is the maximum segment size advertised by us to the peer.
+ amss uint16
+
// sndWndScale is the send window scale, as defined in RFC 1323. A
// negative value means no scaling is supported by the peer.
sndWndScale int
@@ -87,11 +90,24 @@ type handshake struct {
}
func newHandshake(ep *endpoint, rcvWnd seqnum.Size) handshake {
+ rcvWndScale := ep.rcvWndScaleForHandshake()
+
+ // Round-down the rcvWnd to a multiple of wndScale. This ensures that the
+ // window offered in SYN won't be reduced due to the loss of precision if
+ // window scaling is enabled after the handshake.
+ rcvWnd = (rcvWnd >> uint8(rcvWndScale)) << uint8(rcvWndScale)
+
+ // Ensure we can always accept at least 1 byte if the scale specified
+ // was too high for the provided rcvWnd.
+ if rcvWnd == 0 {
+ rcvWnd = 1
+ }
+
h := handshake{
ep: ep,
active: true,
rcvWnd: rcvWnd,
- rcvWndScale: FindWndScale(rcvWnd),
+ rcvWndScale: int(rcvWndScale),
}
h.resetState()
return h
@@ -224,7 +240,7 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error {
h.ep.state = StateSynRecv
h.ep.mu.Unlock()
synOpts := header.TCPSynOptions{
- WS: h.rcvWndScale,
+ WS: int(h.effectiveRcvWndScale()),
TS: rcvSynOpts.TS,
TSVal: h.ep.timestamp(),
TSEcr: h.ep.recentTS,
@@ -233,6 +249,7 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error {
// permits SACK. This is not explicitly defined in the RFC but
// this is the behaviour implemented by Linux.
SACKPermitted: rcvSynOpts.SACKPermitted,
+ MSS: h.ep.amss,
}
sendSynTCP(&s.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts)
@@ -277,6 +294,7 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error {
TSVal: h.ep.timestamp(),
TSEcr: h.ep.recentTS,
SACKPermitted: h.ep.sackPermitted,
+ MSS: h.ep.amss,
}
sendSynTCP(&s.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts)
return nil
@@ -419,12 +437,15 @@ func (h *handshake) execute() *tcpip.Error {
// Send the initial SYN segment and loop until the handshake is
// completed.
+ h.ep.amss = mssForRoute(&h.ep.route)
+
synOpts := header.TCPSynOptions{
WS: h.rcvWndScale,
TS: true,
TSVal: h.ep.timestamp(),
TSEcr: h.ep.recentTS,
SACKPermitted: bool(sackEnabled),
+ MSS: h.ep.amss,
}
// Execute is also called in a listen context so we want to make sure we
@@ -433,6 +454,11 @@ func (h *handshake) execute() *tcpip.Error {
if h.state == handshakeSynRcvd {
synOpts.TS = h.ep.sendTSOk
synOpts.SACKPermitted = h.ep.sackPermitted && bool(sackEnabled)
+ if h.sndWndScale < 0 {
+ // Disable window scaling if the peer did not send us
+ // the window scaling option.
+ synOpts.WS = -1
+ }
}
sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts)
for h.state != handshakeCompleted {
@@ -554,13 +580,6 @@ func makeSynOptions(opts header.TCPSynOptions) []byte {
}
func sendSynTCP(r *stack.Route, id stack.TransportEndpointID, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts header.TCPSynOptions) *tcpip.Error {
- // The MSS in opts is automatically calculated as this function is
- // called from many places and we don't want every call point being
- // embedded with the MSS calculation.
- if opts.MSS == 0 {
- opts.MSS = uint16(r.MTU() - header.TCPMinimumSize)
- }
-
options := makeSynOptions(opts)
err := sendTCP(r, id, buffer.VectorisedView{}, r.DefaultTTL(), flags, seq, ack, rcvWnd, options, nil)
putOptions(options)
@@ -861,7 +880,8 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
// This is an active connection, so we must initiate the 3-way
// handshake, and then inform potential waiters about its
// completion.
- h := newHandshake(e, seqnum.Size(e.receiveBufferAvailable()))
+ initialRcvWnd := e.initialReceiveWindow()
+ h := newHandshake(e, seqnum.Size(initialRcvWnd))
e.mu.Lock()
h.ep.state = StateSynSent
e.mu.Unlock()
@@ -886,8 +906,14 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
// (indicated by a negative send window scale).
e.snd = newSender(e, h.iss, h.ackNum-1, h.sndWnd, h.mss, h.sndWndScale)
+ rcvBufSize := seqnum.Size(e.receiveBufferSize())
e.rcvListMu.Lock()
- e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale())
+ e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale(), rcvBufSize)
+ // boot strap the auto tuning algorithm. Starting at zero will
+ // result in a large step function on the first proper causing
+ // the window to just go to a really large value after the first
+ // RTT itself.
+ e.rcvAutoParams.prevCopied = initialRcvWnd
e.rcvListMu.Unlock()
}
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 9614b2958..1aa1f12b4 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -132,6 +132,42 @@ type SACKInfo struct {
NumBlocks int
}
+// rcvBufAutoTuneParams are used to hold state variables to compute
+// the auto tuned recv buffer size.
+//
+// +stateify savable
+type rcvBufAutoTuneParams struct {
+ // measureTime is the time at which the current measurement
+ // was started.
+ measureTime time.Time `state:".(unixTime)"`
+
+ // copied is the number of bytes copied out of the receive
+ // buffers since this measure began.
+ copied int
+
+ // prevCopied is the number of bytes copied out of the receive
+ // buffers in the previous RTT period.
+ prevCopied int
+
+ // rtt is the non-smoothed minimum RTT as measured by observing the time
+ // between when a byte is first acknowledged and the receipt of data
+ // that is at least one window beyond the sequence number that was
+ // acknowledged.
+ rtt time.Duration
+
+ // rttMeasureSeqNumber is the highest acceptable sequence number at the
+ // time this RTT measurement period began.
+ rttMeasureSeqNumber seqnum.Value
+
+ // rttMeasureTime is the absolute time at which the current rtt
+ // measurement period began.
+ rttMeasureTime time.Time `state:".(unixTime)"`
+
+ // disabled is true if an explicit receive buffer is set for the
+ // endpoint.
+ disabled bool
+}
+
// endpoint represents a TCP endpoint. This struct serves as the interface
// between users of the endpoint and the protocol implementation; it is legal to
// have concurrent goroutines make calls into the endpoint, they are properly
@@ -165,11 +201,12 @@ type endpoint struct {
// to indicate to users that no more data is coming.
//
// rcvListMu can be taken after the endpoint mu below.
- rcvListMu sync.Mutex `state:"nosave"`
- rcvList segmentList `state:"wait"`
- rcvClosed bool
- rcvBufSize int
- rcvBufUsed int
+ rcvListMu sync.Mutex `state:"nosave"`
+ rcvList segmentList `state:"wait"`
+ rcvClosed bool
+ rcvBufSize int
+ rcvBufUsed int
+ rcvAutoParams rcvBufAutoTuneParams
// The following fields are protected by the mutex.
mu sync.RWMutex `state:"nosave"`
@@ -339,6 +376,9 @@ type endpoint struct {
bindAddress tcpip.Address
connectingAddress tcpip.Address
+ // amss is the advertised MSS to the peer by this endpoint.
+ amss uint16
+
gso *stack.GSO
}
@@ -373,8 +413,8 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite
netProto: netProto,
waiterQueue: waiterQueue,
state: StateInitial,
- rcvBufSize: DefaultBufferSize,
- sndBufSize: DefaultBufferSize,
+ rcvBufSize: DefaultReceiveBufferSize,
+ sndBufSize: DefaultSendBufferSize,
sndMTU: int(math.MaxInt32),
reuseAddr: true,
keepalive: keepalive{
@@ -400,6 +440,11 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite
e.cc = cs
}
+ var mrb tcpip.ModerateReceiveBufferOption
+ if err := stack.TransportProtocolOption(ProtocolNumber, &mrb); err == nil {
+ e.rcvAutoParams.disabled = !bool(mrb)
+ }
+
if p := stack.GetTCPProbe(); p != nil {
e.probe = p
}
@@ -408,6 +453,7 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite
e.workMu.Init()
e.workMu.Lock()
e.tsOffset = timeStampOffset()
+
return e
}
@@ -551,6 +597,83 @@ func (e *endpoint) cleanupLocked() {
tcpip.DeleteDanglingEndpoint(e)
}
+// initialReceiveWindow returns the initial receive window to advertise in the
+// SYN/SYN-ACK.
+func (e *endpoint) initialReceiveWindow() int {
+ rcvWnd := e.receiveBufferAvailable()
+ if rcvWnd > math.MaxUint16 {
+ rcvWnd = math.MaxUint16
+ }
+ routeWnd := InitialCwnd * int(mssForRoute(&e.route)) * 2
+ if rcvWnd > routeWnd {
+ rcvWnd = routeWnd
+ }
+ return rcvWnd
+}
+
+// ModerateRecvBuf adjusts the receive buffer and the advertised window
+// based on the number of bytes copied to user space.
+func (e *endpoint) ModerateRecvBuf(copied int) {
+ e.rcvListMu.Lock()
+ if e.rcvAutoParams.disabled {
+ e.rcvListMu.Unlock()
+ return
+ }
+ now := time.Now()
+ if rtt := e.rcvAutoParams.rtt; rtt == 0 || now.Sub(e.rcvAutoParams.measureTime) < rtt {
+ e.rcvAutoParams.copied += copied
+ e.rcvListMu.Unlock()
+ return
+ }
+ prevRTTCopied := e.rcvAutoParams.copied + copied
+ prevCopied := e.rcvAutoParams.prevCopied
+ rcvWnd := 0
+ if prevRTTCopied > prevCopied {
+ // The minimal receive window based on what was copied by the app
+ // in the immediate preceding RTT and some extra buffer for 16
+ // segments to account for variations.
+ // We multiply by 2 to account for packet losses.
+ rcvWnd = prevRTTCopied*2 + 16*int(e.amss)
+
+ // Scale for slow start based on bytes copied in this RTT vs previous.
+ grow := (rcvWnd * (prevRTTCopied - prevCopied)) / prevCopied
+
+ // Multiply growth factor by 2 again to account for sender being
+ // in slow-start where the sender grows it's congestion window
+ // by 100% per RTT.
+ rcvWnd += grow * 2
+
+ // Make sure auto tuned buffer size can always receive upto 2x
+ // the initial window of 10 segments.
+ if minRcvWnd := int(e.amss) * InitialCwnd * 2; rcvWnd < minRcvWnd {
+ rcvWnd = minRcvWnd
+ }
+
+ // Cap the auto tuned buffer size by the maximum permissible
+ // receive buffer size.
+ if max := e.maxReceiveBufferSize(); rcvWnd > max {
+ rcvWnd = max
+ }
+
+ // We do not adjust downwards as that can cause the receiver to
+ // reject valid data that might already be in flight as the
+ // acceptable window will shrink.
+ if rcvWnd > e.rcvBufSize {
+ e.rcvBufSize = rcvWnd
+ e.notifyProtocolGoroutine(notifyReceiveWindowChanged)
+ }
+
+ // We only update prevCopied when we grow the buffer because in cases
+ // where prevCopied > prevRTTCopied the existing buffer is already big
+ // enough to handle the current rate and we don't need to do any
+ // adjustments.
+ e.rcvAutoParams.prevCopied = prevRTTCopied
+ }
+ e.rcvAutoParams.measureTime = now
+ e.rcvAutoParams.copied = 0
+ e.rcvListMu.Unlock()
+}
+
// Read reads data from the endpoint.
func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) {
e.mu.RLock()
@@ -821,6 +944,7 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
wasZero := e.zeroReceiveWindow(scale)
e.rcvBufSize = size
+ e.rcvAutoParams.disabled = true
if wasZero && !e.zeroReceiveWindow(scale) {
mask |= notifyNonZeroReceiveWindow
}
@@ -1657,6 +1781,33 @@ func (e *endpoint) receiveBufferSize() int {
return size
}
+func (e *endpoint) maxReceiveBufferSize() int {
+ var rs ReceiveBufferSizeOption
+ if err := e.stack.TransportProtocolOption(ProtocolNumber, &rs); err != nil {
+ // As a fallback return the hardcoded max buffer size.
+ return MaxBufferSize
+ }
+ return rs.Max
+}
+
+// rcvWndScaleForHandshake computes the receive window scale to offer to the
+// peer when window scaling is enabled (true by default). If auto-tuning is
+// disabled then the window scaling factor is based on the size of the
+// receiveBuffer otherwise we use the max permissible receive buffer size to
+// compute the scale.
+func (e *endpoint) rcvWndScaleForHandshake() int {
+ bufSizeForScale := e.receiveBufferSize()
+
+ e.rcvListMu.Lock()
+ autoTuningDisabled := e.rcvAutoParams.disabled
+ e.rcvListMu.Unlock()
+ if autoTuningDisabled {
+ return FindWndScale(seqnum.Size(bufSizeForScale))
+ }
+
+ return FindWndScale(seqnum.Size(e.maxReceiveBufferSize()))
+}
+
// updateRecentTimestamp updates the recent timestamp using the algorithm
// described in https://tools.ietf.org/html/rfc7323#section-4.3
func (e *endpoint) updateRecentTimestamp(tsVal uint32, maxSentAck seqnum.Value, segSeq seqnum.Value) {
@@ -1749,6 +1900,13 @@ func (e *endpoint) completeState() stack.TCPEndpointState {
s.RcvBufSize = e.rcvBufSize
s.RcvBufUsed = e.rcvBufUsed
s.RcvClosed = e.rcvClosed
+ s.RcvAutoParams.MeasureTime = e.rcvAutoParams.measureTime
+ s.RcvAutoParams.CopiedBytes = e.rcvAutoParams.copied
+ s.RcvAutoParams.PrevCopiedBytes = e.rcvAutoParams.prevCopied
+ s.RcvAutoParams.RTT = e.rcvAutoParams.rtt
+ s.RcvAutoParams.RTTMeasureSeqNumber = e.rcvAutoParams.rttMeasureSeqNumber
+ s.RcvAutoParams.RTTMeasureTime = e.rcvAutoParams.rttMeasureTime
+ s.RcvAutoParams.Disabled = e.rcvAutoParams.disabled
e.rcvListMu.Unlock()
// Endpoint TCP Option state.
@@ -1802,13 +1960,13 @@ func (e *endpoint) completeState() stack.TCPEndpointState {
RTTMeasureTime: e.snd.rttMeasureTime,
Closed: e.snd.closed,
RTO: e.snd.rto,
- SRTTInited: e.snd.srttInited,
MaxPayloadSize: e.snd.maxPayloadSize,
SndWndScale: e.snd.sndWndScale,
MaxSentAck: e.snd.maxSentAck,
}
e.snd.rtt.Lock()
s.Sender.SRTT = e.snd.rtt.srtt
+ s.Sender.SRTTInited = e.snd.rtt.srttInited
e.snd.rtt.Unlock()
if cubic, ok := e.snd.cc.(*cubicState); ok {
@@ -1856,3 +2014,7 @@ func (e *endpoint) State() uint32 {
defer e.mu.Unlock()
return uint32(e.state)
}
+
+func mssForRoute(r *stack.Route) uint16 {
+ return uint16(r.MTU() - header.TCPMinimumSize)
+}
diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go
index 58be61927..ec61a3886 100644
--- a/pkg/tcpip/transport/tcp/endpoint_state.go
+++ b/pkg/tcpip/transport/tcp/endpoint_state.go
@@ -360,3 +360,23 @@ func loadError(s string) *tcpip.Error {
return e
}
+
+// saveMeasureTime is invoked by stateify.
+func (r *rcvBufAutoTuneParams) saveMeasureTime() unixTime {
+ return unixTime{r.measureTime.Unix(), r.measureTime.UnixNano()}
+}
+
+// loadMeasureTime is invoked by stateify.
+func (r *rcvBufAutoTuneParams) loadMeasureTime(unix unixTime) {
+ r.measureTime = time.Unix(unix.second, unix.nano)
+}
+
+// saveRttMeasureTime is invoked by stateify.
+func (r *rcvBufAutoTuneParams) saveRttMeasureTime() unixTime {
+ return unixTime{r.rttMeasureTime.Unix(), r.rttMeasureTime.UnixNano()}
+}
+
+// loadRttMeasureTime is invoked by stateify.
+func (r *rcvBufAutoTuneParams) loadRttMeasureTime(unix unixTime) {
+ r.rttMeasureTime = time.Unix(unix.second, unix.nano)
+}
diff --git a/pkg/tcpip/transport/tcp/forwarder.go b/pkg/tcpip/transport/tcp/forwarder.go
index 2ce94aeb9..63666f0b3 100644
--- a/pkg/tcpip/transport/tcp/forwarder.go
+++ b/pkg/tcpip/transport/tcp/forwarder.go
@@ -47,7 +47,7 @@ type Forwarder struct {
// If rcvWnd is set to zero, the default buffer size is used instead.
func NewForwarder(s *stack.Stack, rcvWnd, maxInFlight int, handler func(*ForwarderRequest)) *Forwarder {
if rcvWnd == 0 {
- rcvWnd = DefaultBufferSize
+ rcvWnd = DefaultReceiveBufferSize
}
return &Forwarder{
maxInFlight: maxInFlight,
diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go
index 919e4ce24..ee04dcfcc 100644
--- a/pkg/tcpip/transport/tcp/protocol.go
+++ b/pkg/tcpip/transport/tcp/protocol.go
@@ -41,13 +41,18 @@ const (
ProtocolNumber = header.TCPProtocolNumber
// MinBufferSize is the smallest size of a receive or send buffer.
- minBufferSize = 4 << 10 // 4096 bytes.
+ MinBufferSize = 4 << 10 // 4096 bytes.
- // DefaultBufferSize is the default size of the receive and send buffers.
- DefaultBufferSize = 1 << 20 // 1MB
+ // DefaultSendBufferSize is the default size of the send buffer for
+ // an endpoint.
+ DefaultSendBufferSize = 1 << 20 // 1MB
- // MaxBufferSize is the largest size a receive and send buffer can grow to.
- maxBufferSize = 4 << 20 // 4MB
+ // DefaultReceiveBufferSize is the default size of the receive buffer
+ // for an endpoint.
+ DefaultReceiveBufferSize = 1 << 20 // 1MB
+
+ // MaxBufferSize is the largest size a receive/send buffer can grow to.
+ MaxBufferSize = 4 << 20 // 4MB
// MaxUnprocessedSegments is the maximum number of unprocessed segments
// that can be queued for a given endpoint.
@@ -86,6 +91,7 @@ type protocol struct {
recvBufferSize ReceiveBufferSizeOption
congestionControl string
availableCongestionControl []string
+ moderateReceiveBuffer bool
}
// Number returns the tcp protocol number.
@@ -192,6 +198,13 @@ func (p *protocol) SetOption(option interface{}) *tcpip.Error {
// linux returns ENOENT when an invalid congestion control
// is specified.
return tcpip.ErrNoSuchFile
+
+ case tcpip.ModerateReceiveBufferOption:
+ p.mu.Lock()
+ p.moderateReceiveBuffer = bool(v)
+ p.mu.Unlock()
+ return nil
+
default:
return tcpip.ErrUnknownProtocolOption
}
@@ -217,16 +230,25 @@ func (p *protocol) Option(option interface{}) *tcpip.Error {
*v = p.recvBufferSize
p.mu.Unlock()
return nil
+
case *tcpip.CongestionControlOption:
p.mu.Lock()
*v = tcpip.CongestionControlOption(p.congestionControl)
p.mu.Unlock()
return nil
+
case *tcpip.AvailableCongestionControlOption:
p.mu.Lock()
*v = tcpip.AvailableCongestionControlOption(strings.Join(p.availableCongestionControl, " "))
p.mu.Unlock()
return nil
+
+ case *tcpip.ModerateReceiveBufferOption:
+ p.mu.Lock()
+ *v = tcpip.ModerateReceiveBufferOption(p.moderateReceiveBuffer)
+ p.mu.Unlock()
+ return nil
+
default:
return tcpip.ErrUnknownProtocolOption
}
@@ -235,8 +257,8 @@ func (p *protocol) Option(option interface{}) *tcpip.Error {
func init() {
stack.RegisterTransportProtocolFactory(ProtocolName, func() stack.TransportProtocol {
return &protocol{
- sendBufferSize: SendBufferSizeOption{minBufferSize, DefaultBufferSize, maxBufferSize},
- recvBufferSize: ReceiveBufferSizeOption{minBufferSize, DefaultBufferSize, maxBufferSize},
+ sendBufferSize: SendBufferSizeOption{MinBufferSize, DefaultSendBufferSize, MaxBufferSize},
+ recvBufferSize: ReceiveBufferSizeOption{MinBufferSize, DefaultReceiveBufferSize, MaxBufferSize},
congestionControl: ccReno,
availableCongestionControl: []string{ccReno, ccCubic},
}
diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go
index 8d9de9bf9..e90f9a7d9 100644
--- a/pkg/tcpip/transport/tcp/rcv.go
+++ b/pkg/tcpip/transport/tcp/rcv.go
@@ -16,6 +16,7 @@ package tcp
import (
"container/heap"
+ "time"
"gvisor.dev/gvisor/pkg/tcpip/header"
"gvisor.dev/gvisor/pkg/tcpip/seqnum"
@@ -38,6 +39,9 @@ type receiver struct {
// shrinking it.
rcvAcc seqnum.Value
+ // rcvWnd is the non-scaled receive window last advertised to the peer.
+ rcvWnd seqnum.Size
+
rcvWndScale uint8
closed bool
@@ -47,13 +51,14 @@ type receiver struct {
pendingBufSize seqnum.Size
}
-func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8) *receiver {
+func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8, pendingBufSize seqnum.Size) *receiver {
return &receiver{
ep: ep,
rcvNxt: irs + 1,
rcvAcc: irs.Add(rcvWnd + 1),
+ rcvWnd: rcvWnd,
rcvWndScale: rcvWndScale,
- pendingBufSize: rcvWnd,
+ pendingBufSize: pendingBufSize,
}
}
@@ -72,14 +77,16 @@ func (r *receiver) acceptable(segSeq seqnum.Value, segLen seqnum.Size) bool {
// getSendParams returns the parameters needed by the sender when building
// segments to send.
func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) {
- // Calculate the window size based on the current buffer size.
- n := r.ep.receiveBufferAvailable()
- acc := r.rcvNxt.Add(seqnum.Size(n))
+ // Calculate the window size based on the available buffer space.
+ receiveBufferAvailable := r.ep.receiveBufferAvailable()
+ acc := r.rcvNxt.Add(seqnum.Size(receiveBufferAvailable))
if r.rcvAcc.LessThan(acc) {
r.rcvAcc = acc
}
-
- return r.rcvNxt, r.rcvNxt.Size(r.rcvAcc) >> r.rcvWndScale
+ // Stash away the non-scaled receive window as we use it for measuring
+ // receiver's estimated RTT.
+ r.rcvWnd = r.rcvNxt.Size(r.rcvAcc)
+ return r.rcvNxt, r.rcvWnd >> r.rcvWndScale
}
// nonZeroWindow is called when the receive window grows from zero to nonzero;
@@ -130,6 +137,21 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum
// Update the segment that we're expecting to consume.
r.rcvNxt = segSeq.Add(segLen)
+ // In cases of a misbehaving sender which could send more than the
+ // advertised window, we could end up in a situation where we get a
+ // segment that exceeds the window advertised. Instead of partially
+ // accepting the segment and discarding bytes beyond the advertised
+ // window, we accept the whole segment and make sure r.rcvAcc is moved
+ // forward to match r.rcvNxt to indicate that the window is now closed.
+ //
+ // In absence of this check the r.acceptable() check fails and accepts
+ // segments that should be dropped because rcvWnd is calculated as
+ // the size of the interval (rcvNxt, rcvAcc] which becomes extremely
+ // large if rcvAcc is ever less than rcvNxt.
+ if r.rcvAcc.LessThan(r.rcvNxt) {
+ r.rcvAcc = r.rcvNxt
+ }
+
// Trim SACK Blocks to remove any SACK information that covers
// sequence numbers that have been consumed.
TrimSACKBlockList(&r.ep.sack, r.rcvNxt)
@@ -198,6 +220,39 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum
return true
}
+// updateRTT updates the receiver RTT measurement based on the sequence number
+// of the received segment.
+func (r *receiver) updateRTT() {
+ // From: https://public.lanl.gov/radiant/pubs/drs/sc2001-poster.pdf
+ //
+ // A system that is only transmitting acknowledgements can still
+ // estimate the round-trip time by observing the time between when a byte
+ // is first acknowledged and the receipt of data that is at least one
+ // window beyond the sequence number that was acknowledged.
+ r.ep.rcvListMu.Lock()
+ if r.ep.rcvAutoParams.rttMeasureTime.IsZero() {
+ // New measurement.
+ r.ep.rcvAutoParams.rttMeasureTime = time.Now()
+ r.ep.rcvAutoParams.rttMeasureSeqNumber = r.rcvNxt.Add(r.rcvWnd)
+ r.ep.rcvListMu.Unlock()
+ return
+ }
+ if r.rcvNxt.LessThan(r.ep.rcvAutoParams.rttMeasureSeqNumber) {
+ r.ep.rcvListMu.Unlock()
+ return
+ }
+ rtt := time.Since(r.ep.rcvAutoParams.rttMeasureTime)
+ // We only store the minimum observed RTT here as this is only used in
+ // absence of a SRTT available from either timestamps or a sender
+ // measurement of RTT.
+ if r.ep.rcvAutoParams.rtt == 0 || rtt < r.ep.rcvAutoParams.rtt {
+ r.ep.rcvAutoParams.rtt = rtt
+ }
+ r.ep.rcvAutoParams.rttMeasureTime = time.Now()
+ r.ep.rcvAutoParams.rttMeasureSeqNumber = r.rcvNxt.Add(r.rcvWnd)
+ r.ep.rcvListMu.Unlock()
+}
+
// handleRcvdSegment handles TCP segments directed at the connection managed by
// r as they arrive. It is called by the protocol main loop.
func (r *receiver) handleRcvdSegment(s *segment) {
@@ -226,10 +281,9 @@ func (r *receiver) handleRcvdSegment(s *segment) {
r.pendingBufUsed += s.logicalLen()
s.incRef()
heap.Push(&r.pendingRcvdSegments, s)
+ UpdateSACKBlocks(&r.ep.sack, segSeq, segSeq.Add(segLen), r.rcvNxt)
}
- UpdateSACKBlocks(&r.ep.sack, segSeq, segSeq.Add(segLen), r.rcvNxt)
-
// Immediately send an ack so that the peer knows it may
// have to retransmit.
r.ep.snd.sendAck()
@@ -237,6 +291,12 @@ func (r *receiver) handleRcvdSegment(s *segment) {
return
}
+ // Since we consumed a segment update the receiver's RTT estimate
+ // if required.
+ if segLen > 0 {
+ r.updateRTT()
+ }
+
// By consuming the current segment, we may have filled a gap in the
// sequence number domain that allows pending segments to be consumed
// now. So try to do it.
diff --git a/pkg/tcpip/transport/tcp/sack.go b/pkg/tcpip/transport/tcp/sack.go
index 52c5d9867..7be86d68e 100644
--- a/pkg/tcpip/transport/tcp/sack.go
+++ b/pkg/tcpip/transport/tcp/sack.go
@@ -31,6 +31,13 @@ const (
// segment identified by segStart->segEnd.
func UpdateSACKBlocks(sack *SACKInfo, segStart seqnum.Value, segEnd seqnum.Value, rcvNxt seqnum.Value) {
newSB := header.SACKBlock{Start: segStart, End: segEnd}
+
+ // Ignore any invalid SACK blocks or blocks that are before rcvNxt as
+ // those bytes have already been acked.
+ if newSB.End.LessThanEq(newSB.Start) || newSB.End.LessThan(rcvNxt) {
+ return
+ }
+
if sack.NumBlocks == 0 {
sack.Blocks[0] = newSB
sack.NumBlocks = 1
@@ -39,9 +46,8 @@ func UpdateSACKBlocks(sack *SACKInfo, segStart seqnum.Value, segEnd seqnum.Value
var n = 0
for i := 0; i < sack.NumBlocks; i++ {
start, end := sack.Blocks[i].Start, sack.Blocks[i].End
- if end.LessThanEq(start) || start.LessThanEq(rcvNxt) {
- // Discard any invalid blocks where end is before start
- // and discard any sack blocks that are before rcvNxt as
+ if end.LessThanEq(rcvNxt) {
+ // Discard any sack blocks that are before rcvNxt as
// those have already been acked.
continue
}
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index 297861462..0fee7ab72 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -121,9 +121,8 @@ type sender struct {
// rtt.srtt, rtt.rttvar, and rto are the "smoothed round-trip time",
// "round-trip time variation" and "retransmit timeout", as defined in
// section 2 of RFC 6298.
- rtt rtt
- rto time.Duration
- srttInited bool
+ rtt rtt
+ rto time.Duration
// maxPayloadSize is the maximum size of the payload of a given segment.
// It is initialized on demand.
@@ -150,8 +149,9 @@ type sender struct {
type rtt struct {
sync.Mutex `state:"nosave"`
- srtt time.Duration
- rttvar time.Duration
+ srtt time.Duration
+ rttvar time.Duration
+ srttInited bool
}
// fastRecovery holds information related to fast recovery from a packet loss.
@@ -323,10 +323,10 @@ func (s *sender) sendAck() {
// available. This is done in accordance with section 2 of RFC 6298.
func (s *sender) updateRTO(rtt time.Duration) {
s.rtt.Lock()
- if !s.srttInited {
+ if !s.rtt.srttInited {
s.rtt.rttvar = rtt / 2
s.rtt.srtt = rtt
- s.srttInited = true
+ s.rtt.srttInited = true
} else {
diff := s.rtt.srtt - rtt
if diff < 0 {
diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go
index ef7ec759d..915a98047 100644
--- a/pkg/tcpip/transport/tcp/tcp_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_test.go
@@ -1110,8 +1110,9 @@ func TestNonScaledWindowAccept(t *testing.T) {
t.Fatalf("Listen failed: %v", err)
}
- // Do 3-way handshake.
- c.PassiveConnect(100, 2, header.TCPSynOptions{MSS: defaultIPv4MSS})
+ // Do 3-way handshake w/ window scaling disabled. The SYN-ACK to the SYN
+ // should not carry the window scaling option.
+ c.PassiveConnect(100, -1, header.TCPSynOptions{MSS: defaultIPv4MSS})
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
@@ -1600,7 +1601,6 @@ func TestPassiveSendMSSLessThanMTU(t *testing.T) {
// Set the buffer size to a deterministic size so that we can check the
// window scaling option.
const rcvBufferSize = 0x20000
- const wndScale = 2
if err := ep.SetSockOpt(tcpip.ReceiveBufferSizeOption(rcvBufferSize)); err != nil {
t.Fatalf("SetSockOpt failed failed: %v", err)
}
@@ -1614,7 +1614,7 @@ func TestPassiveSendMSSLessThanMTU(t *testing.T) {
}
// Do 3-way handshake.
- c.PassiveConnect(maxPayload, wndScale, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize})
+ c.PassiveConnect(maxPayload, -1, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize})
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
@@ -1713,7 +1713,7 @@ func TestForwarderSendMSSLessThanMTU(t *testing.T) {
s.SetTransportProtocolHandler(tcp.ProtocolNumber, f.HandlePacket)
// Do 3-way handshake.
- c.PassiveConnect(maxPayload, 1, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize})
+ c.PassiveConnect(maxPayload, -1, header.TCPSynOptions{MSS: mtu - header.IPv4MinimumSize - header.TCPMinimumSize})
// Wait for connection to be available.
select {
@@ -2767,11 +2767,11 @@ func TestDefaultBufferSizes(t *testing.T) {
}
}()
- checkSendBufferSize(t, ep, tcp.DefaultBufferSize)
- checkRecvBufferSize(t, ep, tcp.DefaultBufferSize)
+ checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize)
+ checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize)
// Change the default send buffer size.
- if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultBufferSize * 2, tcp.DefaultBufferSize * 20}); err != nil {
+ if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultSendBufferSize * 2, tcp.DefaultSendBufferSize * 20}); err != nil {
t.Fatalf("SetTransportProtocolOption failed: %v", err)
}
@@ -2781,11 +2781,11 @@ func TestDefaultBufferSizes(t *testing.T) {
t.Fatalf("NewEndpoint failed; %v", err)
}
- checkSendBufferSize(t, ep, tcp.DefaultBufferSize*2)
- checkRecvBufferSize(t, ep, tcp.DefaultBufferSize)
+ checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize*2)
+ checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize)
// Change the default receive buffer size.
- if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultBufferSize * 3, tcp.DefaultBufferSize * 30}); err != nil {
+ if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultReceiveBufferSize * 3, tcp.DefaultReceiveBufferSize * 30}); err != nil {
t.Fatalf("SetTransportProtocolOption failed: %v", err)
}
@@ -2795,8 +2795,8 @@ func TestDefaultBufferSizes(t *testing.T) {
t.Fatalf("NewEndpoint failed; %v", err)
}
- checkSendBufferSize(t, ep, tcp.DefaultBufferSize*2)
- checkRecvBufferSize(t, ep, tcp.DefaultBufferSize*3)
+ checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize*2)
+ checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize*3)
}
func TestMinMaxBufferSizes(t *testing.T) {
@@ -2810,11 +2810,11 @@ func TestMinMaxBufferSizes(t *testing.T) {
defer ep.Close()
// Change the min/max values for send/receive
- if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{200, tcp.DefaultBufferSize * 2, tcp.DefaultBufferSize * 20}); err != nil {
+ if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{200, tcp.DefaultReceiveBufferSize * 2, tcp.DefaultReceiveBufferSize * 20}); err != nil {
t.Fatalf("SetTransportProtocolOption failed: %v", err)
}
- if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{300, tcp.DefaultBufferSize * 3, tcp.DefaultBufferSize * 30}); err != nil {
+ if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{300, tcp.DefaultSendBufferSize * 3, tcp.DefaultSendBufferSize * 30}); err != nil {
t.Fatalf("SetTransportProtocolOption failed: %v", err)
}
@@ -2832,17 +2832,17 @@ func TestMinMaxBufferSizes(t *testing.T) {
checkSendBufferSize(t, ep, 300)
// Set values above the max.
- if err := ep.SetSockOpt(tcpip.ReceiveBufferSizeOption(1 + tcp.DefaultBufferSize*20)); err != nil {
+ if err := ep.SetSockOpt(tcpip.ReceiveBufferSizeOption(1 + tcp.DefaultReceiveBufferSize*20)); err != nil {
t.Fatalf("GetSockOpt failed: %v", err)
}
- checkRecvBufferSize(t, ep, tcp.DefaultBufferSize*20)
+ checkRecvBufferSize(t, ep, tcp.DefaultReceiveBufferSize*20)
- if err := ep.SetSockOpt(tcpip.SendBufferSizeOption(1 + tcp.DefaultBufferSize*30)); err != nil {
+ if err := ep.SetSockOpt(tcpip.SendBufferSizeOption(1 + tcp.DefaultSendBufferSize*30)); err != nil {
t.Fatalf("GetSockOpt failed: %v", err)
}
- checkSendBufferSize(t, ep, tcp.DefaultBufferSize*30)
+ checkSendBufferSize(t, ep, tcp.DefaultSendBufferSize*30)
}
func makeStack() (*stack.Stack, *tcpip.Error) {
@@ -3976,3 +3976,273 @@ func TestEndpointBindListenAcceptState(t *testing.T) {
}
}
+
+// This test verifies that the auto tuning does not grow the receive buffer if
+// the application is not reading the data actively.
+func TestReceiveBufferAutoTuningApplicationLimited(t *testing.T) {
+ const mtu = 1500
+ const mss = mtu - header.IPv4MinimumSize - header.TCPMinimumSize
+
+ c := context.New(t, mtu)
+ defer c.Cleanup()
+
+ stk := c.Stack()
+ // Set lower limits for auto-tuning tests. This is required because the
+ // test stops the worker which can cause packets to be dropped because
+ // the segment queue holding unprocessed packets is limited to 500.
+ const receiveBufferSize = 80 << 10 // 80KB.
+ const maxReceiveBufferSize = receiveBufferSize * 10
+ if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, receiveBufferSize, maxReceiveBufferSize}); err != nil {
+ t.Fatalf("SetTransportProtocolOption failed: %v", err)
+ }
+
+ // Enable auto-tuning.
+ if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcpip.ModerateReceiveBufferOption(true)); err != nil {
+ t.Fatalf("SetTransportProtocolOption failed: %v", err)
+ }
+ // Change the expected window scale to match the value needed for the
+ // maximum buffer size defined above.
+ c.WindowScale = uint8(tcp.FindWndScale(maxReceiveBufferSize))
+
+ rawEP := c.CreateConnectedWithOptions(header.TCPSynOptions{TS: true, WS: 4})
+
+ // NOTE: The timestamp values in the sent packets are meaningless to the
+ // peer so we just increment the timestamp value by 1 every batch as we
+ // are not really using them for anything. Send a single byte to verify
+ // the advertised window.
+ tsVal := rawEP.TSVal + 1
+
+ // Introduce a 25ms latency by delaying the first byte.
+ latency := 25 * time.Millisecond
+ time.Sleep(latency)
+ rawEP.SendPacketWithTS([]byte{1}, tsVal)
+
+ // Verify that the ACK has the expected window.
+ wantRcvWnd := receiveBufferSize
+ wantRcvWnd = (wantRcvWnd >> uint32(c.WindowScale))
+ rawEP.VerifyACKRcvWnd(uint16(wantRcvWnd - 1))
+ time.Sleep(25 * time.Millisecond)
+
+ // Allocate a large enough payload for the test.
+ b := make([]byte, int(receiveBufferSize)*2)
+ offset := 0
+ payloadSize := receiveBufferSize - 1
+ worker := (c.EP).(interface {
+ StopWork()
+ ResumeWork()
+ })
+ tsVal++
+
+ // Stop the worker goroutine.
+ worker.StopWork()
+ start := offset
+ end := offset + payloadSize
+ packetsSent := 0
+ for ; start < end; start += mss {
+ rawEP.SendPacketWithTS(b[start:start+mss], tsVal)
+ packetsSent++
+ }
+ // Resume the worker so that it only sees the packets once all of them
+ // are waiting to be read.
+ worker.ResumeWork()
+
+ // Since we read no bytes the window should goto zero till the
+ // application reads some of the data.
+ // Discard all intermediate acks except the last one.
+ if packetsSent > 100 {
+ for i := 0; i < (packetsSent / 100); i++ {
+ _ = c.GetPacket()
+ }
+ }
+ rawEP.VerifyACKRcvWnd(0)
+
+ time.Sleep(25 * time.Millisecond)
+ // Verify that sending more data when window is closed is dropped and
+ // not acked.
+ rawEP.SendPacketWithTS(b[start:start+mss], tsVal)
+
+ // Verify that the stack sends us back an ACK with the sequence number
+ // of the last packet sent indicating it was dropped.
+ p := c.GetPacket()
+ checker.IPv4(t, p, checker.TCP(
+ checker.AckNum(uint32(rawEP.NextSeqNum)-uint32(mss)),
+ checker.Window(0),
+ ))
+
+ // Now read all the data from the endpoint and verify that advertised
+ // window increases to the full available buffer size.
+ for {
+ _, _, err := c.EP.Read(nil)
+ if err == tcpip.ErrWouldBlock {
+ break
+ }
+ }
+
+ // Verify that we receive a non-zero window update ACK. When running
+ // under thread santizer this test can end up sending more than 1
+ // ack, 1 for the non-zero window
+ p = c.GetPacket()
+ checker.IPv4(t, p, checker.TCP(
+ checker.AckNum(uint32(rawEP.NextSeqNum)-uint32(mss)),
+ func(t *testing.T, h header.Transport) {
+ tcp, ok := h.(header.TCP)
+ if !ok {
+ return
+ }
+ if w := tcp.WindowSize(); w == 0 || w > uint16(wantRcvWnd) {
+ t.Errorf("expected a non-zero window: got %d, want <= wantRcvWnd", w, wantRcvWnd)
+ }
+ },
+ ))
+}
+
+// This test verifies that the auto tuning does not grow the receive buffer if
+// the application is not reading the data actively.
+func TestReceiveBufferAutoTuning(t *testing.T) {
+ const mtu = 1500
+ const mss = mtu - header.IPv4MinimumSize - header.TCPMinimumSize
+
+ c := context.New(t, mtu)
+ defer c.Cleanup()
+
+ // Enable Auto-tuning.
+ stk := c.Stack()
+ // Set lower limits for auto-tuning tests. This is required because the
+ // test stops the worker which can cause packets to be dropped because
+ // the segment queue holding unprocessed packets is limited to 500.
+ const receiveBufferSize = 80 << 10 // 80KB.
+ const maxReceiveBufferSize = receiveBufferSize * 10
+ if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, receiveBufferSize, maxReceiveBufferSize}); err != nil {
+ t.Fatalf("SetTransportProtocolOption failed: %v", err)
+ }
+
+ // Enable auto-tuning.
+ if err := stk.SetTransportProtocolOption(tcp.ProtocolNumber, tcpip.ModerateReceiveBufferOption(true)); err != nil {
+ t.Fatalf("SetTransportProtocolOption failed: %v", err)
+ }
+ // Change the expected window scale to match the value needed for the
+ // maximum buffer size used by stack.
+ c.WindowScale = uint8(tcp.FindWndScale(maxReceiveBufferSize))
+
+ rawEP := c.CreateConnectedWithOptions(header.TCPSynOptions{TS: true, WS: 4})
+
+ wantRcvWnd := receiveBufferSize
+ scaleRcvWnd := func(rcvWnd int) uint16 {
+ return uint16(rcvWnd >> uint16(c.WindowScale))
+ }
+ // Allocate a large array to send to the endpoint.
+ b := make([]byte, receiveBufferSize*48)
+
+ // In every iteration we will send double the number of bytes sent in
+ // the previous iteration and read the same from the app. The received
+ // window should grow by at least 2x of bytes read by the app in every
+ // RTT.
+ offset := 0
+ payloadSize := receiveBufferSize / 8
+ worker := (c.EP).(interface {
+ StopWork()
+ ResumeWork()
+ })
+ tsVal := rawEP.TSVal
+ // We are going to do our own computation of what the moderated receive
+ // buffer should be based on sent/copied data per RTT and verify that
+ // the advertised window by the stack matches our calculations.
+ prevCopied := 0
+ done := false
+ latency := 1 * time.Millisecond
+ for i := 0; !done; i++ {
+ tsVal++
+
+ // Stop the worker goroutine.
+ worker.StopWork()
+ start := offset
+ end := offset + payloadSize
+ totalSent := 0
+ packetsSent := 0
+ for ; start < end; start += mss {
+ rawEP.SendPacketWithTS(b[start:start+mss], tsVal)
+ totalSent += mss
+ packetsSent++
+ }
+ // Resume it so that it only sees the packets once all of them
+ // are waiting to be read.
+ worker.ResumeWork()
+
+ // Give 1ms for the worker to process the packets.
+ time.Sleep(1 * time.Millisecond)
+
+ // Verify that the advertised window on the ACK is reduced by
+ // the total bytes sent.
+ expectedWnd := wantRcvWnd - totalSent
+ if packetsSent > 100 {
+ for i := 0; i < (packetsSent / 100); i++ {
+ _ = c.GetPacket()
+ }
+ }
+ rawEP.VerifyACKRcvWnd(scaleRcvWnd(expectedWnd))
+
+ // Now read all the data from the endpoint and invoke the
+ // moderation API to allow for receive buffer auto-tuning
+ // to happen before we measure the new window.
+ totalCopied := 0
+ for {
+ b, _, err := c.EP.Read(nil)
+ if err == tcpip.ErrWouldBlock {
+ break
+ }
+ totalCopied += len(b)
+ }
+
+ // Invoke the moderation API. This is required for auto-tuning
+ // to happen. This method is normally expected to be invoked
+ // from a higher layer than tcpip.Endpoint. So we simulate
+ // copying to user-space by invoking it explicitly here.
+ c.EP.ModerateRecvBuf(totalCopied)
+
+ // Now send a keep-alive packet to trigger an ACK so that we can
+ // measure the new window.
+ rawEP.NextSeqNum--
+ rawEP.SendPacketWithTS(nil, tsVal)
+ rawEP.NextSeqNum++
+
+ if i == 0 {
+ // In the first iteration the receiver based RTT is not
+ // yet known as a result the moderation code should not
+ // increase the advertised window.
+ rawEP.VerifyACKRcvWnd(scaleRcvWnd(wantRcvWnd))
+ prevCopied = totalCopied
+ } else {
+ rttCopied := totalCopied
+ if i == 1 {
+ // The moderation code accumulates copied bytes till
+ // RTT is established. So add in the bytes sent in
+ // the first iteration to the total bytes for this
+ // RTT.
+ rttCopied += prevCopied
+ // Now reset it to the initial value used by the
+ // auto tuning logic.
+ prevCopied = tcp.InitialCwnd * mss * 2
+ }
+ newWnd := rttCopied<<1 + 16*mss
+ grow := (newWnd * (rttCopied - prevCopied)) / prevCopied
+ newWnd += (grow << 1)
+ if newWnd > maxReceiveBufferSize {
+ newWnd = maxReceiveBufferSize
+ done = true
+ }
+ rawEP.VerifyACKRcvWnd(scaleRcvWnd(newWnd))
+ wantRcvWnd = newWnd
+ prevCopied = rttCopied
+ // Increase the latency after first two iterations to
+ // establish a low RTT value in the receiver since it
+ // only tracks the lowest value. This ensures that when
+ // ModerateRcvBuf is called the elapsed time is always >
+ // rtt. Without this the test is flaky due to delays due
+ // to scheduling/wakeup etc.
+ latency += 50 * time.Millisecond
+ }
+ time.Sleep(latency)
+ offset += payloadSize
+ payloadSize *= 2
+ }
+}
diff --git a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go
index ad300b90b..a641e953d 100644
--- a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go
@@ -182,7 +182,7 @@ func TestTimeStampEnabledAccept(t *testing.T) {
wndSize uint16
}{
{true, -1, 0xffff}, // When cookie is used window scaling is disabled.
- {false, 5, 0x8000}, // 0x8000 * 2^5 = 1<<20 = 1MB window (the default).
+ {false, 5, 0x8000}, // DefaultReceiveBufferSize is 1MB >> 5.
}
for _, tc := range testCases {
timeStampEnabledAccept(t, tc.cookieEnabled, tc.wndScale, tc.wndSize)
@@ -239,7 +239,7 @@ func TestTimeStampDisabledAccept(t *testing.T) {
wndSize uint16
}{
{true, -1, 0xffff}, // When cookie is used window scaling is disabled.
- {false, 5, 0x8000}, // 0x8000 * 2^5 = 1<<20 = 1MB window (the default).
+ {false, 5, 0x8000}, // DefaultReceiveBufferSize is 1MB >> 5.
}
for _, tc := range testCases {
timeStampDisabledAccept(t, tc.cookieEnabled, tc.wndScale, tc.wndSize)
diff --git a/pkg/tcpip/transport/tcp/testing/context/context.go b/pkg/tcpip/transport/tcp/testing/context/context.go
index 2cfeed224..630dd7925 100644
--- a/pkg/tcpip/transport/tcp/testing/context/context.go
+++ b/pkg/tcpip/transport/tcp/testing/context/context.go
@@ -72,12 +72,6 @@ const (
testInitialSequenceNumber = 789
)
-// defaultWindowScale value specified here depends on the tcp.DefaultBufferSize
-// constant defined in the tcp/endpoint.go because the tcp.DefaultBufferSize is
-// used in tcp.newHandshake to determine the window scale to use when sending a
-// SYN/SYN-ACK.
-var defaultWindowScale = tcp.FindWndScale(tcp.DefaultBufferSize)
-
// Headers is used to represent the TCP header fields when building a
// new packet.
type Headers struct {
@@ -134,6 +128,10 @@ type Context struct {
// TimeStampEnabled is true if ep is connected with the timestamp option
// enabled.
TimeStampEnabled bool
+
+ // WindowScale is the expected window scale in SYN packets sent by
+ // the stack.
+ WindowScale uint8
}
// New allocates and initializes a test context containing a new
@@ -142,11 +140,11 @@ func New(t *testing.T, mtu uint32) *Context {
s := stack.New([]string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{tcp.ProtocolName}, stack.Options{})
// Allow minimum send/receive buffer sizes to be 1 during tests.
- if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultBufferSize, tcp.DefaultBufferSize * 10}); err != nil {
+ if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultSendBufferSize, 10 * tcp.DefaultSendBufferSize}); err != nil {
t.Fatalf("SetTransportProtocolOption failed: %v", err)
}
- if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultBufferSize, tcp.DefaultBufferSize * 10}); err != nil {
+ if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.ReceiveBufferSizeOption{1, tcp.DefaultReceiveBufferSize, 10 * tcp.DefaultReceiveBufferSize}); err != nil {
t.Fatalf("SetTransportProtocolOption failed: %v", err)
}
@@ -184,9 +182,10 @@ func New(t *testing.T, mtu uint32) *Context {
})
return &Context{
- t: t,
- s: s,
- linkEP: linkEP,
+ t: t,
+ s: s,
+ linkEP: linkEP,
+ WindowScale: uint8(tcp.FindWndScale(tcp.DefaultReceiveBufferSize)),
}
}
@@ -672,6 +671,21 @@ func (r *RawEndpoint) VerifyACKWithTS(tsVal uint32) {
r.RecentTS = opts.TSVal
}
+// VerifyACKRcvWnd verifies that the window advertised by the incoming ACK
+// matches the provided rcvWnd.
+func (r *RawEndpoint) VerifyACKRcvWnd(rcvWnd uint16) {
+ ackPacket := r.C.GetPacket()
+ checker.IPv4(r.C.t, ackPacket,
+ checker.TCP(
+ checker.DstPort(r.SrcPort),
+ checker.TCPFlags(header.TCPFlagAck),
+ checker.SeqNum(uint32(r.AckNum)),
+ checker.AckNum(uint32(r.NextSeqNum)),
+ checker.Window(rcvWnd),
+ ),
+ )
+}
+
// VerifyACKNoSACK verifies that the ACK does not contain a SACK block.
func (r *RawEndpoint) VerifyACKNoSACK() {
r.VerifyACKHasSACK(nil)
@@ -732,7 +746,7 @@ func (c *Context) CreateConnectedWithOptions(wantOptions header.TCPSynOptions) *
checker.TCPSynOptions(header.TCPSynOptions{
MSS: mss,
TS: true,
- WS: defaultWindowScale,
+ WS: int(c.WindowScale),
SACKPermitted: c.SACKEnabled(),
}),
),
@@ -747,6 +761,9 @@ func (c *Context) CreateConnectedWithOptions(wantOptions header.TCPSynOptions) *
// Build options w/ tsVal to be sent in the SYN-ACK.
synAckOptions := make([]byte, header.TCPOptionsMaximumSize)
offset := 0
+ if wantOptions.WS != -1 {
+ offset += header.EncodeWSOption(wantOptions.WS, synAckOptions[offset:])
+ }
if wantOptions.TS {
offset += header.EncodeTSOption(wantOptions.TSVal, synOptions.TSVal, synAckOptions[offset:])
}
diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go
index fbb1c32e7..99fdfb795 100644
--- a/pkg/tcpip/transport/udp/endpoint.go
+++ b/pkg/tcpip/transport/udp/endpoint.go
@@ -169,6 +169,9 @@ func (e *endpoint) Close() {
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
}
+// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
+func (e *endpoint) ModerateRecvBuf(copied int) {}
+
// Read reads data from the endpoint. This method does not block if
// there is no data pending.
func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) {