author    Googler <noreply@google.com> 2018-04-27 10:37:02 -0700
committer Adin Scannell <ascannell@google.com> 2018-04-28 01:44:26 -0400
commit    d02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch)
tree      54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/sentry/kernel/futex
parent    f70210e742919f40aa2f0934a22f1c9ba6dada62 (diff)
Check in gVisor.
PiperOrigin-RevId: 194583126
Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/sentry/kernel/futex')
-rw-r--r--  pkg/sentry/kernel/futex/BUILD          |  48
-rw-r--r--  pkg/sentry/kernel/futex/futex.go       | 405
-rw-r--r--  pkg/sentry/kernel/futex/futex_test.go  | 500
3 files changed, 953 insertions, 0 deletions
diff --git a/pkg/sentry/kernel/futex/BUILD b/pkg/sentry/kernel/futex/BUILD
new file mode 100644
index 000000000..de9897c58
--- /dev/null
+++ b/pkg/sentry/kernel/futex/BUILD
@@ -0,0 +1,48 @@
+package(licenses = ["notice"]) # Apache 2.0
+
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("//tools/go_generics:defs.bzl", "go_template_instance")
+load("//tools/go_stateify:defs.bzl", "go_stateify")
+
+go_template_instance(
+ name = "waiter_list",
+ out = "waiter_list.go",
+ package = "futex",
+ prefix = "waiter",
+ template = "//pkg/ilist:generic_list",
+ types = {
+ "Linker": "*Waiter",
+ },
+)
+
+go_stateify(
+ name = "futex_state",
+ srcs = [
+ "futex.go",
+ "waiter_list.go",
+ ],
+ out = "futex_state.go",
+ package = "futex",
+)
+
+go_library(
+ name = "futex",
+ srcs = [
+ "futex.go",
+ "futex_state.go",
+ "waiter_list.go",
+ ],
+ importpath = "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/futex",
+ visibility = ["//pkg/sentry:internal"],
+ deps = [
+ "//pkg/state",
+ "//pkg/syserror",
+ ],
+)
+
+go_test(
+ name = "futex_test",
+ size = "small",
+ srcs = ["futex_test.go"],
+ embed = [":futex"],
+)
diff --git a/pkg/sentry/kernel/futex/futex.go b/pkg/sentry/kernel/futex/futex.go
new file mode 100644
index 000000000..b3ba57a2c
--- /dev/null
+++ b/pkg/sentry/kernel/futex/futex.go
@@ -0,0 +1,405 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package futex provides an implementation of the futex interface as found in
+// the Linux kernel. It allows one to easily transform Wait() calls into waits
+// on a channel, which is useful in a Go-based kernel, for example.
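+//
+// A typical wait/wake sequence looks like the following (a minimal sketch;
+// the Checker implementation, the waker side, and error handling are
+// elided):
+//
+//	w := NewWaiter()
+//	if err := m.WaitPrepare(w, c, addr, val, ^uint32(0)); err == nil {
+//		<-w.C             // Block until woken by a matching Wake call.
+//		m.WaitComplete(w) // Always required after a successful WaitPrepare.
+//	}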
+package futex
+
+import (
+ "sync"
+ "sync/atomic"
+
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+)
+
+// Checker abstracts memory accesses. This is useful because the "addresses"
+// used in this package may not be real addresses (they could be indices of an
+// array, for example), or they could be mapped via some special mechanism.
+//
+// TODO: Replace this with usermem.IO.
+type Checker interface {
+ // Check should validate that the given address contains the given value.
+ // If it does not contain the value, syserror.EAGAIN must be returned.
+ // Any other error may be returned, which will be propagated.
+ Check(addr uintptr, val uint32) error
+
+ // Op should atomically perform the operation encoded in op on the data
+ // pointed to by addr, then apply the comparison encoded in op to the
+ // original value at addr, returning the result.
+ // Note that op is an opaque operation whose behaviour is defined
+ // outside of the futex manager.
+ Op(addr uintptr, op uint32) (bool, error)
+}
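+
+// As an illustration, a Checker may be backed by a plain byte slice, with
+// addresses interpreted as indices; testData in futex_test.go takes this
+// approach. A minimal sketch of the Check half (sliceChecker is a
+// hypothetical name):
+//
+//	type sliceChecker []byte
+//
+//	func (s sliceChecker) Check(addr uintptr, val uint32) error {
+//		if atomic.LoadUint32((*uint32)(unsafe.Pointer(&s[addr]))) != val {
+//			return syserror.EAGAIN
+//		}
+//		return nil
+//	}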
+
+// Waiter is the struct that gets enqueued into buckets for wake up routines
+// and requeue routines to scan and notify. Once a Waiter has been enqueued by
+// WaitPrepare(), callers may listen on C for wake up events.
+type Waiter struct {
+ // Synchronization:
+ //
+ // - A Waiter that is not enqueued in a bucket is exclusively owned (no
+ // synchronization applies).
+ //
+ // - A Waiter is enqueued in a bucket by calling WaitPrepare(). After this,
+ // waiterEntry, complete, and addr are protected by the bucket.mu ("bucket
+ // lock") of the containing bucket, and bitmask is immutable. complete and
+ // addr are additionally mutated using atomic memory operations, ensuring
+ // that they can be read using atomic memory operations without holding the
+ // bucket lock.
+ //
+ // - A Waiter is only guaranteed to be no longer queued after calling
+ // WaitComplete().
+
+ // waiterEntry links Waiter into bucket.waiters.
+ waiterEntry
+
+ // complete is 1 if the Waiter was removed from its bucket by a wakeup and
+ // 0 otherwise.
+ complete int32
+
+ // C is sent to when the Waiter is woken.
+ C chan struct{}
+
+ // addr is the address being waited on.
+ addr uintptr
+
+ // bitmask is the bitmask we're waiting on.
+ // This is used in the case of a FUTEX_WAKE_BITSET.
+ bitmask uint32
+}
+
+// NewWaiter returns a new unqueued Waiter.
+func NewWaiter() *Waiter {
+ return &Waiter{
+ C: make(chan struct{}, 1),
+ }
+}
+
+// bucket holds a list of waiters for a given address hash.
+type bucket struct {
+ // mu protects waiters and contained Waiter state. See comment in Waiter.
+ mu sync.Mutex `state:"nosave"`
+
+ waiters waiterList `state:"zerovalue"`
+}
+
+// wakeLocked wakes up to n waiters matching the bitmask at the addr for this
+// bucket and returns the number of waiters woken.
+//
+// Preconditions: b.mu must be locked.
+func (b *bucket) wakeLocked(addr uintptr, bitmask uint32, n int) int {
+ done := 0
+ for w := b.waiters.Front(); done < n && w != nil; {
+ if w.addr != addr || w.bitmask&bitmask == 0 {
+ // Not matching.
+ w = w.Next()
+ continue
+ }
+
+ // Remove from the bucket and wake the waiter.
+ woke := w
+ w = w.Next() // Next iteration.
+ b.waiters.Remove(woke)
+ woke.C <- struct{}{}
+
+ // NOTE: The above channel write establishes a write barrier
+ // according to the memory model, so nothing may be ordered
+ // around it. Since we've dequeued w and will never touch it
+ // again, we can safely store 1 to w.complete here and allow
+ // WaitComplete() to short-circuit grabbing the bucket lock.
+ // If the waiter misses that store and reads w.complete as 0,
+ // it falls back to acquiring the bucket lock, which we still
+ // hold, so it cannot dequeue w or assume w is free until the
+ // store below is visible to it.
+ atomic.StoreInt32(&woke.complete, 1)
+ done++
+ }
+ return done
+}
+
+// requeueLocked moves up to n waiters waiting on addr from this bucket to
+// address naddr in the bucket "to", and returns the number of waiters moved.
+//
+// Preconditions: b and to must be locked.
+func (b *bucket) requeueLocked(to *bucket, addr, naddr uintptr, n int) int {
+ done := 0
+ for w := b.waiters.Front(); done < n && w != nil; {
+ if w.addr != addr {
+ // Not matching.
+ w = w.Next()
+ continue
+ }
+
+ requeued := w
+ w = w.Next() // Next iteration.
+ b.waiters.Remove(requeued)
+ atomic.StoreUintptr(&requeued.addr, naddr)
+ to.waiters.PushBack(requeued)
+ done++
+ }
+ return done
+}
+
+const (
+ // bucketCount is the number of buckets per Manager. By having many of
+ // these we reduce contention when concurrent yet unrelated calls are made.
+ bucketCount = 1 << bucketCountBits
+ bucketCountBits = 10
+)
+
+func checkAddr(addr uintptr) error {
+ // Ensure the address is aligned.
+ // It must be on a 4-byte (DWORD) boundary.
+ if addr&0x3 != 0 {
+ return syserror.EINVAL
+ }
+
+ return nil
+}
+
+// bucketIndexForAddr returns the index into Manager.buckets for addr.
+func bucketIndexForAddr(addr uintptr) uintptr {
+ // - The bottom 2 bits of addr must be 0, per checkAddr.
+ //
+ // - On amd64, the top 16 bits of addr (bits 48-63) must be equal to bit 47
+ // for a canonical address, and (on all existing platforms) bit 47 must be
+ // 0 for an application address.
+ //
+ // Thus 19 bits of addr are "useless" for hashing, leaving only 45 "useful"
+ // bits. We choose one of the simplest possible hash functions that at
+ // least uses all 45 useful bits in the output, given that bucketCountBits
+ // == 10. This hash function also has the property that it will usually map
+ // adjacent addresses to adjacent buckets, slightly improving memory
+ // locality when an application synchronization structure uses multiple
+ // nearby futexes.
+ //
+ // Note that despite the large number of arithmetic operations in the
+ // function, many components can be computed in parallel, such that the
+ // critical path is 1 bit shift + 3 additions (2 in h1, then h1 + h2). This
+ // is also why h1 and h2 are grouped separately; for "(addr >> 2) + ... +
+ // (addr >> 42)" without any additional grouping, the compiler puts all 4
+ // additions in the critical path.
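+ //
+ // For example, with bucketCountBits == 10, addr 0x1000 maps to bucket
+ // ((0x1000>>2) + (0x1000>>12) + 0 + 0 + 0) % 1024 = 1025 % 1024 = 1,
+ // and the adjacent futex at 0x1004 maps to bucket 2.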
+ h1 := (addr >> 2) + (addr >> 12) + (addr >> 22)
+ h2 := (addr >> 32) + (addr >> 42)
+ return (h1 + h2) % bucketCount
+}
+
+// Manager holds futex state for a single virtual address space.
+type Manager struct {
+ buckets [bucketCount]bucket
+}
+
+// NewManager returns an initialized futex manager.
+// N.B. We use virtual addresses to tag futexes, so this only works for
+// private futexes (i.e., within a single process).
+func NewManager() *Manager {
+ return &Manager{}
+}
+
+// lockBucket returns a locked bucket for the given addr.
+//
+// Preconditions: checkAddr(addr) == nil.
+func (m *Manager) lockBucket(addr uintptr) *bucket {
+ b := &m.buckets[bucketIndexForAddr(addr)]
+ b.mu.Lock()
+ return b
+}
+
+// lockBuckets returns locked buckets for the given addrs.
+//
+// Preconditions: checkAddr(addr1) == checkAddr(addr2) == nil.
+func (m *Manager) lockBuckets(addr1 uintptr, addr2 uintptr) (*bucket, *bucket) {
+ i1 := bucketIndexForAddr(addr1)
+ i2 := bucketIndexForAddr(addr2)
+ b1 := &m.buckets[i1]
+ b2 := &m.buckets[i2]
+
+ // Ensure that buckets are locked in a consistent order (lowest index
+ // first) to avoid circular locking.
+ switch {
+ case i1 < i2:
+ b1.mu.Lock()
+ b2.mu.Lock()
+ case i2 < i1:
+ b2.mu.Lock()
+ b1.mu.Lock()
+ default:
+ b1.mu.Lock()
+ }
+
+ return b1, b2
+}
+
+// Wake wakes up to n waiters matching the bitmask on the given addr.
+// The number of waiters woken is returned.
+func (m *Manager) Wake(addr uintptr, bitmask uint32, n int) (int, error) {
+ if err := checkAddr(addr); err != nil {
+ return 0, err
+ }
+
+ b := m.lockBucket(addr)
+ // This function is very hot; avoid defer.
+ r := b.wakeLocked(addr, bitmask, n)
+ b.mu.Unlock()
+ return r, nil
+}
+
+func (m *Manager) doRequeue(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) {
+ if err := checkAddr(addr); err != nil {
+ return 0, err
+ }
+ if err := checkAddr(naddr); err != nil {
+ return 0, err
+ }
+
+ b1, b2 := m.lockBuckets(addr, naddr)
+ defer b1.mu.Unlock()
+ if b2 != b1 {
+ defer b2.mu.Unlock()
+ }
+
+ // Check our value.
+ // This check only applies to RequeueCmp().
+ if c != nil {
+ if err := c.Check(addr, val); err != nil {
+ return 0, err
+ }
+ }
+
+ // Wake the number required.
+ done := b1.wakeLocked(addr, ^uint32(0), nwake)
+
+ // Requeue the number required.
+ b1.requeueLocked(b2, addr, naddr, nreq)
+
+ return done, nil
+}
+
+// Requeue wakes up to nwake waiters on the given addr, and unconditionally
+// requeues up to nreq waiters to naddr.
+func (m *Manager) Requeue(addr uintptr, naddr uintptr, nwake int, nreq int) (int, error) {
+ return m.doRequeue(nil, addr, 0, naddr, nwake, nreq)
+}
+
+// RequeueCmp atomically checks that addr contains val (via the Checker); if
+// it does, it wakes up to nwake waiters on addr and then requeues up to nreq
+// waiters to naddr.
+func (m *Manager) RequeueCmp(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) {
+ return m.doRequeue(c, addr, val, naddr, nwake, nreq)
+}
+
+// WakeOp atomically applies op to the memory address addr2, wakes up to nwake1
+// waiters unconditionally from addr1, and, based on the original value at addr2
+// and a comparison encoded in op, wakes up to nwake2 waiters from addr2.
+// It returns the total number of waiters woken.
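+// As a concrete example from futex_test.go: with testData as the Checker
+// (whose Op reports true exactly when op == 0), WakeOp(d, 0, 4, 10, 10, 0)
+// wakes up to 10 waiters on address 0 and, because the comparison yielded
+// true, up to 10 more on address 4.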
+func (m *Manager) WakeOp(c Checker, addr1 uintptr, addr2 uintptr, nwake1 int, nwake2 int, op uint32) (int, error) {
+ if err := checkAddr(addr1); err != nil {
+ return 0, err
+ }
+ if err := checkAddr(addr2); err != nil {
+ return 0, err
+ }
+
+ b1, b2 := m.lockBuckets(addr1, addr2)
+
+ done := 0
+ cond, err := c.Op(addr2, op)
+ if err == nil {
+ // Wake up to nwake1 entries from the first bucket.
+ done = b1.wakeLocked(addr1, ^uint32(0), nwake1)
+
+ // Wake up to nwake2 entries from the second bucket if the
+ // operation yielded true.
+ if cond {
+ done += b2.wakeLocked(addr2, ^uint32(0), nwake2)
+ }
+ }
+
+ b1.mu.Unlock()
+ if b2 != b1 {
+ b2.mu.Unlock()
+ }
+ return done, err
+}
+
+// WaitPrepare atomically checks that addr contains val (via the Checker), then
+// enqueues w to be woken by a send to w.C. If WaitPrepare returns nil, the
+// Waiter must be subsequently removed by calling WaitComplete, whether or not
+// a wakeup is received on w.C.
+func (m *Manager) WaitPrepare(w *Waiter, c Checker, addr uintptr, val uint32, bitmask uint32) error {
+ if err := checkAddr(addr); err != nil {
+ return err
+ }
+
+ // Prepare the Waiter before taking the bucket lock.
+ w.complete = 0
+ select {
+ case <-w.C:
+ default:
+ }
+ w.addr = addr
+ w.bitmask = bitmask
+
+ b := m.lockBucket(addr)
+ // This function is very hot; avoid defer.
+
+ // Perform our atomic check.
+ if err := c.Check(addr, val); err != nil {
+ b.mu.Unlock()
+ return err
+ }
+
+ // Add the waiter to the bucket.
+ b.waiters.PushBack(w)
+
+ b.mu.Unlock()
+ return nil
+}
+
+// WaitComplete must be called when a Waiter previously added by WaitPrepare is
+// no longer eligible to be woken.
+func (m *Manager) WaitComplete(w *Waiter) {
+ // Can we short-circuit acquiring the lock?
+ // This is the happy path where a notification
+ // was received and we don't need to dequeue this
+ // waiter from any list (or take any locks).
+ if atomic.LoadInt32(&w.complete) != 0 {
+ return
+ }
+
+ // Take the bucket lock. Note that without holding the bucket lock, the
+ // waiter is not guaranteed to stay in that bucket, so after we take the
+ // bucket lock, we must ensure that the bucket hasn't changed: if it
+ // happens to have changed, we release the old bucket lock and try again
+ // with the new bucket; if it hasn't changed, we know it won't change now
+ // because we hold the lock.
+ var b *bucket
+ for {
+ addr := atomic.LoadUintptr(&w.addr)
+ b = m.lockBucket(addr)
+ // We still have to use an atomic load here, because if w was racily
+ // requeued then w.addr is not protected by b.mu.
+ if addr == atomic.LoadUintptr(&w.addr) {
+ break
+ }
+ b.mu.Unlock()
+ }
+
+ // Remove waiter from the bucket. w.complete can only be stored with b.mu
+ // locked, so this load doesn't need to use sync/atomic.
+ if w.complete == 0 {
+ b.waiters.Remove(w)
+ }
+ b.mu.Unlock()
+}
diff --git a/pkg/sentry/kernel/futex/futex_test.go b/pkg/sentry/kernel/futex/futex_test.go
new file mode 100644
index 000000000..7b81358ec
--- /dev/null
+++ b/pkg/sentry/kernel/futex/futex_test.go
@@ -0,0 +1,500 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package futex
+
+import (
+ "math"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "syscall"
+ "testing"
+ "unsafe"
+)
+
+const (
+ testMutexSize = 4
+ testMutexLocked uint32 = 1
+ testMutexUnlocked uint32 = 0
+)
+
+// testData implements the Checker interface, and allows us to
+// treat the address passed for futex operations as an index in
+// a byte slice for testing simplicity.
+type testData []byte
+
+func newTestData(size uint) testData {
+ return make([]byte, size)
+}
+
+func (t testData) Check(addr uintptr, val uint32) error {
+ if val != atomic.LoadUint32((*uint32)(unsafe.Pointer(&t[addr]))) {
+ return syscall.EAGAIN
+ }
+ return nil
+}
+
+func (t testData) Op(addr uintptr, val uint32) (bool, error) {
+ return val == 0, nil
+}
+
+// testMutex ties together a testData slice, an address, and a
+// futex manager in order to implement the sync.Locker interface.
+// Beyond being used as a Locker, this is a simple mechanism for
+// changing the underlying value directly in tests.
+type testMutex struct {
+ a uintptr
+ d testData
+ m *Manager
+}
+
+func newTestMutex(addr uintptr, d testData, m *Manager) *testMutex {
+ return &testMutex{a: addr, d: d, m: m}
+}
+
+// Lock acquires the testMutex.
+// This may wait for it to be available via the futex manager.
+func (t *testMutex) Lock() {
+ for {
+ // Attempt to grab the lock.
+ if atomic.CompareAndSwapUint32(
+ ((*uint32)(unsafe.Pointer(&t.d[t.a]))),
+ testMutexUnlocked,
+ testMutexLocked) {
+ // Lock held.
+ return
+ }
+
+ // Wait for it to be "not locked".
+ w := NewWaiter()
+ err := t.m.WaitPrepare(w, t.d, t.a, testMutexLocked, ^uint32(0))
+ if err == syscall.EAGAIN {
+ continue
+ }
+ if err != nil {
+ // Should never happen.
+ panic("WaitPrepare returned unexpected error: " + err.Error())
+ }
+ <-w.C
+ t.m.WaitComplete(w)
+ }
+}
+
+// Unlock releases the testMutex.
+// This will notify any waiters via the futex manager.
+func (t *testMutex) Unlock() {
+ // Unlock.
+ atomic.StoreUint32(((*uint32)(unsafe.Pointer(&t.d[t.a]))), testMutexUnlocked)
+
+ // Notify all waiters.
+ t.m.Wake(t.a, ^uint32(0), math.MaxInt32)
+}
+
+func TestFutexWake(t *testing.T) {
+ m := NewManager()
+ d := newTestData(testMutexSize)
+
+ // Wait for it to be locked.
+ // (This won't trigger the wake in testMutex)
+ w := NewWaiter()
+ m.WaitPrepare(w, d, 0, testMutexUnlocked, ^uint32(0))
+
+ // Wake the single thread.
+ if _, err := m.Wake(0, ^uint32(0), 1); err != nil {
+ t.Error("wake error:", err)
+ }
+
+ <-w.C
+ m.WaitComplete(w)
+}
+
+func TestFutexWakeBitmask(t *testing.T) {
+ m := NewManager()
+ d := newTestData(testMutexSize)
+
+ // Wait for it to be locked.
+ // (This won't trigger the wake in testMutex)
+ w := NewWaiter()
+ m.WaitPrepare(w, d, 0, testMutexUnlocked, 0x0000ffff)
+
+ // Wake the single thread, not using the bitmask.
+ if _, err := m.Wake(0, 0xffff0000, 1); err != nil {
+ t.Error("wake non-matching bitmask error:", err)
+ }
+
+ select {
+ case <-w.C:
+ t.Error("w is alive?")
+ default:
+ }
+
+ // Now use a matching bitmask.
+ if _, err := m.Wake(0, 0x00000001, 1); err != nil {
+ t.Error("wake matching bitmask error:", err)
+ }
+
+ <-w.C
+ m.WaitComplete(w)
+}
+
+func TestFutexWakeTwo(t *testing.T) {
+ m := NewManager()
+ d := newTestData(testMutexSize)
+
+ // Wait for it to be locked.
+ // (This won't trigger the wake in testMutex)
+ w1 := NewWaiter()
+ w2 := NewWaiter()
+ w3 := NewWaiter()
+ m.WaitPrepare(w1, d, 0, testMutexUnlocked, ^uint32(0))
+ m.WaitPrepare(w2, d, 0, testMutexUnlocked, ^uint32(0))
+ m.WaitPrepare(w3, d, 0, testMutexUnlocked, ^uint32(0))
+
+ // Wake exactly two threads.
+ if _, err := m.Wake(0, ^uint32(0), 2); err != nil {
+ t.Error("wake error:", err)
+ }
+
+ // Ensure exactly two are alive.
+ // We don't get guarantees about exactly which two
+ // (although we expect them to be w1 and w2).
+ awake := 0
+ for {
+ select {
+ case <-w1.C:
+ awake++
+ case <-w2.C:
+ awake++
+ case <-w3.C:
+ awake++
+ default:
+ if awake != 2 {
+ t.Error("awake != 2?")
+ }
+
+ // Success.
+ return
+ }
+ }
+}
+
+func TestFutexWakeUnrelated(t *testing.T) {
+ m := NewManager()
+ d := newTestData(2 * testMutexSize)
+
+ // Wait for it to be locked.
+ w1 := NewWaiter()
+ w2 := NewWaiter()
+ m.WaitPrepare(w1, d, 0*testMutexSize, testMutexUnlocked, ^uint32(0))
+ m.WaitPrepare(w2, d, 1*testMutexSize, testMutexUnlocked, ^uint32(0))
+
+ // Wake only the second one.
+ if _, err := m.Wake(1*testMutexSize, ^uint32(0), 2); err != nil {
+ t.Error("wake error:", err)
+ }
+
+ // Ensure only w2 is alive.
+ select {
+ case <-w1.C:
+ t.Error("w1 is alive?")
+ default:
+ }
+ <-w2.C
+}
+
+// This function was shamelessly stolen from mutex_test.go.
+func HammerMutex(l sync.Locker, loops int, cdone chan bool) {
+ for i := 0; i < loops; i++ {
+ l.Lock()
+ runtime.Gosched()
+ l.Unlock()
+ }
+ cdone <- true
+}
+
+func TestFutexStress(t *testing.T) {
+ m := NewManager()
+ d := newTestData(testMutexSize)
+ tm := newTestMutex(0*testMutexSize, d, m)
+ c := make(chan bool)
+
+ for i := 0; i < 10; i++ {
+ go HammerMutex(tm, 1000, c)
+ }
+
+ for i := 0; i < 10; i++ {
+ <-c
+ }
+}
+
+func TestWakeOpEmpty(t *testing.T) {
+ m := NewManager()
+ d := newTestData(8)
+
+ n, err := m.WakeOp(d, 0, 4, 10, 10, 0)
+ if err != nil {
+ t.Fatalf("WakeOp failed: %v", err)
+ }
+
+ if n != 0 {
+ t.Fatalf("Invalid number of wakes: want 0, got %d", n)
+ }
+}
+
+func TestWakeOpFirstNonEmpty(t *testing.T) {
+ m := NewManager()
+ d := newTestData(8)
+
+ // Add two waiters on address 0.
+ w1 := NewWaiter()
+ if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w1)
+
+ w2 := NewWaiter()
+ if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w2)
+
+ // Wake up all waiters on address 0.
+ n, err := m.WakeOp(d, 0, 4, 10, 10, 0)
+ if err != nil {
+ t.Fatalf("WakeOp failed: %v", err)
+ }
+
+ if n != 2 {
+ t.Fatalf("Invalid number of wakes: want 2, got %d", n)
+ }
+}
+
+func TestWakeOpSecondNonEmpty(t *testing.T) {
+ m := NewManager()
+ d := newTestData(8)
+
+ // Add two waiters on address 4.
+ w1 := NewWaiter()
+ if err := m.WaitPrepare(w1, d, 4, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w1)
+
+ w2 := NewWaiter()
+ if err := m.WaitPrepare(w2, d, 4, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w2)
+
+ // Wake up all waiters on address 4.
+ n, err := m.WakeOp(d, 0, 4, 10, 10, 0)
+ if err != nil {
+ t.Fatalf("WakeOp failed: %v", err)
+ }
+
+ if n != 2 {
+ t.Fatalf("Invalid number of wakes: want 2, got %d", n)
+ }
+}
+
+func TestWakeOpSecondNonEmptyFailingOp(t *testing.T) {
+ m := NewManager()
+ d := newTestData(8)
+
+ // Add two waiters on address 4.
+ w1 := NewWaiter()
+ if err := m.WaitPrepare(w1, d, 4, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w1)
+
+ w2 := NewWaiter()
+ if err := m.WaitPrepare(w2, d, 4, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w2)
+
+ // Wake up all waiters on address 4.
+ n, err := m.WakeOp(d, 0, 4, 10, 10, 1)
+ if err != nil {
+ t.Fatalf("WakeOp failed: %v", err)
+ }
+
+ if n != 0 {
+ t.Fatalf("Invalid number of wakes: want 0, got %d", n)
+ }
+}
+
+func TestWakeOpAllNonEmpty(t *testing.T) {
+ m := NewManager()
+ d := newTestData(8)
+
+ // Add two waiters on address 0.
+ w1 := NewWaiter()
+ if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w1)
+
+ w2 := NewWaiter()
+ if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w2)
+
+ // Add two waiters on address 4.
+ w3 := NewWaiter()
+ if err := m.WaitPrepare(w3, d, 4, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w3)
+
+ w4 := NewWaiter()
+ if err := m.WaitPrepare(w4, d, 4, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w4)
+
+ // Wake up all waiters on both addresses.
+ n, err := m.WakeOp(d, 0, 4, 10, 10, 0)
+ if err != nil {
+ t.Fatalf("WakeOp failed: %v", err)
+ }
+
+ if n != 4 {
+ t.Fatalf("Invalid number of wakes: want 4, got %d", n)
+ }
+}
+
+func TestWakeOpAllNonEmptyFailingOp(t *testing.T) {
+ m := NewManager()
+ d := newTestData(8)
+
+ // Add two waiters on address 0.
+ w1 := NewWaiter()
+ if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w1)
+
+ w2 := NewWaiter()
+ if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w2)
+
+ // Add two waiters on address 4.
+ w3 := NewWaiter()
+ if err := m.WaitPrepare(w3, d, 4, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w3)
+
+ w4 := NewWaiter()
+ if err := m.WaitPrepare(w4, d, 4, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w4)
+
+ // Wake up all waiters on both addresses.
+ n, err := m.WakeOp(d, 0, 4, 10, 10, 1)
+ if err != nil {
+ t.Fatalf("WakeOp failed: %v", err)
+ }
+
+ if n != 2 {
+ t.Fatalf("Invalid number of wakes: want 2, got %d", n)
+ }
+}
+
+func TestWakeOpSameAddress(t *testing.T) {
+ m := NewManager()
+ d := newTestData(8)
+
+ // Add four waiters on address 0.
+ w1 := NewWaiter()
+ if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w1)
+
+ w2 := NewWaiter()
+ if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w2)
+
+ w3 := NewWaiter()
+ if err := m.WaitPrepare(w3, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w3)
+
+ w4 := NewWaiter()
+ if err := m.WaitPrepare(w4, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w4)
+
+ // Use the same address for both, waking at most one waiter from each side.
+ n, err := m.WakeOp(d, 0, 0, 1, 1, 0)
+ if err != nil {
+ t.Fatalf("WakeOp failed: %v", err)
+ }
+
+ if n != 2 {
+ t.Fatalf("Invalid number of wakes: want 2, got %d", n)
+ }
+}
+
+func TestWakeOpSameAddressFailingOp(t *testing.T) {
+ m := NewManager()
+ d := newTestData(8)
+
+ // Add four waiters on address 0.
+ w1 := NewWaiter()
+ if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w1)
+
+ w2 := NewWaiter()
+ if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w2)
+
+ w3 := NewWaiter()
+ if err := m.WaitPrepare(w3, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w3)
+
+ w4 := NewWaiter()
+ if err := m.WaitPrepare(w4, d, 0, 0, ^uint32(0)); err != nil {
+ t.Fatalf("WaitPrepare failed: %v", err)
+ }
+ defer m.WaitComplete(w4)
+
+ // Use the same address for both, waking at most one waiter from each side.
+ n, err := m.WakeOp(d, 0, 0, 1, 1, 1)
+ if err != nil {
+ t.Fatalf("WakeOp failed: %v", err)
+ }
+
+ if n != 1 {
+ t.Fatalf("Invalid number of wakes: want 1, got %d", n)
+ }
+}