diff options
author | Nayana Bidari <nybidari@google.com> | 2021-10-07 16:48:28 -0700 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2021-10-07 16:51:06 -0700 |
commit | 487651ac46f302592ccffc9e5a4336a331010e42 (patch) | |
tree | 3175111e0821c590f48fb4d4f469c5d7d2bf1de6 /pkg/tcpip | |
parent | 0743a862e5e90368291ad4ef9061f3c4ca3a065f (diff) |
Add a new metric to detect the number of spurious loss recoveries.
- Implements RFC 3522 (Eifel detection algorithm) to detect if the connection
entered loss recovery unnecessarily.
- Added a new metric to count the total number of spurious loss recoveries.
- Added tests to verify the new metric.
PiperOrigin-RevId: 401637359
Diffstat (limited to 'pkg/tcpip')
-rw-r--r-- | pkg/tcpip/stack/tcp.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/tcpip.go | 4 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 127 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_sack_test.go | 255 |
5 files changed, 391 insertions, 3 deletions
diff --git a/pkg/tcpip/stack/tcp.go b/pkg/tcpip/stack/tcp.go index dc7289441..a941091b0 100644 --- a/pkg/tcpip/stack/tcp.go +++ b/pkg/tcpip/stack/tcp.go @@ -289,6 +289,12 @@ type TCPSenderState struct { // RACKState holds the state related to RACK loss detection algorithm. RACKState TCPRACKState + + // RetransmitTS records the timestamp used to detect spurious recovery. + RetransmitTS uint32 + + // SpuriousRecovery indicates if the sender entered recovery spuriously. + SpuriousRecovery bool } // TCPSACKInfo holds TCP SACK related information for a given TCP endpoint. diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go index 893894da3..460a6afaf 100644 --- a/pkg/tcpip/tcpip.go +++ b/pkg/tcpip/tcpip.go @@ -1865,6 +1865,10 @@ type TCPStats struct { // SegmentsAckedWithDSACK is the number of segments acknowledged with // DSACK. SegmentsAckedWithDSACK *StatCounter + + // SpuriousRecovery is the number of times the connection entered loss + // recovery spuriously. + SpuriousRecovery *StatCounter } // UDPStats collects UDP-specific stats. diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index b60f9becf..6a798e980 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -2999,6 +2999,8 @@ func (e *endpoint) completeStateLocked() stack.TCPEndpointState { } s.Sender.RACKState = e.snd.rc.TCPRACKState + s.Sender.RetransmitTS = e.snd.retransmitTS + s.Sender.SpuriousRecovery = e.snd.spuriousRecovery return s } diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 2fabf1594..4377f07a0 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -144,6 +144,15 @@ type sender struct { // probeTimer and probeWaker are used to schedule PTO for RACK TLP algorithm. probeTimer timer `state:"nosave"` probeWaker sleep.Waker `state:"nosave"` + + // spuriousRecovery indicates whether the sender entered recovery + // spuriously as described in RFC3522 Section 3.2. + spuriousRecovery bool + + // retransmitTS is the timestamp at which the sender sends retransmitted + // segment after entering an RTO for the first time as described in + // RFC3522 Section 3.2. + retransmitTS uint32 } // rtt is a synchronization wrapper used to appease stateify. See the comment @@ -425,6 +434,13 @@ func (s *sender) retransmitTimerExpired() bool { return true } + // Initialize the variables used to detect spurious recovery after + // entering RTO. + // + // See: https://www.rfc-editor.org/rfc/rfc3522.html#section-3.2 Step 1. + s.spuriousRecovery = false + s.retransmitTS = 0 + // TODO(b/147297758): Band-aid fix, retransmitTimer can fire in some edge cases // when writeList is empty. Remove this once we have a proper fix for this // issue. @@ -495,6 +511,10 @@ func (s *sender) retransmitTimerExpired() bool { s.leaveRecovery() } + // Record retransmitTS if the sender is not in recovery as per: + // https://datatracker.ietf.org/doc/html/rfc3522#section-3.2 Step 2 + s.recordRetransmitTS() + s.state = tcpip.RTORecovery s.cc.HandleRTOExpired() @@ -958,6 +978,13 @@ func (s *sender) sendData() { } func (s *sender) enterRecovery() { + // Initialize the variables used to detect spurious recovery after + // entering recovery. + // + // See: https://www.rfc-editor.org/rfc/rfc3522.html#section-3.2 Step 1. + s.spuriousRecovery = false + s.retransmitTS = 0 + s.FastRecovery.Active = true // Save state to reflect we're now in fast recovery. // @@ -972,6 +999,11 @@ func (s *sender) enterRecovery() { s.FastRecovery.MaxCwnd = s.SndCwnd + s.Outstanding s.FastRecovery.HighRxt = s.SndUna s.FastRecovery.RescueRxt = s.SndUna + + // Record retransmitTS if the sender is not in recovery as per: + // https://datatracker.ietf.org/doc/html/rfc3522#section-3.2 Step 2 + s.recordRetransmitTS() + if s.ep.SACKPermitted { s.state = tcpip.SACKRecovery s.ep.stack.Stats().TCP.SACKRecovery.Increment() @@ -1147,13 +1179,15 @@ func (s *sender) isDupAck(seg *segment) bool { // Iterate the writeList and update RACK for each segment which is newly acked // either cumulatively or selectively. Loop through the segments which are // sacked, and update the RACK related variables and check for reordering. +// Returns true when the DSACK block has been detected in the received ACK. // // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 // steps 2 and 3. -func (s *sender) walkSACK(rcvdSeg *segment) { +func (s *sender) walkSACK(rcvdSeg *segment) bool { s.rc.setDSACKSeen(false) // Look for DSACK block. + hasDSACK := false idx := 0 n := len(rcvdSeg.parsedOptions.SACKBlocks) if checkDSACK(rcvdSeg) { @@ -1167,10 +1201,11 @@ func (s *sender) walkSACK(rcvdSeg *segment) { s.rc.setDSACKSeen(true) idx = 1 n-- + hasDSACK = true } if n == 0 { - return + return hasDSACK } // Sort the SACK blocks. The first block is the most recent unacked @@ -1193,6 +1228,7 @@ func (s *sender) walkSACK(rcvdSeg *segment) { seg = seg.Next() } } + return hasDSACK } // checkDSACK checks if a DSACK is reported. @@ -1239,6 +1275,85 @@ func checkDSACK(rcvdSeg *segment) bool { return false } +func (s *sender) recordRetransmitTS() { + // See: https://datatracker.ietf.org/doc/html/rfc3522#section-3.2 + // + // The Eifel detection algorithm is used, only upon initiation of loss + // recovery, i.e., when either the timeout-based retransmit or the fast + // retransmit is sent. The Eifel detection algorithm MUST NOT be + // reinitiated after loss recovery has already started. In particular, + // it must not be reinitiated upon subsequent timeouts for the same + // segment, and not upon retransmitting segments other than the oldest + // outstanding segment, e.g., during selective loss recovery. + if s.inRecovery() { + return + } + + // See: https://datatracker.ietf.org/doc/html/rfc3522#section-3.2 Step 2 + // + // Set a "RetransmitTS" variable to the value of the Timestamp Value + // field of the Timestamps option included in the retransmit sent when + // loss recovery is initiated. A TCP sender must ensure that + // RetransmitTS does not get overwritten as loss recovery progresses, + // e.g., in case of a second timeout and subsequent second retransmit of + // the same octet. + s.retransmitTS = s.ep.tsValNow() +} + +func (s *sender) detectSpuriousRecovery(hasDSACK bool, tsEchoReply uint32) { + // Return if the sender has already detected spurious recovery. + if s.spuriousRecovery { + return + } + + // See: https://datatracker.ietf.org/doc/html/rfc3522#section-3.2 Step 4 + // + // If the value of the Timestamp Echo Reply field of the acceptable ACK's + // Timestamps option is smaller than the value of RetransmitTS, then + // proceed to next step, else return. + if tsEchoReply >= s.retransmitTS { + return + } + + // See: https://datatracker.ietf.org/doc/html/rfc3522#section-3.2 Step 5 + // + // If the acceptable ACK carries a DSACK option [RFC2883], then return. + if hasDSACK { + return + } + + // See: https://datatracker.ietf.org/doc/html/rfc3522#section-3.2 Step 5 + // + // If during the lifetime of the TCP connection the TCP sender has + // previously received an ACK with a DSACK option, or the acceptable ACK + // does not acknowledge all outstanding data, then proceed to next step, + // else return. + numDSACK := s.ep.stack.Stats().TCP.SegmentsAckedWithDSACK.Value() + if numDSACK == 0 && s.SndUna == s.SndNxt { + return + } + + // See: https://datatracker.ietf.org/doc/html/rfc3522#section-3.2 Step 6 + // + // If the loss recovery has been initiated with a timeout-based + // retransmit, then set + // SpuriousRecovery <- SPUR_TO (equal 1), + // else set + // SpuriousRecovery <- dupacks+1 + // Set the spurious recovery variable to true as we do not differentiate + // between fast, SACK or RTO recovery. + s.spuriousRecovery = true + s.ep.stack.Stats().TCP.SpuriousRecovery.Increment() +} + +// Check if the sender is in RTORecovery, FastRecovery or SACKRecovery state. +func (s *sender) inRecovery() bool { + if s.state == tcpip.RTORecovery || s.state == tcpip.FastRecovery || s.state == tcpip.SACKRecovery { + return true + } + return false +} + // handleRcvdSegment is called when a segment is received; it is responsible for // updating the send-related state. func (s *sender) handleRcvdSegment(rcvdSeg *segment) { @@ -1254,6 +1369,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { } // Insert SACKBlock information into our scoreboard. + hasDSACK := false if s.ep.SACKPermitted { for _, sb := range rcvdSeg.parsedOptions.SACKBlocks { // Only insert the SACK block if the following holds @@ -1288,7 +1404,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { // RACK.fack, then the corresponding packet has been // reordered and RACK.reord is set to TRUE. if s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 { - s.walkSACK(rcvdSeg) + hasDSACK = s.walkSACK(rcvdSeg) } s.SetPipe() } @@ -1418,6 +1534,11 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { // Clear SACK information for all acked data. s.ep.scoreboard.Delete(s.SndUna) + // Detect if the sender entered recovery spuriously. + if s.inRecovery() { + s.detectSpuriousRecovery(hasDSACK, rcvdSeg.parsedOptions.TSEcr) + } + // If we are not in fast recovery then update the congestion // window based on the number of acknowledged packets. if !s.FastRecovery.Active { diff --git a/pkg/tcpip/transport/tcp/tcp_sack_test.go b/pkg/tcpip/transport/tcp/tcp_sack_test.go index 6255355bb..896249d2d 100644 --- a/pkg/tcpip/transport/tcp/tcp_sack_test.go +++ b/pkg/tcpip/transport/tcp/tcp_sack_test.go @@ -23,6 +23,7 @@ import ( "time" "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/checker" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/seqnum" "gvisor.dev/gvisor/pkg/tcpip/stack" @@ -702,3 +703,257 @@ func TestRecoveryEntry(t *testing.T) { t.Error(err) } } + +func verifySpuriousRecoveryMetric(t *testing.T, c *context.Context, numSpuriousRecovery uint64) { + t.Helper() + + metricPollFn := func() error { + tcpStats := c.Stack().Stats().TCP + stats := []struct { + stat *tcpip.StatCounter + name string + want uint64 + }{ + {tcpStats.SpuriousRecovery, "stats.TCP.SpuriousRecovery", numSpuriousRecovery}, + } + for _, s := range stats { + if got, want := s.stat.Value(), s.want; got != want { + return fmt.Errorf("got %s.Value() = %d, want = %d", s.name, got, want) + } + } + return nil + } + + if err := testutil.Poll(metricPollFn, 1*time.Second); err != nil { + t.Error(err) + } +} + +func checkReceivedPacket(t *testing.T, c *context.Context, tcpHdr header.TCP, bytesRead uint32, b, data []byte) { + payloadLen := uint32(len(tcpHdr.Payload())) + checker.IPv4(t, b, + checker.TCP( + checker.DstPort(context.TestPort), + checker.TCPSeqNum(uint32(c.IRS)+1+bytesRead), + checker.TCPAckNum(context.TestInitialSequenceNumber+1), + checker.TCPFlagsMatch(header.TCPFlagAck, ^header.TCPFlagPsh), + ), + ) + pdata := data[bytesRead : bytesRead+payloadLen] + if p := tcpHdr.Payload(); !bytes.Equal(pdata, p) { + t.Fatalf("got data = %v, want = %v", p, pdata) + } +} + +func buildTSOptionFromHeader(tcpHdr header.TCP) []byte { + parsedOpts := tcpHdr.ParsedOptions() + tsOpt := [12]byte{header.TCPOptionNOP, header.TCPOptionNOP} + header.EncodeTSOption(parsedOpts.TSEcr+1, parsedOpts.TSVal, tsOpt[2:]) + return tsOpt[:] +} + +func TestDetectSpuriousRecoveryWithRTO(t *testing.T) { + c := context.New(t, uint32(mtu)) + defer c.Cleanup() + + probeDone := make(chan struct{}) + c.Stack().AddTCPProbe(func(s stack.TCPEndpointState) { + if s.Sender.RetransmitTS == 0 { + t.Fatalf("RetransmitTS did not get updated, got: 0 want > 0") + } + if !s.Sender.SpuriousRecovery { + t.Fatalf("Spurious recovery was not detected") + } + close(probeDone) + }) + + setStackSACKPermitted(t, c, true) + createConnectedWithSACKAndTS(c) + numPackets := 5 + data := make([]byte, numPackets*maxPayload) + for i := range data { + data[i] = byte(i) + } + // Write the data. + var r bytes.Reader + r.Reset(data) + if _, err := c.EP.Write(&r, tcpip.WriteOptions{}); err != nil { + t.Fatalf("Write failed: %s", err) + } + + var options []byte + var bytesRead uint32 + for i := 0; i < numPackets; i++ { + b := c.GetPacket() + tcpHdr := header.TCP(header.IPv4(b).Payload()) + checkReceivedPacket(t, c, tcpHdr, bytesRead, b, data) + + // Get options only for the first packet. This will be sent with + // the ACK to indicate the acknowledgement is for the original + // packet. + if i == 0 && c.TimeStampEnabled { + options = buildTSOptionFromHeader(tcpHdr) + } + bytesRead += uint32(len(tcpHdr.Payload())) + } + + seq := seqnum.Value(context.TestInitialSequenceNumber).Add(1) + // Expect #5 segment with TLP. + c.ReceiveAndCheckPacketWithOptions(data, 4*maxPayload, maxPayload, tsOptionSize) + + // Expect #1 segment because of RTO. + c.ReceiveAndCheckPacketWithOptions(data, 0, maxPayload, tsOptionSize) + + info := tcpip.TCPInfoOption{} + if err := c.EP.GetSockOpt(&info); err != nil { + t.Fatalf("c.EP.GetSockOpt(&%T) = %s", info, err) + } + + if info.CcState != tcpip.RTORecovery { + t.Fatalf("Loss recovery did not happen, got: %v want: %v", info.CcState, tcpip.RTORecovery) + } + + // Acknowledge the data. + rcvWnd := seqnum.Size(30000) + c.SendPacket(nil, &context.Headers{ + SrcPort: context.TestPort, + DstPort: c.Port, + Flags: header.TCPFlagAck, + SeqNum: seq, + AckNum: c.IRS.Add(1 + seqnum.Size(maxPayload)), + RcvWnd: rcvWnd, + TCPOpts: options, + }) + + // Wait for the probe function to finish processing the + // ACK before the test completes. + <-probeDone + + verifySpuriousRecoveryMetric(t, c, 1 /* numSpuriousRecovery */) +} + +func TestSACKDetectSpuriousRecoveryWithDupACK(t *testing.T) { + c := context.New(t, uint32(mtu)) + defer c.Cleanup() + + numAck := 0 + probeDone := make(chan struct{}) + c.Stack().AddTCPProbe(func(s stack.TCPEndpointState) { + if numAck < 3 { + numAck++ + return + } + + if s.Sender.RetransmitTS == 0 { + t.Fatalf("RetransmitTS did not get updated, got: 0 want > 0") + } + if !s.Sender.SpuriousRecovery { + t.Fatalf("Spurious recovery was not detected") + } + close(probeDone) + }) + + setStackSACKPermitted(t, c, true) + createConnectedWithSACKAndTS(c) + numPackets := 5 + data := make([]byte, numPackets*maxPayload) + for i := range data { + data[i] = byte(i) + } + // Write the data. + var r bytes.Reader + r.Reset(data) + if _, err := c.EP.Write(&r, tcpip.WriteOptions{}); err != nil { + t.Fatalf("Write failed: %s", err) + } + + var options []byte + var bytesRead uint32 + for i := 0; i < numPackets; i++ { + b := c.GetPacket() + tcpHdr := header.TCP(header.IPv4(b).Payload()) + checkReceivedPacket(t, c, tcpHdr, bytesRead, b, data) + + // Get options only for the first packet. This will be sent with + // the ACK to indicate the acknowledgement is for the original + // packet. + if i == 0 && c.TimeStampEnabled { + options = buildTSOptionFromHeader(tcpHdr) + } + bytesRead += uint32(len(tcpHdr.Payload())) + } + + // Receive the retransmitted packet after TLP. + c.ReceiveAndCheckPacketWithOptions(data, 4*maxPayload, maxPayload, tsOptionSize) + + seq := seqnum.Value(context.TestInitialSequenceNumber).Add(1) + // Send ACK for #3 and #4 segments to avoid entering TLP. + start := c.IRS.Add(3*maxPayload + 1) + end := start.Add(2 * maxPayload) + c.SendAckWithSACK(seq, 0, []header.SACKBlock{{start, end}}) + + c.SendAck(seq, 0 /* bytesReceived */) + c.SendAck(seq, 0 /* bytesReceived */) + + // Receive the retransmitted packet after three duplicate ACKs. + c.ReceiveAndCheckPacketWithOptions(data, 0, maxPayload, tsOptionSize) + + info := tcpip.TCPInfoOption{} + if err := c.EP.GetSockOpt(&info); err != nil { + t.Fatalf("c.EP.GetSockOpt(&%T) = %s", info, err) + } + + if info.CcState != tcpip.SACKRecovery { + t.Fatalf("Loss recovery did not happen, got: %v want: %v", info.CcState, tcpip.SACKRecovery) + } + + // Acknowledge the data. + rcvWnd := seqnum.Size(30000) + c.SendPacket(nil, &context.Headers{ + SrcPort: context.TestPort, + DstPort: c.Port, + Flags: header.TCPFlagAck, + SeqNum: seq, + AckNum: c.IRS.Add(1 + seqnum.Size(maxPayload)), + RcvWnd: rcvWnd, + TCPOpts: options, + }) + + // Wait for the probe function to finish processing the + // ACK before the test completes. + <-probeDone + + verifySpuriousRecoveryMetric(t, c, 1 /* numSpuriousRecovery */) +} + +func TestNoSpuriousRecoveryWithDSACK(t *testing.T) { + c := context.New(t, uint32(mtu)) + defer c.Cleanup() + setStackSACKPermitted(t, c, true) + createConnectedWithSACKAndTS(c) + numPackets := 5 + data := sendAndReceiveWithSACK(t, c, numPackets, true /* enableRACK */) + + // Receive the retransmitted packet after TLP. + c.ReceiveAndCheckPacketWithOptions(data, 4*maxPayload, maxPayload, tsOptionSize) + + // Send ACK for #3 and #4 segments to avoid entering TLP. + start := c.IRS.Add(3*maxPayload + 1) + end := start.Add(2 * maxPayload) + seq := seqnum.Value(context.TestInitialSequenceNumber).Add(1) + c.SendAckWithSACK(seq, 0, []header.SACKBlock{{start, end}}) + + c.SendAck(seq, 0 /* bytesReceived */) + c.SendAck(seq, 0 /* bytesReceived */) + + // Receive the retransmitted packet after three duplicate ACKs. + c.ReceiveAndCheckPacketWithOptions(data, 0, maxPayload, tsOptionSize) + + // Acknowledge the data with DSACK for #1 segment. + start = c.IRS.Add(maxPayload + 1) + end = start.Add(2 * maxPayload) + seq = seqnum.Value(context.TestInitialSequenceNumber).Add(1) + c.SendAckWithSACK(seq, 6*maxPayload, []header.SACKBlock{{start, end}}) + + verifySpuriousRecoveryMetric(t, c, 0 /* numSpuriousRecovery */) +} |