diff options
author | Bhasker Hariharan <bhaskerh@google.com> | 2020-10-09 18:59:48 -0700 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2020-10-09 19:02:03 -0700 |
commit | db36d948fa63ce950d94a5e8e9ebc37956543661 (patch) | |
tree | 123b5ab20650dbb5f79300a4d68d2d98e4c79cfe | |
parent | d75fe7660a61a454ece9472658eac609b3bf61e6 (diff) |
TCP Receive window advertisement fixes.
The fix in commit 028e045da93b7c1c26417e80e4b4e388b86a713d was incorrect as
it can cause the right edge of the window to shrink when we announce
a zero window due to receive buffer being full as its done before the check
for seeing if the window is being shrunk because of the selected window.
Further the window was calculated purely on available space but in cases where
we are getting full sized segments it makes more sense to use the actual bytes
being held. This CL changes to use the lower of the total available space vs
the available space in the maximal window we could advertise minus the actual
payload bytes being held.
This change also cleans up the code so that the window selection logic is
not duplicated between getSendParams() and windowCrossedACKThresholdLocked.
PiperOrigin-RevId: 336404827
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 41 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rcv.go | 54 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_test.go | 20 | ||||
-rw-r--r-- | test/syscalls/linux/tcp_socket.cc | 52 |
5 files changed, 146 insertions, 27 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 189c01c8f..0aaef495d 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1219,12 +1219,6 @@ func (e *endpoint) handleSegment(s *segment) (cont bool, err *tcpip.Error) { return true, nil } - // Increase counter if after processing the segment we would potentially - // advertise a zero window. - if crossed, above := e.windowCrossedACKThresholdLocked(-s.segMemSize()); crossed && !above { - e.stats.ReceiveErrors.ZeroRcvWindowState.Increment() - } - // Now check if the received segment has caused us to transition // to a CLOSED state, if yes then terminate processing and do // not invoke the sender. diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index bc3b409ba..3bcd3923a 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -248,6 +248,11 @@ type ReceiveErrors struct { // ZeroRcvWindowState is the number of times we advertised // a zero receive window when rcvList is full. ZeroRcvWindowState tcpip.StatCounter + + // WantZeroWindow is the number of times we wanted to advertise a + // zero receive window but couldn't because it would have caused + // the receive window's right edge to shrink. + WantZeroRcvWindow tcpip.StatCounter } // SendErrors collect segment send errors within the transport layer. @@ -1162,7 +1167,7 @@ func (e *endpoint) cleanupLocked() { // wndFromSpace returns the window that we can advertise based on the available // receive buffer space. func wndFromSpace(space int) int { - return space / (1 << rcvAdvWndScale) + return space >> rcvAdvWndScale } // initialReceiveWindow returns the initial receive window to advertise in the @@ -1518,6 +1523,38 @@ func (e *endpoint) Peek(vec [][]byte) (int64, tcpip.ControlMessages, *tcpip.Erro return num, tcpip.ControlMessages{}, nil } +// selectWindowLocked returns the new window without checking for shrinking or scaling +// applied. +// Precondition: e.mu and e.rcvListMu must be held. +func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) { + wndFromAvailable := wndFromSpace(e.receiveBufferAvailableLocked()) + maxWindow := wndFromSpace(e.rcvBufSize) + wndFromUsedBytes := maxWindow - e.rcvBufUsed + + // We take the lesser of the wndFromAvailable and wndFromUsedBytes because in + // cases where we receive a lot of small segments the segment overhead is a + // lot higher and we can run out socket buffer space before we can fill the + // previous window we advertised. In cases where we receive MSS sized or close + // MSS sized segments we will probably run out of window space before we + // exhaust receive buffer. + newWnd := wndFromAvailable + if newWnd > wndFromUsedBytes { + newWnd = wndFromUsedBytes + } + if newWnd < 0 { + newWnd = 0 + } + return seqnum.Size(newWnd) +} + +// selectWindow invokes selectWindowLocked after acquiring e.rcvListMu. +func (e *endpoint) selectWindow() (wnd seqnum.Size) { + e.rcvListMu.Lock() + wnd = e.selectWindowLocked() + e.rcvListMu.Unlock() + return wnd +} + // windowCrossedACKThresholdLocked checks if the receive window to be announced // would be under aMSS or under the window derived from half receive buffer, // whichever smaller. This is useful as a receive side silly window syndrome @@ -1534,7 +1571,7 @@ func (e *endpoint) Peek(vec [][]byte) (int64, tcpip.ControlMessages, *tcpip.Erro // // Precondition: e.mu and e.rcvListMu must be held. func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed bool, above bool) { - newAvail := wndFromSpace(e.receiveBufferAvailableLocked()) + newAvail := int(e.selectWindowLocked()) oldAvail := newAvail - deltaBefore if oldAvail < 0 { oldAvail = 0 diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go index 48bf196d8..8e0b7c843 100644 --- a/pkg/tcpip/transport/tcp/rcv.go +++ b/pkg/tcpip/transport/tcp/rcv.go @@ -43,6 +43,9 @@ type receiver struct { // rcvWnd is the non-scaled receive window last advertised to the peer. rcvWnd seqnum.Size + // rcvWUP is the rcvNxt value at the last window update sent. + rcvWUP seqnum.Value + rcvWndScale uint8 closed bool @@ -64,6 +67,7 @@ func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale rcvNxt: irs + 1, rcvAcc: irs.Add(rcvWnd + 1), rcvWnd: rcvWnd, + rcvWUP: irs + 1, rcvWndScale: rcvWndScale, lastRcvdAckTime: time.Now(), } @@ -84,34 +88,54 @@ func (r *receiver) acceptable(segSeq seqnum.Value, segLen seqnum.Size) bool { return header.Acceptable(segSeq, segLen, r.rcvNxt, r.rcvNxt.Add(advertisedWindowSize)) } +// currentWindow returns the available space in the window that was advertised +// last to our peer. +func (r *receiver) currentWindow() (curWnd seqnum.Size) { + endOfWnd := r.rcvWUP.Add(r.rcvWnd) + if endOfWnd.LessThan(r.rcvNxt) { + // return 0 if r.rcvNxt is past the end of the previously advertised window. + // This can happen because we accept a large segment completely even if + // accepting it causes it to partially exceed the advertised window. + return 0 + } + return r.rcvNxt.Size(endOfWnd) +} + // getSendParams returns the parameters needed by the sender when building // segments to send. func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { - avail := wndFromSpace(r.ep.receiveBufferAvailable()) - if avail == 0 { - // We have no space available to accept any data, move to zero window - // state. - r.rcvWnd = 0 - return r.rcvNxt, 0 - } - - acc := r.rcvNxt.Add(seqnum.Size(avail)) - newWnd := r.rcvNxt.Size(acc) - curWnd := r.rcvNxt.Size(r.rcvAcc) - + newWnd := r.ep.selectWindow() + curWnd := r.currentWindow() // Update rcvAcc only if new window is > previously advertised window. We // should never shrink the acceptable sequence space once it has been // advertised the peer. If we shrink the acceptable sequence space then we // would end up dropping bytes that might already be in flight. - if newWnd > curWnd { - r.rcvAcc = r.rcvNxt.Add(newWnd) + // ==================================================== sequence space. + // ^ ^ ^ ^ + // rcvWUP rcvNxt rcvAcc new rcvAcc + // <=====curWnd ===> + // <========= newWnd > curWnd ========= > + if r.rcvNxt.Add(seqnum.Size(curWnd)).LessThan(r.rcvNxt.Add(seqnum.Size(newWnd))) { + // If the new window moves the right edge, then update rcvAcc. + r.rcvAcc = r.rcvNxt.Add(seqnum.Size(newWnd)) } else { + if newWnd == 0 { + // newWnd is zero but we can't advertise a zero as it would cause window + // to shrink so just increment a metric to record this event. + r.ep.stats.ReceiveErrors.WantZeroRcvWindow.Increment() + } newWnd = curWnd } // Stash away the non-scaled receive window as we use it for measuring // receiver's estimated RTT. r.rcvWnd = newWnd - return r.rcvNxt, r.rcvWnd >> r.rcvWndScale + r.rcvWUP = r.rcvNxt + scaledWnd := r.rcvWnd >> r.rcvWndScale + if scaledWnd == 0 { + // Increment a metric if we are advertising an actual zero window. + r.ep.stats.ReceiveErrors.ZeroRcvWindowState.Increment() + } + return r.rcvNxt, scaledWnd } // nonZeroWindow is called when the receive window grows from zero to nonzero; diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index 5b504d0d1..a7149efd0 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -6264,14 +6264,27 @@ func TestReceiveBufferAutoTuning(t *testing.T) { rawEP.NextSeqNum-- rawEP.SendPacketWithTS(nil, tsVal) rawEP.NextSeqNum++ + if i == 0 { // In the first iteration the receiver based RTT is not // yet known as a result the moderation code should not // increase the advertised window. rawEP.VerifyACKRcvWnd(scaleRcvWnd(curRcvWnd)) } else { - pkt := c.GetPacket() - curRcvWnd = int(header.TCP(header.IPv4(pkt).Payload()).WindowSize()) << c.WindowScale + // Read loop above could generate an ACK if the window had dropped to + // zero and then read had opened it up. + lastACK := c.GetPacket() + // Discard any intermediate ACKs and only check the last ACK we get in a + // short time period of few ms. + for { + time.Sleep(1 * time.Millisecond) + pkt := c.GetPacketNonBlocking() + if pkt == nil { + break + } + lastACK = pkt + } + curRcvWnd = int(header.TCP(header.IPv4(lastACK).Payload()).WindowSize()) << c.WindowScale // If thew new current window is close maxReceiveBufferSize then terminate // the loop. This can happen before all iterations are done due to timing // differences when running the test. @@ -7328,7 +7341,7 @@ func TestIncreaseWindowOnBufferResize(t *testing.T) { // Write chunks of ~30000 bytes. It's important that two // payloads make it equal or longer than MSS. - remain := rcvBuf * 2 + remain := rcvBuf sent := 0 data := make([]byte, defaultMTU/2) @@ -7343,7 +7356,6 @@ func TestIncreaseWindowOnBufferResize(t *testing.T) { }) sent += len(data) remain -= len(data) - checker.IPv4(t, c.GetPacket(), checker.PayloadLen(header.TCPMinimumSize), checker.TCP( diff --git a/test/syscalls/linux/tcp_socket.cc b/test/syscalls/linux/tcp_socket.cc index e0981e28a..9f522f833 100644 --- a/test/syscalls/linux/tcp_socket.cc +++ b/test/syscalls/linux/tcp_socket.cc @@ -903,6 +903,58 @@ TEST_P(SimpleTcpSocketTest, NonBlockingConnectNoListener) { EXPECT_EQ(err, ECONNREFUSED); } +TEST_P(SimpleTcpSocketTest, SelfConnectSendRecv_NoRandomSave) { + // Initialize address to the loopback one. + sockaddr_storage addr = + ASSERT_NO_ERRNO_AND_VALUE(InetLoopbackAddr(GetParam())); + socklen_t addrlen = sizeof(addr); + + const FileDescriptor s = + ASSERT_NO_ERRNO_AND_VALUE(Socket(GetParam(), SOCK_STREAM, IPPROTO_TCP)); + + ASSERT_THAT( + (bind)(s.get(), reinterpret_cast<struct sockaddr*>(&addr), addrlen), + SyscallSucceeds()); + // Get the bound port. + ASSERT_THAT( + getsockname(s.get(), reinterpret_cast<struct sockaddr*>(&addr), &addrlen), + SyscallSucceeds()); + ASSERT_THAT(RetryEINTR(connect)( + s.get(), reinterpret_cast<struct sockaddr*>(&addr), addrlen), + SyscallSucceeds()); + + constexpr int kBufSz = 1 << 20; // 1 MiB + std::vector<char> writebuf(kBufSz); + + // Start reading the response in a loop. + int read_bytes = 0; + ScopedThread t([&s, &read_bytes]() { + // Too many syscalls. + const DisableSave ds; + + char readbuf[2500] = {}; + int n = -1; + while (n != 0) { + ASSERT_THAT(n = RetryEINTR(read)(s.get(), &readbuf, sizeof(readbuf)), + SyscallSucceeds()); + read_bytes += n; + } + }); + + // Try to send the whole thing. + int n; + ASSERT_THAT(n = SendFd(s.get(), writebuf.data(), kBufSz, 0), + SyscallSucceeds()); + + // We should have written the whole thing. + EXPECT_EQ(n, kBufSz); + EXPECT_THAT(shutdown(s.get(), SHUT_WR), SyscallSucceedsWithValue(0)); + t.Join(); + + // We should have read the whole thing. + EXPECT_EQ(read_bytes, kBufSz); +} + TEST_P(SimpleTcpSocketTest, NonBlockingConnect) { const FileDescriptor listener = ASSERT_NO_ERRNO_AND_VALUE(Socket(GetParam(), SOCK_STREAM, IPPROTO_TCP)); |