summaryrefslogtreecommitdiffhomepage
path: root/pkg
diff options
context:
space:
mode:
authorBhasker Hariharan <bhaskerh@google.com>2020-05-07 10:24:47 -0700
committergVisor bot <gvisor-bot@google.com>2020-05-07 10:26:00 -0700
commit08f4846ebe79236ded6f4bc816b0a776afe14463 (patch)
treeb972f70c342165d479fbedc0f8ae424353f41463 /pkg
parent16da7e790f570d9743fb47e50c304e6a24834772 (diff)
Fix bugs in SACK recovery.
Every call to sender.NextSeg does not need to iterate from the front of the writeList as in a given recovery episode we can cache the last nextSeg returned. There cannot be a lower sequenced segment that matches the next call to NextSeg as otherwise we would have returned that instead in the previous call. This fixes the issue of excessive CPU usage w/ large send buffers where we spend a lot of time iterating from the front of the list on every NextSeg invocation. Further the following other bugs were also fixed: * Iteration of segments never sent in NextSeg() when looking for segments for retransmission that match step1/3/4 of the NextSeg algorithm * Correctly setting rescueRxt only if the rescue segment was actually sent. * Correctly initializing rescueRxt/highRxt when entering SACK recovery. * Correctly re-arming the timer only on retransmissions when SACK is in use and not for every segment being sent as it was being done before. * Copy over xmitTime and xmitCount on segment clone. * Move writeNext along when skipping over SACKED segments. This is required to prevent spurious retransmissions where we end up retransmitting data that was never lost. PiperOrigin-RevId: 310387671
Diffstat (limited to 'pkg')
-rw-r--r--pkg/tcpip/transport/tcp/segment.go2
-rw-r--r--pkg/tcpip/transport/tcp/snd.go208
2 files changed, 127 insertions, 83 deletions
diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go
index 7712ce652..074edded6 100644
--- a/pkg/tcpip/transport/tcp/segment.go
+++ b/pkg/tcpip/transport/tcp/segment.go
@@ -96,6 +96,8 @@ func (s *segment) clone() *segment {
route: s.route.Clone(),
viewToDeliver: s.viewToDeliver,
rcvdTime: s.rcvdTime,
+ xmitTime: s.xmitTime,
+ xmitCount: s.xmitCount,
}
t.data = s.data.Clone(t.views[:])
return t
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index a3018914b..9e547a221 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -598,22 +598,34 @@ func (s *sender) splitSeg(seg *segment, size int) {
seg.data.CapLength(size)
}
-// NextSeg implements the RFC6675 NextSeg() operation. It returns segments that
-// match rule 1, 3 and 4 of the NextSeg() operation defined in RFC6675. Rule 2
-// is handled by the normal send logic.
-func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) {
+// NextSeg implements the RFC6675 NextSeg() operation.
+//
+// NextSeg starts scanning the writeList starting from nextSegHint and returns
+// the hint to be passed on the next call to NextSeg. This is required to avoid
+// iterating the write list repeatedly when NextSeg is invoked in a loop during
+// recovery. The returned hint will be nil if there are no more segments that
+// can match rules defined by NextSeg operation in RFC6675.
+//
+// rescueRtx will be true only if nextSeg is a rescue retransmission as
+// described by Step 4) of the NextSeg algorithm.
+func (s *sender) NextSeg(nextSegHint *segment) (nextSeg, hint *segment, rescueRtx bool) {
var s3 *segment
var s4 *segment
- smss := s.ep.scoreboard.SMSS()
// Step 1.
- for seg := s.writeList.Front(); seg != nil; seg = seg.Next() {
- if !s.isAssignedSequenceNumber(seg) {
+ for seg := nextSegHint; seg != nil; seg = seg.Next() {
+ // Stop iteration if we hit a segment that has never been
+ // transmitted (i.e. either it has no assigned sequence number
+ // or if it does have one, it's >= the next sequence number
+ // to be sent [i.e. >= s.sndNxt]).
+ if !s.isAssignedSequenceNumber(seg) || s.sndNxt.LessThanEq(seg.sequenceNumber) {
+ hint = nil
break
}
segSeq := seg.sequenceNumber
- if seg.data.Size() > int(smss) {
+ if smss := s.ep.scoreboard.SMSS(); seg.data.Size() > int(smss) {
s.splitSeg(seg, int(smss))
}
+
// See RFC 6675 Section 4
//
// 1. If there exists a smallest unSACKED sequence number
@@ -630,8 +642,9 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) {
// NextSeg():
// (1.c) IsLost(S2) returns true.
if s.ep.scoreboard.IsLost(segSeq) {
- return seg, s3, s4
+ return seg, seg.Next(), false
}
+
// NextSeg():
//
// (3): If the conditions for rules (1) and (2)
@@ -643,6 +656,7 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) {
// SHOULD be returned.
if s3 == nil {
s3 = seg
+ hint = seg.Next()
}
}
// NextSeg():
@@ -651,10 +665,12 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) {
// but there exists outstanding unSACKED data, we
// provide the opportunity for a single "rescue"
// retransmission per entry into loss recovery. If
- // HighACK is greater than RescueRxt, the one
- // segment of upto SMSS octects that MUST include
- // the highest outstanding unSACKed sequence number
- // SHOULD be returned.
+ // HighACK is greater than RescueRxt (or RescueRxt
+ // is undefined), then one segment of upto SMSS
+ // octects that MUST include the highest outstanding
+ // unSACKed sequence number SHOULD be returned, and
+ // RescueRxt set to RecoveryPoint. HighRxt MUST NOT
+ // be updated.
if s.fr.rescueRxt.LessThan(s.sndUna - 1) {
if s4 != nil {
if s4.sequenceNumber.LessThan(segSeq) {
@@ -663,12 +679,31 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) {
} else {
s4 = seg
}
- s.fr.rescueRxt = s.fr.last
}
}
}
- return nil, s3, s4
+ // If we got here then no segment matched step (1).
+ // Step (2): "If no sequence number 'S2' per rule (1)
+ // exists but there exists available unsent data and the
+ // receiver's advertised window allows, the sequence
+ // range of one segment of up to SMSS octets of
+ // previously unsent data starting with sequence number
+ // HighData+1 MUST be returned."
+ for seg := s.writeNext; seg != nil; seg = seg.Next() {
+ if s.isAssignedSequenceNumber(seg) && seg.sequenceNumber.LessThan(s.sndNxt) {
+ continue
+ }
+ // We do not split the segment here to <= smss as it has
+ // potentially not been assigned a sequence number yet.
+ return seg, nil, false
+ }
+
+ if s3 != nil {
+ return s3, hint, false
+ }
+
+ return s4, nil, true
}
// maybeSendSegment tries to send the specified segment and either coalesces
@@ -792,64 +827,47 @@ func (s *sender) maybeSendSegment(seg *segment, limit int, end seqnum.Value) (se
// section 5, step C.
func (s *sender) handleSACKRecovery(limit int, end seqnum.Value) (dataSent bool) {
s.SetPipe()
+
+ if smss := int(s.ep.scoreboard.SMSS()); limit > smss {
+ // Cap segment size limit to s.smss as SACK recovery requires
+ // that all retransmissions or new segments send during recovery
+ // be of <= SMSS.
+ limit = smss
+ }
+
+ nextSegHint := s.writeList.Front()
for s.outstanding < s.sndCwnd {
- nextSeg, s3, s4 := s.NextSeg()
- if nextSeg == nil {
- // NextSeg():
- //
- // Step (2): "If no sequence number 'S2' per rule (1)
- // exists but there exists available unsent data and the
- // receiver's advertised window allows, the sequence
- // range of one segment of up to SMSS octets of
- // previously unsent data starting with sequence number
- // HighData+1 MUST be returned."
- for seg := s.writeNext; seg != nil; seg = seg.Next() {
- if s.isAssignedSequenceNumber(seg) && seg.sequenceNumber.LessThan(s.sndNxt) {
- continue
- }
- // Step C.3 described below is handled by
- // maybeSendSegment which increments sndNxt when
- // a segment is transmitted.
- //
- // Step C.3 "If any of the data octets sent in
- // (C.1) are above HighData, HighData must be
- // updated to reflect the transmission of
- // previously unsent data."
- if sent := s.maybeSendSegment(seg, limit, end); !sent {
- break
- }
- dataSent = true
- s.outstanding++
- s.writeNext = seg.Next()
- nextSeg = seg
- break
- }
- if nextSeg != nil {
- continue
- }
- }
- rescueRtx := false
- if nextSeg == nil && s3 != nil {
- nextSeg = s3
- }
- if nextSeg == nil && s4 != nil {
- nextSeg = s4
- rescueRtx = true
- }
+ var nextSeg *segment
+ var rescueRtx bool
+ nextSeg, nextSegHint, rescueRtx = s.NextSeg(nextSegHint)
if nextSeg == nil {
- break
+ return dataSent
}
- segEnd := nextSeg.sequenceNumber.Add(nextSeg.logicalLen())
- if !rescueRtx && nextSeg.sequenceNumber.LessThan(s.sndNxt) {
- // RFC 6675, Step C.2
+ if !s.isAssignedSequenceNumber(nextSeg) || s.sndNxt.LessThanEq(nextSeg.sequenceNumber) {
+ // New data being sent.
+
+ // Step C.3 described below is handled by
+ // maybeSendSegment which increments sndNxt when
+ // a segment is transmitted.
//
- // "If any of the data octets sent in (C.1) are below
- // HighData, HighRxt MUST be set to the highest sequence
- // number of the retransmitted segment unless NextSeg ()
- // rule (4) was invoked for this retransmission."
- s.fr.highRxt = segEnd - 1
+ // Step C.3 "If any of the data octets sent in
+ // (C.1) are above HighData, HighData must be
+ // updated to reflect the transmission of
+ // previously unsent data."
+ //
+ // We pass s.smss as the limit as the Step 2) requires that
+ // new data sent should be of size s.smss or less.
+ if sent := s.maybeSendSegment(nextSeg, limit, end); !sent {
+ return dataSent
+ }
+ dataSent = true
+ s.outstanding++
+ s.writeNext = nextSeg.Next()
+ continue
}
+ // Now handle the retransmission case where we matched either step 1,3 or 4
+ // of the NextSeg algorithm.
// RFC 6675, Step C.4.
//
// "The estimate of the amount of data outstanding in the network
@@ -858,6 +876,22 @@ func (s *sender) handleSACKRecovery(limit int, end seqnum.Value) (dataSent bool)
s.outstanding++
dataSent = true
s.sendSegment(nextSeg)
+
+ segEnd := nextSeg.sequenceNumber.Add(nextSeg.logicalLen())
+ if rescueRtx {
+ // We do the last part of rule (4) of NextSeg here to update
+ // RescueRxt as until this point we don't know if we are going
+ // to use the rescue transmission.
+ s.fr.rescueRxt = s.fr.last
+ } else {
+ // RFC 6675, Step C.2
+ //
+ // "If any of the data octets sent in (C.1) are below
+ // HighData, HighRxt MUST be set to the highest sequence
+ // number of the retransmitted segment unless NextSeg ()
+ // rule (4) was invoked for this retransmission."
+ s.fr.highRxt = segEnd - 1
+ }
}
return dataSent
}
@@ -903,7 +937,7 @@ func (s *sender) sendData() {
// "A TCP SHOULD set cwnd to no more than RW before beginning
// transmission if the TCP has not sent data in the interval exceeding
// the retrasmission timeout."
- if !s.fr.active && time.Now().Sub(s.lastSendTime) > s.rto {
+ if !s.fr.active && s.state != RTORecovery && time.Now().Sub(s.lastSendTime) > s.rto {
if s.sndCwnd > InitialCwnd {
s.sndCwnd = InitialCwnd
}
@@ -921,6 +955,9 @@ func (s *sender) sendData() {
limit = cwndLimit
}
if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) {
+ // Move writeNext along so that we don't try and scan data that
+ // has already been SACKED.
+ s.writeNext = seg.Next()
continue
}
if sent := s.maybeSendSegment(seg, limit, end); !sent {
@@ -966,6 +1003,8 @@ func (s *sender) enterFastRecovery() {
s.fr.first = s.sndUna
s.fr.last = s.sndNxt - 1
s.fr.maxCwnd = s.sndCwnd + s.outstanding
+ s.fr.highRxt = s.sndUna
+ s.fr.rescueRxt = s.sndUna
if s.ep.sackPermitted {
s.state = SACKRecovery
s.ep.stack.Stats().TCP.SACKRecovery.Increment()
@@ -1258,6 +1297,7 @@ func (s *sender) handleRcvdSegment(seg *segment) {
if s.writeNext == seg {
s.writeNext = seg.Next()
}
+
s.writeList.Remove(seg)
// if SACK is enabled then Only reduce outstanding if
@@ -1329,7 +1369,23 @@ func (s *sender) sendSegment(seg *segment) *tcpip.Error {
}
seg.xmitTime = time.Now()
seg.xmitCount++
- return s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber)
+ err := s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber)
+
+ // Every time a packet containing data is sent (including a
+ // retransmission), if SACK is enabled and we are retransmitting data
+ // then use the conservative timer described in RFC6675 Section 6.0,
+ // otherwise follow the standard time described in RFC6298 Section 5.1.
+ if err != nil && seg.data.Size() != 0 {
+ if s.fr.active && seg.xmitCount > 1 && s.ep.sackPermitted {
+ s.resendTimer.enable(s.rto)
+ } else {
+ if !s.resendTimer.enabled() {
+ s.resendTimer.enable(s.rto)
+ }
+ }
+ }
+
+ return err
}
// sendSegmentFromView sends a new segment containing the given payload, flags
@@ -1345,19 +1401,5 @@ func (s *sender) sendSegmentFromView(data buffer.VectorisedView, flags byte, seq
// Remember the max sent ack.
s.maxSentAck = rcvNxt
- // Every time a packet containing data is sent (including a
- // retransmission), if SACK is enabled then use the conservative timer
- // described in RFC6675 Section 4.0, otherwise follow the standard time
- // described in RFC6298 Section 5.2.
- if data.Size() != 0 {
- if s.ep.sackPermitted {
- s.resendTimer.enable(s.rto)
- } else {
- if !s.resendTimer.enabled() {
- s.resendTimer.enable(s.rto)
- }
- }
- }
-
return s.ep.sendRaw(data, flags, seq, rcvNxt, rcvWnd)
}