From ad0ac73626c3d65712791eba652c05869ed287f8 Mon Sep 17 00:00:00 2001 From: Ayush Ranjan Date: Tue, 12 Jan 2021 15:57:48 -0800 Subject: [rack] Set up TLP timer and configure timeout. This change implements TLP details enumerated in https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5.1. Fixes #5083 PiperOrigin-RevId: 351467357 --- pkg/tcpip/transport/tcp/connect.go | 5 +++ pkg/tcpip/transport/tcp/endpoint.go | 7 +++- pkg/tcpip/transport/tcp/protocol.go | 5 ++- pkg/tcpip/transport/tcp/rack.go | 76 +++++++++++++++++++++++++++++++++++ pkg/tcpip/transport/tcp/rack_state.go | 5 +++ pkg/tcpip/transport/tcp/snd.go | 3 ++ 6 files changed, 98 insertions(+), 3 deletions(-) (limited to 'pkg/tcpip') diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 0dc710276..a00ef97c6 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1357,6 +1357,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ // e.mu is expected to be hold upon entering this section. if e.snd != nil { e.snd.resendTimer.cleanup() + e.snd.rc.probeTimer.cleanup() } if closeTimer != nil { @@ -1436,6 +1437,10 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ return nil }, }, + { + w: &e.snd.rc.probeWaker, + f: e.snd.probeTimerExpired, + }, { w: &e.newSegmentWaker, f: func() *tcpip.Error { diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 8f3981075..281f4cd58 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -508,6 +508,9 @@ type endpoint struct { // shutdownFlags represent the current shutdown state of the endpoint. shutdownFlags tcpip.ShutdownFlags + // tcpRecovery is the loss deteoction algorithm used by TCP. + tcpRecovery tcpip.TCPRecovery + // sackPermitted is set to true if the peer sends the TCPSACKPermitted // option in the SYN/SYN-ACK. sackPermitted bool @@ -918,6 +921,8 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue e.maxSynRetries = uint8(synRetries) } + s.TransportProtocolOption(ProtocolNumber, &e.tcpRecovery) + if p := s.GetTCPProbe(); p != nil { e.probe = p } @@ -3072,7 +3077,7 @@ func (e *endpoint) completeState() stack.TCPEndpointState { } } - rc := e.snd.rc + rc := &e.snd.rc s.Sender.RACKState = stack.TCPRACKState{ XmitTime: rc.xmitTime, EndSequence: rc.endSequence, diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go index 672159eed..c9e194f82 100644 --- a/pkg/tcpip/transport/tcp/protocol.go +++ b/pkg/tcpip/transport/tcp/protocol.go @@ -405,7 +405,7 @@ func (p *protocol) Option(option tcpip.GettableTransportProtocolOption) *tcpip.E case *tcpip.TCPRecovery: p.mu.RLock() - *v = tcpip.TCPRecovery(p.recovery) + *v = p.recovery p.mu.RUnlock() return nil @@ -543,7 +543,8 @@ func NewProtocol(s *stack.Stack) stack.TransportProtocol { minRTO: MinRTO, maxRTO: MaxRTO, maxRetries: MaxRetries, - recovery: tcpip.TCPRACKLossDetection, + // TODO(gvisor.dev/issue/5243): Set recovery to tcpip.TCPRACKLossDetection. + recovery: 0, } p.dispatcher.init(runtime.GOMAXPROCS(0)) return &p diff --git a/pkg/tcpip/transport/tcp/rack.go b/pkg/tcpip/transport/tcp/rack.go index e0a50a919..b71e6b992 100644 --- a/pkg/tcpip/transport/tcp/rack.go +++ b/pkg/tcpip/transport/tcp/rack.go @@ -17,9 +17,18 @@ package tcp import ( "time" + "gvisor.dev/gvisor/pkg/sleep" + "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/seqnum" ) +// wcDelayedACKTimeout is the recommended maximum delayed ACK timer value as +// defined in https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5. +// It stands for worst case delayed ACK timer (WCDelAckT). When FlightSize is +// 1, PTO is inflated by WCDelAckT time to compensate for a potential long +// delayed ACK timer at the receiver. +const wcDelayedACKTimeout = 200 * time.Millisecond + // RACK is a loss detection algorithm used in TCP to detect packet loss and // reordering using transmission timestamp of the packets instead of packet or // sequence counts. To use RACK, SACK should be enabled on the connection. @@ -54,6 +63,15 @@ type rackControl struct { // xmitTime is the latest transmission timestamp of rackControl.seg. xmitTime time.Time `state:".(unixTime)"` + + // probeTimer and probeWaker are used to schedule PTO for RACK TLP algorithm. + probeTimer timer `state:"nosave"` + probeWaker sleep.Waker `state:"nosave"` +} + +// init initializes RACK specific fields. +func (rc *rackControl) init() { + rc.probeTimer.init(&rc.probeWaker) } // update will update the RACK related fields when an ACK has been received. @@ -127,3 +145,61 @@ func (rc *rackControl) detectReorder(seg *segment) { func (rc *rackControl) setDSACKSeen() { rc.dsackSeen = true } + +// shouldSchedulePTO dictates whether we should schedule a PTO or not. +// See https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5.1. +func (s *sender) shouldSchedulePTO() bool { + // Schedule PTO only if RACK loss detection is enabled. + return s.ep.tcpRecovery&tcpip.TCPRACKLossDetection != 0 && + // The connection supports SACK. + s.ep.sackPermitted && + // The connection is not in loss recovery. + (s.state != RTORecovery && s.state != SACKRecovery) && + // The connection has no SACKed sequences in the SACK scoreboard. + s.ep.scoreboard.Sacked() == 0 +} + +// schedulePTO schedules the probe timeout as defined in +// https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5.1. +func (s *sender) schedulePTO() { + pto := time.Second + s.rtt.Lock() + if s.rtt.srttInited && s.rtt.srtt > 0 { + pto = s.rtt.srtt * 2 + if s.outstanding == 1 { + pto += wcDelayedACKTimeout + } + } + s.rtt.Unlock() + + now := time.Now() + if s.resendTimer.enabled() { + if now.Add(pto).After(s.resendTimer.target) { + pto = s.resendTimer.target.Sub(now) + } + s.resendTimer.disable() + } + + s.rc.probeTimer.enable(pto) +} + +// probeTimerExpired is the same as TLP_send_probe() as defined in +// https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.5.2. +func (s *sender) probeTimerExpired() *tcpip.Error { + if !s.rc.probeTimer.checkExpiration() { + return nil + } + // TODO(gvisor.dev/issue/5084): Implement this pseudo algorithm. + // If an unsent segment exists AND + // the receive window allows new data to be sent: + // Transmit the lowest-sequence unsent segment of up to SMSS + // Increment FlightSize by the size of the newly-sent segment + // Else if TLPRxtOut is not set: + // Retransmit the highest-sequence segment sent so far + // TLPRxtOut = true + // TLPHighRxt = SND.NXT + // The cwnd remains unchanged + // If FlightSize != 0: + // Arm RTO timer only. + return nil +} diff --git a/pkg/tcpip/transport/tcp/rack_state.go b/pkg/tcpip/transport/tcp/rack_state.go index c9dc7e773..76cad0831 100644 --- a/pkg/tcpip/transport/tcp/rack_state.go +++ b/pkg/tcpip/transport/tcp/rack_state.go @@ -27,3 +27,8 @@ func (rc *rackControl) saveXmitTime() unixTime { func (rc *rackControl) loadXmitTime(unix unixTime) { rc.xmitTime = time.Unix(unix.second, unix.nano) } + +// afterLoad is invoked by stateify. +func (rc *rackControl) afterLoad() { + rc.probeTimer.init(&rc.probeWaker) +} diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index cc991aba6..c0e9d98e3 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -286,6 +286,8 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint gso: ep.gso != nil, } + s.rc.init() + if s.gso { s.ep.gso.MSS = uint16(maxPayloadSize) } @@ -1455,6 +1457,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { // Reset firstRetransmittedSegXmitTime to the zero value. s.firstRetransmittedSegXmitTime = time.Time{} s.resendTimer.disable() + s.rc.probeTimer.disable() } } -- cgit v1.2.3