author     Googler <noreply@google.com>          2018-04-27 10:37:02 -0700
committer  Adin Scannell <ascannell@google.com>  2018-04-28 01:44:26 -0400
commit     d02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch)
tree       54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/sleep
parent     f70210e742919f40aa2f0934a22f1c9ba6dada62 (diff)
Check in gVisor.
PiperOrigin-RevId: 194583126
Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/sleep')
-rw-r--r--  pkg/sleep/BUILD              24
-rw-r--r--  pkg/sleep/commit_amd64.s     21
-rw-r--r--  pkg/sleep/commit_arm64.s      5
-rw-r--r--  pkg/sleep/commit_asm.go      10
-rw-r--r--  pkg/sleep/commit_noasm.go    32
-rw-r--r--  pkg/sleep/sleep_test.go     528
-rw-r--r--  pkg/sleep/sleep_unsafe.go   385
7 files changed, 1005 insertions(+), 0 deletions(-)
diff --git a/pkg/sleep/BUILD b/pkg/sleep/BUILD
new file mode 100644
index 000000000..ab3f9ad99
--- /dev/null
+++ b/pkg/sleep/BUILD
@@ -0,0 +1,24 @@
+package(licenses = ["notice"]) # BSD
+
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "sleep",
+ srcs = [
+ "commit_amd64.s",
+ "commit_asm.go",
+ "commit_noasm.go",
+ "sleep_unsafe.go",
+ ],
+ importpath = "gvisor.googlesource.com/gvisor/pkg/sleep",
+ visibility = ["//:sandbox"],
+)
+
+go_test(
+ name = "sleep_test",
+ size = "medium",
+ srcs = [
+ "sleep_test.go",
+ ],
+ embed = [":sleep"],
+)
diff --git a/pkg/sleep/commit_amd64.s b/pkg/sleep/commit_amd64.s
new file mode 100644
index 000000000..a5b620bf8
--- /dev/null
+++ b/pkg/sleep/commit_amd64.s
@@ -0,0 +1,21 @@
+#include "textflag.h"
+
+#define preparingG 1
+
+// See commit_noasm.go for a description of commitSleep.
+//
+// func commitSleep(g uintptr, waitingG *uintptr) bool
+TEXT ·commitSleep(SB),NOSPLIT,$0-24
+ MOVQ waitingG+8(FP), CX
+ MOVQ g+0(FP), DX
+
+ // Store the G in waitingG if it's still preparingG. If it's anything
+ // else it means a waker has aborted the sleep.
+ MOVQ $preparingG, AX
+ LOCK
+ CMPXCHGQ DX, 0(CX)
+
+ SETEQ AX
+ MOVB AX, ret+16(FP)
+
+ RET
diff --git a/pkg/sleep/commit_arm64.s b/pkg/sleep/commit_arm64.s
new file mode 100644
index 000000000..9d351d00a
--- /dev/null
+++ b/pkg/sleep/commit_arm64.s
@@ -0,0 +1,5 @@
+// Copyright 2017 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Empty assembly file so empty func definitions work.
diff --git a/pkg/sleep/commit_asm.go b/pkg/sleep/commit_asm.go
new file mode 100644
index 000000000..b7589dfef
--- /dev/null
+++ b/pkg/sleep/commit_asm.go
@@ -0,0 +1,10 @@
+// Copyright 2017 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build amd64
+
+package sleep
+
+// See commit_noasm.go for a description of commitSleep.
+func commitSleep(g uintptr, waitingG *uintptr) bool
diff --git a/pkg/sleep/commit_noasm.go b/pkg/sleep/commit_noasm.go
new file mode 100644
index 000000000..22c734e1d
--- /dev/null
+++ b/pkg/sleep/commit_noasm.go
@@ -0,0 +1,32 @@
+// Copyright 2017 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build !race
+// +build !amd64
+
+package sleep
+
+import "sync/atomic"
+
+// commitSleep signals to wakers that the given g is now sleeping. Wakers can
+// then fetch it and wake it.
+//
+// The commit may fail if wakers have been asserted after our last check, in
+// which case they will have set s.waitingG to zero.
+//
+// The amd64 version is written in assembly (see commit_amd64.s) because it is
+// called from g0, which doesn't have a race context.
+func commitSleep(g uintptr, waitingG *uintptr) bool {
+ for {
+ // Check if the wait was aborted.
+ if atomic.LoadUintptr(waitingG) == 0 {
+ return false
+ }
+
+ // Try to store the G so that wakers know who to wake.
+ if atomic.CompareAndSwapUintptr(waitingG, preparingG, g) {
+ return true
+ }
+ }
+}
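
To make the commit-or-abort handshake above easier to follow outside the runtime internals, here is a self-contained sketch of the same protocol using an ordinary variable instead of gopark/goready. The names (state, gToken) are hypothetical and only illustrate the pattern; they are not part of this commit.

package main

import (
	"fmt"
	"sync/atomic"
)

// preparing plays the role of preparingG: the sleeper stores it to announce
// that it is about to block.
const preparing = 1

func main() {
	var state uintptr // 0: no sleeper waiting, preparing: about to sleep, else: sleeper token

	// Sleeper side: announce the intent to sleep.
	atomic.StoreUintptr(&state, preparing)

	// Waker side (normally running concurrently): abort the sleep by
	// resetting the state to zero before the sleeper commits, which is
	// what enqueueAssertedWaker does to s.waitingG.
	if atomic.CompareAndSwapUintptr(&state, preparing, 0) {
		fmt.Println("waker aborted the sleep")
	}

	// Sleeper side: the commit now fails, just as commitSleep reports, so
	// the caller re-checks its queues instead of staying blocked.
	const gToken = 42 // stands in for the sleeper's g pointer
	if !atomic.CompareAndSwapUintptr(&state, preparing, gToken) {
		fmt.Println("commit failed: re-check the shared list instead of sleeping")
	}
}
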
diff --git a/pkg/sleep/sleep_test.go b/pkg/sleep/sleep_test.go
new file mode 100644
index 000000000..eba18014c
--- /dev/null
+++ b/pkg/sleep/sleep_test.go
@@ -0,0 +1,528 @@
+package sleep
+
+import (
+ "math/rand"
+ "runtime"
+ "testing"
+ "time"
+)
+
+// TestZeroWakerNotAsserted tests that a zero-value waker is not asserted.
+func TestZeroWakerNotAsserted(t *testing.T) {
+ var w Waker
+ if w.IsAsserted() {
+ t.Fatalf("Zero waker is asserted")
+ }
+
+ if w.Clear() {
+ t.Fatalf("Zero waker is asserted")
+ }
+}
+
+// TestAssertedWakerAfterAssert tests that a waker properly reports its state
+// as asserted once its Assert() method is called.
+func TestAssertedWakerAfterAssert(t *testing.T) {
+ var w Waker
+ w.Assert()
+ if !w.IsAsserted() {
+ t.Fatalf("Asserted waker is not reported as such")
+ }
+
+ if !w.Clear() {
+ t.Fatalf("Asserted waker is not reported as such")
+ }
+}
+
+// TestAssertedWakerAfterTwoAsserts tests that a waker properly reports its
+// state as asserted once its Assert() method is called twice.
+func TestAssertedWakerAfterTwoAsserts(t *testing.T) {
+ var w Waker
+ w.Assert()
+ w.Assert()
+ if !w.IsAsserted() {
+ t.Fatalf("Asserted waker is not reported as such")
+ }
+
+ if !w.Clear() {
+ t.Fatalf("Asserted waker is not reported as such")
+ }
+}
+
+// TestNotAssertedWakerWithSleeper tests that a waker properly reports its
+// state as not asserted after a sleeper is associated with it.
+func TestNotAssertedWakerWithSleeper(t *testing.T) {
+ var w Waker
+ var s Sleeper
+ s.AddWaker(&w, 0)
+ if w.IsAsserted() {
+ t.Fatalf("Non-asserted waker is reported as asserted")
+ }
+
+ if w.Clear() {
+ t.Fatalf("Non-asserted waker is reported as asserted")
+ }
+}
+
+// TestNotAssertedWakerAfterWake tests that a waker properly reports its state
+// as not asserted after a previous assert is consumed by a sleeper. That is,
+// it tests the "edge-triggered" behavior.
+func TestNotAssertedWakerAfterWake(t *testing.T) {
+ var w Waker
+ var s Sleeper
+ s.AddWaker(&w, 0)
+ w.Assert()
+ s.Fetch(true)
+ if w.IsAsserted() {
+ t.Fatalf("Consumed waker is reported as asserted")
+ }
+
+ if w.Clear() {
+ t.Fatalf("Consumed waker is reported as asserted")
+ }
+}
+
+// TestAssertedWakerBeforeAdd tests that a waker causes a sleeper to not sleep
+// if it's already asserted before being added.
+func TestAssertedWakerBeforeAdd(t *testing.T) {
+ var w Waker
+ var s Sleeper
+ w.Assert()
+ s.AddWaker(&w, 0)
+
+ if _, ok := s.Fetch(false); !ok {
+ t.Fatalf("Fetch failed even though asserted waker was added")
+ }
+}
+
+// TestClearedWaker tests that a waker properly reports its state as not
+// asserted after it is cleared.
+func TestClearedWaker(t *testing.T) {
+ var w Waker
+ w.Assert()
+ w.Clear()
+ if w.IsAsserted() {
+ t.Fatalf("Cleared waker is reported as asserted")
+ }
+
+ if w.Clear() {
+ t.Fatalf("Cleared waker is reported as asserted")
+ }
+}
+
+// TestClearedWakerWithSleeper tests that a waker properly reports its state as
+// not asserted when it is cleared while it has a sleeper associated with it.
+func TestClearedWakerWithSleeper(t *testing.T) {
+ var w Waker
+ var s Sleeper
+ s.AddWaker(&w, 0)
+ w.Clear()
+ if w.IsAsserted() {
+ t.Fatalf("Cleared waker is reported as asserted")
+ }
+
+ if w.Clear() {
+ t.Fatalf("Cleared waker is reported as asserted")
+ }
+}
+
+// TestClearedWakerAssertedWithSleeper tests that a waker properly reports its
+// state as not asserted when it is cleared while it has a sleeper associated
+// with it and has been asserted.
+func TestClearedWakerAssertedWithSleeper(t *testing.T) {
+ var w Waker
+ var s Sleeper
+ s.AddWaker(&w, 0)
+ w.Assert()
+ w.Clear()
+ if w.IsAsserted() {
+ t.Fatalf("Cleared waker is reported as asserted")
+ }
+
+ if w.Clear() {
+ t.Fatalf("Cleared waker is reported as asserted")
+ }
+}
+
+// TestBlock tests that a sleeper actually blocks waiting for the waker to
+// assert its state.
+func TestBlock(t *testing.T) {
+ var w Waker
+ var s Sleeper
+
+ s.AddWaker(&w, 0)
+
+ // Assert waker after one second.
+ before := time.Now()
+ go func() {
+ time.Sleep(1 * time.Second)
+ w.Assert()
+ }()
+
+ // Fetch the result and make sure it took at least 500ms.
+ if _, ok := s.Fetch(true); !ok {
+ t.Fatalf("Fetch failed unexpectedly")
+ }
+ if d := time.Now().Sub(before); d < 500*time.Millisecond {
+ t.Fatalf("Duration was too short: %v", d)
+ }
+
+ // Check that already-asserted waker completes inline.
+ w.Assert()
+ if _, ok := s.Fetch(true); !ok {
+ t.Fatalf("Fetch failed unexpectedly")
+ }
+
+ // Check that fetch sleeps if waker had been asserted but was reset
+ // before Fetch is called.
+ w.Assert()
+ w.Clear()
+ before = time.Now()
+ go func() {
+ time.Sleep(1 * time.Second)
+ w.Assert()
+ }()
+ if _, ok := s.Fetch(true); !ok {
+ t.Fatalf("Fetch failed unexpectedly")
+ }
+ if d := time.Now().Sub(before); d < 500*time.Millisecond {
+ t.Fatalf("Duration was too short: %v", d)
+ }
+}
+
+// TestNonBlock checks that a sleeper won't block if waker isn't asserted.
+func TestNonBlock(t *testing.T) {
+ var w Waker
+ var s Sleeper
+
+ // Don't block when there's no waker.
+ if _, ok := s.Fetch(false); ok {
+ t.Fatalf("Fetch succeeded when there is no waker")
+ }
+
+ // Don't block when waker isn't asserted.
+ s.AddWaker(&w, 0)
+ if _, ok := s.Fetch(false); ok {
+ t.Fatalf("Fetch succeeded when waker was not asserted")
+ }
+
+ // Don't block when waker was asserted, but isn't anymore.
+ w.Assert()
+ w.Clear()
+ if _, ok := s.Fetch(false); ok {
+ t.Fatalf("Fetch succeeded when waker was not asserted anymore")
+ }
+
+ // Don't block when waker was consumed by previous Fetch().
+ w.Assert()
+ if _, ok := s.Fetch(false); !ok {
+ t.Fatalf("Fetch failed even though waker was asserted")
+ }
+
+ if _, ok := s.Fetch(false); ok {
+ t.Fatalf("Fetch succeeded when waker had been consumed")
+ }
+}
+
+// TestMultiple checks that a sleeper can wait for and receive notifications
+// from multiple wakers.
+func TestMultiple(t *testing.T) {
+ s := Sleeper{}
+ w1 := Waker{}
+ w2 := Waker{}
+
+ s.AddWaker(&w1, 0)
+ s.AddWaker(&w2, 1)
+
+ w1.Assert()
+ w2.Assert()
+
+ v, ok := s.Fetch(false)
+ if !ok {
+ t.Fatalf("Fetch failed when there are asserted wakers")
+ }
+
+ if v != 0 && v != 1 {
+ t.Fatalf("Unexpected waker id: %v", v)
+ }
+
+ want := 1 - v
+ v, ok = s.Fetch(false)
+ if !ok {
+ t.Fatalf("Fetch failed when there is an asserted waker")
+ }
+
+ if v != want {
+ t.Fatalf("Unexpected waker id, got %v, want %v", v, want)
+ }
+}
+
+// TestDoneFunction tests if calling Done() on a sleeper works properly.
+func TestDoneFunction(t *testing.T) {
+ // Trivial case of no waker.
+ s := Sleeper{}
+ s.Done()
+
+ // Cases when the sleeper has n wakers, but none are asserted.
+ for n := 1; n < 20; n++ {
+ s := Sleeper{}
+ w := make([]Waker, n)
+ for j := 0; j < n; j++ {
+ s.AddWaker(&w[j], j)
+ }
+ s.Done()
+ }
+
+ // Cases when the sleeper has n wakers, and only the i-th one is
+ // asserted.
+ for n := 1; n < 20; n++ {
+ for i := 0; i < n; i++ {
+ s := Sleeper{}
+ w := make([]Waker, n)
+ for j := 0; j < n; j++ {
+ s.AddWaker(&w[j], j)
+ }
+ w[i].Assert()
+ s.Done()
+ }
+ }
+
+ // Cases when the sleeper has n wakers, and the i-th one is asserted
+ // and cleared.
+ for n := 1; n < 20; n++ {
+ for i := 0; i < n; i++ {
+ s := Sleeper{}
+ w := make([]Waker, n)
+ for j := 0; j < n; j++ {
+ s.AddWaker(&w[j], j)
+ }
+ w[i].Assert()
+ w[i].Clear()
+ s.Done()
+ }
+ }
+
+ // Cases when the sleeper has n wakers, with a random number of them
+ // asserted.
+ for n := 1; n < 20; n++ {
+ for iters := 0; iters < 1000; iters++ {
+ s := Sleeper{}
+ w := make([]Waker, n)
+ for j := 0; j < n; j++ {
+ s.AddWaker(&w[j], j)
+ }
+
+ // Pick the number of asserted elements, then assert
+ // random wakers.
+ asserted := rand.Int() % (n + 1)
+ for j := 0; j < asserted; j++ {
+ w[rand.Int()%n].Assert()
+ }
+ s.Done()
+ }
+ }
+}
+
+// TestRace tests that multiple wakers can continuously send wake requests to
+// the sleeper.
+func TestRace(t *testing.T) {
+ const wakers = 100
+ const wakeRequests = 10000
+
+ counts := make([]int, wakers)
+ w := make([]Waker, wakers)
+ s := Sleeper{}
+
+ // Associate each waker and start goroutines that will assert them.
+ for i := range w {
+ s.AddWaker(&w[i], i)
+ go func(w *Waker) {
+ n := 0
+ for n < wakeRequests {
+ if !w.IsAsserted() {
+ w.Assert()
+ n++
+ } else {
+ runtime.Gosched()
+ }
+ }
+ }(&w[i])
+ }
+
+ // Wait for all wake up notifications from all wakers.
+ for i := 0; i < wakers*wakeRequests; i++ {
+ v, _ := s.Fetch(true)
+ counts[v]++
+ }
+
+ // Check that we got the right number for each.
+ for i, v := range counts {
+ if v != wakeRequests {
+ t.Errorf("Waker %v only got %v wakes", i, v)
+ }
+ }
+}
+
+// BenchmarkSleeperMultiSelect measures how long it takes to fetch a wake up
+// from 4 wakers when at least one is already asserted.
+func BenchmarkSleeperMultiSelect(b *testing.B) {
+ const count = 4
+ s := Sleeper{}
+ w := make([]Waker, count)
+ for i := range w {
+ s.AddWaker(&w[i], i)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ w[count-1].Assert()
+ s.Fetch(true)
+ }
+}
+
+// BenchmarkGoMultiSelect measures how long it takes to fetch a zero-length
+// struct from one of 4 channels when at least one is ready.
+func BenchmarkGoMultiSelect(b *testing.B) {
+ const count = 4
+ ch := make([]chan struct{}, count)
+ for i := range ch {
+ ch[i] = make(chan struct{}, 1)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ ch[count-1] <- struct{}{}
+ select {
+ case <-ch[0]:
+ case <-ch[1]:
+ case <-ch[2]:
+ case <-ch[3]:
+ }
+ }
+}
+
+// BenchmarkSleeperSingleSelect measures how long it takes to fetch a wake up
+// from one waker that is already asserted.
+func BenchmarkSleeperSingleSelect(b *testing.B) {
+ s := Sleeper{}
+ w := Waker{}
+ s.AddWaker(&w, 0)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ w.Assert()
+ s.Fetch(true)
+ }
+}
+
+// BenchmarkGoSingleSelect measures how long it takes to fetch a zero-length
+// struct from a channel that already has it buffered.
+func BenchmarkGoSingleSelect(b *testing.B) {
+ ch := make(chan struct{}, 1)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ ch <- struct{}{}
+ <-ch
+ }
+}
+
+// BenchmarkSleeperAssertNonWaiting measures how long it takes to assert a
+// waker that is already asserted.
+func BenchmarkSleeperAssertNonWaiting(b *testing.B) {
+ w := Waker{}
+ w.Assert()
+ for i := 0; i < b.N; i++ {
+ w.Assert()
+ }
+
+}
+
+// BenchmarkGoAssertNonWaiting measures how long it takes to write to a channel
+// that already has something written to it.
+func BenchmarkGoAssertNonWaiting(b *testing.B) {
+ ch := make(chan struct{}, 1)
+ ch <- struct{}{}
+ for i := 0; i < b.N; i++ {
+ select {
+ case ch <- struct{}{}:
+ default:
+ }
+ }
+}
+
+// BenchmarkSleeperWaitOnSingleSelect measures how long it takes to wait on one
+// waker while another goroutine wakes up the sleeper. This assumes that
+// a new goroutine doesn't run immediately (i.e., the creator of a new goroutine
+// is allowed to go to sleep before the new goroutine has a chance to run).
+func BenchmarkSleeperWaitOnSingleSelect(b *testing.B) {
+ s := Sleeper{}
+ w := Waker{}
+ s.AddWaker(&w, 0)
+ for i := 0; i < b.N; i++ {
+ go func() {
+ w.Assert()
+ }()
+ s.Fetch(true)
+ }
+
+}
+
+// BenchmarkGoWaitOnSingleSelect measures how long it takes to wait on one
+// channel while another goroutine wakes up the sleeper. This assumes that a new
+// goroutine doesn't run immediately (i.e., the creator of a new goroutine is
+// allowed to go to sleep before the new goroutine has a chance to run).
+func BenchmarkGoWaitOnSingleSelect(b *testing.B) {
+ ch := make(chan struct{}, 1)
+ for i := 0; i < b.N; i++ {
+ go func() {
+ ch <- struct{}{}
+ }()
+ <-ch
+ }
+}
+
+// BenchmarkSleeperWaitOnMultiSelect measures how long it takes to wait on 4
+// wakers while another goroutine wakes up the sleeper. This assumes that a new
+// goroutine doesn't run immediately (i.e., the creator of a new goroutine is
+// allowed to go to sleep before the new goroutine has a chance to run).
+func BenchmarkSleeperWaitOnMultiSelect(b *testing.B) {
+ const count = 4
+ s := Sleeper{}
+ w := make([]Waker, count)
+ for i := range w {
+ s.AddWaker(&w[i], i)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ go func() {
+ w[count-1].Assert()
+ }()
+ s.Fetch(true)
+ }
+}
+
+// BenchmarkGoWaitOnMultiSelect measures how long it takes to wait on 4 channels
+// while another goroutine wakes up the sleeper. This assumes that a new
+// goroutine doesn't run immediately (i.e., the creator of a new goroutine is
+// allowed to go to sleep before the new goroutine has a chance to run).
+func BenchmarkGoWaitOnMultiSelect(b *testing.B) {
+ const count = 4
+ ch := make([]chan struct{}, count)
+ for i := range ch {
+ ch[i] = make(chan struct{}, 1)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ go func() {
+ ch[count-1] <- struct{}{}
+ }()
+ select {
+ case <-ch[0]:
+ case <-ch[1]:
+ case <-ch[2]:
+ case <-ch[3]:
+ }
+ }
+}
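
For readers comparing the two mechanisms the benchmarks above exercise, this is a minimal side-by-side sketch of a single notification round trip. The import path is taken from the BUILD file; the rest is illustrative and not part of the commit.

package main

import (
	"fmt"

	"gvisor.googlesource.com/gvisor/pkg/sleep"
)

func main() {
	// Sleeper/Waker: register the waker once, then fetch notifications.
	var s sleep.Sleeper
	var w sleep.Waker
	s.AddWaker(&w, 0)
	w.Assert()
	if id, ok := s.Fetch(false); ok {
		fmt.Println("woken by waker", id)
	}
	s.Done()

	// Channel equivalent of the same non-blocking round trip, as used by
	// the BenchmarkGo* functions.
	ch := make(chan struct{}, 1)
	ch <- struct{}{}
	select {
	case <-ch:
		fmt.Println("woken by channel")
	default:
	}
}
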
diff --git a/pkg/sleep/sleep_unsafe.go b/pkg/sleep/sleep_unsafe.go
new file mode 100644
index 000000000..5ecb7a3ac
--- /dev/null
+++ b/pkg/sleep/sleep_unsafe.go
@@ -0,0 +1,385 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package sleep allows goroutines to efficiently sleep on multiple sources of
+// notifications (wakers). It offers O(1) complexity, which is different from
+// multi-channel selects which have O(n) complexity (where n is the number of
+// channels) and a considerable constant factor.
+//
+// It is similar to edge-triggered epoll waits, where the user registers each
+// object of interest once, and then can repeatedly wait on all of them.
+//
+// A Waker object is used to wake a sleeping goroutine (G) up, or prevent it
+// from going to sleep next. A Sleeper object is used to receive notifications
+// from wakers, and if no notifications are available, to optionally sleep until
+// one becomes available.
+//
+// A Waker can be associated with at most one Sleeper, but a Sleeper can be
+// associated with multiple Wakers. A Sleeper has a list of asserted (ready)
+// wakers; when Fetch() is called repeatedly, elements from this list are
+// returned until the list becomes empty in which case the goroutine goes to
+// sleep. When Assert() is called on a Waker, it adds itself to the Sleeper's
+// asserted list and wakes the G up from its sleep if needed.
+//
+// Sleeper objects are expected to be used as follows, with just one goroutine
+// executing this code:
+//
+// // One time set-up.
+// s := sleep.Sleeper{}
+// s.AddWaker(&w1, constant1)
+// s.AddWaker(&w2, constant2)
+//
+// // Called repeatedly.
+// for {
+// switch id, _ := s.Fetch(true); id {
+// case constant1:
+// // Do work triggered by w1 being asserted.
+// case constant2:
+// // Do work triggered by w2 being asserted.
+// }
+// }
+//
+// And Waker objects are expected to call w.Assert() when they want the sleeper
+// to wake up and perform work.
+//
+// The notifications are edge-triggered, which means that if a Waker is
+// asserted several times before the sleeper has a chance to wake up, the
+// sleeper is only notified once and should perform all pending work
+// (alternatively, it can re-assert the waker to ensure that it wakes up again).
+//
+// The "unsafeness" here is in the casts to/from unsafe.Pointer, which is safe
+// when only one type is used for each unsafe.Pointer (which is the case here),
+// we should just make sure that this remains the case in the future. The usage
+// of unsafe package could be confined to sharedWaker and sharedSleeper types
+// that would hold pointers in atomic.Pointers, but the go compiler currently
+// can't optimize these as well (it won't inline their method calls), which
+// reduces performance.
+package sleep
+
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
+const (
+ // preparingG is stored in sleepers to indicate that they're preparing
+ // to sleep.
+ preparingG = 1
+)
+
+var (
+ // assertedSleeper is a sentinel sleeper. A pointer to it is stored in
+ // wakers that are asserted.
+ assertedSleeper Sleeper
+)
+
+//go:linkname gopark runtime.gopark
+func gopark(unlockf func(uintptr, *uintptr) bool, wg *uintptr, reason string, traceEv byte, traceskip int)
+
+//go:linkname goready runtime.goready
+func goready(g uintptr, traceskip int)
+
+// Sleeper allows a goroutine to sleep and receive wake up notifications from
+// Wakers in an efficient way.
+//
+// This is similar to edge-triggered epoll in that wakers are added to the
+// sleeper once and the sleeper can then repeatedly sleep in O(1) time while
+// waiting on all wakers.
+//
+// None of the methods in a Sleeper can be called concurrently. Wakers that have
+// been added to a sleeper A can only be added to another sleeper after A.Done()
+// returns. These restrictions allow this to be implemented lock-free.
+//
+// This struct is thread-compatible.
+type Sleeper struct {
+ // sharedList is a "stack" of asserted wakers. They atomically add
+ // themselves to the front of this list as they become asserted.
+ sharedList unsafe.Pointer
+
+ // localList is a list of asserted wakers that is only accessible to the
+ // waiter, and thus doesn't have to be accessed atomically. When
+ // fetching more wakers, the waiter will first go through this list, and
+ // only when it's empty will it atomically fetch wakers from
+ // sharedList.
+ localList *Waker
+
+ // allWakers is a list with all wakers that have been added to this
+ // sleeper. It is used during cleanup to remove associations.
+ allWakers *Waker
+
+ // waitingG holds the G that is sleeping, if any. It is used by wakers
+ // to determine which G, if any, they should wake.
+ waitingG uintptr
+}
+
+// AddWaker associates the given waker to the sleeper. id is the value to be
+// returned when the sleeper is woken by the given waker.
+func (s *Sleeper) AddWaker(w *Waker, id int) {
+ // Add the waker to the list of all wakers.
+ w.allWakersNext = s.allWakers
+ s.allWakers = w
+ w.id = id
+
+ // Try to associate the waker with the sleeper. If it's already
+ // asserted, we simply enqueue it in the "ready" list.
+ for {
+ p := (*Sleeper)(atomic.LoadPointer(&w.s))
+ if p == &assertedSleeper {
+ s.enqueueAssertedWaker(w)
+ return
+ }
+
+ if atomic.CompareAndSwapPointer(&w.s, usleeper(p), usleeper(s)) {
+ return
+ }
+ }
+}
+
+// nextWaker returns the next waker in the notification list, blocking if
+// needed.
+func (s *Sleeper) nextWaker(block bool) *Waker {
+ // Attempt to replenish the local list if it's currently empty.
+ if s.localList == nil {
+ for atomic.LoadPointer(&s.sharedList) == nil {
+ // Fail request if caller requested that we
+ // don't block.
+ if !block {
+ return nil
+ }
+
+ // Indicate to wakers that we're about to sleep,
+ // this allows them to abort the wait by setting
+ // waitingG back to zero (which we'll notice
+ // before committing the sleep).
+ atomic.StoreUintptr(&s.waitingG, preparingG)
+
+ // Check if something was queued while we were
+ // preparing to sleep. We need this interleaving
+ // to avoid missing wake ups.
+ if atomic.LoadPointer(&s.sharedList) != nil {
+ atomic.StoreUintptr(&s.waitingG, 0)
+ break
+ }
+
+ // Try to commit the sleep and report it to the
+ // tracer as a select.
+ //
+ // gopark puts the caller to sleep and calls
+ // commitSleep to decide whether to immediately
+ // wake the caller up or to leave it sleeping.
+ const traceEvGoBlockSelect = 24
+ gopark(commitSleep, &s.waitingG, "sleeper", traceEvGoBlockSelect, 0)
+ }
+
+ // Pull the shared list out and reverse it in the local
+ // list. Given that wakers push themselves in reverse
+ // order, we fix things here.
+ v := (*Waker)(atomic.SwapPointer(&s.sharedList, nil))
+ for v != nil {
+ cur := v
+ v = v.next
+
+ cur.next = s.localList
+ s.localList = cur
+ }
+ }
+
+ // Remove the waker in the front of the list.
+ w := s.localList
+ s.localList = w.next
+
+ return w
+}
+
+// Fetch fetches the next wake-up notification. If a notification is immediately
+// available, it is returned right away. Otherwise, the behavior depends on the
+// value of 'block': if true, the current goroutine blocks until a notification
+// arrives, then returns it; if false, returns 'ok' as false.
+//
+// When 'ok' is true, the value of 'id' corresponds to the id associated with
+// the waker; when 'ok' is false, 'id' is undefined.
+//
+// N.B. This method is *not* thread-safe. Only one goroutine at a time is
+// allowed to call this method.
+func (s *Sleeper) Fetch(block bool) (id int, ok bool) {
+ for {
+ w := s.nextWaker(block)
+ if w == nil {
+ return -1, false
+ }
+
+ // Reassociate the waker with the sleeper. If the waker was
+ // still asserted we can return it, otherwise try the next one.
+ old := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(s)))
+ if old == &assertedSleeper {
+ return w.id, true
+ }
+ }
+}
+
+// Done is used to indicate that the caller won't use this Sleeper anymore. It
+// removes the association with all wakers so that they can be safely reused
+// by another sleeper after Done() returns.
+func (s *Sleeper) Done() {
+ // Remove all associations that we can, and build a list of the ones
+ // we could not. An association can be removed right away from waker w
+ // if w.s has a pointer to the sleeper, that is, the waker is not
+ // asserted yet. By atomically switching w.s to nil, we guarantee that
+ // subsequent calls to Assert() on the waker will not result in it being
+ // queued to this sleeper.
+ var pending *Waker
+ w := s.allWakers
+ for w != nil {
+ next := w.allWakersNext
+ for {
+ t := atomic.LoadPointer(&w.s)
+ if t != usleeper(s) {
+ w.allWakersNext = pending
+ pending = w
+ break
+ }
+
+ if atomic.CompareAndSwapPointer(&w.s, t, nil) {
+ break
+ }
+ }
+ w = next
+ }
+
+ // The associations that we could not remove are either asserted, or in
+ // the process of being asserted, or have been asserted and cleared
+ // before being pulled from the sleeper lists. We must wait for them all
+ // to make it to the sleeper lists, so that we know that the wakers
+ // won't do any more work towards waking this sleeper up.
+ for pending != nil {
+ pulled := s.nextWaker(true)
+
+ // Remove the waker we just pulled from the list of associated
+ // wakers.
+ prev := &pending
+ for w := *prev; w != nil; w = *prev {
+ if pulled == w {
+ *prev = w.allWakersNext
+ break
+ }
+ prev = &w.allWakersNext
+ }
+ }
+ s.allWakers = nil
+}
+
+// enqueueAssertedWaker adds an asserted waker to the front of the "ready"
+// list of wakers that want to notify the sleeper.
+func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
+ // Add the new waker to the front of the list.
+ for {
+ v := (*Waker)(atomic.LoadPointer(&s.sharedList))
+ w.next = v
+ if atomic.CompareAndSwapPointer(&s.sharedList, uwaker(v), uwaker(w)) {
+ break
+ }
+ }
+
+ for {
+ // Nothing to do if there isn't a G waiting.
+ g := atomic.LoadUintptr(&s.waitingG)
+ if g == 0 {
+ return
+ }
+
+ // Signal to the sleeper that a waker has been asserted.
+ if atomic.CompareAndSwapUintptr(&s.waitingG, g, 0) {
+ if g != preparingG {
+ // We managed to get a G. Wake it up.
+ goready(g, 0)
+ }
+ }
+ }
+}
+
+// Waker represents a source of wake-up notifications to be sent to sleepers. A
+// waker can be associated with at most one sleeper at a time, and at any given
+// time is either in asserted or non-asserted state.
+//
+// Once asserted, the waker remains so until it is manually cleared or a sleeper
+// consumes its assertion (i.e., a sleeper wakes up or is prevented from going
+// to sleep due to the waker).
+//
+// This struct is thread-safe, that is, its methods can be called concurrently
+// by multiple goroutines.
+type Waker struct {
+ // s is the sleeper that this waker can wake up. Only one sleeper at a
+ // time is allowed. This field can have three classes of values:
+ // nil -- the waker is not asserted: it either is not associated with
+ // a sleeper, or is queued to a sleeper due to being previously
+ // asserted. This is the zero value.
+ // &assertedSleeper -- the waker is asserted.
+ // otherwise -- the waker is not asserted, and is associated with the
+ // given sleeper. Once it transitions to asserted state, the
+ // associated sleeper will be woken.
+ s unsafe.Pointer
+
+ // next is used to form a linked list of asserted wakers in a sleeper.
+ next *Waker
+
+ // allWakersNext is used to form a linked list of all wakers associated
+ // to a given sleeper.
+ allWakersNext *Waker
+
+ // id is the value to be returned to sleepers when they wake up due to
+ // this waker being asserted.
+ id int
+}
+
+// Assert moves the waker to an asserted state, if it isn't asserted yet. When
+// asserted, the waker will cause its matching sleeper to wake up.
+func (w *Waker) Assert() {
+ // Nothing to do if the waker is already asserted. This check allows us
+ // to complete this case (already asserted) without any interlocked
+ // operations on x86.
+ if atomic.LoadPointer(&w.s) == usleeper(&assertedSleeper) {
+ return
+ }
+
+ // Mark the waker as asserted, and wake up a sleeper if there is one.
+ switch s := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(&assertedSleeper))); s {
+ case nil:
+ case &assertedSleeper:
+ default:
+ s.enqueueAssertedWaker(w)
+ }
+}
+
+// Clear moves the waker to the non-asserted state and returns whether it was
+// asserted before being cleared.
+//
+// N.B. The waker isn't removed from the "ready" list of a sleeper (if it
+// happens to be in one), but the sleeper will notice that it is not asserted
+// anymore and won't return it to the caller.
+func (w *Waker) Clear() bool {
+ // Nothing to do if the waker is not asserted. This check allows us to
+ // complete this case (already not asserted) without any interlocked
+ // operations on x86.
+ if atomic.LoadPointer(&w.s) != usleeper(&assertedSleeper) {
+ return false
+ }
+
+ // Try to store nil in the sleeper, which indicates that the waker is
+ // not asserted.
+ return atomic.CompareAndSwapPointer(&w.s, usleeper(&assertedSleeper), nil)
+}
+
+// IsAsserted returns whether the waker is currently asserted (i.e., if it's
+// currently in a state that would cause its matching sleeper to wake up).
+func (w *Waker) IsAsserted() bool {
+ return (*Sleeper)(atomic.LoadPointer(&w.s)) == &assertedSleeper
+}
+
+func usleeper(s *Sleeper) unsafe.Pointer {
+ return unsafe.Pointer(s)
+}
+
+func uwaker(w *Waker) unsafe.Pointer {
+ return unsafe.Pointer(w)
+}
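
Finally, a complete, runnable version of the usage pattern from the package comment above. The two waker ids and the goroutine driving them are hypothetical, but every call matches the API added in this commit.

package main

import (
	"fmt"
	"time"

	"gvisor.googlesource.com/gvisor/pkg/sleep"
)

const (
	wakerWork = iota // id returned by Fetch when 'work' is asserted
	wakerQuit        // id returned by Fetch when 'quit' is asserted
)

func main() {
	// One-time setup: associate each waker with the sleeper.
	var s sleep.Sleeper
	var work, quit sleep.Waker
	s.AddWaker(&work, wakerWork)
	s.AddWaker(&quit, wakerQuit)
	defer s.Done()

	// Wakers are thread-safe, so another goroutine may assert them.
	go func() {
		work.Assert()
		time.Sleep(10 * time.Millisecond)
		quit.Assert()
	}()

	for {
		switch id, _ := s.Fetch(true); id {
		case wakerWork:
			// Edge-triggered: several Asserts may be coalesced into
			// one notification, so drain all pending work here.
			fmt.Println("handling work")
		case wakerQuit:
			fmt.Println("quitting")
			return
		}
	}
}
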