diff options
Diffstat (limited to 'pkg/syncevent')
-rw-r--r-- | pkg/syncevent/BUILD | 35 | ||||
-rw-r--r-- | pkg/syncevent/broadcaster.go | 220 | ||||
-rw-r--r-- | pkg/syncevent/broadcaster_test.go | 376 | ||||
-rw-r--r-- | pkg/syncevent/receiver.go | 103 | ||||
-rw-r--r-- | pkg/syncevent/source.go | 61 | ||||
-rw-r--r-- | pkg/syncevent/syncevent.go | 32 | ||||
-rw-r--r-- | pkg/syncevent/syncevent_example_test.go | 108 | ||||
-rw-r--r-- | pkg/syncevent/waiter_test.go | 414 | ||||
-rw-r--r-- | pkg/syncevent/waiter_unsafe.go | 197 |
9 files changed, 0 insertions, 1546 deletions
diff --git a/pkg/syncevent/BUILD b/pkg/syncevent/BUILD deleted file mode 100644 index 42c553308..000000000 --- a/pkg/syncevent/BUILD +++ /dev/null @@ -1,35 +0,0 @@ -load("//tools:defs.bzl", "go_library", "go_test") - -licenses(["notice"]) - -go_library( - name = "syncevent", - srcs = [ - "broadcaster.go", - "receiver.go", - "source.go", - "syncevent.go", - "waiter_unsafe.go", - ], - visibility = ["//:sandbox"], - deps = [ - "//pkg/atomicbitops", - "//pkg/sync", - ], -) - -go_test( - name = "syncevent_test", - size = "small", - srcs = [ - "broadcaster_test.go", - "syncevent_example_test.go", - "waiter_test.go", - ], - library = ":syncevent", - deps = [ - "//pkg/sleep", - "//pkg/sync", - "//pkg/waiter", - ], -) diff --git a/pkg/syncevent/broadcaster.go b/pkg/syncevent/broadcaster.go deleted file mode 100644 index dabf08895..000000000 --- a/pkg/syncevent/broadcaster.go +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright 2020 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package syncevent - -import ( - "gvisor.dev/gvisor/pkg/sync" -) - -// Broadcaster is an implementation of Source that supports any number of -// subscribed Receivers. -// -// The zero value of Broadcaster is valid and has no subscribed Receivers. -// Broadcaster is not copyable by value. -// -// All Broadcaster methods may be called concurrently from multiple goroutines. -type Broadcaster struct { - // Broadcaster is implemented as a hash table where keys are assigned by - // the Broadcaster and returned as SubscriptionIDs, making it safe to use - // the identity function for hashing. The hash table resolves collisions - // using linear probing and features Robin Hood insertion and backward - // shift deletion in order to support a relatively high load factor - // efficiently, which matters since the cost of Broadcast is linear in the - // size of the table. - - // mu protects the following fields. - mu sync.Mutex - - // Invariants: len(table) is 0 or a power of 2. - table []broadcasterSlot - - // load is the number of entries in table with receiver != nil. - load int - - lastID SubscriptionID -} - -type broadcasterSlot struct { - // Invariants: If receiver == nil, then filter == NoEvents and id == 0. - // Otherwise, id != 0. - receiver *Receiver - filter Set - id SubscriptionID -} - -const ( - broadcasterMinNonZeroTableSize = 2 // must be a power of 2 > 1 - - broadcasterMaxLoadNum = 13 - broadcasterMaxLoadDen = 16 -) - -// SubscribeEvents implements Source.SubscribeEvents. -func (b *Broadcaster) SubscribeEvents(r *Receiver, filter Set) SubscriptionID { - b.mu.Lock() - - // Assign an ID for this subscription. - b.lastID++ - id := b.lastID - - // Expand the table if over the maximum load factor: - // - // load / len(b.table) > broadcasterMaxLoadNum / broadcasterMaxLoadDen - // load * broadcasterMaxLoadDen > broadcasterMaxLoadNum * len(b.table) - b.load++ - if (b.load * broadcasterMaxLoadDen) > (broadcasterMaxLoadNum * len(b.table)) { - // Double the number of slots in the new table. - newlen := broadcasterMinNonZeroTableSize - if len(b.table) != 0 { - newlen = 2 * len(b.table) - } - if newlen <= cap(b.table) { - // Reuse excess capacity in the current table, moving entries not - // already in their first-probed positions to better ones. - newtable := b.table[:newlen] - newmask := uint64(newlen - 1) - for i := range b.table { - if b.table[i].receiver != nil && uint64(b.table[i].id)&newmask != uint64(i) { - entry := b.table[i] - b.table[i] = broadcasterSlot{} - broadcasterTableInsert(newtable, entry.id, entry.receiver, entry.filter) - } - } - b.table = newtable - } else { - newtable := make([]broadcasterSlot, newlen) - // Copy existing entries to the new table. - for i := range b.table { - if b.table[i].receiver != nil { - broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter) - } - } - // Switch to the new table. - b.table = newtable - } - } - - broadcasterTableInsert(b.table, id, r, filter) - b.mu.Unlock() - return id -} - -// Preconditions: -// * table must not be full. -// * len(table) is a power of 2. -func broadcasterTableInsert(table []broadcasterSlot, id SubscriptionID, r *Receiver, filter Set) { - entry := broadcasterSlot{ - receiver: r, - filter: filter, - id: id, - } - mask := uint64(len(table) - 1) - i := uint64(id) & mask - disp := uint64(0) - for { - if table[i].receiver == nil { - table[i] = entry - return - } - // If we've been displaced farther from our first-probed slot than the - // element stored in this one, swap elements and switch to inserting - // the replaced one. (This is Robin Hood insertion.) - slotDisp := (i - uint64(table[i].id)) & mask - if disp > slotDisp { - table[i], entry = entry, table[i] - disp = slotDisp - } - i = (i + 1) & mask - disp++ - } -} - -// UnsubscribeEvents implements Source.UnsubscribeEvents. -func (b *Broadcaster) UnsubscribeEvents(id SubscriptionID) { - b.mu.Lock() - - mask := uint64(len(b.table) - 1) - i := uint64(id) & mask - for { - if b.table[i].id == id { - // Found the element to remove. Move all subsequent elements - // backward until we either find an empty slot, or an element that - // is already in its first-probed slot. (This is backward shift - // deletion.) - for { - next := (i + 1) & mask - if b.table[next].receiver == nil { - break - } - if uint64(b.table[next].id)&mask == next { - break - } - b.table[i] = b.table[next] - i = next - } - b.table[i] = broadcasterSlot{} - break - } - i = (i + 1) & mask - } - - // If a table 1/4 of the current size would still be at or under the - // maximum load factor (i.e. the current table size is at least two - // expansions bigger than necessary), halve the size of the table to reduce - // the cost of Broadcast. Since we are concerned with iteration time and - // not memory usage, reuse the existing slice to reduce future allocations - // from table re-expansion. - b.load-- - if len(b.table) > broadcasterMinNonZeroTableSize && (b.load*(4*broadcasterMaxLoadDen)) <= (broadcasterMaxLoadNum*len(b.table)) { - newlen := len(b.table) / 2 - newtable := b.table[:newlen] - for i := newlen; i < len(b.table); i++ { - if b.table[i].receiver != nil { - broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter) - b.table[i] = broadcasterSlot{} - } - } - b.table = newtable - } - - b.mu.Unlock() -} - -// Broadcast notifies all Receivers subscribed to the Broadcaster of the subset -// of events to which they subscribed. The order in which Receivers are -// notified is unspecified. -func (b *Broadcaster) Broadcast(events Set) { - b.mu.Lock() - for i := range b.table { - if intersection := events & b.table[i].filter; intersection != 0 { - // We don't need to check if broadcasterSlot.receiver is nil, since - // if it is then broadcasterSlot.filter is 0. - b.table[i].receiver.Notify(intersection) - } - } - b.mu.Unlock() -} - -// FilteredEvents returns the set of events for which Broadcast will notify at -// least one Receiver, i.e. the union of filters for all subscribed Receivers. -func (b *Broadcaster) FilteredEvents() Set { - var es Set - b.mu.Lock() - for i := range b.table { - es |= b.table[i].filter - } - b.mu.Unlock() - return es -} diff --git a/pkg/syncevent/broadcaster_test.go b/pkg/syncevent/broadcaster_test.go deleted file mode 100644 index e88779e23..000000000 --- a/pkg/syncevent/broadcaster_test.go +++ /dev/null @@ -1,376 +0,0 @@ -// Copyright 2020 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package syncevent - -import ( - "fmt" - "math/rand" - "testing" - - "gvisor.dev/gvisor/pkg/sync" - "gvisor.dev/gvisor/pkg/waiter" -) - -func TestBroadcasterFilter(t *testing.T) { - const numReceivers = 2 * MaxEvents - - var br Broadcaster - ws := make([]Waiter, numReceivers) - for i := range ws { - ws[i].Init() - br.SubscribeEvents(ws[i].Receiver(), 1<<(i%MaxEvents)) - } - for ev := 0; ev < MaxEvents; ev++ { - br.Broadcast(1 << ev) - for i := range ws { - want := NoEvents - if i%MaxEvents == ev { - want = 1 << ev - } - if got := ws[i].Receiver().PendingAndAckAll(); got != want { - t.Errorf("after Broadcast of event %d: waiter %d has pending event set %#x, wanted %#x", ev, i, got, want) - } - } - } -} - -// TestBroadcasterManySubscriptions tests that subscriptions are not lost by -// table expansion/compaction. -func TestBroadcasterManySubscriptions(t *testing.T) { - const numReceivers = 5000 // arbitrary - - var br Broadcaster - ws := make([]Waiter, numReceivers) - for i := range ws { - ws[i].Init() - } - - ids := make([]SubscriptionID, numReceivers) - for i := 0; i < numReceivers; i++ { - // Subscribe receiver i. - ids[i] = br.SubscribeEvents(ws[i].Receiver(), 1) - // Check that receivers [0, i] are subscribed. - br.Broadcast(1) - for j := 0; j <= i; j++ { - if ws[j].Pending() != 1 { - t.Errorf("receiver %d did not receive an event after subscription of receiver %d", j, i) - } - ws[j].Ack(1) - } - } - - // Generate a random order for unsubscriptions. - unsub := rand.Perm(numReceivers) - for i := 0; i < numReceivers; i++ { - // Unsubscribe receiver unsub[i]. - br.UnsubscribeEvents(ids[unsub[i]]) - // Check that receivers [unsub[0], unsub[i]] are not subscribed, and that - // receivers (unsub[i], unsub[numReceivers]) are still subscribed. - br.Broadcast(1) - for j := 0; j <= i; j++ { - if ws[unsub[j]].Pending() != 0 { - t.Errorf("unsub iteration %d: receiver %d received an event after unsubscription of receiver %d", i, unsub[j], unsub[i]) - } - } - for j := i + 1; j < numReceivers; j++ { - if ws[unsub[j]].Pending() != 1 { - t.Errorf("unsub iteration %d: receiver %d did not receive an event after unsubscription of receiver %d", i, unsub[j], unsub[i]) - } - ws[unsub[j]].Ack(1) - } - } -} - -var ( - receiverCountsNonZero = []int{1, 4, 16, 64} - receiverCountsIncludingZero = append([]int{0}, receiverCountsNonZero...) -) - -// BenchmarkBroadcasterX, BenchmarkMapX, and BenchmarkQueueX benchmark usage -// pattern X (described in terms of Broadcaster) with Broadcaster, a -// Mutex-protected map[*Receiver]Set, and waiter.Queue respectively. - -// BenchmarkXxxSubscribeUnsubscribe measures the cost of a Subscribe/Unsubscribe -// cycle. - -func BenchmarkBroadcasterSubscribeUnsubscribe(b *testing.B) { - var br Broadcaster - var w Waiter - w.Init() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - id := br.SubscribeEvents(w.Receiver(), 1) - br.UnsubscribeEvents(id) - } -} - -func BenchmarkMapSubscribeUnsubscribe(b *testing.B) { - var mu sync.Mutex - m := make(map[*Receiver]Set) - var w Waiter - w.Init() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - mu.Lock() - m[w.Receiver()] = Set(1) - mu.Unlock() - mu.Lock() - delete(m, w.Receiver()) - mu.Unlock() - } -} - -func BenchmarkQueueSubscribeUnsubscribe(b *testing.B) { - var q waiter.Queue - e, _ := waiter.NewChannelEntry(nil) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - q.EventRegister(&e, 1) - q.EventUnregister(&e) - } -} - -// BenchmarkXxxSubscribeUnsubscribeBatch is similar to -// BenchmarkXxxSubscribeUnsubscribe, but subscribes and unsubscribes a large -// number of Receivers at a time in order to measure the amortized overhead of -// table expansion/compaction. (Since waiter.Queue is implemented using a -// linked list, BenchmarkQueueSubscribeUnsubscribe and -// BenchmarkQueueSubscribeUnsubscribeBatch should produce nearly the same -// result.) - -const numBatchReceivers = 1000 - -func BenchmarkBroadcasterSubscribeUnsubscribeBatch(b *testing.B) { - var br Broadcaster - ws := make([]Waiter, numBatchReceivers) - for i := range ws { - ws[i].Init() - } - ids := make([]SubscriptionID, numBatchReceivers) - - // Generate a random order for unsubscriptions. - unsub := rand.Perm(numBatchReceivers) - - b.ResetTimer() - for i := 0; i < b.N/numBatchReceivers; i++ { - for j := 0; j < numBatchReceivers; j++ { - ids[j] = br.SubscribeEvents(ws[j].Receiver(), 1) - } - for j := 0; j < numBatchReceivers; j++ { - br.UnsubscribeEvents(ids[unsub[j]]) - } - } -} - -func BenchmarkMapSubscribeUnsubscribeBatch(b *testing.B) { - var mu sync.Mutex - m := make(map[*Receiver]Set) - ws := make([]Waiter, numBatchReceivers) - for i := range ws { - ws[i].Init() - } - - // Generate a random order for unsubscriptions. - unsub := rand.Perm(numBatchReceivers) - - b.ResetTimer() - for i := 0; i < b.N/numBatchReceivers; i++ { - for j := 0; j < numBatchReceivers; j++ { - mu.Lock() - m[ws[j].Receiver()] = Set(1) - mu.Unlock() - } - for j := 0; j < numBatchReceivers; j++ { - mu.Lock() - delete(m, ws[unsub[j]].Receiver()) - mu.Unlock() - } - } -} - -func BenchmarkQueueSubscribeUnsubscribeBatch(b *testing.B) { - var q waiter.Queue - es := make([]waiter.Entry, numBatchReceivers) - for i := range es { - es[i], _ = waiter.NewChannelEntry(nil) - } - - // Generate a random order for unsubscriptions. - unsub := rand.Perm(numBatchReceivers) - - b.ResetTimer() - for i := 0; i < b.N/numBatchReceivers; i++ { - for j := 0; j < numBatchReceivers; j++ { - q.EventRegister(&es[j], 1) - } - for j := 0; j < numBatchReceivers; j++ { - q.EventUnregister(&es[unsub[j]]) - } - } -} - -// BenchmarkXxxBroadcastRedundant measures how long it takes to Broadcast -// already-pending events to multiple Receivers. - -func BenchmarkBroadcasterBroadcastRedundant(b *testing.B) { - for _, n := range receiverCountsIncludingZero { - b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { - var br Broadcaster - ws := make([]Waiter, n) - for i := range ws { - ws[i].Init() - br.SubscribeEvents(ws[i].Receiver(), 1) - } - br.Broadcast(1) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - br.Broadcast(1) - } - }) - } -} - -func BenchmarkMapBroadcastRedundant(b *testing.B) { - for _, n := range receiverCountsIncludingZero { - b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { - var mu sync.Mutex - m := make(map[*Receiver]Set) - ws := make([]Waiter, n) - for i := range ws { - ws[i].Init() - m[ws[i].Receiver()] = Set(1) - } - mu.Lock() - for r := range m { - r.Notify(1) - } - mu.Unlock() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - mu.Lock() - for r := range m { - r.Notify(1) - } - mu.Unlock() - } - }) - } -} - -func BenchmarkQueueBroadcastRedundant(b *testing.B) { - for _, n := range receiverCountsIncludingZero { - b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { - var q waiter.Queue - for i := 0; i < n; i++ { - e, _ := waiter.NewChannelEntry(nil) - q.EventRegister(&e, 1) - } - q.Notify(1) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - q.Notify(1) - } - }) - } -} - -// BenchmarkXxxBroadcastAck measures how long it takes to Broadcast events to -// multiple Receivers, check that all Receivers have received the event, and -// clear the event from all Receivers. - -func BenchmarkBroadcasterBroadcastAck(b *testing.B) { - for _, n := range receiverCountsNonZero { - b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { - var br Broadcaster - ws := make([]Waiter, n) - for i := range ws { - ws[i].Init() - br.SubscribeEvents(ws[i].Receiver(), 1) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - br.Broadcast(1) - for j := range ws { - if got, want := ws[j].Pending(), Set(1); got != want { - b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want) - } - ws[j].Ack(1) - } - } - }) - } -} - -func BenchmarkMapBroadcastAck(b *testing.B) { - for _, n := range receiverCountsNonZero { - b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { - var mu sync.Mutex - m := make(map[*Receiver]Set) - ws := make([]Waiter, n) - for i := range ws { - ws[i].Init() - m[ws[i].Receiver()] = Set(1) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - mu.Lock() - for r := range m { - r.Notify(1) - } - mu.Unlock() - for j := range ws { - if got, want := ws[j].Pending(), Set(1); got != want { - b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want) - } - ws[j].Ack(1) - } - } - }) - } -} - -func BenchmarkQueueBroadcastAck(b *testing.B) { - for _, n := range receiverCountsNonZero { - b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { - var q waiter.Queue - chs := make([]chan struct{}, n) - for i := range chs { - e, ch := waiter.NewChannelEntry(nil) - q.EventRegister(&e, 1) - chs[i] = ch - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - q.Notify(1) - for _, ch := range chs { - select { - case <-ch: - default: - b.Fatalf("channel did not receive event") - } - } - } - }) - } -} diff --git a/pkg/syncevent/receiver.go b/pkg/syncevent/receiver.go deleted file mode 100644 index 5c86e5400..000000000 --- a/pkg/syncevent/receiver.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2020 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package syncevent - -import ( - "sync/atomic" - - "gvisor.dev/gvisor/pkg/atomicbitops" -) - -// Receiver is an event sink that holds pending events and invokes a callback -// whenever new events become pending. Receiver's methods may be called -// concurrently from multiple goroutines. -// -// Receiver.Init() must be called before first use. -type Receiver struct { - // pending is the set of pending events. pending is accessed using atomic - // memory operations. - pending uint64 - - // cb is notified when new events become pending. cb is immutable after - // Init(). - cb ReceiverCallback -} - -// ReceiverCallback receives callbacks from a Receiver. -type ReceiverCallback interface { - // NotifyPending is called when the corresponding Receiver has new pending - // events. - // - // NotifyPending is called synchronously from Receiver.Notify(), so - // implementations must not take locks that may be held by callers of - // Receiver.Notify(). NotifyPending may be called concurrently from - // multiple goroutines. - NotifyPending() -} - -// Init must be called before first use of r. -func (r *Receiver) Init(cb ReceiverCallback) { - r.cb = cb -} - -// Pending returns the set of pending events. -func (r *Receiver) Pending() Set { - return Set(atomic.LoadUint64(&r.pending)) -} - -// Notify sets the given events as pending. -func (r *Receiver) Notify(es Set) { - p := Set(atomic.LoadUint64(&r.pending)) - // Optimization: Skip the atomic CAS on r.pending if all events are - // already pending. - if p&es == es { - return - } - // When this is uncontended (the common case), CAS is faster than - // atomic-OR because the former is inlined and the latter (which we - // implement in assembly ourselves) is not. - if !atomic.CompareAndSwapUint64(&r.pending, uint64(p), uint64(p|es)) { - // If the CAS fails, fall back to atomic-OR. - atomicbitops.OrUint64(&r.pending, uint64(es)) - } - r.cb.NotifyPending() -} - -// Ack unsets the given events as pending. -func (r *Receiver) Ack(es Set) { - p := Set(atomic.LoadUint64(&r.pending)) - // Optimization: Skip the atomic CAS on r.pending if all events are - // already not pending. - if p&es == 0 { - return - } - // When this is uncontended (the common case), CAS is faster than - // atomic-AND because the former is inlined and the latter (which we - // implement in assembly ourselves) is not. - if !atomic.CompareAndSwapUint64(&r.pending, uint64(p), uint64(p&^es)) { - // If the CAS fails, fall back to atomic-AND. - atomicbitops.AndUint64(&r.pending, ^uint64(es)) - } -} - -// PendingAndAckAll unsets all events as pending and returns the set of -// previously-pending events. -// -// PendingAndAckAll should only be used in preference to a call to Pending -// followed by a conditional call to Ack when the caller expects events to be -// pending (e.g. after a call to ReceiverCallback.NotifyPending()). -func (r *Receiver) PendingAndAckAll() Set { - return Set(atomic.SwapUint64(&r.pending, 0)) -} diff --git a/pkg/syncevent/source.go b/pkg/syncevent/source.go deleted file mode 100644 index d3d0f34c5..000000000 --- a/pkg/syncevent/source.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2020 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package syncevent - -// Source represents an event source. -type Source interface { - // SubscribeEvents causes the Source to notify the given Receiver of the - // given subset of events. - // - // Preconditions: - // * r != nil. - // * The ReceiverCallback for r must not take locks that are ordered - // prior to the Source; for example, it cannot call any Source - // methods. - SubscribeEvents(r *Receiver, filter Set) SubscriptionID - - // UnsubscribeEvents causes the Source to stop notifying the Receiver - // subscribed by a previous call to SubscribeEvents that returned the given - // SubscriptionID. - // - // Preconditions: UnsubscribeEvents may be called at most once for any - // given SubscriptionID. - UnsubscribeEvents(id SubscriptionID) -} - -// SubscriptionID identifies a call to Source.SubscribeEvents. -type SubscriptionID uint64 - -// UnsubscribeAndAck is a convenience function that unsubscribes r from the -// given events from src and also clears them from r. -func UnsubscribeAndAck(src Source, r *Receiver, filter Set, id SubscriptionID) { - src.UnsubscribeEvents(id) - r.Ack(filter) -} - -// NoopSource implements Source by never sending events to subscribed -// Receivers. -type NoopSource struct{} - -// SubscribeEvents implements Source.SubscribeEvents. -func (NoopSource) SubscribeEvents(*Receiver, Set) SubscriptionID { - return 0 -} - -// UnsubscribeEvents implements Source.UnsubscribeEvents. -func (NoopSource) UnsubscribeEvents(SubscriptionID) { -} - -// See Broadcaster for a non-noop implementations of Source. diff --git a/pkg/syncevent/syncevent.go b/pkg/syncevent/syncevent.go deleted file mode 100644 index 9fb6a06de..000000000 --- a/pkg/syncevent/syncevent.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2020 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package syncevent provides efficient primitives for goroutine -// synchronization based on event bitmasks. -package syncevent - -// Set is a bitmask where each bit represents a distinct user-defined event. -// The event package does not treat any bits in Set specially. -type Set uint64 - -const ( - // NoEvents is a Set containing no events. - NoEvents = Set(0) - - // AllEvents is a Set containing all possible events. - AllEvents = ^Set(0) - - // MaxEvents is the number of distinct events that can be represented by a Set. - MaxEvents = 64 -) diff --git a/pkg/syncevent/syncevent_example_test.go b/pkg/syncevent/syncevent_example_test.go deleted file mode 100644 index bfb18e2ea..000000000 --- a/pkg/syncevent/syncevent_example_test.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2020 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package syncevent - -import ( - "fmt" - "sync/atomic" - "time" -) - -func Example_ioReadinessInterrputible() { - const ( - evReady = Set(1 << iota) - evInterrupt - ) - errNotReady := fmt.Errorf("not ready for I/O") - - // State of some I/O object. - var ( - br Broadcaster - ready uint32 - ) - doIO := func() error { - if atomic.LoadUint32(&ready) == 0 { - return errNotReady - } - return nil - } - go func() { - // The I/O object eventually becomes ready for I/O. - time.Sleep(100 * time.Millisecond) - // When it does, it first ensures that future calls to isReady() return - // true, then broadcasts the readiness event to Receivers. - atomic.StoreUint32(&ready, 1) - br.Broadcast(evReady) - }() - - // Each user of the I/O object owns a Waiter. - var w Waiter - w.Init() - // The Waiter may be asynchronously interruptible, e.g. for signal - // handling in the sentry. - go func() { - time.Sleep(200 * time.Millisecond) - w.Receiver().Notify(evInterrupt) - }() - - // To use the I/O object: - // - // Optionally, if the I/O object is likely to be ready, attempt I/O first. - err := doIO() - if err == nil { - // Success, we're done. - return /* nil */ - } - if err != errNotReady { - // Failure, I/O failed for some reason other than readiness. - return /* err */ - } - // Subscribe for readiness events from the I/O object. - id := br.SubscribeEvents(w.Receiver(), evReady) - // When we are finished blocking, unsubscribe from readiness events and - // remove readiness events from the pending event set. - defer UnsubscribeAndAck(&br, w.Receiver(), evReady, id) - for { - // Attempt I/O again. This must be done after the call to SubscribeEvents, - // since the I/O object might have become ready between the previous call - // to doIO and the call to SubscribeEvents. - err = doIO() - if err == nil { - return /* nil */ - } - if err != errNotReady { - return /* err */ - } - // Block until either the I/O object indicates it is ready, or we are - // interrupted. - events := w.Wait() - if events&evInterrupt != 0 { - // In the specific case of sentry signal handling, signal delivery - // is handled by another system, so we aren't responsible for - // acknowledging evInterrupt. - return /* errInterrupted */ - } - // Note that, in a concurrent context, the I/O object might become - // ready and then not ready again. To handle this: - // - // - evReady must be acknowledged before calling doIO() again (rather - // than after), so that if the I/O object becomes ready *again* after - // the call to doIO(), the readiness event is not lost. - // - // - We must loop instead of just calling doIO() once after receiving - // evReady. - w.Ack(evReady) - } -} diff --git a/pkg/syncevent/waiter_test.go b/pkg/syncevent/waiter_test.go deleted file mode 100644 index 3c8cbcdd8..000000000 --- a/pkg/syncevent/waiter_test.go +++ /dev/null @@ -1,414 +0,0 @@ -// Copyright 2020 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package syncevent - -import ( - "sync/atomic" - "testing" - "time" - - "gvisor.dev/gvisor/pkg/sleep" - "gvisor.dev/gvisor/pkg/sync" -) - -func TestWaiterAlreadyPending(t *testing.T) { - var w Waiter - w.Init() - want := Set(1) - w.Notify(want) - if got := w.Wait(); got != want { - t.Errorf("Waiter.Wait: got %#x, wanted %#x", got, want) - } -} - -func TestWaiterAsyncNotify(t *testing.T) { - var w Waiter - w.Init() - want := Set(1) - go func() { - time.Sleep(100 * time.Millisecond) - w.Notify(want) - }() - if got := w.Wait(); got != want { - t.Errorf("Waiter.Wait: got %#x, wanted %#x", got, want) - } -} - -func TestWaiterWaitFor(t *testing.T) { - var w Waiter - w.Init() - evWaited := Set(1) - evOther := Set(2) - w.Notify(evOther) - notifiedEvent := uint32(0) - go func() { - time.Sleep(100 * time.Millisecond) - atomic.StoreUint32(¬ifiedEvent, 1) - w.Notify(evWaited) - }() - if got, want := w.WaitFor(evWaited), evWaited|evOther; got != want { - t.Errorf("Waiter.WaitFor: got %#x, wanted %#x", got, want) - } - if atomic.LoadUint32(¬ifiedEvent) == 0 { - t.Errorf("Waiter.WaitFor returned before goroutine notified waited-for event") - } -} - -func TestWaiterWaitAndAckAll(t *testing.T) { - var w Waiter - w.Init() - w.Notify(AllEvents) - if got := w.WaitAndAckAll(); got != AllEvents { - t.Errorf("Waiter.WaitAndAckAll: got %#x, wanted %#x", got, AllEvents) - } - if got := w.Pending(); got != NoEvents { - t.Errorf("Waiter.WaitAndAckAll did not ack all events: got %#x, wanted 0", got) - } -} - -// BenchmarkWaiterX, BenchmarkSleeperX, and BenchmarkChannelX benchmark usage -// pattern X (described in terms of Waiter) with Waiter, sleep.Sleeper, and -// buffered chan struct{} respectively. When the maximum number of event -// sources is relevant, we use 3 event sources because this is representative -// of the kernel.Task.block() use case: an interrupt source, a timeout source, -// and the actual event source being waited on. - -// Event set used by most benchmarks. -const evBench Set = 1 - -// BenchmarkXxxNotifyRedundant measures how long it takes to notify a Waiter of -// an event that is already pending. - -func BenchmarkWaiterNotifyRedundant(b *testing.B) { - var w Waiter - w.Init() - w.Notify(evBench) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - w.Notify(evBench) - } -} - -func BenchmarkSleeperNotifyRedundant(b *testing.B) { - var s sleep.Sleeper - var w sleep.Waker - s.AddWaker(&w, 0) - w.Assert() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - w.Assert() - } -} - -func BenchmarkChannelNotifyRedundant(b *testing.B) { - ch := make(chan struct{}, 1) - ch <- struct{}{} - - b.ResetTimer() - for i := 0; i < b.N; i++ { - select { - case ch <- struct{}{}: - default: - } - } -} - -// BenchmarkXxxNotifyWaitAck measures how long it takes to notify a Waiter an -// event, return that event using a blocking check, and then unset the event as -// pending. - -func BenchmarkWaiterNotifyWaitAck(b *testing.B) { - var w Waiter - w.Init() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - w.Notify(evBench) - w.Wait() - w.Ack(evBench) - } -} - -func BenchmarkSleeperNotifyWaitAck(b *testing.B) { - var s sleep.Sleeper - var w sleep.Waker - s.AddWaker(&w, 0) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - w.Assert() - s.Fetch(true) - } -} - -func BenchmarkChannelNotifyWaitAck(b *testing.B) { - ch := make(chan struct{}, 1) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - // notify - select { - case ch <- struct{}{}: - default: - } - - // wait + ack - <-ch - } -} - -// BenchmarkSleeperMultiNotifyWaitAck is equivalent to -// BenchmarkSleeperNotifyWaitAck, but also includes allocation of a -// temporary sleep.Waker. This is necessary when multiple goroutines may wait -// for the same event, since each sleep.Waker can wake only a single -// sleep.Sleeper. -// -// The syncevent package does not require a distinct object for each -// waiter-waker relationship, so BenchmarkWaiterNotifyWaitAck and -// BenchmarkWaiterMultiNotifyWaitAck would be identical. The analogous state -// for channels, runtime.sudog, is inescapably runtime-allocated, so -// BenchmarkChannelNotifyWaitAck and BenchmarkChannelMultiNotifyWaitAck would -// also be identical. - -func BenchmarkSleeperMultiNotifyWaitAck(b *testing.B) { - var s sleep.Sleeper - // The sleep package doesn't provide sync.Pool allocation of Wakers; - // we do for a fairer comparison. - wakerPool := sync.Pool{ - New: func() interface{} { - return &sleep.Waker{} - }, - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - w := wakerPool.Get().(*sleep.Waker) - s.AddWaker(w, 0) - w.Assert() - s.Fetch(true) - s.Done() - wakerPool.Put(w) - } -} - -// BenchmarkXxxTempNotifyWaitAck is equivalent to NotifyWaitAck, but also -// includes allocation of a temporary Waiter. This models the case where a -// goroutine not already associated with a Waiter needs one in order to block. -// -// The analogous state for channels is built into runtime.g, so -// BenchmarkChannelNotifyWaitAck and BenchmarkChannelTempNotifyWaitAck would be -// identical. - -func BenchmarkWaiterTempNotifyWaitAck(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - w := GetWaiter() - w.Notify(evBench) - w.Wait() - w.Ack(evBench) - PutWaiter(w) - } -} - -func BenchmarkSleeperTempNotifyWaitAck(b *testing.B) { - // The sleep package doesn't provide sync.Pool allocation of Sleepers; - // we do for a fairer comparison. - sleeperPool := sync.Pool{ - New: func() interface{} { - return &sleep.Sleeper{} - }, - } - var w sleep.Waker - - b.ResetTimer() - for i := 0; i < b.N; i++ { - s := sleeperPool.Get().(*sleep.Sleeper) - s.AddWaker(&w, 0) - w.Assert() - s.Fetch(true) - s.Done() - sleeperPool.Put(s) - } -} - -// BenchmarkXxxNotifyWaitMultiAck is equivalent to NotifyWaitAck, but allows -// for multiple event sources. - -func BenchmarkWaiterNotifyWaitMultiAck(b *testing.B) { - var w Waiter - w.Init() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - w.Notify(evBench) - if e := w.Wait(); e != evBench { - b.Fatalf("Wait: got %#x, wanted %#x", e, evBench) - } - w.Ack(evBench) - } -} - -func BenchmarkSleeperNotifyWaitMultiAck(b *testing.B) { - var s sleep.Sleeper - var ws [3]sleep.Waker - for i := range ws { - s.AddWaker(&ws[i], i) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - ws[0].Assert() - if id, _ := s.Fetch(true); id != 0 { - b.Fatalf("Fetch: got %d, wanted 0", id) - } - } -} - -func BenchmarkChannelNotifyWaitMultiAck(b *testing.B) { - ch0 := make(chan struct{}, 1) - ch1 := make(chan struct{}, 1) - ch2 := make(chan struct{}, 1) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - // notify - select { - case ch0 <- struct{}{}: - default: - } - - // wait + clear - select { - case <-ch0: - // ok - case <-ch1: - b.Fatalf("received from ch1") - case <-ch2: - b.Fatalf("received from ch2") - } - } -} - -// BenchmarkXxxNotifyAsyncWaitAck measures how long it takes to wait for an -// event while another goroutine signals the event. This assumes that a new -// goroutine doesn't run immediately (i.e. the creator of a new goroutine is -// allowed to go to sleep before the new goroutine has a chance to run). - -func BenchmarkWaiterNotifyAsyncWaitAck(b *testing.B) { - var w Waiter - w.Init() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - go func() { - w.Notify(1) - }() - w.Wait() - w.Ack(evBench) - } -} - -func BenchmarkSleeperNotifyAsyncWaitAck(b *testing.B) { - var s sleep.Sleeper - var w sleep.Waker - s.AddWaker(&w, 0) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - go func() { - w.Assert() - }() - s.Fetch(true) - } -} - -func BenchmarkChannelNotifyAsyncWaitAck(b *testing.B) { - ch := make(chan struct{}, 1) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - go func() { - select { - case ch <- struct{}{}: - default: - } - }() - <-ch - } -} - -// BenchmarkXxxNotifyAsyncWaitMultiAck is equivalent to NotifyAsyncWaitAck, but -// allows for multiple event sources. - -func BenchmarkWaiterNotifyAsyncWaitMultiAck(b *testing.B) { - var w Waiter - w.Init() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - go func() { - w.Notify(evBench) - }() - if e := w.Wait(); e != evBench { - b.Fatalf("Wait: got %#x, wanted %#x", e, evBench) - } - w.Ack(evBench) - } -} - -func BenchmarkSleeperNotifyAsyncWaitMultiAck(b *testing.B) { - var s sleep.Sleeper - var ws [3]sleep.Waker - for i := range ws { - s.AddWaker(&ws[i], i) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - go func() { - ws[0].Assert() - }() - if id, _ := s.Fetch(true); id != 0 { - b.Fatalf("Fetch: got %d, expected 0", id) - } - } -} - -func BenchmarkChannelNotifyAsyncWaitMultiAck(b *testing.B) { - ch0 := make(chan struct{}, 1) - ch1 := make(chan struct{}, 1) - ch2 := make(chan struct{}, 1) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - go func() { - select { - case ch0 <- struct{}{}: - default: - } - }() - - select { - case <-ch0: - // ok - case <-ch1: - b.Fatalf("received from ch1") - case <-ch2: - b.Fatalf("received from ch2") - } - } -} diff --git a/pkg/syncevent/waiter_unsafe.go b/pkg/syncevent/waiter_unsafe.go deleted file mode 100644 index b6ed2852d..000000000 --- a/pkg/syncevent/waiter_unsafe.go +++ /dev/null @@ -1,197 +0,0 @@ -// Copyright 2020 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package syncevent - -import ( - "sync/atomic" - "unsafe" - - "gvisor.dev/gvisor/pkg/sync" -) - -// Waiter allows a goroutine to block on pending events received by a Receiver. -// -// Waiter.Init() must be called before first use. -type Waiter struct { - r Receiver - - // g is one of: - // - // - 0: No goroutine is blocking in Wait. - // - // - preparingG: A goroutine is in Wait preparing to sleep, but hasn't yet - // completed waiterUnlock(). Thus the wait can only be interrupted by - // replacing the value of g with 0 (the G may not be in state Gwaiting yet, - // so we can't call goready.) - // - // - Otherwise: g is a pointer to the runtime.g in state Gwaiting for the - // goroutine blocked in Wait, which can only be woken by calling goready. - g uintptr `state:"zerovalue"` -} - -const preparingG = 1 - -// Init must be called before first use of w. -func (w *Waiter) Init() { - w.r.Init(w) -} - -// Receiver returns the Receiver that receives events that unblock calls to -// w.Wait(). -func (w *Waiter) Receiver() *Receiver { - return &w.r -} - -// Pending returns the set of pending events. -func (w *Waiter) Pending() Set { - return w.r.Pending() -} - -// Wait blocks until at least one event is pending, then returns the set of -// pending events. It does not affect the set of pending events; callers must -// call w.Ack() to do so, or use w.WaitAndAck() instead. -// -// Precondition: Only one goroutine may call any Wait* method at a time. -func (w *Waiter) Wait() Set { - return w.WaitFor(AllEvents) -} - -// WaitFor blocks until at least one event in es is pending, then returns the -// set of pending events (including those not in es). It does not affect the -// set of pending events; callers must call w.Ack() to do so. -// -// Precondition: Only one goroutine may call any Wait* method at a time. -func (w *Waiter) WaitFor(es Set) Set { - for { - // Optimization: Skip the atomic store to w.g if an event is already - // pending. - if p := w.r.Pending(); p&es != NoEvents { - return p - } - - // Indicate that we're preparing to go to sleep. - atomic.StoreUintptr(&w.g, preparingG) - - // If an event is pending, abort the sleep. - if p := w.r.Pending(); p&es != NoEvents { - atomic.StoreUintptr(&w.g, 0) - return p - } - - // If w.g is still preparingG (i.e. w.NotifyPending() has not been - // called or has not reached atomic.SwapUintptr()), go to sleep until - // w.NotifyPending() => goready(). - sync.Gopark(waiterCommit, unsafe.Pointer(&w.g), sync.WaitReasonSelect, sync.TraceEvGoBlockSelect, 0) - } -} - -//go:norace -//go:nosplit -func waiterCommit(g uintptr, wg unsafe.Pointer) bool { - // The only way this CAS can fail is if a call to Waiter.NotifyPending() - // has replaced *wg with nil, in which case we should not sleep. - return sync.RaceUncheckedAtomicCompareAndSwapUintptr((*uintptr)(wg), preparingG, g) -} - -// Ack marks the given events as not pending. -func (w *Waiter) Ack(es Set) { - w.r.Ack(es) -} - -// WaitAndAckAll blocks until at least one event is pending, then marks all -// events as not pending and returns the set of previously-pending events. -// -// Precondition: Only one goroutine may call any Wait* method at a time. -func (w *Waiter) WaitAndAckAll() Set { - // Optimization: Skip the atomic store to w.g if an event is already - // pending. Call Pending() first since, in the common case that events are - // not yet pending, this skips an atomic swap on w.r.pending. - if w.r.Pending() != NoEvents { - if p := w.r.PendingAndAckAll(); p != NoEvents { - return p - } - } - - for { - // Indicate that we're preparing to go to sleep. - atomic.StoreUintptr(&w.g, preparingG) - - // If an event is pending, abort the sleep. - if w.r.Pending() != NoEvents { - if p := w.r.PendingAndAckAll(); p != NoEvents { - atomic.StoreUintptr(&w.g, 0) - return p - } - } - - // If w.g is still preparingG (i.e. w.NotifyPending() has not been - // called or has not reached atomic.SwapUintptr()), go to sleep until - // w.NotifyPending() => goready(). - sync.Gopark(waiterCommit, unsafe.Pointer(&w.g), sync.WaitReasonSelect, sync.TraceEvGoBlockSelect, 0) - - // Check for pending events. We call PendingAndAckAll() directly now since - // we only expect to be woken after events become pending. - if p := w.r.PendingAndAckAll(); p != NoEvents { - return p - } - } -} - -// Notify marks the given events as pending, possibly unblocking concurrent -// calls to w.Wait() or w.WaitFor(). -func (w *Waiter) Notify(es Set) { - w.r.Notify(es) -} - -// NotifyPending implements ReceiverCallback.NotifyPending. Users of Waiter -// should not call NotifyPending. -func (w *Waiter) NotifyPending() { - // Optimization: Skip the atomic swap on w.g if there is no sleeping - // goroutine. NotifyPending is called after w.r.Pending() is updated, so - // concurrent and future calls to w.Wait() will observe pending events and - // abort sleeping. - if atomic.LoadUintptr(&w.g) == 0 { - return - } - // Wake a sleeping G, or prevent a G that is preparing to sleep from doing - // so. Swap is needed here to ensure that only one call to NotifyPending - // calls goready. - if g := atomic.SwapUintptr(&w.g, 0); g > preparingG { - sync.Goready(g, 0) - } -} - -var waiterPool = sync.Pool{ - New: func() interface{} { - w := &Waiter{} - w.Init() - return w - }, -} - -// GetWaiter returns an unused Waiter. PutWaiter should be called to release -// the Waiter once it is no longer needed. -// -// Where possible, users should prefer to associate each goroutine that calls -// Waiter.Wait() with a distinct pre-allocated Waiter to avoid allocation of -// Waiters in hot paths. -func GetWaiter() *Waiter { - return waiterPool.Get().(*Waiter) -} - -// PutWaiter releases an unused Waiter previously returned by GetWaiter. -func PutWaiter(w *Waiter) { - waiterPool.Put(w) -} |