diff options
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 1 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rack.go | 40 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/segment.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 59 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_rack_test.go | 75 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/testing/context/context.go | 12 |
6 files changed, 170 insertions, 20 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index ae817091a..bc3b409ba 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -3013,6 +3013,7 @@ func (e *endpoint) completeState() stack.TCPEndpointState { EndSequence: rc.endSequence, FACK: rc.fack, RTT: rc.rtt, + Reord: rc.reorderSeen, } return s } diff --git a/pkg/tcpip/transport/tcp/rack.go b/pkg/tcpip/transport/tcp/rack.go index 439932595..d312b1b8b 100644 --- a/pkg/tcpip/transport/tcp/rack.go +++ b/pkg/tcpip/transport/tcp/rack.go @@ -29,12 +29,12 @@ import ( // // +stateify savable type rackControl struct { - // xmitTime is the latest transmission timestamp of rackControl.seg. - xmitTime time.Time `state:".(unixTime)"` - // endSequence is the ending TCP sequence number of rackControl.seg. endSequence seqnum.Value + // dsack indicates if the connection has seen a DSACK. + dsack bool + // fack is the highest selectively or cumulatively acknowledged // sequence. fack seqnum.Value @@ -47,11 +47,18 @@ type rackControl struct { // acknowledged) that was not marked invalid as a possible spurious // retransmission. rtt time.Duration + + // reorderSeen indicates if reordering has been detected on this + // connection. + reorderSeen bool + + // xmitTime is the latest transmission timestamp of rackControl.seg. + xmitTime time.Time `state:".(unixTime)"` } -// Update will update the RACK related fields when an ACK has been received. +// update will update the RACK related fields when an ACK has been received. // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 -func (rc *rackControl) Update(seg *segment, ackSeg *segment, offset uint32) { +func (rc *rackControl) update(seg *segment, ackSeg *segment, offset uint32) { rtt := time.Now().Sub(seg.xmitTime) // If the ACK is for a retransmitted packet, do not update if it is a @@ -92,3 +99,26 @@ func (rc *rackControl) Update(seg *segment, ackSeg *segment, offset uint32) { rc.endSequence = endSeq } } + +// detectReorder detects if packet reordering has been observed. +// See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 +// * Step 3: Detect data segment reordering. +// To detect reordering, the sender looks for original data segments being +// delivered out of order. To detect such cases, the sender tracks the +// highest sequence selectively or cumulatively acknowledged in the RACK.fack +// variable. The name "fack" stands for the most "Forward ACK" (this term is +// adopted from [FACK]). If a never retransmitted segment that's below +// RACK.fack is (selectively or cumulatively) acknowledged, it has been +// delivered out of order. The sender sets RACK.reord to TRUE if such segment +// is identified. +func (rc *rackControl) detectReorder(seg *segment) { + endSeq := seg.sequenceNumber.Add(seqnum.Size(seg.data.Size())) + if rc.fack.LessThan(endSeq) { + rc.fack = endSeq + return + } + + if endSeq.LessThan(rc.fack) && seg.xmitCount == 1 { + rc.reorderSeen = true + } +} diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go index 13acaf753..1f9c5cf50 100644 --- a/pkg/tcpip/transport/tcp/segment.go +++ b/pkg/tcpip/transport/tcp/segment.go @@ -71,6 +71,9 @@ type segment struct { // xmitTime is the last transmit time of this segment. xmitTime time.Time `state:".(unixTime)"` xmitCount uint32 + + // acked indicates if the segment has already been SACKed. + acked bool } func newSegment(r *stack.Route, id stack.TransportEndpointID, pkt *stack.PacketBuffer) *segment { diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 4c9a86cda..6fa8d63cd 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -17,6 +17,7 @@ package tcp import ( "fmt" "math" + "sort" "sync/atomic" "time" @@ -263,6 +264,9 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint highRxt: iss, rescueRxt: iss, }, + rc: rackControl{ + fack: iss, + }, gso: ep.gso != nil, } @@ -1274,6 +1278,39 @@ func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) { return true } +// Iterate the writeList and update RACK for each segment which is newly acked +// either cumulatively or selectively. Loop through the segments which are +// sacked, and update the RACK related variables and check for reordering. +// +// See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 +// steps 2 and 3. +func (s *sender) walkSACK(rcvdSeg *segment) { + // Sort the SACK blocks. The first block is the most recent unacked + // block. The following blocks can be in arbitrary order. + sackBlocks := make([]header.SACKBlock, len(rcvdSeg.parsedOptions.SACKBlocks)) + copy(sackBlocks, rcvdSeg.parsedOptions.SACKBlocks) + sort.Slice(sackBlocks, func(i, j int) bool { + return sackBlocks[j].Start.LessThan(sackBlocks[i].Start) + }) + + seg := s.writeList.Front() + for _, sb := range sackBlocks { + // This check excludes DSACK blocks. + if sb.Start.LessThanEq(rcvdSeg.ackNumber) || sb.Start.LessThanEq(s.sndUna) || s.sndNxt.LessThan(sb.End) { + continue + } + + for seg != nil && seg.sequenceNumber.LessThan(sb.End) && seg.xmitCount != 0 { + if sb.Start.LessThanEq(seg.sequenceNumber) && !seg.acked { + s.rc.update(seg, rcvdSeg, s.ep.tsOffset) + s.rc.detectReorder(seg) + seg.acked = true + } + seg = seg.Next() + } + } +} + // handleRcvdSegment is called when a segment is received; it is responsible for // updating the send-related state. func (s *sender) handleRcvdSegment(rcvdSeg *segment) { @@ -1308,6 +1345,21 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { rcvdSeg.hasNewSACKInfo = true } } + + // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08 + // section-7.2 + // * Step 2: Update RACK stats. + // If the ACK is not ignored as invalid, update the RACK.rtt + // to be the RTT sample calculated using this ACK, and + // continue. If this ACK or SACK was for the most recently + // sent packet, then record the RACK.xmit_ts timestamp and + // RACK.end_seq sequence implied by this ACK. + // * Step 3: Detect packet reordering. + // If the ACK selectively or cumulatively acknowledges an + // unacknowledged and also never retransmitted sequence below + // RACK.fack, then the corresponding packet has been + // reordered and RACK.reord is set to TRUE. + s.walkSACK(rcvdSeg) s.SetPipe() } @@ -1385,13 +1437,14 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { } // Update the RACK fields if SACK is enabled. - if s.ep.sackPermitted { - s.rc.Update(seg, rcvdSeg, s.ep.tsOffset) + if s.ep.sackPermitted && !seg.acked { + s.rc.update(seg, rcvdSeg, s.ep.tsOffset) + s.rc.detectReorder(seg) } s.writeList.Remove(seg) - // if SACK is enabled then Only reduce outstanding if + // If SACK is enabled then Only reduce outstanding if // the segment was not previously SACKED as these have // already been accounted for in SetPipe(). if !s.ep.sackPermitted || !s.ep.scoreboard.IsSACKED(seg.sackBlock()) { diff --git a/pkg/tcpip/transport/tcp/tcp_rack_test.go b/pkg/tcpip/transport/tcp/tcp_rack_test.go index e03f101e8..d3f92b48c 100644 --- a/pkg/tcpip/transport/tcp/tcp_rack_test.go +++ b/pkg/tcpip/transport/tcp/tcp_rack_test.go @@ -21,17 +21,20 @@ import ( "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/buffer" "gvisor.dev/gvisor/pkg/tcpip/header" + "gvisor.dev/gvisor/pkg/tcpip/seqnum" "gvisor.dev/gvisor/pkg/tcpip/stack" "gvisor.dev/gvisor/pkg/tcpip/transport/tcp/testing/context" ) +const ( + maxPayload = 10 + tsOptionSize = 12 + maxTCPOptionSize = 40 +) + // TestRACKUpdate tests the RACK related fields are updated when an ACK is // received on a SACK enabled connection. func TestRACKUpdate(t *testing.T) { - const maxPayload = 10 - const tsOptionSize = 12 - const maxTCPOptionSize = 40 - c := context.New(t, uint32(header.TCPMinimumSize+header.IPv4MinimumSize+maxTCPOptionSize+maxPayload)) defer c.Cleanup() @@ -49,7 +52,7 @@ func TestRACKUpdate(t *testing.T) { } if state.Sender.RACKState.RTT == 0 { - t.Fatalf("RACK RTT failed to update when an ACK is received") + t.Fatalf("RACK RTT failed to update when an ACK is received, got RACKState.RTT == 0 want != 0") } }) setStackSACKPermitted(t, c, true) @@ -69,6 +72,66 @@ func TestRACKUpdate(t *testing.T) { bytesRead := 0 c.ReceiveAndCheckPacketWithOptions(data, bytesRead, maxPayload, tsOptionSize) bytesRead += maxPayload - c.SendAck(790, bytesRead) + c.SendAck(seqnum.Value(context.TestInitialSequenceNumber).Add(1), bytesRead) time.Sleep(200 * time.Millisecond) } + +// TestRACKDetectReorder tests that RACK detects packet reordering. +func TestRACKDetectReorder(t *testing.T) { + c := context.New(t, uint32(header.TCPMinimumSize+header.IPv4MinimumSize+maxTCPOptionSize+maxPayload)) + defer c.Cleanup() + + const ackNum = 2 + + var n int + ch := make(chan struct{}) + c.Stack().AddTCPProbe(func(state stack.TCPEndpointState) { + gotSeq := state.Sender.RACKState.FACK + wantSeq := state.Sender.SndNxt + // FACK should be updated to the highest ending sequence number of the + // segment acknowledged most recently. + if !gotSeq.LessThanEq(wantSeq) || gotSeq.LessThan(wantSeq) { + t.Fatalf("RACK FACK failed to update, got: %v, but want: %v", gotSeq, wantSeq) + } + + n++ + if n < ackNum { + if state.Sender.RACKState.Reord { + t.Fatalf("RACK reorder detected when there is no reordering") + } + return + } + + if state.Sender.RACKState.Reord == false { + t.Fatalf("RACK reorder detection failed") + } + close(ch) + }) + setStackSACKPermitted(t, c, true) + createConnectedWithSACKAndTS(c) + data := buffer.NewView(ackNum * maxPayload) + for i := range data { + data[i] = byte(i) + } + + // Write the data. + if _, _, err := c.EP.Write(tcpip.SlicePayload(data), tcpip.WriteOptions{}); err != nil { + t.Fatalf("Write failed: %s", err) + } + + bytesRead := 0 + for i := 0; i < ackNum; i++ { + c.ReceiveAndCheckPacketWithOptions(data, bytesRead, maxPayload, tsOptionSize) + bytesRead += maxPayload + } + + start := c.IRS.Add(maxPayload + 1) + end := start.Add(maxPayload) + seq := seqnum.Value(context.TestInitialSequenceNumber).Add(1) + c.SendAckWithSACK(seq, 0, []header.SACKBlock{{start, end}}) + c.SendAck(seq, bytesRead) + + // Wait for the probe function to finish processing the ACK before the + // test completes. + <-ch +} diff --git a/pkg/tcpip/transport/tcp/testing/context/context.go b/pkg/tcpip/transport/tcp/testing/context/context.go index faf51ef95..4d7847142 100644 --- a/pkg/tcpip/transport/tcp/testing/context/context.go +++ b/pkg/tcpip/transport/tcp/testing/context/context.go @@ -68,9 +68,9 @@ const ( // V4MappedWildcardAddr is the mapped v6 representation of 0.0.0.0. V4MappedWildcardAddr = "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x00\x00" - // testInitialSequenceNumber is the initial sequence number sent in packets that + // TestInitialSequenceNumber is the initial sequence number sent in packets that // are sent in response to a SYN or in the initial SYN sent to the stack. - testInitialSequenceNumber = 789 + TestInitialSequenceNumber = 789 ) // StackAddrWithPrefix is StackAddr with its associated prefix length. @@ -505,7 +505,7 @@ func (c *Context) ReceiveAndCheckPacketWithOptions(data []byte, offset, size, op checker.TCP( checker.DstPort(TestPort), checker.TCPSeqNum(uint32(c.IRS.Add(seqnum.Size(1+offset)))), - checker.TCPAckNum(uint32(seqnum.Value(testInitialSequenceNumber).Add(1))), + checker.TCPAckNum(uint32(seqnum.Value(TestInitialSequenceNumber).Add(1))), checker.TCPFlagsMatch(header.TCPFlagAck, ^uint8(header.TCPFlagPsh)), ), ) @@ -532,7 +532,7 @@ func (c *Context) ReceiveNonBlockingAndCheckPacket(data []byte, offset, size int checker.TCP( checker.DstPort(TestPort), checker.TCPSeqNum(uint32(c.IRS.Add(seqnum.Size(1+offset)))), - checker.TCPAckNum(uint32(seqnum.Value(testInitialSequenceNumber).Add(1))), + checker.TCPAckNum(uint32(seqnum.Value(TestInitialSequenceNumber).Add(1))), checker.TCPFlagsMatch(header.TCPFlagAck, ^uint8(header.TCPFlagPsh)), ), ) @@ -912,7 +912,7 @@ func (c *Context) CreateConnectedWithOptions(wantOptions header.TCPSynOptions) * // Build SYN-ACK. c.IRS = seqnum.Value(tcpSeg.SequenceNumber()) - iss := seqnum.Value(testInitialSequenceNumber) + iss := seqnum.Value(TestInitialSequenceNumber) c.SendPacket(nil, &Headers{ SrcPort: tcpSeg.DestinationPort(), DstPort: tcpSeg.SourcePort(), @@ -1084,7 +1084,7 @@ func (c *Context) PassiveConnectWithOptions(maxPayload, wndScale int, synOptions offset += paddingToAdd // Send a SYN request. - iss := seqnum.Value(testInitialSequenceNumber) + iss := seqnum.Value(TestInitialSequenceNumber) c.SendPacket(nil, &Headers{ SrcPort: TestPort, DstPort: StackPort, |