diff options
author | gVisor bot <gvisor-bot@google.com> | 2020-01-09 19:35:27 -0800 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2020-01-09 19:35:27 -0800 |
commit | b08da42285fe97f23e20bdf35ab20fdac92b3a5c (patch) | |
tree | bd3e582056a81cead4f1e29e3f6068a80a8d03c9 | |
parent | 356d81146bafc4b4548163eb87e886c851b49e12 (diff) | |
parent | c276e4740f225068dc394a500dfdddd64af1d18a (diff) |
Merge pull request #1523 from majek:fix-1522-silly-window-rx
PiperOrigin-RevId: 289019953
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 74 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rcv.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_test.go | 151 |
3 files changed, 199 insertions, 32 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 920b24975..830bc1e3e 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -885,8 +885,14 @@ func (e *endpoint) ModerateRecvBuf(copied int) { // reject valid data that might already be in flight as the // acceptable window will shrink. if rcvWnd > e.rcvBufSize { + availBefore := e.receiveBufferAvailableLocked() e.rcvBufSize = rcvWnd - e.notifyProtocolGoroutine(notifyReceiveWindowChanged) + availAfter := e.receiveBufferAvailableLocked() + mask := uint32(notifyReceiveWindowChanged) + if crossed, above := e.windowCrossedACKThreshold(availAfter - availBefore); crossed && above { + mask |= notifyNonZeroReceiveWindow + } + e.notifyProtocolGoroutine(mask) } // We only update prevCopied when we grow the buffer because in cases @@ -955,11 +961,12 @@ func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) { } e.rcvBufUsed -= len(v) - // If the window was zero before this read and if the read freed up - // enough buffer space for the scaled window to be non-zero then notify - // the protocol goroutine to send a window update. - if e.zeroWindow && !e.zeroReceiveWindow(e.rcv.rcvWndScale) { - e.zeroWindow = false + + // If the window was small before this read and if the read freed up + // enough buffer space, to either fit an aMSS or half a receive buffer + // (whichever smaller), then notify the protocol goroutine to send a + // window update. + if crossed, above := e.windowCrossedACKThreshold(len(v)); crossed && above { e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow) } @@ -1133,16 +1140,38 @@ func (e *endpoint) Peek(vec [][]byte) (int64, tcpip.ControlMessages, *tcpip.Erro return num, tcpip.ControlMessages{}, nil } -// zeroReceiveWindow checks if the receive window to be announced now would be -// zero, based on the amount of available buffer and the receive window scaling. +// windowCrossedACKThreshold checks if the receive window to be announced now +// would be under aMSS or under half receive buffer, whichever smaller. This is +// useful as a receive side silly window syndrome prevention mechanism. If +// window grows to reasonable value, we should send ACK to the sender to inform +// the rx space is now large. We also want ensure a series of small read()'s +// won't trigger a flood of spurious tiny ACK's. // -// It must be called with rcvListMu held. -func (e *endpoint) zeroReceiveWindow(scale uint8) bool { - if e.rcvBufUsed >= e.rcvBufSize { - return true +// For large receive buffers, the threshold is aMSS - once reader reads more +// than aMSS we'll send ACK. For tiny receive buffers, the threshold is half of +// receive buffer size. This is chosen arbitrairly. +// crossed will be true if the window size crossed the ACK threshold. +// above will be true if the new window is >= ACK threshold and false +// otherwise. +func (e *endpoint) windowCrossedACKThreshold(deltaBefore int) (crossed bool, above bool) { + newAvail := e.receiveBufferAvailableLocked() + oldAvail := newAvail - deltaBefore + if oldAvail < 0 { + oldAvail = 0 + } + + threshold := int(e.amss) + if threshold > e.rcvBufSize/2 { + threshold = e.rcvBufSize / 2 } - return ((e.rcvBufSize - e.rcvBufUsed) >> scale) == 0 + switch { + case oldAvail < threshold && newAvail >= threshold: + return true, true + case oldAvail >= threshold && newAvail < threshold: + return true, false + } + return false, false } // SetSockOptBool sets a socket option. @@ -1204,10 +1233,16 @@ func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) *tcpip.Error { size = math.MaxInt32 / 2 } + availBefore := e.receiveBufferAvailableLocked() e.rcvBufSize = size + availAfter := e.receiveBufferAvailableLocked() + e.rcvAutoParams.disabled = true - if e.zeroWindow && !e.zeroReceiveWindow(scale) { - e.zeroWindow = false + + // Immediately send an ACK to uncork the sender silly window + // syndrome prevetion, when our available space grows above aMSS + // or half receive buffer, whichever smaller. + if crossed, above := e.windowCrossedACKThreshold(availAfter - availBefore); crossed && above { mask |= notifyNonZeroReceiveWindow } e.rcvListMu.Unlock() @@ -2225,13 +2260,10 @@ func (e *endpoint) readyToRead(s *segment) { if s != nil { s.incRef() e.rcvBufUsed += s.data.Size() - // Check if the receive window is now closed. If so make sure - // we set the zero window before we deliver the segment to ensure - // that a subsequent read of the segment will correctly trigger - // a non-zero notification. - if avail := e.receiveBufferAvailableLocked(); avail>>e.rcv.rcvWndScale == 0 { + // Increase counter if the receive window falls down below MSS + // or half receive buffer size, whichever smaller. + if crossed, above := e.windowCrossedACKThreshold(-s.data.Size()); crossed && !above { e.stats.ReceiveErrors.ZeroRcvWindowState.Increment() - e.zeroWindow = true } e.rcvList.PushBack(s) } else { diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go index 0a5534959..05c8488f8 100644 --- a/pkg/tcpip/transport/tcp/rcv.go +++ b/pkg/tcpip/transport/tcp/rcv.go @@ -98,12 +98,6 @@ func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { // in such cases we may need to send an ack to indicate to our peer that it can // resume sending data. func (r *receiver) nonZeroWindow() { - if (r.rcvAcc-r.rcvNxt)>>r.rcvWndScale != 0 { - // We never got around to announcing a zero window size, so we - // don't need to immediately announce a nonzero one. - return - } - // Immediately send an ack. r.ep.snd.sendAck() } diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index 1aa0733d0..6edfa8dce 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -2091,10 +2091,14 @@ func TestZeroScaledWindowReceive(t *testing.T) { ) } - // Read some data. An ack should be sent in response to that. - v, _, err := c.EP.Read(nil) - if err != nil { - t.Fatalf("Read failed: %v", err) + // Read at least 1MSS of data. An ack should be sent in response to that. + sz := 0 + for sz < defaultMTU { + v, _, err := c.EP.Read(nil) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + sz += len(v) } checker.IPv4(t, c.GetPacket(), @@ -2103,7 +2107,7 @@ func TestZeroScaledWindowReceive(t *testing.T) { checker.DstPort(context.TestPort), checker.SeqNum(uint32(c.IRS)+1), checker.AckNum(uint32(790+sent)), - checker.Window(uint16(len(v)>>ws)), + checker.Window(uint16(sz>>ws)), checker.TCPFlags(header.TCPFlagAck), ), ) @@ -6556,3 +6560,140 @@ func TestKeepaliveWithUserTimeout(t *testing.T) { t.Errorf("got c.Stack().Stats().TCP.EstablishedTimedout = %v, want = %v", got, want) } } + +func TestIncreaseWindowOnReceive(t *testing.T) { + // This test ensures that the endpoint sends an ack, + // after recv() when the window grows to more than 1 MSS. + c := context.New(t, defaultMTU) + defer c.Cleanup() + + const rcvBuf = 65535 * 10 + c.CreateConnected(789, 30000, rcvBuf) + + // Write chunks of ~30000 bytes. It's important that two + // payloads make it equal or longer than MSS. + remain := rcvBuf + sent := 0 + data := make([]byte, defaultMTU/2) + lastWnd := uint16(0) + + for remain > len(data) { + c.SendPacket(data, &context.Headers{ + SrcPort: context.TestPort, + DstPort: c.Port, + Flags: header.TCPFlagAck, + SeqNum: seqnum.Value(790 + sent), + AckNum: c.IRS.Add(1), + RcvWnd: 30000, + }) + sent += len(data) + remain -= len(data) + + lastWnd = uint16(remain) + if remain > 0xffff { + lastWnd = 0xffff + } + checker.IPv4(t, c.GetPacket(), + checker.PayloadLen(header.TCPMinimumSize), + checker.TCP( + checker.DstPort(context.TestPort), + checker.SeqNum(uint32(c.IRS)+1), + checker.AckNum(uint32(790+sent)), + checker.Window(lastWnd), + checker.TCPFlags(header.TCPFlagAck), + ), + ) + } + + if lastWnd == 0xffff || lastWnd == 0 { + t.Fatalf("expected small, non-zero window: %d", lastWnd) + } + + // We now have < 1 MSS in the buffer space. Read the data! An + // ack should be sent in response to that. The window was not + // zero, but it grew to larger than MSS. + if _, _, err := c.EP.Read(nil); err != nil { + t.Fatalf("Read failed: %v", err) + } + + if _, _, err := c.EP.Read(nil); err != nil { + t.Fatalf("Read failed: %v", err) + } + + // After reading two packets, we surely crossed MSS. See the ack: + checker.IPv4(t, c.GetPacket(), + checker.PayloadLen(header.TCPMinimumSize), + checker.TCP( + checker.DstPort(context.TestPort), + checker.SeqNum(uint32(c.IRS)+1), + checker.AckNum(uint32(790+sent)), + checker.Window(uint16(0xffff)), + checker.TCPFlags(header.TCPFlagAck), + ), + ) +} + +func TestIncreaseWindowOnBufferResize(t *testing.T) { + // This test ensures that the endpoint sends an ack, + // after available recv buffer grows to more than 1 MSS. + c := context.New(t, defaultMTU) + defer c.Cleanup() + + const rcvBuf = 65535 * 10 + c.CreateConnected(789, 30000, rcvBuf) + + // Write chunks of ~30000 bytes. It's important that two + // payloads make it equal or longer than MSS. + remain := rcvBuf + sent := 0 + data := make([]byte, defaultMTU/2) + lastWnd := uint16(0) + + for remain > len(data) { + c.SendPacket(data, &context.Headers{ + SrcPort: context.TestPort, + DstPort: c.Port, + Flags: header.TCPFlagAck, + SeqNum: seqnum.Value(790 + sent), + AckNum: c.IRS.Add(1), + RcvWnd: 30000, + }) + sent += len(data) + remain -= len(data) + + lastWnd = uint16(remain) + if remain > 0xffff { + lastWnd = 0xffff + } + checker.IPv4(t, c.GetPacket(), + checker.PayloadLen(header.TCPMinimumSize), + checker.TCP( + checker.DstPort(context.TestPort), + checker.SeqNum(uint32(c.IRS)+1), + checker.AckNum(uint32(790+sent)), + checker.Window(lastWnd), + checker.TCPFlags(header.TCPFlagAck), + ), + ) + } + + if lastWnd == 0xffff || lastWnd == 0 { + t.Fatalf("expected small, non-zero window: %d", lastWnd) + } + + // Increasing the buffer from should generate an ACK, + // since window grew from small value to larger equal MSS + c.EP.SetSockOptInt(tcpip.ReceiveBufferSizeOption, rcvBuf*2) + + // After reading two packets, we surely crossed MSS. See the ack: + checker.IPv4(t, c.GetPacket(), + checker.PayloadLen(header.TCPMinimumSize), + checker.TCP( + checker.DstPort(context.TestPort), + checker.SeqNum(uint32(c.IRS)+1), + checker.AckNum(uint32(790+sent)), + checker.Window(uint16(0xffff)), + checker.TCPFlags(header.TCPFlagAck), + ), + ) +} |