summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--pkg/tcpip/faketime/BUILD5
-rw-r--r--pkg/tcpip/faketime/faketime.go318
2 files changed, 221 insertions, 102 deletions
diff --git a/pkg/tcpip/faketime/BUILD b/pkg/tcpip/faketime/BUILD
index 114d43df3..bb9d44aff 100644
--- a/pkg/tcpip/faketime/BUILD
+++ b/pkg/tcpip/faketime/BUILD
@@ -6,10 +6,7 @@ go_library(
name = "faketime",
srcs = ["faketime.go"],
visibility = ["//visibility:public"],
- deps = [
- "//pkg/tcpip",
- "@com_github_dpjacques_clockwork//:go_default_library",
- ],
+ deps = ["//pkg/tcpip"],
)
go_test(
diff --git a/pkg/tcpip/faketime/faketime.go b/pkg/tcpip/faketime/faketime.go
index f7a4fbde1..fb819d7a8 100644
--- a/pkg/tcpip/faketime/faketime.go
+++ b/pkg/tcpip/faketime/faketime.go
@@ -17,10 +17,10 @@ package faketime
import (
"container/heap"
+ "fmt"
"sync"
"time"
- "github.com/dpjacques/clockwork"
"gvisor.dev/gvisor/pkg/tcpip"
)
@@ -44,38 +44,85 @@ func (*NullClock) AfterFunc(time.Duration, func()) tcpip.Timer {
return nil
}
+type notificationChannels struct {
+ mu struct {
+ sync.Mutex
+
+ ch []<-chan struct{}
+ }
+}
+
+func (n *notificationChannels) add(ch <-chan struct{}) {
+ n.mu.Lock()
+ defer n.mu.Unlock()
+ n.mu.ch = append(n.mu.ch, ch)
+}
+
+// wait returns once all the notification channels are readable.
+//
+// Channels that are added while waiting on existing channels will be waited on
+// as well.
+func (n *notificationChannels) wait() {
+ for {
+ n.mu.Lock()
+ ch := n.mu.ch
+ n.mu.ch = nil
+ n.mu.Unlock()
+
+ if len(ch) == 0 {
+ break
+ }
+
+ for _, c := range ch {
+ <-c
+ }
+ }
+}
+
// ManualClock implements tcpip.Clock and only advances manually with Advance
// method.
type ManualClock struct {
- clock clockwork.FakeClock
+ // runningTimers tracks the completion of timer callbacks that began running
+ // immediately upon their scheduling. It is used to ensure the proper ordering
+ // of timer callback dispatch.
+ runningTimers notificationChannels
+
+ mu struct {
+ sync.RWMutex
- // mu protects the fields below.
- mu sync.RWMutex
+ // now is the current (fake) time of the clock.
+ now time.Time
- // times is min-heap of times. A heap is used for quick retrieval of the next
- // upcoming time of scheduled work.
- times *timeHeap
+ // times is min-heap of times.
+ times timeHeap
- // waitGroups stores one WaitGroup for all work scheduled to execute at the
- // same time via AfterFunc. This allows parallel execution of all functions
- // passed to AfterFunc scheduled for the same time.
- waitGroups map[time.Time]*sync.WaitGroup
+ // timers holds the timers scheduled for each time.
+ timers map[time.Time]map[*manualTimer]struct{}
+ }
}
// NewManualClock creates a new ManualClock instance.
func NewManualClock() *ManualClock {
- return &ManualClock{
- clock: clockwork.NewFakeClock(),
- times: &timeHeap{},
- waitGroups: make(map[time.Time]*sync.WaitGroup),
- }
+ c := &ManualClock{}
+
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ // Set the initial time to a non-zero value since the zero value is used to
+ // detect inactive timers.
+ c.mu.now = time.Unix(0, 0)
+ c.mu.timers = make(map[time.Time]map[*manualTimer]struct{})
+
+ return c
}
var _ tcpip.Clock = (*ManualClock)(nil)
// NowNanoseconds implements tcpip.Clock.NowNanoseconds.
func (mc *ManualClock) NowNanoseconds() int64 {
- return mc.clock.Now().UnixNano()
+ mc.mu.RLock()
+ defer mc.mu.RUnlock()
+ return mc.mu.now.UnixNano()
}
// NowMonotonic implements tcpip.Clock.NowMonotonic.
@@ -85,128 +132,203 @@ func (mc *ManualClock) NowMonotonic() int64 {
// AfterFunc implements tcpip.Clock.AfterFunc.
func (mc *ManualClock) AfterFunc(d time.Duration, f func()) tcpip.Timer {
- until := mc.clock.Now().Add(d)
- wg := mc.addWait(until)
- return &manualTimer{
+ mt := &manualTimer{
clock: mc,
- until: until,
- timer: mc.clock.AfterFunc(d, func() {
- defer wg.Done()
- f()
- }),
+ f: f,
}
-}
-// addWait adds an additional wait to the WaitGroup for parallel execution of
-// all work scheduled for t. Returns a reference to the WaitGroup modified.
-func (mc *ManualClock) addWait(t time.Time) *sync.WaitGroup {
- mc.mu.RLock()
- wg, ok := mc.waitGroups[t]
- mc.mu.RUnlock()
+ mc.mu.Lock()
+ defer mc.mu.Unlock()
+
+ mt.mu.Lock()
+ defer mt.mu.Unlock()
- if ok {
- wg.Add(1)
- return wg
+ mc.resetTimerLocked(mt, d)
+ return mt
+}
+
+// resetTimerLocked schedules a timer to be fired after the given duration.
+//
+// Precondition: mc.mu and mt.mu must be locked.
+func (mc *ManualClock) resetTimerLocked(mt *manualTimer, d time.Duration) {
+ if !mt.mu.firesAt.IsZero() {
+ panic("tried to reset an active timer")
}
- mc.mu.Lock()
- heap.Push(mc.times, t)
- mc.mu.Unlock()
+ t := mc.mu.now.Add(d)
- wg = &sync.WaitGroup{}
- wg.Add(1)
+ if !mc.mu.now.Before(t) {
+ // If the timer is scheduled to fire immediately, call its callback
+ // in a new goroutine immediately.
+ //
+ // It needs to be called in its own goroutine to escape its current
+ // execution context - like an actual timer.
+ ch := make(chan struct{})
+ mc.runningTimers.add(ch)
- mc.mu.Lock()
- mc.waitGroups[t] = wg
- mc.mu.Unlock()
+ go func() {
+ defer close(ch)
+
+ mt.f()
+ }()
- return wg
+ return
+ }
+
+ mt.mu.firesAt = t
+
+ timers, ok := mc.mu.timers[t]
+ if !ok {
+ timers = make(map[*manualTimer]struct{})
+ mc.mu.timers[t] = timers
+ heap.Push(&mc.mu.times, t)
+ }
+
+ timers[mt] = struct{}{}
}
-// removeWait removes a wait from the WaitGroup for parallel execution of all
-// work scheduled for t.
-func (mc *ManualClock) removeWait(t time.Time) {
- mc.mu.RLock()
- defer mc.mu.RUnlock()
+// stopTimerLocked stops a timer from firing.
+//
+// Precondition: mc.mu and mt.mu must be locked.
+func (mc *ManualClock) stopTimerLocked(mt *manualTimer) {
+ t := mt.mu.firesAt
+ mt.mu.firesAt = time.Time{}
+
+ if t.IsZero() {
+ panic("tried to stop an inactive timer")
+ }
- wg := mc.waitGroups[t]
- wg.Done()
+ timers, ok := mc.mu.timers[t]
+ if !ok {
+ err := fmt.Sprintf("tried to stop an active timer but the clock does not have anything scheduled for the timer @ t = %s %p\nScheduled timers @:", t.UTC(), mt)
+ for t := range mc.mu.timers {
+ err += fmt.Sprintf("%s\n", t.UTC())
+ }
+ panic(err)
+ }
+
+ if _, ok := timers[mt]; !ok {
+ panic(fmt.Sprintf("did not have an entry in timers for an active timer @ t = %s", t.UTC()))
+ }
+
+ delete(timers, mt)
+
+ if len(timers) == 0 {
+ delete(mc.mu.timers, t)
+ }
}
// Advance executes all work that have been scheduled to execute within d from
-// the current time. Blocks until all work has completed execution.
+// the current time. Blocks until all work has completed execution.
func (mc *ManualClock) Advance(d time.Duration) {
- // Block until all the work is done
- until := mc.clock.Now().Add(d)
- for {
- mc.mu.Lock()
- if mc.times.Len() == 0 {
- mc.mu.Unlock()
- break
- }
+ // We spawn goroutines for timers that were scheduled to fire at the time of
+ // being reset. Wait for those goroutines to complete before proceeding so
+ // that timer callbacks are called in the right order.
+ mc.runningTimers.wait()
- t := heap.Pop(mc.times).(time.Time)
+ mc.mu.Lock()
+ defer mc.mu.Unlock()
+
+ until := mc.mu.now.Add(d)
+ for mc.mu.times.Len() > 0 {
+ t := heap.Pop(&mc.mu.times).(time.Time)
if t.After(until) {
// No work to do
- heap.Push(mc.times, t)
- mc.mu.Unlock()
+ heap.Push(&mc.mu.times, t)
break
}
- mc.mu.Unlock()
- diff := t.Sub(mc.clock.Now())
- mc.clock.Advance(diff)
+ timers := mc.mu.timers[t]
+ delete(mc.mu.timers, t)
+
+ mc.mu.now = t
+
+ // Mark the timers as inactive since they will be fired.
+ //
+ // This needs to be done while holding mc's lock because we remove the entry
+ // in the map of timers for the current time. If an attempt to stop a
+ // timer is made after mc's lock was dropped but before the timer is
+ // marked inactive, we would panic since no entry exists for the time when
+ // the timer was expected to fire.
+ for mt := range timers {
+ mt.mu.Lock()
+ mt.mu.firesAt = time.Time{}
+ mt.mu.Unlock()
+ }
- mc.mu.RLock()
- wg := mc.waitGroups[t]
- mc.mu.RUnlock()
+ // Release the lock before calling the timer's callback fn since the
+ // callback fn might try to schedule a timer which requires obtaining
+ // mc's lock.
+ mc.mu.Unlock()
- wg.Wait()
+ for mt := range timers {
+ mt.f()
+ }
+ // The timer callbacks may have scheduled a timer to fire immediately.
+ // We spawn goroutines for these timers and need to wait for them to
+ // finish before proceeding so that timer callbacks are called in the
+ // right order.
+ mc.runningTimers.wait()
mc.mu.Lock()
- delete(mc.waitGroups, t)
- mc.mu.Unlock()
}
- if now := mc.clock.Now(); until.After(now) {
- mc.clock.Advance(until.Sub(now))
+
+ mc.mu.now = until
+}
+
+func (mc *ManualClock) resetTimer(mt *manualTimer, d time.Duration) {
+ mc.mu.Lock()
+ defer mc.mu.Unlock()
+
+ mt.mu.Lock()
+ defer mt.mu.Unlock()
+
+ if !mt.mu.firesAt.IsZero() {
+ mc.stopTimerLocked(mt)
}
+
+ mc.resetTimerLocked(mt, d)
+}
+
+func (mc *ManualClock) stopTimer(mt *manualTimer) bool {
+ mc.mu.Lock()
+ defer mc.mu.Unlock()
+
+ mt.mu.Lock()
+ defer mt.mu.Unlock()
+
+ if mt.mu.firesAt.IsZero() {
+ return false
+ }
+
+ mc.stopTimerLocked(mt)
+ return true
}
type manualTimer struct {
clock *ManualClock
- timer clockwork.Timer
+ f func()
- mu sync.RWMutex
- until time.Time
+ mu struct {
+ sync.Mutex
+
+ // firesAt is the time when the timer will fire.
+ //
+ // Zero only when the timer is not active.
+ firesAt time.Time
+ }
}
var _ tcpip.Timer = (*manualTimer)(nil)
// Reset implements tcpip.Timer.Reset.
-func (t *manualTimer) Reset(d time.Duration) {
- if !t.timer.Reset(d) {
- return
- }
-
- t.mu.Lock()
- defer t.mu.Unlock()
-
- t.clock.removeWait(t.until)
- t.until = t.clock.clock.Now().Add(d)
- t.clock.addWait(t.until)
+func (mt *manualTimer) Reset(d time.Duration) {
+ mt.clock.resetTimer(mt, d)
}
// Stop implements tcpip.Timer.Stop.
-func (t *manualTimer) Stop() bool {
- if !t.timer.Stop() {
- return false
- }
-
- t.mu.RLock()
- defer t.mu.RUnlock()
-
- t.clock.removeWait(t.until)
- return true
+func (mt *manualTimer) Stop() bool {
+ return mt.clock.stopTimer(mt)
}
type timeHeap []time.Time