diff options
author | gVisor bot <gvisor-bot@google.com> | 2020-11-13 20:15:51 +0000 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2020-11-13 20:15:51 +0000 |
commit | b1e7f6f75f9ee63906081f9a1dc7d26b148836c1 (patch) | |
tree | 6374a5bd6b2989eef9da740075399b8a02e11db9 /pkg/tcpip/transport/tcp/snd.go | |
parent | 325b2c65da958be80bc87e27c2421e7f8651f44a (diff) | |
parent | 7fff51e50fb6af3e41669a3b229dc05ab4a8017d (diff) |
Merge release-20201030.0-89-g7fff51e50 (automated)
Diffstat (limited to 'pkg/tcpip/transport/tcp/snd.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 283 |
1 files changed, 96 insertions, 187 deletions
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index ab5fa4fb7..c42d7159a 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -92,6 +92,17 @@ type congestionControl interface { PostRecovery() } +// lossRecovery is an interface that must be implemented by any supported +// loss recovery algorithm. +type lossRecovery interface { + // DoRecovery is invoked when loss is detected and segments need + // to be retransmitted. The cumulative or selective ACK is passed along + // with the flag which identifies whether the connection entered fast + // retransmit with this ACK and to retransmit the first unacknowledged + // segment. + DoRecovery(rcvdSeg *segment, fastRetransmit bool) +} + // sender holds the state necessary to send TCP segments. // // +stateify savable @@ -108,6 +119,9 @@ type sender struct { // fr holds state related to fast recovery. fr fastRecovery + // lr is the loss recovery algorithm used by the sender. + lr lossRecovery + // sndCwnd is the congestion window, in packets. sndCwnd int @@ -276,6 +290,8 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint s.cc = s.initCongestionControl(ep.cc) + s.lr = s.initLossRecovery() + // A negative sndWndScale means that no scaling is in use, otherwise we // store the scaling value. if sndWndScale > 0 { @@ -330,6 +346,14 @@ func (s *sender) initCongestionControl(congestionControlName tcpip.CongestionCon } } +// initLossRecovery initiates the loss recovery algorithm for the sender. +func (s *sender) initLossRecovery() lossRecovery { + if s.ep.sackPermitted { + return newSACKRecovery(s) + } + return newRenoRecovery(s) +} + // updateMaxPayloadSize updates the maximum payload size based on the given // MTU. If this is in response to "packet too big" control packets (indicated // by the count argument), it also reduces the number of outstanding packets and @@ -550,7 +574,7 @@ func (s *sender) retransmitTimerExpired() bool { // We were attempting fast recovery but were not successful. // Leave the state. We don't need to update ssthresh because it // has already been updated when entered fast-recovery. - s.leaveFastRecovery() + s.leaveRecovery() } s.state = RTORecovery @@ -913,79 +937,6 @@ func (s *sender) maybeSendSegment(seg *segment, limit int, end seqnum.Value) (se return true } -// handleSACKRecovery implements the loss recovery phase as described in RFC6675 -// section 5, step C. -func (s *sender) handleSACKRecovery(limit int, end seqnum.Value) (dataSent bool) { - s.SetPipe() - - if smss := int(s.ep.scoreboard.SMSS()); limit > smss { - // Cap segment size limit to s.smss as SACK recovery requires - // that all retransmissions or new segments send during recovery - // be of <= SMSS. - limit = smss - } - - nextSegHint := s.writeList.Front() - for s.outstanding < s.sndCwnd { - var nextSeg *segment - var rescueRtx bool - nextSeg, nextSegHint, rescueRtx = s.NextSeg(nextSegHint) - if nextSeg == nil { - return dataSent - } - if !s.isAssignedSequenceNumber(nextSeg) || s.sndNxt.LessThanEq(nextSeg.sequenceNumber) { - // New data being sent. - - // Step C.3 described below is handled by - // maybeSendSegment which increments sndNxt when - // a segment is transmitted. - // - // Step C.3 "If any of the data octets sent in - // (C.1) are above HighData, HighData must be - // updated to reflect the transmission of - // previously unsent data." - // - // We pass s.smss as the limit as the Step 2) requires that - // new data sent should be of size s.smss or less. - if sent := s.maybeSendSegment(nextSeg, limit, end); !sent { - return dataSent - } - dataSent = true - s.outstanding++ - s.writeNext = nextSeg.Next() - continue - } - - // Now handle the retransmission case where we matched either step 1,3 or 4 - // of the NextSeg algorithm. - // RFC 6675, Step C.4. - // - // "The estimate of the amount of data outstanding in the network - // must be updated by incrementing pipe by the number of octets - // transmitted in (C.1)." - s.outstanding++ - dataSent = true - s.sendSegment(nextSeg) - - segEnd := nextSeg.sequenceNumber.Add(nextSeg.logicalLen()) - if rescueRtx { - // We do the last part of rule (4) of NextSeg here to update - // RescueRxt as until this point we don't know if we are going - // to use the rescue transmission. - s.fr.rescueRxt = s.fr.last - } else { - // RFC 6675, Step C.2 - // - // "If any of the data octets sent in (C.1) are below - // HighData, HighRxt MUST be set to the highest sequence - // number of the retransmitted segment unless NextSeg () - // rule (4) was invoked for this retransmission." - s.fr.highRxt = segEnd - 1 - } - } - return dataSent -} - func (s *sender) sendZeroWindowProbe() { ack, win := s.ep.rcv.getSendParams() s.unackZeroWindowProbes++ @@ -1014,6 +965,30 @@ func (s *sender) disableZeroWindowProbing() { s.resendTimer.disable() } +func (s *sender) postXmit(dataSent bool) { + if dataSent { + // We sent data, so we should stop the keepalive timer to ensure + // that no keepalives are sent while there is pending data. + s.ep.disableKeepaliveTimer() + } + + // If the sender has advertized zero receive window and we have + // data to be sent out, start zero window probing to query the + // the remote for it's receive window size. + if s.writeNext != nil && s.sndWnd == 0 { + s.enableZeroWindowProbing() + } + + // Enable the timer if we have pending data and it's not enabled yet. + if !s.resendTimer.enabled() && s.sndUna != s.sndNxt { + s.resendTimer.enable(s.rto) + } + // If we have no more pending data, start the keepalive timer. + if s.sndUna == s.sndNxt { + s.ep.resetKeepaliveTimer(false) + } +} + // sendData sends new data segments. It is called when data becomes available or // when the send window opens up. func (s *sender) sendData() { @@ -1034,55 +1009,29 @@ func (s *sender) sendData() { } var dataSent bool - - // RFC 6675 recovery algorithm step C 1-5. - if s.fr.active && s.ep.sackPermitted { - dataSent = s.handleSACKRecovery(s.maxPayloadSize, end) - } else { - for seg := s.writeNext; seg != nil && s.outstanding < s.sndCwnd; seg = seg.Next() { - cwndLimit := (s.sndCwnd - s.outstanding) * s.maxPayloadSize - if cwndLimit < limit { - limit = cwndLimit - } - if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) { - // Move writeNext along so that we don't try and scan data that - // has already been SACKED. - s.writeNext = seg.Next() - continue - } - if sent := s.maybeSendSegment(seg, limit, end); !sent { - break - } - dataSent = true - s.outstanding += s.pCount(seg) + for seg := s.writeNext; seg != nil && s.outstanding < s.sndCwnd; seg = seg.Next() { + cwndLimit := (s.sndCwnd - s.outstanding) * s.maxPayloadSize + if cwndLimit < limit { + limit = cwndLimit + } + if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) { + // Move writeNext along so that we don't try and scan data that + // has already been SACKED. s.writeNext = seg.Next() + continue } + if sent := s.maybeSendSegment(seg, limit, end); !sent { + break + } + dataSent = true + s.outstanding += s.pCount(seg) + s.writeNext = seg.Next() } - if dataSent { - // We sent data, so we should stop the keepalive timer to ensure - // that no keepalives are sent while there is pending data. - s.ep.disableKeepaliveTimer() - } - - // If the sender has advertized zero receive window and we have - // data to be sent out, start zero window probing to query the - // the remote for it's receive window size. - if s.writeNext != nil && s.sndWnd == 0 { - s.enableZeroWindowProbing() - } - - // Enable the timer if we have pending data and it's not enabled yet. - if !s.resendTimer.enabled() && s.sndUna != s.sndNxt { - s.resendTimer.enable(s.rto) - } - // If we have no more pending data, start the keepalive timer. - if s.sndUna == s.sndNxt { - s.ep.resetKeepaliveTimer(false) - } + s.postXmit(dataSent) } -func (s *sender) enterFastRecovery() { +func (s *sender) enterRecovery() { s.fr.active = true // Save state to reflect we're now in fast recovery. // @@ -1095,6 +1044,7 @@ func (s *sender) enterFastRecovery() { s.fr.maxCwnd = s.sndCwnd + s.outstanding s.fr.highRxt = s.sndUna s.fr.rescueRxt = s.sndUna + if s.ep.sackPermitted { s.state = SACKRecovery s.ep.stack.Stats().TCP.SACKRecovery.Increment() @@ -1104,7 +1054,7 @@ func (s *sender) enterFastRecovery() { s.ep.stack.Stats().TCP.FastRecovery.Increment() } -func (s *sender) leaveFastRecovery() { +func (s *sender) leaveRecovery() { s.fr.active = false s.fr.maxCwnd = 0 s.dupAckCount = 0 @@ -1115,57 +1065,6 @@ func (s *sender) leaveFastRecovery() { s.cc.PostRecovery() } -func (s *sender) handleFastRecovery(seg *segment) (rtx bool) { - ack := seg.ackNumber - // We are in fast recovery mode. Ignore the ack if it's out of - // range. - if !ack.InRange(s.sndUna, s.sndNxt+1) { - return false - } - - // Leave fast recovery if it acknowledges all the data covered by - // this fast recovery session. - if s.fr.last.LessThan(ack) { - s.leaveFastRecovery() - return false - } - - if s.ep.sackPermitted { - // When SACK is enabled we let retransmission be governed by - // the SACK logic. - return false - } - - // Don't count this as a duplicate if it is carrying data or - // updating the window. - if seg.logicalLen() != 0 || s.sndWnd != seg.window { - return false - } - - // Inflate the congestion window if we're getting duplicate acks - // for the packet we retransmitted. - if ack == s.fr.first { - // We received a dup, inflate the congestion window by 1 packet - // if we're not at the max yet. Only inflate the window if - // regular FastRecovery is in use, RFC6675 does not require - // inflating cwnd on duplicate ACKs. - if s.sndCwnd < s.fr.maxCwnd { - s.sndCwnd++ - } - return false - } - - // A partial ack was received. Retransmit this packet and - // remember it so that we don't retransmit it again. We don't - // inflate the window because we're putting the same packet back - // onto the wire. - // - // N.B. The retransmit timer will be reset by the caller. - s.fr.first = ack - s.dupAckCount = 0 - return true -} - // isAssignedSequenceNumber relies on the fact that we only set flags once a // sequencenumber is assigned and that is only done right before we send the // segment. As a result any segment that has a non-zero flag has a valid @@ -1228,14 +1127,11 @@ func (s *sender) SetPipe() { s.outstanding = pipe } -// checkDuplicateAck is called when an ack is received. It manages the state -// related to duplicate acks and determines if a retransmit is needed according -// to the rules in RFC 6582 (NewReno). -func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) { +// detectLoss is called when an ack is received and returns whether a loss is +// detected. It manages the state related to duplicate acks and determines if +// a retransmit is needed according to the rules in RFC 6582 (NewReno). +func (s *sender) detectLoss(seg *segment) (fastRetransmit bool) { ack := seg.ackNumber - if s.fr.active { - return s.handleFastRecovery(seg) - } // We're not in fast recovery yet. A segment is considered a duplicate // only if it doesn't carry any data and doesn't update the send window, @@ -1266,15 +1162,16 @@ func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) { // See: https://tools.ietf.org/html/rfc6582#section-3.2 Step 2 // // We only do the check here, the incrementing of last to the highest - // sequence number transmitted till now is done when enterFastRecovery + // sequence number transmitted till now is done when enterRecovery // is invoked. if !s.fr.last.LessThan(seg.ackNumber) { s.dupAckCount = 0 return false } s.cc.HandleNDupAcks() - s.enterFastRecovery() + s.enterRecovery() s.dupAckCount = 0 + return true } @@ -1367,14 +1264,21 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { s.SetPipe() } - // Count the duplicates and do the fast retransmit if needed. - rtx := s.checkDuplicateAck(rcvdSeg) + ack := rcvdSeg.ackNumber + if s.fr.active { + // Leave fast recovery if it acknowledges all the data covered by + // this fast recovery session. + if s.fr.last.LessThan(ack) { + s.leaveRecovery() + } + } + + // Detect loss by counting the duplicates and enter recovery. + fastRetransmit := s.detectLoss(rcvdSeg) // Stash away the current window size. s.sndWnd = rcvdSeg.window - ack := rcvdSeg.ackNumber - // Disable zero window probing if remote advertizes a non-zero receive // window. This can be with an ACK to the zero window probe (where the // acknumber refers to the already acknowledged byte) OR to any previously @@ -1491,19 +1395,24 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { s.resendTimer.disable() } } + // Now that we've popped all acknowledged data from the retransmit // queue, retransmit if needed. - if rtx { - s.resendSegment() + if s.fr.active { + s.lr.DoRecovery(rcvdSeg, fastRetransmit) + // When SACK is enabled data sending is governed by steps in + // RFC 6675 Section 5 recovery steps A-C. + // See: https://tools.ietf.org/html/rfc6675#section-5. + if s.ep.sackPermitted { + return + } } // Send more data now that some of the pending data has been ack'd, or // that the window opened up, or the congestion window was inflated due // to a duplicate ack during fast recovery. This will also re-enable // the retransmit timer if needed. - if !s.ep.sackPermitted || s.fr.active || s.dupAckCount == 0 || rcvdSeg.hasNewSACKInfo { - s.sendData() - } + s.sendData() } // sendSegment sends the specified segment. |