diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 9 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rack.go | 115 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rack_state.go | 5 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/segment.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 52 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd_state.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_state_autogen.go | 6 |
7 files changed, 166 insertions, 26 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 34a631b53..461b1a9d7 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1301,7 +1301,8 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ // e.mu is expected to be hold upon entering this section. if e.snd != nil { e.snd.resendTimer.cleanup() - e.snd.rc.probeTimer.cleanup() + e.snd.probeTimer.cleanup() + e.snd.reorderTimer.cleanup() } if closeTimer != nil { @@ -1396,7 +1397,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ }, }, { - w: &e.snd.rc.probeWaker, + w: &e.snd.probeWaker, f: e.snd.probeTimerExpired, }, { @@ -1475,6 +1476,10 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ return nil }, }, + { + w: &e.snd.reorderWaker, + f: e.snd.rc.reorderTimerExpired, + }, } // Initialize the sleeper based on the wakers in funcs. diff --git a/pkg/tcpip/transport/tcp/rack.go b/pkg/tcpip/transport/tcp/rack.go index e862f159e..9959b60b8 100644 --- a/pkg/tcpip/transport/tcp/rack.go +++ b/pkg/tcpip/transport/tcp/rack.go @@ -17,7 +17,6 @@ package tcp import ( "time" - "gvisor.dev/gvisor/pkg/sleep" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/seqnum" ) @@ -50,7 +49,8 @@ type rackControl struct { // dsackSeen indicates if the connection has seen a DSACK. dsackSeen bool - // endSequence is the ending TCP sequence number of rackControl.seg. + // endSequence is the ending TCP sequence number of the most recent + // acknowledged segment. endSequence seqnum.Value // exitedRecovery indicates if the connection is exiting loss recovery. @@ -90,13 +90,10 @@ type rackControl struct { // rttSeq is the SND.NXT when rtt is updated. rttSeq seqnum.Value - // xmitTime is the latest transmission timestamp of rackControl.seg. + // xmitTime is the latest transmission timestamp of the most recent + // acknowledged segment. xmitTime time.Time `state:".(unixTime)"` - // probeTimer and probeWaker are used to schedule PTO for RACK TLP algorithm. - probeTimer timer `state:"nosave"` - probeWaker sleep.Waker `state:"nosave"` - // tlpRxtOut indicates whether there is an unacknowledged // TLP retransmission. tlpRxtOut bool @@ -114,7 +111,6 @@ func (rc *rackControl) init(snd *sender, iss seqnum.Value) { rc.fack = iss rc.reoWndIncr = 1 rc.snd = snd - rc.probeTimer.init(&rc.probeWaker) } // update will update the RACK related fields when an ACK has been received. @@ -223,13 +219,13 @@ func (s *sender) schedulePTO() { s.resendTimer.disable() } - s.rc.probeTimer.enable(pto) + s.probeTimer.enable(pto) } // probeTimerExpired is the same as TLP_send_probe() as defined in // https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5.2. func (s *sender) probeTimerExpired() tcpip.Error { - if !s.rc.probeTimer.checkExpiration() { + if !s.probeTimer.checkExpiration() { return nil } @@ -386,3 +382,102 @@ func (rc *rackControl) updateRACKReorderWindow(ackSeg *segment) { func (rc *rackControl) exitRecovery() { rc.exitedRecovery = true } + +// detectLoss marks the segment as lost if the reordering window has elapsed +// and the ACK is not received. It will also arm the reorder timer. +// See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 Step 5. +func (rc *rackControl) detectLoss(rcvTime time.Time) int { + var timeout time.Duration + numLost := 0 + for seg := rc.snd.writeList.Front(); seg != nil && seg.xmitCount != 0; seg = seg.Next() { + if rc.snd.ep.scoreboard.IsSACKED(seg.sackBlock()) { + continue + } + + if seg.lost && seg.xmitCount == 1 { + numLost++ + continue + } + + endSeq := seg.sequenceNumber.Add(seqnum.Size(seg.data.Size())) + if seg.xmitTime.Before(rc.xmitTime) || (seg.xmitTime.Equal(rc.xmitTime) && rc.endSequence.LessThan(endSeq)) { + timeRemaining := seg.xmitTime.Sub(rcvTime) + rc.rtt + rc.reoWnd + if timeRemaining <= 0 { + seg.lost = true + numLost++ + } else if timeRemaining > timeout { + timeout = timeRemaining + } + } + } + + if timeout != 0 && !rc.snd.reorderTimer.enabled() { + rc.snd.reorderTimer.enable(timeout) + } + return numLost +} + +// reorderTimerExpired will retransmit the segments which have not been acked +// before the reorder timer expired. +func (rc *rackControl) reorderTimerExpired() tcpip.Error { + // Check if the timer actually expired or if it's a spurious wake due + // to a previously orphaned runtime timer. + if !rc.snd.reorderTimer.checkExpiration() { + return nil + } + + numLost := rc.detectLoss(time.Now()) + if numLost == 0 { + return nil + } + + fastRetransmit := false + if !rc.snd.fr.active { + rc.snd.cc.HandleLossDetected() + rc.snd.enterRecovery() + fastRetransmit = true + } + + rc.DoRecovery(nil, fastRetransmit) + return nil +} + +// DoRecovery implements lossRecovery.DoRecovery. +func (rc *rackControl) DoRecovery(_ *segment, fastRetransmit bool) { + snd := rc.snd + if fastRetransmit { + snd.resendSegment() + } + + var dataSent bool + // Iterate the writeList and retransmit the segments which are marked + // as lost by RACK. + for seg := snd.writeList.Front(); seg != nil && seg.xmitCount > 0; seg = seg.Next() { + if seg == snd.writeNext { + break + } + + if !seg.lost { + continue + } + + // Reset seg.lost as it is already SACKed. + if snd.ep.scoreboard.IsSACKED(seg.sackBlock()) { + seg.lost = false + continue + } + + // Check the congestion window after entering recovery. + if snd.outstanding >= snd.sndCwnd { + break + } + + snd.outstanding++ + dataSent = true + snd.sendSegment(seg) + } + + // Rearm the RTO. + snd.resendTimer.enable(snd.rto) + snd.postXmit(dataSent) +} diff --git a/pkg/tcpip/transport/tcp/rack_state.go b/pkg/tcpip/transport/tcp/rack_state.go index 76cad0831..c9dc7e773 100644 --- a/pkg/tcpip/transport/tcp/rack_state.go +++ b/pkg/tcpip/transport/tcp/rack_state.go @@ -27,8 +27,3 @@ func (rc *rackControl) saveXmitTime() unixTime { func (rc *rackControl) loadXmitTime(unix unixTime) { rc.xmitTime = time.Unix(unix.second, unix.nano) } - -// afterLoad is invoked by stateify. -func (rc *rackControl) afterLoad() { - rc.probeTimer.init(&rc.probeWaker) -} diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go index 7cca4def5..f27eef6a9 100644 --- a/pkg/tcpip/transport/tcp/segment.go +++ b/pkg/tcpip/transport/tcp/segment.go @@ -83,6 +83,9 @@ type segment struct { // dataMemSize is the memory used by data initially. dataMemSize int + + // lost indicates if the segment is marked as lost by RACK. + lost bool } func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) *segment { diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 063a7086a..d6365b93d 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -191,6 +191,15 @@ type sender struct { // rc has the fields needed for implementing RACK loss detection // algorithm. rc rackControl + + // reorderTimer is the timer used to retransmit the segments after RACK + // detects them as lost. + reorderTimer timer `state:"nosave"` + reorderWaker sleep.Waker `state:"nosave"` + + // probeTimer and probeWaker are used to schedule PTO for RACK TLP algorithm. + probeTimer timer `state:"nosave"` + probeWaker sleep.Waker `state:"nosave"` } // rtt is a synchronization wrapper used to appease stateify. See the comment @@ -267,7 +276,6 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint } s.cc = s.initCongestionControl(ep.cc) - s.lr = s.initLossRecovery() s.rc.init(s, iss) @@ -278,6 +286,8 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint } s.resendTimer.init(&s.resendWaker) + s.reorderTimer.init(&s.reorderWaker) + s.probeTimer.init(&s.probeWaker) s.updateMaxPayloadSize(int(ep.route.MTU()), 0) @@ -1126,6 +1136,15 @@ func (s *sender) SetPipe() { func (s *sender) detectLoss(seg *segment) (fastRetransmit bool) { // We're not in fast recovery yet. + // If RACK is enabled and there is no reordering we should honor the + // three duplicate ACK rule to enter recovery. + // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-4 + if s.ep.sackPermitted && s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 { + if s.rc.reorderSeen { + return false + } + } + if !s.isDupAck(seg) { s.dupAckCount = 0 return false @@ -1462,6 +1481,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { if s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 { s.rc.exitRecovery() } + s.reorderTimer.disable() } } @@ -1481,21 +1501,36 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { // Reset firstRetransmittedSegXmitTime to the zero value. s.firstRetransmittedSegXmitTime = time.Time{} s.resendTimer.disable() - s.rc.probeTimer.disable() + s.probeTimer.disable() } } - // Update RACK reorder window. - // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 - // * Upon receiving an ACK: - // * Step 4: Update RACK reordering window - if s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 { + if s.ep.sackPermitted && s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 { + // Update RACK reorder window. + // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 + // * Upon receiving an ACK: + // * Step 4: Update RACK reordering window s.rc.updateRACKReorderWindow(rcvdSeg) + + // After the reorder window is calculated, detect any loss by checking + // if the time elapsed after the segments are sent is greater than the + // reorder window. + if numLost := s.rc.detectLoss(rcvdSeg.rcvdTime); numLost > 0 && !s.fr.active { + // If any segment is marked as lost by + // RACK, enter recovery and retransmit + // the lost segments. + s.cc.HandleLossDetected() + s.enterRecovery() + } + + if s.fr.active { + s.rc.DoRecovery(nil, true) + } } // Now that we've popped all acknowledged data from the retransmit // queue, retransmit if needed. - if s.fr.active { + if s.fr.active && s.ep.tcpRecovery&tcpip.TCPRACKLossDetection == 0 { s.lr.DoRecovery(rcvdSeg, fastRetransmit) // When SACK is enabled data sending is governed by steps in // RFC 6675 Section 5 recovery steps A-C. @@ -1523,6 +1558,7 @@ func (s *sender) sendSegment(seg *segment) tcpip.Error { } seg.xmitTime = time.Now() seg.xmitCount++ + seg.lost = false err := s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber) // Every time a packet containing data is sent (including a diff --git a/pkg/tcpip/transport/tcp/snd_state.go b/pkg/tcpip/transport/tcp/snd_state.go index 8b20c3455..ba41cff6d 100644 --- a/pkg/tcpip/transport/tcp/snd_state.go +++ b/pkg/tcpip/transport/tcp/snd_state.go @@ -47,6 +47,8 @@ func (s *sender) loadRttMeasureTime(unix unixTime) { // afterLoad is invoked by stateify. func (s *sender) afterLoad() { s.resendTimer.init(&s.resendWaker) + s.reorderTimer.init(&s.reorderWaker) + s.probeTimer.init(&s.probeWaker) } // saveFirstRetransmittedSegXmitTime is invoked by stateify. diff --git a/pkg/tcpip/transport/tcp/tcp_state_autogen.go b/pkg/tcpip/transport/tcp/tcp_state_autogen.go index f19781bfe..4f3f62b98 100644 --- a/pkg/tcpip/transport/tcp/tcp_state_autogen.go +++ b/pkg/tcpip/transport/tcp/tcp_state_autogen.go @@ -435,6 +435,8 @@ func (rc *rackControl) StateSave(stateSinkObject state.Sink) { stateSinkObject.Save(14, &rc.snd) } +func (rc *rackControl) afterLoad() {} + func (rc *rackControl) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(0, &rc.dsackSeen) stateSourceObject.Load(1, &rc.endSequence) @@ -451,7 +453,6 @@ func (rc *rackControl) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(13, &rc.tlpHighRxt) stateSourceObject.Load(14, &rc.snd) stateSourceObject.LoadValue(11, new(unixTime), func(y interface{}) { rc.loadXmitTime(y.(unixTime)) }) - stateSourceObject.AfterLoad(rc.afterLoad) } func (r *receiver) StateTypeName() string { @@ -633,6 +634,7 @@ func (s *segment) StateFields() []string { "xmitCount", "acked", "dataMemSize", + "lost", } } @@ -668,6 +670,7 @@ func (s *segment) StateSave(stateSinkObject state.Sink) { stateSinkObject.Save(21, &s.xmitCount) stateSinkObject.Save(22, &s.acked) stateSinkObject.Save(23, &s.dataMemSize) + stateSinkObject.Save(24, &s.lost) } func (s *segment) afterLoad() {} @@ -693,6 +696,7 @@ func (s *segment) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(21, &s.xmitCount) stateSourceObject.Load(22, &s.acked) stateSourceObject.Load(23, &s.dataMemSize) + stateSourceObject.Load(24, &s.lost) stateSourceObject.LoadValue(8, new(buffer.VectorisedView), func(y interface{}) { s.loadData(y.(buffer.VectorisedView)) }) stateSourceObject.LoadValue(17, new([]byte), func(y interface{}) { s.loadOptions(y.([]byte)) }) stateSourceObject.LoadValue(19, new(unixTime), func(y interface{}) { s.loadRcvdTime(y.(unixTime)) }) |