diff options
author | Googler <noreply@google.com> | 2018-04-27 10:37:02 -0700 |
---|---|---|
committer | Adin Scannell <ascannell@google.com> | 2018-04-28 01:44:26 -0400 |
commit | d02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch) | |
tree | 54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/sentry/kernel/futex | |
parent | f70210e742919f40aa2f0934a22f1c9ba6dada62 (diff) |
Check in gVisor.
PiperOrigin-RevId: 194583126
Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/sentry/kernel/futex')
-rw-r--r-- | pkg/sentry/kernel/futex/BUILD | 48 | ||||
-rw-r--r-- | pkg/sentry/kernel/futex/futex.go | 405 | ||||
-rw-r--r-- | pkg/sentry/kernel/futex/futex_test.go | 500 |
3 files changed, 953 insertions, 0 deletions
diff --git a/pkg/sentry/kernel/futex/BUILD b/pkg/sentry/kernel/futex/BUILD new file mode 100644 index 000000000..de9897c58 --- /dev/null +++ b/pkg/sentry/kernel/futex/BUILD @@ -0,0 +1,48 @@ +package(licenses = ["notice"]) # Apache 2.0 + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("//tools/go_generics:defs.bzl", "go_template_instance") +load("//tools/go_stateify:defs.bzl", "go_stateify") + +go_template_instance( + name = "waiter_list", + out = "waiter_list.go", + package = "futex", + prefix = "waiter", + template = "//pkg/ilist:generic_list", + types = { + "Linker": "*Waiter", + }, +) + +go_stateify( + name = "futex_state", + srcs = [ + "futex.go", + "waiter_list.go", + ], + out = "futex_state.go", + package = "futex", +) + +go_library( + name = "futex", + srcs = [ + "futex.go", + "futex_state.go", + "waiter_list.go", + ], + importpath = "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/futex", + visibility = ["//pkg/sentry:internal"], + deps = [ + "//pkg/state", + "//pkg/syserror", + ], +) + +go_test( + name = "futex_test", + size = "small", + srcs = ["futex_test.go"], + embed = [":futex"], +) diff --git a/pkg/sentry/kernel/futex/futex.go b/pkg/sentry/kernel/futex/futex.go new file mode 100644 index 000000000..b3ba57a2c --- /dev/null +++ b/pkg/sentry/kernel/futex/futex.go @@ -0,0 +1,405 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package futex provides an implementation of the futex interface as found in +// the Linux kernel. It allows one to easily transform Wait() calls into waits +// on a channel, which is useful in a Go-based kernel, for example. +package futex + +import ( + "sync" + "sync/atomic" + + "gvisor.googlesource.com/gvisor/pkg/syserror" +) + +// Checker abstracts memory accesses. This is useful because the "addresses" +// used in this package may not be real addresses (they could be indices of an +// array, for example), or they could be mapped via some special mechanism. +// +// TODO: Replace this with usermem.IO. +type Checker interface { + // Check should validate that given address contains the given value. + // If it does not contain the value, syserror.EAGAIN must be returned. + // Any other error may be returned, which will be propagated. + Check(addr uintptr, val uint32) error + + // Op should atomically perform the operation encoded in op on the data + // pointed to by addr, then apply the comparison encoded in op to the + // original value at addr, returning the result. + // Note that op is an opaque operation whose behaviour is defined + // outside of the futex manager. + Op(addr uintptr, op uint32) (bool, error) +} + +// Waiter is the struct which gets enqueued into buckets for wake up routines +// and requeue routines to scan and notify. Once a Waiter has been enqueued by +// WaitPrepare(), callers may listen on C for wake up events. +type Waiter struct { + // Synchronization: + // + // - A Waiter that is not enqueued in a bucket is exclusively owned (no + // synchronization applies). + // + // - A Waiter is enqueued in a bucket by calling WaitPrepare(). After this, + // waiterEntry, complete, and addr are protected by the bucket.mu ("bucket + // lock") of the containing bucket, and bitmask is immutable. complete and + // addr are additionally mutated using atomic memory operations, ensuring + // that they can be read using atomic memory operations without holding the + // bucket lock. + // + // - A Waiter is only guaranteed to be no longer queued after calling + // WaitComplete(). + + // waiterEntry links Waiter into bucket.waiters. + waiterEntry + + // complete is 1 if the Waiter was removed from its bucket by a wakeup and + // 0 otherwise. + complete int32 + + // C is sent to when the Waiter is woken. + C chan struct{} + + // addr is the address being waited on. + addr uintptr + + // The bitmask we're waiting on. + // This is used the case of a FUTEX_WAKE_BITSET. + bitmask uint32 +} + +// NewWaiter returns a new unqueued Waiter. +func NewWaiter() *Waiter { + return &Waiter{ + C: make(chan struct{}, 1), + } +} + +// bucket holds a list of waiters for a given address hash. +type bucket struct { + // mu protects waiters and contained Waiter state. See comment in Waiter. + mu sync.Mutex `state:"nosave"` + + waiters waiterList `state:"zerovalue"` +} + +// wakeLocked wakes up to n waiters matching the bitmask at the addr for this +// bucket and returns the number of waiters woken. +// +// Preconditions: b.mu must be locked. +func (b *bucket) wakeLocked(addr uintptr, bitmask uint32, n int) int { + done := 0 + for w := b.waiters.Front(); done < n && w != nil; { + if w.addr != addr || w.bitmask&bitmask == 0 { + // Not matching. + w = w.Next() + continue + } + + // Remove from the bucket and wake the waiter. + woke := w + w = w.Next() // Next iteration. + b.waiters.Remove(woke) + woke.C <- struct{}{} + + // NOTE: The above channel write establishes a write barrier + // according to the memory model, so nothing may be ordered + // around it. Since we've dequeued w and will never touch it + // again, we can safely store 1 to w.complete here and allow + // the WaitComplete() to short-circuit grabbing the bucket + // lock. If they somehow miss the w.complete, we are still + // holding the lock, so we can know that they won't dequeue w, + // assume it's free and have the below operation afterwards. + atomic.StoreInt32(&woke.complete, 1) + done++ + } + return done +} + +// requeueLocked takes n waiters from the bucket and moves them to naddr on the +// bucket "to". +// +// Preconditions: b and to must be locked. +func (b *bucket) requeueLocked(to *bucket, addr, naddr uintptr, n int) int { + done := 0 + for w := b.waiters.Front(); done < n && w != nil; { + if w.addr != addr { + // Not matching. + w = w.Next() + continue + } + + requeued := w + w = w.Next() // Next iteration. + b.waiters.Remove(requeued) + atomic.StoreUintptr(&requeued.addr, naddr) + to.waiters.PushBack(requeued) + done++ + } + return done +} + +const ( + // bucketCount is the number of buckets per Manager. By having many of + // these we reduce contention when concurrent yet unrelated calls are made. + bucketCount = 1 << bucketCountBits + bucketCountBits = 10 +) + +func checkAddr(addr uintptr) error { + // Ensure the address is aligned. + // It must be a DWORD boundary. + if addr&0x3 != 0 { + return syserror.EINVAL + } + + return nil +} + +// bucketIndexForAddr returns the index into Manager.buckets for addr. +func bucketIndexForAddr(addr uintptr) uintptr { + // - The bottom 2 bits of addr must be 0, per checkAddr. + // + // - On amd64, the top 16 bits of addr (bits 48-63) must be equal to bit 47 + // for a canonical address, and (on all existing platforms) bit 47 must be + // 0 for an application address. + // + // Thus 19 bits of addr are "useless" for hashing, leaving only 45 "useful" + // bits. We choose one of the simplest possible hash functions that at + // least uses all 45 useful bits in the output, given that bucketCountBits + // == 10. This hash function also has the property that it will usually map + // adjacent addresses to adjacent buckets, slightly improving memory + // locality when an application synchronization structure uses multiple + // nearby futexes. + // + // Note that despite the large number of arithmetic operations in the + // function, many components can be computed in parallel, such that the + // critical path is 1 bit shift + 3 additions (2 in h1, then h1 + h2). This + // is also why h1 and h2 are grouped separately; for "(addr >> 2) + ... + + // (addr >> 42)" without any additional grouping, the compiler puts all 4 + // additions in the critical path. + h1 := (addr >> 2) + (addr >> 12) + (addr >> 22) + h2 := (addr >> 32) + (addr >> 42) + return (h1 + h2) % bucketCount +} + +// Manager holds futex state for a single virtual address space. +type Manager struct { + buckets [bucketCount]bucket +} + +// NewManager returns an initialized futex manager. +// N.B. we use virtual address to tag futexes, so it only works for private +// (within a single process) futex. +func NewManager() *Manager { + return &Manager{} +} + +// lockBucket returns a locked bucket for the given addr. +// +// Preconditions: checkAddr(addr) == nil. +func (m *Manager) lockBucket(addr uintptr) *bucket { + b := &m.buckets[bucketIndexForAddr(addr)] + b.mu.Lock() + return b +} + +// lockBuckets returns locked buckets for the given addrs. +// +// Preconditions: checkAddr(addr1) == checkAddr(addr2) == nil. +func (m *Manager) lockBuckets(addr1 uintptr, addr2 uintptr) (*bucket, *bucket) { + i1 := bucketIndexForAddr(addr1) + i2 := bucketIndexForAddr(addr2) + b1 := &m.buckets[i1] + b2 := &m.buckets[i2] + + // Ensure that buckets are locked in a consistent order (lowest index + // first) to avoid circular locking. + switch { + case i1 < i2: + b1.mu.Lock() + b2.mu.Lock() + case i2 < i1: + b2.mu.Lock() + b1.mu.Lock() + default: + b1.mu.Lock() + } + + return b1, b2 +} + +// Wake wakes up to n waiters matching the bitmask on the given addr. +// The number of waiters woken is returned. +func (m *Manager) Wake(addr uintptr, bitmask uint32, n int) (int, error) { + if err := checkAddr(addr); err != nil { + return 0, err + } + + b := m.lockBucket(addr) + // This function is very hot; avoid defer. + r := b.wakeLocked(addr, bitmask, n) + b.mu.Unlock() + return r, nil +} + +func (m *Manager) doRequeue(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) { + if err := checkAddr(addr); err != nil { + return 0, err + } + if err := checkAddr(naddr); err != nil { + return 0, err + } + + b1, b2 := m.lockBuckets(addr, naddr) + defer b1.mu.Unlock() + if b2 != b1 { + defer b2.mu.Unlock() + } + + // Check our value. + // This only applied for RequeueCmp(). + if c != nil { + if err := c.Check(addr, val); err != nil { + return 0, err + } + } + + // Wake the number required. + done := b1.wakeLocked(addr, ^uint32(0), nwake) + + // Requeue the number required. + b1.requeueLocked(b2, addr, naddr, nreq) + + return done, nil +} + +// Requeue wakes up to nwake waiters on the given addr, and unconditionally +// requeues up to nreq waiters on naddr. +func (m *Manager) Requeue(addr uintptr, naddr uintptr, nwake int, nreq int) (int, error) { + return m.doRequeue(nil, addr, 0, naddr, nwake, nreq) +} + +// RequeueCmp atomically checks that the addr contains val (via the Checker), +// wakes up to nwake waiters on addr and then unconditionally requeues nreq +// waiters on naddr. +func (m *Manager) RequeueCmp(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) { + return m.doRequeue(c, addr, val, naddr, nwake, nreq) +} + +// WakeOp atomically applies op to the memory address addr2, wakes up to nwake1 +// waiters unconditionally from addr1, and, based on the original value at addr2 +// and a comparison encoded in op, wakes up to nwake2 waiters from addr2. +// It returns the total number of waiters woken. +func (m *Manager) WakeOp(c Checker, addr1 uintptr, addr2 uintptr, nwake1 int, nwake2 int, op uint32) (int, error) { + if err := checkAddr(addr1); err != nil { + return 0, err + } + if err := checkAddr(addr2); err != nil { + return 0, err + } + + b1, b2 := m.lockBuckets(addr1, addr2) + + done := 0 + cond, err := c.Op(addr2, op) + if err == nil { + // Wake up up to nwake1 entries from the first bucket. + done = b1.wakeLocked(addr1, ^uint32(0), nwake1) + + // Wake up up to nwake2 entries from the second bucket if the + // operation yielded true. + if cond { + done += b2.wakeLocked(addr2, ^uint32(0), nwake2) + } + } + + b1.mu.Unlock() + if b2 != b1 { + b2.mu.Unlock() + } + return done, err +} + +// WaitPrepare atomically checks that addr contains val (via the Checker), then +// enqueues w to be woken by a send to w.C. If WaitPrepare returns nil, the +// Waiter must be subsequently removed by calling WaitComplete, whether or not +// a wakeup is received on w.C. +func (m *Manager) WaitPrepare(w *Waiter, c Checker, addr uintptr, val uint32, bitmask uint32) error { + if err := checkAddr(addr); err != nil { + return err + } + + // Prepare the Waiter before taking the bucket lock. + w.complete = 0 + select { + case <-w.C: + default: + } + w.addr = addr + w.bitmask = bitmask + + b := m.lockBucket(addr) + // This function is very hot; avoid defer. + + // Perform our atomic check. + if err := c.Check(addr, val); err != nil { + b.mu.Unlock() + return err + } + + // Add the waiter to the bucket. + b.waiters.PushBack(w) + + b.mu.Unlock() + return nil +} + +// WaitComplete must be called when a Waiter previously added by WaitPrepare is +// no longer eligible to be woken. +func (m *Manager) WaitComplete(w *Waiter) { + // Can we short-circuit acquiring the lock? + // This is the happy path where a notification + // was received and we don't need to dequeue this + // waiter from any list (or take any locks). + if atomic.LoadInt32(&w.complete) != 0 { + return + } + + // Take the bucket lock. Note that without holding the bucket lock, the + // waiter is not guaranteed to stay in that bucket, so after we take the + // bucket lock, we must ensure that the bucket hasn't changed: if it + // happens to have changed, we release the old bucket lock and try again + // with the new bucket; if it hasn't changed, we know it won't change now + // because we hold the lock. + var b *bucket + for { + addr := atomic.LoadUintptr(&w.addr) + b = m.lockBucket(addr) + // We still have to use an atomic load here, because if w was racily + // requeued then w.addr is not protected by b.mu. + if addr == atomic.LoadUintptr(&w.addr) { + break + } + b.mu.Unlock() + } + + // Remove waiter from the bucket. w.complete can only be stored with b.mu + // locked, so this load doesn't need to use sync/atomic. + if w.complete == 0 { + b.waiters.Remove(w) + } + b.mu.Unlock() +} diff --git a/pkg/sentry/kernel/futex/futex_test.go b/pkg/sentry/kernel/futex/futex_test.go new file mode 100644 index 000000000..7b81358ec --- /dev/null +++ b/pkg/sentry/kernel/futex/futex_test.go @@ -0,0 +1,500 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package futex + +import ( + "math" + "runtime" + "sync" + "sync/atomic" + "syscall" + "testing" + "unsafe" +) + +const ( + testMutexSize = 4 + testMutexLocked uint32 = 1 + testMutexUnlocked uint32 = 0 +) + +// testData implements the Checker interface, and allows us to +// treat the address passed for futex operations as an index in +// a byte slice for testing simplicity. +type testData []byte + +func newTestData(size uint) testData { + return make([]byte, size) +} + +func (t testData) Check(addr uintptr, val uint32) error { + if val != atomic.LoadUint32((*uint32)(unsafe.Pointer(&t[addr]))) { + return syscall.EAGAIN + } + return nil +} + +func (t testData) Op(addr uintptr, val uint32) (bool, error) { + return val == 0, nil +} + +// testMutex ties together a testData slice, an address, and a +// futex manager in order to implement the sync.Locker interface. +// Beyond being used as a Locker, this is a simple mechanism for +// changing the underlying values for simpler tests. +type testMutex struct { + a uintptr + d testData + m *Manager +} + +func newTestMutex(addr uintptr, d testData, m *Manager) *testMutex { + return &testMutex{a: addr, d: d, m: m} +} + +// Lock acquires the testMutex. +// This may wait for it to be available via the futex manager. +func (t *testMutex) Lock() { + for { + // Attempt to grab the lock. + if atomic.CompareAndSwapUint32( + ((*uint32)(unsafe.Pointer(&t.d[t.a]))), + testMutexUnlocked, + testMutexLocked) { + // Lock held. + return + } + + // Wait for it to be "not locked". + w := NewWaiter() + err := t.m.WaitPrepare(w, t.d, t.a, testMutexLocked, ^uint32(0)) + if err == syscall.EAGAIN { + continue + } + if err != nil { + // Should never happen. + panic("WaitPrepare returned unexpected error: " + err.Error()) + } + <-w.C + t.m.WaitComplete(w) + } +} + +// Unlock releases the testMutex. +// This will notify any waiters via the futex manager. +func (t *testMutex) Unlock() { + // Unlock. + atomic.StoreUint32(((*uint32)(unsafe.Pointer(&t.d[t.a]))), testMutexUnlocked) + + // Notify all waiters. + t.m.Wake(t.a, ^uint32(0), math.MaxInt32) +} + +func TestFutexWake(t *testing.T) { + m := NewManager() + d := newTestData(testMutexSize) + + // Wait for it to be locked. + // (This won't trigger the wake in testMutex) + w := NewWaiter() + m.WaitPrepare(w, d, 0, testMutexUnlocked, ^uint32(0)) + + // Wake the single thread. + if _, err := m.Wake(0, ^uint32(0), 1); err != nil { + t.Error("wake error:", err) + } + + <-w.C + m.WaitComplete(w) +} + +func TestFutexWakeBitmask(t *testing.T) { + m := NewManager() + d := newTestData(testMutexSize) + + // Wait for it to be locked. + // (This won't trigger the wake in testMutex) + w := NewWaiter() + m.WaitPrepare(w, d, 0, testMutexUnlocked, 0x0000ffff) + + // Wake the single thread, not using the bitmask. + if _, err := m.Wake(0, 0xffff0000, 1); err != nil { + t.Error("wake non-matching bitmask error:", err) + } + + select { + case <-w.C: + t.Error("w is alive?") + default: + } + + // Now use a matching bitmask. + if _, err := m.Wake(0, 0x00000001, 1); err != nil { + t.Error("wake matching bitmask error:", err) + } + + <-w.C + m.WaitComplete(w) +} + +func TestFutexWakeTwo(t *testing.T) { + m := NewManager() + d := newTestData(testMutexSize) + + // Wait for it to be locked. + // (This won't trigger the wake in testMutex) + w1 := NewWaiter() + w2 := NewWaiter() + w3 := NewWaiter() + m.WaitPrepare(w1, d, 0, testMutexUnlocked, ^uint32(0)) + m.WaitPrepare(w2, d, 0, testMutexUnlocked, ^uint32(0)) + m.WaitPrepare(w3, d, 0, testMutexUnlocked, ^uint32(0)) + + // Wake exactly two threads. + if _, err := m.Wake(0, ^uint32(0), 2); err != nil { + t.Error("wake error:", err) + } + + // Ensure exactly two are alive. + // We don't get guarantees about exactly which two, + // (although we expect them to be w1 and w2). + awake := 0 + for { + select { + case <-w1.C: + awake++ + case <-w2.C: + awake++ + case <-w3.C: + awake++ + default: + if awake != 2 { + t.Error("awake != 2?") + } + + // Success. + return + } + } +} + +func TestFutexWakeUnrelated(t *testing.T) { + m := NewManager() + d := newTestData(2 * testMutexSize) + + // Wait for it to be locked. + w1 := NewWaiter() + w2 := NewWaiter() + m.WaitPrepare(w1, d, 0*testMutexSize, testMutexUnlocked, ^uint32(0)) + m.WaitPrepare(w2, d, 1*testMutexSize, testMutexUnlocked, ^uint32(0)) + + // Wake only the second one. + if _, err := m.Wake(1*testMutexSize, ^uint32(0), 2); err != nil { + t.Error("wake error:", err) + } + + // Ensure only r2 is alive. + select { + case <-w1.C: + t.Error("w1 is alive?") + default: + } + <-w2.C +} + +// This function was shamelessly stolen from mutex_test.go. +func HammerMutex(l sync.Locker, loops int, cdone chan bool) { + for i := 0; i < loops; i++ { + l.Lock() + runtime.Gosched() + l.Unlock() + } + cdone <- true +} + +func TestFutexStress(t *testing.T) { + m := NewManager() + d := newTestData(testMutexSize) + tm := newTestMutex(0*testMutexSize, d, m) + c := make(chan bool) + + for i := 0; i < 10; i++ { + go HammerMutex(tm, 1000, c) + } + + for i := 0; i < 10; i++ { + <-c + } +} + +func TestWakeOpEmpty(t *testing.T) { + m := NewManager() + d := newTestData(8) + + n, err := m.WakeOp(d, 0, 4, 10, 10, 0) + if err != nil { + t.Fatalf("WakeOp failed: %v", err) + } + + if n != 0 { + t.Fatalf("Invalid number of wakes: want 0, got %d", n) + } +} + +func TestWakeOpFirstNonEmpty(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add two waiters on address 0. + w1 := NewWaiter() + if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w1) + + w2 := NewWaiter() + if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w2) + + // Wake up all waiters on address 0. + n, err := m.WakeOp(d, 0, 4, 10, 10, 0) + if err != nil { + t.Fatalf("WakeOp failed: %v", err) + } + + if n != 2 { + t.Fatalf("Invalid number of wakes: want 2, got %d", n) + } +} + +func TestWakeOpSecondNonEmpty(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add two waiters on address 4. + w1 := NewWaiter() + if err := m.WaitPrepare(w1, d, 4, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w1) + + w2 := NewWaiter() + if err := m.WaitPrepare(w2, d, 4, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w2) + + // Wake up all waiters on address 4. + n, err := m.WakeOp(d, 0, 4, 10, 10, 0) + if err != nil { + t.Fatalf("WakeOp failed: %v", err) + } + + if n != 2 { + t.Fatalf("Invalid number of wakes: want 2, got %d", n) + } +} + +func TestWakeOpSecondNonEmptyFailingOp(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add two waiters on address 4. + w1 := NewWaiter() + if err := m.WaitPrepare(w1, d, 4, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w1) + + w2 := NewWaiter() + if err := m.WaitPrepare(w2, d, 4, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w2) + + // Wake up all waiters on address 4. + n, err := m.WakeOp(d, 0, 4, 10, 10, 1) + if err != nil { + t.Fatalf("WakeOp failed: %v", err) + } + + if n != 0 { + t.Fatalf("Invalid number of wakes: want 0, got %d", n) + } +} + +func TestWakeOpAllNonEmpty(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add two waiters on address 0. + w1 := NewWaiter() + if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w1) + + w2 := NewWaiter() + if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w2) + + // Add two waiters on address 4. + w3 := NewWaiter() + if err := m.WaitPrepare(w3, d, 4, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w3) + + w4 := NewWaiter() + if err := m.WaitPrepare(w4, d, 4, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w4) + + // Wake up all waiters on both addresses. + n, err := m.WakeOp(d, 0, 4, 10, 10, 0) + if err != nil { + t.Fatalf("WakeOp failed: %v", err) + } + + if n != 4 { + t.Fatalf("Invalid number of wakes: want 4, got %d", n) + } +} + +func TestWakeOpAllNonEmptyFailingOp(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add two waiters on address 0. + w1 := NewWaiter() + if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w1) + + w2 := NewWaiter() + if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w2) + + // Add two waiters on address 4. + w3 := NewWaiter() + if err := m.WaitPrepare(w3, d, 4, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w3) + + w4 := NewWaiter() + if err := m.WaitPrepare(w4, d, 4, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w4) + + // Wake up all waiters on both addresses. + n, err := m.WakeOp(d, 0, 4, 10, 10, 1) + if err != nil { + t.Fatalf("WakeOp failed: %v", err) + } + + if n != 2 { + t.Fatalf("Invalid number of wakes: want 2, got %d", n) + } +} + +func TestWakeOpSameAddress(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add four waiters on address 0. + w1 := NewWaiter() + if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w1) + + w2 := NewWaiter() + if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w2) + + w3 := NewWaiter() + if err := m.WaitPrepare(w3, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w3) + + w4 := NewWaiter() + if err := m.WaitPrepare(w4, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w4) + + // Use the same address, with one at most one waiter from each. + n, err := m.WakeOp(d, 0, 0, 1, 1, 0) + if err != nil { + t.Fatalf("WakeOp failed: %v", err) + } + + if n != 2 { + t.Fatalf("Invalid number of wakes: want 2, got %d", n) + } +} + +func TestWakeOpSameAddressFailingOp(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add four waiters on address 0. + w1 := NewWaiter() + if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w1) + + w2 := NewWaiter() + if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w2) + + w3 := NewWaiter() + if err := m.WaitPrepare(w3, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w3) + + w4 := NewWaiter() + if err := m.WaitPrepare(w4, d, 0, 0, ^uint32(0)); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + defer m.WaitComplete(w4) + + // Use the same address, with one at most one waiter from each. + n, err := m.WakeOp(d, 0, 0, 1, 1, 1) + if err != nil { + t.Fatalf("WakeOp failed: %v", err) + } + + if n != 1 { + t.Fatalf("Invalid number of wakes: want 1, got %d", n) + } +} |