summaryrefslogtreecommitdiffhomepage
path: root/pkg/amutex/amutex.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/amutex/amutex.go')
-rw-r--r--pkg/amutex/amutex.go114
1 files changed, 114 insertions, 0 deletions
diff --git a/pkg/amutex/amutex.go b/pkg/amutex/amutex.go
new file mode 100644
index 000000000..1cb73359a
--- /dev/null
+++ b/pkg/amutex/amutex.go
@@ -0,0 +1,114 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package amutex provides the implementation of an abortable mutex. It allows
+// the Lock() function to be canceled while it waits to acquire the mutex.
+package amutex
+
+import (
+ "sync/atomic"
+)
+
+// Sleeper must be implemented by users of the abortable mutex to allow for
+// cancelation of waits.
+type Sleeper interface {
+ // SleepStart is called by the AbortableMutex.Lock() function when the
+ // mutex is contended and the goroutine is about to sleep.
+ //
+ // A channel can be returned that causes the sleep to be canceled if
+ // it's readable. If no cancelation is desired, nil can be returned.
+ SleepStart() <-chan struct{}
+
+ // SleepFinish is called by AbortableMutex.Lock() once a contended mutex
+ // is acquired or the wait is aborted.
+ SleepFinish(success bool)
+}
+
+// NoopSleeper is a stateless no-op implementation of Sleeper for anonymous
+// embedding in other types that do not support cancelation.
+type NoopSleeper struct{}
+
+// SleepStart implements Sleeper.SleepStart.
+func (NoopSleeper) SleepStart() <-chan struct{} {
+ return nil
+}
+
+// SleepFinish implements Sleeper.SleepFinish.
+func (NoopSleeper) SleepFinish(success bool) {}
+
+// AbortableMutex is an abortable mutex. It allows Lock() to be aborted while it
+// waits to acquire the mutex.
+type AbortableMutex struct {
+ v int32
+ ch chan struct{}
+}
+
+// Init initializes the abortable mutex.
+func (m *AbortableMutex) Init() {
+ m.v = 1
+ m.ch = make(chan struct{}, 1)
+}
+
+// Lock attempts to acquire the mutex, returning true on success. If something
+// is written to the "c" while Lock waits, the wait is aborted and false is
+// returned instead.
+func (m *AbortableMutex) Lock(s Sleeper) bool {
+ // Uncontended case.
+ if atomic.AddInt32(&m.v, -1) == 0 {
+ return true
+ }
+
+ var c <-chan struct{}
+ if s != nil {
+ c = s.SleepStart()
+ }
+
+ for {
+ // Try to acquire the mutex again, at the same time making sure
+ // that m.v is negative, which indicates to the owner of the
+ // lock that it is contended, which ill force it to try to wake
+ // someone up when it releases the mutex.
+ if v := atomic.LoadInt32(&m.v); v >= 0 && atomic.SwapInt32(&m.v, -1) == 1 {
+ if s != nil {
+ s.SleepFinish(true)
+ }
+ return true
+ }
+
+ // Wait for the owner to wake us up before trying again, or for
+ // the wait to be aborted by the provided channel.
+ select {
+ case <-m.ch:
+ case <-c:
+ // s must be non-nil, otherwise c would be nil and we'd
+ // never reach this path.
+ s.SleepFinish(false)
+ return false
+ }
+ }
+}
+
+// Unlock releases the mutex.
+func (m *AbortableMutex) Unlock() {
+ if atomic.SwapInt32(&m.v, 1) == 0 {
+ // There were no pending waiters.
+ return
+ }
+
+ // Wake some waiter up.
+ select {
+ case m.ch <- struct{}{}:
+ default:
+ }
+}