summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport
diff options
context:
space:
mode:
authorNayana Bidari <nybidari@google.com>2021-02-08 12:08:11 -0800
committergVisor bot <gvisor-bot@google.com>2021-02-08 12:09:54 -0800
commitfe63db2e96069140a3e02a0dc7b7e28af9b463db (patch)
treec6ac9e4e6f0f8df01eb1f623cff853d013fd886f /pkg/tcpip/transport
parent3853a94f10b736e0c7b65766683ebe90b400d15b (diff)
RACK: Detect loss
Detect packet loss using reorder window and re-transmit them after the reorder timer expires. PiperOrigin-RevId: 356321786
Diffstat (limited to 'pkg/tcpip/transport')
-rw-r--r--pkg/tcpip/transport/tcp/connect.go9
-rw-r--r--pkg/tcpip/transport/tcp/rack.go115
-rw-r--r--pkg/tcpip/transport/tcp/rack_state.go5
-rw-r--r--pkg/tcpip/transport/tcp/segment.go3
-rw-r--r--pkg/tcpip/transport/tcp/snd.go52
-rw-r--r--pkg/tcpip/transport/tcp/snd_state.go2
-rw-r--r--pkg/tcpip/transport/tcp/tcp_rack_test.go47
7 files changed, 207 insertions, 26 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 34a631b53..461b1a9d7 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -1301,7 +1301,8 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
// e.mu is expected to be hold upon entering this section.
if e.snd != nil {
e.snd.resendTimer.cleanup()
- e.snd.rc.probeTimer.cleanup()
+ e.snd.probeTimer.cleanup()
+ e.snd.reorderTimer.cleanup()
}
if closeTimer != nil {
@@ -1396,7 +1397,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
},
},
{
- w: &e.snd.rc.probeWaker,
+ w: &e.snd.probeWaker,
f: e.snd.probeTimerExpired,
},
{
@@ -1475,6 +1476,10 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
return nil
},
},
+ {
+ w: &e.snd.reorderWaker,
+ f: e.snd.rc.reorderTimerExpired,
+ },
}
// Initialize the sleeper based on the wakers in funcs.
diff --git a/pkg/tcpip/transport/tcp/rack.go b/pkg/tcpip/transport/tcp/rack.go
index e862f159e..9959b60b8 100644
--- a/pkg/tcpip/transport/tcp/rack.go
+++ b/pkg/tcpip/transport/tcp/rack.go
@@ -17,7 +17,6 @@ package tcp
import (
"time"
- "gvisor.dev/gvisor/pkg/sleep"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/seqnum"
)
@@ -50,7 +49,8 @@ type rackControl struct {
// dsackSeen indicates if the connection has seen a DSACK.
dsackSeen bool
- // endSequence is the ending TCP sequence number of rackControl.seg.
+ // endSequence is the ending TCP sequence number of the most recent
+ // acknowledged segment.
endSequence seqnum.Value
// exitedRecovery indicates if the connection is exiting loss recovery.
@@ -90,13 +90,10 @@ type rackControl struct {
// rttSeq is the SND.NXT when rtt is updated.
rttSeq seqnum.Value
- // xmitTime is the latest transmission timestamp of rackControl.seg.
+ // xmitTime is the latest transmission timestamp of the most recent
+ // acknowledged segment.
xmitTime time.Time `state:".(unixTime)"`
- // probeTimer and probeWaker are used to schedule PTO for RACK TLP algorithm.
- probeTimer timer `state:"nosave"`
- probeWaker sleep.Waker `state:"nosave"`
-
// tlpRxtOut indicates whether there is an unacknowledged
// TLP retransmission.
tlpRxtOut bool
@@ -114,7 +111,6 @@ func (rc *rackControl) init(snd *sender, iss seqnum.Value) {
rc.fack = iss
rc.reoWndIncr = 1
rc.snd = snd
- rc.probeTimer.init(&rc.probeWaker)
}
// update will update the RACK related fields when an ACK has been received.
@@ -223,13 +219,13 @@ func (s *sender) schedulePTO() {
s.resendTimer.disable()
}
- s.rc.probeTimer.enable(pto)
+ s.probeTimer.enable(pto)
}
// probeTimerExpired is the same as TLP_send_probe() as defined in
// https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5.2.
func (s *sender) probeTimerExpired() tcpip.Error {
- if !s.rc.probeTimer.checkExpiration() {
+ if !s.probeTimer.checkExpiration() {
return nil
}
@@ -386,3 +382,102 @@ func (rc *rackControl) updateRACKReorderWindow(ackSeg *segment) {
func (rc *rackControl) exitRecovery() {
rc.exitedRecovery = true
}
+
+// detectLoss marks the segment as lost if the reordering window has elapsed
+// and the ACK is not received. It will also arm the reorder timer.
+// See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 Step 5.
+func (rc *rackControl) detectLoss(rcvTime time.Time) int {
+ var timeout time.Duration
+ numLost := 0
+ for seg := rc.snd.writeList.Front(); seg != nil && seg.xmitCount != 0; seg = seg.Next() {
+ if rc.snd.ep.scoreboard.IsSACKED(seg.sackBlock()) {
+ continue
+ }
+
+ if seg.lost && seg.xmitCount == 1 {
+ numLost++
+ continue
+ }
+
+ endSeq := seg.sequenceNumber.Add(seqnum.Size(seg.data.Size()))
+ if seg.xmitTime.Before(rc.xmitTime) || (seg.xmitTime.Equal(rc.xmitTime) && rc.endSequence.LessThan(endSeq)) {
+ timeRemaining := seg.xmitTime.Sub(rcvTime) + rc.rtt + rc.reoWnd
+ if timeRemaining <= 0 {
+ seg.lost = true
+ numLost++
+ } else if timeRemaining > timeout {
+ timeout = timeRemaining
+ }
+ }
+ }
+
+ if timeout != 0 && !rc.snd.reorderTimer.enabled() {
+ rc.snd.reorderTimer.enable(timeout)
+ }
+ return numLost
+}
+
+// reorderTimerExpired will retransmit the segments which have not been acked
+// before the reorder timer expired.
+func (rc *rackControl) reorderTimerExpired() tcpip.Error {
+ // Check if the timer actually expired or if it's a spurious wake due
+ // to a previously orphaned runtime timer.
+ if !rc.snd.reorderTimer.checkExpiration() {
+ return nil
+ }
+
+ numLost := rc.detectLoss(time.Now())
+ if numLost == 0 {
+ return nil
+ }
+
+ fastRetransmit := false
+ if !rc.snd.fr.active {
+ rc.snd.cc.HandleLossDetected()
+ rc.snd.enterRecovery()
+ fastRetransmit = true
+ }
+
+ rc.DoRecovery(nil, fastRetransmit)
+ return nil
+}
+
+// DoRecovery implements lossRecovery.DoRecovery.
+func (rc *rackControl) DoRecovery(_ *segment, fastRetransmit bool) {
+ snd := rc.snd
+ if fastRetransmit {
+ snd.resendSegment()
+ }
+
+ var dataSent bool
+ // Iterate the writeList and retransmit the segments which are marked
+ // as lost by RACK.
+ for seg := snd.writeList.Front(); seg != nil && seg.xmitCount > 0; seg = seg.Next() {
+ if seg == snd.writeNext {
+ break
+ }
+
+ if !seg.lost {
+ continue
+ }
+
+ // Reset seg.lost as it is already SACKed.
+ if snd.ep.scoreboard.IsSACKED(seg.sackBlock()) {
+ seg.lost = false
+ continue
+ }
+
+ // Check the congestion window after entering recovery.
+ if snd.outstanding >= snd.sndCwnd {
+ break
+ }
+
+ snd.outstanding++
+ dataSent = true
+ snd.sendSegment(seg)
+ }
+
+ // Rearm the RTO.
+ snd.resendTimer.enable(snd.rto)
+ snd.postXmit(dataSent)
+}
diff --git a/pkg/tcpip/transport/tcp/rack_state.go b/pkg/tcpip/transport/tcp/rack_state.go
index 76cad0831..c9dc7e773 100644
--- a/pkg/tcpip/transport/tcp/rack_state.go
+++ b/pkg/tcpip/transport/tcp/rack_state.go
@@ -27,8 +27,3 @@ func (rc *rackControl) saveXmitTime() unixTime {
func (rc *rackControl) loadXmitTime(unix unixTime) {
rc.xmitTime = time.Unix(unix.second, unix.nano)
}
-
-// afterLoad is invoked by stateify.
-func (rc *rackControl) afterLoad() {
- rc.probeTimer.init(&rc.probeWaker)
-}
diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go
index 7cca4def5..f27eef6a9 100644
--- a/pkg/tcpip/transport/tcp/segment.go
+++ b/pkg/tcpip/transport/tcp/segment.go
@@ -83,6 +83,9 @@ type segment struct {
// dataMemSize is the memory used by data initially.
dataMemSize int
+
+ // lost indicates if the segment is marked as lost by RACK.
+ lost bool
}
func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) *segment {
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index 063a7086a..d6365b93d 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -191,6 +191,15 @@ type sender struct {
// rc has the fields needed for implementing RACK loss detection
// algorithm.
rc rackControl
+
+ // reorderTimer is the timer used to retransmit the segments after RACK
+ // detects them as lost.
+ reorderTimer timer `state:"nosave"`
+ reorderWaker sleep.Waker `state:"nosave"`
+
+ // probeTimer and probeWaker are used to schedule PTO for RACK TLP algorithm.
+ probeTimer timer `state:"nosave"`
+ probeWaker sleep.Waker `state:"nosave"`
}
// rtt is a synchronization wrapper used to appease stateify. See the comment
@@ -267,7 +276,6 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint
}
s.cc = s.initCongestionControl(ep.cc)
-
s.lr = s.initLossRecovery()
s.rc.init(s, iss)
@@ -278,6 +286,8 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint
}
s.resendTimer.init(&s.resendWaker)
+ s.reorderTimer.init(&s.reorderWaker)
+ s.probeTimer.init(&s.probeWaker)
s.updateMaxPayloadSize(int(ep.route.MTU()), 0)
@@ -1126,6 +1136,15 @@ func (s *sender) SetPipe() {
func (s *sender) detectLoss(seg *segment) (fastRetransmit bool) {
// We're not in fast recovery yet.
+ // If RACK is enabled and there is no reordering we should honor the
+ // three duplicate ACK rule to enter recovery.
+ // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-4
+ if s.ep.sackPermitted && s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 {
+ if s.rc.reorderSeen {
+ return false
+ }
+ }
+
if !s.isDupAck(seg) {
s.dupAckCount = 0
return false
@@ -1462,6 +1481,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
if s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 {
s.rc.exitRecovery()
}
+ s.reorderTimer.disable()
}
}
@@ -1481,21 +1501,36 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
// Reset firstRetransmittedSegXmitTime to the zero value.
s.firstRetransmittedSegXmitTime = time.Time{}
s.resendTimer.disable()
- s.rc.probeTimer.disable()
+ s.probeTimer.disable()
}
}
- // Update RACK reorder window.
- // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2
- // * Upon receiving an ACK:
- // * Step 4: Update RACK reordering window
- if s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 {
+ if s.ep.sackPermitted && s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 {
+ // Update RACK reorder window.
+ // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2
+ // * Upon receiving an ACK:
+ // * Step 4: Update RACK reordering window
s.rc.updateRACKReorderWindow(rcvdSeg)
+
+ // After the reorder window is calculated, detect any loss by checking
+ // if the time elapsed after the segments are sent is greater than the
+ // reorder window.
+ if numLost := s.rc.detectLoss(rcvdSeg.rcvdTime); numLost > 0 && !s.fr.active {
+ // If any segment is marked as lost by
+ // RACK, enter recovery and retransmit
+ // the lost segments.
+ s.cc.HandleLossDetected()
+ s.enterRecovery()
+ }
+
+ if s.fr.active {
+ s.rc.DoRecovery(nil, true)
+ }
}
// Now that we've popped all acknowledged data from the retransmit
// queue, retransmit if needed.
- if s.fr.active {
+ if s.fr.active && s.ep.tcpRecovery&tcpip.TCPRACKLossDetection == 0 {
s.lr.DoRecovery(rcvdSeg, fastRetransmit)
// When SACK is enabled data sending is governed by steps in
// RFC 6675 Section 5 recovery steps A-C.
@@ -1523,6 +1558,7 @@ func (s *sender) sendSegment(seg *segment) tcpip.Error {
}
seg.xmitTime = time.Now()
seg.xmitCount++
+ seg.lost = false
err := s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber)
// Every time a packet containing data is sent (including a
diff --git a/pkg/tcpip/transport/tcp/snd_state.go b/pkg/tcpip/transport/tcp/snd_state.go
index 8b20c3455..ba41cff6d 100644
--- a/pkg/tcpip/transport/tcp/snd_state.go
+++ b/pkg/tcpip/transport/tcp/snd_state.go
@@ -47,6 +47,8 @@ func (s *sender) loadRttMeasureTime(unix unixTime) {
// afterLoad is invoked by stateify.
func (s *sender) afterLoad() {
s.resendTimer.init(&s.resendWaker)
+ s.reorderTimer.init(&s.reorderWaker)
+ s.probeTimer.init(&s.probeWaker)
}
// saveFirstRetransmittedSegXmitTime is invoked by stateify.
diff --git a/pkg/tcpip/transport/tcp/tcp_rack_test.go b/pkg/tcpip/transport/tcp/tcp_rack_test.go
index 78db742a9..6da981d80 100644
--- a/pkg/tcpip/transport/tcp/tcp_rack_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_rack_test.go
@@ -25,6 +25,7 @@ import (
"gvisor.dev/gvisor/pkg/tcpip/seqnum"
"gvisor.dev/gvisor/pkg/tcpip/stack"
"gvisor.dev/gvisor/pkg/tcpip/transport/tcp/testing/context"
+ "gvisor.dev/gvisor/pkg/test/testutil"
)
const (
@@ -99,6 +100,8 @@ func TestRACKDetectReorder(t *testing.T) {
c := context.New(t, uint32(mtu))
defer c.Cleanup()
+ t.Skipf("Skipping this test as reorder detection does not consider DSACK.")
+
var n int
const ackNumToVerify = 2
probeDone := make(chan struct{})
@@ -591,7 +594,6 @@ func TestRACKCheckReorderWindow(t *testing.T) {
c.SendAck(seq, bytesRead)
// Missing [2-6] packets and SACK #7 packet.
- seq = seqnum.Value(context.TestInitialSequenceNumber).Add(1)
start := c.IRS.Add(1 + seqnum.Size(6*maxPayload))
end := start.Add(seqnum.Size(maxPayload))
c.SendAckWithSACK(seq, bytesRead, []header.SACKBlock{{start, end}})
@@ -607,3 +609,46 @@ func TestRACKCheckReorderWindow(t *testing.T) {
t.Fatalf("unexpected values for RACK variables: %v", err)
}
}
+
+func TestRACKWithDuplicateACK(t *testing.T) {
+ c := context.New(t, uint32(mtu))
+ defer c.Cleanup()
+
+ const numPackets = 4
+ data := sendAndReceive(t, c, numPackets)
+
+ // Send three duplicate ACKs to trigger fast recovery.
+ seq := seqnum.Value(context.TestInitialSequenceNumber).Add(1)
+ start := c.IRS.Add(1 + seqnum.Size(maxPayload))
+ end := start.Add(seqnum.Size(maxPayload))
+ for i := 0; i < 3; i++ {
+ c.SendAckWithSACK(seq, maxPayload, []header.SACKBlock{{start, end}})
+ end = end.Add(seqnum.Size(maxPayload))
+ }
+
+ // Receive the retransmitted packet.
+ c.ReceiveAndCheckPacketWithOptions(data, maxPayload, maxPayload, tsOptionSize)
+
+ metricPollFn := func() error {
+ tcpStats := c.Stack().Stats().TCP
+ stats := []struct {
+ stat *tcpip.StatCounter
+ name string
+ want uint64
+ }{
+ {tcpStats.FastRetransmit, "stats.TCP.FastRetransmit", 1},
+ {tcpStats.SACKRecovery, "stats.TCP.SACKRecovery", 1},
+ {tcpStats.FastRecovery, "stats.TCP.FastRecovery", 0},
+ }
+ for _, s := range stats {
+ if got, want := s.stat.Value(), s.want; got != want {
+ return fmt.Errorf("got %s.Value() = %d, want = %d", s.name, got, want)
+ }
+ }
+ return nil
+ }
+
+ if err := testutil.Poll(metricPollFn, 1*time.Second); err != nil {
+ t.Error(err)
+ }
+}