summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp/snd.go
diff options
context:
space:
mode:
authorgVisor bot <gvisor-bot@google.com>2020-11-13 20:15:51 +0000
committergVisor bot <gvisor-bot@google.com>2020-11-13 20:15:51 +0000
commitb1e7f6f75f9ee63906081f9a1dc7d26b148836c1 (patch)
tree6374a5bd6b2989eef9da740075399b8a02e11db9 /pkg/tcpip/transport/tcp/snd.go
parent325b2c65da958be80bc87e27c2421e7f8651f44a (diff)
parent7fff51e50fb6af3e41669a3b229dc05ab4a8017d (diff)
Merge release-20201030.0-89-g7fff51e50 (automated)
Diffstat (limited to 'pkg/tcpip/transport/tcp/snd.go')
-rw-r--r--pkg/tcpip/transport/tcp/snd.go283
1 files changed, 96 insertions, 187 deletions
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index ab5fa4fb7..c42d7159a 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -92,6 +92,17 @@ type congestionControl interface {
PostRecovery()
}
+// lossRecovery is an interface that must be implemented by any supported
+// loss recovery algorithm.
+type lossRecovery interface {
+ // DoRecovery is invoked when loss is detected and segments need
+ // to be retransmitted. The cumulative or selective ACK is passed along
+ // with the flag which identifies whether the connection entered fast
+ // retransmit with this ACK and to retransmit the first unacknowledged
+ // segment.
+ DoRecovery(rcvdSeg *segment, fastRetransmit bool)
+}
+
// sender holds the state necessary to send TCP segments.
//
// +stateify savable
@@ -108,6 +119,9 @@ type sender struct {
// fr holds state related to fast recovery.
fr fastRecovery
+ // lr is the loss recovery algorithm used by the sender.
+ lr lossRecovery
+
// sndCwnd is the congestion window, in packets.
sndCwnd int
@@ -276,6 +290,8 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint
s.cc = s.initCongestionControl(ep.cc)
+ s.lr = s.initLossRecovery()
+
// A negative sndWndScale means that no scaling is in use, otherwise we
// store the scaling value.
if sndWndScale > 0 {
@@ -330,6 +346,14 @@ func (s *sender) initCongestionControl(congestionControlName tcpip.CongestionCon
}
}
+// initLossRecovery initiates the loss recovery algorithm for the sender.
+func (s *sender) initLossRecovery() lossRecovery {
+ if s.ep.sackPermitted {
+ return newSACKRecovery(s)
+ }
+ return newRenoRecovery(s)
+}
+
// updateMaxPayloadSize updates the maximum payload size based on the given
// MTU. If this is in response to "packet too big" control packets (indicated
// by the count argument), it also reduces the number of outstanding packets and
@@ -550,7 +574,7 @@ func (s *sender) retransmitTimerExpired() bool {
// We were attempting fast recovery but were not successful.
// Leave the state. We don't need to update ssthresh because it
// has already been updated when entered fast-recovery.
- s.leaveFastRecovery()
+ s.leaveRecovery()
}
s.state = RTORecovery
@@ -913,79 +937,6 @@ func (s *sender) maybeSendSegment(seg *segment, limit int, end seqnum.Value) (se
return true
}
-// handleSACKRecovery implements the loss recovery phase as described in RFC6675
-// section 5, step C.
-func (s *sender) handleSACKRecovery(limit int, end seqnum.Value) (dataSent bool) {
- s.SetPipe()
-
- if smss := int(s.ep.scoreboard.SMSS()); limit > smss {
- // Cap segment size limit to s.smss as SACK recovery requires
- // that all retransmissions or new segments send during recovery
- // be of <= SMSS.
- limit = smss
- }
-
- nextSegHint := s.writeList.Front()
- for s.outstanding < s.sndCwnd {
- var nextSeg *segment
- var rescueRtx bool
- nextSeg, nextSegHint, rescueRtx = s.NextSeg(nextSegHint)
- if nextSeg == nil {
- return dataSent
- }
- if !s.isAssignedSequenceNumber(nextSeg) || s.sndNxt.LessThanEq(nextSeg.sequenceNumber) {
- // New data being sent.
-
- // Step C.3 described below is handled by
- // maybeSendSegment which increments sndNxt when
- // a segment is transmitted.
- //
- // Step C.3 "If any of the data octets sent in
- // (C.1) are above HighData, HighData must be
- // updated to reflect the transmission of
- // previously unsent data."
- //
- // We pass s.smss as the limit as the Step 2) requires that
- // new data sent should be of size s.smss or less.
- if sent := s.maybeSendSegment(nextSeg, limit, end); !sent {
- return dataSent
- }
- dataSent = true
- s.outstanding++
- s.writeNext = nextSeg.Next()
- continue
- }
-
- // Now handle the retransmission case where we matched either step 1,3 or 4
- // of the NextSeg algorithm.
- // RFC 6675, Step C.4.
- //
- // "The estimate of the amount of data outstanding in the network
- // must be updated by incrementing pipe by the number of octets
- // transmitted in (C.1)."
- s.outstanding++
- dataSent = true
- s.sendSegment(nextSeg)
-
- segEnd := nextSeg.sequenceNumber.Add(nextSeg.logicalLen())
- if rescueRtx {
- // We do the last part of rule (4) of NextSeg here to update
- // RescueRxt as until this point we don't know if we are going
- // to use the rescue transmission.
- s.fr.rescueRxt = s.fr.last
- } else {
- // RFC 6675, Step C.2
- //
- // "If any of the data octets sent in (C.1) are below
- // HighData, HighRxt MUST be set to the highest sequence
- // number of the retransmitted segment unless NextSeg ()
- // rule (4) was invoked for this retransmission."
- s.fr.highRxt = segEnd - 1
- }
- }
- return dataSent
-}
-
func (s *sender) sendZeroWindowProbe() {
ack, win := s.ep.rcv.getSendParams()
s.unackZeroWindowProbes++
@@ -1014,6 +965,30 @@ func (s *sender) disableZeroWindowProbing() {
s.resendTimer.disable()
}
+func (s *sender) postXmit(dataSent bool) {
+ if dataSent {
+ // We sent data, so we should stop the keepalive timer to ensure
+ // that no keepalives are sent while there is pending data.
+ s.ep.disableKeepaliveTimer()
+ }
+
+ // If the sender has advertized zero receive window and we have
+ // data to be sent out, start zero window probing to query the
+ // the remote for it's receive window size.
+ if s.writeNext != nil && s.sndWnd == 0 {
+ s.enableZeroWindowProbing()
+ }
+
+ // Enable the timer if we have pending data and it's not enabled yet.
+ if !s.resendTimer.enabled() && s.sndUna != s.sndNxt {
+ s.resendTimer.enable(s.rto)
+ }
+ // If we have no more pending data, start the keepalive timer.
+ if s.sndUna == s.sndNxt {
+ s.ep.resetKeepaliveTimer(false)
+ }
+}
+
// sendData sends new data segments. It is called when data becomes available or
// when the send window opens up.
func (s *sender) sendData() {
@@ -1034,55 +1009,29 @@ func (s *sender) sendData() {
}
var dataSent bool
-
- // RFC 6675 recovery algorithm step C 1-5.
- if s.fr.active && s.ep.sackPermitted {
- dataSent = s.handleSACKRecovery(s.maxPayloadSize, end)
- } else {
- for seg := s.writeNext; seg != nil && s.outstanding < s.sndCwnd; seg = seg.Next() {
- cwndLimit := (s.sndCwnd - s.outstanding) * s.maxPayloadSize
- if cwndLimit < limit {
- limit = cwndLimit
- }
- if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) {
- // Move writeNext along so that we don't try and scan data that
- // has already been SACKED.
- s.writeNext = seg.Next()
- continue
- }
- if sent := s.maybeSendSegment(seg, limit, end); !sent {
- break
- }
- dataSent = true
- s.outstanding += s.pCount(seg)
+ for seg := s.writeNext; seg != nil && s.outstanding < s.sndCwnd; seg = seg.Next() {
+ cwndLimit := (s.sndCwnd - s.outstanding) * s.maxPayloadSize
+ if cwndLimit < limit {
+ limit = cwndLimit
+ }
+ if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) {
+ // Move writeNext along so that we don't try and scan data that
+ // has already been SACKED.
s.writeNext = seg.Next()
+ continue
}
+ if sent := s.maybeSendSegment(seg, limit, end); !sent {
+ break
+ }
+ dataSent = true
+ s.outstanding += s.pCount(seg)
+ s.writeNext = seg.Next()
}
- if dataSent {
- // We sent data, so we should stop the keepalive timer to ensure
- // that no keepalives are sent while there is pending data.
- s.ep.disableKeepaliveTimer()
- }
-
- // If the sender has advertized zero receive window and we have
- // data to be sent out, start zero window probing to query the
- // the remote for it's receive window size.
- if s.writeNext != nil && s.sndWnd == 0 {
- s.enableZeroWindowProbing()
- }
-
- // Enable the timer if we have pending data and it's not enabled yet.
- if !s.resendTimer.enabled() && s.sndUna != s.sndNxt {
- s.resendTimer.enable(s.rto)
- }
- // If we have no more pending data, start the keepalive timer.
- if s.sndUna == s.sndNxt {
- s.ep.resetKeepaliveTimer(false)
- }
+ s.postXmit(dataSent)
}
-func (s *sender) enterFastRecovery() {
+func (s *sender) enterRecovery() {
s.fr.active = true
// Save state to reflect we're now in fast recovery.
//
@@ -1095,6 +1044,7 @@ func (s *sender) enterFastRecovery() {
s.fr.maxCwnd = s.sndCwnd + s.outstanding
s.fr.highRxt = s.sndUna
s.fr.rescueRxt = s.sndUna
+
if s.ep.sackPermitted {
s.state = SACKRecovery
s.ep.stack.Stats().TCP.SACKRecovery.Increment()
@@ -1104,7 +1054,7 @@ func (s *sender) enterFastRecovery() {
s.ep.stack.Stats().TCP.FastRecovery.Increment()
}
-func (s *sender) leaveFastRecovery() {
+func (s *sender) leaveRecovery() {
s.fr.active = false
s.fr.maxCwnd = 0
s.dupAckCount = 0
@@ -1115,57 +1065,6 @@ func (s *sender) leaveFastRecovery() {
s.cc.PostRecovery()
}
-func (s *sender) handleFastRecovery(seg *segment) (rtx bool) {
- ack := seg.ackNumber
- // We are in fast recovery mode. Ignore the ack if it's out of
- // range.
- if !ack.InRange(s.sndUna, s.sndNxt+1) {
- return false
- }
-
- // Leave fast recovery if it acknowledges all the data covered by
- // this fast recovery session.
- if s.fr.last.LessThan(ack) {
- s.leaveFastRecovery()
- return false
- }
-
- if s.ep.sackPermitted {
- // When SACK is enabled we let retransmission be governed by
- // the SACK logic.
- return false
- }
-
- // Don't count this as a duplicate if it is carrying data or
- // updating the window.
- if seg.logicalLen() != 0 || s.sndWnd != seg.window {
- return false
- }
-
- // Inflate the congestion window if we're getting duplicate acks
- // for the packet we retransmitted.
- if ack == s.fr.first {
- // We received a dup, inflate the congestion window by 1 packet
- // if we're not at the max yet. Only inflate the window if
- // regular FastRecovery is in use, RFC6675 does not require
- // inflating cwnd on duplicate ACKs.
- if s.sndCwnd < s.fr.maxCwnd {
- s.sndCwnd++
- }
- return false
- }
-
- // A partial ack was received. Retransmit this packet and
- // remember it so that we don't retransmit it again. We don't
- // inflate the window because we're putting the same packet back
- // onto the wire.
- //
- // N.B. The retransmit timer will be reset by the caller.
- s.fr.first = ack
- s.dupAckCount = 0
- return true
-}
-
// isAssignedSequenceNumber relies on the fact that we only set flags once a
// sequencenumber is assigned and that is only done right before we send the
// segment. As a result any segment that has a non-zero flag has a valid
@@ -1228,14 +1127,11 @@ func (s *sender) SetPipe() {
s.outstanding = pipe
}
-// checkDuplicateAck is called when an ack is received. It manages the state
-// related to duplicate acks and determines if a retransmit is needed according
-// to the rules in RFC 6582 (NewReno).
-func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) {
+// detectLoss is called when an ack is received and returns whether a loss is
+// detected. It manages the state related to duplicate acks and determines if
+// a retransmit is needed according to the rules in RFC 6582 (NewReno).
+func (s *sender) detectLoss(seg *segment) (fastRetransmit bool) {
ack := seg.ackNumber
- if s.fr.active {
- return s.handleFastRecovery(seg)
- }
// We're not in fast recovery yet. A segment is considered a duplicate
// only if it doesn't carry any data and doesn't update the send window,
@@ -1266,15 +1162,16 @@ func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) {
// See: https://tools.ietf.org/html/rfc6582#section-3.2 Step 2
//
// We only do the check here, the incrementing of last to the highest
- // sequence number transmitted till now is done when enterFastRecovery
+ // sequence number transmitted till now is done when enterRecovery
// is invoked.
if !s.fr.last.LessThan(seg.ackNumber) {
s.dupAckCount = 0
return false
}
s.cc.HandleNDupAcks()
- s.enterFastRecovery()
+ s.enterRecovery()
s.dupAckCount = 0
+
return true
}
@@ -1367,14 +1264,21 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
s.SetPipe()
}
- // Count the duplicates and do the fast retransmit if needed.
- rtx := s.checkDuplicateAck(rcvdSeg)
+ ack := rcvdSeg.ackNumber
+ if s.fr.active {
+ // Leave fast recovery if it acknowledges all the data covered by
+ // this fast recovery session.
+ if s.fr.last.LessThan(ack) {
+ s.leaveRecovery()
+ }
+ }
+
+ // Detect loss by counting the duplicates and enter recovery.
+ fastRetransmit := s.detectLoss(rcvdSeg)
// Stash away the current window size.
s.sndWnd = rcvdSeg.window
- ack := rcvdSeg.ackNumber
-
// Disable zero window probing if remote advertizes a non-zero receive
// window. This can be with an ACK to the zero window probe (where the
// acknumber refers to the already acknowledged byte) OR to any previously
@@ -1491,19 +1395,24 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
s.resendTimer.disable()
}
}
+
// Now that we've popped all acknowledged data from the retransmit
// queue, retransmit if needed.
- if rtx {
- s.resendSegment()
+ if s.fr.active {
+ s.lr.DoRecovery(rcvdSeg, fastRetransmit)
+ // When SACK is enabled data sending is governed by steps in
+ // RFC 6675 Section 5 recovery steps A-C.
+ // See: https://tools.ietf.org/html/rfc6675#section-5.
+ if s.ep.sackPermitted {
+ return
+ }
}
// Send more data now that some of the pending data has been ack'd, or
// that the window opened up, or the congestion window was inflated due
// to a duplicate ack during fast recovery. This will also re-enable
// the retransmit timer if needed.
- if !s.ep.sackPermitted || s.fr.active || s.dupAckCount == 0 || rcvdSeg.hasNewSACKInfo {
- s.sendData()
- }
+ s.sendData()
}
// sendSegment sends the specified segment.