diff options
author | Nayana Bidari <nybidari@google.com> | 2021-02-08 12:08:11 -0800 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2021-02-08 12:09:54 -0800 |
commit | fe63db2e96069140a3e02a0dc7b7e28af9b463db (patch) | |
tree | c6ac9e4e6f0f8df01eb1f623cff853d013fd886f /pkg/tcpip/transport/tcp | |
parent | 3853a94f10b736e0c7b65766683ebe90b400d15b (diff) |
RACK: Detect loss
Detect packet loss using reorder window and re-transmit them after the reorder
timer expires.
PiperOrigin-RevId: 356321786
Diffstat (limited to 'pkg/tcpip/transport/tcp')
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 9 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rack.go | 115 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rack_state.go | 5 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/segment.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 52 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd_state.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_rack_test.go | 47 |
7 files changed, 207 insertions, 26 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 34a631b53..461b1a9d7 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1301,7 +1301,8 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ // e.mu is expected to be hold upon entering this section. if e.snd != nil { e.snd.resendTimer.cleanup() - e.snd.rc.probeTimer.cleanup() + e.snd.probeTimer.cleanup() + e.snd.reorderTimer.cleanup() } if closeTimer != nil { @@ -1396,7 +1397,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ }, }, { - w: &e.snd.rc.probeWaker, + w: &e.snd.probeWaker, f: e.snd.probeTimerExpired, }, { @@ -1475,6 +1476,10 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ return nil }, }, + { + w: &e.snd.reorderWaker, + f: e.snd.rc.reorderTimerExpired, + }, } // Initialize the sleeper based on the wakers in funcs. diff --git a/pkg/tcpip/transport/tcp/rack.go b/pkg/tcpip/transport/tcp/rack.go index e862f159e..9959b60b8 100644 --- a/pkg/tcpip/transport/tcp/rack.go +++ b/pkg/tcpip/transport/tcp/rack.go @@ -17,7 +17,6 @@ package tcp import ( "time" - "gvisor.dev/gvisor/pkg/sleep" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/seqnum" ) @@ -50,7 +49,8 @@ type rackControl struct { // dsackSeen indicates if the connection has seen a DSACK. dsackSeen bool - // endSequence is the ending TCP sequence number of rackControl.seg. + // endSequence is the ending TCP sequence number of the most recent + // acknowledged segment. endSequence seqnum.Value // exitedRecovery indicates if the connection is exiting loss recovery. @@ -90,13 +90,10 @@ type rackControl struct { // rttSeq is the SND.NXT when rtt is updated. rttSeq seqnum.Value - // xmitTime is the latest transmission timestamp of rackControl.seg. + // xmitTime is the latest transmission timestamp of the most recent + // acknowledged segment. xmitTime time.Time `state:".(unixTime)"` - // probeTimer and probeWaker are used to schedule PTO for RACK TLP algorithm. - probeTimer timer `state:"nosave"` - probeWaker sleep.Waker `state:"nosave"` - // tlpRxtOut indicates whether there is an unacknowledged // TLP retransmission. tlpRxtOut bool @@ -114,7 +111,6 @@ func (rc *rackControl) init(snd *sender, iss seqnum.Value) { rc.fack = iss rc.reoWndIncr = 1 rc.snd = snd - rc.probeTimer.init(&rc.probeWaker) } // update will update the RACK related fields when an ACK has been received. @@ -223,13 +219,13 @@ func (s *sender) schedulePTO() { s.resendTimer.disable() } - s.rc.probeTimer.enable(pto) + s.probeTimer.enable(pto) } // probeTimerExpired is the same as TLP_send_probe() as defined in // https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5.2. func (s *sender) probeTimerExpired() tcpip.Error { - if !s.rc.probeTimer.checkExpiration() { + if !s.probeTimer.checkExpiration() { return nil } @@ -386,3 +382,102 @@ func (rc *rackControl) updateRACKReorderWindow(ackSeg *segment) { func (rc *rackControl) exitRecovery() { rc.exitedRecovery = true } + +// detectLoss marks the segment as lost if the reordering window has elapsed +// and the ACK is not received. It will also arm the reorder timer. +// See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 Step 5. +func (rc *rackControl) detectLoss(rcvTime time.Time) int { + var timeout time.Duration + numLost := 0 + for seg := rc.snd.writeList.Front(); seg != nil && seg.xmitCount != 0; seg = seg.Next() { + if rc.snd.ep.scoreboard.IsSACKED(seg.sackBlock()) { + continue + } + + if seg.lost && seg.xmitCount == 1 { + numLost++ + continue + } + + endSeq := seg.sequenceNumber.Add(seqnum.Size(seg.data.Size())) + if seg.xmitTime.Before(rc.xmitTime) || (seg.xmitTime.Equal(rc.xmitTime) && rc.endSequence.LessThan(endSeq)) { + timeRemaining := seg.xmitTime.Sub(rcvTime) + rc.rtt + rc.reoWnd + if timeRemaining <= 0 { + seg.lost = true + numLost++ + } else if timeRemaining > timeout { + timeout = timeRemaining + } + } + } + + if timeout != 0 && !rc.snd.reorderTimer.enabled() { + rc.snd.reorderTimer.enable(timeout) + } + return numLost +} + +// reorderTimerExpired will retransmit the segments which have not been acked +// before the reorder timer expired. +func (rc *rackControl) reorderTimerExpired() tcpip.Error { + // Check if the timer actually expired or if it's a spurious wake due + // to a previously orphaned runtime timer. + if !rc.snd.reorderTimer.checkExpiration() { + return nil + } + + numLost := rc.detectLoss(time.Now()) + if numLost == 0 { + return nil + } + + fastRetransmit := false + if !rc.snd.fr.active { + rc.snd.cc.HandleLossDetected() + rc.snd.enterRecovery() + fastRetransmit = true + } + + rc.DoRecovery(nil, fastRetransmit) + return nil +} + +// DoRecovery implements lossRecovery.DoRecovery. +func (rc *rackControl) DoRecovery(_ *segment, fastRetransmit bool) { + snd := rc.snd + if fastRetransmit { + snd.resendSegment() + } + + var dataSent bool + // Iterate the writeList and retransmit the segments which are marked + // as lost by RACK. + for seg := snd.writeList.Front(); seg != nil && seg.xmitCount > 0; seg = seg.Next() { + if seg == snd.writeNext { + break + } + + if !seg.lost { + continue + } + + // Reset seg.lost as it is already SACKed. + if snd.ep.scoreboard.IsSACKED(seg.sackBlock()) { + seg.lost = false + continue + } + + // Check the congestion window after entering recovery. + if snd.outstanding >= snd.sndCwnd { + break + } + + snd.outstanding++ + dataSent = true + snd.sendSegment(seg) + } + + // Rearm the RTO. + snd.resendTimer.enable(snd.rto) + snd.postXmit(dataSent) +} diff --git a/pkg/tcpip/transport/tcp/rack_state.go b/pkg/tcpip/transport/tcp/rack_state.go index 76cad0831..c9dc7e773 100644 --- a/pkg/tcpip/transport/tcp/rack_state.go +++ b/pkg/tcpip/transport/tcp/rack_state.go @@ -27,8 +27,3 @@ func (rc *rackControl) saveXmitTime() unixTime { func (rc *rackControl) loadXmitTime(unix unixTime) { rc.xmitTime = time.Unix(unix.second, unix.nano) } - -// afterLoad is invoked by stateify. -func (rc *rackControl) afterLoad() { - rc.probeTimer.init(&rc.probeWaker) -} diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go index 7cca4def5..f27eef6a9 100644 --- a/pkg/tcpip/transport/tcp/segment.go +++ b/pkg/tcpip/transport/tcp/segment.go @@ -83,6 +83,9 @@ type segment struct { // dataMemSize is the memory used by data initially. dataMemSize int + + // lost indicates if the segment is marked as lost by RACK. + lost bool } func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) *segment { diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 063a7086a..d6365b93d 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -191,6 +191,15 @@ type sender struct { // rc has the fields needed for implementing RACK loss detection // algorithm. rc rackControl + + // reorderTimer is the timer used to retransmit the segments after RACK + // detects them as lost. + reorderTimer timer `state:"nosave"` + reorderWaker sleep.Waker `state:"nosave"` + + // probeTimer and probeWaker are used to schedule PTO for RACK TLP algorithm. + probeTimer timer `state:"nosave"` + probeWaker sleep.Waker `state:"nosave"` } // rtt is a synchronization wrapper used to appease stateify. See the comment @@ -267,7 +276,6 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint } s.cc = s.initCongestionControl(ep.cc) - s.lr = s.initLossRecovery() s.rc.init(s, iss) @@ -278,6 +286,8 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint } s.resendTimer.init(&s.resendWaker) + s.reorderTimer.init(&s.reorderWaker) + s.probeTimer.init(&s.probeWaker) s.updateMaxPayloadSize(int(ep.route.MTU()), 0) @@ -1126,6 +1136,15 @@ func (s *sender) SetPipe() { func (s *sender) detectLoss(seg *segment) (fastRetransmit bool) { // We're not in fast recovery yet. + // If RACK is enabled and there is no reordering we should honor the + // three duplicate ACK rule to enter recovery. + // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-4 + if s.ep.sackPermitted && s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 { + if s.rc.reorderSeen { + return false + } + } + if !s.isDupAck(seg) { s.dupAckCount = 0 return false @@ -1462,6 +1481,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { if s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 { s.rc.exitRecovery() } + s.reorderTimer.disable() } } @@ -1481,21 +1501,36 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { // Reset firstRetransmittedSegXmitTime to the zero value. s.firstRetransmittedSegXmitTime = time.Time{} s.resendTimer.disable() - s.rc.probeTimer.disable() + s.probeTimer.disable() } } - // Update RACK reorder window. - // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 - // * Upon receiving an ACK: - // * Step 4: Update RACK reordering window - if s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 { + if s.ep.sackPermitted && s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 { + // Update RACK reorder window. + // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 + // * Upon receiving an ACK: + // * Step 4: Update RACK reordering window s.rc.updateRACKReorderWindow(rcvdSeg) + + // After the reorder window is calculated, detect any loss by checking + // if the time elapsed after the segments are sent is greater than the + // reorder window. + if numLost := s.rc.detectLoss(rcvdSeg.rcvdTime); numLost > 0 && !s.fr.active { + // If any segment is marked as lost by + // RACK, enter recovery and retransmit + // the lost segments. + s.cc.HandleLossDetected() + s.enterRecovery() + } + + if s.fr.active { + s.rc.DoRecovery(nil, true) + } } // Now that we've popped all acknowledged data from the retransmit // queue, retransmit if needed. - if s.fr.active { + if s.fr.active && s.ep.tcpRecovery&tcpip.TCPRACKLossDetection == 0 { s.lr.DoRecovery(rcvdSeg, fastRetransmit) // When SACK is enabled data sending is governed by steps in // RFC 6675 Section 5 recovery steps A-C. @@ -1523,6 +1558,7 @@ func (s *sender) sendSegment(seg *segment) tcpip.Error { } seg.xmitTime = time.Now() seg.xmitCount++ + seg.lost = false err := s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber) // Every time a packet containing data is sent (including a diff --git a/pkg/tcpip/transport/tcp/snd_state.go b/pkg/tcpip/transport/tcp/snd_state.go index 8b20c3455..ba41cff6d 100644 --- a/pkg/tcpip/transport/tcp/snd_state.go +++ b/pkg/tcpip/transport/tcp/snd_state.go @@ -47,6 +47,8 @@ func (s *sender) loadRttMeasureTime(unix unixTime) { // afterLoad is invoked by stateify. func (s *sender) afterLoad() { s.resendTimer.init(&s.resendWaker) + s.reorderTimer.init(&s.reorderWaker) + s.probeTimer.init(&s.probeWaker) } // saveFirstRetransmittedSegXmitTime is invoked by stateify. diff --git a/pkg/tcpip/transport/tcp/tcp_rack_test.go b/pkg/tcpip/transport/tcp/tcp_rack_test.go index 78db742a9..6da981d80 100644 --- a/pkg/tcpip/transport/tcp/tcp_rack_test.go +++ b/pkg/tcpip/transport/tcp/tcp_rack_test.go @@ -25,6 +25,7 @@ import ( "gvisor.dev/gvisor/pkg/tcpip/seqnum" "gvisor.dev/gvisor/pkg/tcpip/stack" "gvisor.dev/gvisor/pkg/tcpip/transport/tcp/testing/context" + "gvisor.dev/gvisor/pkg/test/testutil" ) const ( @@ -99,6 +100,8 @@ func TestRACKDetectReorder(t *testing.T) { c := context.New(t, uint32(mtu)) defer c.Cleanup() + t.Skipf("Skipping this test as reorder detection does not consider DSACK.") + var n int const ackNumToVerify = 2 probeDone := make(chan struct{}) @@ -591,7 +594,6 @@ func TestRACKCheckReorderWindow(t *testing.T) { c.SendAck(seq, bytesRead) // Missing [2-6] packets and SACK #7 packet. - seq = seqnum.Value(context.TestInitialSequenceNumber).Add(1) start := c.IRS.Add(1 + seqnum.Size(6*maxPayload)) end := start.Add(seqnum.Size(maxPayload)) c.SendAckWithSACK(seq, bytesRead, []header.SACKBlock{{start, end}}) @@ -607,3 +609,46 @@ func TestRACKCheckReorderWindow(t *testing.T) { t.Fatalf("unexpected values for RACK variables: %v", err) } } + +func TestRACKWithDuplicateACK(t *testing.T) { + c := context.New(t, uint32(mtu)) + defer c.Cleanup() + + const numPackets = 4 + data := sendAndReceive(t, c, numPackets) + + // Send three duplicate ACKs to trigger fast recovery. + seq := seqnum.Value(context.TestInitialSequenceNumber).Add(1) + start := c.IRS.Add(1 + seqnum.Size(maxPayload)) + end := start.Add(seqnum.Size(maxPayload)) + for i := 0; i < 3; i++ { + c.SendAckWithSACK(seq, maxPayload, []header.SACKBlock{{start, end}}) + end = end.Add(seqnum.Size(maxPayload)) + } + + // Receive the retransmitted packet. + c.ReceiveAndCheckPacketWithOptions(data, maxPayload, maxPayload, tsOptionSize) + + metricPollFn := func() error { + tcpStats := c.Stack().Stats().TCP + stats := []struct { + stat *tcpip.StatCounter + name string + want uint64 + }{ + {tcpStats.FastRetransmit, "stats.TCP.FastRetransmit", 1}, + {tcpStats.SACKRecovery, "stats.TCP.SACKRecovery", 1}, + {tcpStats.FastRecovery, "stats.TCP.FastRecovery", 0}, + } + for _, s := range stats { + if got, want := s.stat.Value(), s.want; got != want { + return fmt.Errorf("got %s.Value() = %d, want = %d", s.name, got, want) + } + } + return nil + } + + if err := testutil.Poll(metricPollFn, 1*time.Second); err != nil { + t.Error(err) + } +} |