From 08f4846ebe79236ded6f4bc816b0a776afe14463 Mon Sep 17 00:00:00 2001 From: Bhasker Hariharan Date: Thu, 7 May 2020 10:24:47 -0700 Subject: Fix bugs in SACK recovery. Every call to sender.NextSeg does not need to iterate from the front of the writeList as in a given recovery episode we can cache the last nextSeg returned. There cannot be a lower sequenced segment that matches the next call to NextSeg as otherwise we would have returned that instead in the previous call. This fixes the issue of excessive CPU usage w/ large send buffers where we spend a lot of time iterating from the front of the list on every NextSeg invocation. Further the following other bugs were also fixed: * Iteration of segments never sent in NextSeg() when looking for segments for retransmission that match step1/3/4 of the NextSeg algorithm * Correctly setting rescueRxt only if the rescue segment was actually sent. * Correctly initializing rescueRxt/highRxt when entering SACK recovery. * Correctly re-arming the timer only on retransmissions when SACK is in use and not for every segment being sent as it was being done before. * Copy over xmitTime and xmitCount on segment clone. * Move writeNext along when skipping over SACKED segments. This is required to prevent spurious retransmissions where we end up retransmitting data that was never lost. PiperOrigin-RevId: 310387671 --- pkg/tcpip/transport/tcp/segment.go | 2 + pkg/tcpip/transport/tcp/snd.go | 208 ++++++++++++++++++++++--------------- 2 files changed, 127 insertions(+), 83 deletions(-) diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go index 7712ce652..074edded6 100644 --- a/pkg/tcpip/transport/tcp/segment.go +++ b/pkg/tcpip/transport/tcp/segment.go @@ -96,6 +96,8 @@ func (s *segment) clone() *segment { route: s.route.Clone(), viewToDeliver: s.viewToDeliver, rcvdTime: s.rcvdTime, + xmitTime: s.xmitTime, + xmitCount: s.xmitCount, } t.data = s.data.Clone(t.views[:]) return t diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index a3018914b..9e547a221 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -598,22 +598,34 @@ func (s *sender) splitSeg(seg *segment, size int) { seg.data.CapLength(size) } -// NextSeg implements the RFC6675 NextSeg() operation. It returns segments that -// match rule 1, 3 and 4 of the NextSeg() operation defined in RFC6675. Rule 2 -// is handled by the normal send logic. -func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) { +// NextSeg implements the RFC6675 NextSeg() operation. +// +// NextSeg starts scanning the writeList starting from nextSegHint and returns +// the hint to be passed on the next call to NextSeg. This is required to avoid +// iterating the write list repeatedly when NextSeg is invoked in a loop during +// recovery. The returned hint will be nil if there are no more segments that +// can match rules defined by NextSeg operation in RFC6675. +// +// rescueRtx will be true only if nextSeg is a rescue retransmission as +// described by Step 4) of the NextSeg algorithm. +func (s *sender) NextSeg(nextSegHint *segment) (nextSeg, hint *segment, rescueRtx bool) { var s3 *segment var s4 *segment - smss := s.ep.scoreboard.SMSS() // Step 1. - for seg := s.writeList.Front(); seg != nil; seg = seg.Next() { - if !s.isAssignedSequenceNumber(seg) { + for seg := nextSegHint; seg != nil; seg = seg.Next() { + // Stop iteration if we hit a segment that has never been + // transmitted (i.e. either it has no assigned sequence number + // or if it does have one, it's >= the next sequence number + // to be sent [i.e. >= s.sndNxt]). + if !s.isAssignedSequenceNumber(seg) || s.sndNxt.LessThanEq(seg.sequenceNumber) { + hint = nil break } segSeq := seg.sequenceNumber - if seg.data.Size() > int(smss) { + if smss := s.ep.scoreboard.SMSS(); seg.data.Size() > int(smss) { s.splitSeg(seg, int(smss)) } + // See RFC 6675 Section 4 // // 1. If there exists a smallest unSACKED sequence number @@ -630,8 +642,9 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) { // NextSeg(): // (1.c) IsLost(S2) returns true. if s.ep.scoreboard.IsLost(segSeq) { - return seg, s3, s4 + return seg, seg.Next(), false } + // NextSeg(): // // (3): If the conditions for rules (1) and (2) @@ -643,6 +656,7 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) { // SHOULD be returned. if s3 == nil { s3 = seg + hint = seg.Next() } } // NextSeg(): @@ -651,10 +665,12 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) { // but there exists outstanding unSACKED data, we // provide the opportunity for a single "rescue" // retransmission per entry into loss recovery. If - // HighACK is greater than RescueRxt, the one - // segment of upto SMSS octects that MUST include - // the highest outstanding unSACKed sequence number - // SHOULD be returned. + // HighACK is greater than RescueRxt (or RescueRxt + // is undefined), then one segment of upto SMSS + // octects that MUST include the highest outstanding + // unSACKed sequence number SHOULD be returned, and + // RescueRxt set to RecoveryPoint. HighRxt MUST NOT + // be updated. if s.fr.rescueRxt.LessThan(s.sndUna - 1) { if s4 != nil { if s4.sequenceNumber.LessThan(segSeq) { @@ -663,12 +679,31 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) { } else { s4 = seg } - s.fr.rescueRxt = s.fr.last } } } - return nil, s3, s4 + // If we got here then no segment matched step (1). + // Step (2): "If no sequence number 'S2' per rule (1) + // exists but there exists available unsent data and the + // receiver's advertised window allows, the sequence + // range of one segment of up to SMSS octets of + // previously unsent data starting with sequence number + // HighData+1 MUST be returned." + for seg := s.writeNext; seg != nil; seg = seg.Next() { + if s.isAssignedSequenceNumber(seg) && seg.sequenceNumber.LessThan(s.sndNxt) { + continue + } + // We do not split the segment here to <= smss as it has + // potentially not been assigned a sequence number yet. + return seg, nil, false + } + + if s3 != nil { + return s3, hint, false + } + + return s4, nil, true } // maybeSendSegment tries to send the specified segment and either coalesces @@ -792,64 +827,47 @@ func (s *sender) maybeSendSegment(seg *segment, limit int, end seqnum.Value) (se // section 5, step C. func (s *sender) handleSACKRecovery(limit int, end seqnum.Value) (dataSent bool) { s.SetPipe() + + if smss := int(s.ep.scoreboard.SMSS()); limit > smss { + // Cap segment size limit to s.smss as SACK recovery requires + // that all retransmissions or new segments send during recovery + // be of <= SMSS. + limit = smss + } + + nextSegHint := s.writeList.Front() for s.outstanding < s.sndCwnd { - nextSeg, s3, s4 := s.NextSeg() - if nextSeg == nil { - // NextSeg(): - // - // Step (2): "If no sequence number 'S2' per rule (1) - // exists but there exists available unsent data and the - // receiver's advertised window allows, the sequence - // range of one segment of up to SMSS octets of - // previously unsent data starting with sequence number - // HighData+1 MUST be returned." - for seg := s.writeNext; seg != nil; seg = seg.Next() { - if s.isAssignedSequenceNumber(seg) && seg.sequenceNumber.LessThan(s.sndNxt) { - continue - } - // Step C.3 described below is handled by - // maybeSendSegment which increments sndNxt when - // a segment is transmitted. - // - // Step C.3 "If any of the data octets sent in - // (C.1) are above HighData, HighData must be - // updated to reflect the transmission of - // previously unsent data." - if sent := s.maybeSendSegment(seg, limit, end); !sent { - break - } - dataSent = true - s.outstanding++ - s.writeNext = seg.Next() - nextSeg = seg - break - } - if nextSeg != nil { - continue - } - } - rescueRtx := false - if nextSeg == nil && s3 != nil { - nextSeg = s3 - } - if nextSeg == nil && s4 != nil { - nextSeg = s4 - rescueRtx = true - } + var nextSeg *segment + var rescueRtx bool + nextSeg, nextSegHint, rescueRtx = s.NextSeg(nextSegHint) if nextSeg == nil { - break + return dataSent } - segEnd := nextSeg.sequenceNumber.Add(nextSeg.logicalLen()) - if !rescueRtx && nextSeg.sequenceNumber.LessThan(s.sndNxt) { - // RFC 6675, Step C.2 + if !s.isAssignedSequenceNumber(nextSeg) || s.sndNxt.LessThanEq(nextSeg.sequenceNumber) { + // New data being sent. + + // Step C.3 described below is handled by + // maybeSendSegment which increments sndNxt when + // a segment is transmitted. // - // "If any of the data octets sent in (C.1) are below - // HighData, HighRxt MUST be set to the highest sequence - // number of the retransmitted segment unless NextSeg () - // rule (4) was invoked for this retransmission." - s.fr.highRxt = segEnd - 1 + // Step C.3 "If any of the data octets sent in + // (C.1) are above HighData, HighData must be + // updated to reflect the transmission of + // previously unsent data." + // + // We pass s.smss as the limit as the Step 2) requires that + // new data sent should be of size s.smss or less. + if sent := s.maybeSendSegment(nextSeg, limit, end); !sent { + return dataSent + } + dataSent = true + s.outstanding++ + s.writeNext = nextSeg.Next() + continue } + // Now handle the retransmission case where we matched either step 1,3 or 4 + // of the NextSeg algorithm. // RFC 6675, Step C.4. // // "The estimate of the amount of data outstanding in the network @@ -858,6 +876,22 @@ func (s *sender) handleSACKRecovery(limit int, end seqnum.Value) (dataSent bool) s.outstanding++ dataSent = true s.sendSegment(nextSeg) + + segEnd := nextSeg.sequenceNumber.Add(nextSeg.logicalLen()) + if rescueRtx { + // We do the last part of rule (4) of NextSeg here to update + // RescueRxt as until this point we don't know if we are going + // to use the rescue transmission. + s.fr.rescueRxt = s.fr.last + } else { + // RFC 6675, Step C.2 + // + // "If any of the data octets sent in (C.1) are below + // HighData, HighRxt MUST be set to the highest sequence + // number of the retransmitted segment unless NextSeg () + // rule (4) was invoked for this retransmission." + s.fr.highRxt = segEnd - 1 + } } return dataSent } @@ -903,7 +937,7 @@ func (s *sender) sendData() { // "A TCP SHOULD set cwnd to no more than RW before beginning // transmission if the TCP has not sent data in the interval exceeding // the retrasmission timeout." - if !s.fr.active && time.Now().Sub(s.lastSendTime) > s.rto { + if !s.fr.active && s.state != RTORecovery && time.Now().Sub(s.lastSendTime) > s.rto { if s.sndCwnd > InitialCwnd { s.sndCwnd = InitialCwnd } @@ -921,6 +955,9 @@ func (s *sender) sendData() { limit = cwndLimit } if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) { + // Move writeNext along so that we don't try and scan data that + // has already been SACKED. + s.writeNext = seg.Next() continue } if sent := s.maybeSendSegment(seg, limit, end); !sent { @@ -966,6 +1003,8 @@ func (s *sender) enterFastRecovery() { s.fr.first = s.sndUna s.fr.last = s.sndNxt - 1 s.fr.maxCwnd = s.sndCwnd + s.outstanding + s.fr.highRxt = s.sndUna + s.fr.rescueRxt = s.sndUna if s.ep.sackPermitted { s.state = SACKRecovery s.ep.stack.Stats().TCP.SACKRecovery.Increment() @@ -1258,6 +1297,7 @@ func (s *sender) handleRcvdSegment(seg *segment) { if s.writeNext == seg { s.writeNext = seg.Next() } + s.writeList.Remove(seg) // if SACK is enabled then Only reduce outstanding if @@ -1329,7 +1369,23 @@ func (s *sender) sendSegment(seg *segment) *tcpip.Error { } seg.xmitTime = time.Now() seg.xmitCount++ - return s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber) + err := s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber) + + // Every time a packet containing data is sent (including a + // retransmission), if SACK is enabled and we are retransmitting data + // then use the conservative timer described in RFC6675 Section 6.0, + // otherwise follow the standard time described in RFC6298 Section 5.1. + if err != nil && seg.data.Size() != 0 { + if s.fr.active && seg.xmitCount > 1 && s.ep.sackPermitted { + s.resendTimer.enable(s.rto) + } else { + if !s.resendTimer.enabled() { + s.resendTimer.enable(s.rto) + } + } + } + + return err } // sendSegmentFromView sends a new segment containing the given payload, flags @@ -1345,19 +1401,5 @@ func (s *sender) sendSegmentFromView(data buffer.VectorisedView, flags byte, seq // Remember the max sent ack. s.maxSentAck = rcvNxt - // Every time a packet containing data is sent (including a - // retransmission), if SACK is enabled then use the conservative timer - // described in RFC6675 Section 4.0, otherwise follow the standard time - // described in RFC6298 Section 5.2. - if data.Size() != 0 { - if s.ep.sackPermitted { - s.resendTimer.enable(s.rto) - } else { - if !s.resendTimer.enabled() { - s.resendTimer.enable(s.rto) - } - } - } - return s.ep.sendRaw(data, flags, seq, rcvNxt, rcvWnd) } -- cgit v1.2.3