diff options
author | gVisor bot <gvisor-bot@google.com> | 2020-02-18 15:17:45 -0800 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2020-02-18 15:18:48 -0800 |
commit | 55c553ae8c7937be4a7e10e0c7a727d132317e89 (patch) | |
tree | 09cb8c5859259a06ce38c3f71b849d0d7c6ea184 /pkg | |
parent | 737a3d072ef6e3edf5099505e41deed49f9e5b5c (diff) |
Add //pkg/syncevent.
Package syncevent is intended to subsume ~all uses of channels in the sentry
(including //pkg/waiter), as well as //pkg/sleep.
Compared to channels:
- Delivery of events to a syncevent.Receiver allows *synchronous* execution of
an arbitrary callback, whereas delivery of events to a channel requires a
goroutine to receive from that channel, resulting in substantial scheduling
overhead. (This is also part of the motivation for the waiter package.)
- syncevent.Waiter can wait on multiple event sources without the high O(N)
overhead of select. (This is the same motivation as for the sleep package.)
Compared to the waiter package:
- syncevent.Waiters are intended to be persistent (i.e. per-kernel.Task), and
syncevent.Broadcaster (analogous to waiter.Queue) is a hash table rather than
a linked list, such that blocking is (usually) allocation-free.
- syncevent.Source (analogous to waiter.Waitable) does not include an equivalent
to waiter.Waitable.Readiness(), since this is inappropriate for transient
events (see e.g. //pkg/sentry/kernel/time.ClockEventSource).
Compared to the sleep package:
- syncevent events are represented by bits in a bitmask rather than discrete
sleep.Waker objects, reducing overhead and making it feasible to broadcast
events to multiple syncevent.Receivers.
- syncevent.Receiver invokes an arbitrary callback, which is required by the
sentry's epoll implementation. (syncevent.Waiter, which is analogous to
sleep.Sleeper, pairs a syncevent.Receiver with a callback that wakes a
waiting goroutine; the implementation of this aspect is nearly identical to
that of sleep.Sleeper, except that it represents *runtime.g as unsafe.Pointer
rather than uintptr.)
- syncevent.Waiter.Wait (analogous to sleep.Sleeper.Fetch(block=true)) does not
automatically un-assert returned events. This is useful in cases where the
path for handling an event is not the same as the path that observes it, such
as for application signals (a la Linux's TIF_SIGPENDING).
- Unlike sleep.Sleeper, which Fetches Wakers in the order that they were
Asserted, the event bitmasks used by syncevent.Receiver have no way of
preserving event arrival order. (This is similar to select, which goes out of
its way to randomize event ordering.)
The disadvantage of the syncevent package is that, since events are represented
by bits in a uint64 bitmask, each syncevent.Receiver can "only" multiplex
between 64 distinct events; this does not affect any known use case.
Benchmarks:
BenchmarkBroadcasterSubscribeUnsubscribe
BenchmarkBroadcasterSubscribeUnsubscribe-12 45133884 26.3 ns/op
BenchmarkMapSubscribeUnsubscribe
BenchmarkMapSubscribeUnsubscribe-12 28504662 41.8 ns/op
BenchmarkQueueSubscribeUnsubscribe
BenchmarkQueueSubscribeUnsubscribe-12 22747668 45.6 ns/op
BenchmarkBroadcasterSubscribeUnsubscribeBatch
BenchmarkBroadcasterSubscribeUnsubscribeBatch-12 31609177 37.8 ns/op
BenchmarkMapSubscribeUnsubscribeBatch
BenchmarkMapSubscribeUnsubscribeBatch-12 17563906 62.1 ns/op
BenchmarkQueueSubscribeUnsubscribeBatch
BenchmarkQueueSubscribeUnsubscribeBatch-12 26248838 46.6 ns/op
BenchmarkBroadcasterBroadcastRedundant
BenchmarkBroadcasterBroadcastRedundant/0
BenchmarkBroadcasterBroadcastRedundant/0-12 100907563 11.8 ns/op
BenchmarkBroadcasterBroadcastRedundant/1
BenchmarkBroadcasterBroadcastRedundant/1-12 85103068 13.3 ns/op
BenchmarkBroadcasterBroadcastRedundant/4
BenchmarkBroadcasterBroadcastRedundant/4-12 52716502 22.3 ns/op
BenchmarkBroadcasterBroadcastRedundant/16
BenchmarkBroadcasterBroadcastRedundant/16-12 20278165 58.7 ns/op
BenchmarkBroadcasterBroadcastRedundant/64
BenchmarkBroadcasterBroadcastRedundant/64-12 5905428 205 ns/op
BenchmarkMapBroadcastRedundant
BenchmarkMapBroadcastRedundant/0
BenchmarkMapBroadcastRedundant/0-12 87532734 13.5 ns/op
BenchmarkMapBroadcastRedundant/1
BenchmarkMapBroadcastRedundant/1-12 28488411 36.3 ns/op
BenchmarkMapBroadcastRedundant/4
BenchmarkMapBroadcastRedundant/4-12 19628920 60.9 ns/op
BenchmarkMapBroadcastRedundant/16
BenchmarkMapBroadcastRedundant/16-12 6026980 192 ns/op
BenchmarkMapBroadcastRedundant/64
BenchmarkMapBroadcastRedundant/64-12 1640858 754 ns/op
BenchmarkQueueBroadcastRedundant
BenchmarkQueueBroadcastRedundant/0
BenchmarkQueueBroadcastRedundant/0-12 96904807 12.0 ns/op
BenchmarkQueueBroadcastRedundant/1
BenchmarkQueueBroadcastRedundant/1-12 73521873 16.3 ns/op
BenchmarkQueueBroadcastRedundant/4
BenchmarkQueueBroadcastRedundant/4-12 39209468 31.2 ns/op
BenchmarkQueueBroadcastRedundant/16
BenchmarkQueueBroadcastRedundant/16-12 10810058 105 ns/op
BenchmarkQueueBroadcastRedundant/64
BenchmarkQueueBroadcastRedundant/64-12 2998046 376 ns/op
BenchmarkBroadcasterBroadcastAck
BenchmarkBroadcasterBroadcastAck/1
BenchmarkBroadcasterBroadcastAck/1-12 44472397 26.4 ns/op
BenchmarkBroadcasterBroadcastAck/4
BenchmarkBroadcasterBroadcastAck/4-12 17653509 69.7 ns/op
BenchmarkBroadcasterBroadcastAck/16
BenchmarkBroadcasterBroadcastAck/16-12 4082617 260 ns/op
BenchmarkBroadcasterBroadcastAck/64
BenchmarkBroadcasterBroadcastAck/64-12 1220534 1027 ns/op
BenchmarkMapBroadcastAck
BenchmarkMapBroadcastAck/1
BenchmarkMapBroadcastAck/1-12 26760705 44.2 ns/op
BenchmarkMapBroadcastAck/4
BenchmarkMapBroadcastAck/4-12 11495636 100 ns/op
BenchmarkMapBroadcastAck/16
BenchmarkMapBroadcastAck/16-12 2937590 343 ns/op
BenchmarkMapBroadcastAck/64
BenchmarkMapBroadcastAck/64-12 861037 1344 ns/op
BenchmarkQueueBroadcastAck
BenchmarkQueueBroadcastAck/1
BenchmarkQueueBroadcastAck/1-12 19832679 55.0 ns/op
BenchmarkQueueBroadcastAck/4
BenchmarkQueueBroadcastAck/4-12 5618214 189 ns/op
BenchmarkQueueBroadcastAck/16
BenchmarkQueueBroadcastAck/16-12 1569980 713 ns/op
BenchmarkQueueBroadcastAck/64
BenchmarkQueueBroadcastAck/64-12 437672 2814 ns/op
BenchmarkWaiterNotifyRedundant
BenchmarkWaiterNotifyRedundant-12 650823090 1.96 ns/op
BenchmarkSleeperNotifyRedundant
BenchmarkSleeperNotifyRedundant-12 619871544 1.61 ns/op
BenchmarkChannelNotifyRedundant
BenchmarkChannelNotifyRedundant-12 298903778 3.67 ns/op
BenchmarkWaiterNotifyWaitAck
BenchmarkWaiterNotifyWaitAck-12 68358360 17.8 ns/op
BenchmarkSleeperNotifyWaitAck
BenchmarkSleeperNotifyWaitAck-12 25044883 41.2 ns/op
BenchmarkChannelNotifyWaitAck
BenchmarkChannelNotifyWaitAck-12 29572404 40.2 ns/op
BenchmarkSleeperMultiNotifyWaitAck
BenchmarkSleeperMultiNotifyWaitAck-12 16122969 73.8 ns/op
BenchmarkWaiterTempNotifyWaitAck
BenchmarkWaiterTempNotifyWaitAck-12 46111489 25.8 ns/op
BenchmarkSleeperTempNotifyWaitAck
BenchmarkSleeperTempNotifyWaitAck-12 15541882 73.6 ns/op
BenchmarkWaiterNotifyWaitMultiAck
BenchmarkWaiterNotifyWaitMultiAck-12 65878500 18.2 ns/op
BenchmarkSleeperNotifyWaitMultiAck
BenchmarkSleeperNotifyWaitMultiAck-12 28798623 41.5 ns/op
BenchmarkChannelNotifyWaitMultiAck
BenchmarkChannelNotifyWaitMultiAck-12 11308468 101 ns/op
BenchmarkWaiterNotifyAsyncWaitAck
BenchmarkWaiterNotifyAsyncWaitAck-12 2475387 492 ns/op
BenchmarkSleeperNotifyAsyncWaitAck
BenchmarkSleeperNotifyAsyncWaitAck-12 2184507 518 ns/op
BenchmarkChannelNotifyAsyncWaitAck
BenchmarkChannelNotifyAsyncWaitAck-12 2120365 562 ns/op
BenchmarkWaiterNotifyAsyncWaitMultiAck
BenchmarkWaiterNotifyAsyncWaitMultiAck-12 2351247 494 ns/op
BenchmarkSleeperNotifyAsyncWaitMultiAck
BenchmarkSleeperNotifyAsyncWaitMultiAck-12 2205799 522 ns/op
BenchmarkChannelNotifyAsyncWaitMultiAck
BenchmarkChannelNotifyAsyncWaitMultiAck-12 1238079 928 ns/op
Updates #1074
PiperOrigin-RevId: 295834087
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/syncevent/BUILD | 39 | ||||
-rw-r--r-- | pkg/syncevent/broadcaster.go | 218 | ||||
-rw-r--r-- | pkg/syncevent/broadcaster_test.go | 376 | ||||
-rw-r--r-- | pkg/syncevent/receiver.go | 103 | ||||
-rw-r--r-- | pkg/syncevent/source.go | 59 | ||||
-rw-r--r-- | pkg/syncevent/syncevent.go | 32 | ||||
-rw-r--r-- | pkg/syncevent/syncevent_example_test.go | 108 | ||||
-rw-r--r-- | pkg/syncevent/waiter_amd64.s | 32 | ||||
-rw-r--r-- | pkg/syncevent/waiter_arm64.s | 34 | ||||
-rw-r--r-- | pkg/syncevent/waiter_asm_unsafe.go | 24 | ||||
-rw-r--r-- | pkg/syncevent/waiter_noasm_unsafe.go | 39 | ||||
-rw-r--r-- | pkg/syncevent/waiter_test.go | 414 | ||||
-rw-r--r-- | pkg/syncevent/waiter_unsafe.go | 206 |
13 files changed, 1684 insertions, 0 deletions
diff --git a/pkg/syncevent/BUILD b/pkg/syncevent/BUILD new file mode 100644 index 000000000..0500a22cf --- /dev/null +++ b/pkg/syncevent/BUILD @@ -0,0 +1,39 @@ +load("//tools:defs.bzl", "go_library", "go_test") + +licenses(["notice"]) + +go_library( + name = "syncevent", + srcs = [ + "broadcaster.go", + "receiver.go", + "source.go", + "syncevent.go", + "waiter_amd64.s", + "waiter_arm64.s", + "waiter_asm_unsafe.go", + "waiter_noasm_unsafe.go", + "waiter_unsafe.go", + ], + visibility = ["//:sandbox"], + deps = [ + "//pkg/atomicbitops", + "//pkg/sync", + ], +) + +go_test( + name = "syncevent_test", + size = "small", + srcs = [ + "broadcaster_test.go", + "syncevent_example_test.go", + "waiter_test.go", + ], + library = ":syncevent", + deps = [ + "//pkg/sleep", + "//pkg/sync", + "//pkg/waiter", + ], +) diff --git a/pkg/syncevent/broadcaster.go b/pkg/syncevent/broadcaster.go new file mode 100644 index 000000000..4bff59e7d --- /dev/null +++ b/pkg/syncevent/broadcaster.go @@ -0,0 +1,218 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncevent + +import ( + "gvisor.dev/gvisor/pkg/sync" +) + +// Broadcaster is an implementation of Source that supports any number of +// subscribed Receivers. +// +// The zero value of Broadcaster is valid and has no subscribed Receivers. +// Broadcaster is not copyable by value. +// +// All Broadcaster methods may be called concurrently from multiple goroutines. +type Broadcaster struct { + // Broadcaster is implemented as a hash table where keys are assigned by + // the Broadcaster and returned as SubscriptionIDs, making it safe to use + // the identity function for hashing. The hash table resolves collisions + // using linear probing and features Robin Hood insertion and backward + // shift deletion in order to support a relatively high load factor + // efficiently, which matters since the cost of Broadcast is linear in the + // size of the table. + + // mu protects the following fields. + mu sync.Mutex + + // Invariants: len(table) is 0 or a power of 2. + table []broadcasterSlot + + // load is the number of entries in table with receiver != nil. + load int + + lastID SubscriptionID +} + +type broadcasterSlot struct { + // Invariants: If receiver == nil, then filter == NoEvents and id == 0. + // Otherwise, id != 0. + receiver *Receiver + filter Set + id SubscriptionID +} + +const ( + broadcasterMinNonZeroTableSize = 2 // must be a power of 2 > 1 + + broadcasterMaxLoadNum = 13 + broadcasterMaxLoadDen = 16 +) + +// SubscribeEvents implements Source.SubscribeEvents. +func (b *Broadcaster) SubscribeEvents(r *Receiver, filter Set) SubscriptionID { + b.mu.Lock() + + // Assign an ID for this subscription. + b.lastID++ + id := b.lastID + + // Expand the table if over the maximum load factor: + // + // load / len(b.table) > broadcasterMaxLoadNum / broadcasterMaxLoadDen + // load * broadcasterMaxLoadDen > broadcasterMaxLoadNum * len(b.table) + b.load++ + if (b.load * broadcasterMaxLoadDen) > (broadcasterMaxLoadNum * len(b.table)) { + // Double the number of slots in the new table. + newlen := broadcasterMinNonZeroTableSize + if len(b.table) != 0 { + newlen = 2 * len(b.table) + } + if newlen <= cap(b.table) { + // Reuse excess capacity in the current table, moving entries not + // already in their first-probed positions to better ones. + newtable := b.table[:newlen] + newmask := uint64(newlen - 1) + for i := range b.table { + if b.table[i].receiver != nil && uint64(b.table[i].id)&newmask != uint64(i) { + entry := b.table[i] + b.table[i] = broadcasterSlot{} + broadcasterTableInsert(newtable, entry.id, entry.receiver, entry.filter) + } + } + b.table = newtable + } else { + newtable := make([]broadcasterSlot, newlen) + // Copy existing entries to the new table. + for i := range b.table { + if b.table[i].receiver != nil { + broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter) + } + } + // Switch to the new table. + b.table = newtable + } + } + + broadcasterTableInsert(b.table, id, r, filter) + b.mu.Unlock() + return id +} + +// Preconditions: table must not be full. len(table) is a power of 2. +func broadcasterTableInsert(table []broadcasterSlot, id SubscriptionID, r *Receiver, filter Set) { + entry := broadcasterSlot{ + receiver: r, + filter: filter, + id: id, + } + mask := uint64(len(table) - 1) + i := uint64(id) & mask + disp := uint64(0) + for { + if table[i].receiver == nil { + table[i] = entry + return + } + // If we've been displaced farther from our first-probed slot than the + // element stored in this one, swap elements and switch to inserting + // the replaced one. (This is Robin Hood insertion.) + slotDisp := (i - uint64(table[i].id)) & mask + if disp > slotDisp { + table[i], entry = entry, table[i] + disp = slotDisp + } + i = (i + 1) & mask + disp++ + } +} + +// UnsubscribeEvents implements Source.UnsubscribeEvents. +func (b *Broadcaster) UnsubscribeEvents(id SubscriptionID) { + b.mu.Lock() + + mask := uint64(len(b.table) - 1) + i := uint64(id) & mask + for { + if b.table[i].id == id { + // Found the element to remove. Move all subsequent elements + // backward until we either find an empty slot, or an element that + // is already in its first-probed slot. (This is backward shift + // deletion.) + for { + next := (i + 1) & mask + if b.table[next].receiver == nil { + break + } + if uint64(b.table[next].id)&mask == next { + break + } + b.table[i] = b.table[next] + i = next + } + b.table[i] = broadcasterSlot{} + break + } + i = (i + 1) & mask + } + + // If a table 1/4 of the current size would still be at or under the + // maximum load factor (i.e. the current table size is at least two + // expansions bigger than necessary), halve the size of the table to reduce + // the cost of Broadcast. Since we are concerned with iteration time and + // not memory usage, reuse the existing slice to reduce future allocations + // from table re-expansion. + b.load-- + if len(b.table) > broadcasterMinNonZeroTableSize && (b.load*(4*broadcasterMaxLoadDen)) <= (broadcasterMaxLoadNum*len(b.table)) { + newlen := len(b.table) / 2 + newtable := b.table[:newlen] + for i := newlen; i < len(b.table); i++ { + if b.table[i].receiver != nil { + broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter) + b.table[i] = broadcasterSlot{} + } + } + b.table = newtable + } + + b.mu.Unlock() +} + +// Broadcast notifies all Receivers subscribed to the Broadcaster of the subset +// of events to which they subscribed. The order in which Receivers are +// notified is unspecified. +func (b *Broadcaster) Broadcast(events Set) { + b.mu.Lock() + for i := range b.table { + if intersection := events & b.table[i].filter; intersection != 0 { + // We don't need to check if broadcasterSlot.receiver is nil, since + // if it is then broadcasterSlot.filter is 0. + b.table[i].receiver.Notify(intersection) + } + } + b.mu.Unlock() +} + +// FilteredEvents returns the set of events for which Broadcast will notify at +// least one Receiver, i.e. the union of filters for all subscribed Receivers. +func (b *Broadcaster) FilteredEvents() Set { + var es Set + b.mu.Lock() + for i := range b.table { + es |= b.table[i].filter + } + b.mu.Unlock() + return es +} diff --git a/pkg/syncevent/broadcaster_test.go b/pkg/syncevent/broadcaster_test.go new file mode 100644 index 000000000..e88779e23 --- /dev/null +++ b/pkg/syncevent/broadcaster_test.go @@ -0,0 +1,376 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncevent + +import ( + "fmt" + "math/rand" + "testing" + + "gvisor.dev/gvisor/pkg/sync" + "gvisor.dev/gvisor/pkg/waiter" +) + +func TestBroadcasterFilter(t *testing.T) { + const numReceivers = 2 * MaxEvents + + var br Broadcaster + ws := make([]Waiter, numReceivers) + for i := range ws { + ws[i].Init() + br.SubscribeEvents(ws[i].Receiver(), 1<<(i%MaxEvents)) + } + for ev := 0; ev < MaxEvents; ev++ { + br.Broadcast(1 << ev) + for i := range ws { + want := NoEvents + if i%MaxEvents == ev { + want = 1 << ev + } + if got := ws[i].Receiver().PendingAndAckAll(); got != want { + t.Errorf("after Broadcast of event %d: waiter %d has pending event set %#x, wanted %#x", ev, i, got, want) + } + } + } +} + +// TestBroadcasterManySubscriptions tests that subscriptions are not lost by +// table expansion/compaction. +func TestBroadcasterManySubscriptions(t *testing.T) { + const numReceivers = 5000 // arbitrary + + var br Broadcaster + ws := make([]Waiter, numReceivers) + for i := range ws { + ws[i].Init() + } + + ids := make([]SubscriptionID, numReceivers) + for i := 0; i < numReceivers; i++ { + // Subscribe receiver i. + ids[i] = br.SubscribeEvents(ws[i].Receiver(), 1) + // Check that receivers [0, i] are subscribed. + br.Broadcast(1) + for j := 0; j <= i; j++ { + if ws[j].Pending() != 1 { + t.Errorf("receiver %d did not receive an event after subscription of receiver %d", j, i) + } + ws[j].Ack(1) + } + } + + // Generate a random order for unsubscriptions. + unsub := rand.Perm(numReceivers) + for i := 0; i < numReceivers; i++ { + // Unsubscribe receiver unsub[i]. + br.UnsubscribeEvents(ids[unsub[i]]) + // Check that receivers [unsub[0], unsub[i]] are not subscribed, and that + // receivers (unsub[i], unsub[numReceivers]) are still subscribed. + br.Broadcast(1) + for j := 0; j <= i; j++ { + if ws[unsub[j]].Pending() != 0 { + t.Errorf("unsub iteration %d: receiver %d received an event after unsubscription of receiver %d", i, unsub[j], unsub[i]) + } + } + for j := i + 1; j < numReceivers; j++ { + if ws[unsub[j]].Pending() != 1 { + t.Errorf("unsub iteration %d: receiver %d did not receive an event after unsubscription of receiver %d", i, unsub[j], unsub[i]) + } + ws[unsub[j]].Ack(1) + } + } +} + +var ( + receiverCountsNonZero = []int{1, 4, 16, 64} + receiverCountsIncludingZero = append([]int{0}, receiverCountsNonZero...) +) + +// BenchmarkBroadcasterX, BenchmarkMapX, and BenchmarkQueueX benchmark usage +// pattern X (described in terms of Broadcaster) with Broadcaster, a +// Mutex-protected map[*Receiver]Set, and waiter.Queue respectively. + +// BenchmarkXxxSubscribeUnsubscribe measures the cost of a Subscribe/Unsubscribe +// cycle. + +func BenchmarkBroadcasterSubscribeUnsubscribe(b *testing.B) { + var br Broadcaster + var w Waiter + w.Init() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + id := br.SubscribeEvents(w.Receiver(), 1) + br.UnsubscribeEvents(id) + } +} + +func BenchmarkMapSubscribeUnsubscribe(b *testing.B) { + var mu sync.Mutex + m := make(map[*Receiver]Set) + var w Waiter + w.Init() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + mu.Lock() + m[w.Receiver()] = Set(1) + mu.Unlock() + mu.Lock() + delete(m, w.Receiver()) + mu.Unlock() + } +} + +func BenchmarkQueueSubscribeUnsubscribe(b *testing.B) { + var q waiter.Queue + e, _ := waiter.NewChannelEntry(nil) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.EventRegister(&e, 1) + q.EventUnregister(&e) + } +} + +// BenchmarkXxxSubscribeUnsubscribeBatch is similar to +// BenchmarkXxxSubscribeUnsubscribe, but subscribes and unsubscribes a large +// number of Receivers at a time in order to measure the amortized overhead of +// table expansion/compaction. (Since waiter.Queue is implemented using a +// linked list, BenchmarkQueueSubscribeUnsubscribe and +// BenchmarkQueueSubscribeUnsubscribeBatch should produce nearly the same +// result.) + +const numBatchReceivers = 1000 + +func BenchmarkBroadcasterSubscribeUnsubscribeBatch(b *testing.B) { + var br Broadcaster + ws := make([]Waiter, numBatchReceivers) + for i := range ws { + ws[i].Init() + } + ids := make([]SubscriptionID, numBatchReceivers) + + // Generate a random order for unsubscriptions. + unsub := rand.Perm(numBatchReceivers) + + b.ResetTimer() + for i := 0; i < b.N/numBatchReceivers; i++ { + for j := 0; j < numBatchReceivers; j++ { + ids[j] = br.SubscribeEvents(ws[j].Receiver(), 1) + } + for j := 0; j < numBatchReceivers; j++ { + br.UnsubscribeEvents(ids[unsub[j]]) + } + } +} + +func BenchmarkMapSubscribeUnsubscribeBatch(b *testing.B) { + var mu sync.Mutex + m := make(map[*Receiver]Set) + ws := make([]Waiter, numBatchReceivers) + for i := range ws { + ws[i].Init() + } + + // Generate a random order for unsubscriptions. + unsub := rand.Perm(numBatchReceivers) + + b.ResetTimer() + for i := 0; i < b.N/numBatchReceivers; i++ { + for j := 0; j < numBatchReceivers; j++ { + mu.Lock() + m[ws[j].Receiver()] = Set(1) + mu.Unlock() + } + for j := 0; j < numBatchReceivers; j++ { + mu.Lock() + delete(m, ws[unsub[j]].Receiver()) + mu.Unlock() + } + } +} + +func BenchmarkQueueSubscribeUnsubscribeBatch(b *testing.B) { + var q waiter.Queue + es := make([]waiter.Entry, numBatchReceivers) + for i := range es { + es[i], _ = waiter.NewChannelEntry(nil) + } + + // Generate a random order for unsubscriptions. + unsub := rand.Perm(numBatchReceivers) + + b.ResetTimer() + for i := 0; i < b.N/numBatchReceivers; i++ { + for j := 0; j < numBatchReceivers; j++ { + q.EventRegister(&es[j], 1) + } + for j := 0; j < numBatchReceivers; j++ { + q.EventUnregister(&es[unsub[j]]) + } + } +} + +// BenchmarkXxxBroadcastRedundant measures how long it takes to Broadcast +// already-pending events to multiple Receivers. + +func BenchmarkBroadcasterBroadcastRedundant(b *testing.B) { + for _, n := range receiverCountsIncludingZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var br Broadcaster + ws := make([]Waiter, n) + for i := range ws { + ws[i].Init() + br.SubscribeEvents(ws[i].Receiver(), 1) + } + br.Broadcast(1) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + br.Broadcast(1) + } + }) + } +} + +func BenchmarkMapBroadcastRedundant(b *testing.B) { + for _, n := range receiverCountsIncludingZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var mu sync.Mutex + m := make(map[*Receiver]Set) + ws := make([]Waiter, n) + for i := range ws { + ws[i].Init() + m[ws[i].Receiver()] = Set(1) + } + mu.Lock() + for r := range m { + r.Notify(1) + } + mu.Unlock() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + mu.Lock() + for r := range m { + r.Notify(1) + } + mu.Unlock() + } + }) + } +} + +func BenchmarkQueueBroadcastRedundant(b *testing.B) { + for _, n := range receiverCountsIncludingZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var q waiter.Queue + for i := 0; i < n; i++ { + e, _ := waiter.NewChannelEntry(nil) + q.EventRegister(&e, 1) + } + q.Notify(1) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Notify(1) + } + }) + } +} + +// BenchmarkXxxBroadcastAck measures how long it takes to Broadcast events to +// multiple Receivers, check that all Receivers have received the event, and +// clear the event from all Receivers. + +func BenchmarkBroadcasterBroadcastAck(b *testing.B) { + for _, n := range receiverCountsNonZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var br Broadcaster + ws := make([]Waiter, n) + for i := range ws { + ws[i].Init() + br.SubscribeEvents(ws[i].Receiver(), 1) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + br.Broadcast(1) + for j := range ws { + if got, want := ws[j].Pending(), Set(1); got != want { + b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want) + } + ws[j].Ack(1) + } + } + }) + } +} + +func BenchmarkMapBroadcastAck(b *testing.B) { + for _, n := range receiverCountsNonZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var mu sync.Mutex + m := make(map[*Receiver]Set) + ws := make([]Waiter, n) + for i := range ws { + ws[i].Init() + m[ws[i].Receiver()] = Set(1) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + mu.Lock() + for r := range m { + r.Notify(1) + } + mu.Unlock() + for j := range ws { + if got, want := ws[j].Pending(), Set(1); got != want { + b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want) + } + ws[j].Ack(1) + } + } + }) + } +} + +func BenchmarkQueueBroadcastAck(b *testing.B) { + for _, n := range receiverCountsNonZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var q waiter.Queue + chs := make([]chan struct{}, n) + for i := range chs { + e, ch := waiter.NewChannelEntry(nil) + q.EventRegister(&e, 1) + chs[i] = ch + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Notify(1) + for _, ch := range chs { + select { + case <-ch: + default: + b.Fatalf("channel did not receive event") + } + } + } + }) + } +} diff --git a/pkg/syncevent/receiver.go b/pkg/syncevent/receiver.go new file mode 100644 index 000000000..5c86e5400 --- /dev/null +++ b/pkg/syncevent/receiver.go @@ -0,0 +1,103 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncevent + +import ( + "sync/atomic" + + "gvisor.dev/gvisor/pkg/atomicbitops" +) + +// Receiver is an event sink that holds pending events and invokes a callback +// whenever new events become pending. Receiver's methods may be called +// concurrently from multiple goroutines. +// +// Receiver.Init() must be called before first use. +type Receiver struct { + // pending is the set of pending events. pending is accessed using atomic + // memory operations. + pending uint64 + + // cb is notified when new events become pending. cb is immutable after + // Init(). + cb ReceiverCallback +} + +// ReceiverCallback receives callbacks from a Receiver. +type ReceiverCallback interface { + // NotifyPending is called when the corresponding Receiver has new pending + // events. + // + // NotifyPending is called synchronously from Receiver.Notify(), so + // implementations must not take locks that may be held by callers of + // Receiver.Notify(). NotifyPending may be called concurrently from + // multiple goroutines. + NotifyPending() +} + +// Init must be called before first use of r. +func (r *Receiver) Init(cb ReceiverCallback) { + r.cb = cb +} + +// Pending returns the set of pending events. +func (r *Receiver) Pending() Set { + return Set(atomic.LoadUint64(&r.pending)) +} + +// Notify sets the given events as pending. +func (r *Receiver) Notify(es Set) { + p := Set(atomic.LoadUint64(&r.pending)) + // Optimization: Skip the atomic CAS on r.pending if all events are + // already pending. + if p&es == es { + return + } + // When this is uncontended (the common case), CAS is faster than + // atomic-OR because the former is inlined and the latter (which we + // implement in assembly ourselves) is not. + if !atomic.CompareAndSwapUint64(&r.pending, uint64(p), uint64(p|es)) { + // If the CAS fails, fall back to atomic-OR. + atomicbitops.OrUint64(&r.pending, uint64(es)) + } + r.cb.NotifyPending() +} + +// Ack unsets the given events as pending. +func (r *Receiver) Ack(es Set) { + p := Set(atomic.LoadUint64(&r.pending)) + // Optimization: Skip the atomic CAS on r.pending if all events are + // already not pending. + if p&es == 0 { + return + } + // When this is uncontended (the common case), CAS is faster than + // atomic-AND because the former is inlined and the latter (which we + // implement in assembly ourselves) is not. + if !atomic.CompareAndSwapUint64(&r.pending, uint64(p), uint64(p&^es)) { + // If the CAS fails, fall back to atomic-AND. + atomicbitops.AndUint64(&r.pending, ^uint64(es)) + } +} + +// PendingAndAckAll unsets all events as pending and returns the set of +// previously-pending events. +// +// PendingAndAckAll should only be used in preference to a call to Pending +// followed by a conditional call to Ack when the caller expects events to be +// pending (e.g. after a call to ReceiverCallback.NotifyPending()). +func (r *Receiver) PendingAndAckAll() Set { + return Set(atomic.SwapUint64(&r.pending, 0)) +} diff --git a/pkg/syncevent/source.go b/pkg/syncevent/source.go new file mode 100644 index 000000000..ddffb171a --- /dev/null +++ b/pkg/syncevent/source.go @@ -0,0 +1,59 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncevent + +// Source represents an event source. +type Source interface { + // SubscribeEvents causes the Source to notify the given Receiver of the + // given subset of events. + // + // Preconditions: r != nil. The ReceiverCallback for r must not take locks + // that are ordered prior to the Source; for example, it cannot call any + // Source methods. + SubscribeEvents(r *Receiver, filter Set) SubscriptionID + + // UnsubscribeEvents causes the Source to stop notifying the Receiver + // subscribed by a previous call to SubscribeEvents that returned the given + // SubscriptionID. + // + // Preconditions: UnsubscribeEvents may be called at most once for any + // given SubscriptionID. + UnsubscribeEvents(id SubscriptionID) +} + +// SubscriptionID identifies a call to Source.SubscribeEvents. +type SubscriptionID uint64 + +// UnsubscribeAndAck is a convenience function that unsubscribes r from the +// given events from src and also clears them from r. +func UnsubscribeAndAck(src Source, r *Receiver, filter Set, id SubscriptionID) { + src.UnsubscribeEvents(id) + r.Ack(filter) +} + +// NoopSource implements Source by never sending events to subscribed +// Receivers. +type NoopSource struct{} + +// SubscribeEvents implements Source.SubscribeEvents. +func (NoopSource) SubscribeEvents(*Receiver, Set) SubscriptionID { + return 0 +} + +// UnsubscribeEvents implements Source.UnsubscribeEvents. +func (NoopSource) UnsubscribeEvents(SubscriptionID) { +} + +// See Broadcaster for a non-noop implementations of Source. diff --git a/pkg/syncevent/syncevent.go b/pkg/syncevent/syncevent.go new file mode 100644 index 000000000..9fb6a06de --- /dev/null +++ b/pkg/syncevent/syncevent.go @@ -0,0 +1,32 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package syncevent provides efficient primitives for goroutine +// synchronization based on event bitmasks. +package syncevent + +// Set is a bitmask where each bit represents a distinct user-defined event. +// The event package does not treat any bits in Set specially. +type Set uint64 + +const ( + // NoEvents is a Set containing no events. + NoEvents = Set(0) + + // AllEvents is a Set containing all possible events. + AllEvents = ^Set(0) + + // MaxEvents is the number of distinct events that can be represented by a Set. + MaxEvents = 64 +) diff --git a/pkg/syncevent/syncevent_example_test.go b/pkg/syncevent/syncevent_example_test.go new file mode 100644 index 000000000..bfb18e2ea --- /dev/null +++ b/pkg/syncevent/syncevent_example_test.go @@ -0,0 +1,108 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncevent + +import ( + "fmt" + "sync/atomic" + "time" +) + +func Example_ioReadinessInterrputible() { + const ( + evReady = Set(1 << iota) + evInterrupt + ) + errNotReady := fmt.Errorf("not ready for I/O") + + // State of some I/O object. + var ( + br Broadcaster + ready uint32 + ) + doIO := func() error { + if atomic.LoadUint32(&ready) == 0 { + return errNotReady + } + return nil + } + go func() { + // The I/O object eventually becomes ready for I/O. + time.Sleep(100 * time.Millisecond) + // When it does, it first ensures that future calls to isReady() return + // true, then broadcasts the readiness event to Receivers. + atomic.StoreUint32(&ready, 1) + br.Broadcast(evReady) + }() + + // Each user of the I/O object owns a Waiter. + var w Waiter + w.Init() + // The Waiter may be asynchronously interruptible, e.g. for signal + // handling in the sentry. + go func() { + time.Sleep(200 * time.Millisecond) + w.Receiver().Notify(evInterrupt) + }() + + // To use the I/O object: + // + // Optionally, if the I/O object is likely to be ready, attempt I/O first. + err := doIO() + if err == nil { + // Success, we're done. + return /* nil */ + } + if err != errNotReady { + // Failure, I/O failed for some reason other than readiness. + return /* err */ + } + // Subscribe for readiness events from the I/O object. + id := br.SubscribeEvents(w.Receiver(), evReady) + // When we are finished blocking, unsubscribe from readiness events and + // remove readiness events from the pending event set. + defer UnsubscribeAndAck(&br, w.Receiver(), evReady, id) + for { + // Attempt I/O again. This must be done after the call to SubscribeEvents, + // since the I/O object might have become ready between the previous call + // to doIO and the call to SubscribeEvents. + err = doIO() + if err == nil { + return /* nil */ + } + if err != errNotReady { + return /* err */ + } + // Block until either the I/O object indicates it is ready, or we are + // interrupted. + events := w.Wait() + if events&evInterrupt != 0 { + // In the specific case of sentry signal handling, signal delivery + // is handled by another system, so we aren't responsible for + // acknowledging evInterrupt. + return /* errInterrupted */ + } + // Note that, in a concurrent context, the I/O object might become + // ready and then not ready again. To handle this: + // + // - evReady must be acknowledged before calling doIO() again (rather + // than after), so that if the I/O object becomes ready *again* after + // the call to doIO(), the readiness event is not lost. + // + // - We must loop instead of just calling doIO() once after receiving + // evReady. + w.Ack(evReady) + } +} diff --git a/pkg/syncevent/waiter_amd64.s b/pkg/syncevent/waiter_amd64.s new file mode 100644 index 000000000..985b56ae5 --- /dev/null +++ b/pkg/syncevent/waiter_amd64.s @@ -0,0 +1,32 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "textflag.h" + +// See waiter_noasm_unsafe.go for a description of waiterUnlock. +// +// func waiterUnlock(g unsafe.Pointer, wg *unsafe.Pointer) bool +TEXT ·waiterUnlock(SB),NOSPLIT,$0-24 + MOVQ g+0(FP), DI + MOVQ wg+8(FP), SI + + MOVQ $·preparingG(SB), AX + LOCK + CMPXCHGQ DI, 0(SI) + + SETEQ AX + MOVB AX, ret+16(FP) + + RET + diff --git a/pkg/syncevent/waiter_arm64.s b/pkg/syncevent/waiter_arm64.s new file mode 100644 index 000000000..20d7ac23b --- /dev/null +++ b/pkg/syncevent/waiter_arm64.s @@ -0,0 +1,34 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "textflag.h" + +// See waiter_noasm_unsafe.go for a description of waiterUnlock. +// +// func waiterUnlock(g unsafe.Pointer, wg *unsafe.Pointer) bool +TEXT ·waiterUnlock(SB),NOSPLIT,$0-24 + MOVD wg+8(FP), R0 + MOVD $·preparingG(SB), R1 + MOVD g+0(FP), R2 +again: + LDAXR (R0), R3 + CMP R1, R3 + BNE ok + STLXR R2, (R0), R3 + CBNZ R3, again +ok: + CSET EQ, R0 + MOVB R0, ret+16(FP) + RET + diff --git a/pkg/syncevent/waiter_asm_unsafe.go b/pkg/syncevent/waiter_asm_unsafe.go new file mode 100644 index 000000000..0995e9053 --- /dev/null +++ b/pkg/syncevent/waiter_asm_unsafe.go @@ -0,0 +1,24 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build amd64 arm64 + +package syncevent + +import ( + "unsafe" +) + +// See waiter_noasm_unsafe.go for a description of waiterUnlock. +func waiterUnlock(g unsafe.Pointer, wg *unsafe.Pointer) bool diff --git a/pkg/syncevent/waiter_noasm_unsafe.go b/pkg/syncevent/waiter_noasm_unsafe.go new file mode 100644 index 000000000..1c4b0e39a --- /dev/null +++ b/pkg/syncevent/waiter_noasm_unsafe.go @@ -0,0 +1,39 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// waiterUnlock is called from g0, so when the race detector is enabled, +// waiterUnlock must be implemented in assembly since no race context is +// available. +// +// +build !race +// +build !amd64,!arm64 + +package syncevent + +import ( + "sync/atomic" + "unsafe" +) + +// waiterUnlock is the "unlock function" passed to runtime.gopark by +// Waiter.Wait*. wg is &Waiter.g, and g is a pointer to the calling runtime.g. +// waiterUnlock returns true if Waiter.Wait should sleep and false if sleeping +// should be aborted. +// +//go:nosplit +func waiterUnlock(g unsafe.Pointer, wg *unsafe.Pointer) bool { + // The only way this CAS can fail is if a call to Waiter.NotifyPending() + // has replaced *wg with nil, in which case we should not sleep. + return atomic.CompareAndSwapPointer(wg, (unsafe.Pointer)(&preparingG), g) +} diff --git a/pkg/syncevent/waiter_test.go b/pkg/syncevent/waiter_test.go new file mode 100644 index 000000000..3c8cbcdd8 --- /dev/null +++ b/pkg/syncevent/waiter_test.go @@ -0,0 +1,414 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncevent + +import ( + "sync/atomic" + "testing" + "time" + + "gvisor.dev/gvisor/pkg/sleep" + "gvisor.dev/gvisor/pkg/sync" +) + +func TestWaiterAlreadyPending(t *testing.T) { + var w Waiter + w.Init() + want := Set(1) + w.Notify(want) + if got := w.Wait(); got != want { + t.Errorf("Waiter.Wait: got %#x, wanted %#x", got, want) + } +} + +func TestWaiterAsyncNotify(t *testing.T) { + var w Waiter + w.Init() + want := Set(1) + go func() { + time.Sleep(100 * time.Millisecond) + w.Notify(want) + }() + if got := w.Wait(); got != want { + t.Errorf("Waiter.Wait: got %#x, wanted %#x", got, want) + } +} + +func TestWaiterWaitFor(t *testing.T) { + var w Waiter + w.Init() + evWaited := Set(1) + evOther := Set(2) + w.Notify(evOther) + notifiedEvent := uint32(0) + go func() { + time.Sleep(100 * time.Millisecond) + atomic.StoreUint32(¬ifiedEvent, 1) + w.Notify(evWaited) + }() + if got, want := w.WaitFor(evWaited), evWaited|evOther; got != want { + t.Errorf("Waiter.WaitFor: got %#x, wanted %#x", got, want) + } + if atomic.LoadUint32(¬ifiedEvent) == 0 { + t.Errorf("Waiter.WaitFor returned before goroutine notified waited-for event") + } +} + +func TestWaiterWaitAndAckAll(t *testing.T) { + var w Waiter + w.Init() + w.Notify(AllEvents) + if got := w.WaitAndAckAll(); got != AllEvents { + t.Errorf("Waiter.WaitAndAckAll: got %#x, wanted %#x", got, AllEvents) + } + if got := w.Pending(); got != NoEvents { + t.Errorf("Waiter.WaitAndAckAll did not ack all events: got %#x, wanted 0", got) + } +} + +// BenchmarkWaiterX, BenchmarkSleeperX, and BenchmarkChannelX benchmark usage +// pattern X (described in terms of Waiter) with Waiter, sleep.Sleeper, and +// buffered chan struct{} respectively. When the maximum number of event +// sources is relevant, we use 3 event sources because this is representative +// of the kernel.Task.block() use case: an interrupt source, a timeout source, +// and the actual event source being waited on. + +// Event set used by most benchmarks. +const evBench Set = 1 + +// BenchmarkXxxNotifyRedundant measures how long it takes to notify a Waiter of +// an event that is already pending. + +func BenchmarkWaiterNotifyRedundant(b *testing.B) { + var w Waiter + w.Init() + w.Notify(evBench) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.Notify(evBench) + } +} + +func BenchmarkSleeperNotifyRedundant(b *testing.B) { + var s sleep.Sleeper + var w sleep.Waker + s.AddWaker(&w, 0) + w.Assert() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.Assert() + } +} + +func BenchmarkChannelNotifyRedundant(b *testing.B) { + ch := make(chan struct{}, 1) + ch <- struct{}{} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + select { + case ch <- struct{}{}: + default: + } + } +} + +// BenchmarkXxxNotifyWaitAck measures how long it takes to notify a Waiter an +// event, return that event using a blocking check, and then unset the event as +// pending. + +func BenchmarkWaiterNotifyWaitAck(b *testing.B) { + var w Waiter + w.Init() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.Notify(evBench) + w.Wait() + w.Ack(evBench) + } +} + +func BenchmarkSleeperNotifyWaitAck(b *testing.B) { + var s sleep.Sleeper + var w sleep.Waker + s.AddWaker(&w, 0) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.Assert() + s.Fetch(true) + } +} + +func BenchmarkChannelNotifyWaitAck(b *testing.B) { + ch := make(chan struct{}, 1) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // notify + select { + case ch <- struct{}{}: + default: + } + + // wait + ack + <-ch + } +} + +// BenchmarkSleeperMultiNotifyWaitAck is equivalent to +// BenchmarkSleeperNotifyWaitAck, but also includes allocation of a +// temporary sleep.Waker. This is necessary when multiple goroutines may wait +// for the same event, since each sleep.Waker can wake only a single +// sleep.Sleeper. +// +// The syncevent package does not require a distinct object for each +// waiter-waker relationship, so BenchmarkWaiterNotifyWaitAck and +// BenchmarkWaiterMultiNotifyWaitAck would be identical. The analogous state +// for channels, runtime.sudog, is inescapably runtime-allocated, so +// BenchmarkChannelNotifyWaitAck and BenchmarkChannelMultiNotifyWaitAck would +// also be identical. + +func BenchmarkSleeperMultiNotifyWaitAck(b *testing.B) { + var s sleep.Sleeper + // The sleep package doesn't provide sync.Pool allocation of Wakers; + // we do for a fairer comparison. + wakerPool := sync.Pool{ + New: func() interface{} { + return &sleep.Waker{} + }, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w := wakerPool.Get().(*sleep.Waker) + s.AddWaker(w, 0) + w.Assert() + s.Fetch(true) + s.Done() + wakerPool.Put(w) + } +} + +// BenchmarkXxxTempNotifyWaitAck is equivalent to NotifyWaitAck, but also +// includes allocation of a temporary Waiter. This models the case where a +// goroutine not already associated with a Waiter needs one in order to block. +// +// The analogous state for channels is built into runtime.g, so +// BenchmarkChannelNotifyWaitAck and BenchmarkChannelTempNotifyWaitAck would be +// identical. + +func BenchmarkWaiterTempNotifyWaitAck(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + w := GetWaiter() + w.Notify(evBench) + w.Wait() + w.Ack(evBench) + PutWaiter(w) + } +} + +func BenchmarkSleeperTempNotifyWaitAck(b *testing.B) { + // The sleep package doesn't provide sync.Pool allocation of Sleepers; + // we do for a fairer comparison. + sleeperPool := sync.Pool{ + New: func() interface{} { + return &sleep.Sleeper{} + }, + } + var w sleep.Waker + + b.ResetTimer() + for i := 0; i < b.N; i++ { + s := sleeperPool.Get().(*sleep.Sleeper) + s.AddWaker(&w, 0) + w.Assert() + s.Fetch(true) + s.Done() + sleeperPool.Put(s) + } +} + +// BenchmarkXxxNotifyWaitMultiAck is equivalent to NotifyWaitAck, but allows +// for multiple event sources. + +func BenchmarkWaiterNotifyWaitMultiAck(b *testing.B) { + var w Waiter + w.Init() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.Notify(evBench) + if e := w.Wait(); e != evBench { + b.Fatalf("Wait: got %#x, wanted %#x", e, evBench) + } + w.Ack(evBench) + } +} + +func BenchmarkSleeperNotifyWaitMultiAck(b *testing.B) { + var s sleep.Sleeper + var ws [3]sleep.Waker + for i := range ws { + s.AddWaker(&ws[i], i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ws[0].Assert() + if id, _ := s.Fetch(true); id != 0 { + b.Fatalf("Fetch: got %d, wanted 0", id) + } + } +} + +func BenchmarkChannelNotifyWaitMultiAck(b *testing.B) { + ch0 := make(chan struct{}, 1) + ch1 := make(chan struct{}, 1) + ch2 := make(chan struct{}, 1) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // notify + select { + case ch0 <- struct{}{}: + default: + } + + // wait + clear + select { + case <-ch0: + // ok + case <-ch1: + b.Fatalf("received from ch1") + case <-ch2: + b.Fatalf("received from ch2") + } + } +} + +// BenchmarkXxxNotifyAsyncWaitAck measures how long it takes to wait for an +// event while another goroutine signals the event. This assumes that a new +// goroutine doesn't run immediately (i.e. the creator of a new goroutine is +// allowed to go to sleep before the new goroutine has a chance to run). + +func BenchmarkWaiterNotifyAsyncWaitAck(b *testing.B) { + var w Waiter + w.Init() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + go func() { + w.Notify(1) + }() + w.Wait() + w.Ack(evBench) + } +} + +func BenchmarkSleeperNotifyAsyncWaitAck(b *testing.B) { + var s sleep.Sleeper + var w sleep.Waker + s.AddWaker(&w, 0) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + go func() { + w.Assert() + }() + s.Fetch(true) + } +} + +func BenchmarkChannelNotifyAsyncWaitAck(b *testing.B) { + ch := make(chan struct{}, 1) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + go func() { + select { + case ch <- struct{}{}: + default: + } + }() + <-ch + } +} + +// BenchmarkXxxNotifyAsyncWaitMultiAck is equivalent to NotifyAsyncWaitAck, but +// allows for multiple event sources. + +func BenchmarkWaiterNotifyAsyncWaitMultiAck(b *testing.B) { + var w Waiter + w.Init() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + go func() { + w.Notify(evBench) + }() + if e := w.Wait(); e != evBench { + b.Fatalf("Wait: got %#x, wanted %#x", e, evBench) + } + w.Ack(evBench) + } +} + +func BenchmarkSleeperNotifyAsyncWaitMultiAck(b *testing.B) { + var s sleep.Sleeper + var ws [3]sleep.Waker + for i := range ws { + s.AddWaker(&ws[i], i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + go func() { + ws[0].Assert() + }() + if id, _ := s.Fetch(true); id != 0 { + b.Fatalf("Fetch: got %d, expected 0", id) + } + } +} + +func BenchmarkChannelNotifyAsyncWaitMultiAck(b *testing.B) { + ch0 := make(chan struct{}, 1) + ch1 := make(chan struct{}, 1) + ch2 := make(chan struct{}, 1) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + go func() { + select { + case ch0 <- struct{}{}: + default: + } + }() + + select { + case <-ch0: + // ok + case <-ch1: + b.Fatalf("received from ch1") + case <-ch2: + b.Fatalf("received from ch2") + } + } +} diff --git a/pkg/syncevent/waiter_unsafe.go b/pkg/syncevent/waiter_unsafe.go new file mode 100644 index 000000000..112e0e604 --- /dev/null +++ b/pkg/syncevent/waiter_unsafe.go @@ -0,0 +1,206 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build go1.11 +// +build !go1.15 + +// Check go:linkname function signatures when updating Go version. + +package syncevent + +import ( + "sync/atomic" + "unsafe" + + "gvisor.dev/gvisor/pkg/sync" +) + +//go:linkname gopark runtime.gopark +func gopark(unlockf func(unsafe.Pointer, *unsafe.Pointer) bool, wg *unsafe.Pointer, reason uint8, traceEv byte, traceskip int) + +//go:linkname goready runtime.goready +func goready(g unsafe.Pointer, traceskip int) + +const ( + waitReasonSelect = 9 // Go: src/runtime/runtime2.go + traceEvGoBlockSelect = 24 // Go: src/runtime/trace.go +) + +// Waiter allows a goroutine to block on pending events received by a Receiver. +// +// Waiter.Init() must be called before first use. +type Waiter struct { + r Receiver + + // g is one of: + // + // - nil: No goroutine is blocking in Wait. + // + // - &preparingG: A goroutine is in Wait preparing to sleep, but hasn't yet + // completed waiterUnlock(). Thus the wait can only be interrupted by + // replacing the value of g with nil (the G may not be in state Gwaiting + // yet, so we can't call goready.) + // + // - Otherwise: g is a pointer to the runtime.g in state Gwaiting for the + // goroutine blocked in Wait, which can only be woken by calling goready. + g unsafe.Pointer `state:"zerovalue"` +} + +// Sentinel object for Waiter.g. +var preparingG struct{} + +// Init must be called before first use of w. +func (w *Waiter) Init() { + w.r.Init(w) +} + +// Receiver returns the Receiver that receives events that unblock calls to +// w.Wait(). +func (w *Waiter) Receiver() *Receiver { + return &w.r +} + +// Pending returns the set of pending events. +func (w *Waiter) Pending() Set { + return w.r.Pending() +} + +// Wait blocks until at least one event is pending, then returns the set of +// pending events. It does not affect the set of pending events; callers must +// call w.Ack() to do so, or use w.WaitAndAck() instead. +// +// Precondition: Only one goroutine may call any Wait* method at a time. +func (w *Waiter) Wait() Set { + return w.WaitFor(AllEvents) +} + +// WaitFor blocks until at least one event in es is pending, then returns the +// set of pending events (including those not in es). It does not affect the +// set of pending events; callers must call w.Ack() to do so. +// +// Precondition: Only one goroutine may call any Wait* method at a time. +func (w *Waiter) WaitFor(es Set) Set { + for { + // Optimization: Skip the atomic store to w.g if an event is already + // pending. + if p := w.r.Pending(); p&es != NoEvents { + return p + } + + // Indicate that we're preparing to go to sleep. + atomic.StorePointer(&w.g, (unsafe.Pointer)(&preparingG)) + + // If an event is pending, abort the sleep. + if p := w.r.Pending(); p&es != NoEvents { + atomic.StorePointer(&w.g, nil) + return p + } + + // If w.g is still preparingG (i.e. w.NotifyPending() has not been + // called or has not reached atomic.SwapPointer()), go to sleep until + // w.NotifyPending() => goready(). + gopark(waiterUnlock, &w.g, waitReasonSelect, traceEvGoBlockSelect, 0) + } +} + +// Ack marks the given events as not pending. +func (w *Waiter) Ack(es Set) { + w.r.Ack(es) +} + +// WaitAndAckAll blocks until at least one event is pending, then marks all +// events as not pending and returns the set of previously-pending events. +// +// Precondition: Only one goroutine may call any Wait* method at a time. +func (w *Waiter) WaitAndAckAll() Set { + // Optimization: Skip the atomic store to w.g if an event is already + // pending. Call Pending() first since, in the common case that events are + // not yet pending, this skips an atomic swap on w.r.pending. + if w.r.Pending() != NoEvents { + if p := w.r.PendingAndAckAll(); p != NoEvents { + return p + } + } + + for { + // Indicate that we're preparing to go to sleep. + atomic.StorePointer(&w.g, (unsafe.Pointer)(&preparingG)) + + // If an event is pending, abort the sleep. + if w.r.Pending() != NoEvents { + if p := w.r.PendingAndAckAll(); p != NoEvents { + atomic.StorePointer(&w.g, nil) + return p + } + } + + // If w.g is still preparingG (i.e. w.NotifyPending() has not been + // called or has not reached atomic.SwapPointer()), go to sleep until + // w.NotifyPending() => goready(). + gopark(waiterUnlock, &w.g, waitReasonSelect, traceEvGoBlockSelect, 0) + + // Check for pending events. We call PendingAndAckAll() directly now since + // we only expect to be woken after events become pending. + if p := w.r.PendingAndAckAll(); p != NoEvents { + return p + } + } +} + +// Notify marks the given events as pending, possibly unblocking concurrent +// calls to w.Wait() or w.WaitFor(). +func (w *Waiter) Notify(es Set) { + w.r.Notify(es) +} + +// NotifyPending implements ReceiverCallback.NotifyPending. Users of Waiter +// should not call NotifyPending. +func (w *Waiter) NotifyPending() { + // Optimization: Skip the atomic swap on w.g if there is no sleeping + // goroutine. NotifyPending is called after w.r.Pending() is updated, so + // concurrent and future calls to w.Wait() will observe pending events and + // abort sleeping. + if atomic.LoadPointer(&w.g) == nil { + return + } + // Wake a sleeping G, or prevent a G that is preparing to sleep from doing + // so. Swap is needed here to ensure that only one call to NotifyPending + // calls goready. + if g := atomic.SwapPointer(&w.g, nil); g != nil && g != (unsafe.Pointer)(&preparingG) { + goready(g, 0) + } +} + +var waiterPool = sync.Pool{ + New: func() interface{} { + w := &Waiter{} + w.Init() + return w + }, +} + +// GetWaiter returns an unused Waiter. PutWaiter should be called to release +// the Waiter once it is no longer needed. +// +// Where possible, users should prefer to associate each goroutine that calls +// Waiter.Wait() with a distinct pre-allocated Waiter to avoid allocation of +// Waiters in hot paths. +func GetWaiter() *Waiter { + return waiterPool.Get().(*Waiter) +} + +// PutWaiter releases an unused Waiter previously returned by GetWaiter. +func PutWaiter(w *Waiter) { + waiterPool.Put(w) +} |