diff options
Diffstat (limited to 'pkg/tcpip/transport/tcp')
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 11 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rcv.go | 52 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/segment.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/segment_unsafe.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_test.go | 5 |
5 files changed, 62 insertions, 11 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 31eded0ce..c944dccc0 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -16,6 +16,7 @@ package tcp import ( "encoding/binary" + "math" "time" "gvisor.dev/gvisor/pkg/rand" @@ -133,7 +134,7 @@ func FindWndScale(wnd seqnum.Size) int { return 0 } - max := seqnum.Size(0xffff) + max := seqnum.Size(math.MaxUint16) s := 0 for wnd > max && s < header.MaxWndScale { s++ @@ -818,8 +819,8 @@ func sendTCPBatch(r *stack.Route, tf tcpFields, data buffer.VectorisedView, gso data = data.Clone(nil) optLen := len(tf.opts) - if tf.rcvWnd > 0xffff { - tf.rcvWnd = 0xffff + if tf.rcvWnd > math.MaxUint16 { + tf.rcvWnd = math.MaxUint16 } mss := int(gso.MSS) @@ -863,8 +864,8 @@ func sendTCPBatch(r *stack.Route, tf tcpFields, data buffer.VectorisedView, gso // network endpoint and under the provided identity. func sendTCP(r *stack.Route, tf tcpFields, data buffer.VectorisedView, gso *stack.GSO, owner tcpip.PacketOwner) *tcpip.Error { optLen := len(tf.opts) - if tf.rcvWnd > 0xffff { - tf.rcvWnd = 0xffff + if tf.rcvWnd > math.MaxUint16 { + tf.rcvWnd = math.MaxUint16 } if r.Loop&stack.PacketLoop == 0 && gso != nil && gso.Type == stack.GSOSW && int(gso.MSS) < data.Size() { diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go index 8e0b7c843..f2b1b68da 100644 --- a/pkg/tcpip/transport/tcp/rcv.go +++ b/pkg/tcpip/transport/tcp/rcv.go @@ -16,6 +16,7 @@ package tcp import ( "container/heap" + "math" "time" "gvisor.dev/gvisor/pkg/tcpip" @@ -48,6 +49,10 @@ type receiver struct { rcvWndScale uint8 + // prevBufused is the snapshot of endpoint rcvBufUsed taken when we + // advertise a receive window. + prevBufUsed int + closed bool // pendingRcvdSegments is bounded by the receive buffer size of the @@ -80,9 +85,9 @@ func (r *receiver) acceptable(segSeq seqnum.Value, segLen seqnum.Size) bool { // outgoing packets, we should use what we have advertised for acceptability // test. scaledWindowSize := r.rcvWnd >> r.rcvWndScale - if scaledWindowSize > 0xffff { + if scaledWindowSize > math.MaxUint16 { // This is what we actually put in the Window field. - scaledWindowSize = 0xffff + scaledWindowSize = math.MaxUint16 } advertisedWindowSize := scaledWindowSize << r.rcvWndScale return header.Acceptable(segSeq, segLen, r.rcvNxt, r.rcvNxt.Add(advertisedWindowSize)) @@ -106,6 +111,34 @@ func (r *receiver) currentWindow() (curWnd seqnum.Size) { func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { newWnd := r.ep.selectWindow() curWnd := r.currentWindow() + unackLen := int(r.ep.snd.maxSentAck.Size(r.rcvNxt)) + bufUsed := r.ep.receiveBufferUsed() + + // Grow the right edge of the window only for payloads larger than the + // the segment overhead OR if the application is actively consuming data. + // + // Avoiding growing the right edge otherwise, addresses a situation below: + // An application has been slow in reading data and we have burst of + // incoming segments lengths < segment overhead. Here, our available free + // memory would reduce drastically when compared to the advertised receive + // window. + // + // For example: With incoming 512 bytes segments, segment overhead of + // 552 bytes (at the time of writing this comment), with receive window + // starting from 1MB and with rcvAdvWndScale being 1, buffer would reach 0 + // when the curWnd is still 19436 bytes, because for every incoming segment + // newWnd would reduce by (552+512) >> rcvAdvWndScale (current value 1), + // while curWnd would reduce by 512 bytes. + // Such a situation causes us to keep tail dropping the incoming segments + // and never advertise zero receive window to the peer. + // + // Linux does a similar check for minimal sk_buff size (128): + // https://github.com/torvalds/linux/blob/d5beb3140f91b1c8a3d41b14d729aefa4dcc58bc/net/ipv4/tcp_input.c#L783 + // + // Also, if the application is reading the data, we keep growing the right + // edge, as we are still advertising a window that we think can be serviced. + toGrow := unackLen >= SegSize || bufUsed <= r.prevBufUsed + // Update rcvAcc only if new window is > previously advertised window. We // should never shrink the acceptable sequence space once it has been // advertised the peer. If we shrink the acceptable sequence space then we @@ -115,7 +148,7 @@ func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { // rcvWUP rcvNxt rcvAcc new rcvAcc // <=====curWnd ===> // <========= newWnd > curWnd ========= > - if r.rcvNxt.Add(seqnum.Size(curWnd)).LessThan(r.rcvNxt.Add(seqnum.Size(newWnd))) { + if r.rcvNxt.Add(seqnum.Size(curWnd)).LessThan(r.rcvNxt.Add(seqnum.Size(newWnd))) && toGrow { // If the new window moves the right edge, then update rcvAcc. r.rcvAcc = r.rcvNxt.Add(seqnum.Size(newWnd)) } else { @@ -130,11 +163,24 @@ func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { // receiver's estimated RTT. r.rcvWnd = newWnd r.rcvWUP = r.rcvNxt + r.prevBufUsed = bufUsed scaledWnd := r.rcvWnd >> r.rcvWndScale if scaledWnd == 0 { // Increment a metric if we are advertising an actual zero window. r.ep.stats.ReceiveErrors.ZeroRcvWindowState.Increment() } + + // If we started off with a window larger than what can he held in + // the 16bit window field, we ceil the value to the max value. + // While ceiling, we still do not want to grow the right edge when + // not applicable. + if scaledWnd > math.MaxUint16 { + if toGrow { + scaledWnd = seqnum.Size(math.MaxUint16) + } else { + scaledWnd = seqnum.Size(uint16(scaledWnd)) + } + } return r.rcvNxt, scaledWnd } diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go index 2091989cc..5ef73ec74 100644 --- a/pkg/tcpip/transport/tcp/segment.go +++ b/pkg/tcpip/transport/tcp/segment.go @@ -204,7 +204,7 @@ func (s *segment) payloadSize() int { // segMemSize is the amount of memory used to hold the segment data and // the associated metadata. func (s *segment) segMemSize() int { - return segSize + s.data.Size() + return SegSize + s.data.Size() } // parse populates the sequence & ack numbers, flags, and window fields of the diff --git a/pkg/tcpip/transport/tcp/segment_unsafe.go b/pkg/tcpip/transport/tcp/segment_unsafe.go index 0ab7b8f56..392ff0859 100644 --- a/pkg/tcpip/transport/tcp/segment_unsafe.go +++ b/pkg/tcpip/transport/tcp/segment_unsafe.go @@ -19,5 +19,6 @@ import ( ) const ( - segSize = int(unsafe.Sizeof(segment{})) + // SegSize is the minimal size of the segment overhead. + SegSize = int(unsafe.Sizeof(segment{})) ) diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index 98620e6db..1759ebea9 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -6098,10 +6098,13 @@ func TestReceiveBufferAutoTuningApplicationLimited(t *testing.T) { // Introduce a 25ms latency by delaying the first byte. latency := 25 * time.Millisecond time.Sleep(latency) - rawEP.SendPacketWithTS([]byte{1}, tsVal) + // Send an initial payload with atleast segment overhead size. The receive + // window would not grow for smaller segments. + rawEP.SendPacketWithTS(make([]byte, tcp.SegSize), tsVal) pkt := rawEP.VerifyAndReturnACKWithTS(tsVal) rcvWnd := header.TCP(header.IPv4(pkt).Payload()).WindowSize() + time.Sleep(25 * time.Millisecond) // Allocate a large enough payload for the test. |