summaryrefslogtreecommitdiffhomepage
path: root/pkg
diff options
context:
space:
mode:
authorTamir Duberstein <tamird@google.com>2021-05-26 06:47:52 -0700
committergVisor bot <gvisor-bot@google.com>2021-05-26 06:49:57 -0700
commitfcad6f91a3f292b6b76be10f03baf05ee5245d3d (patch)
treefa383878218fe0c69c5c346a08cfd953e398ee2d /pkg
parentb63e61828d0652ad1769db342c17a3529d2d24ed (diff)
Use the stack clock everywhere
Updates #5939. Updates #6012. RELNOTES: n/a PiperOrigin-RevId: 375931554
Diffstat (limited to 'pkg')
-rw-r--r--pkg/tcpip/network/ipv6/ndp.go30
-rw-r--r--pkg/tcpip/stack/conntrack.go2
-rw-r--r--pkg/tcpip/stack/iptables.go1
-rw-r--r--pkg/tcpip/stack/neighbor_cache.go2
-rw-r--r--pkg/tcpip/stack/nud.go12
-rw-r--r--pkg/tcpip/stack/nud_test.go6
-rw-r--r--pkg/tcpip/stack/stack_global_state.go72
-rw-r--r--pkg/tcpip/stack/tcp.go14
-rw-r--r--pkg/tcpip/tcpip.go5
-rw-r--r--pkg/tcpip/tests/integration/link_resolution_test.go214
-rw-r--r--pkg/tcpip/timer_test.go15
-rw-r--r--pkg/tcpip/transport/tcp/BUILD3
-rw-r--r--pkg/tcpip/transport/tcp/accept.go12
-rw-r--r--pkg/tcpip/transport/tcp/connect.go30
-rw-r--r--pkg/tcpip/transport/tcp/cubic.go19
-rw-r--r--pkg/tcpip/transport/tcp/dispatcher.go5
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go43
-rw-r--r--pkg/tcpip/transport/tcp/endpoint_state.go27
-rw-r--r--pkg/tcpip/transport/tcp/forwarder.go2
-rw-r--r--pkg/tcpip/transport/tcp/protocol.go4
-rw-r--r--pkg/tcpip/transport/tcp/rack.go12
-rw-r--r--pkg/tcpip/transport/tcp/rcv.go15
-rw-r--r--pkg/tcpip/transport/tcp/rcv_state.go29
-rw-r--r--pkg/tcpip/transport/tcp/segment.go13
-rw-r--r--pkg/tcpip/transport/tcp/segment_state.go22
-rw-r--r--pkg/tcpip/transport/tcp/segment_test.go6
-rw-r--r--pkg/tcpip/transport/tcp/snd.go30
-rw-r--r--pkg/tcpip/transport/tcp/snd_state.go42
-rw-r--r--pkg/tcpip/transport/tcp/tcp_rack_test.go4
-rw-r--r--pkg/tcpip/transport/tcp/timer.go51
-rw-r--r--pkg/tcpip/transport/tcp/timer_test.go7
31 files changed, 352 insertions, 397 deletions
diff --git a/pkg/tcpip/network/ipv6/ndp.go b/pkg/tcpip/network/ipv6/ndp.go
index 7af5e6975..851cd6e75 100644
--- a/pkg/tcpip/network/ipv6/ndp.go
+++ b/pkg/tcpip/network/ipv6/ndp.go
@@ -548,7 +548,7 @@ type tempSLAACAddrState struct {
// Must not be nil.
regenJob *tcpip.Job
- createdAt time.Time
+ createdAt tcpip.MonotonicTime
// The address's endpoint.
//
@@ -572,10 +572,10 @@ type slaacPrefixState struct {
invalidationJob *tcpip.Job
// Nonzero only when the address is not valid forever.
- validUntil time.Time
+ validUntil tcpip.MonotonicTime
// Nonzero only when the address is not preferred forever.
- preferredUntil time.Time
+ preferredUntil tcpip.MonotonicTime
// State associated with the stable address generated for the prefix.
stableAddr struct {
@@ -1050,7 +1050,7 @@ func (ndp *ndpState) doSLAAC(prefix tcpip.Subnet, pl, vl time.Duration) {
maxGenerationAttempts: ndp.configs.AutoGenAddressConflictRetries + 1,
}
- now := time.Now()
+ now := ndp.ep.protocol.stack.Clock().NowMonotonic()
// The time an address is preferred until is needed to properly generate the
// address.
@@ -1181,7 +1181,7 @@ func (ndp *ndpState) generateSLAACAddr(prefix tcpip.Subnet, state *slaacPrefixSt
state.stableAddr.localGenerationFailures++
}
- if addressEndpoint := ndp.addAndAcquireSLAACAddr(generatedAddr, stack.AddressConfigSlaac, time.Since(state.preferredUntil) >= 0 /* deprecated */); addressEndpoint != nil {
+ if addressEndpoint := ndp.addAndAcquireSLAACAddr(generatedAddr, stack.AddressConfigSlaac, ndp.ep.protocol.stack.Clock().NowMonotonic().Sub(state.preferredUntil) >= 0 /* deprecated */); addressEndpoint != nil {
state.stableAddr.addressEndpoint = addressEndpoint
state.generationAttempts++
return true
@@ -1236,13 +1236,13 @@ func (ndp *ndpState) generateTempSLAACAddr(prefix tcpip.Subnet, prefixState *sla
}
stableAddr := prefixState.stableAddr.addressEndpoint.AddressWithPrefix().Address
- now := time.Now()
+ now := ndp.ep.protocol.stack.Clock().NowMonotonic()
// As per RFC 4941 section 3.3 step 4, the valid lifetime of a temporary
// address is the lower of the valid lifetime of the stable address or the
// maximum temporary address valid lifetime.
vl := ndp.configs.MaxTempAddrValidLifetime
- if prefixState.validUntil != (time.Time{}) {
+ if prefixState.validUntil != (tcpip.MonotonicTime{}) {
if prefixVL := prefixState.validUntil.Sub(now); vl > prefixVL {
vl = prefixVL
}
@@ -1258,7 +1258,7 @@ func (ndp *ndpState) generateTempSLAACAddr(prefix tcpip.Subnet, prefixState *sla
// maximum temporary address preferred lifetime - the temporary address desync
// factor.
pl := ndp.configs.MaxTempAddrPreferredLifetime - ndp.temporaryAddressDesyncFactor
- if prefixState.preferredUntil != (time.Time{}) {
+ if prefixState.preferredUntil != (tcpip.MonotonicTime{}) {
if prefixPL := prefixState.preferredUntil.Sub(now); pl > prefixPL {
// Respect the preferred lifetime of the prefix, as per RFC 4941 section
// 3.3 step 4.
@@ -1393,7 +1393,7 @@ func (ndp *ndpState) refreshSLAACPrefixLifetimes(prefix tcpip.Subnet, prefixStat
// deprecation job so it can be reset.
prefixState.deprecationJob.Cancel()
- now := time.Now()
+ now := ndp.ep.protocol.stack.Clock().NowMonotonic()
// Schedule the deprecation job if prefix has a finite preferred lifetime.
if pl < header.NDPInfiniteLifetime {
@@ -1402,7 +1402,7 @@ func (ndp *ndpState) refreshSLAACPrefixLifetimes(prefix tcpip.Subnet, prefixStat
}
prefixState.preferredUntil = now.Add(pl)
} else {
- prefixState.preferredUntil = time.Time{}
+ prefixState.preferredUntil = tcpip.MonotonicTime{}
}
// As per RFC 4862 section 5.5.3.e, update the valid lifetime for prefix:
@@ -1420,17 +1420,17 @@ func (ndp *ndpState) refreshSLAACPrefixLifetimes(prefix tcpip.Subnet, prefixStat
// Handle the infinite valid lifetime separately as we do not schedule a
// job in this case.
prefixState.invalidationJob.Cancel()
- prefixState.validUntil = time.Time{}
+ prefixState.validUntil = tcpip.MonotonicTime{}
} else {
var effectiveVl time.Duration
var rl time.Duration
// If the prefix was originally set to be valid forever, assume the
// remaining time to be the maximum possible value.
- if prefixState.validUntil == (time.Time{}) {
+ if prefixState.validUntil == (tcpip.MonotonicTime{}) {
rl = header.NDPInfiniteLifetime
} else {
- rl = time.Until(prefixState.validUntil)
+ rl = prefixState.validUntil.Sub(now)
}
if vl > MinPrefixInformationValidLifetimeForUpdate || vl > rl {
@@ -1462,7 +1462,7 @@ func (ndp *ndpState) refreshSLAACPrefixLifetimes(prefix tcpip.Subnet, prefixStat
// maximum temporary address valid lifetime. Note, the valid lifetime of a
// temporary address is relative to the address's creation time.
validUntil := tempAddrState.createdAt.Add(ndp.configs.MaxTempAddrValidLifetime)
- if prefixState.validUntil != (time.Time{}) && validUntil.Sub(prefixState.validUntil) > 0 {
+ if prefixState.validUntil != (tcpip.MonotonicTime{}) && validUntil.Sub(prefixState.validUntil) > 0 {
validUntil = prefixState.validUntil
}
@@ -1482,7 +1482,7 @@ func (ndp *ndpState) refreshSLAACPrefixLifetimes(prefix tcpip.Subnet, prefixStat
// desync factor. Note, the preferred lifetime of a temporary address is
// relative to the address's creation time.
preferredUntil := tempAddrState.createdAt.Add(ndp.configs.MaxTempAddrPreferredLifetime - ndp.temporaryAddressDesyncFactor)
- if prefixState.preferredUntil != (time.Time{}) && preferredUntil.Sub(prefixState.preferredUntil) > 0 {
+ if prefixState.preferredUntil != (tcpip.MonotonicTime{}) && preferredUntil.Sub(prefixState.preferredUntil) > 0 {
preferredUntil = prefixState.preferredUntil
}
diff --git a/pkg/tcpip/stack/conntrack.go b/pkg/tcpip/stack/conntrack.go
index 5720e7543..f7fbcbaa7 100644
--- a/pkg/tcpip/stack/conntrack.go
+++ b/pkg/tcpip/stack/conntrack.go
@@ -125,6 +125,8 @@ type conn struct {
tcb tcpconntrack.TCB
// lastUsed is the last time the connection saw a relevant packet, and
// is updated by each packet on the connection. It is protected by mu.
+ //
+ // TODO(gvisor.dev/issue/5939): do not use the ambient clock.
lastUsed time.Time `state:".(unixTime)"`
}
diff --git a/pkg/tcpip/stack/iptables.go b/pkg/tcpip/stack/iptables.go
index 3670d5995..d2f666c09 100644
--- a/pkg/tcpip/stack/iptables.go
+++ b/pkg/tcpip/stack/iptables.go
@@ -371,6 +371,7 @@ func (it *IPTables) startReaper(interval time.Duration) {
select {
case <-it.reaperDone:
return
+ // TODO(gvisor.dev/issue/5939): do not use the ambient clock.
case <-time.After(interval):
bucket, interval = it.connections.reapUnused(bucket, interval)
}
diff --git a/pkg/tcpip/stack/neighbor_cache.go b/pkg/tcpip/stack/neighbor_cache.go
index 509f5ce5c..08857e1a9 100644
--- a/pkg/tcpip/stack/neighbor_cache.go
+++ b/pkg/tcpip/stack/neighbor_cache.go
@@ -310,7 +310,7 @@ func (n *neighborCache) handleUpperLevelConfirmation(addr tcpip.Address) {
func (n *neighborCache) init(nic *nic, r LinkAddressResolver) {
*n = neighborCache{
nic: nic,
- state: NewNUDState(nic.stack.nudConfigs, nic.stack.randomGenerator),
+ state: NewNUDState(nic.stack.nudConfigs, nic.stack.clock, nic.stack.randomGenerator),
linkRes: r,
}
n.mu.Lock()
diff --git a/pkg/tcpip/stack/nud.go b/pkg/tcpip/stack/nud.go
index dac94cbe4..ca9822bca 100644
--- a/pkg/tcpip/stack/nud.go
+++ b/pkg/tcpip/stack/nud.go
@@ -316,7 +316,8 @@ func calcMaxRandomFactor(minRandomFactor float32) float32 {
// NUDState stores states needed for calculating reachable time.
type NUDState struct {
- rng *rand.Rand
+ clock tcpip.Clock
+ rng *rand.Rand
mu struct {
sync.RWMutex
@@ -337,9 +338,10 @@ type NUDState struct {
// NewNUDState returns new NUDState using c as configuration and the specified
// random number generator for use in recomputing ReachableTime.
-func NewNUDState(c NUDConfigurations, rng *rand.Rand) *NUDState {
+func NewNUDState(c NUDConfigurations, clock tcpip.Clock, rng *rand.Rand) *NUDState {
s := &NUDState{
- rng: rng,
+ clock: clock,
+ rng: rng,
}
s.mu.config = c
return s
@@ -367,7 +369,7 @@ func (s *NUDState) ReachableTime() time.Duration {
s.mu.Lock()
defer s.mu.Unlock()
- if time.Now().After(s.mu.expiration) ||
+ if s.clock.Now().After(s.mu.expiration) ||
s.mu.config.BaseReachableTime != s.mu.prevBaseReachableTime ||
s.mu.config.MinRandomFactor != s.mu.prevMinRandomFactor ||
s.mu.config.MaxRandomFactor != s.mu.prevMaxRandomFactor {
@@ -416,5 +418,5 @@ func (s *NUDState) recomputeReachableTimeLocked() {
s.mu.reachableTime = time.Duration(reachableTime)
}
- s.mu.expiration = time.Now().Add(2 * time.Hour)
+ s.mu.expiration = s.clock.Now().Add(2 * time.Hour)
}
diff --git a/pkg/tcpip/stack/nud_test.go b/pkg/tcpip/stack/nud_test.go
index bfce2fe16..1aeb2f8a5 100644
--- a/pkg/tcpip/stack/nud_test.go
+++ b/pkg/tcpip/stack/nud_test.go
@@ -711,7 +711,8 @@ func TestNUDStateReachableTime(t *testing.T) {
rng := fakeRand{
num: defaultFakeRandomNum,
}
- s := stack.NewNUDState(c, rand.New(&rng))
+ var clock faketime.NullClock
+ s := stack.NewNUDState(c, &clock, rand.New(&rng))
if got, want := s.ReachableTime(), test.want; got != want {
t.Errorf("got ReachableTime = %q, want = %q", got, want)
}
@@ -783,7 +784,8 @@ func TestNUDStateRecomputeReachableTime(t *testing.T) {
rng := fakeRand{
num: defaultFakeRandomNum,
}
- s := stack.NewNUDState(c, rand.New(&rng))
+ var clock faketime.NullClock
+ s := stack.NewNUDState(c, &clock, rand.New(&rng))
old := s.ReachableTime()
if got, want := s.ReachableTime(), old; got != want {
diff --git a/pkg/tcpip/stack/stack_global_state.go b/pkg/tcpip/stack/stack_global_state.go
index 33824afd0..dfec4258a 100644
--- a/pkg/tcpip/stack/stack_global_state.go
+++ b/pkg/tcpip/stack/stack_global_state.go
@@ -14,78 +14,6 @@
package stack
-import "time"
-
// StackFromEnv is the global stack created in restore run.
// FIXME(b/36201077)
var StackFromEnv *Stack
-
-// saveT is invoked by stateify.
-func (t *TCPCubicState) saveT() unixTime {
- return unixTime{t.T.Unix(), t.T.UnixNano()}
-}
-
-// loadT is invoked by stateify.
-func (t *TCPCubicState) loadT(unix unixTime) {
- t.T = time.Unix(unix.second, unix.nano)
-}
-
-// saveXmitTime is invoked by stateify.
-func (t *TCPRACKState) saveXmitTime() unixTime {
- return unixTime{t.XmitTime.Unix(), t.XmitTime.UnixNano()}
-}
-
-// loadXmitTime is invoked by stateify.
-func (t *TCPRACKState) loadXmitTime(unix unixTime) {
- t.XmitTime = time.Unix(unix.second, unix.nano)
-}
-
-// saveLastSendTime is invoked by stateify.
-func (t *TCPSenderState) saveLastSendTime() unixTime {
- return unixTime{t.LastSendTime.Unix(), t.LastSendTime.UnixNano()}
-}
-
-// loadLastSendTime is invoked by stateify.
-func (t *TCPSenderState) loadLastSendTime(unix unixTime) {
- t.LastSendTime = time.Unix(unix.second, unix.nano)
-}
-
-// saveRTTMeasureTime is invoked by stateify.
-func (t *TCPSenderState) saveRTTMeasureTime() unixTime {
- return unixTime{t.RTTMeasureTime.Unix(), t.RTTMeasureTime.UnixNano()}
-}
-
-// loadRTTMeasureTime is invoked by stateify.
-func (t *TCPSenderState) loadRTTMeasureTime(unix unixTime) {
- t.RTTMeasureTime = time.Unix(unix.second, unix.nano)
-}
-
-// saveMeasureTime is invoked by stateify.
-func (r *RcvBufAutoTuneParams) saveMeasureTime() unixTime {
- return unixTime{r.MeasureTime.Unix(), r.MeasureTime.UnixNano()}
-}
-
-// loadMeasureTime is invoked by stateify.
-func (r *RcvBufAutoTuneParams) loadMeasureTime(unix unixTime) {
- r.MeasureTime = time.Unix(unix.second, unix.nano)
-}
-
-// saveRTTMeasureTime is invoked by stateify.
-func (r *RcvBufAutoTuneParams) saveRTTMeasureTime() unixTime {
- return unixTime{r.RTTMeasureTime.Unix(), r.RTTMeasureTime.UnixNano()}
-}
-
-// loadRTTMeasureTime is invoked by stateify.
-func (r *RcvBufAutoTuneParams) loadRTTMeasureTime(unix unixTime) {
- r.RTTMeasureTime = time.Unix(unix.second, unix.nano)
-}
-
-// saveSegTime is invoked by stateify.
-func (t *TCPEndpointState) saveSegTime() unixTime {
- return unixTime{t.SegTime.Unix(), t.SegTime.UnixNano()}
-}
-
-// loadSegTime is invoked by stateify.
-func (t *TCPEndpointState) loadSegTime(unix unixTime) {
- t.SegTime = time.Unix(unix.second, unix.nano)
-}
diff --git a/pkg/tcpip/stack/tcp.go b/pkg/tcpip/stack/tcp.go
index ddff6e2d6..e90c1a770 100644
--- a/pkg/tcpip/stack/tcp.go
+++ b/pkg/tcpip/stack/tcp.go
@@ -39,7 +39,7 @@ type TCPCubicState struct {
WMax float64
// T is the time when the current congestion avoidance was entered.
- T time.Time `state:".(unixTime)"`
+ T tcpip.MonotonicTime
// TimeSinceLastCongestion denotes the time since the current
// congestion avoidance was entered.
@@ -78,7 +78,7 @@ type TCPCubicState struct {
type TCPRACKState struct {
// XmitTime is the transmission timestamp of the most recent
// acknowledged segment.
- XmitTime time.Time `state:".(unixTime)"`
+ XmitTime tcpip.MonotonicTime
// EndSequence is the ending TCP sequence number of the most recent
// acknowledged segment.
@@ -216,7 +216,7 @@ type TCPRTTState struct {
// +stateify savable
type TCPSenderState struct {
// LastSendTime is the timestamp at which we sent the last segment.
- LastSendTime time.Time `state:".(unixTime)"`
+ LastSendTime tcpip.MonotonicTime
// DupAckCount is the number of Duplicate ACKs received. It is used for
// fast retransmit.
@@ -256,7 +256,7 @@ type TCPSenderState struct {
RTTMeasureSeqNum seqnum.Value
// RTTMeasureTime is the time when the RTTMeasureSeqNum was sent.
- RTTMeasureTime time.Time `state:".(unixTime)"`
+ RTTMeasureTime tcpip.MonotonicTime
// Closed indicates that the caller has closed the endpoint for
// sending.
@@ -313,7 +313,7 @@ type TCPSACKInfo struct {
type RcvBufAutoTuneParams struct {
// MeasureTime is the time at which the current measurement was
// started.
- MeasureTime time.Time `state:".(unixTime)"`
+ MeasureTime tcpip.MonotonicTime
// CopiedBytes is the number of bytes copied to user space since this
// measure began.
@@ -341,7 +341,7 @@ type RcvBufAutoTuneParams struct {
// RTTMeasureTime is the absolute time at which the current RTT
// measurement period began.
- RTTMeasureTime time.Time `state:".(unixTime)"`
+ RTTMeasureTime tcpip.MonotonicTime
// Disabled is true if an explicit receive buffer is set for the
// endpoint.
@@ -429,7 +429,7 @@ type TCPEndpointState struct {
ID TCPEndpointID
// SegTime denotes the absolute time when this segment was received.
- SegTime time.Time `state:".(unixTime)"`
+ SegTime tcpip.MonotonicTime
// RcvBufState contains information about the state of the endpoint's
// receive socket buffer.
diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go
index ca98da847..91622fa4c 100644
--- a/pkg/tcpip/tcpip.go
+++ b/pkg/tcpip/tcpip.go
@@ -76,6 +76,11 @@ func (mt MonotonicTime) Before(u MonotonicTime) bool {
return mt.nanoseconds < u.nanoseconds
}
+// After reports whether the monotonic clock reading mt is after u.
+func (mt MonotonicTime) After(u MonotonicTime) bool {
+ return mt.nanoseconds > u.nanoseconds
+}
+
// Add returns the monotonic clock reading mt+d.
func (mt MonotonicTime) Add(d time.Duration) MonotonicTime {
return MonotonicTime{
diff --git a/pkg/tcpip/tests/integration/link_resolution_test.go b/pkg/tcpip/tests/integration/link_resolution_test.go
index 4c06460e4..16fd7a99a 100644
--- a/pkg/tcpip/tests/integration/link_resolution_test.go
+++ b/pkg/tcpip/tests/integration/link_resolution_test.go
@@ -18,6 +18,7 @@ import (
"bytes"
"fmt"
"net"
+ "runtime"
"testing"
"time"
@@ -287,9 +288,11 @@ func TestTCPLinkResolutionFailure(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
+ clock := faketime.NewManualClock()
stackOpts := stack.Options{
NetworkProtocols: []stack.NetworkProtocolFactory{arp.NewProtocol, ipv4.NewProtocol, ipv6.NewProtocol},
TransportProtocols: []stack.TransportProtocolFactory{tcp.NewProtocol},
+ Clock: clock,
}
host1Stack, host2Stack := setupStack(t, stackOpts, host1NICID, host2NICID)
@@ -333,7 +336,17 @@ func TestTCPLinkResolutionFailure(t *testing.T) {
// Wait for an error due to link resolution failing, or the endpoint to be
// writable.
+ if test.expectedWriteErr != nil {
+ nudConfigs, err := host1Stack.NUDConfigurations(host1NICID, test.netProto)
+ if err != nil {
+ t.Fatalf("host1Stack.NUDConfigurations(%d, %d): %s", host1NICID, test.netProto, err)
+ }
+ clock.Advance(time.Duration(nudConfigs.MaxMulticastProbes) * nudConfigs.RetransmitTimer)
+ } else {
+ clock.RunImmediatelyScheduledJobs()
+ }
<-ch
+
{
var r bytes.Reader
r.Reset([]byte{0})
@@ -693,8 +706,10 @@ func TestGetLinkAddress(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
+ clock := faketime.NewManualClock()
stackOpts := stack.Options{
NetworkProtocols: []stack.NetworkProtocolFactory{arp.NewProtocol, ipv4.NewProtocol, ipv6.NewProtocol},
+ Clock: clock,
}
host1Stack, _ := setupStack(t, stackOpts, host1NICID, host2NICID)
@@ -710,8 +725,20 @@ func TestGetLinkAddress(t *testing.T) {
if test.expectedErr == nil {
wantRes.LinkAddress = utils.LinkAddr2
}
- if diff := cmp.Diff(wantRes, <-ch); diff != "" {
- t.Fatalf("link resolution result mismatch (-want +got):\n%s", diff)
+
+ nudConfigs, err := host1Stack.NUDConfigurations(host1NICID, test.netProto)
+ if err != nil {
+ t.Fatalf("host1Stack.NUDConfigurations(%d, %d): %s", host1NICID, test.netProto, err)
+ }
+
+ clock.Advance(time.Duration(nudConfigs.MaxMulticastProbes) * nudConfigs.RetransmitTimer)
+ select {
+ case got := <-ch:
+ if diff := cmp.Diff(wantRes, got); diff != "" {
+ t.Fatalf("link resolution result mismatch (-want +got):\n%s", diff)
+ }
+ default:
+ t.Fatal("event didn't arrive")
}
})
}
@@ -788,8 +815,10 @@ func TestRouteResolvedFields(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
+ clock := faketime.NewManualClock()
stackOpts := stack.Options{
NetworkProtocols: []stack.NetworkProtocolFactory{arp.NewProtocol, ipv4.NewProtocol, ipv6.NewProtocol},
+ Clock: clock,
}
host1Stack, _ := setupStack(t, stackOpts, host1NICID, host2NICID)
@@ -819,8 +848,20 @@ func TestRouteResolvedFields(t *testing.T) {
if _, ok := err.(*tcpip.ErrWouldBlock); !ok {
t.Errorf("got r.ResolvedFields(_) = %s, want = %s", err, &tcpip.ErrWouldBlock{})
}
- if diff := cmp.Diff(stack.ResolvedFieldsResult{RouteInfo: wantRouteInfo, Err: test.expectedErr}, <-ch, cmp.AllowUnexported(stack.RouteInfo{})); diff != "" {
- t.Errorf("route resolve result mismatch (-want +got):\n%s", diff)
+
+ nudConfigs, err := host1Stack.NUDConfigurations(host1NICID, test.netProto)
+ if err != nil {
+ t.Fatalf("host1Stack.NUDConfigurations(%d, %d): %s", host1NICID, test.netProto, err)
+ }
+ clock.Advance(time.Duration(nudConfigs.MaxMulticastProbes) * nudConfigs.RetransmitTimer)
+
+ select {
+ case got := <-ch:
+ if diff := cmp.Diff(stack.ResolvedFieldsResult{RouteInfo: wantRouteInfo, Err: test.expectedErr}, got, cmp.AllowUnexported(stack.RouteInfo{})); diff != "" {
+ t.Errorf("route resolve result mismatch (-want +got):\n%s", diff)
+ }
+ default:
+ t.Fatalf("event didn't arrive")
}
if test.expectedErr != nil {
@@ -1033,11 +1074,16 @@ func (d *nudDispatcher) OnNeighborRemoved(nicID tcpip.NICID, entry stack.Neighbo
d.c <- e
}
-func (d *nudDispatcher) waitForEvent(want eventInfo) error {
- if diff := cmp.Diff(want, <-d.c, cmp.AllowUnexported(eventInfo{}), cmpopts.IgnoreFields(stack.NeighborEntry{}, "UpdatedAt")); diff != "" {
- return fmt.Errorf("got invalid event (-want +got):\n%s", diff)
+func (d *nudDispatcher) expectEvent(want eventInfo) error {
+ select {
+ case got := <-d.c:
+ if diff := cmp.Diff(want, got, cmp.AllowUnexported(eventInfo{}), cmpopts.IgnoreFields(stack.NeighborEntry{}, "UpdatedAt")); diff != "" {
+ return fmt.Errorf("got invalid event (-want +got):\n%s", diff)
+ }
+ return nil
+ default:
+ return fmt.Errorf("event didn't arrive")
}
- return nil
}
// TestTCPConfirmNeighborReachability tests that TCP informs layers beneath it
@@ -1048,7 +1094,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
netProto tcpip.NetworkProtocolNumber
remoteAddr tcpip.Address
neighborAddr tcpip.Address
- getEndpoints func(*testing.T, *stack.Stack, *stack.Stack, *stack.Stack) (tcpip.Endpoint, tcpip.Endpoint, <-chan struct{})
+ getEndpoints func(*testing.T, *stack.Stack, *stack.Stack, *stack.Stack) (tcpip.Endpoint, <-chan struct{}, tcpip.Endpoint, <-chan struct{})
isHost1Listener bool
}{
{
@@ -1056,23 +1102,25 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
netProto: ipv4.ProtocolNumber,
remoteAddr: utils.Host2IPv4Addr.AddressWithPrefix.Address,
neighborAddr: utils.RouterNIC1IPv4Addr.AddressWithPrefix.Address,
- getEndpoints: func(t *testing.T, host1Stack, _, host2Stack *stack.Stack) (tcpip.Endpoint, tcpip.Endpoint, <-chan struct{}) {
+ getEndpoints: func(t *testing.T, host1Stack, _, host2Stack *stack.Stack) (tcpip.Endpoint, <-chan struct{}, tcpip.Endpoint, <-chan struct{}) {
var listenerWQ waiter.Queue
+ listenerWE, listenerCH := waiter.NewChannelEntry(nil)
+ listenerWQ.EventRegister(&listenerWE, waiter.EventIn)
listenerEP, err := host2Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &listenerWQ)
if err != nil {
t.Fatalf("host2Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv4.ProtocolNumber, err)
}
+ t.Cleanup(listenerEP.Close)
var clientWQ waiter.Queue
clientWE, clientCH := waiter.NewChannelEntry(nil)
- clientWQ.EventRegister(&clientWE, waiter.WritableEvents)
+ clientWQ.EventRegister(&clientWE, waiter.ReadableEvents|waiter.WritableEvents)
clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ)
if err != nil {
- listenerEP.Close()
t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv4.ProtocolNumber, err)
}
- return listenerEP, clientEP, clientCH
+ return listenerEP, listenerCH, clientEP, clientCH
},
},
{
@@ -1080,23 +1128,25 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
netProto: ipv6.ProtocolNumber,
remoteAddr: utils.Host2IPv6Addr.AddressWithPrefix.Address,
neighborAddr: utils.RouterNIC1IPv6Addr.AddressWithPrefix.Address,
- getEndpoints: func(t *testing.T, host1Stack, _, host2Stack *stack.Stack) (tcpip.Endpoint, tcpip.Endpoint, <-chan struct{}) {
+ getEndpoints: func(t *testing.T, host1Stack, _, host2Stack *stack.Stack) (tcpip.Endpoint, <-chan struct{}, tcpip.Endpoint, <-chan struct{}) {
var listenerWQ waiter.Queue
+ listenerWE, listenerCH := waiter.NewChannelEntry(nil)
+ listenerWQ.EventRegister(&listenerWE, waiter.EventIn)
listenerEP, err := host2Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &listenerWQ)
if err != nil {
t.Fatalf("host2Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv6.ProtocolNumber, err)
}
+ t.Cleanup(listenerEP.Close)
var clientWQ waiter.Queue
clientWE, clientCH := waiter.NewChannelEntry(nil)
- clientWQ.EventRegister(&clientWE, waiter.WritableEvents)
+ clientWQ.EventRegister(&clientWE, waiter.ReadableEvents|waiter.WritableEvents)
clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ)
if err != nil {
- listenerEP.Close()
t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv6.ProtocolNumber, err)
}
- return listenerEP, clientEP, clientCH
+ return listenerEP, listenerCH, clientEP, clientCH
},
},
{
@@ -1104,23 +1154,25 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
netProto: ipv4.ProtocolNumber,
remoteAddr: utils.RouterNIC1IPv4Addr.AddressWithPrefix.Address,
neighborAddr: utils.RouterNIC1IPv4Addr.AddressWithPrefix.Address,
- getEndpoints: func(t *testing.T, host1Stack, routerStack, _ *stack.Stack) (tcpip.Endpoint, tcpip.Endpoint, <-chan struct{}) {
+ getEndpoints: func(t *testing.T, host1Stack, routerStack, _ *stack.Stack) (tcpip.Endpoint, <-chan struct{}, tcpip.Endpoint, <-chan struct{}) {
var listenerWQ waiter.Queue
+ listenerWE, listenerCH := waiter.NewChannelEntry(nil)
+ listenerWQ.EventRegister(&listenerWE, waiter.EventIn)
listenerEP, err := routerStack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &listenerWQ)
if err != nil {
t.Fatalf("routerStack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv4.ProtocolNumber, err)
}
+ t.Cleanup(listenerEP.Close)
var clientWQ waiter.Queue
clientWE, clientCH := waiter.NewChannelEntry(nil)
- clientWQ.EventRegister(&clientWE, waiter.WritableEvents)
+ clientWQ.EventRegister(&clientWE, waiter.ReadableEvents|waiter.WritableEvents)
clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ)
if err != nil {
- listenerEP.Close()
t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv4.ProtocolNumber, err)
}
- return listenerEP, clientEP, clientCH
+ return listenerEP, listenerCH, clientEP, clientCH
},
},
{
@@ -1128,23 +1180,25 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
netProto: ipv6.ProtocolNumber,
remoteAddr: utils.RouterNIC1IPv6Addr.AddressWithPrefix.Address,
neighborAddr: utils.RouterNIC1IPv6Addr.AddressWithPrefix.Address,
- getEndpoints: func(t *testing.T, host1Stack, routerStack, _ *stack.Stack) (tcpip.Endpoint, tcpip.Endpoint, <-chan struct{}) {
+ getEndpoints: func(t *testing.T, host1Stack, routerStack, _ *stack.Stack) (tcpip.Endpoint, <-chan struct{}, tcpip.Endpoint, <-chan struct{}) {
var listenerWQ waiter.Queue
+ listenerWE, listenerCH := waiter.NewChannelEntry(nil)
+ listenerWQ.EventRegister(&listenerWE, waiter.EventIn)
listenerEP, err := routerStack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &listenerWQ)
if err != nil {
t.Fatalf("routerStack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv6.ProtocolNumber, err)
}
+ t.Cleanup(listenerEP.Close)
var clientWQ waiter.Queue
clientWE, clientCH := waiter.NewChannelEntry(nil)
- clientWQ.EventRegister(&clientWE, waiter.WritableEvents)
+ clientWQ.EventRegister(&clientWE, waiter.ReadableEvents|waiter.WritableEvents)
clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ)
if err != nil {
- listenerEP.Close()
t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv6.ProtocolNumber, err)
}
- return listenerEP, clientEP, clientCH
+ return listenerEP, listenerCH, clientEP, clientCH
},
},
{
@@ -1152,23 +1206,25 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
netProto: ipv4.ProtocolNumber,
remoteAddr: utils.Host1IPv4Addr.AddressWithPrefix.Address,
neighborAddr: utils.RouterNIC1IPv4Addr.AddressWithPrefix.Address,
- getEndpoints: func(t *testing.T, host1Stack, routerStack, _ *stack.Stack) (tcpip.Endpoint, tcpip.Endpoint, <-chan struct{}) {
+ getEndpoints: func(t *testing.T, host1Stack, routerStack, _ *stack.Stack) (tcpip.Endpoint, <-chan struct{}, tcpip.Endpoint, <-chan struct{}) {
var listenerWQ waiter.Queue
+ listenerWE, listenerCH := waiter.NewChannelEntry(nil)
+ listenerWQ.EventRegister(&listenerWE, waiter.EventIn)
listenerEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &listenerWQ)
if err != nil {
t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv4.ProtocolNumber, err)
}
+ t.Cleanup(listenerEP.Close)
var clientWQ waiter.Queue
clientWE, clientCH := waiter.NewChannelEntry(nil)
- clientWQ.EventRegister(&clientWE, waiter.WritableEvents)
+ clientWQ.EventRegister(&clientWE, waiter.ReadableEvents|waiter.WritableEvents)
clientEP, err := routerStack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ)
if err != nil {
- listenerEP.Close()
t.Fatalf("routerStack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv4.ProtocolNumber, err)
}
- return listenerEP, clientEP, clientCH
+ return listenerEP, listenerCH, clientEP, clientCH
},
isHost1Listener: true,
},
@@ -1177,23 +1233,25 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
netProto: ipv6.ProtocolNumber,
remoteAddr: utils.Host1IPv6Addr.AddressWithPrefix.Address,
neighborAddr: utils.RouterNIC1IPv6Addr.AddressWithPrefix.Address,
- getEndpoints: func(t *testing.T, host1Stack, routerStack, _ *stack.Stack) (tcpip.Endpoint, tcpip.Endpoint, <-chan struct{}) {
+ getEndpoints: func(t *testing.T, host1Stack, routerStack, _ *stack.Stack) (tcpip.Endpoint, <-chan struct{}, tcpip.Endpoint, <-chan struct{}) {
var listenerWQ waiter.Queue
+ listenerWE, listenerCH := waiter.NewChannelEntry(nil)
+ listenerWQ.EventRegister(&listenerWE, waiter.EventIn)
listenerEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &listenerWQ)
if err != nil {
t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv6.ProtocolNumber, err)
}
+ t.Cleanup(listenerEP.Close)
var clientWQ waiter.Queue
clientWE, clientCH := waiter.NewChannelEntry(nil)
- clientWQ.EventRegister(&clientWE, waiter.WritableEvents)
+ clientWQ.EventRegister(&clientWE, waiter.ReadableEvents|waiter.WritableEvents)
clientEP, err := routerStack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ)
if err != nil {
- listenerEP.Close()
t.Fatalf("routerStack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv6.ProtocolNumber, err)
}
- return listenerEP, clientEP, clientCH
+ return listenerEP, listenerCH, clientEP, clientCH
},
isHost1Listener: true,
},
@@ -1202,23 +1260,25 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
netProto: ipv4.ProtocolNumber,
remoteAddr: utils.Host1IPv4Addr.AddressWithPrefix.Address,
neighborAddr: utils.RouterNIC1IPv4Addr.AddressWithPrefix.Address,
- getEndpoints: func(t *testing.T, host1Stack, _, host2Stack *stack.Stack) (tcpip.Endpoint, tcpip.Endpoint, <-chan struct{}) {
+ getEndpoints: func(t *testing.T, host1Stack, _, host2Stack *stack.Stack) (tcpip.Endpoint, <-chan struct{}, tcpip.Endpoint, <-chan struct{}) {
var listenerWQ waiter.Queue
+ listenerWE, listenerCH := waiter.NewChannelEntry(nil)
+ listenerWQ.EventRegister(&listenerWE, waiter.EventIn)
listenerEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &listenerWQ)
if err != nil {
t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv4.ProtocolNumber, err)
}
+ t.Cleanup(listenerEP.Close)
var clientWQ waiter.Queue
clientWE, clientCH := waiter.NewChannelEntry(nil)
- clientWQ.EventRegister(&clientWE, waiter.WritableEvents)
+ clientWQ.EventRegister(&clientWE, waiter.ReadableEvents|waiter.WritableEvents)
clientEP, err := host2Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ)
if err != nil {
- listenerEP.Close()
t.Fatalf("host2Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv4.ProtocolNumber, err)
}
- return listenerEP, clientEP, clientCH
+ return listenerEP, listenerCH, clientEP, clientCH
},
isHost1Listener: true,
},
@@ -1227,23 +1287,25 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
netProto: ipv6.ProtocolNumber,
remoteAddr: utils.Host1IPv6Addr.AddressWithPrefix.Address,
neighborAddr: utils.RouterNIC1IPv6Addr.AddressWithPrefix.Address,
- getEndpoints: func(t *testing.T, host1Stack, _, host2Stack *stack.Stack) (tcpip.Endpoint, tcpip.Endpoint, <-chan struct{}) {
+ getEndpoints: func(t *testing.T, host1Stack, _, host2Stack *stack.Stack) (tcpip.Endpoint, <-chan struct{}, tcpip.Endpoint, <-chan struct{}) {
var listenerWQ waiter.Queue
+ listenerWE, listenerCH := waiter.NewChannelEntry(nil)
+ listenerWQ.EventRegister(&listenerWE, waiter.EventIn)
listenerEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &listenerWQ)
if err != nil {
t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv6.ProtocolNumber, err)
}
+ t.Cleanup(listenerEP.Close)
var clientWQ waiter.Queue
clientWE, clientCH := waiter.NewChannelEntry(nil)
- clientWQ.EventRegister(&clientWE, waiter.WritableEvents)
+ clientWQ.EventRegister(&clientWE, waiter.ReadableEvents|waiter.WritableEvents)
clientEP, err := host2Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ)
if err != nil {
- listenerEP.Close()
t.Fatalf("host2Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, ipv6.ProtocolNumber, err)
}
- return listenerEP, clientEP, clientCH
+ return listenerEP, listenerCH, clientEP, clientCH
},
isHost1Listener: true,
},
@@ -1281,14 +1343,14 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
t.Fatalf("link resolution mismatch (-want +got):\n%s", diff)
}
}
- if err := nudDisp.waitForEvent(eventInfo{
+ if err := nudDisp.expectEvent(eventInfo{
eventType: entryAdded,
nicID: utils.Host1NICID,
entry: stack.NeighborEntry{State: stack.Incomplete, Addr: test.neighborAddr},
}); err != nil {
t.Fatalf("error waiting for initial NUD event: %s", err)
}
- if err := nudDisp.waitForEvent(eventInfo{
+ if err := nudDisp.expectEvent(eventInfo{
eventType: entryChanged,
nicID: utils.Host1NICID,
entry: stack.NeighborEntry{State: stack.Reachable, Addr: test.neighborAddr, LinkAddr: utils.LinkAddr2},
@@ -1308,7 +1370,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
// See NUDConfigurations.BaseReachableTime for more information.
maxReachableTime := time.Duration(float32(nudConfigs.BaseReachableTime) * nudConfigs.MaxRandomFactor)
clock.Advance(maxReachableTime)
- if err := nudDisp.waitForEvent(eventInfo{
+ if err := nudDisp.expectEvent(eventInfo{
eventType: entryChanged,
nicID: utils.Host1NICID,
entry: stack.NeighborEntry{State: stack.Stale, Addr: test.neighborAddr, LinkAddr: utils.LinkAddr2},
@@ -1316,8 +1378,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
t.Fatalf("error waiting for stale NUD event: %s", err)
}
- listenerEP, clientEP, clientCH := test.getEndpoints(t, host1Stack, routerStack, host2Stack)
- defer listenerEP.Close()
+ listenerEP, listenerCH, clientEP, clientCH := test.getEndpoints(t, host1Stack, routerStack, host2Stack)
defer clientEP.Close()
listenerAddr := tcpip.FullAddress{Addr: test.remoteAddr, Port: 1234}
if err := listenerEP.Bind(listenerAddr); err != nil {
@@ -1338,14 +1399,15 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
// with confirmation that the neighbor is reachable (indicated by a
// successful 3-way handshake).
<-clientCH
- if err := nudDisp.waitForEvent(eventInfo{
+ if err := nudDisp.expectEvent(eventInfo{
eventType: entryChanged,
nicID: utils.Host1NICID,
entry: stack.NeighborEntry{State: stack.Delay, Addr: test.neighborAddr, LinkAddr: utils.LinkAddr2},
}); err != nil {
t.Fatalf("error waiting for delay NUD event: %s", err)
}
- if err := nudDisp.waitForEvent(eventInfo{
+ <-listenerCH
+ if err := nudDisp.expectEvent(eventInfo{
eventType: entryChanged,
nicID: utils.Host1NICID,
entry: stack.NeighborEntry{State: stack.Reachable, Addr: test.neighborAddr, LinkAddr: utils.LinkAddr2},
@@ -1353,26 +1415,55 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
t.Fatalf("error waiting for reachable NUD event: %s", err)
}
+ peerEP, peerWQ, err := listenerEP.Accept(nil)
+ if err != nil {
+ t.Fatalf("listenerEP.Accept(): %s", err)
+ }
+ defer peerEP.Close()
+ peerWE, peerCH := waiter.NewChannelEntry(nil)
+ peerWQ.EventRegister(&peerWE, waiter.ReadableEvents)
+
// Wait for the neighbor to be stale again then send data to the remote.
//
// On successful transmission, the neighbor should become reachable
// without probing the neighbor as a TCP ACK would be received which is an
// indication of the neighbor being reachable.
clock.Advance(maxReachableTime)
- if err := nudDisp.waitForEvent(eventInfo{
+ if err := nudDisp.expectEvent(eventInfo{
eventType: entryChanged,
nicID: utils.Host1NICID,
entry: stack.NeighborEntry{State: stack.Stale, Addr: test.neighborAddr, LinkAddr: utils.LinkAddr2},
}); err != nil {
t.Fatalf("error waiting for stale NUD event: %s", err)
}
- var r bytes.Reader
- r.Reset([]byte{0})
- var wOpts tcpip.WriteOptions
- if _, err := clientEP.Write(&r, wOpts); err != nil {
- t.Errorf("clientEP.Write(_, %#v): %s", wOpts, err)
+ {
+ var r bytes.Reader
+ r.Reset([]byte{0})
+ var wOpts tcpip.WriteOptions
+ if _, err := clientEP.Write(&r, wOpts); err != nil {
+ t.Errorf("clientEP.Write(_, %#v): %s", wOpts, err)
+ }
+ }
+ // Heads up, there is a race here.
+ //
+ // Incoming TCP segments are handled in
+ // tcp.(*endpoint).handleSegmentLocked:
+ //
+ // - tcp.(*endpoint).rcv.handleRcvdSegment puts the segment on the
+ // segment queue and notifies waiting readers (such as this channel)
+ //
+ // - tcp.(*endpoint).snd.handleRcvdSegment sends an ACK for the segment
+ // and notifies the NUD machinery that the peer is reachable
+ //
+ // Thus we must permit a delay between the readable signal and the
+ // expected NUD event.
+ //
+ // At the time of writing, this race is reliably hit with gotsan.
+ <-peerCH
+ for len(nudDisp.c) == 0 {
+ runtime.Gosched()
}
- if err := nudDisp.waitForEvent(eventInfo{
+ if err := nudDisp.expectEvent(eventInfo{
eventType: entryChanged,
nicID: utils.Host1NICID,
entry: stack.NeighborEntry{State: stack.Delay, Addr: test.neighborAddr, LinkAddr: utils.LinkAddr2},
@@ -1385,7 +1476,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
// TCP should not mark the route reachable and NUD should go through the
// probe state.
clock.Advance(nudConfigs.DelayFirstProbeTime)
- if err := nudDisp.waitForEvent(eventInfo{
+ if err := nudDisp.expectEvent(eventInfo{
eventType: entryChanged,
nicID: utils.Host1NICID,
entry: stack.NeighborEntry{State: stack.Probe, Addr: test.neighborAddr, LinkAddr: utils.LinkAddr2},
@@ -1393,7 +1484,16 @@ func TestTCPConfirmNeighborReachability(t *testing.T) {
t.Fatalf("error waiting for probe NUD event: %s", err)
}
}
- if err := nudDisp.waitForEvent(eventInfo{
+ {
+ var r bytes.Reader
+ r.Reset([]byte{0})
+ var wOpts tcpip.WriteOptions
+ if _, err := peerEP.Write(&r, wOpts); err != nil {
+ t.Errorf("peerEP.Write(_, %#v): %s", wOpts, err)
+ }
+ }
+ <-clientCH
+ if err := nudDisp.expectEvent(eventInfo{
eventType: entryChanged,
nicID: utils.Host1NICID,
entry: stack.NeighborEntry{State: stack.Reachable, Addr: test.neighborAddr, LinkAddr: utils.LinkAddr2},
diff --git a/pkg/tcpip/timer_test.go b/pkg/tcpip/timer_test.go
index e6783b126..8c43dd627 100644
--- a/pkg/tcpip/timer_test.go
+++ b/pkg/tcpip/timer_test.go
@@ -38,6 +38,21 @@ func TestMonotonicTimeBefore(t *testing.T) {
}
}
+func TestMonotonicTimeAfter(t *testing.T) {
+ var mt tcpip.MonotonicTime
+ if mt.After(mt) {
+ t.Errorf("%#v.After(%#v)", mt, mt)
+ }
+
+ one := mt.Add(1)
+ if mt.After(one) {
+ t.Errorf("%#v.After(%#v)", mt, one)
+ }
+ if !one.After(mt) {
+ t.Errorf("!%#v.After(%#v)", one, mt)
+ }
+}
+
func TestMonotonicTimeAddSub(t *testing.T) {
var mt tcpip.MonotonicTime
if one, two := mt.Add(2), mt.Add(1).Add(1); one != two {
diff --git a/pkg/tcpip/transport/tcp/BUILD b/pkg/tcpip/transport/tcp/BUILD
index 0f20d3856..8436d2cf0 100644
--- a/pkg/tcpip/transport/tcp/BUILD
+++ b/pkg/tcpip/transport/tcp/BUILD
@@ -41,7 +41,6 @@ go_library(
"protocol.go",
"rack.go",
"rcv.go",
- "rcv_state.go",
"reno.go",
"reno_recovery.go",
"sack.go",
@@ -53,7 +52,6 @@ go_library(
"segment_state.go",
"segment_unsafe.go",
"snd.go",
- "snd_state.go",
"tcp_endpoint_list.go",
"tcp_segment_list.go",
"timer.go",
@@ -134,6 +132,7 @@ go_test(
deps = [
"//pkg/sleep",
"//pkg/tcpip/buffer",
+ "//pkg/tcpip/faketime",
"//pkg/tcpip/stack",
"@com_github_google_go_cmp//cmp:go_default_library",
],
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index d4bd4e80e..4a33fbc24 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -114,8 +114,8 @@ type listenContext struct {
}
// timeStamp returns an 8-bit timestamp with a granularity of 64 seconds.
-func timeStamp() uint32 {
- return uint32(time.Now().Unix()>>6) & tsMask
+func timeStamp(clock tcpip.Clock) uint32 {
+ return uint32(clock.NowMonotonic().Sub(tcpip.MonotonicTime{}).Seconds()) >> 6 & tsMask
}
// newListenContext creates a new listen context.
@@ -171,7 +171,7 @@ func (l *listenContext) cookieHash(id stack.TransportEndpointID, ts uint32, nonc
// createCookie creates a SYN cookie for the given id and incoming sequence
// number.
func (l *listenContext) createCookie(id stack.TransportEndpointID, seq seqnum.Value, data uint32) seqnum.Value {
- ts := timeStamp()
+ ts := timeStamp(l.stack.Clock())
v := l.cookieHash(id, 0, 0) + uint32(seq) + (ts << tsOffset)
v += (l.cookieHash(id, ts, 1) + data) & hashMask
return seqnum.Value(v)
@@ -181,7 +181,7 @@ func (l *listenContext) createCookie(id stack.TransportEndpointID, seq seqnum.Va
// sequence number. If it is, it also returns the data originally encoded in the
// cookie when createCookie was called.
func (l *listenContext) isCookieValid(id stack.TransportEndpointID, cookie seqnum.Value, seq seqnum.Value) (uint32, bool) {
- ts := timeStamp()
+ ts := timeStamp(l.stack.Clock())
v := uint32(cookie) - l.cookieHash(id, 0, 0) - uint32(seq)
cookieTS := v >> tsOffset
if ((ts - cookieTS) & tsMask) > maxTSDiff {
@@ -247,7 +247,7 @@ func (l *listenContext) createConnectingEndpoint(s *segment, rcvdSynOpts *header
func (l *listenContext) startHandshake(s *segment, opts *header.TCPSynOptions, queue *waiter.Queue, owner tcpip.PacketOwner) (*handshake, tcpip.Error) {
// Create new endpoint.
irs := s.sequenceNumber
- isn := generateSecureISN(s.id, l.stack.Seed())
+ isn := generateSecureISN(s.id, l.stack.Clock(), l.stack.Seed())
ep, err := l.createConnectingEndpoint(s, opts, queue)
if err != nil {
return nil, err
@@ -591,7 +591,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) tcpip.Err
synOpts := header.TCPSynOptions{
WS: -1,
TS: opts.TS,
- TSVal: tcpTimeStamp(time.Now(), timeStampOffset()),
+ TSVal: tcpTimeStamp(e.stack.Clock().NowMonotonic(), timeStampOffset()),
TSEcr: opts.TSVal,
MSS: calculateAdvertisedMSS(e.userMSS, route),
}
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 05b41e0f8..2fcdf1cda 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -92,7 +92,7 @@ type handshake struct {
rcvWndScale int
// startTime is the time at which the first SYN/SYN-ACK was sent.
- startTime time.Time
+ startTime tcpip.MonotonicTime
// deferAccept if non-zero will drop the final ACK for a passive
// handshake till an ACK segment with data is received or the timeout is
@@ -156,12 +156,12 @@ func (h *handshake) resetState() {
h.flags = header.TCPFlagSyn
h.ackNum = 0
h.mss = 0
- h.iss = generateSecureISN(h.ep.TransportEndpointInfo.ID, h.ep.stack.Seed())
+ h.iss = generateSecureISN(h.ep.TransportEndpointInfo.ID, h.ep.stack.Clock(), h.ep.stack.Seed())
}
// generateSecureISN generates a secure Initial Sequence number based on the
// recommendation here https://tools.ietf.org/html/rfc6528#page-3.
-func generateSecureISN(id stack.TransportEndpointID, seed uint32) seqnum.Value {
+func generateSecureISN(id stack.TransportEndpointID, clock tcpip.Clock, seed uint32) seqnum.Value {
isnHasher := jenkins.Sum32(seed)
isnHasher.Write([]byte(id.LocalAddress))
isnHasher.Write([]byte(id.RemoteAddress))
@@ -180,7 +180,7 @@ func generateSecureISN(id stack.TransportEndpointID, seed uint32) seqnum.Value {
//
// Which sort of guarantees that we won't reuse the ISN for a new
// connection for the same tuple for at least 274s.
- isn := isnHasher.Sum32() + uint32(time.Now().UnixNano()>>6)
+ isn := isnHasher.Sum32() + uint32(clock.NowMonotonic().Sub(tcpip.MonotonicTime{}).Nanoseconds()>>6)
return seqnum.Value(isn)
}
@@ -381,7 +381,7 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error {
if s.flagIsSet(header.TCPFlagAck) {
// If deferAccept is not zero and this is a bare ACK and the
// timeout is not hit then drop the ACK.
- if h.deferAccept != 0 && s.data.Size() == 0 && time.Since(h.startTime) < h.deferAccept {
+ if h.deferAccept != 0 && s.data.Size() == 0 && h.ep.stack.Clock().NowMonotonic().Sub(h.startTime) < h.deferAccept {
h.acked = true
h.ep.stack.Stats().DroppedPackets.Increment()
return nil
@@ -474,7 +474,7 @@ func (h *handshake) processSegments() tcpip.Error {
// start sends the first SYN/SYN-ACK. It does not block, even if link address
// resolution is required.
func (h *handshake) start() {
- h.startTime = time.Now()
+ h.startTime = h.ep.stack.Clock().NowMonotonic()
h.ep.amss = calculateAdvertisedMSS(h.ep.userMSS, h.ep.route)
var sackEnabled tcpip.TCPSACKEnabled
if err := h.ep.stack.TransportProtocolOption(ProtocolNumber, &sackEnabled); err != nil {
@@ -527,7 +527,7 @@ func (h *handshake) complete() tcpip.Error {
defer s.Done()
// Initialize the resend timer.
- timer, err := newBackoffTimer(time.Second, MaxRTO, resendWaker.Assert)
+ timer, err := newBackoffTimer(h.ep.stack.Clock(), time.Second, MaxRTO, resendWaker.Assert)
if err != nil {
return err
}
@@ -552,7 +552,7 @@ func (h *handshake) complete() tcpip.Error {
// The last is required to provide a way for the peer to complete
// the connection with another ACK or data (as ACKs are never
// retransmitted on their own).
- if h.active || !h.acked || h.deferAccept != 0 && time.Since(h.startTime) > h.deferAccept {
+ if h.active || !h.acked || h.deferAccept != 0 && h.ep.stack.Clock().NowMonotonic().Sub(h.startTime) > h.deferAccept {
h.ep.sendSynTCP(h.ep.route, tcpFields{
id: h.ep.TransportEndpointInfo.ID,
ttl: h.ep.ttl,
@@ -608,15 +608,15 @@ func (h *handshake) complete() tcpip.Error {
type backoffTimer struct {
timeout time.Duration
maxTimeout time.Duration
- t *time.Timer
+ t tcpip.Timer
}
-func newBackoffTimer(timeout, maxTimeout time.Duration, f func()) (*backoffTimer, tcpip.Error) {
+func newBackoffTimer(clock tcpip.Clock, timeout, maxTimeout time.Duration, f func()) (*backoffTimer, tcpip.Error) {
if timeout > maxTimeout {
return nil, &tcpip.ErrTimeout{}
}
bt := &backoffTimer{timeout: timeout, maxTimeout: maxTimeout}
- bt.t = time.AfterFunc(timeout, f)
+ bt.t = clock.AfterFunc(timeout, f)
return bt, nil
}
@@ -1267,7 +1267,7 @@ func (e *endpoint) keepaliveTimerExpired() tcpip.Error {
// If a userTimeout is set then abort the connection if it is
// exceeded.
- if userTimeout != 0 && time.Since(e.rcv.lastRcvdAckTime) >= userTimeout && e.keepalive.unacked > 0 {
+ if userTimeout != 0 && e.stack.Clock().NowMonotonic().Sub(e.rcv.lastRcvdAckTime) >= userTimeout && e.keepalive.unacked > 0 {
e.keepalive.Unlock()
e.stack.Stats().TCP.EstablishedTimedout.Increment()
return &tcpip.ErrTimeout{}
@@ -1322,7 +1322,7 @@ func (e *endpoint) disableKeepaliveTimer() {
// segments.
func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{}) tcpip.Error {
e.mu.Lock()
- var closeTimer *time.Timer
+ var closeTimer tcpip.Timer
var closeWaker sleep.Waker
epilogue := func() {
@@ -1484,7 +1484,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
if e.EndpointState() == StateFinWait2 && e.closed {
// The socket has been closed and we are in FIN_WAIT2
// so start the FIN_WAIT2 timer.
- closeTimer = time.AfterFunc(e.tcpLingerTimeout, closeWaker.Assert)
+ closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert)
}
}
@@ -1721,7 +1721,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
var timeWaitWaker sleep.Waker
s.AddWaker(&timeWaitWaker, timeWaitDone)
- timeWaitTimer := time.AfterFunc(timeWaitDuration, timeWaitWaker.Assert)
+ timeWaitTimer := e.stack.Clock().AfterFunc(timeWaitDuration, timeWaitWaker.Assert)
defer timeWaitTimer.Stop()
for {
diff --git a/pkg/tcpip/transport/tcp/cubic.go b/pkg/tcpip/transport/tcp/cubic.go
index 962f1d687..6985194bb 100644
--- a/pkg/tcpip/transport/tcp/cubic.go
+++ b/pkg/tcpip/transport/tcp/cubic.go
@@ -41,7 +41,7 @@ type cubicState struct {
func newCubicCC(s *sender) *cubicState {
return &cubicState{
TCPCubicState: stack.TCPCubicState{
- T: time.Now(),
+ T: s.ep.stack.Clock().NowMonotonic(),
Beta: 0.7,
C: 0.4,
},
@@ -60,7 +60,7 @@ func (c *cubicState) enterCongestionAvoidance() {
// https://tools.ietf.org/html/rfc8312#section-4.8
if c.numCongestionEvents == 0 {
c.K = 0
- c.T = time.Now()
+ c.T = c.s.ep.stack.Clock().NowMonotonic()
c.WLastMax = c.WMax
c.WMax = float64(c.s.SndCwnd)
}
@@ -115,14 +115,15 @@ func (c *cubicState) cubicCwnd(t float64) float64 {
// getCwnd returns the current congestion window as computed by CUBIC.
// Refer: https://tools.ietf.org/html/rfc8312#section-4
func (c *cubicState) getCwnd(packetsAcked, sndCwnd int, srtt time.Duration) int {
- elapsed := time.Since(c.T).Seconds()
+ elapsed := c.s.ep.stack.Clock().NowMonotonic().Sub(c.T)
+ elapsedSeconds := elapsed.Seconds()
// Compute the window as per Cubic after 'elapsed' time
// since last congestion event.
- c.WC = c.cubicCwnd(elapsed - c.K)
+ c.WC = c.cubicCwnd(elapsedSeconds - c.K)
// Compute the TCP friendly estimate of the congestion window.
- c.WEst = c.WMax*c.Beta + (3.0*((1.0-c.Beta)/(1.0+c.Beta)))*(elapsed/srtt.Seconds())
+ c.WEst = c.WMax*c.Beta + (3.0*((1.0-c.Beta)/(1.0+c.Beta)))*(elapsedSeconds/srtt.Seconds())
// Make sure in the TCP friendly region CUBIC performs at least
// as well as Reno.
@@ -134,7 +135,7 @@ func (c *cubicState) getCwnd(packetsAcked, sndCwnd int, srtt time.Duration) int
// In Concave/Convex region of CUBIC, calculate what CUBIC window
// will be after 1 RTT and use that to grow congestion window
// for every ack.
- tEst := (time.Since(c.T) + srtt).Seconds()
+ tEst := (elapsed + srtt).Seconds()
wtRtt := c.cubicCwnd(tEst - c.K)
// As per 4.3 for each received ACK cwnd must be incremented
// by (w_cubic(t+RTT) - cwnd/cwnd.
@@ -151,7 +152,7 @@ func (c *cubicState) getCwnd(packetsAcked, sndCwnd int, srtt time.Duration) int
func (c *cubicState) HandleLossDetected() {
// See: https://tools.ietf.org/html/rfc8312#section-4.5
c.numCongestionEvents++
- c.T = time.Now()
+ c.T = c.s.ep.stack.Clock().NowMonotonic()
c.WLastMax = c.WMax
c.WMax = float64(c.s.SndCwnd)
@@ -162,7 +163,7 @@ func (c *cubicState) HandleLossDetected() {
// HandleRTOExpired implements congestionContrl.HandleRTOExpired.
func (c *cubicState) HandleRTOExpired() {
// See: https://tools.ietf.org/html/rfc8312#section-4.6
- c.T = time.Now()
+ c.T = c.s.ep.stack.Clock().NowMonotonic()
c.numCongestionEvents = 0
c.WLastMax = c.WMax
c.WMax = float64(c.s.SndCwnd)
@@ -193,7 +194,7 @@ func (c *cubicState) fastConvergence() {
// PostRecovery implemements congestionControl.PostRecovery.
func (c *cubicState) PostRecovery() {
- c.T = time.Now()
+ c.T = c.s.ep.stack.Clock().NowMonotonic()
}
// reduceSlowStartThreshold returns new SsThresh as described in
diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go
index 0ca986512..5963a169c 100644
--- a/pkg/tcpip/transport/tcp/dispatcher.go
+++ b/pkg/tcpip/transport/tcp/dispatcher.go
@@ -20,6 +20,7 @@ import (
"gvisor.dev/gvisor/pkg/rand"
"gvisor.dev/gvisor/pkg/sleep"
"gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/hash/jenkins"
"gvisor.dev/gvisor/pkg/tcpip/header"
"gvisor.dev/gvisor/pkg/tcpip/stack"
@@ -172,10 +173,10 @@ func (d *dispatcher) wait() {
d.wg.Wait()
}
-func (d *dispatcher) queuePacket(stackEP stack.TransportEndpoint, id stack.TransportEndpointID, pkt *stack.PacketBuffer) {
+func (d *dispatcher) queuePacket(stackEP stack.TransportEndpoint, id stack.TransportEndpointID, clock tcpip.Clock, pkt *stack.PacketBuffer) {
ep := stackEP.(*endpoint)
- s := newIncomingSegment(id, pkt)
+ s := newIncomingSegment(id, clock, pkt)
if !s.parse(pkt.RXTransportChecksumValidated) {
ep.stack.Stats().TCP.InvalidSegmentsReceived.Increment()
ep.stats.ReceiveErrors.MalformedPacketsReceived.Increment()
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index fb7670adb..d44f480ab 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -480,7 +480,7 @@ type endpoint struct {
// recentTSTime is the unix time when we last updated
// TCPEndpointStateInner.RecentTS.
- recentTSTime time.Time `state:".(unixTime)"`
+ recentTSTime tcpip.MonotonicTime
// shutdownFlags represent the current shutdown state of the endpoint.
shutdownFlags tcpip.ShutdownFlags
@@ -638,7 +638,7 @@ type endpoint struct {
// lastOutOfWindowAckTime is the time at which the an ACK was sent in response
// to an out of window segment being received by this endpoint.
- lastOutOfWindowAckTime time.Time `state:".(unixTime)"`
+ lastOutOfWindowAckTime tcpip.MonotonicTime
}
// UniqueID implements stack.TransportEndpoint.UniqueID.
@@ -787,7 +787,7 @@ func (e *endpoint) EndpointState() EndpointState {
// setRecentTimestamp sets the recentTS field to the provided value.
func (e *endpoint) setRecentTimestamp(recentTS uint32) {
e.RecentTS = recentTS
- e.recentTSTime = time.Now()
+ e.recentTSTime = e.stack.Clock().NowMonotonic()
}
// recentTimestamp returns the value of the recentTS field.
@@ -884,7 +884,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
e.segmentQueue.ep = e
e.TSOffset = timeStampOffset()
e.acceptCond = sync.NewCond(&e.acceptMu)
- e.keepalive.timer.init(&e.keepalive.waker)
+ e.keepalive.timer.init(e.stack.Clock(), &e.keepalive.waker)
return e
}
@@ -1201,7 +1201,7 @@ func (e *endpoint) ModerateRecvBuf(copied int) {
e.rcvQueueInfo.rcvQueueMu.Unlock()
return
}
- now := time.Now()
+ now := e.stack.Clock().NowMonotonic()
if rtt := e.rcvQueueInfo.RcvAutoParams.RTT; rtt == 0 || now.Sub(e.rcvQueueInfo.RcvAutoParams.MeasureTime) < rtt {
e.rcvQueueInfo.RcvAutoParams.CopiedBytes += copied
e.rcvQueueInfo.rcvQueueMu.Unlock()
@@ -1556,7 +1556,7 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp
}
// Add data to the send queue.
- s := newOutgoingSegment(e.TransportEndpointInfo.ID, v)
+ s := newOutgoingSegment(e.TransportEndpointInfo.ID, e.stack.Clock(), v)
e.sndQueueInfo.SndBufUsed += len(v)
e.sndQueueInfo.SndBufInQueue += seqnum.Size(len(v))
e.sndQueueInfo.sndQueue.PushBack(s)
@@ -2241,7 +2241,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
// If the endpoint is not in TIME-WAIT or if it is in TIME-WAIT but
// less than 1 second has elapsed since its recentTS was updated then
// we cannot reuse the port.
- if tcpEP.EndpointState() != StateTimeWait || time.Since(tcpEP.recentTSTime) < 1*time.Second {
+ if tcpEP.EndpointState() != StateTimeWait || e.stack.Clock().NowMonotonic().Sub(tcpEP.recentTSTime) < 1*time.Second {
tcpEP.UnlockUser()
return false, nil
}
@@ -2387,7 +2387,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error {
}
// Queue fin segment.
- s := newOutgoingSegment(e.TransportEndpointInfo.ID, nil)
+ s := newOutgoingSegment(e.TransportEndpointInfo.ID, e.stack.Clock(), nil)
e.sndQueueInfo.sndQueue.PushBack(s)
e.sndQueueInfo.SndBufInQueue++
// Mark endpoint as closed.
@@ -2865,14 +2865,15 @@ func (e *endpoint) maybeEnableTimestamp(synOpts *header.TCPSynOptions) {
// timestamp returns the timestamp value to be used in the TSVal field of the
// timestamp option for outgoing TCP segments for a given endpoint.
func (e *endpoint) timestamp() uint32 {
- return tcpTimeStamp(time.Now(), e.TSOffset)
+ return tcpTimeStamp(e.stack.Clock().NowMonotonic(), e.TSOffset)
}
// tcpTimeStamp returns a timestamp offset by the provided offset. This is
// not inlined above as it's used when SYN cookies are in use and endpoint
// is not created at the time when the SYN cookie is sent.
-func tcpTimeStamp(curTime time.Time, offset uint32) uint32 {
- return uint32(curTime.Unix()*1000+int64(curTime.Nanosecond()/1e6)) + offset
+func tcpTimeStamp(curTime tcpip.MonotonicTime, offset uint32) uint32 {
+ d := curTime.Sub(tcpip.MonotonicTime{})
+ return uint32(d.Milliseconds()) + offset
}
// timeStampOffset returns a randomized timestamp offset to be used when sending
@@ -2926,7 +2927,7 @@ func (e *endpoint) completeStateLocked() stack.TCPEndpointState {
s := stack.TCPEndpointState{
TCPEndpointStateInner: e.TCPEndpointStateInner,
ID: stack.TCPEndpointID(e.TransportEndpointInfo.ID),
- SegTime: time.Now(),
+ SegTime: e.stack.Clock().NowMonotonic(),
Receiver: e.rcv.TCPReceiverState,
Sender: e.snd.TCPSenderState,
}
@@ -2954,7 +2955,7 @@ func (e *endpoint) completeStateLocked() stack.TCPEndpointState {
if cubic, ok := e.snd.cc.(*cubicState); ok {
s.Sender.Cubic = cubic.TCPCubicState
- s.Sender.Cubic.TimeSinceLastCongestion = time.Since(s.Sender.Cubic.T)
+ s.Sender.Cubic.TimeSinceLastCongestion = e.stack.Clock().NowMonotonic().Sub(s.Sender.Cubic.T)
}
s.Sender.RACKState = e.snd.rc.TCPRACKState
@@ -3046,14 +3047,16 @@ func GetTCPSendBufferLimits(s tcpip.StackHandler) tcpip.SendBufferSizeOption {
// allowOutOfWindowAck returns true if an out-of-window ACK can be sent now.
func (e *endpoint) allowOutOfWindowAck() bool {
- var limit stack.TCPInvalidRateLimitOption
- if err := e.stack.Option(&limit); err != nil {
- panic(fmt.Sprintf("e.stack.Option(%+v) failed with error: %s", limit, err))
- }
+ now := e.stack.Clock().NowMonotonic()
- now := time.Now()
- if now.Sub(e.lastOutOfWindowAckTime) < time.Duration(limit) {
- return false
+ if e.lastOutOfWindowAckTime != (tcpip.MonotonicTime{}) {
+ var limit stack.TCPInvalidRateLimitOption
+ if err := e.stack.Option(&limit); err != nil {
+ panic(fmt.Sprintf("e.stack.Option(%+v) failed with error: %s", limit, err))
+ }
+ if now.Sub(e.lastOutOfWindowAckTime) < time.Duration(limit) {
+ return false
+ }
}
e.lastOutOfWindowAckTime = now
diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go
index a56d34dc5..952ccacdd 100644
--- a/pkg/tcpip/transport/tcp/endpoint_state.go
+++ b/pkg/tcpip/transport/tcp/endpoint_state.go
@@ -158,12 +158,17 @@ func (e *endpoint) afterLoad() {
// Condition variables and mutexs are not S/R'ed so reinitialize
// acceptCond with e.acceptMu.
e.acceptCond = sync.NewCond(&e.acceptMu)
- e.keepalive.timer.init(&e.keepalive.waker)
stack.StackFromEnv.RegisterRestoredEndpoint(e)
}
// Resume implements tcpip.ResumableEndpoint.Resume.
func (e *endpoint) Resume(s *stack.Stack) {
+ e.keepalive.timer.init(s.Clock(), &e.keepalive.waker)
+ if snd := e.snd; snd != nil {
+ snd.resendTimer.init(s.Clock(), &snd.resendWaker)
+ snd.reorderTimer.init(s.Clock(), &snd.reorderWaker)
+ snd.probeTimer.init(s.Clock(), &snd.probeWaker)
+ }
e.stack = s
e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits, GetTCPReceiveBufferLimits)
e.segmentQueue.thaw()
@@ -290,23 +295,3 @@ func (e *endpoint) Resume(s *stack.Stack) {
tcpip.DeleteDanglingEndpoint(e)
}
}
-
-// saveRecentTSTime is invoked by stateify.
-func (e *endpoint) saveRecentTSTime() unixTime {
- return unixTime{e.recentTSTime.Unix(), e.recentTSTime.UnixNano()}
-}
-
-// loadRecentTSTime is invoked by stateify.
-func (e *endpoint) loadRecentTSTime(unix unixTime) {
- e.recentTSTime = time.Unix(unix.second, unix.nano)
-}
-
-// saveLastOutOfWindowAckTime is invoked by stateify.
-func (e *endpoint) saveLastOutOfWindowAckTime() unixTime {
- return unixTime{e.lastOutOfWindowAckTime.Unix(), e.lastOutOfWindowAckTime.UnixNano()}
-}
-
-// loadLastOutOfWindowAckTime is invoked by stateify.
-func (e *endpoint) loadLastOutOfWindowAckTime(unix unixTime) {
- e.lastOutOfWindowAckTime = time.Unix(unix.second, unix.nano)
-}
diff --git a/pkg/tcpip/transport/tcp/forwarder.go b/pkg/tcpip/transport/tcp/forwarder.go
index 2f9fe7ee0..65c86823a 100644
--- a/pkg/tcpip/transport/tcp/forwarder.go
+++ b/pkg/tcpip/transport/tcp/forwarder.go
@@ -65,7 +65,7 @@ func NewForwarder(s *stack.Stack, rcvWnd, maxInFlight int, handler func(*Forward
// This function is expected to be passed as an argument to the
// stack.SetTransportProtocolHandler function.
func (f *Forwarder) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketBuffer) bool {
- s := newIncomingSegment(id, pkt)
+ s := newIncomingSegment(id, f.stack.Clock(), pkt)
defer s.decRef()
// We only care about well-formed SYN packets.
diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go
index d43e21426..c970dcecc 100644
--- a/pkg/tcpip/transport/tcp/protocol.go
+++ b/pkg/tcpip/transport/tcp/protocol.go
@@ -131,7 +131,7 @@ func (*protocol) ParsePorts(v buffer.View) (src, dst uint16, err tcpip.Error) {
// goroutine which is responsible for dequeuing and doing full TCP dispatch of
// the packet.
func (p *protocol) QueuePacket(ep stack.TransportEndpoint, id stack.TransportEndpointID, pkt *stack.PacketBuffer) {
- p.dispatcher.queuePacket(ep, id, pkt)
+ p.dispatcher.queuePacket(ep, id, p.stack.Clock(), pkt)
}
// HandleUnknownDestinationPacket handles packets targeted at this protocol but
@@ -142,7 +142,7 @@ func (p *protocol) QueuePacket(ep stack.TransportEndpoint, id stack.TransportEnd
// particular, SYNs addressed to a non-existent connection are rejected by this
// means."
func (p *protocol) HandleUnknownDestinationPacket(id stack.TransportEndpointID, pkt *stack.PacketBuffer) stack.UnknownDestinationPacketDisposition {
- s := newIncomingSegment(id, pkt)
+ s := newIncomingSegment(id, p.stack.Clock(), pkt)
defer s.decRef()
if !s.parse(pkt.RXTransportChecksumValidated) || !s.csumValid {
diff --git a/pkg/tcpip/transport/tcp/rack.go b/pkg/tcpip/transport/tcp/rack.go
index 813a6dffd..0da4eafaa 100644
--- a/pkg/tcpip/transport/tcp/rack.go
+++ b/pkg/tcpip/transport/tcp/rack.go
@@ -79,7 +79,7 @@ func (rc *rackControl) init(snd *sender, iss seqnum.Value) {
// update will update the RACK related fields when an ACK has been received.
// See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-09#section-6.2
func (rc *rackControl) update(seg *segment, ackSeg *segment) {
- rtt := time.Now().Sub(seg.xmitTime)
+ rtt := rc.snd.ep.stack.Clock().NowMonotonic().Sub(seg.xmitTime)
tsOffset := rc.snd.ep.TSOffset
// If the ACK is for a retransmitted packet, do not update if it is a
@@ -115,7 +115,7 @@ func (rc *rackControl) update(seg *segment, ackSeg *segment) {
// ending sequence number of the packet which has been acknowledged
// most recently.
endSeq := seg.sequenceNumber.Add(seqnum.Size(seg.data.Size()))
- if rc.XmitTime.Before(seg.xmitTime) || (seg.xmitTime.Equal(rc.XmitTime) && rc.EndSequence.LessThan(endSeq)) {
+ if rc.XmitTime.Before(seg.xmitTime) || (seg.xmitTime == rc.XmitTime && rc.EndSequence.LessThan(endSeq)) {
rc.XmitTime = seg.xmitTime
rc.EndSequence = endSeq
}
@@ -174,7 +174,7 @@ func (s *sender) schedulePTO() {
}
s.rtt.Unlock()
- now := time.Now()
+ now := s.ep.stack.Clock().NowMonotonic()
if s.resendTimer.enabled() {
if now.Add(pto).After(s.resendTimer.target) {
pto = s.resendTimer.target.Sub(now)
@@ -352,7 +352,7 @@ func (rc *rackControl) exitRecovery() {
// detectLoss marks the segment as lost if the reordering window has elapsed
// and the ACK is not received. It will also arm the reorder timer.
// See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 Step 5.
-func (rc *rackControl) detectLoss(rcvTime time.Time) int {
+func (rc *rackControl) detectLoss(rcvTime tcpip.MonotonicTime) int {
var timeout time.Duration
numLost := 0
for seg := rc.snd.writeList.Front(); seg != nil && seg.xmitCount != 0; seg = seg.Next() {
@@ -366,7 +366,7 @@ func (rc *rackControl) detectLoss(rcvTime time.Time) int {
}
endSeq := seg.sequenceNumber.Add(seqnum.Size(seg.data.Size()))
- if seg.xmitTime.Before(rc.XmitTime) || (seg.xmitTime.Equal(rc.XmitTime) && rc.EndSequence.LessThan(endSeq)) {
+ if seg.xmitTime.Before(rc.XmitTime) || (seg.xmitTime == rc.XmitTime && rc.EndSequence.LessThan(endSeq)) {
timeRemaining := seg.xmitTime.Sub(rcvTime) + rc.RTT + rc.ReoWnd
if timeRemaining <= 0 {
seg.lost = true
@@ -392,7 +392,7 @@ func (rc *rackControl) reorderTimerExpired() tcpip.Error {
return nil
}
- numLost := rc.detectLoss(time.Now())
+ numLost := rc.detectLoss(rc.snd.ep.stack.Clock().NowMonotonic())
if numLost == 0 {
return nil
}
diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go
index 4fd8a0624..b46f8320e 100644
--- a/pkg/tcpip/transport/tcp/rcv.go
+++ b/pkg/tcpip/transport/tcp/rcv.go
@@ -17,7 +17,6 @@ package tcp
import (
"container/heap"
"math"
- "time"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/header"
@@ -50,7 +49,7 @@ type receiver struct {
pendingRcvdSegments segmentHeap
// Time when the last ack was received.
- lastRcvdAckTime time.Time `state:".(unixTime)"`
+ lastRcvdAckTime tcpip.MonotonicTime
}
func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8) *receiver {
@@ -63,7 +62,7 @@ func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale
},
rcvWnd: rcvWnd,
rcvWUP: irs + 1,
- lastRcvdAckTime: time.Now(),
+ lastRcvdAckTime: ep.stack.Clock().NowMonotonic(),
}
}
@@ -325,9 +324,9 @@ func (r *receiver) updateRTT() {
// is first acknowledged and the receipt of data that is at least one
// window beyond the sequence number that was acknowledged.
r.ep.rcvQueueInfo.rcvQueueMu.Lock()
- if r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime.IsZero() {
+ if r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime == (tcpip.MonotonicTime{}) {
// New measurement.
- r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = time.Now()
+ r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = r.ep.stack.Clock().NowMonotonic()
r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureSeqNumber = r.RcvNxt.Add(r.rcvWnd)
r.ep.rcvQueueInfo.rcvQueueMu.Unlock()
return
@@ -336,14 +335,14 @@ func (r *receiver) updateRTT() {
r.ep.rcvQueueInfo.rcvQueueMu.Unlock()
return
}
- rtt := time.Since(r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime)
+ rtt := r.ep.stack.Clock().NowMonotonic().Sub(r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime)
// We only store the minimum observed RTT here as this is only used in
// absence of a SRTT available from either timestamps or a sender
// measurement of RTT.
if r.ep.rcvQueueInfo.RcvAutoParams.RTT == 0 || rtt < r.ep.rcvQueueInfo.RcvAutoParams.RTT {
r.ep.rcvQueueInfo.RcvAutoParams.RTT = rtt
}
- r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = time.Now()
+ r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = r.ep.stack.Clock().NowMonotonic()
r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureSeqNumber = r.RcvNxt.Add(r.rcvWnd)
r.ep.rcvQueueInfo.rcvQueueMu.Unlock()
}
@@ -467,7 +466,7 @@ func (r *receiver) handleRcvdSegment(s *segment) (drop bool, err tcpip.Error) {
}
// Store the time of the last ack.
- r.lastRcvdAckTime = time.Now()
+ r.lastRcvdAckTime = r.ep.stack.Clock().NowMonotonic()
// Defer segment processing if it can't be consumed now.
if !r.consumeSegment(s, segSeq, segLen) {
diff --git a/pkg/tcpip/transport/tcp/rcv_state.go b/pkg/tcpip/transport/tcp/rcv_state.go
deleted file mode 100644
index 2bf21a2e7..000000000
--- a/pkg/tcpip/transport/tcp/rcv_state.go
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2019 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package tcp
-
-import (
- "time"
-)
-
-// saveLastRcvdAckTime is invoked by stateify.
-func (r *receiver) saveLastRcvdAckTime() unixTime {
- return unixTime{r.lastRcvdAckTime.Unix(), r.lastRcvdAckTime.UnixNano()}
-}
-
-// loadLastRcvdAckTime is invoked by stateify.
-func (r *receiver) loadLastRcvdAckTime(unix unixTime) {
- r.lastRcvdAckTime = time.Unix(unix.second, unix.nano)
-}
diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go
index 61754de29..e5c0d52fa 100644
--- a/pkg/tcpip/transport/tcp/segment.go
+++ b/pkg/tcpip/transport/tcp/segment.go
@@ -17,7 +17,6 @@ package tcp
import (
"fmt"
"sync/atomic"
- "time"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
@@ -73,9 +72,9 @@ type segment struct {
parsedOptions header.TCPOptions
options []byte `state:".([]byte)"`
hasNewSACKInfo bool
- rcvdTime time.Time `state:".(unixTime)"`
+ rcvdTime tcpip.MonotonicTime
// xmitTime is the last transmit time of this segment.
- xmitTime time.Time `state:".(unixTime)"`
+ xmitTime tcpip.MonotonicTime
xmitCount uint32
// acked indicates if the segment has already been SACKed.
@@ -88,7 +87,7 @@ type segment struct {
lost bool
}
-func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) *segment {
+func newIncomingSegment(id stack.TransportEndpointID, clock tcpip.Clock, pkt *stack.PacketBuffer) *segment {
netHdr := pkt.Network()
s := &segment{
refCnt: 1,
@@ -100,17 +99,17 @@ func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) *
}
s.data = pkt.Data().ExtractVV().Clone(s.views[:])
s.hdr = header.TCP(pkt.TransportHeader().View())
- s.rcvdTime = time.Now()
+ s.rcvdTime = clock.NowMonotonic()
s.dataMemSize = s.data.Size()
return s
}
-func newOutgoingSegment(id stack.TransportEndpointID, v buffer.View) *segment {
+func newOutgoingSegment(id stack.TransportEndpointID, clock tcpip.Clock, v buffer.View) *segment {
s := &segment{
refCnt: 1,
id: id,
}
- s.rcvdTime = time.Now()
+ s.rcvdTime = clock.NowMonotonic()
if len(v) != 0 {
s.views[0] = v
s.data = buffer.NewVectorisedView(len(v), s.views[:1])
diff --git a/pkg/tcpip/transport/tcp/segment_state.go b/pkg/tcpip/transport/tcp/segment_state.go
index 7422d8c02..dcfa80f95 100644
--- a/pkg/tcpip/transport/tcp/segment_state.go
+++ b/pkg/tcpip/transport/tcp/segment_state.go
@@ -15,8 +15,6 @@
package tcp
import (
- "time"
-
"gvisor.dev/gvisor/pkg/tcpip/buffer"
)
@@ -55,23 +53,3 @@ func (s *segment) loadOptions(options []byte) {
// allocated so there is no cost here.
s.options = options
}
-
-// saveRcvdTime is invoked by stateify.
-func (s *segment) saveRcvdTime() unixTime {
- return unixTime{s.rcvdTime.Unix(), s.rcvdTime.UnixNano()}
-}
-
-// loadRcvdTime is invoked by stateify.
-func (s *segment) loadRcvdTime(unix unixTime) {
- s.rcvdTime = time.Unix(unix.second, unix.nano)
-}
-
-// saveXmitTime is invoked by stateify.
-func (s *segment) saveXmitTime() unixTime {
- return unixTime{s.rcvdTime.Unix(), s.rcvdTime.UnixNano()}
-}
-
-// loadXmitTime is invoked by stateify.
-func (s *segment) loadXmitTime(unix unixTime) {
- s.rcvdTime = time.Unix(unix.second, unix.nano)
-}
diff --git a/pkg/tcpip/transport/tcp/segment_test.go b/pkg/tcpip/transport/tcp/segment_test.go
index 486016fc0..2e6ea06f5 100644
--- a/pkg/tcpip/transport/tcp/segment_test.go
+++ b/pkg/tcpip/transport/tcp/segment_test.go
@@ -19,6 +19,7 @@ import (
"github.com/google/go-cmp/cmp"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/faketime"
"gvisor.dev/gvisor/pkg/tcpip/stack"
)
@@ -39,10 +40,11 @@ func checkSegmentSize(t *testing.T, name string, seg *segment, want segmentSizeW
}
func TestSegmentMerge(t *testing.T) {
+ var clock faketime.NullClock
id := stack.TransportEndpointID{}
- seg1 := newOutgoingSegment(id, buffer.NewView(10))
+ seg1 := newOutgoingSegment(id, &clock, buffer.NewView(10))
defer seg1.decRef()
- seg2 := newOutgoingSegment(id, buffer.NewView(20))
+ seg2 := newOutgoingSegment(id, &clock, buffer.NewView(20))
defer seg2.decRef()
checkSegmentSize(t, "seg1", seg1, segmentSizeWants{
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index 6b7293755..ccff9ca8d 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -94,7 +94,7 @@ type sender struct {
// firstRetransmittedSegXmitTime is the original transmit time of
// the first segment that was retransmitted due to RTO expiration.
- firstRetransmittedSegXmitTime time.Time `state:".(unixTime)"`
+ firstRetransmittedSegXmitTime tcpip.MonotonicTime
// zeroWindowProbing is set if the sender is currently probing
// for zero receive window.
@@ -169,7 +169,7 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint
SndUna: iss + 1,
SndNxt: iss + 1,
RTTMeasureSeqNum: iss + 1,
- LastSendTime: time.Now(),
+ LastSendTime: ep.stack.Clock().NowMonotonic(),
MaxPayloadSize: maxPayloadSize,
MaxSentAck: irs + 1,
FastRecovery: stack.TCPFastRecoveryState{
@@ -197,9 +197,9 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint
s.SndWndScale = uint8(sndWndScale)
}
- s.resendTimer.init(&s.resendWaker)
- s.reorderTimer.init(&s.reorderWaker)
- s.probeTimer.init(&s.probeWaker)
+ s.resendTimer.init(s.ep.stack.Clock(), &s.resendWaker)
+ s.reorderTimer.init(s.ep.stack.Clock(), &s.reorderWaker)
+ s.probeTimer.init(s.ep.stack.Clock(), &s.probeWaker)
s.updateMaxPayloadSize(int(ep.route.MTU()), 0)
@@ -441,7 +441,7 @@ func (s *sender) retransmitTimerExpired() bool {
// timeout since the first retransmission.
uto := s.ep.userTimeout
- if s.firstRetransmittedSegXmitTime.IsZero() {
+ if s.firstRetransmittedSegXmitTime == (tcpip.MonotonicTime{}) {
// We store the original xmitTime of the segment that we are
// about to retransmit as the retransmission time. This is
// required as by the time the retransmitTimer has expired the
@@ -450,7 +450,7 @@ func (s *sender) retransmitTimerExpired() bool {
s.firstRetransmittedSegXmitTime = s.writeList.Front().xmitTime
}
- elapsed := time.Since(s.firstRetransmittedSegXmitTime)
+ elapsed := s.ep.stack.Clock().NowMonotonic().Sub(s.firstRetransmittedSegXmitTime)
remaining := s.maxRTO
if uto != 0 {
// Cap to the user specified timeout if one is specified.
@@ -866,8 +866,8 @@ func (s *sender) enableZeroWindowProbing() {
// We piggyback the probing on the retransmit timer with the
// current retranmission interval, as we may start probing while
// segment retransmissions.
- if s.firstRetransmittedSegXmitTime.IsZero() {
- s.firstRetransmittedSegXmitTime = time.Now()
+ if s.firstRetransmittedSegXmitTime == (tcpip.MonotonicTime{}) {
+ s.firstRetransmittedSegXmitTime = s.ep.stack.Clock().NowMonotonic()
}
s.resendTimer.enable(s.RTO)
}
@@ -875,7 +875,7 @@ func (s *sender) enableZeroWindowProbing() {
func (s *sender) disableZeroWindowProbing() {
s.zeroWindowProbing = false
s.unackZeroWindowProbes = 0
- s.firstRetransmittedSegXmitTime = time.Time{}
+ s.firstRetransmittedSegXmitTime = tcpip.MonotonicTime{}
s.resendTimer.disable()
}
@@ -925,7 +925,7 @@ func (s *sender) sendData() {
// "A TCP SHOULD set cwnd to no more than RW before beginning
// transmission if the TCP has not sent data in the interval exceeding
// the retrasmission timeout."
- if !s.FastRecovery.Active && s.state != tcpip.RTORecovery && time.Now().Sub(s.LastSendTime) > s.RTO {
+ if !s.FastRecovery.Active && s.state != tcpip.RTORecovery && s.ep.stack.Clock().NowMonotonic().Sub(s.LastSendTime) > s.RTO {
if s.SndCwnd > InitialCwnd {
s.SndCwnd = InitialCwnd
}
@@ -1234,7 +1234,7 @@ func checkDSACK(rcvdSeg *segment) bool {
func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
// Check if we can extract an RTT measurement from this ack.
if !rcvdSeg.parsedOptions.TS && s.RTTMeasureSeqNum.LessThan(rcvdSeg.ackNumber) {
- s.updateRTO(time.Now().Sub(s.RTTMeasureTime))
+ s.updateRTO(s.ep.stack.Clock().NowMonotonic().Sub(s.RTTMeasureTime))
s.RTTMeasureSeqNum = s.SndNxt
}
@@ -1444,7 +1444,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
if s.SndUna == s.SndNxt {
s.Outstanding = 0
// Reset firstRetransmittedSegXmitTime to the zero value.
- s.firstRetransmittedSegXmitTime = time.Time{}
+ s.firstRetransmittedSegXmitTime = tcpip.MonotonicTime{}
s.resendTimer.disable()
s.probeTimer.disable()
}
@@ -1502,7 +1502,7 @@ func (s *sender) sendSegment(seg *segment) tcpip.Error {
s.ep.stack.Stats().TCP.SlowStartRetransmits.Increment()
}
}
- seg.xmitTime = time.Now()
+ seg.xmitTime = s.ep.stack.Clock().NowMonotonic()
seg.xmitCount++
seg.lost = false
err := s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber)
@@ -1527,7 +1527,7 @@ func (s *sender) sendSegment(seg *segment) tcpip.Error {
// sendSegmentFromView sends a new segment containing the given payload, flags
// and sequence number.
func (s *sender) sendSegmentFromView(data buffer.VectorisedView, flags header.TCPFlags, seq seqnum.Value) tcpip.Error {
- s.LastSendTime = time.Now()
+ s.LastSendTime = s.ep.stack.Clock().NowMonotonic()
if seq == s.RTTMeasureSeqNum {
s.RTTMeasureTime = s.LastSendTime
}
diff --git a/pkg/tcpip/transport/tcp/snd_state.go b/pkg/tcpip/transport/tcp/snd_state.go
deleted file mode 100644
index 2f805d8ce..000000000
--- a/pkg/tcpip/transport/tcp/snd_state.go
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package tcp
-
-import (
- "time"
-)
-
-// +stateify savable
-type unixTime struct {
- second int64
- nano int64
-}
-
-// afterLoad is invoked by stateify.
-func (s *sender) afterLoad() {
- s.resendTimer.init(&s.resendWaker)
- s.reorderTimer.init(&s.reorderWaker)
- s.probeTimer.init(&s.probeWaker)
-}
-
-// saveFirstRetransmittedSegXmitTime is invoked by stateify.
-func (s *sender) saveFirstRetransmittedSegXmitTime() unixTime {
- return unixTime{s.firstRetransmittedSegXmitTime.Unix(), s.firstRetransmittedSegXmitTime.UnixNano()}
-}
-
-// loadFirstRetransmittedSegXmitTime is invoked by stateify.
-func (s *sender) loadFirstRetransmittedSegXmitTime(unix unixTime) {
- s.firstRetransmittedSegXmitTime = time.Unix(unix.second, unix.nano)
-}
diff --git a/pkg/tcpip/transport/tcp/tcp_rack_test.go b/pkg/tcpip/transport/tcp/tcp_rack_test.go
index 29af87b7d..d6cf786a1 100644
--- a/pkg/tcpip/transport/tcp/tcp_rack_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_rack_test.go
@@ -50,7 +50,7 @@ func TestRACKUpdate(t *testing.T) {
c := context.New(t, uint32(mtu))
defer c.Cleanup()
- var xmitTime time.Time
+ var xmitTime tcpip.MonotonicTime
probeDone := make(chan struct{})
c.Stack().AddTCPProbe(func(state stack.TCPEndpointState) {
// Validate that the endpoint Sender.RACKState is what we expect.
@@ -79,7 +79,7 @@ func TestRACKUpdate(t *testing.T) {
}
// Write the data.
- xmitTime = time.Now()
+ xmitTime = c.Stack().Clock().NowMonotonic()
var r bytes.Reader
r.Reset(data)
if _, err := c.EP.Write(&r, tcpip.WriteOptions{}); err != nil {
diff --git a/pkg/tcpip/transport/tcp/timer.go b/pkg/tcpip/transport/tcp/timer.go
index 38a335840..5645c772e 100644
--- a/pkg/tcpip/transport/tcp/timer.go
+++ b/pkg/tcpip/transport/tcp/timer.go
@@ -15,21 +15,29 @@
package tcp
import (
+ "math"
"time"
"gvisor.dev/gvisor/pkg/sleep"
+ "gvisor.dev/gvisor/pkg/tcpip"
)
type timerState int
const (
+ // The timer is disabled.
timerStateDisabled timerState = iota
+ // The timer is enabled, but the clock timer may be set to an earlier
+ // expiration time due to a previous orphaned state.
timerStateEnabled
+ // The timer is disabled, but the clock timer is enabled, which means that
+ // it will cause a spurious wakeup unless the timer is enabled before the
+ // clock timer fires.
timerStateOrphaned
)
// timer is a timer implementation that reduces the interactions with the
-// runtime timer infrastructure by letting timers run (and potentially
+// clock timer infrastructure by letting timers run (and potentially
// eventually expire) even if they are stopped. It makes it cheaper to
// disable/reenable timers at the expense of spurious wakes. This is useful for
// cases when the same timer is disabled/reenabled repeatedly with relatively
@@ -39,44 +47,37 @@ const (
// (currently at least 200ms), and get disabled when acks are received, and
// reenabled when new pending segments are sent.
//
-// It is advantageous to avoid interacting with the runtime because it acquires
+// It is advantageous to avoid interacting with the clock because it acquires
// a global mutex and performs O(log n) operations, where n is the global number
// of timers, whenever a timer is enabled or disabled, and may make a syscall.
//
// This struct is thread-compatible.
type timer struct {
- // state is the current state of the timer, it can be one of the
- // following values:
- // disabled - the timer is disabled.
- // orphaned - the timer is disabled, but the runtime timer is
- // enabled, which means that it will evetually cause a
- // spurious wake (unless it gets enabled again before
- // then).
- // enabled - the timer is enabled, but the runtime timer may be set
- // to an earlier expiration time due to a previous
- // orphaned state.
state timerState
+ clock tcpip.Clock
+
// target is the expiration time of the current timer. It is only
// meaningful in the enabled state.
- target time.Time
+ target tcpip.MonotonicTime
- // runtimeTarget is the expiration time of the runtime timer. It is
+ // clockTarget is the expiration time of the clock timer. It is
// meaningful in the enabled and orphaned states.
- runtimeTarget time.Time
+ clockTarget tcpip.MonotonicTime
- // timer is the runtime timer used to wait on.
- timer *time.Timer
+ // timer is the clock timer used to wait on.
+ timer tcpip.Timer
}
// init initializes the timer. Once it expires, it the given waker will be
// asserted.
-func (t *timer) init(w *sleep.Waker) {
+func (t *timer) init(clock tcpip.Clock, w *sleep.Waker) {
t.state = timerStateDisabled
+ t.clock = clock
- // Initialize a runtime timer that will assert the waker, then
+ // Initialize a clock timer that will assert the waker, then
// immediately stop it.
- t.timer = time.AfterFunc(time.Hour, func() {
+ t.timer = t.clock.AfterFunc(math.MaxInt64, func() {
w.Assert()
})
t.timer.Stop()
@@ -106,9 +107,9 @@ func (t *timer) checkExpiration() bool {
// The timer is enabled, but it may have expired early. Check if that's
// the case, and if so, reset the runtime timer to the correct time.
- now := time.Now()
+ now := t.clock.NowMonotonic()
if now.Before(t.target) {
- t.runtimeTarget = t.target
+ t.clockTarget = t.target
t.timer.Reset(t.target.Sub(now))
return false
}
@@ -134,11 +135,11 @@ func (t *timer) enabled() bool {
// enable enables the timer, programming the runtime timer if necessary.
func (t *timer) enable(d time.Duration) {
- t.target = time.Now().Add(d)
+ t.target = t.clock.NowMonotonic().Add(d)
// Check if we need to set the runtime timer.
- if t.state == timerStateDisabled || t.target.Before(t.runtimeTarget) {
- t.runtimeTarget = t.target
+ if t.state == timerStateDisabled || t.target.Before(t.clockTarget) {
+ t.clockTarget = t.target
t.timer.Reset(d)
}
diff --git a/pkg/tcpip/transport/tcp/timer_test.go b/pkg/tcpip/transport/tcp/timer_test.go
index dbd6dff54..479752de7 100644
--- a/pkg/tcpip/transport/tcp/timer_test.go
+++ b/pkg/tcpip/transport/tcp/timer_test.go
@@ -19,6 +19,7 @@ import (
"time"
"gvisor.dev/gvisor/pkg/sleep"
+ "gvisor.dev/gvisor/pkg/tcpip/faketime"
)
func TestCleanup(t *testing.T) {
@@ -27,9 +28,11 @@ func TestCleanup(t *testing.T) {
isAssertedTimeoutSeconds = timerDurationSeconds + 1
)
+ clock := faketime.NewManualClock()
+
tmr := timer{}
w := sleep.Waker{}
- tmr.init(&w)
+ tmr.init(clock, &w)
tmr.enable(timerDurationSeconds * time.Second)
tmr.cleanup()
@@ -39,7 +42,7 @@ func TestCleanup(t *testing.T) {
// The waker should not be asserted.
for i := 0; i < isAssertedTimeoutSeconds; i++ {
- time.Sleep(time.Second)
+ clock.Advance(time.Second)
if w.IsAsserted() {
t.Fatalf("waker asserted unexpectedly")
}