summaryrefslogtreecommitdiffhomepage
path: root/pkg/syncevent
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/syncevent')
-rw-r--r--pkg/syncevent/BUILD35
-rw-r--r--pkg/syncevent/broadcaster.go220
-rw-r--r--pkg/syncevent/broadcaster_test.go376
-rw-r--r--pkg/syncevent/receiver.go103
-rw-r--r--pkg/syncevent/source.go61
-rw-r--r--pkg/syncevent/syncevent.go32
-rw-r--r--pkg/syncevent/syncevent_example_test.go108
-rw-r--r--pkg/syncevent/waiter_test.go414
-rw-r--r--pkg/syncevent/waiter_unsafe.go197
9 files changed, 0 insertions, 1546 deletions
diff --git a/pkg/syncevent/BUILD b/pkg/syncevent/BUILD
deleted file mode 100644
index 42c553308..000000000
--- a/pkg/syncevent/BUILD
+++ /dev/null
@@ -1,35 +0,0 @@
-load("//tools:defs.bzl", "go_library", "go_test")
-
-licenses(["notice"])
-
-go_library(
- name = "syncevent",
- srcs = [
- "broadcaster.go",
- "receiver.go",
- "source.go",
- "syncevent.go",
- "waiter_unsafe.go",
- ],
- visibility = ["//:sandbox"],
- deps = [
- "//pkg/atomicbitops",
- "//pkg/sync",
- ],
-)
-
-go_test(
- name = "syncevent_test",
- size = "small",
- srcs = [
- "broadcaster_test.go",
- "syncevent_example_test.go",
- "waiter_test.go",
- ],
- library = ":syncevent",
- deps = [
- "//pkg/sleep",
- "//pkg/sync",
- "//pkg/waiter",
- ],
-)
diff --git a/pkg/syncevent/broadcaster.go b/pkg/syncevent/broadcaster.go
deleted file mode 100644
index dabf08895..000000000
--- a/pkg/syncevent/broadcaster.go
+++ /dev/null
@@ -1,220 +0,0 @@
-// Copyright 2020 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncevent
-
-import (
- "gvisor.dev/gvisor/pkg/sync"
-)
-
-// Broadcaster is an implementation of Source that supports any number of
-// subscribed Receivers.
-//
-// The zero value of Broadcaster is valid and has no subscribed Receivers.
-// Broadcaster is not copyable by value.
-//
-// All Broadcaster methods may be called concurrently from multiple goroutines.
-type Broadcaster struct {
- // Broadcaster is implemented as a hash table where keys are assigned by
- // the Broadcaster and returned as SubscriptionIDs, making it safe to use
- // the identity function for hashing. The hash table resolves collisions
- // using linear probing and features Robin Hood insertion and backward
- // shift deletion in order to support a relatively high load factor
- // efficiently, which matters since the cost of Broadcast is linear in the
- // size of the table.
-
- // mu protects the following fields.
- mu sync.Mutex
-
- // Invariants: len(table) is 0 or a power of 2.
- table []broadcasterSlot
-
- // load is the number of entries in table with receiver != nil.
- load int
-
- lastID SubscriptionID
-}
-
-type broadcasterSlot struct {
- // Invariants: If receiver == nil, then filter == NoEvents and id == 0.
- // Otherwise, id != 0.
- receiver *Receiver
- filter Set
- id SubscriptionID
-}
-
-const (
- broadcasterMinNonZeroTableSize = 2 // must be a power of 2 > 1
-
- broadcasterMaxLoadNum = 13
- broadcasterMaxLoadDen = 16
-)
-
-// SubscribeEvents implements Source.SubscribeEvents.
-func (b *Broadcaster) SubscribeEvents(r *Receiver, filter Set) SubscriptionID {
- b.mu.Lock()
-
- // Assign an ID for this subscription.
- b.lastID++
- id := b.lastID
-
- // Expand the table if over the maximum load factor:
- //
- // load / len(b.table) > broadcasterMaxLoadNum / broadcasterMaxLoadDen
- // load * broadcasterMaxLoadDen > broadcasterMaxLoadNum * len(b.table)
- b.load++
- if (b.load * broadcasterMaxLoadDen) > (broadcasterMaxLoadNum * len(b.table)) {
- // Double the number of slots in the new table.
- newlen := broadcasterMinNonZeroTableSize
- if len(b.table) != 0 {
- newlen = 2 * len(b.table)
- }
- if newlen <= cap(b.table) {
- // Reuse excess capacity in the current table, moving entries not
- // already in their first-probed positions to better ones.
- newtable := b.table[:newlen]
- newmask := uint64(newlen - 1)
- for i := range b.table {
- if b.table[i].receiver != nil && uint64(b.table[i].id)&newmask != uint64(i) {
- entry := b.table[i]
- b.table[i] = broadcasterSlot{}
- broadcasterTableInsert(newtable, entry.id, entry.receiver, entry.filter)
- }
- }
- b.table = newtable
- } else {
- newtable := make([]broadcasterSlot, newlen)
- // Copy existing entries to the new table.
- for i := range b.table {
- if b.table[i].receiver != nil {
- broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter)
- }
- }
- // Switch to the new table.
- b.table = newtable
- }
- }
-
- broadcasterTableInsert(b.table, id, r, filter)
- b.mu.Unlock()
- return id
-}
-
-// Preconditions:
-// * table must not be full.
-// * len(table) is a power of 2.
-func broadcasterTableInsert(table []broadcasterSlot, id SubscriptionID, r *Receiver, filter Set) {
- entry := broadcasterSlot{
- receiver: r,
- filter: filter,
- id: id,
- }
- mask := uint64(len(table) - 1)
- i := uint64(id) & mask
- disp := uint64(0)
- for {
- if table[i].receiver == nil {
- table[i] = entry
- return
- }
- // If we've been displaced farther from our first-probed slot than the
- // element stored in this one, swap elements and switch to inserting
- // the replaced one. (This is Robin Hood insertion.)
- slotDisp := (i - uint64(table[i].id)) & mask
- if disp > slotDisp {
- table[i], entry = entry, table[i]
- disp = slotDisp
- }
- i = (i + 1) & mask
- disp++
- }
-}
-
-// UnsubscribeEvents implements Source.UnsubscribeEvents.
-func (b *Broadcaster) UnsubscribeEvents(id SubscriptionID) {
- b.mu.Lock()
-
- mask := uint64(len(b.table) - 1)
- i := uint64(id) & mask
- for {
- if b.table[i].id == id {
- // Found the element to remove. Move all subsequent elements
- // backward until we either find an empty slot, or an element that
- // is already in its first-probed slot. (This is backward shift
- // deletion.)
- for {
- next := (i + 1) & mask
- if b.table[next].receiver == nil {
- break
- }
- if uint64(b.table[next].id)&mask == next {
- break
- }
- b.table[i] = b.table[next]
- i = next
- }
- b.table[i] = broadcasterSlot{}
- break
- }
- i = (i + 1) & mask
- }
-
- // If a table 1/4 of the current size would still be at or under the
- // maximum load factor (i.e. the current table size is at least two
- // expansions bigger than necessary), halve the size of the table to reduce
- // the cost of Broadcast. Since we are concerned with iteration time and
- // not memory usage, reuse the existing slice to reduce future allocations
- // from table re-expansion.
- b.load--
- if len(b.table) > broadcasterMinNonZeroTableSize && (b.load*(4*broadcasterMaxLoadDen)) <= (broadcasterMaxLoadNum*len(b.table)) {
- newlen := len(b.table) / 2
- newtable := b.table[:newlen]
- for i := newlen; i < len(b.table); i++ {
- if b.table[i].receiver != nil {
- broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter)
- b.table[i] = broadcasterSlot{}
- }
- }
- b.table = newtable
- }
-
- b.mu.Unlock()
-}
-
-// Broadcast notifies all Receivers subscribed to the Broadcaster of the subset
-// of events to which they subscribed. The order in which Receivers are
-// notified is unspecified.
-func (b *Broadcaster) Broadcast(events Set) {
- b.mu.Lock()
- for i := range b.table {
- if intersection := events & b.table[i].filter; intersection != 0 {
- // We don't need to check if broadcasterSlot.receiver is nil, since
- // if it is then broadcasterSlot.filter is 0.
- b.table[i].receiver.Notify(intersection)
- }
- }
- b.mu.Unlock()
-}
-
-// FilteredEvents returns the set of events for which Broadcast will notify at
-// least one Receiver, i.e. the union of filters for all subscribed Receivers.
-func (b *Broadcaster) FilteredEvents() Set {
- var es Set
- b.mu.Lock()
- for i := range b.table {
- es |= b.table[i].filter
- }
- b.mu.Unlock()
- return es
-}
diff --git a/pkg/syncevent/broadcaster_test.go b/pkg/syncevent/broadcaster_test.go
deleted file mode 100644
index e88779e23..000000000
--- a/pkg/syncevent/broadcaster_test.go
+++ /dev/null
@@ -1,376 +0,0 @@
-// Copyright 2020 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncevent
-
-import (
- "fmt"
- "math/rand"
- "testing"
-
- "gvisor.dev/gvisor/pkg/sync"
- "gvisor.dev/gvisor/pkg/waiter"
-)
-
-func TestBroadcasterFilter(t *testing.T) {
- const numReceivers = 2 * MaxEvents
-
- var br Broadcaster
- ws := make([]Waiter, numReceivers)
- for i := range ws {
- ws[i].Init()
- br.SubscribeEvents(ws[i].Receiver(), 1<<(i%MaxEvents))
- }
- for ev := 0; ev < MaxEvents; ev++ {
- br.Broadcast(1 << ev)
- for i := range ws {
- want := NoEvents
- if i%MaxEvents == ev {
- want = 1 << ev
- }
- if got := ws[i].Receiver().PendingAndAckAll(); got != want {
- t.Errorf("after Broadcast of event %d: waiter %d has pending event set %#x, wanted %#x", ev, i, got, want)
- }
- }
- }
-}
-
-// TestBroadcasterManySubscriptions tests that subscriptions are not lost by
-// table expansion/compaction.
-func TestBroadcasterManySubscriptions(t *testing.T) {
- const numReceivers = 5000 // arbitrary
-
- var br Broadcaster
- ws := make([]Waiter, numReceivers)
- for i := range ws {
- ws[i].Init()
- }
-
- ids := make([]SubscriptionID, numReceivers)
- for i := 0; i < numReceivers; i++ {
- // Subscribe receiver i.
- ids[i] = br.SubscribeEvents(ws[i].Receiver(), 1)
- // Check that receivers [0, i] are subscribed.
- br.Broadcast(1)
- for j := 0; j <= i; j++ {
- if ws[j].Pending() != 1 {
- t.Errorf("receiver %d did not receive an event after subscription of receiver %d", j, i)
- }
- ws[j].Ack(1)
- }
- }
-
- // Generate a random order for unsubscriptions.
- unsub := rand.Perm(numReceivers)
- for i := 0; i < numReceivers; i++ {
- // Unsubscribe receiver unsub[i].
- br.UnsubscribeEvents(ids[unsub[i]])
- // Check that receivers [unsub[0], unsub[i]] are not subscribed, and that
- // receivers (unsub[i], unsub[numReceivers]) are still subscribed.
- br.Broadcast(1)
- for j := 0; j <= i; j++ {
- if ws[unsub[j]].Pending() != 0 {
- t.Errorf("unsub iteration %d: receiver %d received an event after unsubscription of receiver %d", i, unsub[j], unsub[i])
- }
- }
- for j := i + 1; j < numReceivers; j++ {
- if ws[unsub[j]].Pending() != 1 {
- t.Errorf("unsub iteration %d: receiver %d did not receive an event after unsubscription of receiver %d", i, unsub[j], unsub[i])
- }
- ws[unsub[j]].Ack(1)
- }
- }
-}
-
-var (
- receiverCountsNonZero = []int{1, 4, 16, 64}
- receiverCountsIncludingZero = append([]int{0}, receiverCountsNonZero...)
-)
-
-// BenchmarkBroadcasterX, BenchmarkMapX, and BenchmarkQueueX benchmark usage
-// pattern X (described in terms of Broadcaster) with Broadcaster, a
-// Mutex-protected map[*Receiver]Set, and waiter.Queue respectively.
-
-// BenchmarkXxxSubscribeUnsubscribe measures the cost of a Subscribe/Unsubscribe
-// cycle.
-
-func BenchmarkBroadcasterSubscribeUnsubscribe(b *testing.B) {
- var br Broadcaster
- var w Waiter
- w.Init()
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- id := br.SubscribeEvents(w.Receiver(), 1)
- br.UnsubscribeEvents(id)
- }
-}
-
-func BenchmarkMapSubscribeUnsubscribe(b *testing.B) {
- var mu sync.Mutex
- m := make(map[*Receiver]Set)
- var w Waiter
- w.Init()
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- mu.Lock()
- m[w.Receiver()] = Set(1)
- mu.Unlock()
- mu.Lock()
- delete(m, w.Receiver())
- mu.Unlock()
- }
-}
-
-func BenchmarkQueueSubscribeUnsubscribe(b *testing.B) {
- var q waiter.Queue
- e, _ := waiter.NewChannelEntry(nil)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- q.EventRegister(&e, 1)
- q.EventUnregister(&e)
- }
-}
-
-// BenchmarkXxxSubscribeUnsubscribeBatch is similar to
-// BenchmarkXxxSubscribeUnsubscribe, but subscribes and unsubscribes a large
-// number of Receivers at a time in order to measure the amortized overhead of
-// table expansion/compaction. (Since waiter.Queue is implemented using a
-// linked list, BenchmarkQueueSubscribeUnsubscribe and
-// BenchmarkQueueSubscribeUnsubscribeBatch should produce nearly the same
-// result.)
-
-const numBatchReceivers = 1000
-
-func BenchmarkBroadcasterSubscribeUnsubscribeBatch(b *testing.B) {
- var br Broadcaster
- ws := make([]Waiter, numBatchReceivers)
- for i := range ws {
- ws[i].Init()
- }
- ids := make([]SubscriptionID, numBatchReceivers)
-
- // Generate a random order for unsubscriptions.
- unsub := rand.Perm(numBatchReceivers)
-
- b.ResetTimer()
- for i := 0; i < b.N/numBatchReceivers; i++ {
- for j := 0; j < numBatchReceivers; j++ {
- ids[j] = br.SubscribeEvents(ws[j].Receiver(), 1)
- }
- for j := 0; j < numBatchReceivers; j++ {
- br.UnsubscribeEvents(ids[unsub[j]])
- }
- }
-}
-
-func BenchmarkMapSubscribeUnsubscribeBatch(b *testing.B) {
- var mu sync.Mutex
- m := make(map[*Receiver]Set)
- ws := make([]Waiter, numBatchReceivers)
- for i := range ws {
- ws[i].Init()
- }
-
- // Generate a random order for unsubscriptions.
- unsub := rand.Perm(numBatchReceivers)
-
- b.ResetTimer()
- for i := 0; i < b.N/numBatchReceivers; i++ {
- for j := 0; j < numBatchReceivers; j++ {
- mu.Lock()
- m[ws[j].Receiver()] = Set(1)
- mu.Unlock()
- }
- for j := 0; j < numBatchReceivers; j++ {
- mu.Lock()
- delete(m, ws[unsub[j]].Receiver())
- mu.Unlock()
- }
- }
-}
-
-func BenchmarkQueueSubscribeUnsubscribeBatch(b *testing.B) {
- var q waiter.Queue
- es := make([]waiter.Entry, numBatchReceivers)
- for i := range es {
- es[i], _ = waiter.NewChannelEntry(nil)
- }
-
- // Generate a random order for unsubscriptions.
- unsub := rand.Perm(numBatchReceivers)
-
- b.ResetTimer()
- for i := 0; i < b.N/numBatchReceivers; i++ {
- for j := 0; j < numBatchReceivers; j++ {
- q.EventRegister(&es[j], 1)
- }
- for j := 0; j < numBatchReceivers; j++ {
- q.EventUnregister(&es[unsub[j]])
- }
- }
-}
-
-// BenchmarkXxxBroadcastRedundant measures how long it takes to Broadcast
-// already-pending events to multiple Receivers.
-
-func BenchmarkBroadcasterBroadcastRedundant(b *testing.B) {
- for _, n := range receiverCountsIncludingZero {
- b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
- var br Broadcaster
- ws := make([]Waiter, n)
- for i := range ws {
- ws[i].Init()
- br.SubscribeEvents(ws[i].Receiver(), 1)
- }
- br.Broadcast(1)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- br.Broadcast(1)
- }
- })
- }
-}
-
-func BenchmarkMapBroadcastRedundant(b *testing.B) {
- for _, n := range receiverCountsIncludingZero {
- b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
- var mu sync.Mutex
- m := make(map[*Receiver]Set)
- ws := make([]Waiter, n)
- for i := range ws {
- ws[i].Init()
- m[ws[i].Receiver()] = Set(1)
- }
- mu.Lock()
- for r := range m {
- r.Notify(1)
- }
- mu.Unlock()
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- mu.Lock()
- for r := range m {
- r.Notify(1)
- }
- mu.Unlock()
- }
- })
- }
-}
-
-func BenchmarkQueueBroadcastRedundant(b *testing.B) {
- for _, n := range receiverCountsIncludingZero {
- b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
- var q waiter.Queue
- for i := 0; i < n; i++ {
- e, _ := waiter.NewChannelEntry(nil)
- q.EventRegister(&e, 1)
- }
- q.Notify(1)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- q.Notify(1)
- }
- })
- }
-}
-
-// BenchmarkXxxBroadcastAck measures how long it takes to Broadcast events to
-// multiple Receivers, check that all Receivers have received the event, and
-// clear the event from all Receivers.
-
-func BenchmarkBroadcasterBroadcastAck(b *testing.B) {
- for _, n := range receiverCountsNonZero {
- b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
- var br Broadcaster
- ws := make([]Waiter, n)
- for i := range ws {
- ws[i].Init()
- br.SubscribeEvents(ws[i].Receiver(), 1)
- }
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- br.Broadcast(1)
- for j := range ws {
- if got, want := ws[j].Pending(), Set(1); got != want {
- b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want)
- }
- ws[j].Ack(1)
- }
- }
- })
- }
-}
-
-func BenchmarkMapBroadcastAck(b *testing.B) {
- for _, n := range receiverCountsNonZero {
- b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
- var mu sync.Mutex
- m := make(map[*Receiver]Set)
- ws := make([]Waiter, n)
- for i := range ws {
- ws[i].Init()
- m[ws[i].Receiver()] = Set(1)
- }
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- mu.Lock()
- for r := range m {
- r.Notify(1)
- }
- mu.Unlock()
- for j := range ws {
- if got, want := ws[j].Pending(), Set(1); got != want {
- b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want)
- }
- ws[j].Ack(1)
- }
- }
- })
- }
-}
-
-func BenchmarkQueueBroadcastAck(b *testing.B) {
- for _, n := range receiverCountsNonZero {
- b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
- var q waiter.Queue
- chs := make([]chan struct{}, n)
- for i := range chs {
- e, ch := waiter.NewChannelEntry(nil)
- q.EventRegister(&e, 1)
- chs[i] = ch
- }
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- q.Notify(1)
- for _, ch := range chs {
- select {
- case <-ch:
- default:
- b.Fatalf("channel did not receive event")
- }
- }
- }
- })
- }
-}
diff --git a/pkg/syncevent/receiver.go b/pkg/syncevent/receiver.go
deleted file mode 100644
index 5c86e5400..000000000
--- a/pkg/syncevent/receiver.go
+++ /dev/null
@@ -1,103 +0,0 @@
-// Copyright 2020 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncevent
-
-import (
- "sync/atomic"
-
- "gvisor.dev/gvisor/pkg/atomicbitops"
-)
-
-// Receiver is an event sink that holds pending events and invokes a callback
-// whenever new events become pending. Receiver's methods may be called
-// concurrently from multiple goroutines.
-//
-// Receiver.Init() must be called before first use.
-type Receiver struct {
- // pending is the set of pending events. pending is accessed using atomic
- // memory operations.
- pending uint64
-
- // cb is notified when new events become pending. cb is immutable after
- // Init().
- cb ReceiverCallback
-}
-
-// ReceiverCallback receives callbacks from a Receiver.
-type ReceiverCallback interface {
- // NotifyPending is called when the corresponding Receiver has new pending
- // events.
- //
- // NotifyPending is called synchronously from Receiver.Notify(), so
- // implementations must not take locks that may be held by callers of
- // Receiver.Notify(). NotifyPending may be called concurrently from
- // multiple goroutines.
- NotifyPending()
-}
-
-// Init must be called before first use of r.
-func (r *Receiver) Init(cb ReceiverCallback) {
- r.cb = cb
-}
-
-// Pending returns the set of pending events.
-func (r *Receiver) Pending() Set {
- return Set(atomic.LoadUint64(&r.pending))
-}
-
-// Notify sets the given events as pending.
-func (r *Receiver) Notify(es Set) {
- p := Set(atomic.LoadUint64(&r.pending))
- // Optimization: Skip the atomic CAS on r.pending if all events are
- // already pending.
- if p&es == es {
- return
- }
- // When this is uncontended (the common case), CAS is faster than
- // atomic-OR because the former is inlined and the latter (which we
- // implement in assembly ourselves) is not.
- if !atomic.CompareAndSwapUint64(&r.pending, uint64(p), uint64(p|es)) {
- // If the CAS fails, fall back to atomic-OR.
- atomicbitops.OrUint64(&r.pending, uint64(es))
- }
- r.cb.NotifyPending()
-}
-
-// Ack unsets the given events as pending.
-func (r *Receiver) Ack(es Set) {
- p := Set(atomic.LoadUint64(&r.pending))
- // Optimization: Skip the atomic CAS on r.pending if all events are
- // already not pending.
- if p&es == 0 {
- return
- }
- // When this is uncontended (the common case), CAS is faster than
- // atomic-AND because the former is inlined and the latter (which we
- // implement in assembly ourselves) is not.
- if !atomic.CompareAndSwapUint64(&r.pending, uint64(p), uint64(p&^es)) {
- // If the CAS fails, fall back to atomic-AND.
- atomicbitops.AndUint64(&r.pending, ^uint64(es))
- }
-}
-
-// PendingAndAckAll unsets all events as pending and returns the set of
-// previously-pending events.
-//
-// PendingAndAckAll should only be used in preference to a call to Pending
-// followed by a conditional call to Ack when the caller expects events to be
-// pending (e.g. after a call to ReceiverCallback.NotifyPending()).
-func (r *Receiver) PendingAndAckAll() Set {
- return Set(atomic.SwapUint64(&r.pending, 0))
-}
diff --git a/pkg/syncevent/source.go b/pkg/syncevent/source.go
deleted file mode 100644
index d3d0f34c5..000000000
--- a/pkg/syncevent/source.go
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2020 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncevent
-
-// Source represents an event source.
-type Source interface {
- // SubscribeEvents causes the Source to notify the given Receiver of the
- // given subset of events.
- //
- // Preconditions:
- // * r != nil.
- // * The ReceiverCallback for r must not take locks that are ordered
- // prior to the Source; for example, it cannot call any Source
- // methods.
- SubscribeEvents(r *Receiver, filter Set) SubscriptionID
-
- // UnsubscribeEvents causes the Source to stop notifying the Receiver
- // subscribed by a previous call to SubscribeEvents that returned the given
- // SubscriptionID.
- //
- // Preconditions: UnsubscribeEvents may be called at most once for any
- // given SubscriptionID.
- UnsubscribeEvents(id SubscriptionID)
-}
-
-// SubscriptionID identifies a call to Source.SubscribeEvents.
-type SubscriptionID uint64
-
-// UnsubscribeAndAck is a convenience function that unsubscribes r from the
-// given events from src and also clears them from r.
-func UnsubscribeAndAck(src Source, r *Receiver, filter Set, id SubscriptionID) {
- src.UnsubscribeEvents(id)
- r.Ack(filter)
-}
-
-// NoopSource implements Source by never sending events to subscribed
-// Receivers.
-type NoopSource struct{}
-
-// SubscribeEvents implements Source.SubscribeEvents.
-func (NoopSource) SubscribeEvents(*Receiver, Set) SubscriptionID {
- return 0
-}
-
-// UnsubscribeEvents implements Source.UnsubscribeEvents.
-func (NoopSource) UnsubscribeEvents(SubscriptionID) {
-}
-
-// See Broadcaster for a non-noop implementations of Source.
diff --git a/pkg/syncevent/syncevent.go b/pkg/syncevent/syncevent.go
deleted file mode 100644
index 9fb6a06de..000000000
--- a/pkg/syncevent/syncevent.go
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright 2020 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package syncevent provides efficient primitives for goroutine
-// synchronization based on event bitmasks.
-package syncevent
-
-// Set is a bitmask where each bit represents a distinct user-defined event.
-// The event package does not treat any bits in Set specially.
-type Set uint64
-
-const (
- // NoEvents is a Set containing no events.
- NoEvents = Set(0)
-
- // AllEvents is a Set containing all possible events.
- AllEvents = ^Set(0)
-
- // MaxEvents is the number of distinct events that can be represented by a Set.
- MaxEvents = 64
-)
diff --git a/pkg/syncevent/syncevent_example_test.go b/pkg/syncevent/syncevent_example_test.go
deleted file mode 100644
index bfb18e2ea..000000000
--- a/pkg/syncevent/syncevent_example_test.go
+++ /dev/null
@@ -1,108 +0,0 @@
-// Copyright 2020 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncevent
-
-import (
- "fmt"
- "sync/atomic"
- "time"
-)
-
-func Example_ioReadinessInterrputible() {
- const (
- evReady = Set(1 << iota)
- evInterrupt
- )
- errNotReady := fmt.Errorf("not ready for I/O")
-
- // State of some I/O object.
- var (
- br Broadcaster
- ready uint32
- )
- doIO := func() error {
- if atomic.LoadUint32(&ready) == 0 {
- return errNotReady
- }
- return nil
- }
- go func() {
- // The I/O object eventually becomes ready for I/O.
- time.Sleep(100 * time.Millisecond)
- // When it does, it first ensures that future calls to isReady() return
- // true, then broadcasts the readiness event to Receivers.
- atomic.StoreUint32(&ready, 1)
- br.Broadcast(evReady)
- }()
-
- // Each user of the I/O object owns a Waiter.
- var w Waiter
- w.Init()
- // The Waiter may be asynchronously interruptible, e.g. for signal
- // handling in the sentry.
- go func() {
- time.Sleep(200 * time.Millisecond)
- w.Receiver().Notify(evInterrupt)
- }()
-
- // To use the I/O object:
- //
- // Optionally, if the I/O object is likely to be ready, attempt I/O first.
- err := doIO()
- if err == nil {
- // Success, we're done.
- return /* nil */
- }
- if err != errNotReady {
- // Failure, I/O failed for some reason other than readiness.
- return /* err */
- }
- // Subscribe for readiness events from the I/O object.
- id := br.SubscribeEvents(w.Receiver(), evReady)
- // When we are finished blocking, unsubscribe from readiness events and
- // remove readiness events from the pending event set.
- defer UnsubscribeAndAck(&br, w.Receiver(), evReady, id)
- for {
- // Attempt I/O again. This must be done after the call to SubscribeEvents,
- // since the I/O object might have become ready between the previous call
- // to doIO and the call to SubscribeEvents.
- err = doIO()
- if err == nil {
- return /* nil */
- }
- if err != errNotReady {
- return /* err */
- }
- // Block until either the I/O object indicates it is ready, or we are
- // interrupted.
- events := w.Wait()
- if events&evInterrupt != 0 {
- // In the specific case of sentry signal handling, signal delivery
- // is handled by another system, so we aren't responsible for
- // acknowledging evInterrupt.
- return /* errInterrupted */
- }
- // Note that, in a concurrent context, the I/O object might become
- // ready and then not ready again. To handle this:
- //
- // - evReady must be acknowledged before calling doIO() again (rather
- // than after), so that if the I/O object becomes ready *again* after
- // the call to doIO(), the readiness event is not lost.
- //
- // - We must loop instead of just calling doIO() once after receiving
- // evReady.
- w.Ack(evReady)
- }
-}
diff --git a/pkg/syncevent/waiter_test.go b/pkg/syncevent/waiter_test.go
deleted file mode 100644
index 3c8cbcdd8..000000000
--- a/pkg/syncevent/waiter_test.go
+++ /dev/null
@@ -1,414 +0,0 @@
-// Copyright 2020 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncevent
-
-import (
- "sync/atomic"
- "testing"
- "time"
-
- "gvisor.dev/gvisor/pkg/sleep"
- "gvisor.dev/gvisor/pkg/sync"
-)
-
-func TestWaiterAlreadyPending(t *testing.T) {
- var w Waiter
- w.Init()
- want := Set(1)
- w.Notify(want)
- if got := w.Wait(); got != want {
- t.Errorf("Waiter.Wait: got %#x, wanted %#x", got, want)
- }
-}
-
-func TestWaiterAsyncNotify(t *testing.T) {
- var w Waiter
- w.Init()
- want := Set(1)
- go func() {
- time.Sleep(100 * time.Millisecond)
- w.Notify(want)
- }()
- if got := w.Wait(); got != want {
- t.Errorf("Waiter.Wait: got %#x, wanted %#x", got, want)
- }
-}
-
-func TestWaiterWaitFor(t *testing.T) {
- var w Waiter
- w.Init()
- evWaited := Set(1)
- evOther := Set(2)
- w.Notify(evOther)
- notifiedEvent := uint32(0)
- go func() {
- time.Sleep(100 * time.Millisecond)
- atomic.StoreUint32(&notifiedEvent, 1)
- w.Notify(evWaited)
- }()
- if got, want := w.WaitFor(evWaited), evWaited|evOther; got != want {
- t.Errorf("Waiter.WaitFor: got %#x, wanted %#x", got, want)
- }
- if atomic.LoadUint32(&notifiedEvent) == 0 {
- t.Errorf("Waiter.WaitFor returned before goroutine notified waited-for event")
- }
-}
-
-func TestWaiterWaitAndAckAll(t *testing.T) {
- var w Waiter
- w.Init()
- w.Notify(AllEvents)
- if got := w.WaitAndAckAll(); got != AllEvents {
- t.Errorf("Waiter.WaitAndAckAll: got %#x, wanted %#x", got, AllEvents)
- }
- if got := w.Pending(); got != NoEvents {
- t.Errorf("Waiter.WaitAndAckAll did not ack all events: got %#x, wanted 0", got)
- }
-}
-
-// BenchmarkWaiterX, BenchmarkSleeperX, and BenchmarkChannelX benchmark usage
-// pattern X (described in terms of Waiter) with Waiter, sleep.Sleeper, and
-// buffered chan struct{} respectively. When the maximum number of event
-// sources is relevant, we use 3 event sources because this is representative
-// of the kernel.Task.block() use case: an interrupt source, a timeout source,
-// and the actual event source being waited on.
-
-// Event set used by most benchmarks.
-const evBench Set = 1
-
-// BenchmarkXxxNotifyRedundant measures how long it takes to notify a Waiter of
-// an event that is already pending.
-
-func BenchmarkWaiterNotifyRedundant(b *testing.B) {
- var w Waiter
- w.Init()
- w.Notify(evBench)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- w.Notify(evBench)
- }
-}
-
-func BenchmarkSleeperNotifyRedundant(b *testing.B) {
- var s sleep.Sleeper
- var w sleep.Waker
- s.AddWaker(&w, 0)
- w.Assert()
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- w.Assert()
- }
-}
-
-func BenchmarkChannelNotifyRedundant(b *testing.B) {
- ch := make(chan struct{}, 1)
- ch <- struct{}{}
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- select {
- case ch <- struct{}{}:
- default:
- }
- }
-}
-
-// BenchmarkXxxNotifyWaitAck measures how long it takes to notify a Waiter an
-// event, return that event using a blocking check, and then unset the event as
-// pending.
-
-func BenchmarkWaiterNotifyWaitAck(b *testing.B) {
- var w Waiter
- w.Init()
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- w.Notify(evBench)
- w.Wait()
- w.Ack(evBench)
- }
-}
-
-func BenchmarkSleeperNotifyWaitAck(b *testing.B) {
- var s sleep.Sleeper
- var w sleep.Waker
- s.AddWaker(&w, 0)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- w.Assert()
- s.Fetch(true)
- }
-}
-
-func BenchmarkChannelNotifyWaitAck(b *testing.B) {
- ch := make(chan struct{}, 1)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- // notify
- select {
- case ch <- struct{}{}:
- default:
- }
-
- // wait + ack
- <-ch
- }
-}
-
-// BenchmarkSleeperMultiNotifyWaitAck is equivalent to
-// BenchmarkSleeperNotifyWaitAck, but also includes allocation of a
-// temporary sleep.Waker. This is necessary when multiple goroutines may wait
-// for the same event, since each sleep.Waker can wake only a single
-// sleep.Sleeper.
-//
-// The syncevent package does not require a distinct object for each
-// waiter-waker relationship, so BenchmarkWaiterNotifyWaitAck and
-// BenchmarkWaiterMultiNotifyWaitAck would be identical. The analogous state
-// for channels, runtime.sudog, is inescapably runtime-allocated, so
-// BenchmarkChannelNotifyWaitAck and BenchmarkChannelMultiNotifyWaitAck would
-// also be identical.
-
-func BenchmarkSleeperMultiNotifyWaitAck(b *testing.B) {
- var s sleep.Sleeper
- // The sleep package doesn't provide sync.Pool allocation of Wakers;
- // we do for a fairer comparison.
- wakerPool := sync.Pool{
- New: func() interface{} {
- return &sleep.Waker{}
- },
- }
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- w := wakerPool.Get().(*sleep.Waker)
- s.AddWaker(w, 0)
- w.Assert()
- s.Fetch(true)
- s.Done()
- wakerPool.Put(w)
- }
-}
-
-// BenchmarkXxxTempNotifyWaitAck is equivalent to NotifyWaitAck, but also
-// includes allocation of a temporary Waiter. This models the case where a
-// goroutine not already associated with a Waiter needs one in order to block.
-//
-// The analogous state for channels is built into runtime.g, so
-// BenchmarkChannelNotifyWaitAck and BenchmarkChannelTempNotifyWaitAck would be
-// identical.
-
-func BenchmarkWaiterTempNotifyWaitAck(b *testing.B) {
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- w := GetWaiter()
- w.Notify(evBench)
- w.Wait()
- w.Ack(evBench)
- PutWaiter(w)
- }
-}
-
-func BenchmarkSleeperTempNotifyWaitAck(b *testing.B) {
- // The sleep package doesn't provide sync.Pool allocation of Sleepers;
- // we do for a fairer comparison.
- sleeperPool := sync.Pool{
- New: func() interface{} {
- return &sleep.Sleeper{}
- },
- }
- var w sleep.Waker
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- s := sleeperPool.Get().(*sleep.Sleeper)
- s.AddWaker(&w, 0)
- w.Assert()
- s.Fetch(true)
- s.Done()
- sleeperPool.Put(s)
- }
-}
-
-// BenchmarkXxxNotifyWaitMultiAck is equivalent to NotifyWaitAck, but allows
-// for multiple event sources.
-
-func BenchmarkWaiterNotifyWaitMultiAck(b *testing.B) {
- var w Waiter
- w.Init()
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- w.Notify(evBench)
- if e := w.Wait(); e != evBench {
- b.Fatalf("Wait: got %#x, wanted %#x", e, evBench)
- }
- w.Ack(evBench)
- }
-}
-
-func BenchmarkSleeperNotifyWaitMultiAck(b *testing.B) {
- var s sleep.Sleeper
- var ws [3]sleep.Waker
- for i := range ws {
- s.AddWaker(&ws[i], i)
- }
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- ws[0].Assert()
- if id, _ := s.Fetch(true); id != 0 {
- b.Fatalf("Fetch: got %d, wanted 0", id)
- }
- }
-}
-
-func BenchmarkChannelNotifyWaitMultiAck(b *testing.B) {
- ch0 := make(chan struct{}, 1)
- ch1 := make(chan struct{}, 1)
- ch2 := make(chan struct{}, 1)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- // notify
- select {
- case ch0 <- struct{}{}:
- default:
- }
-
- // wait + clear
- select {
- case <-ch0:
- // ok
- case <-ch1:
- b.Fatalf("received from ch1")
- case <-ch2:
- b.Fatalf("received from ch2")
- }
- }
-}
-
-// BenchmarkXxxNotifyAsyncWaitAck measures how long it takes to wait for an
-// event while another goroutine signals the event. This assumes that a new
-// goroutine doesn't run immediately (i.e. the creator of a new goroutine is
-// allowed to go to sleep before the new goroutine has a chance to run).
-
-func BenchmarkWaiterNotifyAsyncWaitAck(b *testing.B) {
- var w Waiter
- w.Init()
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- go func() {
- w.Notify(1)
- }()
- w.Wait()
- w.Ack(evBench)
- }
-}
-
-func BenchmarkSleeperNotifyAsyncWaitAck(b *testing.B) {
- var s sleep.Sleeper
- var w sleep.Waker
- s.AddWaker(&w, 0)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- go func() {
- w.Assert()
- }()
- s.Fetch(true)
- }
-}
-
-func BenchmarkChannelNotifyAsyncWaitAck(b *testing.B) {
- ch := make(chan struct{}, 1)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- go func() {
- select {
- case ch <- struct{}{}:
- default:
- }
- }()
- <-ch
- }
-}
-
-// BenchmarkXxxNotifyAsyncWaitMultiAck is equivalent to NotifyAsyncWaitAck, but
-// allows for multiple event sources.
-
-func BenchmarkWaiterNotifyAsyncWaitMultiAck(b *testing.B) {
- var w Waiter
- w.Init()
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- go func() {
- w.Notify(evBench)
- }()
- if e := w.Wait(); e != evBench {
- b.Fatalf("Wait: got %#x, wanted %#x", e, evBench)
- }
- w.Ack(evBench)
- }
-}
-
-func BenchmarkSleeperNotifyAsyncWaitMultiAck(b *testing.B) {
- var s sleep.Sleeper
- var ws [3]sleep.Waker
- for i := range ws {
- s.AddWaker(&ws[i], i)
- }
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- go func() {
- ws[0].Assert()
- }()
- if id, _ := s.Fetch(true); id != 0 {
- b.Fatalf("Fetch: got %d, expected 0", id)
- }
- }
-}
-
-func BenchmarkChannelNotifyAsyncWaitMultiAck(b *testing.B) {
- ch0 := make(chan struct{}, 1)
- ch1 := make(chan struct{}, 1)
- ch2 := make(chan struct{}, 1)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- go func() {
- select {
- case ch0 <- struct{}{}:
- default:
- }
- }()
-
- select {
- case <-ch0:
- // ok
- case <-ch1:
- b.Fatalf("received from ch1")
- case <-ch2:
- b.Fatalf("received from ch2")
- }
- }
-}
diff --git a/pkg/syncevent/waiter_unsafe.go b/pkg/syncevent/waiter_unsafe.go
deleted file mode 100644
index b6ed2852d..000000000
--- a/pkg/syncevent/waiter_unsafe.go
+++ /dev/null
@@ -1,197 +0,0 @@
-// Copyright 2020 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncevent
-
-import (
- "sync/atomic"
- "unsafe"
-
- "gvisor.dev/gvisor/pkg/sync"
-)
-
-// Waiter allows a goroutine to block on pending events received by a Receiver.
-//
-// Waiter.Init() must be called before first use.
-type Waiter struct {
- r Receiver
-
- // g is one of:
- //
- // - 0: No goroutine is blocking in Wait.
- //
- // - preparingG: A goroutine is in Wait preparing to sleep, but hasn't yet
- // completed waiterUnlock(). Thus the wait can only be interrupted by
- // replacing the value of g with 0 (the G may not be in state Gwaiting yet,
- // so we can't call goready.)
- //
- // - Otherwise: g is a pointer to the runtime.g in state Gwaiting for the
- // goroutine blocked in Wait, which can only be woken by calling goready.
- g uintptr `state:"zerovalue"`
-}
-
-const preparingG = 1
-
-// Init must be called before first use of w.
-func (w *Waiter) Init() {
- w.r.Init(w)
-}
-
-// Receiver returns the Receiver that receives events that unblock calls to
-// w.Wait().
-func (w *Waiter) Receiver() *Receiver {
- return &w.r
-}
-
-// Pending returns the set of pending events.
-func (w *Waiter) Pending() Set {
- return w.r.Pending()
-}
-
-// Wait blocks until at least one event is pending, then returns the set of
-// pending events. It does not affect the set of pending events; callers must
-// call w.Ack() to do so, or use w.WaitAndAck() instead.
-//
-// Precondition: Only one goroutine may call any Wait* method at a time.
-func (w *Waiter) Wait() Set {
- return w.WaitFor(AllEvents)
-}
-
-// WaitFor blocks until at least one event in es is pending, then returns the
-// set of pending events (including those not in es). It does not affect the
-// set of pending events; callers must call w.Ack() to do so.
-//
-// Precondition: Only one goroutine may call any Wait* method at a time.
-func (w *Waiter) WaitFor(es Set) Set {
- for {
- // Optimization: Skip the atomic store to w.g if an event is already
- // pending.
- if p := w.r.Pending(); p&es != NoEvents {
- return p
- }
-
- // Indicate that we're preparing to go to sleep.
- atomic.StoreUintptr(&w.g, preparingG)
-
- // If an event is pending, abort the sleep.
- if p := w.r.Pending(); p&es != NoEvents {
- atomic.StoreUintptr(&w.g, 0)
- return p
- }
-
- // If w.g is still preparingG (i.e. w.NotifyPending() has not been
- // called or has not reached atomic.SwapUintptr()), go to sleep until
- // w.NotifyPending() => goready().
- sync.Gopark(waiterCommit, unsafe.Pointer(&w.g), sync.WaitReasonSelect, sync.TraceEvGoBlockSelect, 0)
- }
-}
-
-//go:norace
-//go:nosplit
-func waiterCommit(g uintptr, wg unsafe.Pointer) bool {
- // The only way this CAS can fail is if a call to Waiter.NotifyPending()
- // has replaced *wg with nil, in which case we should not sleep.
- return sync.RaceUncheckedAtomicCompareAndSwapUintptr((*uintptr)(wg), preparingG, g)
-}
-
-// Ack marks the given events as not pending.
-func (w *Waiter) Ack(es Set) {
- w.r.Ack(es)
-}
-
-// WaitAndAckAll blocks until at least one event is pending, then marks all
-// events as not pending and returns the set of previously-pending events.
-//
-// Precondition: Only one goroutine may call any Wait* method at a time.
-func (w *Waiter) WaitAndAckAll() Set {
- // Optimization: Skip the atomic store to w.g if an event is already
- // pending. Call Pending() first since, in the common case that events are
- // not yet pending, this skips an atomic swap on w.r.pending.
- if w.r.Pending() != NoEvents {
- if p := w.r.PendingAndAckAll(); p != NoEvents {
- return p
- }
- }
-
- for {
- // Indicate that we're preparing to go to sleep.
- atomic.StoreUintptr(&w.g, preparingG)
-
- // If an event is pending, abort the sleep.
- if w.r.Pending() != NoEvents {
- if p := w.r.PendingAndAckAll(); p != NoEvents {
- atomic.StoreUintptr(&w.g, 0)
- return p
- }
- }
-
- // If w.g is still preparingG (i.e. w.NotifyPending() has not been
- // called or has not reached atomic.SwapUintptr()), go to sleep until
- // w.NotifyPending() => goready().
- sync.Gopark(waiterCommit, unsafe.Pointer(&w.g), sync.WaitReasonSelect, sync.TraceEvGoBlockSelect, 0)
-
- // Check for pending events. We call PendingAndAckAll() directly now since
- // we only expect to be woken after events become pending.
- if p := w.r.PendingAndAckAll(); p != NoEvents {
- return p
- }
- }
-}
-
-// Notify marks the given events as pending, possibly unblocking concurrent
-// calls to w.Wait() or w.WaitFor().
-func (w *Waiter) Notify(es Set) {
- w.r.Notify(es)
-}
-
-// NotifyPending implements ReceiverCallback.NotifyPending. Users of Waiter
-// should not call NotifyPending.
-func (w *Waiter) NotifyPending() {
- // Optimization: Skip the atomic swap on w.g if there is no sleeping
- // goroutine. NotifyPending is called after w.r.Pending() is updated, so
- // concurrent and future calls to w.Wait() will observe pending events and
- // abort sleeping.
- if atomic.LoadUintptr(&w.g) == 0 {
- return
- }
- // Wake a sleeping G, or prevent a G that is preparing to sleep from doing
- // so. Swap is needed here to ensure that only one call to NotifyPending
- // calls goready.
- if g := atomic.SwapUintptr(&w.g, 0); g > preparingG {
- sync.Goready(g, 0)
- }
-}
-
-var waiterPool = sync.Pool{
- New: func() interface{} {
- w := &Waiter{}
- w.Init()
- return w
- },
-}
-
-// GetWaiter returns an unused Waiter. PutWaiter should be called to release
-// the Waiter once it is no longer needed.
-//
-// Where possible, users should prefer to associate each goroutine that calls
-// Waiter.Wait() with a distinct pre-allocated Waiter to avoid allocation of
-// Waiters in hot paths.
-func GetWaiter() *Waiter {
- return waiterPool.Get().(*Waiter)
-}
-
-// PutWaiter releases an unused Waiter previously returned by GetWaiter.
-func PutWaiter(w *Waiter) {
- waiterPool.Put(w)
-}