summaryrefslogtreecommitdiffhomepage
path: root/pkg/syncevent/broadcaster.go
blob: dabf08895d7a8ed3f0b67e9e3fa71830d6014d70 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// Copyright 2020 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncevent

import (
	"gvisor.dev/gvisor/pkg/sync"
)

// Broadcaster is an implementation of Source that supports any number of
// subscribed Receivers.
//
// The zero value of Broadcaster is valid and has no subscribed Receivers.
// Broadcaster is not copyable by value.
//
// All Broadcaster methods may be called concurrently from multiple goroutines.
type Broadcaster struct {
	// Broadcaster is implemented as a hash table where keys are assigned by
	// the Broadcaster and returned as SubscriptionIDs, making it safe to use
	// the identity function for hashing. The hash table resolves collisions
	// using linear probing and features Robin Hood insertion and backward
	// shift deletion in order to support a relatively high load factor
	// efficiently, which matters since the cost of Broadcast is linear in the
	// size of the table.

	// mu protects the following fields.
	mu sync.Mutex

	// Invariants: len(table) is 0 or a power of 2.
	table []broadcasterSlot

	// load is the number of entries in table with receiver != nil.
	load int

	lastID SubscriptionID
}

type broadcasterSlot struct {
	// Invariants: If receiver == nil, then filter == NoEvents and id == 0.
	// Otherwise, id != 0.
	receiver *Receiver
	filter   Set
	id       SubscriptionID
}

const (
	broadcasterMinNonZeroTableSize = 2 // must be a power of 2 > 1

	broadcasterMaxLoadNum = 13
	broadcasterMaxLoadDen = 16
)

// SubscribeEvents implements Source.SubscribeEvents.
func (b *Broadcaster) SubscribeEvents(r *Receiver, filter Set) SubscriptionID {
	b.mu.Lock()

	// Assign an ID for this subscription.
	b.lastID++
	id := b.lastID

	// Expand the table if over the maximum load factor:
	//
	//          load / len(b.table) > broadcasterMaxLoadNum / broadcasterMaxLoadDen
	// load * broadcasterMaxLoadDen > broadcasterMaxLoadNum * len(b.table)
	b.load++
	if (b.load * broadcasterMaxLoadDen) > (broadcasterMaxLoadNum * len(b.table)) {
		// Double the number of slots in the new table.
		newlen := broadcasterMinNonZeroTableSize
		if len(b.table) != 0 {
			newlen = 2 * len(b.table)
		}
		if newlen <= cap(b.table) {
			// Reuse excess capacity in the current table, moving entries not
			// already in their first-probed positions to better ones.
			newtable := b.table[:newlen]
			newmask := uint64(newlen - 1)
			for i := range b.table {
				if b.table[i].receiver != nil && uint64(b.table[i].id)&newmask != uint64(i) {
					entry := b.table[i]
					b.table[i] = broadcasterSlot{}
					broadcasterTableInsert(newtable, entry.id, entry.receiver, entry.filter)
				}
			}
			b.table = newtable
		} else {
			newtable := make([]broadcasterSlot, newlen)
			// Copy existing entries to the new table.
			for i := range b.table {
				if b.table[i].receiver != nil {
					broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter)
				}
			}
			// Switch to the new table.
			b.table = newtable
		}
	}

	broadcasterTableInsert(b.table, id, r, filter)
	b.mu.Unlock()
	return id
}

// Preconditions:
// * table must not be full.
// * len(table) is a power of 2.
func broadcasterTableInsert(table []broadcasterSlot, id SubscriptionID, r *Receiver, filter Set) {
	entry := broadcasterSlot{
		receiver: r,
		filter:   filter,
		id:       id,
	}
	mask := uint64(len(table) - 1)
	i := uint64(id) & mask
	disp := uint64(0)
	for {
		if table[i].receiver == nil {
			table[i] = entry
			return
		}
		// If we've been displaced farther from our first-probed slot than the
		// element stored in this one, swap elements and switch to inserting
		// the replaced one. (This is Robin Hood insertion.)
		slotDisp := (i - uint64(table[i].id)) & mask
		if disp > slotDisp {
			table[i], entry = entry, table[i]
			disp = slotDisp
		}
		i = (i + 1) & mask
		disp++
	}
}

// UnsubscribeEvents implements Source.UnsubscribeEvents.
func (b *Broadcaster) UnsubscribeEvents(id SubscriptionID) {
	b.mu.Lock()

	mask := uint64(len(b.table) - 1)
	i := uint64(id) & mask
	for {
		if b.table[i].id == id {
			// Found the element to remove. Move all subsequent elements
			// backward until we either find an empty slot, or an element that
			// is already in its first-probed slot. (This is backward shift
			// deletion.)
			for {
				next := (i + 1) & mask
				if b.table[next].receiver == nil {
					break
				}
				if uint64(b.table[next].id)&mask == next {
					break
				}
				b.table[i] = b.table[next]
				i = next
			}
			b.table[i] = broadcasterSlot{}
			break
		}
		i = (i + 1) & mask
	}

	// If a table 1/4 of the current size would still be at or under the
	// maximum load factor (i.e. the current table size is at least two
	// expansions bigger than necessary), halve the size of the table to reduce
	// the cost of Broadcast. Since we are concerned with iteration time and
	// not memory usage, reuse the existing slice to reduce future allocations
	// from table re-expansion.
	b.load--
	if len(b.table) > broadcasterMinNonZeroTableSize && (b.load*(4*broadcasterMaxLoadDen)) <= (broadcasterMaxLoadNum*len(b.table)) {
		newlen := len(b.table) / 2
		newtable := b.table[:newlen]
		for i := newlen; i < len(b.table); i++ {
			if b.table[i].receiver != nil {
				broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter)
				b.table[i] = broadcasterSlot{}
			}
		}
		b.table = newtable
	}

	b.mu.Unlock()
}

// Broadcast notifies all Receivers subscribed to the Broadcaster of the subset
// of events to which they subscribed. The order in which Receivers are
// notified is unspecified.
func (b *Broadcaster) Broadcast(events Set) {
	b.mu.Lock()
	for i := range b.table {
		if intersection := events & b.table[i].filter; intersection != 0 {
			// We don't need to check if broadcasterSlot.receiver is nil, since
			// if it is then broadcasterSlot.filter is 0.
			b.table[i].receiver.Notify(intersection)
		}
	}
	b.mu.Unlock()
}

// FilteredEvents returns the set of events for which Broadcast will notify at
// least one Receiver, i.e. the union of filters for all subscribed Receivers.
func (b *Broadcaster) FilteredEvents() Set {
	var es Set
	b.mu.Lock()
	for i := range b.table {
		es |= b.table[i].filter
	}
	b.mu.Unlock()
	return es
}