diff options
-rw-r--r-- | pkg/tcpip/faketime/BUILD | 5 | ||||
-rw-r--r-- | pkg/tcpip/faketime/faketime.go | 318 |
2 files changed, 221 insertions, 102 deletions
diff --git a/pkg/tcpip/faketime/BUILD b/pkg/tcpip/faketime/BUILD index 114d43df3..bb9d44aff 100644 --- a/pkg/tcpip/faketime/BUILD +++ b/pkg/tcpip/faketime/BUILD @@ -6,10 +6,7 @@ go_library( name = "faketime", srcs = ["faketime.go"], visibility = ["//visibility:public"], - deps = [ - "//pkg/tcpip", - "@com_github_dpjacques_clockwork//:go_default_library", - ], + deps = ["//pkg/tcpip"], ) go_test( diff --git a/pkg/tcpip/faketime/faketime.go b/pkg/tcpip/faketime/faketime.go index f7a4fbde1..fb819d7a8 100644 --- a/pkg/tcpip/faketime/faketime.go +++ b/pkg/tcpip/faketime/faketime.go @@ -17,10 +17,10 @@ package faketime import ( "container/heap" + "fmt" "sync" "time" - "github.com/dpjacques/clockwork" "gvisor.dev/gvisor/pkg/tcpip" ) @@ -44,38 +44,85 @@ func (*NullClock) AfterFunc(time.Duration, func()) tcpip.Timer { return nil } +type notificationChannels struct { + mu struct { + sync.Mutex + + ch []<-chan struct{} + } +} + +func (n *notificationChannels) add(ch <-chan struct{}) { + n.mu.Lock() + defer n.mu.Unlock() + n.mu.ch = append(n.mu.ch, ch) +} + +// wait returns once all the notification channels are readable. +// +// Channels that are added while waiting on existing channels will be waited on +// as well. +func (n *notificationChannels) wait() { + for { + n.mu.Lock() + ch := n.mu.ch + n.mu.ch = nil + n.mu.Unlock() + + if len(ch) == 0 { + break + } + + for _, c := range ch { + <-c + } + } +} + // ManualClock implements tcpip.Clock and only advances manually with Advance // method. type ManualClock struct { - clock clockwork.FakeClock + // runningTimers tracks the completion of timer callbacks that began running + // immediately upon their scheduling. It is used to ensure the proper ordering + // of timer callback dispatch. + runningTimers notificationChannels + + mu struct { + sync.RWMutex - // mu protects the fields below. - mu sync.RWMutex + // now is the current (fake) time of the clock. + now time.Time - // times is min-heap of times. A heap is used for quick retrieval of the next - // upcoming time of scheduled work. - times *timeHeap + // times is min-heap of times. + times timeHeap - // waitGroups stores one WaitGroup for all work scheduled to execute at the - // same time via AfterFunc. This allows parallel execution of all functions - // passed to AfterFunc scheduled for the same time. - waitGroups map[time.Time]*sync.WaitGroup + // timers holds the timers scheduled for each time. + timers map[time.Time]map[*manualTimer]struct{} + } } // NewManualClock creates a new ManualClock instance. func NewManualClock() *ManualClock { - return &ManualClock{ - clock: clockwork.NewFakeClock(), - times: &timeHeap{}, - waitGroups: make(map[time.Time]*sync.WaitGroup), - } + c := &ManualClock{} + + c.mu.Lock() + defer c.mu.Unlock() + + // Set the initial time to a non-zero value since the zero value is used to + // detect inactive timers. + c.mu.now = time.Unix(0, 0) + c.mu.timers = make(map[time.Time]map[*manualTimer]struct{}) + + return c } var _ tcpip.Clock = (*ManualClock)(nil) // NowNanoseconds implements tcpip.Clock.NowNanoseconds. func (mc *ManualClock) NowNanoseconds() int64 { - return mc.clock.Now().UnixNano() + mc.mu.RLock() + defer mc.mu.RUnlock() + return mc.mu.now.UnixNano() } // NowMonotonic implements tcpip.Clock.NowMonotonic. @@ -85,128 +132,203 @@ func (mc *ManualClock) NowMonotonic() int64 { // AfterFunc implements tcpip.Clock.AfterFunc. func (mc *ManualClock) AfterFunc(d time.Duration, f func()) tcpip.Timer { - until := mc.clock.Now().Add(d) - wg := mc.addWait(until) - return &manualTimer{ + mt := &manualTimer{ clock: mc, - until: until, - timer: mc.clock.AfterFunc(d, func() { - defer wg.Done() - f() - }), + f: f, } -} -// addWait adds an additional wait to the WaitGroup for parallel execution of -// all work scheduled for t. Returns a reference to the WaitGroup modified. -func (mc *ManualClock) addWait(t time.Time) *sync.WaitGroup { - mc.mu.RLock() - wg, ok := mc.waitGroups[t] - mc.mu.RUnlock() + mc.mu.Lock() + defer mc.mu.Unlock() + + mt.mu.Lock() + defer mt.mu.Unlock() - if ok { - wg.Add(1) - return wg + mc.resetTimerLocked(mt, d) + return mt +} + +// resetTimerLocked schedules a timer to be fired after the given duration. +// +// Precondition: mc.mu and mt.mu must be locked. +func (mc *ManualClock) resetTimerLocked(mt *manualTimer, d time.Duration) { + if !mt.mu.firesAt.IsZero() { + panic("tried to reset an active timer") } - mc.mu.Lock() - heap.Push(mc.times, t) - mc.mu.Unlock() + t := mc.mu.now.Add(d) - wg = &sync.WaitGroup{} - wg.Add(1) + if !mc.mu.now.Before(t) { + // If the timer is scheduled to fire immediately, call its callback + // in a new goroutine immediately. + // + // It needs to be called in its own goroutine to escape its current + // execution context - like an actual timer. + ch := make(chan struct{}) + mc.runningTimers.add(ch) - mc.mu.Lock() - mc.waitGroups[t] = wg - mc.mu.Unlock() + go func() { + defer close(ch) + + mt.f() + }() - return wg + return + } + + mt.mu.firesAt = t + + timers, ok := mc.mu.timers[t] + if !ok { + timers = make(map[*manualTimer]struct{}) + mc.mu.timers[t] = timers + heap.Push(&mc.mu.times, t) + } + + timers[mt] = struct{}{} } -// removeWait removes a wait from the WaitGroup for parallel execution of all -// work scheduled for t. -func (mc *ManualClock) removeWait(t time.Time) { - mc.mu.RLock() - defer mc.mu.RUnlock() +// stopTimerLocked stops a timer from firing. +// +// Precondition: mc.mu and mt.mu must be locked. +func (mc *ManualClock) stopTimerLocked(mt *manualTimer) { + t := mt.mu.firesAt + mt.mu.firesAt = time.Time{} + + if t.IsZero() { + panic("tried to stop an inactive timer") + } - wg := mc.waitGroups[t] - wg.Done() + timers, ok := mc.mu.timers[t] + if !ok { + err := fmt.Sprintf("tried to stop an active timer but the clock does not have anything scheduled for the timer @ t = %s %p\nScheduled timers @:", t.UTC(), mt) + for t := range mc.mu.timers { + err += fmt.Sprintf("%s\n", t.UTC()) + } + panic(err) + } + + if _, ok := timers[mt]; !ok { + panic(fmt.Sprintf("did not have an entry in timers for an active timer @ t = %s", t.UTC())) + } + + delete(timers, mt) + + if len(timers) == 0 { + delete(mc.mu.timers, t) + } } // Advance executes all work that have been scheduled to execute within d from -// the current time. Blocks until all work has completed execution. +// the current time. Blocks until all work has completed execution. func (mc *ManualClock) Advance(d time.Duration) { - // Block until all the work is done - until := mc.clock.Now().Add(d) - for { - mc.mu.Lock() - if mc.times.Len() == 0 { - mc.mu.Unlock() - break - } + // We spawn goroutines for timers that were scheduled to fire at the time of + // being reset. Wait for those goroutines to complete before proceeding so + // that timer callbacks are called in the right order. + mc.runningTimers.wait() - t := heap.Pop(mc.times).(time.Time) + mc.mu.Lock() + defer mc.mu.Unlock() + + until := mc.mu.now.Add(d) + for mc.mu.times.Len() > 0 { + t := heap.Pop(&mc.mu.times).(time.Time) if t.After(until) { // No work to do - heap.Push(mc.times, t) - mc.mu.Unlock() + heap.Push(&mc.mu.times, t) break } - mc.mu.Unlock() - diff := t.Sub(mc.clock.Now()) - mc.clock.Advance(diff) + timers := mc.mu.timers[t] + delete(mc.mu.timers, t) + + mc.mu.now = t + + // Mark the timers as inactive since they will be fired. + // + // This needs to be done while holding mc's lock because we remove the entry + // in the map of timers for the current time. If an attempt to stop a + // timer is made after mc's lock was dropped but before the timer is + // marked inactive, we would panic since no entry exists for the time when + // the timer was expected to fire. + for mt := range timers { + mt.mu.Lock() + mt.mu.firesAt = time.Time{} + mt.mu.Unlock() + } - mc.mu.RLock() - wg := mc.waitGroups[t] - mc.mu.RUnlock() + // Release the lock before calling the timer's callback fn since the + // callback fn might try to schedule a timer which requires obtaining + // mc's lock. + mc.mu.Unlock() - wg.Wait() + for mt := range timers { + mt.f() + } + // The timer callbacks may have scheduled a timer to fire immediately. + // We spawn goroutines for these timers and need to wait for them to + // finish before proceeding so that timer callbacks are called in the + // right order. + mc.runningTimers.wait() mc.mu.Lock() - delete(mc.waitGroups, t) - mc.mu.Unlock() } - if now := mc.clock.Now(); until.After(now) { - mc.clock.Advance(until.Sub(now)) + + mc.mu.now = until +} + +func (mc *ManualClock) resetTimer(mt *manualTimer, d time.Duration) { + mc.mu.Lock() + defer mc.mu.Unlock() + + mt.mu.Lock() + defer mt.mu.Unlock() + + if !mt.mu.firesAt.IsZero() { + mc.stopTimerLocked(mt) } + + mc.resetTimerLocked(mt, d) +} + +func (mc *ManualClock) stopTimer(mt *manualTimer) bool { + mc.mu.Lock() + defer mc.mu.Unlock() + + mt.mu.Lock() + defer mt.mu.Unlock() + + if mt.mu.firesAt.IsZero() { + return false + } + + mc.stopTimerLocked(mt) + return true } type manualTimer struct { clock *ManualClock - timer clockwork.Timer + f func() - mu sync.RWMutex - until time.Time + mu struct { + sync.Mutex + + // firesAt is the time when the timer will fire. + // + // Zero only when the timer is not active. + firesAt time.Time + } } var _ tcpip.Timer = (*manualTimer)(nil) // Reset implements tcpip.Timer.Reset. -func (t *manualTimer) Reset(d time.Duration) { - if !t.timer.Reset(d) { - return - } - - t.mu.Lock() - defer t.mu.Unlock() - - t.clock.removeWait(t.until) - t.until = t.clock.clock.Now().Add(d) - t.clock.addWait(t.until) +func (mt *manualTimer) Reset(d time.Duration) { + mt.clock.resetTimer(mt, d) } // Stop implements tcpip.Timer.Stop. -func (t *manualTimer) Stop() bool { - if !t.timer.Stop() { - return false - } - - t.mu.RLock() - defer t.mu.RUnlock() - - t.clock.removeWait(t.until) - return true +func (mt *manualTimer) Stop() bool { + return mt.clock.stopTimer(mt) } type timeHeap []time.Time |