diff options
Diffstat (limited to 'pkg/syncevent/broadcaster_test.go')
-rw-r--r-- | pkg/syncevent/broadcaster_test.go | 376 |
1 files changed, 376 insertions, 0 deletions
diff --git a/pkg/syncevent/broadcaster_test.go b/pkg/syncevent/broadcaster_test.go new file mode 100644 index 000000000..e88779e23 --- /dev/null +++ b/pkg/syncevent/broadcaster_test.go @@ -0,0 +1,376 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncevent + +import ( + "fmt" + "math/rand" + "testing" + + "gvisor.dev/gvisor/pkg/sync" + "gvisor.dev/gvisor/pkg/waiter" +) + +func TestBroadcasterFilter(t *testing.T) { + const numReceivers = 2 * MaxEvents + + var br Broadcaster + ws := make([]Waiter, numReceivers) + for i := range ws { + ws[i].Init() + br.SubscribeEvents(ws[i].Receiver(), 1<<(i%MaxEvents)) + } + for ev := 0; ev < MaxEvents; ev++ { + br.Broadcast(1 << ev) + for i := range ws { + want := NoEvents + if i%MaxEvents == ev { + want = 1 << ev + } + if got := ws[i].Receiver().PendingAndAckAll(); got != want { + t.Errorf("after Broadcast of event %d: waiter %d has pending event set %#x, wanted %#x", ev, i, got, want) + } + } + } +} + +// TestBroadcasterManySubscriptions tests that subscriptions are not lost by +// table expansion/compaction. +func TestBroadcasterManySubscriptions(t *testing.T) { + const numReceivers = 5000 // arbitrary + + var br Broadcaster + ws := make([]Waiter, numReceivers) + for i := range ws { + ws[i].Init() + } + + ids := make([]SubscriptionID, numReceivers) + for i := 0; i < numReceivers; i++ { + // Subscribe receiver i. + ids[i] = br.SubscribeEvents(ws[i].Receiver(), 1) + // Check that receivers [0, i] are subscribed. + br.Broadcast(1) + for j := 0; j <= i; j++ { + if ws[j].Pending() != 1 { + t.Errorf("receiver %d did not receive an event after subscription of receiver %d", j, i) + } + ws[j].Ack(1) + } + } + + // Generate a random order for unsubscriptions. + unsub := rand.Perm(numReceivers) + for i := 0; i < numReceivers; i++ { + // Unsubscribe receiver unsub[i]. + br.UnsubscribeEvents(ids[unsub[i]]) + // Check that receivers [unsub[0], unsub[i]] are not subscribed, and that + // receivers (unsub[i], unsub[numReceivers]) are still subscribed. + br.Broadcast(1) + for j := 0; j <= i; j++ { + if ws[unsub[j]].Pending() != 0 { + t.Errorf("unsub iteration %d: receiver %d received an event after unsubscription of receiver %d", i, unsub[j], unsub[i]) + } + } + for j := i + 1; j < numReceivers; j++ { + if ws[unsub[j]].Pending() != 1 { + t.Errorf("unsub iteration %d: receiver %d did not receive an event after unsubscription of receiver %d", i, unsub[j], unsub[i]) + } + ws[unsub[j]].Ack(1) + } + } +} + +var ( + receiverCountsNonZero = []int{1, 4, 16, 64} + receiverCountsIncludingZero = append([]int{0}, receiverCountsNonZero...) +) + +// BenchmarkBroadcasterX, BenchmarkMapX, and BenchmarkQueueX benchmark usage +// pattern X (described in terms of Broadcaster) with Broadcaster, a +// Mutex-protected map[*Receiver]Set, and waiter.Queue respectively. + +// BenchmarkXxxSubscribeUnsubscribe measures the cost of a Subscribe/Unsubscribe +// cycle. + +func BenchmarkBroadcasterSubscribeUnsubscribe(b *testing.B) { + var br Broadcaster + var w Waiter + w.Init() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + id := br.SubscribeEvents(w.Receiver(), 1) + br.UnsubscribeEvents(id) + } +} + +func BenchmarkMapSubscribeUnsubscribe(b *testing.B) { + var mu sync.Mutex + m := make(map[*Receiver]Set) + var w Waiter + w.Init() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + mu.Lock() + m[w.Receiver()] = Set(1) + mu.Unlock() + mu.Lock() + delete(m, w.Receiver()) + mu.Unlock() + } +} + +func BenchmarkQueueSubscribeUnsubscribe(b *testing.B) { + var q waiter.Queue + e, _ := waiter.NewChannelEntry(nil) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.EventRegister(&e, 1) + q.EventUnregister(&e) + } +} + +// BenchmarkXxxSubscribeUnsubscribeBatch is similar to +// BenchmarkXxxSubscribeUnsubscribe, but subscribes and unsubscribes a large +// number of Receivers at a time in order to measure the amortized overhead of +// table expansion/compaction. (Since waiter.Queue is implemented using a +// linked list, BenchmarkQueueSubscribeUnsubscribe and +// BenchmarkQueueSubscribeUnsubscribeBatch should produce nearly the same +// result.) + +const numBatchReceivers = 1000 + +func BenchmarkBroadcasterSubscribeUnsubscribeBatch(b *testing.B) { + var br Broadcaster + ws := make([]Waiter, numBatchReceivers) + for i := range ws { + ws[i].Init() + } + ids := make([]SubscriptionID, numBatchReceivers) + + // Generate a random order for unsubscriptions. + unsub := rand.Perm(numBatchReceivers) + + b.ResetTimer() + for i := 0; i < b.N/numBatchReceivers; i++ { + for j := 0; j < numBatchReceivers; j++ { + ids[j] = br.SubscribeEvents(ws[j].Receiver(), 1) + } + for j := 0; j < numBatchReceivers; j++ { + br.UnsubscribeEvents(ids[unsub[j]]) + } + } +} + +func BenchmarkMapSubscribeUnsubscribeBatch(b *testing.B) { + var mu sync.Mutex + m := make(map[*Receiver]Set) + ws := make([]Waiter, numBatchReceivers) + for i := range ws { + ws[i].Init() + } + + // Generate a random order for unsubscriptions. + unsub := rand.Perm(numBatchReceivers) + + b.ResetTimer() + for i := 0; i < b.N/numBatchReceivers; i++ { + for j := 0; j < numBatchReceivers; j++ { + mu.Lock() + m[ws[j].Receiver()] = Set(1) + mu.Unlock() + } + for j := 0; j < numBatchReceivers; j++ { + mu.Lock() + delete(m, ws[unsub[j]].Receiver()) + mu.Unlock() + } + } +} + +func BenchmarkQueueSubscribeUnsubscribeBatch(b *testing.B) { + var q waiter.Queue + es := make([]waiter.Entry, numBatchReceivers) + for i := range es { + es[i], _ = waiter.NewChannelEntry(nil) + } + + // Generate a random order for unsubscriptions. + unsub := rand.Perm(numBatchReceivers) + + b.ResetTimer() + for i := 0; i < b.N/numBatchReceivers; i++ { + for j := 0; j < numBatchReceivers; j++ { + q.EventRegister(&es[j], 1) + } + for j := 0; j < numBatchReceivers; j++ { + q.EventUnregister(&es[unsub[j]]) + } + } +} + +// BenchmarkXxxBroadcastRedundant measures how long it takes to Broadcast +// already-pending events to multiple Receivers. + +func BenchmarkBroadcasterBroadcastRedundant(b *testing.B) { + for _, n := range receiverCountsIncludingZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var br Broadcaster + ws := make([]Waiter, n) + for i := range ws { + ws[i].Init() + br.SubscribeEvents(ws[i].Receiver(), 1) + } + br.Broadcast(1) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + br.Broadcast(1) + } + }) + } +} + +func BenchmarkMapBroadcastRedundant(b *testing.B) { + for _, n := range receiverCountsIncludingZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var mu sync.Mutex + m := make(map[*Receiver]Set) + ws := make([]Waiter, n) + for i := range ws { + ws[i].Init() + m[ws[i].Receiver()] = Set(1) + } + mu.Lock() + for r := range m { + r.Notify(1) + } + mu.Unlock() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + mu.Lock() + for r := range m { + r.Notify(1) + } + mu.Unlock() + } + }) + } +} + +func BenchmarkQueueBroadcastRedundant(b *testing.B) { + for _, n := range receiverCountsIncludingZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var q waiter.Queue + for i := 0; i < n; i++ { + e, _ := waiter.NewChannelEntry(nil) + q.EventRegister(&e, 1) + } + q.Notify(1) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Notify(1) + } + }) + } +} + +// BenchmarkXxxBroadcastAck measures how long it takes to Broadcast events to +// multiple Receivers, check that all Receivers have received the event, and +// clear the event from all Receivers. + +func BenchmarkBroadcasterBroadcastAck(b *testing.B) { + for _, n := range receiverCountsNonZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var br Broadcaster + ws := make([]Waiter, n) + for i := range ws { + ws[i].Init() + br.SubscribeEvents(ws[i].Receiver(), 1) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + br.Broadcast(1) + for j := range ws { + if got, want := ws[j].Pending(), Set(1); got != want { + b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want) + } + ws[j].Ack(1) + } + } + }) + } +} + +func BenchmarkMapBroadcastAck(b *testing.B) { + for _, n := range receiverCountsNonZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var mu sync.Mutex + m := make(map[*Receiver]Set) + ws := make([]Waiter, n) + for i := range ws { + ws[i].Init() + m[ws[i].Receiver()] = Set(1) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + mu.Lock() + for r := range m { + r.Notify(1) + } + mu.Unlock() + for j := range ws { + if got, want := ws[j].Pending(), Set(1); got != want { + b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want) + } + ws[j].Ack(1) + } + } + }) + } +} + +func BenchmarkQueueBroadcastAck(b *testing.B) { + for _, n := range receiverCountsNonZero { + b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { + var q waiter.Queue + chs := make([]chan struct{}, n) + for i := range chs { + e, ch := waiter.NewChannelEntry(nil) + q.EventRegister(&e, 1) + chs[i] = ch + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Notify(1) + for _, ch := range chs { + select { + case <-ch: + default: + b.Fatalf("channel did not receive event") + } + } + } + }) + } +} |