diff options
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 5 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 7 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/protocol.go | 5 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rack.go | 76 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rack_state.go | 5 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 3 | ||||
-rw-r--r-- | test/syscalls/linux/proc_net.cc | 8 |
7 files changed, 105 insertions, 4 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 0dc710276..a00ef97c6 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1357,6 +1357,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ // e.mu is expected to be hold upon entering this section. if e.snd != nil { e.snd.resendTimer.cleanup() + e.snd.rc.probeTimer.cleanup() } if closeTimer != nil { @@ -1437,6 +1438,10 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ }, }, { + w: &e.snd.rc.probeWaker, + f: e.snd.probeTimerExpired, + }, + { w: &e.newSegmentWaker, f: func() *tcpip.Error { return e.handleSegments(false /* fastPath */) diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 8f3981075..281f4cd58 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -508,6 +508,9 @@ type endpoint struct { // shutdownFlags represent the current shutdown state of the endpoint. shutdownFlags tcpip.ShutdownFlags + // tcpRecovery is the loss deteoction algorithm used by TCP. + tcpRecovery tcpip.TCPRecovery + // sackPermitted is set to true if the peer sends the TCPSACKPermitted // option in the SYN/SYN-ACK. sackPermitted bool @@ -918,6 +921,8 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue e.maxSynRetries = uint8(synRetries) } + s.TransportProtocolOption(ProtocolNumber, &e.tcpRecovery) + if p := s.GetTCPProbe(); p != nil { e.probe = p } @@ -3072,7 +3077,7 @@ func (e *endpoint) completeState() stack.TCPEndpointState { } } - rc := e.snd.rc + rc := &e.snd.rc s.Sender.RACKState = stack.TCPRACKState{ XmitTime: rc.xmitTime, EndSequence: rc.endSequence, diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go index 672159eed..c9e194f82 100644 --- a/pkg/tcpip/transport/tcp/protocol.go +++ b/pkg/tcpip/transport/tcp/protocol.go @@ -405,7 +405,7 @@ func (p *protocol) Option(option tcpip.GettableTransportProtocolOption) *tcpip.E case *tcpip.TCPRecovery: p.mu.RLock() - *v = tcpip.TCPRecovery(p.recovery) + *v = p.recovery p.mu.RUnlock() return nil @@ -543,7 +543,8 @@ func NewProtocol(s *stack.Stack) stack.TransportProtocol { minRTO: MinRTO, maxRTO: MaxRTO, maxRetries: MaxRetries, - recovery: tcpip.TCPRACKLossDetection, + // TODO(gvisor.dev/issue/5243): Set recovery to tcpip.TCPRACKLossDetection. + recovery: 0, } p.dispatcher.init(runtime.GOMAXPROCS(0)) return &p diff --git a/pkg/tcpip/transport/tcp/rack.go b/pkg/tcpip/transport/tcp/rack.go index e0a50a919..b71e6b992 100644 --- a/pkg/tcpip/transport/tcp/rack.go +++ b/pkg/tcpip/transport/tcp/rack.go @@ -17,9 +17,18 @@ package tcp import ( "time" + "gvisor.dev/gvisor/pkg/sleep" + "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/seqnum" ) +// wcDelayedACKTimeout is the recommended maximum delayed ACK timer value as +// defined in https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5. +// It stands for worst case delayed ACK timer (WCDelAckT). When FlightSize is +// 1, PTO is inflated by WCDelAckT time to compensate for a potential long +// delayed ACK timer at the receiver. +const wcDelayedACKTimeout = 200 * time.Millisecond + // RACK is a loss detection algorithm used in TCP to detect packet loss and // reordering using transmission timestamp of the packets instead of packet or // sequence counts. To use RACK, SACK should be enabled on the connection. @@ -54,6 +63,15 @@ type rackControl struct { // xmitTime is the latest transmission timestamp of rackControl.seg. xmitTime time.Time `state:".(unixTime)"` + + // probeTimer and probeWaker are used to schedule PTO for RACK TLP algorithm. + probeTimer timer `state:"nosave"` + probeWaker sleep.Waker `state:"nosave"` +} + +// init initializes RACK specific fields. +func (rc *rackControl) init() { + rc.probeTimer.init(&rc.probeWaker) } // update will update the RACK related fields when an ACK has been received. @@ -127,3 +145,61 @@ func (rc *rackControl) detectReorder(seg *segment) { func (rc *rackControl) setDSACKSeen() { rc.dsackSeen = true } + +// shouldSchedulePTO dictates whether we should schedule a PTO or not. +// See https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5.1. +func (s *sender) shouldSchedulePTO() bool { + // Schedule PTO only if RACK loss detection is enabled. + return s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 && + // The connection supports SACK. + s.ep.sackPermitted && + // The connection is not in loss recovery. + (s.state != RTORecovery && s.state != SACKRecovery) && + // The connection has no SACKed sequences in the SACK scoreboard. + s.ep.scoreboard.Sacked() == 0 +} + +// schedulePTO schedules the probe timeout as defined in +// https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5.1. +func (s *sender) schedulePTO() { + pto := time.Second + s.rtt.Lock() + if s.rtt.srttInited && s.rtt.srtt > 0 { + pto = s.rtt.srtt * 2 + if s.outstanding == 1 { + pto += wcDelayedACKTimeout + } + } + s.rtt.Unlock() + + now := time.Now() + if s.resendTimer.enabled() { + if now.Add(pto).After(s.resendTimer.target) { + pto = s.resendTimer.target.Sub(now) + } + s.resendTimer.disable() + } + + s.rc.probeTimer.enable(pto) +} + +// probeTimerExpired is the same as TLP_send_probe() as defined in +// https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5.2. +func (s *sender) probeTimerExpired() *tcpip.Error { + if !s.rc.probeTimer.checkExpiration() { + return nil + } + // TODO(gvisor.dev/issue/5084): Implement this pseudo algorithm. + // If an unsent segment exists AND + // the receive window allows new data to be sent: + // Transmit the lowest-sequence unsent segment of up to SMSS + // Increment FlightSize by the size of the newly-sent segment + // Else if TLPRxtOut is not set: + // Retransmit the highest-sequence segment sent so far + // TLPRxtOut = true + // TLPHighRxt = SND.NXT + // The cwnd remains unchanged + // If FlightSize != 0: + // Arm RTO timer only. + return nil +} diff --git a/pkg/tcpip/transport/tcp/rack_state.go b/pkg/tcpip/transport/tcp/rack_state.go index c9dc7e773..76cad0831 100644 --- a/pkg/tcpip/transport/tcp/rack_state.go +++ b/pkg/tcpip/transport/tcp/rack_state.go @@ -27,3 +27,8 @@ func (rc *rackControl) saveXmitTime() unixTime { func (rc *rackControl) loadXmitTime(unix unixTime) { rc.xmitTime = time.Unix(unix.second, unix.nano) } + +// afterLoad is invoked by stateify. +func (rc *rackControl) afterLoad() { + rc.probeTimer.init(&rc.probeWaker) +} diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index cc991aba6..c0e9d98e3 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -286,6 +286,8 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint gso: ep.gso != nil, } + s.rc.init() + if s.gso { s.ep.gso.MSS = uint16(maxPayloadSize) } @@ -1455,6 +1457,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { // Reset firstRetransmittedSegXmitTime to the zero value. s.firstRetransmittedSegXmitTime = time.Time{} s.resendTimer.disable() + s.rc.probeTimer.disable() } } diff --git a/test/syscalls/linux/proc_net.cc b/test/syscalls/linux/proc_net.cc index 1cc700fe7..73140b2e9 100644 --- a/test/syscalls/linux/proc_net.cc +++ b/test/syscalls/linux/proc_net.cc @@ -499,7 +499,13 @@ TEST(ProcSysNetIpv4Recovery, CanReadAndWrite) { // Check initial value is set to 1. EXPECT_THAT(PreadFd(fd.get(), &buf, sizeof(buf), 0), SyscallSucceedsWithValue(sizeof(to_write) + 1)); - EXPECT_EQ(strcmp(buf, "1\n"), 0); + if (IsRunningOnGvisor()) { + // TODO(gvisor.dev/issue/5243): TCPRACKLossDetection = 1 should be turned on + // by default. + EXPECT_EQ(strcmp(buf, "0\n"), 0); + } else { + EXPECT_EQ(strcmp(buf, "1\n"), 0); + } // Set tcp_recovery to one of the allowed constants. EXPECT_THAT(PwriteFd(fd.get(), &to_write, sizeof(to_write), 0), |