summaryrefslogtreecommitdiffhomepage
path: root/pkg/syncevent/broadcaster.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/syncevent/broadcaster.go')
-rw-r--r--pkg/syncevent/broadcaster.go220
1 files changed, 0 insertions, 220 deletions
diff --git a/pkg/syncevent/broadcaster.go b/pkg/syncevent/broadcaster.go
deleted file mode 100644
index dabf08895..000000000
--- a/pkg/syncevent/broadcaster.go
+++ /dev/null
@@ -1,220 +0,0 @@
-// Copyright 2020 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncevent
-
-import (
- "gvisor.dev/gvisor/pkg/sync"
-)
-
-// Broadcaster is an implementation of Source that supports any number of
-// subscribed Receivers.
-//
-// The zero value of Broadcaster is valid and has no subscribed Receivers.
-// Broadcaster is not copyable by value.
-//
-// All Broadcaster methods may be called concurrently from multiple goroutines.
-type Broadcaster struct {
- // Broadcaster is implemented as a hash table where keys are assigned by
- // the Broadcaster and returned as SubscriptionIDs, making it safe to use
- // the identity function for hashing. The hash table resolves collisions
- // using linear probing and features Robin Hood insertion and backward
- // shift deletion in order to support a relatively high load factor
- // efficiently, which matters since the cost of Broadcast is linear in the
- // size of the table.
-
- // mu protects the following fields.
- mu sync.Mutex
-
- // Invariants: len(table) is 0 or a power of 2.
- table []broadcasterSlot
-
- // load is the number of entries in table with receiver != nil.
- load int
-
- lastID SubscriptionID
-}
-
-type broadcasterSlot struct {
- // Invariants: If receiver == nil, then filter == NoEvents and id == 0.
- // Otherwise, id != 0.
- receiver *Receiver
- filter Set
- id SubscriptionID
-}
-
-const (
- broadcasterMinNonZeroTableSize = 2 // must be a power of 2 > 1
-
- broadcasterMaxLoadNum = 13
- broadcasterMaxLoadDen = 16
-)
-
-// SubscribeEvents implements Source.SubscribeEvents.
-func (b *Broadcaster) SubscribeEvents(r *Receiver, filter Set) SubscriptionID {
- b.mu.Lock()
-
- // Assign an ID for this subscription.
- b.lastID++
- id := b.lastID
-
- // Expand the table if over the maximum load factor:
- //
- // load / len(b.table) > broadcasterMaxLoadNum / broadcasterMaxLoadDen
- // load * broadcasterMaxLoadDen > broadcasterMaxLoadNum * len(b.table)
- b.load++
- if (b.load * broadcasterMaxLoadDen) > (broadcasterMaxLoadNum * len(b.table)) {
- // Double the number of slots in the new table.
- newlen := broadcasterMinNonZeroTableSize
- if len(b.table) != 0 {
- newlen = 2 * len(b.table)
- }
- if newlen <= cap(b.table) {
- // Reuse excess capacity in the current table, moving entries not
- // already in their first-probed positions to better ones.
- newtable := b.table[:newlen]
- newmask := uint64(newlen - 1)
- for i := range b.table {
- if b.table[i].receiver != nil && uint64(b.table[i].id)&newmask != uint64(i) {
- entry := b.table[i]
- b.table[i] = broadcasterSlot{}
- broadcasterTableInsert(newtable, entry.id, entry.receiver, entry.filter)
- }
- }
- b.table = newtable
- } else {
- newtable := make([]broadcasterSlot, newlen)
- // Copy existing entries to the new table.
- for i := range b.table {
- if b.table[i].receiver != nil {
- broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter)
- }
- }
- // Switch to the new table.
- b.table = newtable
- }
- }
-
- broadcasterTableInsert(b.table, id, r, filter)
- b.mu.Unlock()
- return id
-}
-
-// Preconditions:
-// * table must not be full.
-// * len(table) is a power of 2.
-func broadcasterTableInsert(table []broadcasterSlot, id SubscriptionID, r *Receiver, filter Set) {
- entry := broadcasterSlot{
- receiver: r,
- filter: filter,
- id: id,
- }
- mask := uint64(len(table) - 1)
- i := uint64(id) & mask
- disp := uint64(0)
- for {
- if table[i].receiver == nil {
- table[i] = entry
- return
- }
- // If we've been displaced farther from our first-probed slot than the
- // element stored in this one, swap elements and switch to inserting
- // the replaced one. (This is Robin Hood insertion.)
- slotDisp := (i - uint64(table[i].id)) & mask
- if disp > slotDisp {
- table[i], entry = entry, table[i]
- disp = slotDisp
- }
- i = (i + 1) & mask
- disp++
- }
-}
-
-// UnsubscribeEvents implements Source.UnsubscribeEvents.
-func (b *Broadcaster) UnsubscribeEvents(id SubscriptionID) {
- b.mu.Lock()
-
- mask := uint64(len(b.table) - 1)
- i := uint64(id) & mask
- for {
- if b.table[i].id == id {
- // Found the element to remove. Move all subsequent elements
- // backward until we either find an empty slot, or an element that
- // is already in its first-probed slot. (This is backward shift
- // deletion.)
- for {
- next := (i + 1) & mask
- if b.table[next].receiver == nil {
- break
- }
- if uint64(b.table[next].id)&mask == next {
- break
- }
- b.table[i] = b.table[next]
- i = next
- }
- b.table[i] = broadcasterSlot{}
- break
- }
- i = (i + 1) & mask
- }
-
- // If a table 1/4 of the current size would still be at or under the
- // maximum load factor (i.e. the current table size is at least two
- // expansions bigger than necessary), halve the size of the table to reduce
- // the cost of Broadcast. Since we are concerned with iteration time and
- // not memory usage, reuse the existing slice to reduce future allocations
- // from table re-expansion.
- b.load--
- if len(b.table) > broadcasterMinNonZeroTableSize && (b.load*(4*broadcasterMaxLoadDen)) <= (broadcasterMaxLoadNum*len(b.table)) {
- newlen := len(b.table) / 2
- newtable := b.table[:newlen]
- for i := newlen; i < len(b.table); i++ {
- if b.table[i].receiver != nil {
- broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter)
- b.table[i] = broadcasterSlot{}
- }
- }
- b.table = newtable
- }
-
- b.mu.Unlock()
-}
-
-// Broadcast notifies all Receivers subscribed to the Broadcaster of the subset
-// of events to which they subscribed. The order in which Receivers are
-// notified is unspecified.
-func (b *Broadcaster) Broadcast(events Set) {
- b.mu.Lock()
- for i := range b.table {
- if intersection := events & b.table[i].filter; intersection != 0 {
- // We don't need to check if broadcasterSlot.receiver is nil, since
- // if it is then broadcasterSlot.filter is 0.
- b.table[i].receiver.Notify(intersection)
- }
- }
- b.mu.Unlock()
-}
-
-// FilteredEvents returns the set of events for which Broadcast will notify at
-// least one Receiver, i.e. the union of filters for all subscribed Receivers.
-func (b *Broadcaster) FilteredEvents() Set {
- var es Set
- b.mu.Lock()
- for i := range b.table {
- es |= b.table[i].filter
- }
- b.mu.Unlock()
- return es
-}