From 7fff51e50fb6af3e41669a3b229dc05ab4a8017d Mon Sep 17 00:00:00 2001 From: Nayana Bidari Date: Fri, 13 Nov 2020 12:10:15 -0800 Subject: Refactor loss recovery in TCP. The current implementation of loss recovery algorithms SACK and NewReno are in the same file(snd.go). The functions have multiple checks to see which one is currently used by the endpoint. This CL will refactor and separate the implementation of existing recovery algorithms which will help us to implement new recovery algorithms(such as RACK) with less changes to the existing code. There is no change in the behavior. PiperOrigin-RevId: 342312166 --- pkg/tcpip/transport/tcp/snd.go | 283 ++++++++++++++--------------------------- 1 file changed, 96 insertions(+), 187 deletions(-) (limited to 'pkg/tcpip/transport/tcp/snd.go') diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index ab5fa4fb7..c42d7159a 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -92,6 +92,17 @@ type congestionControl interface { PostRecovery() } +// lossRecovery is an interface that must be implemented by any supported +// loss recovery algorithm. +type lossRecovery interface { + // DoRecovery is invoked when loss is detected and segments need + // to be retransmitted. The cumulative or selective ACK is passed along + // with the flag which identifies whether the connection entered fast + // retransmit with this ACK and to retransmit the first unacknowledged + // segment. + DoRecovery(rcvdSeg *segment, fastRetransmit bool) +} + // sender holds the state necessary to send TCP segments. // // +stateify savable @@ -108,6 +119,9 @@ type sender struct { // fr holds state related to fast recovery. fr fastRecovery + // lr is the loss recovery algorithm used by the sender. + lr lossRecovery + // sndCwnd is the congestion window, in packets. sndCwnd int @@ -276,6 +290,8 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint s.cc = s.initCongestionControl(ep.cc) + s.lr = s.initLossRecovery() + // A negative sndWndScale means that no scaling is in use, otherwise we // store the scaling value. if sndWndScale > 0 { @@ -330,6 +346,14 @@ func (s *sender) initCongestionControl(congestionControlName tcpip.CongestionCon } } +// initLossRecovery initiates the loss recovery algorithm for the sender. +func (s *sender) initLossRecovery() lossRecovery { + if s.ep.sackPermitted { + return newSACKRecovery(s) + } + return newRenoRecovery(s) +} + // updateMaxPayloadSize updates the maximum payload size based on the given // MTU. If this is in response to "packet too big" control packets (indicated // by the count argument), it also reduces the number of outstanding packets and @@ -550,7 +574,7 @@ func (s *sender) retransmitTimerExpired() bool { // We were attempting fast recovery but were not successful. // Leave the state. We don't need to update ssthresh because it // has already been updated when entered fast-recovery. - s.leaveFastRecovery() + s.leaveRecovery() } s.state = RTORecovery @@ -913,79 +937,6 @@ func (s *sender) maybeSendSegment(seg *segment, limit int, end seqnum.Value) (se return true } -// handleSACKRecovery implements the loss recovery phase as described in RFC6675 -// section 5, step C. -func (s *sender) handleSACKRecovery(limit int, end seqnum.Value) (dataSent bool) { - s.SetPipe() - - if smss := int(s.ep.scoreboard.SMSS()); limit > smss { - // Cap segment size limit to s.smss as SACK recovery requires - // that all retransmissions or new segments send during recovery - // be of <= SMSS. - limit = smss - } - - nextSegHint := s.writeList.Front() - for s.outstanding < s.sndCwnd { - var nextSeg *segment - var rescueRtx bool - nextSeg, nextSegHint, rescueRtx = s.NextSeg(nextSegHint) - if nextSeg == nil { - return dataSent - } - if !s.isAssignedSequenceNumber(nextSeg) || s.sndNxt.LessThanEq(nextSeg.sequenceNumber) { - // New data being sent. - - // Step C.3 described below is handled by - // maybeSendSegment which increments sndNxt when - // a segment is transmitted. - // - // Step C.3 "If any of the data octets sent in - // (C.1) are above HighData, HighData must be - // updated to reflect the transmission of - // previously unsent data." - // - // We pass s.smss as the limit as the Step 2) requires that - // new data sent should be of size s.smss or less. - if sent := s.maybeSendSegment(nextSeg, limit, end); !sent { - return dataSent - } - dataSent = true - s.outstanding++ - s.writeNext = nextSeg.Next() - continue - } - - // Now handle the retransmission case where we matched either step 1,3 or 4 - // of the NextSeg algorithm. - // RFC 6675, Step C.4. - // - // "The estimate of the amount of data outstanding in the network - // must be updated by incrementing pipe by the number of octets - // transmitted in (C.1)." - s.outstanding++ - dataSent = true - s.sendSegment(nextSeg) - - segEnd := nextSeg.sequenceNumber.Add(nextSeg.logicalLen()) - if rescueRtx { - // We do the last part of rule (4) of NextSeg here to update - // RescueRxt as until this point we don't know if we are going - // to use the rescue transmission. - s.fr.rescueRxt = s.fr.last - } else { - // RFC 6675, Step C.2 - // - // "If any of the data octets sent in (C.1) are below - // HighData, HighRxt MUST be set to the highest sequence - // number of the retransmitted segment unless NextSeg () - // rule (4) was invoked for this retransmission." - s.fr.highRxt = segEnd - 1 - } - } - return dataSent -} - func (s *sender) sendZeroWindowProbe() { ack, win := s.ep.rcv.getSendParams() s.unackZeroWindowProbes++ @@ -1014,6 +965,30 @@ func (s *sender) disableZeroWindowProbing() { s.resendTimer.disable() } +func (s *sender) postXmit(dataSent bool) { + if dataSent { + // We sent data, so we should stop the keepalive timer to ensure + // that no keepalives are sent while there is pending data. + s.ep.disableKeepaliveTimer() + } + + // If the sender has advertized zero receive window and we have + // data to be sent out, start zero window probing to query the + // the remote for it's receive window size. + if s.writeNext != nil && s.sndWnd == 0 { + s.enableZeroWindowProbing() + } + + // Enable the timer if we have pending data and it's not enabled yet. + if !s.resendTimer.enabled() && s.sndUna != s.sndNxt { + s.resendTimer.enable(s.rto) + } + // If we have no more pending data, start the keepalive timer. + if s.sndUna == s.sndNxt { + s.ep.resetKeepaliveTimer(false) + } +} + // sendData sends new data segments. It is called when data becomes available or // when the send window opens up. func (s *sender) sendData() { @@ -1034,55 +1009,29 @@ func (s *sender) sendData() { } var dataSent bool - - // RFC 6675 recovery algorithm step C 1-5. - if s.fr.active && s.ep.sackPermitted { - dataSent = s.handleSACKRecovery(s.maxPayloadSize, end) - } else { - for seg := s.writeNext; seg != nil && s.outstanding < s.sndCwnd; seg = seg.Next() { - cwndLimit := (s.sndCwnd - s.outstanding) * s.maxPayloadSize - if cwndLimit < limit { - limit = cwndLimit - } - if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) { - // Move writeNext along so that we don't try and scan data that - // has already been SACKED. - s.writeNext = seg.Next() - continue - } - if sent := s.maybeSendSegment(seg, limit, end); !sent { - break - } - dataSent = true - s.outstanding += s.pCount(seg) + for seg := s.writeNext; seg != nil && s.outstanding < s.sndCwnd; seg = seg.Next() { + cwndLimit := (s.sndCwnd - s.outstanding) * s.maxPayloadSize + if cwndLimit < limit { + limit = cwndLimit + } + if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) { + // Move writeNext along so that we don't try and scan data that + // has already been SACKED. s.writeNext = seg.Next() + continue } + if sent := s.maybeSendSegment(seg, limit, end); !sent { + break + } + dataSent = true + s.outstanding += s.pCount(seg) + s.writeNext = seg.Next() } - if dataSent { - // We sent data, so we should stop the keepalive timer to ensure - // that no keepalives are sent while there is pending data. - s.ep.disableKeepaliveTimer() - } - - // If the sender has advertized zero receive window and we have - // data to be sent out, start zero window probing to query the - // the remote for it's receive window size. - if s.writeNext != nil && s.sndWnd == 0 { - s.enableZeroWindowProbing() - } - - // Enable the timer if we have pending data and it's not enabled yet. - if !s.resendTimer.enabled() && s.sndUna != s.sndNxt { - s.resendTimer.enable(s.rto) - } - // If we have no more pending data, start the keepalive timer. - if s.sndUna == s.sndNxt { - s.ep.resetKeepaliveTimer(false) - } + s.postXmit(dataSent) } -func (s *sender) enterFastRecovery() { +func (s *sender) enterRecovery() { s.fr.active = true // Save state to reflect we're now in fast recovery. // @@ -1095,6 +1044,7 @@ func (s *sender) enterFastRecovery() { s.fr.maxCwnd = s.sndCwnd + s.outstanding s.fr.highRxt = s.sndUna s.fr.rescueRxt = s.sndUna + if s.ep.sackPermitted { s.state = SACKRecovery s.ep.stack.Stats().TCP.SACKRecovery.Increment() @@ -1104,7 +1054,7 @@ func (s *sender) enterFastRecovery() { s.ep.stack.Stats().TCP.FastRecovery.Increment() } -func (s *sender) leaveFastRecovery() { +func (s *sender) leaveRecovery() { s.fr.active = false s.fr.maxCwnd = 0 s.dupAckCount = 0 @@ -1115,57 +1065,6 @@ func (s *sender) leaveFastRecovery() { s.cc.PostRecovery() } -func (s *sender) handleFastRecovery(seg *segment) (rtx bool) { - ack := seg.ackNumber - // We are in fast recovery mode. Ignore the ack if it's out of - // range. - if !ack.InRange(s.sndUna, s.sndNxt+1) { - return false - } - - // Leave fast recovery if it acknowledges all the data covered by - // this fast recovery session. - if s.fr.last.LessThan(ack) { - s.leaveFastRecovery() - return false - } - - if s.ep.sackPermitted { - // When SACK is enabled we let retransmission be governed by - // the SACK logic. - return false - } - - // Don't count this as a duplicate if it is carrying data or - // updating the window. - if seg.logicalLen() != 0 || s.sndWnd != seg.window { - return false - } - - // Inflate the congestion window if we're getting duplicate acks - // for the packet we retransmitted. - if ack == s.fr.first { - // We received a dup, inflate the congestion window by 1 packet - // if we're not at the max yet. Only inflate the window if - // regular FastRecovery is in use, RFC6675 does not require - // inflating cwnd on duplicate ACKs. - if s.sndCwnd < s.fr.maxCwnd { - s.sndCwnd++ - } - return false - } - - // A partial ack was received. Retransmit this packet and - // remember it so that we don't retransmit it again. We don't - // inflate the window because we're putting the same packet back - // onto the wire. - // - // N.B. The retransmit timer will be reset by the caller. - s.fr.first = ack - s.dupAckCount = 0 - return true -} - // isAssignedSequenceNumber relies on the fact that we only set flags once a // sequencenumber is assigned and that is only done right before we send the // segment. As a result any segment that has a non-zero flag has a valid @@ -1228,14 +1127,11 @@ func (s *sender) SetPipe() { s.outstanding = pipe } -// checkDuplicateAck is called when an ack is received. It manages the state -// related to duplicate acks and determines if a retransmit is needed according -// to the rules in RFC 6582 (NewReno). -func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) { +// detectLoss is called when an ack is received and returns whether a loss is +// detected. It manages the state related to duplicate acks and determines if +// a retransmit is needed according to the rules in RFC 6582 (NewReno). +func (s *sender) detectLoss(seg *segment) (fastRetransmit bool) { ack := seg.ackNumber - if s.fr.active { - return s.handleFastRecovery(seg) - } // We're not in fast recovery yet. A segment is considered a duplicate // only if it doesn't carry any data and doesn't update the send window, @@ -1266,15 +1162,16 @@ func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) { // See: https://tools.ietf.org/html/rfc6582#section-3.2 Step 2 // // We only do the check here, the incrementing of last to the highest - // sequence number transmitted till now is done when enterFastRecovery + // sequence number transmitted till now is done when enterRecovery // is invoked. if !s.fr.last.LessThan(seg.ackNumber) { s.dupAckCount = 0 return false } s.cc.HandleNDupAcks() - s.enterFastRecovery() + s.enterRecovery() s.dupAckCount = 0 + return true } @@ -1367,14 +1264,21 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { s.SetPipe() } - // Count the duplicates and do the fast retransmit if needed. - rtx := s.checkDuplicateAck(rcvdSeg) + ack := rcvdSeg.ackNumber + if s.fr.active { + // Leave fast recovery if it acknowledges all the data covered by + // this fast recovery session. + if s.fr.last.LessThan(ack) { + s.leaveRecovery() + } + } + + // Detect loss by counting the duplicates and enter recovery. + fastRetransmit := s.detectLoss(rcvdSeg) // Stash away the current window size. s.sndWnd = rcvdSeg.window - ack := rcvdSeg.ackNumber - // Disable zero window probing if remote advertizes a non-zero receive // window. This can be with an ACK to the zero window probe (where the // acknumber refers to the already acknowledged byte) OR to any previously @@ -1491,19 +1395,24 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { s.resendTimer.disable() } } + // Now that we've popped all acknowledged data from the retransmit // queue, retransmit if needed. - if rtx { - s.resendSegment() + if s.fr.active { + s.lr.DoRecovery(rcvdSeg, fastRetransmit) + // When SACK is enabled data sending is governed by steps in + // RFC 6675 Section 5 recovery steps A-C. + // See: https://tools.ietf.org/html/rfc6675#section-5. + if s.ep.sackPermitted { + return + } } // Send more data now that some of the pending data has been ack'd, or // that the window opened up, or the congestion window was inflated due // to a duplicate ack during fast recovery. This will also re-enable // the retransmit timer if needed. - if !s.ep.sackPermitted || s.fr.active || s.dupAckCount == 0 || rcvdSeg.hasNewSACKInfo { - s.sendData() - } + s.sendData() } // sendSegment sends the specified segment. -- cgit v1.2.3