diff options
Diffstat (limited to 'pkg/tcpip/transport/tcp/snd.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 283 |
1 files changed, 187 insertions, 96 deletions
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 201cf9aa9..0e0fdf14c 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -92,17 +92,6 @@ type congestionControl interface { PostRecovery() } -// lossRecovery is an interface that must be implemented by any supported -// loss recovery algorithm. -type lossRecovery interface { - // DoRecovery is invoked when loss is detected and segments need - // to be retransmitted. The cumulative or selective ACK is passed along - // with the flag which identifies whether the connection entered fast - // retransmit with this ACK and to retransmit the first unacknowledged - // segment. - DoRecovery(rcvdSeg *segment, fastRetransmit bool) -} - // sender holds the state necessary to send TCP segments. // // +stateify savable @@ -119,9 +108,6 @@ type sender struct { // fr holds state related to fast recovery. fr fastRecovery - // lr is the loss recovery algorithm used by the sender. - lr lossRecovery - // sndCwnd is the congestion window, in packets. sndCwnd int @@ -290,8 +276,6 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint s.cc = s.initCongestionControl(ep.cc) - s.lr = s.initLossRecovery() - // A negative sndWndScale means that no scaling is in use, otherwise we // store the scaling value. if sndWndScale > 0 { @@ -346,14 +330,6 @@ func (s *sender) initCongestionControl(congestionControlName tcpip.CongestionCon } } -// initLossRecovery initiates the loss recovery algorithm for the sender. -func (s *sender) initLossRecovery() lossRecovery { - if s.ep.sackPermitted { - return newSACKRecovery(s) - } - return newRenoRecovery(s) -} - // updateMaxPayloadSize updates the maximum payload size based on the given // MTU. If this is in response to "packet too big" control packets (indicated // by the count argument), it also reduces the number of outstanding packets and @@ -574,7 +550,7 @@ func (s *sender) retransmitTimerExpired() bool { // We were attempting fast recovery but were not successful. // Leave the state. We don't need to update ssthresh because it // has already been updated when entered fast-recovery. - s.leaveRecovery() + s.leaveFastRecovery() } s.state = RTORecovery @@ -937,6 +913,79 @@ func (s *sender) maybeSendSegment(seg *segment, limit int, end seqnum.Value) (se return true } +// handleSACKRecovery implements the loss recovery phase as described in RFC6675 +// section 5, step C. +func (s *sender) handleSACKRecovery(limit int, end seqnum.Value) (dataSent bool) { + s.SetPipe() + + if smss := int(s.ep.scoreboard.SMSS()); limit > smss { + // Cap segment size limit to s.smss as SACK recovery requires + // that all retransmissions or new segments send during recovery + // be of <= SMSS. + limit = smss + } + + nextSegHint := s.writeList.Front() + for s.outstanding < s.sndCwnd { + var nextSeg *segment + var rescueRtx bool + nextSeg, nextSegHint, rescueRtx = s.NextSeg(nextSegHint) + if nextSeg == nil { + return dataSent + } + if !s.isAssignedSequenceNumber(nextSeg) || s.sndNxt.LessThanEq(nextSeg.sequenceNumber) { + // New data being sent. + + // Step C.3 described below is handled by + // maybeSendSegment which increments sndNxt when + // a segment is transmitted. + // + // Step C.3 "If any of the data octets sent in + // (C.1) are above HighData, HighData must be + // updated to reflect the transmission of + // previously unsent data." + // + // We pass s.smss as the limit as the Step 2) requires that + // new data sent should be of size s.smss or less. + if sent := s.maybeSendSegment(nextSeg, limit, end); !sent { + return dataSent + } + dataSent = true + s.outstanding++ + s.writeNext = nextSeg.Next() + continue + } + + // Now handle the retransmission case where we matched either step 1,3 or 4 + // of the NextSeg algorithm. + // RFC 6675, Step C.4. + // + // "The estimate of the amount of data outstanding in the network + // must be updated by incrementing pipe by the number of octets + // transmitted in (C.1)." + s.outstanding++ + dataSent = true + s.sendSegment(nextSeg) + + segEnd := nextSeg.sequenceNumber.Add(nextSeg.logicalLen()) + if rescueRtx { + // We do the last part of rule (4) of NextSeg here to update + // RescueRxt as until this point we don't know if we are going + // to use the rescue transmission. + s.fr.rescueRxt = s.fr.last + } else { + // RFC 6675, Step C.2 + // + // "If any of the data octets sent in (C.1) are below + // HighData, HighRxt MUST be set to the highest sequence + // number of the retransmitted segment unless NextSeg () + // rule (4) was invoked for this retransmission." + s.fr.highRxt = segEnd - 1 + } + } + return dataSent +} + func (s *sender) sendZeroWindowProbe() { ack, win := s.ep.rcv.getSendParams() s.unackZeroWindowProbes++ @@ -965,30 +1014,6 @@ func (s *sender) disableZeroWindowProbing() { s.resendTimer.disable() } -func (s *sender) postXmit(dataSent bool) { - if dataSent { - // We sent data, so we should stop the keepalive timer to ensure - // that no keepalives are sent while there is pending data. - s.ep.disableKeepaliveTimer() - } - - // If the sender has advertized zero receive window and we have - // data to be sent out, start zero window probing to query the - // the remote for it's receive window size. - if s.writeNext != nil && s.sndWnd == 0 { - s.enableZeroWindowProbing() - } - - // Enable the timer if we have pending data and it's not enabled yet. - if !s.resendTimer.enabled() && s.sndUna != s.sndNxt { - s.resendTimer.enable(s.rto) - } - // If we have no more pending data, start the keepalive timer. - if s.sndUna == s.sndNxt { - s.ep.resetKeepaliveTimer(false) - } -} - // sendData sends new data segments. It is called when data becomes available or // when the send window opens up. func (s *sender) sendData() { @@ -1009,29 +1034,55 @@ func (s *sender) sendData() { } var dataSent bool - for seg := s.writeNext; seg != nil && s.outstanding < s.sndCwnd; seg = seg.Next() { - cwndLimit := (s.sndCwnd - s.outstanding) * s.maxPayloadSize - if cwndLimit < limit { - limit = cwndLimit - } - if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) { - // Move writeNext along so that we don't try and scan data that - // has already been SACKED. + + // RFC 6675 recovery algorithm step C 1-5. + if s.fr.active && s.ep.sackPermitted { + dataSent = s.handleSACKRecovery(s.maxPayloadSize, end) + } else { + for seg := s.writeNext; seg != nil && s.outstanding < s.sndCwnd; seg = seg.Next() { + cwndLimit := (s.sndCwnd - s.outstanding) * s.maxPayloadSize + if cwndLimit < limit { + limit = cwndLimit + } + if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) { + // Move writeNext along so that we don't try and scan data that + // has already been SACKED. + s.writeNext = seg.Next() + continue + } + if sent := s.maybeSendSegment(seg, limit, end); !sent { + break + } + dataSent = true + s.outstanding += s.pCount(seg) s.writeNext = seg.Next() - continue } - if sent := s.maybeSendSegment(seg, limit, end); !sent { - break - } - dataSent = true - s.outstanding += s.pCount(seg) - s.writeNext = seg.Next() } - s.postXmit(dataSent) + if dataSent { + // We sent data, so we should stop the keepalive timer to ensure + // that no keepalives are sent while there is pending data. + s.ep.disableKeepaliveTimer() + } + + // If the sender has advertized zero receive window and we have + // data to be sent out, start zero window probing to query the + // the remote for it's receive window size. + if s.writeNext != nil && s.sndWnd == 0 { + s.enableZeroWindowProbing() + } + + // Enable the timer if we have pending data and it's not enabled yet. + if !s.resendTimer.enabled() && s.sndUna != s.sndNxt { + s.resendTimer.enable(s.rto) + } + // If we have no more pending data, start the keepalive timer. + if s.sndUna == s.sndNxt { + s.ep.resetKeepaliveTimer(false) + } } -func (s *sender) enterRecovery() { +func (s *sender) enterFastRecovery() { s.fr.active = true // Save state to reflect we're now in fast recovery. // @@ -1044,7 +1095,6 @@ func (s *sender) enterRecovery() { s.fr.maxCwnd = s.sndCwnd + s.outstanding s.fr.highRxt = s.sndUna s.fr.rescueRxt = s.sndUna - if s.ep.sackPermitted { s.state = SACKRecovery s.ep.stack.Stats().TCP.SACKRecovery.Increment() @@ -1054,7 +1104,7 @@ func (s *sender) enterRecovery() { s.ep.stack.Stats().TCP.FastRecovery.Increment() } -func (s *sender) leaveRecovery() { +func (s *sender) leaveFastRecovery() { s.fr.active = false s.fr.maxCwnd = 0 s.dupAckCount = 0 @@ -1065,6 +1115,57 @@ func (s *sender) leaveRecovery() { s.cc.PostRecovery() } +func (s *sender) handleFastRecovery(seg *segment) (rtx bool) { + ack := seg.ackNumber + // We are in fast recovery mode. Ignore the ack if it's out of + // range. + if !ack.InRange(s.sndUna, s.sndNxt+1) { + return false + } + + // Leave fast recovery if it acknowledges all the data covered by + // this fast recovery session. + if s.fr.last.LessThan(ack) { + s.leaveFastRecovery() + return false + } + + if s.ep.sackPermitted { + // When SACK is enabled we let retransmission be governed by + // the SACK logic. + return false + } + + // Don't count this as a duplicate if it is carrying data or + // updating the window. + if seg.logicalLen() != 0 || s.sndWnd != seg.window { + return false + } + + // Inflate the congestion window if we're getting duplicate acks + // for the packet we retransmitted. + if ack == s.fr.first { + // We received a dup, inflate the congestion window by 1 packet + // if we're not at the max yet. Only inflate the window if + // regular FastRecovery is in use, RFC6675 does not require + // inflating cwnd on duplicate ACKs. + if s.sndCwnd < s.fr.maxCwnd { + s.sndCwnd++ + } + return false + } + + // A partial ack was received. Retransmit this packet and + // remember it so that we don't retransmit it again. We don't + // inflate the window because we're putting the same packet back + // onto the wire. + // + // N.B. The retransmit timer will be reset by the caller. + s.fr.first = ack + s.dupAckCount = 0 + return true +} + // isAssignedSequenceNumber relies on the fact that we only set flags once a // sequencenumber is assigned and that is only done right before we send the // segment. As a result any segment that has a non-zero flag has a valid @@ -1127,11 +1228,14 @@ func (s *sender) SetPipe() { s.outstanding = pipe } -// detectLoss is called when an ack is received and returns whether a loss is -// detected. It manages the state related to duplicate acks and determines if -// a retransmit is needed according to the rules in RFC 6582 (NewReno). -func (s *sender) detectLoss(seg *segment) (fastRetransmit bool) { +// checkDuplicateAck is called when an ack is received. It manages the state +// related to duplicate acks and determines if a retransmit is needed according +// to the rules in RFC 6582 (NewReno). +func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) { ack := seg.ackNumber + if s.fr.active { + return s.handleFastRecovery(seg) + } // We're not in fast recovery yet. A segment is considered a duplicate // only if it doesn't carry any data and doesn't update the send window, @@ -1162,16 +1266,15 @@ func (s *sender) detectLoss(seg *segment) (fastRetransmit bool) { // See: https://tools.ietf.org/html/rfc6582#section-3.2 Step 2 // // We only do the check here, the incrementing of last to the highest - // sequence number transmitted till now is done when enterRecovery + // sequence number transmitted till now is done when enterFastRecovery // is invoked. if !s.fr.last.LessThan(seg.ackNumber) { s.dupAckCount = 0 return false } s.cc.HandleNDupAcks() - s.enterRecovery() + s.enterFastRecovery() s.dupAckCount = 0 - return true } @@ -1312,21 +1415,14 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { s.SetPipe() } - ack := rcvdSeg.ackNumber - if s.fr.active { - // Leave fast recovery if it acknowledges all the data covered by - // this fast recovery session. - if s.fr.last.LessThan(ack) { - s.leaveRecovery() - } - } - - // Detect loss by counting the duplicates and enter recovery. - fastRetransmit := s.detectLoss(rcvdSeg) + // Count the duplicates and do the fast retransmit if needed. + rtx := s.checkDuplicateAck(rcvdSeg) // Stash away the current window size. s.sndWnd = rcvdSeg.window + ack := rcvdSeg.ackNumber + // Disable zero window probing if remote advertizes a non-zero receive // window. This can be with an ACK to the zero window probe (where the // acknumber refers to the already acknowledged byte) OR to any previously @@ -1443,24 +1539,19 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { s.resendTimer.disable() } } - // Now that we've popped all acknowledged data from the retransmit // queue, retransmit if needed. - if s.fr.active { - s.lr.DoRecovery(rcvdSeg, fastRetransmit) - // When SACK is enabled data sending is governed by steps in - // RFC 6675 Section 5 recovery steps A-C. - // See: https://tools.ietf.org/html/rfc6675#section-5. - if s.ep.sackPermitted { - return - } + if rtx { + s.resendSegment() } // Send more data now that some of the pending data has been ack'd, or // that the window opened up, or the congestion window was inflated due // to a duplicate ack during fast recovery. This will also re-enable // the retransmit timer if needed. - s.sendData() + if !s.ep.sackPermitted || s.fr.active || s.dupAckCount == 0 || rcvdSeg.hasNewSACKInfo { + s.sendData() + } } // sendSegment sends the specified segment. |