From b4357939c0049b7385d98aa0ea7d2d1e00d0710f Mon Sep 17 00:00:00 2001 From: Arthur Sfez Date: Thu, 28 Jan 2021 16:49:56 -0800 Subject: Re-enable ipv4 reassembly packetimpact tests When these specific tests were first added, they would fail when ran by kokoro during the presubmit tests (but they always passed locally). These tests are now passing, so they can be re-enabled. Unclear what changed, one hypothesis is the move from kokoro to buildkite. Fixes #4971 PiperOrigin-RevId: 354425395 --- test/packetimpact/tests/ipv4_fragment_reassembly_test.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/test/packetimpact/tests/ipv4_fragment_reassembly_test.go b/test/packetimpact/tests/ipv4_fragment_reassembly_test.go index d2203082d..ee050e2c6 100644 --- a/test/packetimpact/tests/ipv4_fragment_reassembly_test.go +++ b/test/packetimpact/tests/ipv4_fragment_reassembly_test.go @@ -45,8 +45,6 @@ func TestIPv4FragmentReassembly(t *testing.T) { ipPayloadLen int fragments []fragmentInfo expectReply bool - skip bool - skipReason string }{ { description: "basic reassembly", @@ -78,8 +76,6 @@ func TestIPv4FragmentReassembly(t *testing.T) { {offset: 2000, size: 1000, id: 7, more: 0}, }, expectReply: true, - skip: true, - skipReason: "gvisor.dev/issues/4971", }, { description: "fragment subset", @@ -91,8 +87,6 @@ func TestIPv4FragmentReassembly(t *testing.T) { {offset: 2000, size: 1000, id: 8, more: 0}, }, expectReply: true, - skip: true, - skipReason: "gvisor.dev/issues/4971", }, { description: "fragment overlap", @@ -104,16 +98,10 @@ func TestIPv4FragmentReassembly(t *testing.T) { {offset: 2000, size: 1000, id: 9, more: 0}, }, expectReply: false, - skip: true, - skipReason: "gvisor.dev/issues/4971", }, } for _, test := range tests { - if test.skip { - t.Skip("%s test skipped: %s", test.description, test.skipReason) - continue - } t.Run(test.description, func(t *testing.T) { dut := testbench.NewDUT(t) conn := dut.Net.NewIPv4Conn(t, testbench.IPv4{}, testbench.IPv4{}) -- cgit v1.2.3 From 56fb2ec1194bff7c57c7bbb84b7b74f63142b915 Mon Sep 17 00:00:00 2001 From: Ghanan Gowripalan Date: Thu, 28 Jan 2021 17:05:44 -0800 Subject: Do not use clockwork for faketime Clockwork does not support timers being reset/stopped from different goroutines. Our current use of clockwork causes data races and gotsan complains about clockwork. This change uses our own implementation of faketime, avoiding data races. PiperOrigin-RevId: 354428208 --- pkg/tcpip/faketime/BUILD | 5 +- pkg/tcpip/faketime/faketime.go | 318 ++++++++++++++++++++++++++++------------- 2 files changed, 221 insertions(+), 102 deletions(-) diff --git a/pkg/tcpip/faketime/BUILD b/pkg/tcpip/faketime/BUILD index 114d43df3..bb9d44aff 100644 --- a/pkg/tcpip/faketime/BUILD +++ b/pkg/tcpip/faketime/BUILD @@ -6,10 +6,7 @@ go_library( name = "faketime", srcs = ["faketime.go"], visibility = ["//visibility:public"], - deps = [ - "//pkg/tcpip", - "@com_github_dpjacques_clockwork//:go_default_library", - ], + deps = ["//pkg/tcpip"], ) go_test( diff --git a/pkg/tcpip/faketime/faketime.go b/pkg/tcpip/faketime/faketime.go index f7a4fbde1..fb819d7a8 100644 --- a/pkg/tcpip/faketime/faketime.go +++ b/pkg/tcpip/faketime/faketime.go @@ -17,10 +17,10 @@ package faketime import ( "container/heap" + "fmt" "sync" "time" - "github.com/dpjacques/clockwork" "gvisor.dev/gvisor/pkg/tcpip" ) @@ -44,38 +44,85 @@ func (*NullClock) AfterFunc(time.Duration, func()) tcpip.Timer { return nil } +type notificationChannels struct { + mu struct { + sync.Mutex + + ch []<-chan struct{} + } +} + +func (n *notificationChannels) add(ch <-chan struct{}) { + n.mu.Lock() + defer n.mu.Unlock() + n.mu.ch = append(n.mu.ch, ch) +} + +// wait returns once all the notification channels are readable. +// +// Channels that are added while waiting on existing channels will be waited on +// as well. +func (n *notificationChannels) wait() { + for { + n.mu.Lock() + ch := n.mu.ch + n.mu.ch = nil + n.mu.Unlock() + + if len(ch) == 0 { + break + } + + for _, c := range ch { + <-c + } + } +} + // ManualClock implements tcpip.Clock and only advances manually with Advance // method. type ManualClock struct { - clock clockwork.FakeClock + // runningTimers tracks the completion of timer callbacks that began running + // immediately upon their scheduling. It is used to ensure the proper ordering + // of timer callback dispatch. + runningTimers notificationChannels + + mu struct { + sync.RWMutex - // mu protects the fields below. - mu sync.RWMutex + // now is the current (fake) time of the clock. + now time.Time - // times is min-heap of times. A heap is used for quick retrieval of the next - // upcoming time of scheduled work. - times *timeHeap + // times is min-heap of times. + times timeHeap - // waitGroups stores one WaitGroup for all work scheduled to execute at the - // same time via AfterFunc. This allows parallel execution of all functions - // passed to AfterFunc scheduled for the same time. - waitGroups map[time.Time]*sync.WaitGroup + // timers holds the timers scheduled for each time. + timers map[time.Time]map[*manualTimer]struct{} + } } // NewManualClock creates a new ManualClock instance. func NewManualClock() *ManualClock { - return &ManualClock{ - clock: clockwork.NewFakeClock(), - times: &timeHeap{}, - waitGroups: make(map[time.Time]*sync.WaitGroup), - } + c := &ManualClock{} + + c.mu.Lock() + defer c.mu.Unlock() + + // Set the initial time to a non-zero value since the zero value is used to + // detect inactive timers. + c.mu.now = time.Unix(0, 0) + c.mu.timers = make(map[time.Time]map[*manualTimer]struct{}) + + return c } var _ tcpip.Clock = (*ManualClock)(nil) // NowNanoseconds implements tcpip.Clock.NowNanoseconds. func (mc *ManualClock) NowNanoseconds() int64 { - return mc.clock.Now().UnixNano() + mc.mu.RLock() + defer mc.mu.RUnlock() + return mc.mu.now.UnixNano() } // NowMonotonic implements tcpip.Clock.NowMonotonic. @@ -85,128 +132,203 @@ func (mc *ManualClock) NowMonotonic() int64 { // AfterFunc implements tcpip.Clock.AfterFunc. func (mc *ManualClock) AfterFunc(d time.Duration, f func()) tcpip.Timer { - until := mc.clock.Now().Add(d) - wg := mc.addWait(until) - return &manualTimer{ + mt := &manualTimer{ clock: mc, - until: until, - timer: mc.clock.AfterFunc(d, func() { - defer wg.Done() - f() - }), + f: f, } -} -// addWait adds an additional wait to the WaitGroup for parallel execution of -// all work scheduled for t. Returns a reference to the WaitGroup modified. -func (mc *ManualClock) addWait(t time.Time) *sync.WaitGroup { - mc.mu.RLock() - wg, ok := mc.waitGroups[t] - mc.mu.RUnlock() + mc.mu.Lock() + defer mc.mu.Unlock() + + mt.mu.Lock() + defer mt.mu.Unlock() - if ok { - wg.Add(1) - return wg + mc.resetTimerLocked(mt, d) + return mt +} + +// resetTimerLocked schedules a timer to be fired after the given duration. +// +// Precondition: mc.mu and mt.mu must be locked. +func (mc *ManualClock) resetTimerLocked(mt *manualTimer, d time.Duration) { + if !mt.mu.firesAt.IsZero() { + panic("tried to reset an active timer") } - mc.mu.Lock() - heap.Push(mc.times, t) - mc.mu.Unlock() + t := mc.mu.now.Add(d) - wg = &sync.WaitGroup{} - wg.Add(1) + if !mc.mu.now.Before(t) { + // If the timer is scheduled to fire immediately, call its callback + // in a new goroutine immediately. + // + // It needs to be called in its own goroutine to escape its current + // execution context - like an actual timer. + ch := make(chan struct{}) + mc.runningTimers.add(ch) - mc.mu.Lock() - mc.waitGroups[t] = wg - mc.mu.Unlock() + go func() { + defer close(ch) + + mt.f() + }() - return wg + return + } + + mt.mu.firesAt = t + + timers, ok := mc.mu.timers[t] + if !ok { + timers = make(map[*manualTimer]struct{}) + mc.mu.timers[t] = timers + heap.Push(&mc.mu.times, t) + } + + timers[mt] = struct{}{} } -// removeWait removes a wait from the WaitGroup for parallel execution of all -// work scheduled for t. -func (mc *ManualClock) removeWait(t time.Time) { - mc.mu.RLock() - defer mc.mu.RUnlock() +// stopTimerLocked stops a timer from firing. +// +// Precondition: mc.mu and mt.mu must be locked. +func (mc *ManualClock) stopTimerLocked(mt *manualTimer) { + t := mt.mu.firesAt + mt.mu.firesAt = time.Time{} + + if t.IsZero() { + panic("tried to stop an inactive timer") + } - wg := mc.waitGroups[t] - wg.Done() + timers, ok := mc.mu.timers[t] + if !ok { + err := fmt.Sprintf("tried to stop an active timer but the clock does not have anything scheduled for the timer @ t = %s %p\nScheduled timers @:", t.UTC(), mt) + for t := range mc.mu.timers { + err += fmt.Sprintf("%s\n", t.UTC()) + } + panic(err) + } + + if _, ok := timers[mt]; !ok { + panic(fmt.Sprintf("did not have an entry in timers for an active timer @ t = %s", t.UTC())) + } + + delete(timers, mt) + + if len(timers) == 0 { + delete(mc.mu.timers, t) + } } // Advance executes all work that have been scheduled to execute within d from -// the current time. Blocks until all work has completed execution. +// the current time. Blocks until all work has completed execution. func (mc *ManualClock) Advance(d time.Duration) { - // Block until all the work is done - until := mc.clock.Now().Add(d) - for { - mc.mu.Lock() - if mc.times.Len() == 0 { - mc.mu.Unlock() - break - } + // We spawn goroutines for timers that were scheduled to fire at the time of + // being reset. Wait for those goroutines to complete before proceeding so + // that timer callbacks are called in the right order. + mc.runningTimers.wait() - t := heap.Pop(mc.times).(time.Time) + mc.mu.Lock() + defer mc.mu.Unlock() + + until := mc.mu.now.Add(d) + for mc.mu.times.Len() > 0 { + t := heap.Pop(&mc.mu.times).(time.Time) if t.After(until) { // No work to do - heap.Push(mc.times, t) - mc.mu.Unlock() + heap.Push(&mc.mu.times, t) break } - mc.mu.Unlock() - diff := t.Sub(mc.clock.Now()) - mc.clock.Advance(diff) + timers := mc.mu.timers[t] + delete(mc.mu.timers, t) + + mc.mu.now = t + + // Mark the timers as inactive since they will be fired. + // + // This needs to be done while holding mc's lock because we remove the entry + // in the map of timers for the current time. If an attempt to stop a + // timer is made after mc's lock was dropped but before the timer is + // marked inactive, we would panic since no entry exists for the time when + // the timer was expected to fire. + for mt := range timers { + mt.mu.Lock() + mt.mu.firesAt = time.Time{} + mt.mu.Unlock() + } - mc.mu.RLock() - wg := mc.waitGroups[t] - mc.mu.RUnlock() + // Release the lock before calling the timer's callback fn since the + // callback fn might try to schedule a timer which requires obtaining + // mc's lock. + mc.mu.Unlock() - wg.Wait() + for mt := range timers { + mt.f() + } + // The timer callbacks may have scheduled a timer to fire immediately. + // We spawn goroutines for these timers and need to wait for them to + // finish before proceeding so that timer callbacks are called in the + // right order. + mc.runningTimers.wait() mc.mu.Lock() - delete(mc.waitGroups, t) - mc.mu.Unlock() } - if now := mc.clock.Now(); until.After(now) { - mc.clock.Advance(until.Sub(now)) + + mc.mu.now = until +} + +func (mc *ManualClock) resetTimer(mt *manualTimer, d time.Duration) { + mc.mu.Lock() + defer mc.mu.Unlock() + + mt.mu.Lock() + defer mt.mu.Unlock() + + if !mt.mu.firesAt.IsZero() { + mc.stopTimerLocked(mt) } + + mc.resetTimerLocked(mt, d) +} + +func (mc *ManualClock) stopTimer(mt *manualTimer) bool { + mc.mu.Lock() + defer mc.mu.Unlock() + + mt.mu.Lock() + defer mt.mu.Unlock() + + if mt.mu.firesAt.IsZero() { + return false + } + + mc.stopTimerLocked(mt) + return true } type manualTimer struct { clock *ManualClock - timer clockwork.Timer + f func() - mu sync.RWMutex - until time.Time + mu struct { + sync.Mutex + + // firesAt is the time when the timer will fire. + // + // Zero only when the timer is not active. + firesAt time.Time + } } var _ tcpip.Timer = (*manualTimer)(nil) // Reset implements tcpip.Timer.Reset. -func (t *manualTimer) Reset(d time.Duration) { - if !t.timer.Reset(d) { - return - } - - t.mu.Lock() - defer t.mu.Unlock() - - t.clock.removeWait(t.until) - t.until = t.clock.clock.Now().Add(d) - t.clock.addWait(t.until) +func (mt *manualTimer) Reset(d time.Duration) { + mt.clock.resetTimer(mt, d) } // Stop implements tcpip.Timer.Stop. -func (t *manualTimer) Stop() bool { - if !t.timer.Stop() { - return false - } - - t.mu.RLock() - defer t.mu.RUnlock() - - t.clock.removeWait(t.until) - return true +func (mt *manualTimer) Stop() bool { + return mt.clock.stopTimer(mt) } type timeHeap []time.Time -- cgit v1.2.3