diff options
Diffstat (limited to 'pkg/sentry/kernel')
-rw-r--r-- | pkg/sentry/kernel/BUILD | 1 | ||||
-rw-r--r-- | pkg/sentry/kernel/futex/BUILD | 17 | ||||
-rw-r--r-- | pkg/sentry/kernel/futex/futex.go | 382 | ||||
-rw-r--r-- | pkg/sentry/kernel/futex/futex_test.go | 765 | ||||
-rw-r--r-- | pkg/sentry/kernel/kernel.go | 7 | ||||
-rw-r--r-- | pkg/sentry/kernel/task_clone.go | 2 | ||||
-rw-r--r-- | pkg/sentry/kernel/task_context.go | 15 | ||||
-rw-r--r-- | pkg/sentry/kernel/task_exit.go | 2 | ||||
-rw-r--r-- | pkg/sentry/kernel/task_futex.go | 148 |
9 files changed, 828 insertions, 511 deletions
diff --git a/pkg/sentry/kernel/BUILD b/pkg/sentry/kernel/BUILD index 7eb2bffeb..31ad96612 100644 --- a/pkg/sentry/kernel/BUILD +++ b/pkg/sentry/kernel/BUILD @@ -95,6 +95,7 @@ go_library( "task_context.go", "task_exec.go", "task_exit.go", + "task_futex.go", "task_identity.go", "task_list.go", "task_log.go", diff --git a/pkg/sentry/kernel/futex/BUILD b/pkg/sentry/kernel/futex/BUILD index 0ff5b0a95..e13fcb5ff 100644 --- a/pkg/sentry/kernel/futex/BUILD +++ b/pkg/sentry/kernel/futex/BUILD @@ -4,6 +4,17 @@ load("//tools/go_generics:defs.bzl", "go_template_instance") load("//tools/go_stateify:defs.bzl", "go_library", "go_test") go_template_instance( + name = "atomicptr_bucket", + out = "atomicptr_bucket.go", + package = "futex", + suffix = "Bucket", + template = "//pkg/sync:generic_atomicptr", + types = { + "Value": "bucket", + }, +) + +go_template_instance( name = "waiter_list", out = "waiter_list.go", package = "futex", @@ -18,12 +29,16 @@ go_template_instance( go_library( name = "futex", srcs = [ + "atomicptr_bucket.go", "futex.go", "waiter_list.go", ], importpath = "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/futex", visibility = ["//pkg/sentry:internal"], - deps = ["//pkg/syserror"], + deps = [ + "//pkg/sentry/memmap", + "//pkg/syserror", + ], ) go_test( diff --git a/pkg/sentry/kernel/futex/futex.go b/pkg/sentry/kernel/futex/futex.go index 4a1f2a0ef..54b1982a0 100644 --- a/pkg/sentry/kernel/futex/futex.go +++ b/pkg/sentry/kernel/futex/futex.go @@ -19,11 +19,78 @@ package futex import ( "sync" - "sync/atomic" + "gvisor.googlesource.com/gvisor/pkg/sentry/memmap" "gvisor.googlesource.com/gvisor/pkg/syserror" ) +// KeyKind indicates the type of a Key. +type KeyKind int + +const ( + // KindPrivate indicates a private futex (a futex syscall with the + // FUTEX_PRIVATE_FLAG set). + KindPrivate KeyKind = iota + + // KindSharedPrivate indicates a shared futex on a private memory mapping. + // Although KindPrivate and KindSharedPrivate futexes both use memory + // addresses to identify futexes, they do not interoperate (in Linux, the + // two are distinguished by the FUT_OFF_MMSHARED flag, which is used in key + // comparison). + KindSharedPrivate + + // KindSharedMappable indicates a shared futex on a memory mapping other + // than a private anonymous memory mapping. + KindSharedMappable +) + +// Key represents something that a futex waiter may wait on. +type Key struct { + // Kind is the type of the Key. + Kind KeyKind + + // Mappable is the memory-mapped object that is represented by the Key. + // Mappable is always nil if Kind is not KindSharedMappable, and may be nil + // even if it is. + Mappable memmap.Mappable + + // MappingIdentity is the MappingIdentity associated with Mappable. + // MappingIdentity is always nil is Mappable is nil, and may be nil even if + // it isn't. + MappingIdentity memmap.MappingIdentity + + // If Kind is KindPrivate or KindSharedPrivate, Offset is the represented + // memory address. Otherwise, Offset is the represented offset into + // Mappable. + Offset uint64 +} + +func (k *Key) release() { + if k.MappingIdentity != nil { + k.MappingIdentity.DecRef() + } + k.Mappable = nil + k.MappingIdentity = nil +} + +func (k *Key) clone() Key { + if k.MappingIdentity != nil { + k.MappingIdentity.IncRef() + } + return *k +} + +// Preconditions: k.Kind == KindPrivate or KindSharedPrivate. +func (k *Key) addr() uintptr { + return uintptr(k.Offset) +} + +// matches returns true if a wakeup on k2 should wake a waiter waiting on k. +func (k *Key) matches(k2 *Key) bool { + // k.MappingIdentity is ignored; it's only used for reference counting. + return k.Kind == k2.Kind && k.Mappable == k2.Mappable && k.Offset == k2.Offset +} + // Checker abstracts memory accesses. This is useful because the "addresses" // used in this package may not be real addresses (they could be indices of an // array, for example), or they could be mapped via some special mechanism. @@ -41,6 +108,14 @@ type Checker interface { // Note that op is an opaque operation whose behaviour is defined // outside of the futex manager. Op(addr uintptr, op uint32) (bool, error) + + // GetSharedKey returns a Key with kind KindSharedPrivate or + // KindSharedMappable corresponding to the memory mapped at address addr. + // + // If GetSharedKey returns a Key with a non-nil MappingIdentity, a + // reference is held on the MappingIdentity, which must be dropped by the + // caller when the Key is no longer in use. + GetSharedKey(addr uintptr) (Key, error) } // Waiter is the struct which gets enqueued into buckets for wake up routines @@ -53,11 +128,11 @@ type Waiter struct { // synchronization applies). // // - A Waiter is enqueued in a bucket by calling WaitPrepare(). After this, - // waiterEntry, complete, and addr are protected by the bucket.mu ("bucket - // lock") of the containing bucket, and bitmask is immutable. complete and - // addr are additionally mutated using atomic memory operations, ensuring - // that they can be read using atomic memory operations without holding the - // bucket lock. + // waiterEntry, bucket, and key are protected by the bucket.mu ("bucket + // lock") of the containing bucket, and bitmask is immutable. Note that + // since bucket is mutated using atomic memory operations, bucket.Load() + // may be called without holding the bucket lock, although it may change + // racily. See WaitComplete(). // // - A Waiter is only guaranteed to be no longer queued after calling // WaitComplete(). @@ -65,15 +140,15 @@ type Waiter struct { // waiterEntry links Waiter into bucket.waiters. waiterEntry - // complete is 1 if the Waiter was removed from its bucket by a wakeup and - // 0 otherwise. - complete int32 + // bucket is the bucket this waiter is queued in. If bucket is nil, the + // waiter is not waiting and is not in any bucket. + bucket AtomicPtrBucket // C is sent to when the Waiter is woken. C chan struct{} - // addr is the address being waited on. - addr uintptr + // key is what this waiter is waiting on. + key Key // The bitmask we're waiting on. // This is used the case of a FUTEX_WAKE_BITSET. @@ -87,7 +162,14 @@ func NewWaiter() *Waiter { } } +// woken returns true if w has been woken since the last call to WaitPrepare. +func (w *Waiter) woken() bool { + return len(w.C) != 0 +} + // bucket holds a list of waiters for a given address hash. +// +// +stateify savable type bucket struct { // mu protects waiters and contained Waiter state. See comment in Waiter. mu sync.Mutex `state:"nosave"` @@ -99,10 +181,10 @@ type bucket struct { // bucket and returns the number of waiters woken. // // Preconditions: b.mu must be locked. -func (b *bucket) wakeLocked(addr uintptr, bitmask uint32, n int) int { +func (b *bucket) wakeLocked(key *Key, bitmask uint32, n int) int { done := 0 for w := b.waiters.Front(); done < n && w != nil; { - if w.addr != addr || w.bitmask&bitmask == 0 { + if !w.key.matches(key) || w.bitmask&bitmask == 0 { // Not matching. w = w.Next() continue @@ -114,15 +196,15 @@ func (b *bucket) wakeLocked(addr uintptr, bitmask uint32, n int) int { b.waiters.Remove(woke) woke.C <- struct{}{} - // NOTE: The above channel write establishes a write barrier - // according to the memory model, so nothing may be ordered - // around it. Since we've dequeued w and will never touch it - // again, we can safely store 1 to w.complete here and allow - // the WaitComplete() to short-circuit grabbing the bucket - // lock. If they somehow miss the w.complete, we are still - // holding the lock, so we can know that they won't dequeue w, - // assume it's free and have the below operation afterwards. - atomic.StoreInt32(&woke.complete, 1) + // NOTE: The above channel write establishes a write barrier according + // to the memory model, so nothing may be ordered around it. Since + // we've dequeued woke and will never touch it again, we can safely + // store nil to woke.bucket here and allow the WaitComplete() to + // short-circuit grabbing the bucket lock. If they somehow miss the + // store, we are still holding the lock, so we can know that they won't + // dequeue woke, assume it's free and have the below operation + // afterwards. + woke.bucket.Store(nil) done++ } return done @@ -132,10 +214,10 @@ func (b *bucket) wakeLocked(addr uintptr, bitmask uint32, n int) int { // bucket "to". // // Preconditions: b and to must be locked. -func (b *bucket) requeueLocked(to *bucket, addr, naddr uintptr, n int) int { +func (b *bucket) requeueLocked(to *bucket, key, nkey *Key, n int) int { done := 0 for w := b.waiters.Front(); done < n && w != nil; { - if w.addr != addr { + if !w.key.matches(key) { // Not matching. w = w.Next() continue @@ -144,8 +226,10 @@ func (b *bucket) requeueLocked(to *bucket, addr, naddr uintptr, n int) int { requeued := w w = w.Next() // Next iteration. b.waiters.Remove(requeued) - atomic.StoreUintptr(&requeued.addr, naddr) + requeued.key.release() + requeued.key = nkey.clone() to.waiters.PushBack(requeued) + requeued.bucket.Store(to) done++ } return done @@ -158,19 +242,22 @@ const ( bucketCountBits = 10 ) -func checkAddr(addr uintptr) error { +// getKey returns a Key representing address addr in c. +func getKey(c Checker, addr uintptr, private bool) (Key, error) { // Ensure the address is aligned. // It must be a DWORD boundary. if addr&0x3 != 0 { - return syserror.EINVAL + return Key{}, syserror.EINVAL } - - return nil + if private { + return Key{Kind: KindPrivate, Offset: uint64(addr)}, nil + } + return c.GetSharedKey(addr) } // bucketIndexForAddr returns the index into Manager.buckets for addr. func bucketIndexForAddr(addr uintptr) uintptr { - // - The bottom 2 bits of addr must be 0, per checkAddr. + // - The bottom 2 bits of addr must be 0, per getKey. // // - On amd64, the top 16 bits of addr (bits 48-63) must be equal to bit 47 // for a canonical address, and (on all existing platforms) bit 47 must be @@ -199,171 +286,216 @@ func bucketIndexForAddr(addr uintptr) uintptr { // // +stateify savable type Manager struct { - buckets [bucketCount]bucket `state:"zerovalue"` + // privateBuckets holds buckets for KindPrivate and KindSharedPrivate + // futexes. + privateBuckets [bucketCount]bucket `state:"zerovalue"` + + // sharedBucket is the bucket for KindSharedMappable futexes. sharedBucket + // may be shared by multiple Managers. The sharedBucket pointer is + // immutable. + sharedBucket *bucket } // NewManager returns an initialized futex manager. -// N.B. we use virtual address to tag futexes, so it only works for private -// (within a single process) futex. func NewManager() *Manager { - return &Manager{} + return &Manager{ + sharedBucket: &bucket{}, + } } -// lockBucket returns a locked bucket for the given addr. -// -// Preconditions: checkAddr(addr) == nil. -func (m *Manager) lockBucket(addr uintptr) *bucket { - b := &m.buckets[bucketIndexForAddr(addr)] +// Fork returns a new Manager. Shared futex clients using the returned Manager +// may interoperate with those using m. +func (m *Manager) Fork() *Manager { + return &Manager{ + sharedBucket: m.sharedBucket, + } +} + +// lockBucket returns a locked bucket for the given key. +func (m *Manager) lockBucket(k *Key) *bucket { + var b *bucket + if k.Kind == KindSharedMappable { + b = m.sharedBucket + } else { + b = &m.privateBuckets[bucketIndexForAddr(k.addr())] + } b.mu.Lock() return b } -// lockBuckets returns locked buckets for the given addrs. -// -// Preconditions: checkAddr(addr1) == checkAddr(addr2) == nil. -func (m *Manager) lockBuckets(addr1 uintptr, addr2 uintptr) (*bucket, *bucket) { - i1 := bucketIndexForAddr(addr1) - i2 := bucketIndexForAddr(addr2) - b1 := &m.buckets[i1] - b2 := &m.buckets[i2] - - // Ensure that buckets are locked in a consistent order (lowest index - // first) to avoid circular locking. - switch { - case i1 < i2: - b1.mu.Lock() - b2.mu.Lock() - case i2 < i1: - b2.mu.Lock() - b1.mu.Lock() - default: - b1.mu.Lock() +// lockBuckets returns locked buckets for the given keys. +func (m *Manager) lockBuckets(k1, k2 *Key) (*bucket, *bucket) { + // Buckets must be consistently ordered to avoid circular lock + // dependencies. We order buckets in m.privateBuckets by index (lowest + // index first), and all buckets in m.privateBuckets precede + // m.sharedBucket. + + // Handle the common case first: + if k1.Kind != KindSharedMappable && k2.Kind != KindSharedMappable { + i1 := bucketIndexForAddr(k1.addr()) + i2 := bucketIndexForAddr(k2.addr()) + b1 := &m.privateBuckets[i1] + b2 := &m.privateBuckets[i2] + switch { + case i1 < i2: + b1.mu.Lock() + b2.mu.Lock() + case i2 < i1: + b2.mu.Lock() + b1.mu.Lock() + default: + b1.mu.Lock() + } + return b1, b2 } + // At least one of b1 or b2 should be m.sharedBucket. + b1 := m.sharedBucket + b2 := m.sharedBucket + if k1.Kind != KindSharedMappable { + b1 = m.lockBucket(k1) + } else if k2.Kind != KindSharedMappable { + b2 = m.lockBucket(k2) + } + m.sharedBucket.mu.Lock() return b1, b2 } // Wake wakes up to n waiters matching the bitmask on the given addr. // The number of waiters woken is returned. -func (m *Manager) Wake(addr uintptr, bitmask uint32, n int) (int, error) { - if err := checkAddr(addr); err != nil { +func (m *Manager) Wake(c Checker, addr uintptr, private bool, bitmask uint32, n int) (int, error) { + // This function is very hot; avoid defer. + k, err := getKey(c, addr, private) + if err != nil { return 0, err } - b := m.lockBucket(addr) - // This function is very hot; avoid defer. - r := b.wakeLocked(addr, bitmask, n) + b := m.lockBucket(&k) + r := b.wakeLocked(&k, bitmask, n) + b.mu.Unlock() + k.release() return r, nil } -func (m *Manager) doRequeue(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) { - if err := checkAddr(addr); err != nil { +func (m *Manager) doRequeue(c Checker, addr, naddr uintptr, private bool, checkval bool, val uint32, nwake int, nreq int) (int, error) { + k1, err := getKey(c, addr, private) + if err != nil { return 0, err } - if err := checkAddr(naddr); err != nil { + defer k1.release() + k2, err := getKey(c, naddr, private) + if err != nil { return 0, err } + defer k2.release() - b1, b2 := m.lockBuckets(addr, naddr) + b1, b2 := m.lockBuckets(&k1, &k2) defer b1.mu.Unlock() if b2 != b1 { defer b2.mu.Unlock() } - // Check our value. - // This only applied for RequeueCmp(). - if c != nil { + if checkval { if err := c.Check(addr, val); err != nil { return 0, err } } // Wake the number required. - done := b1.wakeLocked(addr, ^uint32(0), nwake) + done := b1.wakeLocked(&k1, ^uint32(0), nwake) // Requeue the number required. - b1.requeueLocked(b2, addr, naddr, nreq) + b1.requeueLocked(b2, &k1, &k2, nreq) return done, nil } // Requeue wakes up to nwake waiters on the given addr, and unconditionally // requeues up to nreq waiters on naddr. -func (m *Manager) Requeue(addr uintptr, naddr uintptr, nwake int, nreq int) (int, error) { - return m.doRequeue(nil, addr, 0, naddr, nwake, nreq) +func (m *Manager) Requeue(c Checker, addr, naddr uintptr, private bool, nwake int, nreq int) (int, error) { + return m.doRequeue(c, addr, naddr, private, false, 0, nwake, nreq) } // RequeueCmp atomically checks that the addr contains val (via the Checker), // wakes up to nwake waiters on addr and then unconditionally requeues nreq // waiters on naddr. -func (m *Manager) RequeueCmp(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) { - return m.doRequeue(c, addr, val, naddr, nwake, nreq) +func (m *Manager) RequeueCmp(c Checker, addr, naddr uintptr, private bool, val uint32, nwake int, nreq int) (int, error) { + return m.doRequeue(c, addr, naddr, private, true, val, nwake, nreq) } // WakeOp atomically applies op to the memory address addr2, wakes up to nwake1 // waiters unconditionally from addr1, and, based on the original value at addr2 // and a comparison encoded in op, wakes up to nwake2 waiters from addr2. // It returns the total number of waiters woken. -func (m *Manager) WakeOp(c Checker, addr1 uintptr, addr2 uintptr, nwake1 int, nwake2 int, op uint32) (int, error) { - if err := checkAddr(addr1); err != nil { +func (m *Manager) WakeOp(c Checker, addr1, addr2 uintptr, private bool, nwake1 int, nwake2 int, op uint32) (int, error) { + k1, err := getKey(c, addr1, private) + if err != nil { return 0, err } - if err := checkAddr(addr2); err != nil { + defer k1.release() + k2, err := getKey(c, addr2, private) + if err != nil { return 0, err } + defer k2.release() - b1, b2 := m.lockBuckets(addr1, addr2) + b1, b2 := m.lockBuckets(&k1, &k2) + defer b1.mu.Unlock() + if b2 != b1 { + defer b2.mu.Unlock() + } done := 0 cond, err := c.Op(addr2, op) - if err == nil { - // Wake up up to nwake1 entries from the first bucket. - done = b1.wakeLocked(addr1, ^uint32(0), nwake1) - - // Wake up up to nwake2 entries from the second bucket if the - // operation yielded true. - if cond { - done += b2.wakeLocked(addr2, ^uint32(0), nwake2) - } + if err != nil { + return 0, err } - b1.mu.Unlock() - if b2 != b1 { - b2.mu.Unlock() + // Wake up up to nwake1 entries from the first bucket. + done = b1.wakeLocked(&k1, ^uint32(0), nwake1) + + // Wake up up to nwake2 entries from the second bucket if the + // operation yielded true. + if cond { + done += b2.wakeLocked(&k2, ^uint32(0), nwake2) } - return done, err + + return done, nil } // WaitPrepare atomically checks that addr contains val (via the Checker), then // enqueues w to be woken by a send to w.C. If WaitPrepare returns nil, the // Waiter must be subsequently removed by calling WaitComplete, whether or not // a wakeup is received on w.C. -func (m *Manager) WaitPrepare(w *Waiter, c Checker, addr uintptr, val uint32, bitmask uint32) error { - if err := checkAddr(addr); err != nil { +func (m *Manager) WaitPrepare(w *Waiter, c Checker, addr uintptr, private bool, val uint32, bitmask uint32) error { + k, err := getKey(c, addr, private) + if err != nil { return err } + // Ownership of k is transferred to w below. // Prepare the Waiter before taking the bucket lock. - w.complete = 0 select { case <-w.C: default: } - w.addr = addr + w.key = k w.bitmask = bitmask - b := m.lockBucket(addr) + b := m.lockBucket(&k) // This function is very hot; avoid defer. // Perform our atomic check. if err := c.Check(addr, val); err != nil { b.mu.Unlock() + w.key.release() return err } // Add the waiter to the bucket. b.waiters.PushBack(w) + w.bucket.Store(b) b.mu.Unlock() return nil @@ -372,36 +504,36 @@ func (m *Manager) WaitPrepare(w *Waiter, c Checker, addr uintptr, val uint32, bi // WaitComplete must be called when a Waiter previously added by WaitPrepare is // no longer eligible to be woken. func (m *Manager) WaitComplete(w *Waiter) { - // Can we short-circuit acquiring the lock? - // This is the happy path where a notification - // was received and we don't need to dequeue this - // waiter from any list (or take any locks). - if atomic.LoadInt32(&w.complete) != 0 { - return - } - - // Take the bucket lock. Note that without holding the bucket lock, the - // waiter is not guaranteed to stay in that bucket, so after we take the - // bucket lock, we must ensure that the bucket hasn't changed: if it - // happens to have changed, we release the old bucket lock and try again - // with the new bucket; if it hasn't changed, we know it won't change now - // because we hold the lock. - var b *bucket + // Remove w from the bucket it's in. for { - addr := atomic.LoadUintptr(&w.addr) - b = m.lockBucket(addr) - // We still have to use an atomic load here, because if w was racily - // requeued then w.addr is not protected by b.mu. - if addr == atomic.LoadUintptr(&w.addr) { + b := w.bucket.Load() + + // If b is nil, the waiter isn't in any bucket anymore. This can't be + // racy because the waiter can't be concurrently re-queued in another + // bucket. + if b == nil { break } - b.mu.Unlock() - } - // Remove waiter from the bucket. w.complete can only be stored with b.mu - // locked, so this load doesn't need to use sync/atomic. - if w.complete == 0 { + // Take the bucket lock. Note that without holding the bucket lock, the + // waiter is not guaranteed to stay in that bucket, so after we take + // the bucket lock, we must ensure that the bucket hasn't changed: if + // it happens to have changed, we release the old bucket lock and try + // again with the new bucket; if it hasn't changed, we know it won't + // change now because we hold the lock. + b.mu.Lock() + if b != w.bucket.Load() { + b.mu.Unlock() + continue + } + + // Remove w from b. b.waiters.Remove(w) + w.bucket.Store(nil) + b.mu.Unlock() + break } - b.mu.Unlock() + + // Release references held by the waiter. + w.key.release() } diff --git a/pkg/sentry/kernel/futex/futex_test.go b/pkg/sentry/kernel/futex/futex_test.go index 7b81358ec..726c26990 100644 --- a/pkg/sentry/kernel/futex/futex_test.go +++ b/pkg/sentry/kernel/futex/futex_test.go @@ -24,17 +24,13 @@ import ( "unsafe" ) -const ( - testMutexSize = 4 - testMutexLocked uint32 = 1 - testMutexUnlocked uint32 = 0 -) - // testData implements the Checker interface, and allows us to // treat the address passed for futex operations as an index in // a byte slice for testing simplicity. type testData []byte +const sizeofInt32 = 4 + func newTestData(size uint) testData { return make([]byte, size) } @@ -50,451 +46,478 @@ func (t testData) Op(addr uintptr, val uint32) (bool, error) { return val == 0, nil } -// testMutex ties together a testData slice, an address, and a -// futex manager in order to implement the sync.Locker interface. -// Beyond being used as a Locker, this is a simple mechanism for -// changing the underlying values for simpler tests. -type testMutex struct { - a uintptr - d testData - m *Manager -} - -func newTestMutex(addr uintptr, d testData, m *Manager) *testMutex { - return &testMutex{a: addr, d: d, m: m} +func (t testData) GetSharedKey(addr uintptr) (Key, error) { + return Key{ + Kind: KindSharedMappable, + Offset: uint64(addr), + }, nil } -// Lock acquires the testMutex. -// This may wait for it to be available via the futex manager. -func (t *testMutex) Lock() { - for { - // Attempt to grab the lock. - if atomic.CompareAndSwapUint32( - ((*uint32)(unsafe.Pointer(&t.d[t.a]))), - testMutexUnlocked, - testMutexLocked) { - // Lock held. - return - } - - // Wait for it to be "not locked". - w := NewWaiter() - err := t.m.WaitPrepare(w, t.d, t.a, testMutexLocked, ^uint32(0)) - if err == syscall.EAGAIN { - continue - } - if err != nil { - // Should never happen. - panic("WaitPrepare returned unexpected error: " + err.Error()) - } - <-w.C - t.m.WaitComplete(w) +func futexKind(private bool) string { + if private { + return "private" } + return "shared" } -// Unlock releases the testMutex. -// This will notify any waiters via the futex manager. -func (t *testMutex) Unlock() { - // Unlock. - atomic.StoreUint32(((*uint32)(unsafe.Pointer(&t.d[t.a]))), testMutexUnlocked) - - // Notify all waiters. - t.m.Wake(t.a, ^uint32(0), math.MaxInt32) +func newPreparedTestWaiter(t *testing.T, m *Manager, c Checker, addr uintptr, private bool, val uint32, bitmask uint32) *Waiter { + w := NewWaiter() + if err := m.WaitPrepare(w, c, addr, private, val, bitmask); err != nil { + t.Fatalf("WaitPrepare failed: %v", err) + } + return w } func TestFutexWake(t *testing.T) { - m := NewManager() - d := newTestData(testMutexSize) - - // Wait for it to be locked. - // (This won't trigger the wake in testMutex) - w := NewWaiter() - m.WaitPrepare(w, d, 0, testMutexUnlocked, ^uint32(0)) + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(sizeofInt32) + + // Start waiting for wakeup. + w := newPreparedTestWaiter(t, m, d, 0, private, 0, ^uint32(0)) + defer m.WaitComplete(w) + + // Perform a wakeup. + if n, err := m.Wake(d, 0, private, ^uint32(0), 1); err != nil || n != 1 { + t.Errorf("Wake: got (%d, %v), wanted (1, nil)", n, err) + } - // Wake the single thread. - if _, err := m.Wake(0, ^uint32(0), 1); err != nil { - t.Error("wake error:", err) + // Expect the waiter to have been woken. + if !w.woken() { + t.Error("waiter not woken") + } + }) } - - <-w.C - m.WaitComplete(w) } func TestFutexWakeBitmask(t *testing.T) { - m := NewManager() - d := newTestData(testMutexSize) - - // Wait for it to be locked. - // (This won't trigger the wake in testMutex) - w := NewWaiter() - m.WaitPrepare(w, d, 0, testMutexUnlocked, 0x0000ffff) + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(sizeofInt32) + + // Start waiting for wakeup. + w := newPreparedTestWaiter(t, m, d, 0, private, 0, 0x0000ffff) + defer m.WaitComplete(w) + + // Perform a wakeup using the wrong bitmask. + if n, err := m.Wake(d, 0, private, 0xffff0000, 1); err != nil || n != 0 { + t.Errorf("Wake with non-matching bitmask: got (%d, %v), wanted (0, nil)", n, err) + } - // Wake the single thread, not using the bitmask. - if _, err := m.Wake(0, 0xffff0000, 1); err != nil { - t.Error("wake non-matching bitmask error:", err) - } + // Expect the waiter to still be waiting. + if w.woken() { + t.Error("waiter woken unexpectedly") + } - select { - case <-w.C: - t.Error("w is alive?") - default: - } + // Perform a wakeup using the right bitmask. + if n, err := m.Wake(d, 0, private, 0x00000001, 1); err != nil || n != 1 { + t.Errorf("Wake with matching bitmask: got (%d, %v), wanted (1, nil)", n, err) + } - // Now use a matching bitmask. - if _, err := m.Wake(0, 0x00000001, 1); err != nil { - t.Error("wake matching bitmask error:", err) + // Expect that the waiter was woken. + if !w.woken() { + t.Error("waiter not woken") + } + }) } - - <-w.C - m.WaitComplete(w) } func TestFutexWakeTwo(t *testing.T) { - m := NewManager() - d := newTestData(testMutexSize) + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(sizeofInt32) + + // Start three waiters waiting for wakeup. + var ws [3]*Waiter + for i := range ws { + ws[i] = newPreparedTestWaiter(t, m, d, 0, private, 0, ^uint32(0)) + defer m.WaitComplete(ws[i]) + } - // Wait for it to be locked. - // (This won't trigger the wake in testMutex) - w1 := NewWaiter() - w2 := NewWaiter() - w3 := NewWaiter() - m.WaitPrepare(w1, d, 0, testMutexUnlocked, ^uint32(0)) - m.WaitPrepare(w2, d, 0, testMutexUnlocked, ^uint32(0)) - m.WaitPrepare(w3, d, 0, testMutexUnlocked, ^uint32(0)) - - // Wake exactly two threads. - if _, err := m.Wake(0, ^uint32(0), 2); err != nil { - t.Error("wake error:", err) - } + // Perform two wakeups. + const wakeups = 2 + if n, err := m.Wake(d, 0, private, ^uint32(0), 2); err != nil || n != wakeups { + t.Errorf("Wake: got (%d, %v), wanted (%d, nil)", n, err, wakeups) + } - // Ensure exactly two are alive. - // We don't get guarantees about exactly which two, - // (although we expect them to be w1 and w2). - awake := 0 - for { - select { - case <-w1.C: - awake++ - case <-w2.C: - awake++ - case <-w3.C: - awake++ - default: - if awake != 2 { - t.Error("awake != 2?") - } - - // Success. - return - } + // Expect that exactly two waiters were woken. + // We don't get guarantees about exactly which two, + // (although we expect them to be w1 and w2). + awake := 0 + for i := range ws { + if ws[i].woken() { + awake++ + } + } + if awake != wakeups { + t.Errorf("got %d woken waiters, wanted %d", awake, wakeups) + } + }) } } func TestFutexWakeUnrelated(t *testing.T) { - m := NewManager() - d := newTestData(2 * testMutexSize) - - // Wait for it to be locked. - w1 := NewWaiter() - w2 := NewWaiter() - m.WaitPrepare(w1, d, 0*testMutexSize, testMutexUnlocked, ^uint32(0)) - m.WaitPrepare(w2, d, 1*testMutexSize, testMutexUnlocked, ^uint32(0)) - - // Wake only the second one. - if _, err := m.Wake(1*testMutexSize, ^uint32(0), 2); err != nil { - t.Error("wake error:", err) - } - - // Ensure only r2 is alive. - select { - case <-w1.C: - t.Error("w1 is alive?") - default: - } - <-w2.C -} - -// This function was shamelessly stolen from mutex_test.go. -func HammerMutex(l sync.Locker, loops int, cdone chan bool) { - for i := 0; i < loops; i++ { - l.Lock() - runtime.Gosched() - l.Unlock() - } - cdone <- true -} - -func TestFutexStress(t *testing.T) { - m := NewManager() - d := newTestData(testMutexSize) - tm := newTestMutex(0*testMutexSize, d, m) - c := make(chan bool) - - for i := 0; i < 10; i++ { - go HammerMutex(tm, 1000, c) - } + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(2 * sizeofInt32) + + // Start two waiters waiting for wakeup on different addresses. + w1 := newPreparedTestWaiter(t, m, d, 0*sizeofInt32, private, 0, ^uint32(0)) + defer m.WaitComplete(w1) + w2 := newPreparedTestWaiter(t, m, d, 1*sizeofInt32, private, 0, ^uint32(0)) + defer m.WaitComplete(w2) + + // Perform two wakeups on the second address. + if n, err := m.Wake(d, 1*sizeofInt32, private, ^uint32(0), 2); err != nil || n != 1 { + t.Errorf("Wake: got (%d, %v), wanted (1, nil)", n, err) + } - for i := 0; i < 10; i++ { - <-c + // Expect that only the second waiter was woken. + if w1.woken() { + t.Error("w1 woken unexpectedly") + } + if !w2.woken() { + t.Error("w2 not woken") + } + }) } } func TestWakeOpEmpty(t *testing.T) { - m := NewManager() - d := newTestData(8) - - n, err := m.WakeOp(d, 0, 4, 10, 10, 0) - if err != nil { - t.Fatalf("WakeOp failed: %v", err) - } - - if n != 0 { - t.Fatalf("Invalid number of wakes: want 0, got %d", n) + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(2 * sizeofInt32) + + // Perform wakeups with no waiters. + if n, err := m.WakeOp(d, 0, sizeofInt32, private, 10, 10, 0); err != nil || n != 0 { + t.Fatalf("WakeOp: got (%d, %v), wanted (0, nil)", n, err) + } + }) } } func TestWakeOpFirstNonEmpty(t *testing.T) { - m := NewManager() - d := newTestData(8) - - // Add two waiters on address 0. - w1 := NewWaiter() - if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w1) - - w2 := NewWaiter() - if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w2) - - // Wake up all waiters on address 0. - n, err := m.WakeOp(d, 0, 4, 10, 10, 0) - if err != nil { - t.Fatalf("WakeOp failed: %v", err) - } + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add two waiters on address 0. + w1 := newPreparedTestWaiter(t, m, d, 0, private, 0, ^uint32(0)) + defer m.WaitComplete(w1) + w2 := newPreparedTestWaiter(t, m, d, 0, private, 0, ^uint32(0)) + defer m.WaitComplete(w2) + + // Perform 10 wakeups on address 0. + if n, err := m.WakeOp(d, 0, sizeofInt32, private, 10, 0, 0); err != nil || n != 2 { + t.Errorf("WakeOp: got (%d, %v), wanted (2, nil)", n, err) + } - if n != 2 { - t.Fatalf("Invalid number of wakes: want 2, got %d", n) + // Expect that both waiters were woken. + if !w1.woken() { + t.Error("w1 not woken") + } + if !w2.woken() { + t.Error("w2 not woken") + } + }) } } func TestWakeOpSecondNonEmpty(t *testing.T) { - m := NewManager() - d := newTestData(8) - - // Add two waiters on address 4. - w1 := NewWaiter() - if err := m.WaitPrepare(w1, d, 4, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w1) - - w2 := NewWaiter() - if err := m.WaitPrepare(w2, d, 4, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w2) - - // Wake up all waiters on address 4. - n, err := m.WakeOp(d, 0, 4, 10, 10, 0) - if err != nil { - t.Fatalf("WakeOp failed: %v", err) - } + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add two waiters on address sizeofInt32. + w1 := newPreparedTestWaiter(t, m, d, sizeofInt32, private, 0, ^uint32(0)) + defer m.WaitComplete(w1) + w2 := newPreparedTestWaiter(t, m, d, sizeofInt32, private, 0, ^uint32(0)) + defer m.WaitComplete(w2) + + // Perform 10 wakeups on address sizeofInt32 (contingent on + // d.Op(0), which should succeed). + if n, err := m.WakeOp(d, 0, sizeofInt32, private, 0, 10, 0); err != nil || n != 2 { + t.Errorf("WakeOp: got (%d, %v), wanted (2, nil)", n, err) + } - if n != 2 { - t.Fatalf("Invalid number of wakes: want 2, got %d", n) + // Expect that both waiters were woken. + if !w1.woken() { + t.Error("w1 not woken") + } + if !w2.woken() { + t.Error("w2 not woken") + } + }) } } func TestWakeOpSecondNonEmptyFailingOp(t *testing.T) { - m := NewManager() - d := newTestData(8) - - // Add two waiters on address 4. - w1 := NewWaiter() - if err := m.WaitPrepare(w1, d, 4, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w1) - - w2 := NewWaiter() - if err := m.WaitPrepare(w2, d, 4, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w2) - - // Wake up all waiters on address 4. - n, err := m.WakeOp(d, 0, 4, 10, 10, 1) - if err != nil { - t.Fatalf("WakeOp failed: %v", err) - } + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add two waiters on address sizeofInt32. + w1 := newPreparedTestWaiter(t, m, d, sizeofInt32, private, 0, ^uint32(0)) + defer m.WaitComplete(w1) + w2 := newPreparedTestWaiter(t, m, d, sizeofInt32, private, 0, ^uint32(0)) + defer m.WaitComplete(w2) + + // Perform 10 wakeups on address sizeofInt32 (contingent on + // d.Op(1), which should fail). + if n, err := m.WakeOp(d, 0, sizeofInt32, private, 0, 10, 1); err != nil || n != 0 { + t.Errorf("WakeOp: got (%d, %v), wanted (0, nil)", n, err) + } - if n != 0 { - t.Fatalf("Invalid number of wakes: want 0, got %d", n) + // Expect that neither waiter was woken. + if w1.woken() { + t.Error("w1 woken unexpectedly") + } + if w2.woken() { + t.Error("w2 woken unexpectedly") + } + }) } } func TestWakeOpAllNonEmpty(t *testing.T) { - m := NewManager() - d := newTestData(8) + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add two waiters on address 0. + w1 := newPreparedTestWaiter(t, m, d, 0, private, 0, ^uint32(0)) + defer m.WaitComplete(w1) + w2 := newPreparedTestWaiter(t, m, d, 0, private, 0, ^uint32(0)) + defer m.WaitComplete(w2) + + // Add two waiters on address sizeofInt32. + w3 := newPreparedTestWaiter(t, m, d, sizeofInt32, private, 0, ^uint32(0)) + defer m.WaitComplete(w3) + w4 := newPreparedTestWaiter(t, m, d, sizeofInt32, private, 0, ^uint32(0)) + defer m.WaitComplete(w4) + + // Perform 10 wakeups on address 0 (unconditionally), and 10 + // wakeups on address sizeofInt32 (contingent on d.Op(0), which + // should succeed). + if n, err := m.WakeOp(d, 0, sizeofInt32, private, 10, 10, 0); err != nil || n != 4 { + t.Errorf("WakeOp: got (%d, %v), wanted (4, nil)", n, err) + } - // Add two waiters on address 0. - w1 := NewWaiter() - if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) + // Expect that all waiters were woken. + if !w1.woken() { + t.Error("w1 not woken") + } + if !w2.woken() { + t.Error("w2 not woken") + } + if !w3.woken() { + t.Error("w3 not woken") + } + if !w4.woken() { + t.Error("w4 not woken") + } + }) } - defer m.WaitComplete(w1) +} - w2 := NewWaiter() - if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w2) +func TestWakeOpAllNonEmptyFailingOp(t *testing.T) { + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add two waiters on address 0. + w1 := newPreparedTestWaiter(t, m, d, 0, private, 0, ^uint32(0)) + defer m.WaitComplete(w1) + w2 := newPreparedTestWaiter(t, m, d, 0, private, 0, ^uint32(0)) + defer m.WaitComplete(w2) + + // Add two waiters on address sizeofInt32. + w3 := newPreparedTestWaiter(t, m, d, sizeofInt32, private, 0, ^uint32(0)) + defer m.WaitComplete(w3) + w4 := newPreparedTestWaiter(t, m, d, sizeofInt32, private, 0, ^uint32(0)) + defer m.WaitComplete(w4) + + // Perform 10 wakeups on address 0 (unconditionally), and 10 + // wakeups on address sizeofInt32 (contingent on d.Op(1), which + // should fail). + if n, err := m.WakeOp(d, 0, sizeofInt32, private, 10, 10, 1); err != nil || n != 2 { + t.Errorf("WakeOp: got (%d, %v), wanted (2, nil)", n, err) + } - // Add two waiters on address 4. - w3 := NewWaiter() - if err := m.WaitPrepare(w3, d, 4, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) + // Expect that only the first two waiters were woken. + if !w1.woken() { + t.Error("w1 not woken") + } + if !w2.woken() { + t.Error("w2 not woken") + } + if w3.woken() { + t.Error("w3 woken unexpectedly") + } + if w4.woken() { + t.Error("w4 woken unexpectedly") + } + }) } - defer m.WaitComplete(w3) +} - w4 := NewWaiter() - if err := m.WaitPrepare(w4, d, 4, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w4) +func TestWakeOpSameAddress(t *testing.T) { + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add four waiters on address 0. + var ws [4]*Waiter + for i := range ws { + ws[i] = newPreparedTestWaiter(t, m, d, 0, private, 0, ^uint32(0)) + defer m.WaitComplete(ws[i]) + } - // Wake up all waiters on both addresses. - n, err := m.WakeOp(d, 0, 4, 10, 10, 0) - if err != nil { - t.Fatalf("WakeOp failed: %v", err) - } + // Perform 1 wakeup on address 0 (unconditionally), and 1 wakeup + // on address 0 (contingent on d.Op(0), which should succeed). + const wakeups = 2 + if n, err := m.WakeOp(d, 0, 0, private, 1, 1, 0); err != nil || n != wakeups { + t.Errorf("WakeOp: got (%d, %v), wanted (%d, nil)", n, err, wakeups) + } - if n != 4 { - t.Fatalf("Invalid number of wakes: want 4, got %d", n) + // Expect that exactly two waiters were woken. + awake := 0 + for i := range ws { + if ws[i].woken() { + awake++ + } + } + if awake != wakeups { + t.Errorf("got %d woken waiters, wanted %d", awake, wakeups) + } + }) } } -func TestWakeOpAllNonEmptyFailingOp(t *testing.T) { - m := NewManager() - d := newTestData(8) - - // Add two waiters on address 0. - w1 := NewWaiter() - if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w1) - - w2 := NewWaiter() - if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w2) +func TestWakeOpSameAddressFailingOp(t *testing.T) { + for _, private := range []bool{false, true} { + t.Run(futexKind(private), func(t *testing.T) { + m := NewManager() + d := newTestData(8) + + // Add four waiters on address 0. + var ws [4]*Waiter + for i := range ws { + ws[i] = newPreparedTestWaiter(t, m, d, 0, private, 0, ^uint32(0)) + defer m.WaitComplete(ws[i]) + } - // Add two waiters on address 4. - w3 := NewWaiter() - if err := m.WaitPrepare(w3, d, 4, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w3) + // Perform 1 wakeup on address 0 (unconditionally), and 1 wakeup + // on address 0 (contingent on d.Op(1), which should fail). + const wakeups = 1 + if n, err := m.WakeOp(d, 0, 0, private, 1, 1, 1); err != nil || n != wakeups { + t.Errorf("WakeOp: got (%d, %v), wanted (%d, nil)", n, err, wakeups) + } - w4 := NewWaiter() - if err := m.WaitPrepare(w4, d, 4, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) + // Expect that exactly one waiter was woken. + awake := 0 + for i := range ws { + if ws[i].woken() { + awake++ + } + } + if awake != wakeups { + t.Errorf("got %d woken waiters, wanted %d", awake, wakeups) + } + }) } - defer m.WaitComplete(w4) +} - // Wake up all waiters on both addresses. - n, err := m.WakeOp(d, 0, 4, 10, 10, 1) - if err != nil { - t.Fatalf("WakeOp failed: %v", err) - } +const ( + testMutexSize = sizeofInt32 + testMutexLocked uint32 = 1 + testMutexUnlocked uint32 = 0 +) - if n != 2 { - t.Fatalf("Invalid number of wakes: want 2, got %d", n) - } +// testMutex ties together a testData slice, an address, and a +// futex manager in order to implement the sync.Locker interface. +// Beyond being used as a Locker, this is a simple mechanism for +// changing the underlying values for simpler tests. +type testMutex struct { + a uintptr + d testData + m *Manager } -func TestWakeOpSameAddress(t *testing.T) { - m := NewManager() - d := newTestData(8) - - // Add four waiters on address 0. - w1 := NewWaiter() - if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w1) +func newTestMutex(addr uintptr, d testData, m *Manager) *testMutex { + return &testMutex{a: addr, d: d, m: m} +} - w2 := NewWaiter() - if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w2) +// Lock acquires the testMutex. +// This may wait for it to be available via the futex manager. +func (t *testMutex) Lock() { + for { + // Attempt to grab the lock. + if atomic.CompareAndSwapUint32( + (*uint32)(unsafe.Pointer(&t.d[t.a])), + testMutexUnlocked, + testMutexLocked) { + // Lock held. + return + } - w3 := NewWaiter() - if err := m.WaitPrepare(w3, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) + // Wait for it to be "not locked". + w := NewWaiter() + err := t.m.WaitPrepare(w, t.d, t.a, true, testMutexLocked, ^uint32(0)) + if err == syscall.EAGAIN { + continue + } + if err != nil { + // Should never happen. + panic("WaitPrepare returned unexpected error: " + err.Error()) + } + <-w.C + t.m.WaitComplete(w) } - defer m.WaitComplete(w3) +} - w4 := NewWaiter() - if err := m.WaitPrepare(w4, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w4) +// Unlock releases the testMutex. +// This will notify any waiters via the futex manager. +func (t *testMutex) Unlock() { + // Unlock. + atomic.StoreUint32((*uint32)(unsafe.Pointer(&t.d[t.a])), testMutexUnlocked) - // Use the same address, with one at most one waiter from each. - n, err := m.WakeOp(d, 0, 0, 1, 1, 0) - if err != nil { - t.Fatalf("WakeOp failed: %v", err) - } + // Notify all waiters. + t.m.Wake(t.d, t.a, true, ^uint32(0), math.MaxInt32) +} - if n != 2 { - t.Fatalf("Invalid number of wakes: want 2, got %d", n) +// This function was shamelessly stolen from mutex_test.go. +func HammerMutex(l sync.Locker, loops int, cdone chan bool) { + for i := 0; i < loops; i++ { + l.Lock() + runtime.Gosched() + l.Unlock() } + cdone <- true } -func TestWakeOpSameAddressFailingOp(t *testing.T) { +func TestMutexStress(t *testing.T) { m := NewManager() - d := newTestData(8) - - // Add four waiters on address 0. - w1 := NewWaiter() - if err := m.WaitPrepare(w1, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w1) - - w2 := NewWaiter() - if err := m.WaitPrepare(w2, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w2) - - w3 := NewWaiter() - if err := m.WaitPrepare(w3, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w3) - - w4 := NewWaiter() - if err := m.WaitPrepare(w4, d, 0, 0, ^uint32(0)); err != nil { - t.Fatalf("WaitPrepare failed: %v", err) - } - defer m.WaitComplete(w4) + d := newTestData(testMutexSize) + tm := newTestMutex(0*testMutexSize, d, m) + c := make(chan bool) - // Use the same address, with one at most one waiter from each. - n, err := m.WakeOp(d, 0, 0, 1, 1, 1) - if err != nil { - t.Fatalf("WakeOp failed: %v", err) + for i := 0; i < 10; i++ { + go HammerMutex(tm, 1000, c) } - if n != 1 { - t.Fatalf("Invalid number of wakes: want 1, got %d", n) + for i := 0; i < 10; i++ { + <-c } } diff --git a/pkg/sentry/kernel/kernel.go b/pkg/sentry/kernel/kernel.go index 1ace0b501..238fd127b 100644 --- a/pkg/sentry/kernel/kernel.go +++ b/pkg/sentry/kernel/kernel.go @@ -49,6 +49,7 @@ import ( "gvisor.googlesource.com/gvisor/pkg/sentry/inet" "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/auth" "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/epoll" + "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/futex" "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/sched" ktime "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/time" "gvisor.googlesource.com/gvisor/pkg/sentry/limits" @@ -108,6 +109,11 @@ type Kernel struct { // Kernel.CreateProcess can succeed. mounts *fs.MountNamespace + // futexes is the "root" futex.Manager, from which all others are forked. + // This is necessary to ensure that shared futexes are coherent across all + // tasks, including those created by CreateProcess. + futexes *futex.Manager + // globalInit is the thread group whose leader has ID 1 in the root PID // namespace. globalInit is stored separately so that it is accessible even // after all tasks in the thread group have exited, such that ID 1 is no @@ -254,6 +260,7 @@ func (k *Kernel) Init(args InitKernelArgs) error { k.vdso = args.Vdso k.realtimeClock = &timekeeperClock{tk: args.Timekeeper, c: sentrytime.Realtime} k.monotonicClock = &timekeeperClock{tk: args.Timekeeper, c: sentrytime.Monotonic} + k.futexes = futex.NewManager() k.netlinkPorts = port.New() return nil diff --git a/pkg/sentry/kernel/task_clone.go b/pkg/sentry/kernel/task_clone.go index 130bd652b..7c469ec46 100644 --- a/pkg/sentry/kernel/task_clone.go +++ b/pkg/sentry/kernel/task_clone.go @@ -200,7 +200,7 @@ func (t *Task) Clone(opts *CloneOptions) (ThreadID, *SyscallControl, error) { ipcns = NewIPCNamespace(userns) } - tc, err := t.tc.Fork(t, !opts.NewAddressSpace) + tc, err := t.tc.Fork(t, t.k, !opts.NewAddressSpace) if err != nil { return 0, nil, err } diff --git a/pkg/sentry/kernel/task_context.go b/pkg/sentry/kernel/task_context.go index 9a59cbd33..d2df7e9d1 100644 --- a/pkg/sentry/kernel/task_context.go +++ b/pkg/sentry/kernel/task_context.go @@ -72,7 +72,7 @@ func (tc *TaskContext) release() { // TaskContext shares an address space with the original; otherwise, the copied // TaskContext has an independent address space that is initially a duplicate // of the original's. -func (tc *TaskContext) Fork(ctx context.Context, shareAddressSpace bool) (*TaskContext, error) { +func (tc *TaskContext) Fork(ctx context.Context, k *Kernel, shareAddressSpace bool) (*TaskContext, error) { newTC := &TaskContext{ Arch: tc.Arch.Fork(), st: tc.st, @@ -93,8 +93,7 @@ func (tc *TaskContext) Fork(ctx context.Context, shareAddressSpace bool) (*TaskC return nil, err } newTC.MemoryManager = newMM - // TODO: revisit when shmem is supported. - newTC.fu = futex.NewManager() + newTC.fu = k.futexes.Fork() } return newTC, nil } @@ -116,14 +115,6 @@ func (t *Task) MemoryManager() *mm.MemoryManager { return t.tc.MemoryManager } -// Futex returns t's futex manager. -// -// Preconditions: The caller must be running on the task goroutine, or t.mu -// must be locked. -func (t *Task) Futex() *futex.Manager { - return t.tc.fu -} - // SyscallTable returns t's syscall table. // // Preconditions: The caller must be running on the task goroutine, or t.mu @@ -175,7 +166,7 @@ func (k *Kernel) LoadTaskImage(ctx context.Context, mounts *fs.MountNamespace, r Name: name, Arch: ac, MemoryManager: m, - fu: futex.NewManager(), + fu: k.futexes.Fork(), st: st, }, nil } diff --git a/pkg/sentry/kernel/task_exit.go b/pkg/sentry/kernel/task_exit.go index a1b24e1c6..f5b45fb17 100644 --- a/pkg/sentry/kernel/task_exit.go +++ b/pkg/sentry/kernel/task_exit.go @@ -247,7 +247,7 @@ func (*runExitMain) execute(t *Task) taskRunState { t.tg.signalHandlers.mu.Unlock() if !signaled { if _, err := t.CopyOut(t.cleartid, ThreadID(0)); err == nil { - t.Futex().Wake(uintptr(t.cleartid), ^uint32(0), 1) + t.Futex().Wake(t.FutexChecker(), uintptr(t.cleartid), false, ^uint32(0), 1) } // If the CopyOut fails, there's nothing we can do. } diff --git a/pkg/sentry/kernel/task_futex.go b/pkg/sentry/kernel/task_futex.go new file mode 100644 index 000000000..62ebbcb0d --- /dev/null +++ b/pkg/sentry/kernel/task_futex.go @@ -0,0 +1,148 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kernel + +import ( + "gvisor.googlesource.com/gvisor/pkg/abi/linux" + "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/futex" + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" + "gvisor.googlesource.com/gvisor/pkg/syserror" +) + +// Futex returns t's futex manager. +// +// Preconditions: The caller must be running on the task goroutine, or t.mu +// must be locked. +func (t *Task) Futex() *futex.Manager { + return t.tc.fu +} + +// FutexChecker returns a futex.Checker that interprets addresses in t's +// address space. +// +// Preconditions: All uses of the returned futex.Checker must be on the task +// goroutine. +func (t *Task) FutexChecker() futex.Checker { + return futexChecker{t} +} + +type futexChecker struct { + t *Task +} + +// Check implements futex.Checker.Check. +func (f futexChecker) Check(addr uintptr, val uint32) error { + // FIXME + in := f.t.CopyScratchBuffer(4) + _, err := f.t.CopyInBytes(usermem.Addr(addr), in) + if err != nil { + return err + } + nval := usermem.ByteOrder.Uint32(in) + if val != nval { + return syserror.EAGAIN + } + return nil +} + +func (f futexChecker) atomicOp(addr uintptr, op func(uint32) uint32) (uint32, error) { + // FIXME + in := f.t.CopyScratchBuffer(4) + _, err := f.t.CopyInBytes(usermem.Addr(addr), in) + if err != nil { + return 0, err + } + o := usermem.ByteOrder.Uint32(in) + mm := f.t.MemoryManager() + for { + n := op(o) + r, err := mm.CompareAndSwapUint32(f.t, usermem.Addr(addr), o, n, usermem.IOOpts{ + AddressSpaceActive: true, + }) + if err != nil { + return 0, err + } + + if r == o { + return o, nil + } + o = r + } +} + +// Op implements futex.Checker.Op, interpreting opIn consistently with Linux. +func (f futexChecker) Op(addr uintptr, opIn uint32) (bool, error) { + op := (opIn >> 28) & 0xf + cmp := (opIn >> 24) & 0xf + opArg := (opIn >> 12) & 0xfff + cmpArg := opIn & 0xfff + + if op&linux.FUTEX_OP_OPARG_SHIFT != 0 { + opArg = 1 << opArg + op &^= linux.FUTEX_OP_OPARG_SHIFT // clear flag + } + + var oldVal uint32 + var err error + switch op { + case linux.FUTEX_OP_SET: + oldVal, err = f.t.MemoryManager().SwapUint32(f.t, usermem.Addr(addr), opArg, usermem.IOOpts{ + AddressSpaceActive: true, + }) + case linux.FUTEX_OP_ADD: + oldVal, err = f.atomicOp(addr, func(a uint32) uint32 { + return a + opArg + }) + case linux.FUTEX_OP_OR: + oldVal, err = f.atomicOp(addr, func(a uint32) uint32 { + return a | opArg + }) + case linux.FUTEX_OP_ANDN: + oldVal, err = f.atomicOp(addr, func(a uint32) uint32 { + return a &^ opArg + }) + case linux.FUTEX_OP_XOR: + oldVal, err = f.atomicOp(addr, func(a uint32) uint32 { + return a ^ opArg + }) + default: + return false, syserror.ENOSYS + } + if err != nil { + return false, err + } + + switch cmp { + case linux.FUTEX_OP_CMP_EQ: + return oldVal == cmpArg, nil + case linux.FUTEX_OP_CMP_NE: + return oldVal != cmpArg, nil + case linux.FUTEX_OP_CMP_LT: + return oldVal < cmpArg, nil + case linux.FUTEX_OP_CMP_LE: + return oldVal <= cmpArg, nil + case linux.FUTEX_OP_CMP_GT: + return oldVal > cmpArg, nil + case linux.FUTEX_OP_CMP_GE: + return oldVal >= cmpArg, nil + default: + return false, syserror.ENOSYS + } +} + +// GetSharedKey implements futex.Checker.GetSharedKey. +func (f futexChecker) GetSharedKey(addr uintptr) (futex.Key, error) { + return f.t.MemoryManager().GetSharedFutexKey(f.t, usermem.Addr(addr)) +} |