1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
|
// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package futex provides an implementation of the futex interface as found in
// the Linux kernel. It allows one to easily transform Wait() calls into waits
// on a channel, which is useful in a Go-based kernel, for example.
package futex
import (
"sync"
"sync/atomic"
"gvisor.googlesource.com/gvisor/pkg/syserror"
)
// Checker abstracts memory accesses. This is useful because the "addresses"
// used in this package may not be real addresses (they could be indices of an
// array, for example), or they could be mapped via some special mechanism.
//
// TODO: Replace this with usermem.IO.
type Checker interface {
// Check should validate that given address contains the given value.
// If it does not contain the value, syserror.EAGAIN must be returned.
// Any other error may be returned, which will be propagated.
Check(addr uintptr, val uint32) error
// Op should atomically perform the operation encoded in op on the data
// pointed to by addr, then apply the comparison encoded in op to the
// original value at addr, returning the result.
// Note that op is an opaque operation whose behaviour is defined
// outside of the futex manager.
Op(addr uintptr, op uint32) (bool, error)
}
// Waiter is the struct which gets enqueued into buckets for wake up routines
// and requeue routines to scan and notify. Once a Waiter has been enqueued by
// WaitPrepare(), callers may listen on C for wake up events.
type Waiter struct {
// Synchronization:
//
// - A Waiter that is not enqueued in a bucket is exclusively owned (no
// synchronization applies).
//
// - A Waiter is enqueued in a bucket by calling WaitPrepare(). After this,
// waiterEntry, complete, and addr are protected by the bucket.mu ("bucket
// lock") of the containing bucket, and bitmask is immutable. complete and
// addr are additionally mutated using atomic memory operations, ensuring
// that they can be read using atomic memory operations without holding the
// bucket lock.
//
// - A Waiter is only guaranteed to be no longer queued after calling
// WaitComplete().
// waiterEntry links Waiter into bucket.waiters.
waiterEntry
// complete is 1 if the Waiter was removed from its bucket by a wakeup and
// 0 otherwise.
complete int32
// C is sent to when the Waiter is woken.
C chan struct{}
// addr is the address being waited on.
addr uintptr
// The bitmask we're waiting on.
// This is used the case of a FUTEX_WAKE_BITSET.
bitmask uint32
}
// NewWaiter returns a new unqueued Waiter.
func NewWaiter() *Waiter {
return &Waiter{
C: make(chan struct{}, 1),
}
}
// bucket holds a list of waiters for a given address hash.
type bucket struct {
// mu protects waiters and contained Waiter state. See comment in Waiter.
mu sync.Mutex `state:"nosave"`
waiters waiterList `state:"zerovalue"`
}
// wakeLocked wakes up to n waiters matching the bitmask at the addr for this
// bucket and returns the number of waiters woken.
//
// Preconditions: b.mu must be locked.
func (b *bucket) wakeLocked(addr uintptr, bitmask uint32, n int) int {
done := 0
for w := b.waiters.Front(); done < n && w != nil; {
if w.addr != addr || w.bitmask&bitmask == 0 {
// Not matching.
w = w.Next()
continue
}
// Remove from the bucket and wake the waiter.
woke := w
w = w.Next() // Next iteration.
b.waiters.Remove(woke)
woke.C <- struct{}{}
// NOTE: The above channel write establishes a write barrier
// according to the memory model, so nothing may be ordered
// around it. Since we've dequeued w and will never touch it
// again, we can safely store 1 to w.complete here and allow
// the WaitComplete() to short-circuit grabbing the bucket
// lock. If they somehow miss the w.complete, we are still
// holding the lock, so we can know that they won't dequeue w,
// assume it's free and have the below operation afterwards.
atomic.StoreInt32(&woke.complete, 1)
done++
}
return done
}
// requeueLocked takes n waiters from the bucket and moves them to naddr on the
// bucket "to".
//
// Preconditions: b and to must be locked.
func (b *bucket) requeueLocked(to *bucket, addr, naddr uintptr, n int) int {
done := 0
for w := b.waiters.Front(); done < n && w != nil; {
if w.addr != addr {
// Not matching.
w = w.Next()
continue
}
requeued := w
w = w.Next() // Next iteration.
b.waiters.Remove(requeued)
atomic.StoreUintptr(&requeued.addr, naddr)
to.waiters.PushBack(requeued)
done++
}
return done
}
const (
// bucketCount is the number of buckets per Manager. By having many of
// these we reduce contention when concurrent yet unrelated calls are made.
bucketCount = 1 << bucketCountBits
bucketCountBits = 10
)
func checkAddr(addr uintptr) error {
// Ensure the address is aligned.
// It must be a DWORD boundary.
if addr&0x3 != 0 {
return syserror.EINVAL
}
return nil
}
// bucketIndexForAddr returns the index into Manager.buckets for addr.
func bucketIndexForAddr(addr uintptr) uintptr {
// - The bottom 2 bits of addr must be 0, per checkAddr.
//
// - On amd64, the top 16 bits of addr (bits 48-63) must be equal to bit 47
// for a canonical address, and (on all existing platforms) bit 47 must be
// 0 for an application address.
//
// Thus 19 bits of addr are "useless" for hashing, leaving only 45 "useful"
// bits. We choose one of the simplest possible hash functions that at
// least uses all 45 useful bits in the output, given that bucketCountBits
// == 10. This hash function also has the property that it will usually map
// adjacent addresses to adjacent buckets, slightly improving memory
// locality when an application synchronization structure uses multiple
// nearby futexes.
//
// Note that despite the large number of arithmetic operations in the
// function, many components can be computed in parallel, such that the
// critical path is 1 bit shift + 3 additions (2 in h1, then h1 + h2). This
// is also why h1 and h2 are grouped separately; for "(addr >> 2) + ... +
// (addr >> 42)" without any additional grouping, the compiler puts all 4
// additions in the critical path.
h1 := (addr >> 2) + (addr >> 12) + (addr >> 22)
h2 := (addr >> 32) + (addr >> 42)
return (h1 + h2) % bucketCount
}
// Manager holds futex state for a single virtual address space.
type Manager struct {
buckets [bucketCount]bucket `state:"zerovalue"`
}
// NewManager returns an initialized futex manager.
// N.B. we use virtual address to tag futexes, so it only works for private
// (within a single process) futex.
func NewManager() *Manager {
return &Manager{}
}
// lockBucket returns a locked bucket for the given addr.
//
// Preconditions: checkAddr(addr) == nil.
func (m *Manager) lockBucket(addr uintptr) *bucket {
b := &m.buckets[bucketIndexForAddr(addr)]
b.mu.Lock()
return b
}
// lockBuckets returns locked buckets for the given addrs.
//
// Preconditions: checkAddr(addr1) == checkAddr(addr2) == nil.
func (m *Manager) lockBuckets(addr1 uintptr, addr2 uintptr) (*bucket, *bucket) {
i1 := bucketIndexForAddr(addr1)
i2 := bucketIndexForAddr(addr2)
b1 := &m.buckets[i1]
b2 := &m.buckets[i2]
// Ensure that buckets are locked in a consistent order (lowest index
// first) to avoid circular locking.
switch {
case i1 < i2:
b1.mu.Lock()
b2.mu.Lock()
case i2 < i1:
b2.mu.Lock()
b1.mu.Lock()
default:
b1.mu.Lock()
}
return b1, b2
}
// Wake wakes up to n waiters matching the bitmask on the given addr.
// The number of waiters woken is returned.
func (m *Manager) Wake(addr uintptr, bitmask uint32, n int) (int, error) {
if err := checkAddr(addr); err != nil {
return 0, err
}
b := m.lockBucket(addr)
// This function is very hot; avoid defer.
r := b.wakeLocked(addr, bitmask, n)
b.mu.Unlock()
return r, nil
}
func (m *Manager) doRequeue(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) {
if err := checkAddr(addr); err != nil {
return 0, err
}
if err := checkAddr(naddr); err != nil {
return 0, err
}
b1, b2 := m.lockBuckets(addr, naddr)
defer b1.mu.Unlock()
if b2 != b1 {
defer b2.mu.Unlock()
}
// Check our value.
// This only applied for RequeueCmp().
if c != nil {
if err := c.Check(addr, val); err != nil {
return 0, err
}
}
// Wake the number required.
done := b1.wakeLocked(addr, ^uint32(0), nwake)
// Requeue the number required.
b1.requeueLocked(b2, addr, naddr, nreq)
return done, nil
}
// Requeue wakes up to nwake waiters on the given addr, and unconditionally
// requeues up to nreq waiters on naddr.
func (m *Manager) Requeue(addr uintptr, naddr uintptr, nwake int, nreq int) (int, error) {
return m.doRequeue(nil, addr, 0, naddr, nwake, nreq)
}
// RequeueCmp atomically checks that the addr contains val (via the Checker),
// wakes up to nwake waiters on addr and then unconditionally requeues nreq
// waiters on naddr.
func (m *Manager) RequeueCmp(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) {
return m.doRequeue(c, addr, val, naddr, nwake, nreq)
}
// WakeOp atomically applies op to the memory address addr2, wakes up to nwake1
// waiters unconditionally from addr1, and, based on the original value at addr2
// and a comparison encoded in op, wakes up to nwake2 waiters from addr2.
// It returns the total number of waiters woken.
func (m *Manager) WakeOp(c Checker, addr1 uintptr, addr2 uintptr, nwake1 int, nwake2 int, op uint32) (int, error) {
if err := checkAddr(addr1); err != nil {
return 0, err
}
if err := checkAddr(addr2); err != nil {
return 0, err
}
b1, b2 := m.lockBuckets(addr1, addr2)
done := 0
cond, err := c.Op(addr2, op)
if err == nil {
// Wake up up to nwake1 entries from the first bucket.
done = b1.wakeLocked(addr1, ^uint32(0), nwake1)
// Wake up up to nwake2 entries from the second bucket if the
// operation yielded true.
if cond {
done += b2.wakeLocked(addr2, ^uint32(0), nwake2)
}
}
b1.mu.Unlock()
if b2 != b1 {
b2.mu.Unlock()
}
return done, err
}
// WaitPrepare atomically checks that addr contains val (via the Checker), then
// enqueues w to be woken by a send to w.C. If WaitPrepare returns nil, the
// Waiter must be subsequently removed by calling WaitComplete, whether or not
// a wakeup is received on w.C.
func (m *Manager) WaitPrepare(w *Waiter, c Checker, addr uintptr, val uint32, bitmask uint32) error {
if err := checkAddr(addr); err != nil {
return err
}
// Prepare the Waiter before taking the bucket lock.
w.complete = 0
select {
case <-w.C:
default:
}
w.addr = addr
w.bitmask = bitmask
b := m.lockBucket(addr)
// This function is very hot; avoid defer.
// Perform our atomic check.
if err := c.Check(addr, val); err != nil {
b.mu.Unlock()
return err
}
// Add the waiter to the bucket.
b.waiters.PushBack(w)
b.mu.Unlock()
return nil
}
// WaitComplete must be called when a Waiter previously added by WaitPrepare is
// no longer eligible to be woken.
func (m *Manager) WaitComplete(w *Waiter) {
// Can we short-circuit acquiring the lock?
// This is the happy path where a notification
// was received and we don't need to dequeue this
// waiter from any list (or take any locks).
if atomic.LoadInt32(&w.complete) != 0 {
return
}
// Take the bucket lock. Note that without holding the bucket lock, the
// waiter is not guaranteed to stay in that bucket, so after we take the
// bucket lock, we must ensure that the bucket hasn't changed: if it
// happens to have changed, we release the old bucket lock and try again
// with the new bucket; if it hasn't changed, we know it won't change now
// because we hold the lock.
var b *bucket
for {
addr := atomic.LoadUintptr(&w.addr)
b = m.lockBucket(addr)
// We still have to use an atomic load here, because if w was racily
// requeued then w.addr is not protected by b.mu.
if addr == atomic.LoadUintptr(&w.addr) {
break
}
b.mu.Unlock()
}
// Remove waiter from the bucket. w.complete can only be stored with b.mu
// locked, so this load doesn't need to use sync/atomic.
if w.complete == 0 {
b.waiters.Remove(w)
}
b.mu.Unlock()
}
|