From 08a97a6d193f46cd547fadae3bb4125cc788543b Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Thu, 2 Jan 2020 12:09:24 +0100 Subject: #1398 - send ACK when available buffer space gets larger than 1 MSS When receiving data, netstack avoids sending spurious acks. When user does recv() should netstack send ack telling the sender that the window was increased? It depends. Before this patch, netstack _will_ send the ack in the case when window was zero or window >> scale was zero. Basically - when recv space increased from zero. This is not working right with silly-window-avoidance on the sender side. Some network stacks refuse to transmit segments, that will fill the window but are below MSS. Before this patch, this confuses netstack. On one hand if the window was like 3 bytes, netstack will _not_ send ack if the window increases. On the other hand sending party will refuse to transmit 3-byte packet. This patch changes that, making netstack will send an ACK when the available buffer size increases to or above 1*MSS. This will inform other party buffer is large enough, and hopefully uncork it. Signed-off-by: Marek Majkowski --- pkg/tcpip/transport/tcp/endpoint.go | 30 +++++--- pkg/tcpip/transport/tcp/rcv.go | 6 -- pkg/tcpip/transport/tcp/tcp_test.go | 139 ++++++++++++++++++++++++++++++++++++ 3 files changed, 161 insertions(+), 14 deletions(-) (limited to 'pkg/tcpip/transport/tcp') diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index fe629aa40..5d42f8045 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -955,10 +955,20 @@ func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) { } e.rcvBufUsed -= len(v) + + avail := e.receiveBufferAvailableLocked() + + // If the window was small before, lower than MSS, send immediate + // ack. Without this, the sender might be stuck waiting for the + // window to grow, while we think the window is non-zero so we + // don't need to send acks. To avoid silly window syndrome, send + // ack only when the window grows above one MSS. + crossedMSS := avail-len(v) < int(e.amss) && avail >= int(e.amss) + // If the window was zero before this read and if the read freed up // enough buffer space for the scaled window to be non-zero then notify // the protocol goroutine to send a window update. - if e.zeroWindow && !e.zeroReceiveWindow(e.rcv.rcvWndScale) { + if (e.zeroWindow && !e.zeroReceiveWindow(e.rcv.rcvWndScale)) || crossedMSS { e.zeroWindow = false e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow) } @@ -1138,11 +1148,8 @@ func (e *endpoint) Peek(vec [][]byte) (int64, tcpip.ControlMessages, *tcpip.Erro // // It must be called with rcvListMu held. func (e *endpoint) zeroReceiveWindow(scale uint8) bool { - if e.rcvBufUsed >= e.rcvBufSize { - return true - } - - return ((e.rcvBufSize - e.rcvBufUsed) >> scale) == 0 + avail := e.receiveBufferAvailableLocked() + return (avail >> scale) == 0 } // SetSockOptInt sets a socket option. @@ -1181,9 +1188,16 @@ func (e *endpoint) SetSockOptInt(opt tcpip.SockOpt, v int) *tcpip.Error { size = math.MaxInt32 / 2 } + availBefore := e.receiveBufferAvailableLocked() e.rcvBufSize = size + availAfter := e.receiveBufferAvailableLocked() + e.rcvAutoParams.disabled = true - if e.zeroWindow && !e.zeroReceiveWindow(scale) { + + // Immediatelly send ACK in two cases: when the buffer + // grows so that it leaves zero-window state, when the + // buffer grows from small < MSS to >= MSS. + if (e.zeroWindow && !e.zeroReceiveWindow(scale)) || (availBefore < int(e.amss) && availAfter >= int(e.amss)) { e.zeroWindow = false mask |= notifyNonZeroReceiveWindow } @@ -2229,7 +2243,7 @@ func (e *endpoint) readyToRead(s *segment) { // we set the zero window before we deliver the segment to ensure // that a subsequent read of the segment will correctly trigger // a non-zero notification. - if avail := e.receiveBufferAvailableLocked(); avail>>e.rcv.rcvWndScale == 0 { + if e.zeroReceiveWindow(e.rcv.rcvWndScale) { e.stats.ReceiveErrors.ZeroRcvWindowState.Increment() e.zeroWindow = true } diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go index 0a5534959..05c8488f8 100644 --- a/pkg/tcpip/transport/tcp/rcv.go +++ b/pkg/tcpip/transport/tcp/rcv.go @@ -98,12 +98,6 @@ func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { // in such cases we may need to send an ack to indicate to our peer that it can // resume sending data. func (r *receiver) nonZeroWindow() { - if (r.rcvAcc-r.rcvNxt)>>r.rcvWndScale != 0 { - // We never got around to announcing a zero window size, so we - // don't need to immediately announce a nonzero one. - return - } - // Immediately send an ack. r.ep.snd.sendAck() } diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index e8fe4dab5..a05365c6a 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -6561,3 +6561,142 @@ func TestKeepaliveWithUserTimeout(t *testing.T) { t.Errorf("got c.Stack().Stats().TCP.EstablishedTimedout = %v, want = %v", got, want) } } + +func TestIncreaseWindowOnReceive(t *testing.T) { + // This test ensures that the endpoint sends an ack, + // after recv() when the window grows to more than 1 MSS. + c := context.New(t, defaultMTU) + defer c.Cleanup() + + const rcvBuf = 65535 * 10 + c.CreateConnected(789, 30000, rcvBuf) + + // Write chunks of ~30000 bytes. It's important that two + // payloads make it equal or longer than MSS. + remain := rcvBuf + sent := 0 + data := make([]byte, defaultMTU/2) + lastWnd := uint16(0) + + for remain > len(data) { + c.SendPacket(data, &context.Headers{ + SrcPort: context.TestPort, + DstPort: c.Port, + Flags: header.TCPFlagAck, + SeqNum: seqnum.Value(790 + sent), + AckNum: c.IRS.Add(1), + RcvWnd: 30000, + }) + sent += len(data) + remain -= len(data) + + lastWnd = uint16(remain) + if remain > 0xffff { + lastWnd = 0xffff + } + checker.IPv4(t, c.GetPacket(), + checker.PayloadLen(header.TCPMinimumSize), + checker.TCP( + checker.DstPort(context.TestPort), + checker.SeqNum(uint32(c.IRS)+1), + checker.AckNum(uint32(790+sent)), + checker.Window(lastWnd), + checker.TCPFlags(header.TCPFlagAck), + ), + ) + } + + if lastWnd == 0xffff || lastWnd == 0 { + t.Fatalf("expected small, non-zero window: %d", lastWnd) + } + + // We now have < 1 MSS in the buffer space. Read the data! An + // ack should be sent in response to that. The window was not + // zero, but it grew to larger than MSS. + _, _, err := c.EP.Read(nil) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + + _, _, err = c.EP.Read(nil) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + + // After reading two packets, we surely crossed MSS. See the ack: + checker.IPv4(t, c.GetPacket(), + checker.PayloadLen(header.TCPMinimumSize), + checker.TCP( + checker.DstPort(context.TestPort), + checker.SeqNum(uint32(c.IRS)+1), + checker.AckNum(uint32(790+sent)), + checker.Window(uint16(0xffff)), + checker.TCPFlags(header.TCPFlagAck), + ), + ) +} + +func TestIncreaseWindowOnBufferResize(t *testing.T) { + // This test ensures that the endpoint sends an ack, + // after available recv buffer grows to more than 1 MSS. + c := context.New(t, defaultMTU) + defer c.Cleanup() + + const rcvBuf = 65535 * 10 + c.CreateConnected(789, 30000, rcvBuf) + + // Write chunks of ~30000 bytes. It's important that two + // payloads make it equal or longer than MSS. + remain := rcvBuf + sent := 0 + data := make([]byte, defaultMTU/2) + lastWnd := uint16(0) + + for remain > len(data) { + c.SendPacket(data, &context.Headers{ + SrcPort: context.TestPort, + DstPort: c.Port, + Flags: header.TCPFlagAck, + SeqNum: seqnum.Value(790 + sent), + AckNum: c.IRS.Add(1), + RcvWnd: 30000, + }) + sent += len(data) + remain -= len(data) + + lastWnd = uint16(remain) + if remain > 0xffff { + lastWnd = 0xffff + } + checker.IPv4(t, c.GetPacket(), + checker.PayloadLen(header.TCPMinimumSize), + checker.TCP( + checker.DstPort(context.TestPort), + checker.SeqNum(uint32(c.IRS)+1), + checker.AckNum(uint32(790+sent)), + checker.Window(lastWnd), + checker.TCPFlags(header.TCPFlagAck), + ), + ) + } + + if lastWnd == 0xffff || lastWnd == 0 { + t.Fatalf("expected small, non-zero window: %d", lastWnd) + } + + // Increasing the buffer from should generate an ACK, + // since window grew from small value to larger equal MSS + c.EP.SetSockOptInt(tcpip.ReceiveBufferSizeOption, rcvBuf*2) + + // After reading two packets, we surely crossed MSS. See the ack: + checker.IPv4(t, c.GetPacket(), + checker.PayloadLen(header.TCPMinimumSize), + checker.TCP( + checker.DstPort(context.TestPort), + checker.SeqNum(uint32(c.IRS)+1), + checker.AckNum(uint32(790+sent)), + checker.Window(uint16(0xffff)), + checker.TCPFlags(header.TCPFlagAck), + ), + ) +} -- cgit v1.2.3 From c276e4740f225068dc394a500dfdddd64af1d18a Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Wed, 8 Jan 2020 12:56:39 +0000 Subject: Fix #1522 - implement silly window sydrome protection on rx side Before, each of small read()'s that raises window either from zero or above threshold of aMSS, would generate an ACK. In a classic silly-window-syndrome scenario, we can imagine a pessimistic case when small read()'s generate a stream of ACKs. This PR fixes that, essentially treating window size < aMSS as zero. We send ACK exactly in a moment when window increases to >= aMSS or half of receive buffer size (whichever smaller). --- pkg/tcpip/transport/tcp/endpoint.go | 73 +++++++++++++++++++++---------------- pkg/tcpip/transport/tcp/tcp_test.go | 14 ++++--- 2 files changed, 51 insertions(+), 36 deletions(-) (limited to 'pkg/tcpip/transport/tcp') diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 5d42f8045..8ff125855 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -956,20 +956,11 @@ func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) { e.rcvBufUsed -= len(v) - avail := e.receiveBufferAvailableLocked() - - // If the window was small before, lower than MSS, send immediate - // ack. Without this, the sender might be stuck waiting for the - // window to grow, while we think the window is non-zero so we - // don't need to send acks. To avoid silly window syndrome, send - // ack only when the window grows above one MSS. - crossedMSS := avail-len(v) < int(e.amss) && avail >= int(e.amss) - - // If the window was zero before this read and if the read freed up - // enough buffer space for the scaled window to be non-zero then notify - // the protocol goroutine to send a window update. - if (e.zeroWindow && !e.zeroReceiveWindow(e.rcv.rcvWndScale)) || crossedMSS { - e.zeroWindow = false + // If the window was small before this read and if the read + // freed up enough buffer space, to either fit an aMSS or half + // a receive buffer (whichever smaller), then notify the + // protocol goroutine to send a window update. + if e.windowCrossedACKThreshold(len(v)) == 1 { e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow) } @@ -1143,13 +1134,35 @@ func (e *endpoint) Peek(vec [][]byte) (int64, tcpip.ControlMessages, *tcpip.Erro return num, tcpip.ControlMessages{}, nil } -// zeroReceiveWindow checks if the receive window to be announced now would be -// zero, based on the amount of available buffer and the receive window scaling. +// windowCrossedACKThreshold checks if the receive window to be announced now would be +// under aMSS or under half receive buffer, whichever smaller. This is useful as +// a receive side silly window syndrome prevention mechanism. If window grows +// to reasonable value, we should send ACK to the sender to inform the rx space is now +// large. We also want ensure a series of small read()'s won't trigger a flood of +// spurious tiny ACK's. // -// It must be called with rcvListMu held. -func (e *endpoint) zeroReceiveWindow(scale uint8) bool { - avail := e.receiveBufferAvailableLocked() - return (avail >> scale) == 0 +// For large receive buffers, the threshold is aMSS - once reader reads more than aMSS +// we'll send ACK. For tiny receive buffers, the threshold is half of receive buffer size. +// This is chosen arbitrairly. +func (e *endpoint) windowCrossedACKThreshold(deltaBefore int) int { + newAvail := e.receiveBufferAvailableLocked() + oldAvail := newAvail - deltaBefore + if oldAvail < 0 { + oldAvail = 0 + } + + threshold := int(e.amss) + if threshold > e.rcvBufSize/2 { + threshold = e.rcvBufSize / 2 + } + + switch { + case oldAvail < threshold && newAvail >= threshold: + return 1 + case oldAvail >= threshold && newAvail < threshold: + return -1 + } + return 0 } // SetSockOptInt sets a socket option. @@ -1194,11 +1207,11 @@ func (e *endpoint) SetSockOptInt(opt tcpip.SockOpt, v int) *tcpip.Error { e.rcvAutoParams.disabled = true - // Immediatelly send ACK in two cases: when the buffer - // grows so that it leaves zero-window state, when the - // buffer grows from small < MSS to >= MSS. - if (e.zeroWindow && !e.zeroReceiveWindow(scale)) || (availBefore < int(e.amss) && availAfter >= int(e.amss)) { - e.zeroWindow = false + // Immediatelly send an ACK to uncork the sender silly + // window syndrome prevetion, when our available space + // grows above aMSS or half receive buffer, whichever + // smaller. + if e.windowCrossedACKThreshold(availAfter-availBefore) == 1 { mask |= notifyNonZeroReceiveWindow } e.rcvListMu.Unlock() @@ -2239,13 +2252,11 @@ func (e *endpoint) readyToRead(s *segment) { if s != nil { s.incRef() e.rcvBufUsed += s.data.Size() - // Check if the receive window is now closed. If so make sure - // we set the zero window before we deliver the segment to ensure - // that a subsequent read of the segment will correctly trigger - // a non-zero notification. - if e.zeroReceiveWindow(e.rcv.rcvWndScale) { + // Increase counter if the receive window falls down + // below MSS or half receive buffer size, whichever + // smaller. + if e.windowCrossedACKThreshold(-s.data.Size()) == -1 { e.stats.ReceiveErrors.ZeroRcvWindowState.Increment() - e.zeroWindow = true } e.rcvList.PushBack(s) } else { diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index a05365c6a..4c2e458e3 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -2091,10 +2091,14 @@ func TestZeroScaledWindowReceive(t *testing.T) { ) } - // Read some data. An ack should be sent in response to that. - v, _, err := c.EP.Read(nil) - if err != nil { - t.Fatalf("Read failed: %v", err) + // Read at least 1MSS of data. An ack should be sent in response to that. + sz := 0 + for sz < defaultMTU { + v, _, err := c.EP.Read(nil) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + sz += len(v) } checker.IPv4(t, c.GetPacket(), @@ -2103,7 +2107,7 @@ func TestZeroScaledWindowReceive(t *testing.T) { checker.DstPort(context.TestPort), checker.SeqNum(uint32(c.IRS)+1), checker.AckNum(uint32(790+sent)), - checker.Window(uint16(len(v)>>ws)), + checker.Window(uint16(sz>>ws)), checker.TCPFlags(header.TCPFlagAck), ), ) -- cgit v1.2.3