From fcad6f91a3f292b6b76be10f03baf05ee5245d3d Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Wed, 26 May 2021 06:47:52 -0700 Subject: Use the stack clock everywhere Updates #5939. Updates #6012. RELNOTES: n/a PiperOrigin-RevId: 375931554 --- pkg/tcpip/transport/tcp/BUILD | 3 +- pkg/tcpip/transport/tcp/accept.go | 12 ++++---- pkg/tcpip/transport/tcp/connect.go | 30 +++++++++--------- pkg/tcpip/transport/tcp/cubic.go | 19 ++++++------ pkg/tcpip/transport/tcp/dispatcher.go | 5 +-- pkg/tcpip/transport/tcp/endpoint.go | 43 ++++++++++++++------------ pkg/tcpip/transport/tcp/endpoint_state.go | 27 ++++------------ pkg/tcpip/transport/tcp/forwarder.go | 2 +- pkg/tcpip/transport/tcp/protocol.go | 4 +-- pkg/tcpip/transport/tcp/rack.go | 12 ++++---- pkg/tcpip/transport/tcp/rcv.go | 15 +++++---- pkg/tcpip/transport/tcp/rcv_state.go | 29 ------------------ pkg/tcpip/transport/tcp/segment.go | 13 ++++---- pkg/tcpip/transport/tcp/segment_state.go | 22 ------------- pkg/tcpip/transport/tcp/segment_test.go | 6 ++-- pkg/tcpip/transport/tcp/snd.go | 30 +++++++++--------- pkg/tcpip/transport/tcp/snd_state.go | 42 ------------------------- pkg/tcpip/transport/tcp/tcp_rack_test.go | 4 +-- pkg/tcpip/transport/tcp/timer.go | 51 ++++++++++++++++--------------- pkg/tcpip/transport/tcp/timer_test.go | 7 +++-- 20 files changed, 138 insertions(+), 238 deletions(-) delete mode 100644 pkg/tcpip/transport/tcp/rcv_state.go delete mode 100644 pkg/tcpip/transport/tcp/snd_state.go (limited to 'pkg/tcpip/transport') diff --git a/pkg/tcpip/transport/tcp/BUILD b/pkg/tcpip/transport/tcp/BUILD index 0f20d3856..8436d2cf0 100644 --- a/pkg/tcpip/transport/tcp/BUILD +++ b/pkg/tcpip/transport/tcp/BUILD @@ -41,7 +41,6 @@ go_library( "protocol.go", "rack.go", "rcv.go", - "rcv_state.go", "reno.go", "reno_recovery.go", "sack.go", @@ -53,7 +52,6 @@ go_library( "segment_state.go", "segment_unsafe.go", "snd.go", - "snd_state.go", "tcp_endpoint_list.go", "tcp_segment_list.go", "timer.go", @@ -134,6 +132,7 @@ go_test( deps = [ "//pkg/sleep", "//pkg/tcpip/buffer", + "//pkg/tcpip/faketime", "//pkg/tcpip/stack", "@com_github_google_go_cmp//cmp:go_default_library", ], diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index d4bd4e80e..4a33fbc24 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -114,8 +114,8 @@ type listenContext struct { } // timeStamp returns an 8-bit timestamp with a granularity of 64 seconds. -func timeStamp() uint32 { - return uint32(time.Now().Unix()>>6) & tsMask +func timeStamp(clock tcpip.Clock) uint32 { + return uint32(clock.NowMonotonic().Sub(tcpip.MonotonicTime{}).Seconds()) >> 6 & tsMask } // newListenContext creates a new listen context. @@ -171,7 +171,7 @@ func (l *listenContext) cookieHash(id stack.TransportEndpointID, ts uint32, nonc // createCookie creates a SYN cookie for the given id and incoming sequence // number. func (l *listenContext) createCookie(id stack.TransportEndpointID, seq seqnum.Value, data uint32) seqnum.Value { - ts := timeStamp() + ts := timeStamp(l.stack.Clock()) v := l.cookieHash(id, 0, 0) + uint32(seq) + (ts << tsOffset) v += (l.cookieHash(id, ts, 1) + data) & hashMask return seqnum.Value(v) @@ -181,7 +181,7 @@ func (l *listenContext) createCookie(id stack.TransportEndpointID, seq seqnum.Va // sequence number. If it is, it also returns the data originally encoded in the // cookie when createCookie was called. func (l *listenContext) isCookieValid(id stack.TransportEndpointID, cookie seqnum.Value, seq seqnum.Value) (uint32, bool) { - ts := timeStamp() + ts := timeStamp(l.stack.Clock()) v := uint32(cookie) - l.cookieHash(id, 0, 0) - uint32(seq) cookieTS := v >> tsOffset if ((ts - cookieTS) & tsMask) > maxTSDiff { @@ -247,7 +247,7 @@ func (l *listenContext) createConnectingEndpoint(s *segment, rcvdSynOpts *header func (l *listenContext) startHandshake(s *segment, opts *header.TCPSynOptions, queue *waiter.Queue, owner tcpip.PacketOwner) (*handshake, tcpip.Error) { // Create new endpoint. irs := s.sequenceNumber - isn := generateSecureISN(s.id, l.stack.Seed()) + isn := generateSecureISN(s.id, l.stack.Clock(), l.stack.Seed()) ep, err := l.createConnectingEndpoint(s, opts, queue) if err != nil { return nil, err @@ -591,7 +591,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) tcpip.Err synOpts := header.TCPSynOptions{ WS: -1, TS: opts.TS, - TSVal: tcpTimeStamp(time.Now(), timeStampOffset()), + TSVal: tcpTimeStamp(e.stack.Clock().NowMonotonic(), timeStampOffset()), TSEcr: opts.TSVal, MSS: calculateAdvertisedMSS(e.userMSS, route), } diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 05b41e0f8..2fcdf1cda 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -92,7 +92,7 @@ type handshake struct { rcvWndScale int // startTime is the time at which the first SYN/SYN-ACK was sent. - startTime time.Time + startTime tcpip.MonotonicTime // deferAccept if non-zero will drop the final ACK for a passive // handshake till an ACK segment with data is received or the timeout is @@ -156,12 +156,12 @@ func (h *handshake) resetState() { h.flags = header.TCPFlagSyn h.ackNum = 0 h.mss = 0 - h.iss = generateSecureISN(h.ep.TransportEndpointInfo.ID, h.ep.stack.Seed()) + h.iss = generateSecureISN(h.ep.TransportEndpointInfo.ID, h.ep.stack.Clock(), h.ep.stack.Seed()) } // generateSecureISN generates a secure Initial Sequence number based on the // recommendation here https://tools.ietf.org/html/rfc6528#page-3. -func generateSecureISN(id stack.TransportEndpointID, seed uint32) seqnum.Value { +func generateSecureISN(id stack.TransportEndpointID, clock tcpip.Clock, seed uint32) seqnum.Value { isnHasher := jenkins.Sum32(seed) isnHasher.Write([]byte(id.LocalAddress)) isnHasher.Write([]byte(id.RemoteAddress)) @@ -180,7 +180,7 @@ func generateSecureISN(id stack.TransportEndpointID, seed uint32) seqnum.Value { // // Which sort of guarantees that we won't reuse the ISN for a new // connection for the same tuple for at least 274s. - isn := isnHasher.Sum32() + uint32(time.Now().UnixNano()>>6) + isn := isnHasher.Sum32() + uint32(clock.NowMonotonic().Sub(tcpip.MonotonicTime{}).Nanoseconds()>>6) return seqnum.Value(isn) } @@ -381,7 +381,7 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error { if s.flagIsSet(header.TCPFlagAck) { // If deferAccept is not zero and this is a bare ACK and the // timeout is not hit then drop the ACK. - if h.deferAccept != 0 && s.data.Size() == 0 && time.Since(h.startTime) < h.deferAccept { + if h.deferAccept != 0 && s.data.Size() == 0 && h.ep.stack.Clock().NowMonotonic().Sub(h.startTime) < h.deferAccept { h.acked = true h.ep.stack.Stats().DroppedPackets.Increment() return nil @@ -474,7 +474,7 @@ func (h *handshake) processSegments() tcpip.Error { // start sends the first SYN/SYN-ACK. It does not block, even if link address // resolution is required. func (h *handshake) start() { - h.startTime = time.Now() + h.startTime = h.ep.stack.Clock().NowMonotonic() h.ep.amss = calculateAdvertisedMSS(h.ep.userMSS, h.ep.route) var sackEnabled tcpip.TCPSACKEnabled if err := h.ep.stack.TransportProtocolOption(ProtocolNumber, &sackEnabled); err != nil { @@ -527,7 +527,7 @@ func (h *handshake) complete() tcpip.Error { defer s.Done() // Initialize the resend timer. - timer, err := newBackoffTimer(time.Second, MaxRTO, resendWaker.Assert) + timer, err := newBackoffTimer(h.ep.stack.Clock(), time.Second, MaxRTO, resendWaker.Assert) if err != nil { return err } @@ -552,7 +552,7 @@ func (h *handshake) complete() tcpip.Error { // The last is required to provide a way for the peer to complete // the connection with another ACK or data (as ACKs are never // retransmitted on their own). - if h.active || !h.acked || h.deferAccept != 0 && time.Since(h.startTime) > h.deferAccept { + if h.active || !h.acked || h.deferAccept != 0 && h.ep.stack.Clock().NowMonotonic().Sub(h.startTime) > h.deferAccept { h.ep.sendSynTCP(h.ep.route, tcpFields{ id: h.ep.TransportEndpointInfo.ID, ttl: h.ep.ttl, @@ -608,15 +608,15 @@ func (h *handshake) complete() tcpip.Error { type backoffTimer struct { timeout time.Duration maxTimeout time.Duration - t *time.Timer + t tcpip.Timer } -func newBackoffTimer(timeout, maxTimeout time.Duration, f func()) (*backoffTimer, tcpip.Error) { +func newBackoffTimer(clock tcpip.Clock, timeout, maxTimeout time.Duration, f func()) (*backoffTimer, tcpip.Error) { if timeout > maxTimeout { return nil, &tcpip.ErrTimeout{} } bt := &backoffTimer{timeout: timeout, maxTimeout: maxTimeout} - bt.t = time.AfterFunc(timeout, f) + bt.t = clock.AfterFunc(timeout, f) return bt, nil } @@ -1267,7 +1267,7 @@ func (e *endpoint) keepaliveTimerExpired() tcpip.Error { // If a userTimeout is set then abort the connection if it is // exceeded. - if userTimeout != 0 && time.Since(e.rcv.lastRcvdAckTime) >= userTimeout && e.keepalive.unacked > 0 { + if userTimeout != 0 && e.stack.Clock().NowMonotonic().Sub(e.rcv.lastRcvdAckTime) >= userTimeout && e.keepalive.unacked > 0 { e.keepalive.Unlock() e.stack.Stats().TCP.EstablishedTimedout.Increment() return &tcpip.ErrTimeout{} @@ -1322,7 +1322,7 @@ func (e *endpoint) disableKeepaliveTimer() { // segments. func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{}) tcpip.Error { e.mu.Lock() - var closeTimer *time.Timer + var closeTimer tcpip.Timer var closeWaker sleep.Waker epilogue := func() { @@ -1484,7 +1484,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ if e.EndpointState() == StateFinWait2 && e.closed { // The socket has been closed and we are in FIN_WAIT2 // so start the FIN_WAIT2 timer. - closeTimer = time.AfterFunc(e.tcpLingerTimeout, closeWaker.Assert) + closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert) } } @@ -1721,7 +1721,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) { var timeWaitWaker sleep.Waker s.AddWaker(&timeWaitWaker, timeWaitDone) - timeWaitTimer := time.AfterFunc(timeWaitDuration, timeWaitWaker.Assert) + timeWaitTimer := e.stack.Clock().AfterFunc(timeWaitDuration, timeWaitWaker.Assert) defer timeWaitTimer.Stop() for { diff --git a/pkg/tcpip/transport/tcp/cubic.go b/pkg/tcpip/transport/tcp/cubic.go index 962f1d687..6985194bb 100644 --- a/pkg/tcpip/transport/tcp/cubic.go +++ b/pkg/tcpip/transport/tcp/cubic.go @@ -41,7 +41,7 @@ type cubicState struct { func newCubicCC(s *sender) *cubicState { return &cubicState{ TCPCubicState: stack.TCPCubicState{ - T: time.Now(), + T: s.ep.stack.Clock().NowMonotonic(), Beta: 0.7, C: 0.4, }, @@ -60,7 +60,7 @@ func (c *cubicState) enterCongestionAvoidance() { // https://tools.ietf.org/html/rfc8312#section-4.8 if c.numCongestionEvents == 0 { c.K = 0 - c.T = time.Now() + c.T = c.s.ep.stack.Clock().NowMonotonic() c.WLastMax = c.WMax c.WMax = float64(c.s.SndCwnd) } @@ -115,14 +115,15 @@ func (c *cubicState) cubicCwnd(t float64) float64 { // getCwnd returns the current congestion window as computed by CUBIC. // Refer: https://tools.ietf.org/html/rfc8312#section-4 func (c *cubicState) getCwnd(packetsAcked, sndCwnd int, srtt time.Duration) int { - elapsed := time.Since(c.T).Seconds() + elapsed := c.s.ep.stack.Clock().NowMonotonic().Sub(c.T) + elapsedSeconds := elapsed.Seconds() // Compute the window as per Cubic after 'elapsed' time // since last congestion event. - c.WC = c.cubicCwnd(elapsed - c.K) + c.WC = c.cubicCwnd(elapsedSeconds - c.K) // Compute the TCP friendly estimate of the congestion window. - c.WEst = c.WMax*c.Beta + (3.0*((1.0-c.Beta)/(1.0+c.Beta)))*(elapsed/srtt.Seconds()) + c.WEst = c.WMax*c.Beta + (3.0*((1.0-c.Beta)/(1.0+c.Beta)))*(elapsedSeconds/srtt.Seconds()) // Make sure in the TCP friendly region CUBIC performs at least // as well as Reno. @@ -134,7 +135,7 @@ func (c *cubicState) getCwnd(packetsAcked, sndCwnd int, srtt time.Duration) int // In Concave/Convex region of CUBIC, calculate what CUBIC window // will be after 1 RTT and use that to grow congestion window // for every ack. - tEst := (time.Since(c.T) + srtt).Seconds() + tEst := (elapsed + srtt).Seconds() wtRtt := c.cubicCwnd(tEst - c.K) // As per 4.3 for each received ACK cwnd must be incremented // by (w_cubic(t+RTT) - cwnd/cwnd. @@ -151,7 +152,7 @@ func (c *cubicState) getCwnd(packetsAcked, sndCwnd int, srtt time.Duration) int func (c *cubicState) HandleLossDetected() { // See: https://tools.ietf.org/html/rfc8312#section-4.5 c.numCongestionEvents++ - c.T = time.Now() + c.T = c.s.ep.stack.Clock().NowMonotonic() c.WLastMax = c.WMax c.WMax = float64(c.s.SndCwnd) @@ -162,7 +163,7 @@ func (c *cubicState) HandleLossDetected() { // HandleRTOExpired implements congestionContrl.HandleRTOExpired. func (c *cubicState) HandleRTOExpired() { // See: https://tools.ietf.org/html/rfc8312#section-4.6 - c.T = time.Now() + c.T = c.s.ep.stack.Clock().NowMonotonic() c.numCongestionEvents = 0 c.WLastMax = c.WMax c.WMax = float64(c.s.SndCwnd) @@ -193,7 +194,7 @@ func (c *cubicState) fastConvergence() { // PostRecovery implemements congestionControl.PostRecovery. func (c *cubicState) PostRecovery() { - c.T = time.Now() + c.T = c.s.ep.stack.Clock().NowMonotonic() } // reduceSlowStartThreshold returns new SsThresh as described in diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go index 0ca986512..5963a169c 100644 --- a/pkg/tcpip/transport/tcp/dispatcher.go +++ b/pkg/tcpip/transport/tcp/dispatcher.go @@ -20,6 +20,7 @@ import ( "gvisor.dev/gvisor/pkg/rand" "gvisor.dev/gvisor/pkg/sleep" "gvisor.dev/gvisor/pkg/sync" + "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/hash/jenkins" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/stack" @@ -172,10 +173,10 @@ func (d *dispatcher) wait() { d.wg.Wait() } -func (d *dispatcher) queuePacket(stackEP stack.TransportEndpoint, id stack.TransportEndpointID, pkt *stack.PacketBuffer) { +func (d *dispatcher) queuePacket(stackEP stack.TransportEndpoint, id stack.TransportEndpointID, clock tcpip.Clock, pkt *stack.PacketBuffer) { ep := stackEP.(*endpoint) - s := newIncomingSegment(id, pkt) + s := newIncomingSegment(id, clock, pkt) if !s.parse(pkt.RXTransportChecksumValidated) { ep.stack.Stats().TCP.InvalidSegmentsReceived.Increment() ep.stats.ReceiveErrors.MalformedPacketsReceived.Increment() diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index fb7670adb..d44f480ab 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -480,7 +480,7 @@ type endpoint struct { // recentTSTime is the unix time when we last updated // TCPEndpointStateInner.RecentTS. - recentTSTime time.Time `state:".(unixTime)"` + recentTSTime tcpip.MonotonicTime // shutdownFlags represent the current shutdown state of the endpoint. shutdownFlags tcpip.ShutdownFlags @@ -638,7 +638,7 @@ type endpoint struct { // lastOutOfWindowAckTime is the time at which the an ACK was sent in response // to an out of window segment being received by this endpoint. - lastOutOfWindowAckTime time.Time `state:".(unixTime)"` + lastOutOfWindowAckTime tcpip.MonotonicTime } // UniqueID implements stack.TransportEndpoint.UniqueID. @@ -787,7 +787,7 @@ func (e *endpoint) EndpointState() EndpointState { // setRecentTimestamp sets the recentTS field to the provided value. func (e *endpoint) setRecentTimestamp(recentTS uint32) { e.RecentTS = recentTS - e.recentTSTime = time.Now() + e.recentTSTime = e.stack.Clock().NowMonotonic() } // recentTimestamp returns the value of the recentTS field. @@ -884,7 +884,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue e.segmentQueue.ep = e e.TSOffset = timeStampOffset() e.acceptCond = sync.NewCond(&e.acceptMu) - e.keepalive.timer.init(&e.keepalive.waker) + e.keepalive.timer.init(e.stack.Clock(), &e.keepalive.waker) return e } @@ -1201,7 +1201,7 @@ func (e *endpoint) ModerateRecvBuf(copied int) { e.rcvQueueInfo.rcvQueueMu.Unlock() return } - now := time.Now() + now := e.stack.Clock().NowMonotonic() if rtt := e.rcvQueueInfo.RcvAutoParams.RTT; rtt == 0 || now.Sub(e.rcvQueueInfo.RcvAutoParams.MeasureTime) < rtt { e.rcvQueueInfo.RcvAutoParams.CopiedBytes += copied e.rcvQueueInfo.rcvQueueMu.Unlock() @@ -1556,7 +1556,7 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp } // Add data to the send queue. - s := newOutgoingSegment(e.TransportEndpointInfo.ID, v) + s := newOutgoingSegment(e.TransportEndpointInfo.ID, e.stack.Clock(), v) e.sndQueueInfo.SndBufUsed += len(v) e.sndQueueInfo.SndBufInQueue += seqnum.Size(len(v)) e.sndQueueInfo.sndQueue.PushBack(s) @@ -2241,7 +2241,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp // If the endpoint is not in TIME-WAIT or if it is in TIME-WAIT but // less than 1 second has elapsed since its recentTS was updated then // we cannot reuse the port. - if tcpEP.EndpointState() != StateTimeWait || time.Since(tcpEP.recentTSTime) < 1*time.Second { + if tcpEP.EndpointState() != StateTimeWait || e.stack.Clock().NowMonotonic().Sub(tcpEP.recentTSTime) < 1*time.Second { tcpEP.UnlockUser() return false, nil } @@ -2387,7 +2387,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error { } // Queue fin segment. - s := newOutgoingSegment(e.TransportEndpointInfo.ID, nil) + s := newOutgoingSegment(e.TransportEndpointInfo.ID, e.stack.Clock(), nil) e.sndQueueInfo.sndQueue.PushBack(s) e.sndQueueInfo.SndBufInQueue++ // Mark endpoint as closed. @@ -2865,14 +2865,15 @@ func (e *endpoint) maybeEnableTimestamp(synOpts *header.TCPSynOptions) { // timestamp returns the timestamp value to be used in the TSVal field of the // timestamp option for outgoing TCP segments for a given endpoint. func (e *endpoint) timestamp() uint32 { - return tcpTimeStamp(time.Now(), e.TSOffset) + return tcpTimeStamp(e.stack.Clock().NowMonotonic(), e.TSOffset) } // tcpTimeStamp returns a timestamp offset by the provided offset. This is // not inlined above as it's used when SYN cookies are in use and endpoint // is not created at the time when the SYN cookie is sent. -func tcpTimeStamp(curTime time.Time, offset uint32) uint32 { - return uint32(curTime.Unix()*1000+int64(curTime.Nanosecond()/1e6)) + offset +func tcpTimeStamp(curTime tcpip.MonotonicTime, offset uint32) uint32 { + d := curTime.Sub(tcpip.MonotonicTime{}) + return uint32(d.Milliseconds()) + offset } // timeStampOffset returns a randomized timestamp offset to be used when sending @@ -2926,7 +2927,7 @@ func (e *endpoint) completeStateLocked() stack.TCPEndpointState { s := stack.TCPEndpointState{ TCPEndpointStateInner: e.TCPEndpointStateInner, ID: stack.TCPEndpointID(e.TransportEndpointInfo.ID), - SegTime: time.Now(), + SegTime: e.stack.Clock().NowMonotonic(), Receiver: e.rcv.TCPReceiverState, Sender: e.snd.TCPSenderState, } @@ -2954,7 +2955,7 @@ func (e *endpoint) completeStateLocked() stack.TCPEndpointState { if cubic, ok := e.snd.cc.(*cubicState); ok { s.Sender.Cubic = cubic.TCPCubicState - s.Sender.Cubic.TimeSinceLastCongestion = time.Since(s.Sender.Cubic.T) + s.Sender.Cubic.TimeSinceLastCongestion = e.stack.Clock().NowMonotonic().Sub(s.Sender.Cubic.T) } s.Sender.RACKState = e.snd.rc.TCPRACKState @@ -3046,14 +3047,16 @@ func GetTCPSendBufferLimits(s tcpip.StackHandler) tcpip.SendBufferSizeOption { // allowOutOfWindowAck returns true if an out-of-window ACK can be sent now. func (e *endpoint) allowOutOfWindowAck() bool { - var limit stack.TCPInvalidRateLimitOption - if err := e.stack.Option(&limit); err != nil { - panic(fmt.Sprintf("e.stack.Option(%+v) failed with error: %s", limit, err)) - } + now := e.stack.Clock().NowMonotonic() - now := time.Now() - if now.Sub(e.lastOutOfWindowAckTime) < time.Duration(limit) { - return false + if e.lastOutOfWindowAckTime != (tcpip.MonotonicTime{}) { + var limit stack.TCPInvalidRateLimitOption + if err := e.stack.Option(&limit); err != nil { + panic(fmt.Sprintf("e.stack.Option(%+v) failed with error: %s", limit, err)) + } + if now.Sub(e.lastOutOfWindowAckTime) < time.Duration(limit) { + return false + } } e.lastOutOfWindowAckTime = now diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go index a56d34dc5..952ccacdd 100644 --- a/pkg/tcpip/transport/tcp/endpoint_state.go +++ b/pkg/tcpip/transport/tcp/endpoint_state.go @@ -158,12 +158,17 @@ func (e *endpoint) afterLoad() { // Condition variables and mutexs are not S/R'ed so reinitialize // acceptCond with e.acceptMu. e.acceptCond = sync.NewCond(&e.acceptMu) - e.keepalive.timer.init(&e.keepalive.waker) stack.StackFromEnv.RegisterRestoredEndpoint(e) } // Resume implements tcpip.ResumableEndpoint.Resume. func (e *endpoint) Resume(s *stack.Stack) { + e.keepalive.timer.init(s.Clock(), &e.keepalive.waker) + if snd := e.snd; snd != nil { + snd.resendTimer.init(s.Clock(), &snd.resendWaker) + snd.reorderTimer.init(s.Clock(), &snd.reorderWaker) + snd.probeTimer.init(s.Clock(), &snd.probeWaker) + } e.stack = s e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits, GetTCPReceiveBufferLimits) e.segmentQueue.thaw() @@ -290,23 +295,3 @@ func (e *endpoint) Resume(s *stack.Stack) { tcpip.DeleteDanglingEndpoint(e) } } - -// saveRecentTSTime is invoked by stateify. -func (e *endpoint) saveRecentTSTime() unixTime { - return unixTime{e.recentTSTime.Unix(), e.recentTSTime.UnixNano()} -} - -// loadRecentTSTime is invoked by stateify. -func (e *endpoint) loadRecentTSTime(unix unixTime) { - e.recentTSTime = time.Unix(unix.second, unix.nano) -} - -// saveLastOutOfWindowAckTime is invoked by stateify. -func (e *endpoint) saveLastOutOfWindowAckTime() unixTime { - return unixTime{e.lastOutOfWindowAckTime.Unix(), e.lastOutOfWindowAckTime.UnixNano()} -} - -// loadLastOutOfWindowAckTime is invoked by stateify. -func (e *endpoint) loadLastOutOfWindowAckTime(unix unixTime) { - e.lastOutOfWindowAckTime = time.Unix(unix.second, unix.nano) -} diff --git a/pkg/tcpip/transport/tcp/forwarder.go b/pkg/tcpip/transport/tcp/forwarder.go index 2f9fe7ee0..65c86823a 100644 --- a/pkg/tcpip/transport/tcp/forwarder.go +++ b/pkg/tcpip/transport/tcp/forwarder.go @@ -65,7 +65,7 @@ func NewForwarder(s *stack.Stack, rcvWnd, maxInFlight int, handler func(*Forward // This function is expected to be passed as an argument to the // stack.SetTransportProtocolHandler function. func (f *Forwarder) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketBuffer) bool { - s := newIncomingSegment(id, pkt) + s := newIncomingSegment(id, f.stack.Clock(), pkt) defer s.decRef() // We only care about well-formed SYN packets. diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go index d43e21426..c970dcecc 100644 --- a/pkg/tcpip/transport/tcp/protocol.go +++ b/pkg/tcpip/transport/tcp/protocol.go @@ -131,7 +131,7 @@ func (*protocol) ParsePorts(v buffer.View) (src, dst uint16, err tcpip.Error) { // goroutine which is responsible for dequeuing and doing full TCP dispatch of // the packet. func (p *protocol) QueuePacket(ep stack.TransportEndpoint, id stack.TransportEndpointID, pkt *stack.PacketBuffer) { - p.dispatcher.queuePacket(ep, id, pkt) + p.dispatcher.queuePacket(ep, id, p.stack.Clock(), pkt) } // HandleUnknownDestinationPacket handles packets targeted at this protocol but @@ -142,7 +142,7 @@ func (p *protocol) QueuePacket(ep stack.TransportEndpoint, id stack.TransportEnd // particular, SYNs addressed to a non-existent connection are rejected by this // means." func (p *protocol) HandleUnknownDestinationPacket(id stack.TransportEndpointID, pkt *stack.PacketBuffer) stack.UnknownDestinationPacketDisposition { - s := newIncomingSegment(id, pkt) + s := newIncomingSegment(id, p.stack.Clock(), pkt) defer s.decRef() if !s.parse(pkt.RXTransportChecksumValidated) || !s.csumValid { diff --git a/pkg/tcpip/transport/tcp/rack.go b/pkg/tcpip/transport/tcp/rack.go index 813a6dffd..0da4eafaa 100644 --- a/pkg/tcpip/transport/tcp/rack.go +++ b/pkg/tcpip/transport/tcp/rack.go @@ -79,7 +79,7 @@ func (rc *rackControl) init(snd *sender, iss seqnum.Value) { // update will update the RACK related fields when an ACK has been received. // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-09#section-6.2 func (rc *rackControl) update(seg *segment, ackSeg *segment) { - rtt := time.Now().Sub(seg.xmitTime) + rtt := rc.snd.ep.stack.Clock().NowMonotonic().Sub(seg.xmitTime) tsOffset := rc.snd.ep.TSOffset // If the ACK is for a retransmitted packet, do not update if it is a @@ -115,7 +115,7 @@ func (rc *rackControl) update(seg *segment, ackSeg *segment) { // ending sequence number of the packet which has been acknowledged // most recently. endSeq := seg.sequenceNumber.Add(seqnum.Size(seg.data.Size())) - if rc.XmitTime.Before(seg.xmitTime) || (seg.xmitTime.Equal(rc.XmitTime) && rc.EndSequence.LessThan(endSeq)) { + if rc.XmitTime.Before(seg.xmitTime) || (seg.xmitTime == rc.XmitTime && rc.EndSequence.LessThan(endSeq)) { rc.XmitTime = seg.xmitTime rc.EndSequence = endSeq } @@ -174,7 +174,7 @@ func (s *sender) schedulePTO() { } s.rtt.Unlock() - now := time.Now() + now := s.ep.stack.Clock().NowMonotonic() if s.resendTimer.enabled() { if now.Add(pto).After(s.resendTimer.target) { pto = s.resendTimer.target.Sub(now) @@ -352,7 +352,7 @@ func (rc *rackControl) exitRecovery() { // detectLoss marks the segment as lost if the reordering window has elapsed // and the ACK is not received. It will also arm the reorder timer. // See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 Step 5. -func (rc *rackControl) detectLoss(rcvTime time.Time) int { +func (rc *rackControl) detectLoss(rcvTime tcpip.MonotonicTime) int { var timeout time.Duration numLost := 0 for seg := rc.snd.writeList.Front(); seg != nil && seg.xmitCount != 0; seg = seg.Next() { @@ -366,7 +366,7 @@ func (rc *rackControl) detectLoss(rcvTime time.Time) int { } endSeq := seg.sequenceNumber.Add(seqnum.Size(seg.data.Size())) - if seg.xmitTime.Before(rc.XmitTime) || (seg.xmitTime.Equal(rc.XmitTime) && rc.EndSequence.LessThan(endSeq)) { + if seg.xmitTime.Before(rc.XmitTime) || (seg.xmitTime == rc.XmitTime && rc.EndSequence.LessThan(endSeq)) { timeRemaining := seg.xmitTime.Sub(rcvTime) + rc.RTT + rc.ReoWnd if timeRemaining <= 0 { seg.lost = true @@ -392,7 +392,7 @@ func (rc *rackControl) reorderTimerExpired() tcpip.Error { return nil } - numLost := rc.detectLoss(time.Now()) + numLost := rc.detectLoss(rc.snd.ep.stack.Clock().NowMonotonic()) if numLost == 0 { return nil } diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go index 4fd8a0624..b46f8320e 100644 --- a/pkg/tcpip/transport/tcp/rcv.go +++ b/pkg/tcpip/transport/tcp/rcv.go @@ -17,7 +17,6 @@ package tcp import ( "container/heap" "math" - "time" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/header" @@ -50,7 +49,7 @@ type receiver struct { pendingRcvdSegments segmentHeap // Time when the last ack was received. - lastRcvdAckTime time.Time `state:".(unixTime)"` + lastRcvdAckTime tcpip.MonotonicTime } func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8) *receiver { @@ -63,7 +62,7 @@ func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale }, rcvWnd: rcvWnd, rcvWUP: irs + 1, - lastRcvdAckTime: time.Now(), + lastRcvdAckTime: ep.stack.Clock().NowMonotonic(), } } @@ -325,9 +324,9 @@ func (r *receiver) updateRTT() { // is first acknowledged and the receipt of data that is at least one // window beyond the sequence number that was acknowledged. r.ep.rcvQueueInfo.rcvQueueMu.Lock() - if r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime.IsZero() { + if r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime == (tcpip.MonotonicTime{}) { // New measurement. - r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = time.Now() + r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = r.ep.stack.Clock().NowMonotonic() r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureSeqNumber = r.RcvNxt.Add(r.rcvWnd) r.ep.rcvQueueInfo.rcvQueueMu.Unlock() return @@ -336,14 +335,14 @@ func (r *receiver) updateRTT() { r.ep.rcvQueueInfo.rcvQueueMu.Unlock() return } - rtt := time.Since(r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime) + rtt := r.ep.stack.Clock().NowMonotonic().Sub(r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime) // We only store the minimum observed RTT here as this is only used in // absence of a SRTT available from either timestamps or a sender // measurement of RTT. if r.ep.rcvQueueInfo.RcvAutoParams.RTT == 0 || rtt < r.ep.rcvQueueInfo.RcvAutoParams.RTT { r.ep.rcvQueueInfo.RcvAutoParams.RTT = rtt } - r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = time.Now() + r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = r.ep.stack.Clock().NowMonotonic() r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureSeqNumber = r.RcvNxt.Add(r.rcvWnd) r.ep.rcvQueueInfo.rcvQueueMu.Unlock() } @@ -467,7 +466,7 @@ func (r *receiver) handleRcvdSegment(s *segment) (drop bool, err tcpip.Error) { } // Store the time of the last ack. - r.lastRcvdAckTime = time.Now() + r.lastRcvdAckTime = r.ep.stack.Clock().NowMonotonic() // Defer segment processing if it can't be consumed now. if !r.consumeSegment(s, segSeq, segLen) { diff --git a/pkg/tcpip/transport/tcp/rcv_state.go b/pkg/tcpip/transport/tcp/rcv_state.go deleted file mode 100644 index 2bf21a2e7..000000000 --- a/pkg/tcpip/transport/tcp/rcv_state.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2019 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tcp - -import ( - "time" -) - -// saveLastRcvdAckTime is invoked by stateify. -func (r *receiver) saveLastRcvdAckTime() unixTime { - return unixTime{r.lastRcvdAckTime.Unix(), r.lastRcvdAckTime.UnixNano()} -} - -// loadLastRcvdAckTime is invoked by stateify. -func (r *receiver) loadLastRcvdAckTime(unix unixTime) { - r.lastRcvdAckTime = time.Unix(unix.second, unix.nano) -} diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go index 61754de29..e5c0d52fa 100644 --- a/pkg/tcpip/transport/tcp/segment.go +++ b/pkg/tcpip/transport/tcp/segment.go @@ -17,7 +17,6 @@ package tcp import ( "fmt" "sync/atomic" - "time" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/buffer" @@ -73,9 +72,9 @@ type segment struct { parsedOptions header.TCPOptions options []byte `state:".([]byte)"` hasNewSACKInfo bool - rcvdTime time.Time `state:".(unixTime)"` + rcvdTime tcpip.MonotonicTime // xmitTime is the last transmit time of this segment. - xmitTime time.Time `state:".(unixTime)"` + xmitTime tcpip.MonotonicTime xmitCount uint32 // acked indicates if the segment has already been SACKed. @@ -88,7 +87,7 @@ type segment struct { lost bool } -func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) *segment { +func newIncomingSegment(id stack.TransportEndpointID, clock tcpip.Clock, pkt *stack.PacketBuffer) *segment { netHdr := pkt.Network() s := &segment{ refCnt: 1, @@ -100,17 +99,17 @@ func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) * } s.data = pkt.Data().ExtractVV().Clone(s.views[:]) s.hdr = header.TCP(pkt.TransportHeader().View()) - s.rcvdTime = time.Now() + s.rcvdTime = clock.NowMonotonic() s.dataMemSize = s.data.Size() return s } -func newOutgoingSegment(id stack.TransportEndpointID, v buffer.View) *segment { +func newOutgoingSegment(id stack.TransportEndpointID, clock tcpip.Clock, v buffer.View) *segment { s := &segment{ refCnt: 1, id: id, } - s.rcvdTime = time.Now() + s.rcvdTime = clock.NowMonotonic() if len(v) != 0 { s.views[0] = v s.data = buffer.NewVectorisedView(len(v), s.views[:1]) diff --git a/pkg/tcpip/transport/tcp/segment_state.go b/pkg/tcpip/transport/tcp/segment_state.go index 7422d8c02..dcfa80f95 100644 --- a/pkg/tcpip/transport/tcp/segment_state.go +++ b/pkg/tcpip/transport/tcp/segment_state.go @@ -15,8 +15,6 @@ package tcp import ( - "time" - "gvisor.dev/gvisor/pkg/tcpip/buffer" ) @@ -55,23 +53,3 @@ func (s *segment) loadOptions(options []byte) { // allocated so there is no cost here. s.options = options } - -// saveRcvdTime is invoked by stateify. -func (s *segment) saveRcvdTime() unixTime { - return unixTime{s.rcvdTime.Unix(), s.rcvdTime.UnixNano()} -} - -// loadRcvdTime is invoked by stateify. -func (s *segment) loadRcvdTime(unix unixTime) { - s.rcvdTime = time.Unix(unix.second, unix.nano) -} - -// saveXmitTime is invoked by stateify. -func (s *segment) saveXmitTime() unixTime { - return unixTime{s.rcvdTime.Unix(), s.rcvdTime.UnixNano()} -} - -// loadXmitTime is invoked by stateify. -func (s *segment) loadXmitTime(unix unixTime) { - s.rcvdTime = time.Unix(unix.second, unix.nano) -} diff --git a/pkg/tcpip/transport/tcp/segment_test.go b/pkg/tcpip/transport/tcp/segment_test.go index 486016fc0..2e6ea06f5 100644 --- a/pkg/tcpip/transport/tcp/segment_test.go +++ b/pkg/tcpip/transport/tcp/segment_test.go @@ -19,6 +19,7 @@ import ( "github.com/google/go-cmp/cmp" "gvisor.dev/gvisor/pkg/tcpip/buffer" + "gvisor.dev/gvisor/pkg/tcpip/faketime" "gvisor.dev/gvisor/pkg/tcpip/stack" ) @@ -39,10 +40,11 @@ func checkSegmentSize(t *testing.T, name string, seg *segment, want segmentSizeW } func TestSegmentMerge(t *testing.T) { + var clock faketime.NullClock id := stack.TransportEndpointID{} - seg1 := newOutgoingSegment(id, buffer.NewView(10)) + seg1 := newOutgoingSegment(id, &clock, buffer.NewView(10)) defer seg1.decRef() - seg2 := newOutgoingSegment(id, buffer.NewView(20)) + seg2 := newOutgoingSegment(id, &clock, buffer.NewView(20)) defer seg2.decRef() checkSegmentSize(t, "seg1", seg1, segmentSizeWants{ diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 6b7293755..ccff9ca8d 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -94,7 +94,7 @@ type sender struct { // firstRetransmittedSegXmitTime is the original transmit time of // the first segment that was retransmitted due to RTO expiration. - firstRetransmittedSegXmitTime time.Time `state:".(unixTime)"` + firstRetransmittedSegXmitTime tcpip.MonotonicTime // zeroWindowProbing is set if the sender is currently probing // for zero receive window. @@ -169,7 +169,7 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint SndUna: iss + 1, SndNxt: iss + 1, RTTMeasureSeqNum: iss + 1, - LastSendTime: time.Now(), + LastSendTime: ep.stack.Clock().NowMonotonic(), MaxPayloadSize: maxPayloadSize, MaxSentAck: irs + 1, FastRecovery: stack.TCPFastRecoveryState{ @@ -197,9 +197,9 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint s.SndWndScale = uint8(sndWndScale) } - s.resendTimer.init(&s.resendWaker) - s.reorderTimer.init(&s.reorderWaker) - s.probeTimer.init(&s.probeWaker) + s.resendTimer.init(s.ep.stack.Clock(), &s.resendWaker) + s.reorderTimer.init(s.ep.stack.Clock(), &s.reorderWaker) + s.probeTimer.init(s.ep.stack.Clock(), &s.probeWaker) s.updateMaxPayloadSize(int(ep.route.MTU()), 0) @@ -441,7 +441,7 @@ func (s *sender) retransmitTimerExpired() bool { // timeout since the first retransmission. uto := s.ep.userTimeout - if s.firstRetransmittedSegXmitTime.IsZero() { + if s.firstRetransmittedSegXmitTime == (tcpip.MonotonicTime{}) { // We store the original xmitTime of the segment that we are // about to retransmit as the retransmission time. This is // required as by the time the retransmitTimer has expired the @@ -450,7 +450,7 @@ func (s *sender) retransmitTimerExpired() bool { s.firstRetransmittedSegXmitTime = s.writeList.Front().xmitTime } - elapsed := time.Since(s.firstRetransmittedSegXmitTime) + elapsed := s.ep.stack.Clock().NowMonotonic().Sub(s.firstRetransmittedSegXmitTime) remaining := s.maxRTO if uto != 0 { // Cap to the user specified timeout if one is specified. @@ -866,8 +866,8 @@ func (s *sender) enableZeroWindowProbing() { // We piggyback the probing on the retransmit timer with the // current retranmission interval, as we may start probing while // segment retransmissions. - if s.firstRetransmittedSegXmitTime.IsZero() { - s.firstRetransmittedSegXmitTime = time.Now() + if s.firstRetransmittedSegXmitTime == (tcpip.MonotonicTime{}) { + s.firstRetransmittedSegXmitTime = s.ep.stack.Clock().NowMonotonic() } s.resendTimer.enable(s.RTO) } @@ -875,7 +875,7 @@ func (s *sender) enableZeroWindowProbing() { func (s *sender) disableZeroWindowProbing() { s.zeroWindowProbing = false s.unackZeroWindowProbes = 0 - s.firstRetransmittedSegXmitTime = time.Time{} + s.firstRetransmittedSegXmitTime = tcpip.MonotonicTime{} s.resendTimer.disable() } @@ -925,7 +925,7 @@ func (s *sender) sendData() { // "A TCP SHOULD set cwnd to no more than RW before beginning // transmission if the TCP has not sent data in the interval exceeding // the retrasmission timeout." - if !s.FastRecovery.Active && s.state != tcpip.RTORecovery && time.Now().Sub(s.LastSendTime) > s.RTO { + if !s.FastRecovery.Active && s.state != tcpip.RTORecovery && s.ep.stack.Clock().NowMonotonic().Sub(s.LastSendTime) > s.RTO { if s.SndCwnd > InitialCwnd { s.SndCwnd = InitialCwnd } @@ -1234,7 +1234,7 @@ func checkDSACK(rcvdSeg *segment) bool { func (s *sender) handleRcvdSegment(rcvdSeg *segment) { // Check if we can extract an RTT measurement from this ack. if !rcvdSeg.parsedOptions.TS && s.RTTMeasureSeqNum.LessThan(rcvdSeg.ackNumber) { - s.updateRTO(time.Now().Sub(s.RTTMeasureTime)) + s.updateRTO(s.ep.stack.Clock().NowMonotonic().Sub(s.RTTMeasureTime)) s.RTTMeasureSeqNum = s.SndNxt } @@ -1444,7 +1444,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { if s.SndUna == s.SndNxt { s.Outstanding = 0 // Reset firstRetransmittedSegXmitTime to the zero value. - s.firstRetransmittedSegXmitTime = time.Time{} + s.firstRetransmittedSegXmitTime = tcpip.MonotonicTime{} s.resendTimer.disable() s.probeTimer.disable() } @@ -1502,7 +1502,7 @@ func (s *sender) sendSegment(seg *segment) tcpip.Error { s.ep.stack.Stats().TCP.SlowStartRetransmits.Increment() } } - seg.xmitTime = time.Now() + seg.xmitTime = s.ep.stack.Clock().NowMonotonic() seg.xmitCount++ seg.lost = false err := s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber) @@ -1527,7 +1527,7 @@ func (s *sender) sendSegment(seg *segment) tcpip.Error { // sendSegmentFromView sends a new segment containing the given payload, flags // and sequence number. func (s *sender) sendSegmentFromView(data buffer.VectorisedView, flags header.TCPFlags, seq seqnum.Value) tcpip.Error { - s.LastSendTime = time.Now() + s.LastSendTime = s.ep.stack.Clock().NowMonotonic() if seq == s.RTTMeasureSeqNum { s.RTTMeasureTime = s.LastSendTime } diff --git a/pkg/tcpip/transport/tcp/snd_state.go b/pkg/tcpip/transport/tcp/snd_state.go deleted file mode 100644 index 2f805d8ce..000000000 --- a/pkg/tcpip/transport/tcp/snd_state.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tcp - -import ( - "time" -) - -// +stateify savable -type unixTime struct { - second int64 - nano int64 -} - -// afterLoad is invoked by stateify. -func (s *sender) afterLoad() { - s.resendTimer.init(&s.resendWaker) - s.reorderTimer.init(&s.reorderWaker) - s.probeTimer.init(&s.probeWaker) -} - -// saveFirstRetransmittedSegXmitTime is invoked by stateify. -func (s *sender) saveFirstRetransmittedSegXmitTime() unixTime { - return unixTime{s.firstRetransmittedSegXmitTime.Unix(), s.firstRetransmittedSegXmitTime.UnixNano()} -} - -// loadFirstRetransmittedSegXmitTime is invoked by stateify. -func (s *sender) loadFirstRetransmittedSegXmitTime(unix unixTime) { - s.firstRetransmittedSegXmitTime = time.Unix(unix.second, unix.nano) -} diff --git a/pkg/tcpip/transport/tcp/tcp_rack_test.go b/pkg/tcpip/transport/tcp/tcp_rack_test.go index 29af87b7d..d6cf786a1 100644 --- a/pkg/tcpip/transport/tcp/tcp_rack_test.go +++ b/pkg/tcpip/transport/tcp/tcp_rack_test.go @@ -50,7 +50,7 @@ func TestRACKUpdate(t *testing.T) { c := context.New(t, uint32(mtu)) defer c.Cleanup() - var xmitTime time.Time + var xmitTime tcpip.MonotonicTime probeDone := make(chan struct{}) c.Stack().AddTCPProbe(func(state stack.TCPEndpointState) { // Validate that the endpoint Sender.RACKState is what we expect. @@ -79,7 +79,7 @@ func TestRACKUpdate(t *testing.T) { } // Write the data. - xmitTime = time.Now() + xmitTime = c.Stack().Clock().NowMonotonic() var r bytes.Reader r.Reset(data) if _, err := c.EP.Write(&r, tcpip.WriteOptions{}); err != nil { diff --git a/pkg/tcpip/transport/tcp/timer.go b/pkg/tcpip/transport/tcp/timer.go index 38a335840..5645c772e 100644 --- a/pkg/tcpip/transport/tcp/timer.go +++ b/pkg/tcpip/transport/tcp/timer.go @@ -15,21 +15,29 @@ package tcp import ( + "math" "time" "gvisor.dev/gvisor/pkg/sleep" + "gvisor.dev/gvisor/pkg/tcpip" ) type timerState int const ( + // The timer is disabled. timerStateDisabled timerState = iota + // The timer is enabled, but the clock timer may be set to an earlier + // expiration time due to a previous orphaned state. timerStateEnabled + // The timer is disabled, but the clock timer is enabled, which means that + // it will cause a spurious wakeup unless the timer is enabled before the + // clock timer fires. timerStateOrphaned ) // timer is a timer implementation that reduces the interactions with the -// runtime timer infrastructure by letting timers run (and potentially +// clock timer infrastructure by letting timers run (and potentially // eventually expire) even if they are stopped. It makes it cheaper to // disable/reenable timers at the expense of spurious wakes. This is useful for // cases when the same timer is disabled/reenabled repeatedly with relatively @@ -39,44 +47,37 @@ const ( // (currently at least 200ms), and get disabled when acks are received, and // reenabled when new pending segments are sent. // -// It is advantageous to avoid interacting with the runtime because it acquires +// It is advantageous to avoid interacting with the clock because it acquires // a global mutex and performs O(log n) operations, where n is the global number // of timers, whenever a timer is enabled or disabled, and may make a syscall. // // This struct is thread-compatible. type timer struct { - // state is the current state of the timer, it can be one of the - // following values: - // disabled - the timer is disabled. - // orphaned - the timer is disabled, but the runtime timer is - // enabled, which means that it will evetually cause a - // spurious wake (unless it gets enabled again before - // then). - // enabled - the timer is enabled, but the runtime timer may be set - // to an earlier expiration time due to a previous - // orphaned state. state timerState + clock tcpip.Clock + // target is the expiration time of the current timer. It is only // meaningful in the enabled state. - target time.Time + target tcpip.MonotonicTime - // runtimeTarget is the expiration time of the runtime timer. It is + // clockTarget is the expiration time of the clock timer. It is // meaningful in the enabled and orphaned states. - runtimeTarget time.Time + clockTarget tcpip.MonotonicTime - // timer is the runtime timer used to wait on. - timer *time.Timer + // timer is the clock timer used to wait on. + timer tcpip.Timer } // init initializes the timer. Once it expires, it the given waker will be // asserted. -func (t *timer) init(w *sleep.Waker) { +func (t *timer) init(clock tcpip.Clock, w *sleep.Waker) { t.state = timerStateDisabled + t.clock = clock - // Initialize a runtime timer that will assert the waker, then + // Initialize a clock timer that will assert the waker, then // immediately stop it. - t.timer = time.AfterFunc(time.Hour, func() { + t.timer = t.clock.AfterFunc(math.MaxInt64, func() { w.Assert() }) t.timer.Stop() @@ -106,9 +107,9 @@ func (t *timer) checkExpiration() bool { // The timer is enabled, but it may have expired early. Check if that's // the case, and if so, reset the runtime timer to the correct time. - now := time.Now() + now := t.clock.NowMonotonic() if now.Before(t.target) { - t.runtimeTarget = t.target + t.clockTarget = t.target t.timer.Reset(t.target.Sub(now)) return false } @@ -134,11 +135,11 @@ func (t *timer) enabled() bool { // enable enables the timer, programming the runtime timer if necessary. func (t *timer) enable(d time.Duration) { - t.target = time.Now().Add(d) + t.target = t.clock.NowMonotonic().Add(d) // Check if we need to set the runtime timer. - if t.state == timerStateDisabled || t.target.Before(t.runtimeTarget) { - t.runtimeTarget = t.target + if t.state == timerStateDisabled || t.target.Before(t.clockTarget) { + t.clockTarget = t.target t.timer.Reset(d) } diff --git a/pkg/tcpip/transport/tcp/timer_test.go b/pkg/tcpip/transport/tcp/timer_test.go index dbd6dff54..479752de7 100644 --- a/pkg/tcpip/transport/tcp/timer_test.go +++ b/pkg/tcpip/transport/tcp/timer_test.go @@ -19,6 +19,7 @@ import ( "time" "gvisor.dev/gvisor/pkg/sleep" + "gvisor.dev/gvisor/pkg/tcpip/faketime" ) func TestCleanup(t *testing.T) { @@ -27,9 +28,11 @@ func TestCleanup(t *testing.T) { isAssertedTimeoutSeconds = timerDurationSeconds + 1 ) + clock := faketime.NewManualClock() + tmr := timer{} w := sleep.Waker{} - tmr.init(&w) + tmr.init(clock, &w) tmr.enable(timerDurationSeconds * time.Second) tmr.cleanup() @@ -39,7 +42,7 @@ func TestCleanup(t *testing.T) { // The waker should not be asserted. for i := 0; i < isAssertedTimeoutSeconds; i++ { - time.Sleep(time.Second) + clock.Advance(time.Second) if w.IsAsserted() { t.Fatalf("waker asserted unexpectedly") } -- cgit v1.2.3