Diffstat (limited to 'pkg/sentry/kernel/futex/futex.go')
-rw-r--r-- | pkg/sentry/kernel/futex/futex.go | 405 |
1 file changed, 405 insertions, 0 deletions
diff --git a/pkg/sentry/kernel/futex/futex.go b/pkg/sentry/kernel/futex/futex.go
new file mode 100644
index 000000000..b3ba57a2c
--- /dev/null
+++ b/pkg/sentry/kernel/futex/futex.go
@@ -0,0 +1,405 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package futex provides an implementation of the futex interface as found in
+// the Linux kernel. It allows one to easily transform Wait() calls into waits
+// on a channel, which is useful in a Go-based kernel, for example.
+package futex
+
+import (
+	"sync"
+	"sync/atomic"
+
+	"gvisor.googlesource.com/gvisor/pkg/syserror"
+)
+
+// Checker abstracts memory accesses. This is useful because the "addresses"
+// used in this package may not be real addresses (they could be indices of an
+// array, for example), or they could be mapped via some special mechanism.
+//
+// TODO: Replace this with usermem.IO.
+type Checker interface {
+	// Check should validate that the given address contains the given value.
+	// If it does not contain the value, syserror.EAGAIN must be returned.
+	// Any other error may be returned, which will be propagated.
+	Check(addr uintptr, val uint32) error
+
+	// Op should atomically perform the operation encoded in op on the data
+	// pointed to by addr, then apply the comparison encoded in op to the
+	// original value at addr, returning the result.
+	// Note that op is an opaque operation whose behaviour is defined
+	// outside of the futex manager.
+	Op(addr uintptr, op uint32) (bool, error)
+}
+
+// Waiter is the struct which gets enqueued into buckets for wake up routines
+// and requeue routines to scan and notify. Once a Waiter has been enqueued by
+// WaitPrepare(), callers may listen on C for wake up events.
+type Waiter struct {
+	// Synchronization:
+	//
+	// - A Waiter that is not enqueued in a bucket is exclusively owned (no
+	// synchronization applies).
+	//
+	// - A Waiter is enqueued in a bucket by calling WaitPrepare(). After this,
+	// waiterEntry, complete, and addr are protected by the bucket.mu ("bucket
+	// lock") of the containing bucket, and bitmask is immutable. complete and
+	// addr are additionally mutated using atomic memory operations, ensuring
+	// that they can be read using atomic memory operations without holding the
+	// bucket lock.
+	//
+	// - A Waiter is only guaranteed to be no longer queued after calling
+	// WaitComplete().
+
+	// waiterEntry links Waiter into bucket.waiters.
+	waiterEntry
+
+	// complete is 1 if the Waiter was removed from its bucket by a wakeup and
+	// 0 otherwise.
+	complete int32
+
+	// C is sent to when the Waiter is woken.
+	C chan struct{}
+
+	// addr is the address being waited on.
+	addr uintptr
+
+	// bitmask is the bitmask we're waiting on.
+	// This is used in the case of FUTEX_WAKE_BITSET.
+	bitmask uint32
+}
+
+// NewWaiter returns a new unqueued Waiter.
+func NewWaiter() *Waiter {
+	return &Waiter{
+		C: make(chan struct{}, 1),
+	}
+}
+
+// bucket holds a list of waiters for a given address hash.
+type bucket struct {
+	// mu protects waiters and contained Waiter state. See comment in Waiter.
+	mu sync.Mutex `state:"nosave"`
+
+	waiters waiterList `state:"zerovalue"`
+}
+
+// wakeLocked wakes up to n waiters matching the bitmask at the addr for this
+// bucket and returns the number of waiters woken.
+//
+// Preconditions: b.mu must be locked.
+func (b *bucket) wakeLocked(addr uintptr, bitmask uint32, n int) int {
+	done := 0
+	for w := b.waiters.Front(); done < n && w != nil; {
+		if w.addr != addr || w.bitmask&bitmask == 0 {
+			// Not matching.
+			w = w.Next()
+			continue
+		}
+
+		// Remove from the bucket and wake the waiter.
+		woke := w
+		w = w.Next() // Next iteration.
+		b.waiters.Remove(woke)
+		woke.C <- struct{}{}
+
+		// NOTE: The above channel write establishes a write barrier
+		// according to the memory model, so nothing may be ordered
+		// around it. Since we've dequeued woke and will never touch it
+		// again, we can safely store 1 to woke.complete here and allow
+		// WaitComplete() to short-circuit grabbing the bucket lock. If
+		// WaitComplete() somehow misses the store, it falls back to
+		// acquiring the bucket lock, which we still hold, so it cannot
+		// racily dequeue woke or assume it is free.
+		atomic.StoreInt32(&woke.complete, 1)
+		done++
+	}
+	return done
+}
+
+// requeueLocked takes n waiters from the bucket and moves them to naddr on the
+// bucket "to".
+//
+// Preconditions: b and to must be locked.
+func (b *bucket) requeueLocked(to *bucket, addr, naddr uintptr, n int) int {
+	done := 0
+	for w := b.waiters.Front(); done < n && w != nil; {
+		if w.addr != addr {
+			// Not matching.
+			w = w.Next()
+			continue
+		}
+
+		requeued := w
+		w = w.Next() // Next iteration.
+		b.waiters.Remove(requeued)
+		atomic.StoreUintptr(&requeued.addr, naddr)
+		to.waiters.PushBack(requeued)
+		done++
+	}
+	return done
+}
+
+const (
+	// bucketCount is the number of buckets per Manager. By having many of
+	// these we reduce contention when concurrent yet unrelated calls are made.
+	bucketCount     = 1 << bucketCountBits
+	bucketCountBits = 10
+)
+
+func checkAddr(addr uintptr) error {
+	// Ensure the address is aligned.
+	// It must be a DWORD (4-byte) boundary.
+	if addr&0x3 != 0 {
+		return syserror.EINVAL
+	}
+
+	return nil
+}
+
+// bucketIndexForAddr returns the index into Manager.buckets for addr.
+func bucketIndexForAddr(addr uintptr) uintptr {
+	// - The bottom 2 bits of addr must be 0, per checkAddr.
+	//
+	// - On amd64, the top 16 bits of addr (bits 48-63) must be equal to bit 47
+	// for a canonical address, and (on all existing platforms) bit 47 must be
+	// 0 for an application address.
+	//
+	// Thus 19 bits of addr are "useless" for hashing, leaving only 45 "useful"
+	// bits. We choose one of the simplest possible hash functions that at
+	// least uses all 45 useful bits in the output, given that bucketCountBits
+	// == 10. This hash function also has the property that it will usually map
+	// adjacent addresses to adjacent buckets, slightly improving memory
+	// locality when an application synchronization structure uses multiple
+	// nearby futexes.
+	//
+	// Note that despite the large number of arithmetic operations in the
+	// function, many components can be computed in parallel, such that the
+	// critical path is 1 bit shift + 3 additions (2 in h1, then h1 + h2). This
+	// is also why h1 and h2 are grouped separately; for "(addr >> 2) + ... +
+	// (addr >> 42)" without any additional grouping, the compiler puts all 4
+	// additions in the critical path.
+	h1 := (addr >> 2) + (addr >> 12) + (addr >> 22)
+	h2 := (addr >> 32) + (addr >> 42)
+	return (h1 + h2) % bucketCount
+}
+
+// Manager holds futex state for a single virtual address space.
+type Manager struct {
+	buckets [bucketCount]bucket
+}
+
+// NewManager returns an initialized futex manager.
+// N.B. We use the virtual address to tag futexes, so this only works for
+// private (within a single process) futexes.
+func NewManager() *Manager {
+	return &Manager{}
+}
+
+// lockBucket returns a locked bucket for the given addr.
+//
+// Preconditions: checkAddr(addr) == nil.
+func (m *Manager) lockBucket(addr uintptr) *bucket {
+	b := &m.buckets[bucketIndexForAddr(addr)]
+	b.mu.Lock()
+	return b
+}
+
+// lockBuckets returns locked buckets for the given addrs.
+//
+// Preconditions: checkAddr(addr1) == checkAddr(addr2) == nil.
+func (m *Manager) lockBuckets(addr1 uintptr, addr2 uintptr) (*bucket, *bucket) {
+	i1 := bucketIndexForAddr(addr1)
+	i2 := bucketIndexForAddr(addr2)
+	b1 := &m.buckets[i1]
+	b2 := &m.buckets[i2]
+
+	// Ensure that buckets are locked in a consistent order (lowest index
+	// first) to avoid circular locking.
+	switch {
+	case i1 < i2:
+		b1.mu.Lock()
+		b2.mu.Lock()
+	case i2 < i1:
+		b2.mu.Lock()
+		b1.mu.Lock()
+	default:
+		b1.mu.Lock()
+	}
+
+	return b1, b2
+}
+
+// Wake wakes up to n waiters matching the bitmask on the given addr.
+// The number of waiters woken is returned.
+func (m *Manager) Wake(addr uintptr, bitmask uint32, n int) (int, error) {
+	if err := checkAddr(addr); err != nil {
+		return 0, err
+	}
+
+	b := m.lockBucket(addr)
+	// This function is very hot; avoid defer.
+	r := b.wakeLocked(addr, bitmask, n)
+	b.mu.Unlock()
+	return r, nil
+}
+
+func (m *Manager) doRequeue(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) {
+	if err := checkAddr(addr); err != nil {
+		return 0, err
+	}
+	if err := checkAddr(naddr); err != nil {
+		return 0, err
+	}
+
+	b1, b2 := m.lockBuckets(addr, naddr)
+	defer b1.mu.Unlock()
+	if b2 != b1 {
+		defer b2.mu.Unlock()
+	}
+
+	// Check our value.
+	// This only applies to RequeueCmp().
+	if c != nil {
+		if err := c.Check(addr, val); err != nil {
+			return 0, err
+		}
+	}
+
+	// Wake the number required.
+	done := b1.wakeLocked(addr, ^uint32(0), nwake)
+
+	// Requeue the number required.
+	b1.requeueLocked(b2, addr, naddr, nreq)
+
+	return done, nil
+}
+
+// Requeue wakes up to nwake waiters on the given addr, and unconditionally
+// requeues up to nreq waiters on naddr.
+func (m *Manager) Requeue(addr uintptr, naddr uintptr, nwake int, nreq int) (int, error) {
+	return m.doRequeue(nil, addr, 0, naddr, nwake, nreq)
+}
+
+// RequeueCmp atomically checks that addr contains val (via the Checker),
+// wakes up to nwake waiters on addr, and then unconditionally requeues up to
+// nreq waiters on naddr.
+func (m *Manager) RequeueCmp(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) {
+	return m.doRequeue(c, addr, val, naddr, nwake, nreq)
+}
+
+// WakeOp atomically applies op to the memory at addr2, wakes up to nwake1
+// waiters unconditionally from addr1, and, based on the original value at
+// addr2 and a comparison encoded in op, wakes up to nwake2 waiters from addr2.
+// It returns the total number of waiters woken.
+func (m *Manager) WakeOp(c Checker, addr1 uintptr, addr2 uintptr, nwake1 int, nwake2 int, op uint32) (int, error) {
+	if err := checkAddr(addr1); err != nil {
+		return 0, err
+	}
+	if err := checkAddr(addr2); err != nil {
+		return 0, err
+	}
+
+	b1, b2 := m.lockBuckets(addr1, addr2)
+
+	done := 0
+	cond, err := c.Op(addr2, op)
+	if err == nil {
+		// Wake up to nwake1 entries from the first bucket.
+		done = b1.wakeLocked(addr1, ^uint32(0), nwake1)
+
+		// Wake up to nwake2 entries from the second bucket if the
+		// operation yielded true.
+		if cond {
+			done += b2.wakeLocked(addr2, ^uint32(0), nwake2)
+		}
+	}
+
+	b1.mu.Unlock()
+	if b2 != b1 {
+		b2.mu.Unlock()
+	}
+	return done, err
+}
+
+// WaitPrepare atomically checks that addr contains val (via the Checker), then
+// enqueues w to be woken by a send to w.C. If WaitPrepare returns nil, the
+// Waiter must be subsequently removed by calling WaitComplete, whether or not
+// a wakeup is received on w.C.
+func (m *Manager) WaitPrepare(w *Waiter, c Checker, addr uintptr, val uint32, bitmask uint32) error {
+	if err := checkAddr(addr); err != nil {
+		return err
+	}
+
+	// Prepare the Waiter before taking the bucket lock.
+	w.complete = 0
+	select {
+	case <-w.C:
+	default:
+	}
+	w.addr = addr
+	w.bitmask = bitmask
+
+	b := m.lockBucket(addr)
+	// This function is very hot; avoid defer.
+
+	// Perform our atomic check.
+	if err := c.Check(addr, val); err != nil {
+		b.mu.Unlock()
+		return err
+	}
+
+	// Add the waiter to the bucket.
+	b.waiters.PushBack(w)
+
+	b.mu.Unlock()
+	return nil
+}
+
+// WaitComplete must be called when a Waiter previously added by WaitPrepare is
+// no longer eligible to be woken.
+func (m *Manager) WaitComplete(w *Waiter) {
+	// Can we short-circuit acquiring the lock? This is the happy path where a
+	// wakeup was already received and we don't need to dequeue this waiter
+	// from any list (or take any locks).
+	if atomic.LoadInt32(&w.complete) != 0 {
+		return
+	}
+
+	// Take the bucket lock. Note that without holding the bucket lock, the
+	// waiter is not guaranteed to stay in that bucket, so after we take the
+	// bucket lock, we must ensure that the bucket hasn't changed: if it
+	// happens to have changed, we release the old bucket lock and try again
+	// with the new bucket; if it hasn't changed, we know it won't change now
+	// because we hold the lock.
+	var b *bucket
+	for {
+		addr := atomic.LoadUintptr(&w.addr)
+		b = m.lockBucket(addr)
+		// We still have to use an atomic load here, because if w was racily
+		// requeued then w.addr is not protected by b.mu.
+		if addr == atomic.LoadUintptr(&w.addr) {
+			break
+		}
+		b.mu.Unlock()
+	}
+
+	// Remove waiter from the bucket. w.complete can only be stored with b.mu
+	// locked, so this load doesn't need to use sync/atomic.
+	if w.complete == 0 {
+		b.waiters.Remove(w)
+	}
+	b.mu.Unlock()
+}
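For context, below is a minimal sketch of how this API might be driven. The sliceChecker type, its word-slice backing store, and the main() wiring are hypothetical illustrations rather than part of this change; only the Manager, Waiter, and Checker signatures and the import paths come from the diff above. The flow it shows: WaitPrepare() atomically checks the value and enqueues the waiter, the wakeup arrives as a send on w.C, and every successful WaitPrepare() is paired with a WaitComplete().

package main

import (
	"fmt"
	"sync/atomic"

	"gvisor.googlesource.com/gvisor/pkg/sentry/kernel/futex"
	"gvisor.googlesource.com/gvisor/pkg/syserror"
)

// sliceChecker is a hypothetical Checker backed by a slice of words: futex
// "addresses" are byte offsets into the slice, which keeps them 4-byte
// aligned as checkAddr() requires.
type sliceChecker struct {
	words []uint32
}

// Check returns EAGAIN unless the word at addr still holds val.
func (s *sliceChecker) Check(addr uintptr, val uint32) error {
	if atomic.LoadUint32(&s.words[addr/4]) != val {
		return syserror.EAGAIN
	}
	return nil
}

// Op ignores the encoded operation in this sketch and always reports false,
// so WakeOp would only perform its unconditional wakeups.
func (s *sliceChecker) Op(addr uintptr, op uint32) (bool, error) {
	return false, nil
}

func main() {
	c := &sliceChecker{words: make([]uint32, 8)}
	m := futex.NewManager()

	const addr = 4 // Byte offset of words[1]; must stay 4-byte aligned.

	w := futex.NewWaiter()
	// Enqueue only while words[1] still holds 0; a concurrent change would
	// surface here as EAGAIN, just like FUTEX_WAIT.
	if err := m.WaitPrepare(w, c, addr, 0, ^uint32(0)); err != nil {
		fmt.Println("value changed before waiting:", err)
		return
	}

	go func() {
		// Publish the new value, then wake up to one waiter, mirroring the
		// usual store-then-FUTEX_WAKE sequence.
		atomic.StoreUint32(&c.words[1], 1)
		m.Wake(addr, ^uint32(0), 1)
	}()

	<-w.C // Block until the wakeup arrives.
	// Every successful WaitPrepare must be paired with WaitComplete, whether
	// or not a wakeup was received on w.C.
	m.WaitComplete(w)
	fmt.Println("woken; words[1] =", atomic.LoadUint32(&c.words[1]))
}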