From 55c553ae8c7937be4a7e10e0c7a727d132317e89 Mon Sep 17 00:00:00 2001
From: gVisor bot
Date: Tue, 18 Feb 2020 15:17:45 -0800
Subject: Add //pkg/syncevent.

Package syncevent is intended to subsume ~all uses of channels in the
sentry (including //pkg/waiter), as well as //pkg/sleep.

Compared to channels:

- Delivery of events to a syncevent.Receiver allows *synchronous* execution
  of an arbitrary callback, whereas delivery of events to a channel requires
  a goroutine to receive from that channel, resulting in substantial
  scheduling overhead. (This is also part of the motivation for the waiter
  package.)

- syncevent.Waiter can wait on multiple event sources without the high O(N)
  overhead of select. (This is the same motivation as for the sleep
  package.)

Compared to the waiter package:

- syncevent.Waiters are intended to be persistent (i.e. per-kernel.Task),
  and syncevent.Broadcaster (analogous to waiter.Queue) is a hash table
  rather than a linked list, such that blocking is (usually)
  allocation-free.

- syncevent.Source (analogous to waiter.Waitable) does not include an
  equivalent to waiter.Waitable.Readiness(), since this is inappropriate for
  transient events (see e.g. //pkg/sentry/kernel/time.ClockEventSource).

Compared to the sleep package:

- syncevent events are represented by bits in a bitmask rather than discrete
  sleep.Waker objects, reducing overhead and making it feasible to broadcast
  events to multiple syncevent.Receivers.

- syncevent.Receiver invokes an arbitrary callback, which is required by the
  sentry's epoll implementation. (syncevent.Waiter, which is analogous to
  sleep.Sleeper, pairs a syncevent.Receiver with a callback that wakes a
  waiting goroutine; the implementation of this aspect is nearly identical
  to that of sleep.Sleeper, except that it represents *runtime.g as
  unsafe.Pointer rather than uintptr.)

- syncevent.Waiter.Wait (analogous to sleep.Sleeper.Fetch(block=true)) does
  not automatically un-assert returned events. This is useful in cases where
  the path for handling an event is not the same as the path that observes
  it, such as for application signals (a la Linux's TIF_SIGPENDING).

- Unlike sleep.Sleeper, which Fetches Wakers in the order that they were
  Asserted, the event bitmasks used by syncevent.Receiver have no way of
  preserving event arrival order. (This is similar to select, which goes out
  of its way to randomize event ordering.)

The disadvantage of the syncevent package is that, since events are
represented by bits in a uint64 bitmask, each syncevent.Receiver can "only"
multiplex between 64 distinct events; this does not affect any known use
case.
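As a quick illustration of the API described above, a minimal sketch of the
subscribe/broadcast/wait/ack flow follows. The sketch is not part of this
change: it only uses calls that this change's tests and description exercise
(Init, Receiver, SubscribeEvents, Broadcast, Wait, Ack, UnsubscribeEvents),
and the event bit evReady is a hypothetical name chosen for the example:

package main

import (
	"fmt"

	"gvisor.dev/gvisor/pkg/syncevent"
)

// evReady is a hypothetical event bit; each Receiver multiplexes up to
// 64 such bits in one Set.
const evReady syncevent.Set = 1

func main() {
	var br syncevent.Broadcaster
	var w syncevent.Waiter
	w.Init()

	// Subscribe the Waiter's Receiver to evReady; the returned ID is
	// used to unsubscribe later.
	id := br.SubscribeEvents(w.Receiver(), evReady)

	// Broadcast delivers evReady to all subscribed Receivers.
	go br.Broadcast(evReady)

	// Wait blocks until a subscribed event is pending. Unlike
	// sleep.Sleeper.Fetch, it does not un-assert returned events, so
	// the event must be Acked explicitly.
	fmt.Printf("pending events: %#x\n", w.Wait())
	w.Ack(evReady)

	br.UnsubscribeEvents(id)
}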
Benchmarks:

BenchmarkBroadcasterSubscribeUnsubscribe-12         45133884    26.3 ns/op
BenchmarkMapSubscribeUnsubscribe-12                 28504662    41.8 ns/op
BenchmarkQueueSubscribeUnsubscribe-12               22747668    45.6 ns/op
BenchmarkBroadcasterSubscribeUnsubscribeBatch-12    31609177    37.8 ns/op
BenchmarkMapSubscribeUnsubscribeBatch-12            17563906    62.1 ns/op
BenchmarkQueueSubscribeUnsubscribeBatch-12          26248838    46.6 ns/op
BenchmarkBroadcasterBroadcastRedundant/0-12        100907563    11.8 ns/op
BenchmarkBroadcasterBroadcastRedundant/1-12         85103068    13.3 ns/op
BenchmarkBroadcasterBroadcastRedundant/4-12         52716502    22.3 ns/op
BenchmarkBroadcasterBroadcastRedundant/16-12        20278165    58.7 ns/op
BenchmarkBroadcasterBroadcastRedundant/64-12         5905428     205 ns/op
BenchmarkMapBroadcastRedundant/0-12                 87532734    13.5 ns/op
BenchmarkMapBroadcastRedundant/1-12                 28488411    36.3 ns/op
BenchmarkMapBroadcastRedundant/4-12                 19628920    60.9 ns/op
BenchmarkMapBroadcastRedundant/16-12                 6026980     192 ns/op
BenchmarkMapBroadcastRedundant/64-12                 1640858     754 ns/op
BenchmarkQueueBroadcastRedundant/0-12               96904807    12.0 ns/op
BenchmarkQueueBroadcastRedundant/1-12               73521873    16.3 ns/op
BenchmarkQueueBroadcastRedundant/4-12               39209468    31.2 ns/op
BenchmarkQueueBroadcastRedundant/16-12              10810058     105 ns/op
BenchmarkQueueBroadcastRedundant/64-12               2998046     376 ns/op
BenchmarkBroadcasterBroadcastAck/1-12               44472397    26.4 ns/op
BenchmarkBroadcasterBroadcastAck/4-12               17653509    69.7 ns/op
BenchmarkBroadcasterBroadcastAck/16-12               4082617     260 ns/op
BenchmarkBroadcasterBroadcastAck/64-12               1220534    1027 ns/op
BenchmarkMapBroadcastAck/1-12                       26760705    44.2 ns/op
BenchmarkMapBroadcastAck/4-12                       11495636     100 ns/op
BenchmarkMapBroadcastAck/16-12                       2937590     343 ns/op
BenchmarkMapBroadcastAck/64-12                        861037    1344 ns/op
BenchmarkQueueBroadcastAck/1-12                     19832679    55.0 ns/op
BenchmarkQueueBroadcastAck/4-12                      5618214     189 ns/op
BenchmarkQueueBroadcastAck/16-12                     1569980     713 ns/op
BenchmarkQueueBroadcastAck/64-12                      437672    2814 ns/op
BenchmarkWaiterNotifyRedundant-12                  650823090    1.96 ns/op
BenchmarkSleeperNotifyRedundant-12                 619871544    1.61 ns/op
BenchmarkChannelNotifyRedundant-12                 298903778    3.67 ns/op
BenchmarkWaiterNotifyWaitAck-12                     68358360    17.8 ns/op
BenchmarkSleeperNotifyWaitAck-12                    25044883    41.2 ns/op
BenchmarkChannelNotifyWaitAck-12                    29572404    40.2 ns/op
BenchmarkSleeperMultiNotifyWaitAck-12               16122969    73.8 ns/op
BenchmarkWaiterTempNotifyWaitAck-12                 46111489    25.8 ns/op
BenchmarkSleeperTempNotifyWaitAck-12                15541882    73.6 ns/op
BenchmarkWaiterNotifyWaitMultiAck-12                65878500    18.2 ns/op
BenchmarkSleeperNotifyWaitMultiAck-12               28798623    41.5 ns/op
BenchmarkChannelNotifyWaitMultiAck-12               11308468     101 ns/op
BenchmarkWaiterNotifyAsyncWaitAck-12                 2475387     492 ns/op
BenchmarkSleeperNotifyAsyncWaitAck-12                2184507     518 ns/op
BenchmarkChannelNotifyAsyncWaitAck-12                2120365     562 ns/op
BenchmarkWaiterNotifyAsyncWaitMultiAck-12            2351247     494 ns/op
BenchmarkSleeperNotifyAsyncWaitMultiAck-12           2205799     522 ns/op
BenchmarkChannelNotifyAsyncWaitMultiAck-12           1238079     928 ns/op

Updates #1074

PiperOrigin-RevId: 295834087
---
 pkg/syncevent/broadcaster_test.go | 376 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 376 insertions(+)
 create mode 100644 pkg/syncevent/broadcaster_test.go

diff --git a/pkg/syncevent/broadcaster_test.go b/pkg/syncevent/broadcaster_test.go
new file mode 100644
index 000000000..e88779e23
--- /dev/null
+++ b/pkg/syncevent/broadcaster_test.go
@@ -0,0 +1,376 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package syncevent
+
+import (
+	"fmt"
+	"math/rand"
+	"testing"
+
+	"gvisor.dev/gvisor/pkg/sync"
+	"gvisor.dev/gvisor/pkg/waiter"
+)
+
+func TestBroadcasterFilter(t *testing.T) {
+	const numReceivers = 2 * MaxEvents
+
+	var br Broadcaster
+	ws := make([]Waiter, numReceivers)
+	for i := range ws {
+		ws[i].Init()
+		br.SubscribeEvents(ws[i].Receiver(), 1<<(i%MaxEvents))
+	}
+	for ev := 0; ev < MaxEvents; ev++ {
+		br.Broadcast(1 << ev)
+		for i := range ws {
+			want := NoEvents
+			if i%MaxEvents == ev {
+				want = 1 << ev
+			}
+			if got := ws[i].Receiver().PendingAndAckAll(); got != want {
+				t.Errorf("after Broadcast of event %d: waiter %d has pending event set %#x, wanted %#x", ev, i, got, want)
+			}
+		}
+	}
+}
+
+// TestBroadcasterManySubscriptions tests that subscriptions are not lost by
+// table expansion/compaction.
+func TestBroadcasterManySubscriptions(t *testing.T) {
+	const numReceivers = 5000 // arbitrary
+
+	var br Broadcaster
+	ws := make([]Waiter, numReceivers)
+	for i := range ws {
+		ws[i].Init()
+	}
+
+	ids := make([]SubscriptionID, numReceivers)
+	for i := 0; i < numReceivers; i++ {
+		// Subscribe receiver i.
+		ids[i] = br.SubscribeEvents(ws[i].Receiver(), 1)
+		// Check that receivers [0, i] are subscribed.
+		br.Broadcast(1)
+		for j := 0; j <= i; j++ {
+			if ws[j].Pending() != 1 {
+				t.Errorf("receiver %d did not receive an event after subscription of receiver %d", j, i)
+			}
+			ws[j].Ack(1)
+		}
+	}
+
+	// Generate a random order for unsubscriptions.
+	unsub := rand.Perm(numReceivers)
+	for i := 0; i < numReceivers; i++ {
+		// Unsubscribe receiver unsub[i].
+		br.UnsubscribeEvents(ids[unsub[i]])
+		// Check that receivers [unsub[0], unsub[i]] are not subscribed, and that
+		// receivers (unsub[i], unsub[numReceivers]) are still subscribed.
+		br.Broadcast(1)
+		for j := 0; j <= i; j++ {
+			if ws[unsub[j]].Pending() != 0 {
+				t.Errorf("unsub iteration %d: receiver %d received an event after unsubscription of receiver %d", i, unsub[j], unsub[i])
+			}
+		}
+		for j := i + 1; j < numReceivers; j++ {
+			if ws[unsub[j]].Pending() != 1 {
+				t.Errorf("unsub iteration %d: receiver %d did not receive an event after unsubscription of receiver %d", i, unsub[j], unsub[i])
+			}
+			ws[unsub[j]].Ack(1)
+		}
+	}
+}
+
+var (
+	receiverCountsNonZero       = []int{1, 4, 16, 64}
+	receiverCountsIncludingZero = append([]int{0}, receiverCountsNonZero...)
+)
+
+// BenchmarkBroadcasterX, BenchmarkMapX, and BenchmarkQueueX benchmark usage
+// pattern X (described in terms of Broadcaster) with Broadcaster, a
+// Mutex-protected map[*Receiver]Set, and waiter.Queue respectively.
+
+// BenchmarkXxxSubscribeUnsubscribe measures the cost of a
+// Subscribe/Unsubscribe cycle.
+
+func BenchmarkBroadcasterSubscribeUnsubscribe(b *testing.B) {
+	var br Broadcaster
+	var w Waiter
+	w.Init()
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		id := br.SubscribeEvents(w.Receiver(), 1)
+		br.UnsubscribeEvents(id)
+	}
+}
+
+func BenchmarkMapSubscribeUnsubscribe(b *testing.B) {
+	var mu sync.Mutex
+	m := make(map[*Receiver]Set)
+	var w Waiter
+	w.Init()
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		mu.Lock()
+		m[w.Receiver()] = Set(1)
+		mu.Unlock()
+		mu.Lock()
+		delete(m, w.Receiver())
+		mu.Unlock()
+	}
+}
+
+func BenchmarkQueueSubscribeUnsubscribe(b *testing.B) {
+	var q waiter.Queue
+	e, _ := waiter.NewChannelEntry(nil)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		q.EventRegister(&e, 1)
+		q.EventUnregister(&e)
+	}
+}
+
+// BenchmarkXxxSubscribeUnsubscribeBatch is similar to
+// BenchmarkXxxSubscribeUnsubscribe, but subscribes and unsubscribes a large
+// number of Receivers at a time in order to measure the amortized overhead
+// of table expansion/compaction. (Since waiter.Queue is implemented using a
+// linked list, BenchmarkQueueSubscribeUnsubscribe and
+// BenchmarkQueueSubscribeUnsubscribeBatch should produce nearly the same
+// result.)
+
+const numBatchReceivers = 1000
+
+func BenchmarkBroadcasterSubscribeUnsubscribeBatch(b *testing.B) {
+	var br Broadcaster
+	ws := make([]Waiter, numBatchReceivers)
+	for i := range ws {
+		ws[i].Init()
+	}
+	ids := make([]SubscriptionID, numBatchReceivers)
+
+	// Generate a random order for unsubscriptions.
+	unsub := rand.Perm(numBatchReceivers)
+
+	b.ResetTimer()
+	for i := 0; i < b.N/numBatchReceivers; i++ {
+		for j := 0; j < numBatchReceivers; j++ {
+			ids[j] = br.SubscribeEvents(ws[j].Receiver(), 1)
+		}
+		for j := 0; j < numBatchReceivers; j++ {
+			br.UnsubscribeEvents(ids[unsub[j]])
+		}
+	}
+}
+
+func BenchmarkMapSubscribeUnsubscribeBatch(b *testing.B) {
+	var mu sync.Mutex
+	m := make(map[*Receiver]Set)
+	ws := make([]Waiter, numBatchReceivers)
+	for i := range ws {
+		ws[i].Init()
+	}
+
+	// Generate a random order for unsubscriptions.
+	unsub := rand.Perm(numBatchReceivers)
+
+	b.ResetTimer()
+	for i := 0; i < b.N/numBatchReceivers; i++ {
+		for j := 0; j < numBatchReceivers; j++ {
+			mu.Lock()
+			m[ws[j].Receiver()] = Set(1)
+			mu.Unlock()
+		}
+		for j := 0; j < numBatchReceivers; j++ {
+			mu.Lock()
+			delete(m, ws[unsub[j]].Receiver())
+			mu.Unlock()
+		}
+	}
+}
+
+func BenchmarkQueueSubscribeUnsubscribeBatch(b *testing.B) {
+	var q waiter.Queue
+	es := make([]waiter.Entry, numBatchReceivers)
+	for i := range es {
+		es[i], _ = waiter.NewChannelEntry(nil)
+	}
+
+	// Generate a random order for unsubscriptions.
+	unsub := rand.Perm(numBatchReceivers)
+
+	b.ResetTimer()
+	for i := 0; i < b.N/numBatchReceivers; i++ {
+		for j := 0; j < numBatchReceivers; j++ {
+			q.EventRegister(&es[j], 1)
+		}
+		for j := 0; j < numBatchReceivers; j++ {
+			q.EventUnregister(&es[unsub[j]])
+		}
+	}
+}
+
+// BenchmarkXxxBroadcastRedundant measures how long it takes to Broadcast
+// already-pending events to multiple Receivers.
+
+func BenchmarkBroadcasterBroadcastRedundant(b *testing.B) {
+	for _, n := range receiverCountsIncludingZero {
+		b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
+			var br Broadcaster
+			ws := make([]Waiter, n)
+			for i := range ws {
+				ws[i].Init()
+				br.SubscribeEvents(ws[i].Receiver(), 1)
+			}
+			br.Broadcast(1)
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				br.Broadcast(1)
+			}
+		})
+	}
+}
+
+func BenchmarkMapBroadcastRedundant(b *testing.B) {
+	for _, n := range receiverCountsIncludingZero {
+		b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
+			var mu sync.Mutex
+			m := make(map[*Receiver]Set)
+			ws := make([]Waiter, n)
+			for i := range ws {
+				ws[i].Init()
+				m[ws[i].Receiver()] = Set(1)
+			}
+			mu.Lock()
+			for r := range m {
+				r.Notify(1)
+			}
+			mu.Unlock()
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				mu.Lock()
+				for r := range m {
+					r.Notify(1)
+				}
+				mu.Unlock()
+			}
+		})
+	}
+}
+
+func BenchmarkQueueBroadcastRedundant(b *testing.B) {
+	for _, n := range receiverCountsIncludingZero {
+		b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
+			var q waiter.Queue
+			for i := 0; i < n; i++ {
+				e, _ := waiter.NewChannelEntry(nil)
+				q.EventRegister(&e, 1)
+			}
+			q.Notify(1)
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				q.Notify(1)
+			}
+		})
+	}
+}
+
+// BenchmarkXxxBroadcastAck measures how long it takes to Broadcast events to
+// multiple Receivers, check that all Receivers have received the event, and
+// clear the event from all Receivers.
+
+func BenchmarkBroadcasterBroadcastAck(b *testing.B) {
+	for _, n := range receiverCountsNonZero {
+		b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
+			var br Broadcaster
+			ws := make([]Waiter, n)
+			for i := range ws {
+				ws[i].Init()
+				br.SubscribeEvents(ws[i].Receiver(), 1)
+			}
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				br.Broadcast(1)
+				for j := range ws {
+					if got, want := ws[j].Pending(), Set(1); got != want {
+						b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want)
+					}
+					ws[j].Ack(1)
+				}
+			}
+		})
+	}
+}
+
+func BenchmarkMapBroadcastAck(b *testing.B) {
+	for _, n := range receiverCountsNonZero {
+		b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
+			var mu sync.Mutex
+			m := make(map[*Receiver]Set)
+			ws := make([]Waiter, n)
+			for i := range ws {
+				ws[i].Init()
+				m[ws[i].Receiver()] = Set(1)
+			}
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				mu.Lock()
+				for r := range m {
+					r.Notify(1)
+				}
+				mu.Unlock()
+				for j := range ws {
+					if got, want := ws[j].Pending(), Set(1); got != want {
+						b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want)
+					}
+					ws[j].Ack(1)
+				}
+			}
+		})
+	}
+}
+
+func BenchmarkQueueBroadcastAck(b *testing.B) {
+	for _, n := range receiverCountsNonZero {
+		b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
+			var q waiter.Queue
+			chs := make([]chan struct{}, n)
+			for i := range chs {
+				e, ch := waiter.NewChannelEntry(nil)
+				q.EventRegister(&e, 1)
+				chs[i] = ch
+			}
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				q.Notify(1)
+				for _, ch := range chs {
+					select {
+					case <-ch:
+					default:
+						b.Fatalf("channel did not receive event")
+					}
+				}
+			}
+		})
+	}
+}
--
cgit v1.2.3