diff options
author | kevin.xu <cming.xu@gmail.com> | 2020-04-27 21:51:31 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-27 21:51:31 +0800 |
commit | e896ca54db67524afc20b644d43c72185e72dc0e (patch) | |
tree | 2a16f3a62a5cafd098f1f028c621f1b655589d69 /pkg/syncevent/waiter_unsafe.go | |
parent | 1f19624fa127d7d59cabe29593cc80b7fe6c81f8 (diff) | |
parent | 3c67754663f424f2ebbc0ff2a4c80e30618d5355 (diff) |
Merge pull request #1 from google/master
catch up
Diffstat (limited to 'pkg/syncevent/waiter_unsafe.go')
-rw-r--r-- | pkg/syncevent/waiter_unsafe.go | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/pkg/syncevent/waiter_unsafe.go b/pkg/syncevent/waiter_unsafe.go new file mode 100644 index 000000000..112e0e604 --- /dev/null +++ b/pkg/syncevent/waiter_unsafe.go @@ -0,0 +1,206 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build go1.11 +// +build !go1.15 + +// Check go:linkname function signatures when updating Go version. + +package syncevent + +import ( + "sync/atomic" + "unsafe" + + "gvisor.dev/gvisor/pkg/sync" +) + +//go:linkname gopark runtime.gopark +func gopark(unlockf func(unsafe.Pointer, *unsafe.Pointer) bool, wg *unsafe.Pointer, reason uint8, traceEv byte, traceskip int) + +//go:linkname goready runtime.goready +func goready(g unsafe.Pointer, traceskip int) + +const ( + waitReasonSelect = 9 // Go: src/runtime/runtime2.go + traceEvGoBlockSelect = 24 // Go: src/runtime/trace.go +) + +// Waiter allows a goroutine to block on pending events received by a Receiver. +// +// Waiter.Init() must be called before first use. +type Waiter struct { + r Receiver + + // g is one of: + // + // - nil: No goroutine is blocking in Wait. + // + // - &preparingG: A goroutine is in Wait preparing to sleep, but hasn't yet + // completed waiterUnlock(). Thus the wait can only be interrupted by + // replacing the value of g with nil (the G may not be in state Gwaiting + // yet, so we can't call goready.) + // + // - Otherwise: g is a pointer to the runtime.g in state Gwaiting for the + // goroutine blocked in Wait, which can only be woken by calling goready. + g unsafe.Pointer `state:"zerovalue"` +} + +// Sentinel object for Waiter.g. +var preparingG struct{} + +// Init must be called before first use of w. +func (w *Waiter) Init() { + w.r.Init(w) +} + +// Receiver returns the Receiver that receives events that unblock calls to +// w.Wait(). +func (w *Waiter) Receiver() *Receiver { + return &w.r +} + +// Pending returns the set of pending events. +func (w *Waiter) Pending() Set { + return w.r.Pending() +} + +// Wait blocks until at least one event is pending, then returns the set of +// pending events. It does not affect the set of pending events; callers must +// call w.Ack() to do so, or use w.WaitAndAck() instead. +// +// Precondition: Only one goroutine may call any Wait* method at a time. +func (w *Waiter) Wait() Set { + return w.WaitFor(AllEvents) +} + +// WaitFor blocks until at least one event in es is pending, then returns the +// set of pending events (including those not in es). It does not affect the +// set of pending events; callers must call w.Ack() to do so. +// +// Precondition: Only one goroutine may call any Wait* method at a time. +func (w *Waiter) WaitFor(es Set) Set { + for { + // Optimization: Skip the atomic store to w.g if an event is already + // pending. + if p := w.r.Pending(); p&es != NoEvents { + return p + } + + // Indicate that we're preparing to go to sleep. + atomic.StorePointer(&w.g, (unsafe.Pointer)(&preparingG)) + + // If an event is pending, abort the sleep. + if p := w.r.Pending(); p&es != NoEvents { + atomic.StorePointer(&w.g, nil) + return p + } + + // If w.g is still preparingG (i.e. w.NotifyPending() has not been + // called or has not reached atomic.SwapPointer()), go to sleep until + // w.NotifyPending() => goready(). + gopark(waiterUnlock, &w.g, waitReasonSelect, traceEvGoBlockSelect, 0) + } +} + +// Ack marks the given events as not pending. +func (w *Waiter) Ack(es Set) { + w.r.Ack(es) +} + +// WaitAndAckAll blocks until at least one event is pending, then marks all +// events as not pending and returns the set of previously-pending events. +// +// Precondition: Only one goroutine may call any Wait* method at a time. +func (w *Waiter) WaitAndAckAll() Set { + // Optimization: Skip the atomic store to w.g if an event is already + // pending. Call Pending() first since, in the common case that events are + // not yet pending, this skips an atomic swap on w.r.pending. + if w.r.Pending() != NoEvents { + if p := w.r.PendingAndAckAll(); p != NoEvents { + return p + } + } + + for { + // Indicate that we're preparing to go to sleep. + atomic.StorePointer(&w.g, (unsafe.Pointer)(&preparingG)) + + // If an event is pending, abort the sleep. + if w.r.Pending() != NoEvents { + if p := w.r.PendingAndAckAll(); p != NoEvents { + atomic.StorePointer(&w.g, nil) + return p + } + } + + // If w.g is still preparingG (i.e. w.NotifyPending() has not been + // called or has not reached atomic.SwapPointer()), go to sleep until + // w.NotifyPending() => goready(). + gopark(waiterUnlock, &w.g, waitReasonSelect, traceEvGoBlockSelect, 0) + + // Check for pending events. We call PendingAndAckAll() directly now since + // we only expect to be woken after events become pending. + if p := w.r.PendingAndAckAll(); p != NoEvents { + return p + } + } +} + +// Notify marks the given events as pending, possibly unblocking concurrent +// calls to w.Wait() or w.WaitFor(). +func (w *Waiter) Notify(es Set) { + w.r.Notify(es) +} + +// NotifyPending implements ReceiverCallback.NotifyPending. Users of Waiter +// should not call NotifyPending. +func (w *Waiter) NotifyPending() { + // Optimization: Skip the atomic swap on w.g if there is no sleeping + // goroutine. NotifyPending is called after w.r.Pending() is updated, so + // concurrent and future calls to w.Wait() will observe pending events and + // abort sleeping. + if atomic.LoadPointer(&w.g) == nil { + return + } + // Wake a sleeping G, or prevent a G that is preparing to sleep from doing + // so. Swap is needed here to ensure that only one call to NotifyPending + // calls goready. + if g := atomic.SwapPointer(&w.g, nil); g != nil && g != (unsafe.Pointer)(&preparingG) { + goready(g, 0) + } +} + +var waiterPool = sync.Pool{ + New: func() interface{} { + w := &Waiter{} + w.Init() + return w + }, +} + +// GetWaiter returns an unused Waiter. PutWaiter should be called to release +// the Waiter once it is no longer needed. +// +// Where possible, users should prefer to associate each goroutine that calls +// Waiter.Wait() with a distinct pre-allocated Waiter to avoid allocation of +// Waiters in hot paths. +func GetWaiter() *Waiter { + return waiterPool.Get().(*Waiter) +} + +// PutWaiter releases an unused Waiter previously returned by GetWaiter. +func PutWaiter(w *Waiter) { + waiterPool.Put(w) +} |