diff options
Diffstat (limited to 'pkg/amutex')
-rw-r--r-- | pkg/amutex/BUILD | 17 | ||||
-rw-r--r-- | pkg/amutex/amutex.go | 114 | ||||
-rw-r--r-- | pkg/amutex/amutex_test.go | 93 |
3 files changed, 224 insertions, 0 deletions
diff --git a/pkg/amutex/BUILD b/pkg/amutex/BUILD new file mode 100644 index 000000000..442096319 --- /dev/null +++ b/pkg/amutex/BUILD @@ -0,0 +1,17 @@ +package(licenses = ["notice"]) # Apache 2.0 + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "amutex", + srcs = ["amutex.go"], + importpath = "gvisor.googlesource.com/gvisor/pkg/amutex", + visibility = ["//:sandbox"], +) + +go_test( + name = "amutex_test", + size = "small", + srcs = ["amutex_test.go"], + embed = [":amutex"], +) diff --git a/pkg/amutex/amutex.go b/pkg/amutex/amutex.go new file mode 100644 index 000000000..1cb73359a --- /dev/null +++ b/pkg/amutex/amutex.go @@ -0,0 +1,114 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package amutex provides the implementation of an abortable mutex. It allows +// the Lock() function to be canceled while it waits to acquire the mutex. +package amutex + +import ( + "sync/atomic" +) + +// Sleeper must be implemented by users of the abortable mutex to allow for +// cancelation of waits. +type Sleeper interface { + // SleepStart is called by the AbortableMutex.Lock() function when the + // mutex is contended and the goroutine is about to sleep. + // + // A channel can be returned that causes the sleep to be canceled if + // it's readable. If no cancelation is desired, nil can be returned. + SleepStart() <-chan struct{} + + // SleepFinish is called by AbortableMutex.Lock() once a contended mutex + // is acquired or the wait is aborted. + SleepFinish(success bool) +} + +// NoopSleeper is a stateless no-op implementation of Sleeper for anonymous +// embedding in other types that do not support cancelation. +type NoopSleeper struct{} + +// SleepStart implements Sleeper.SleepStart. +func (NoopSleeper) SleepStart() <-chan struct{} { + return nil +} + +// SleepFinish implements Sleeper.SleepFinish. +func (NoopSleeper) SleepFinish(success bool) {} + +// AbortableMutex is an abortable mutex. It allows Lock() to be aborted while it +// waits to acquire the mutex. +type AbortableMutex struct { + v int32 + ch chan struct{} +} + +// Init initializes the abortable mutex. +func (m *AbortableMutex) Init() { + m.v = 1 + m.ch = make(chan struct{}, 1) +} + +// Lock attempts to acquire the mutex, returning true on success. If something +// is written to the "c" while Lock waits, the wait is aborted and false is +// returned instead. +func (m *AbortableMutex) Lock(s Sleeper) bool { + // Uncontended case. + if atomic.AddInt32(&m.v, -1) == 0 { + return true + } + + var c <-chan struct{} + if s != nil { + c = s.SleepStart() + } + + for { + // Try to acquire the mutex again, at the same time making sure + // that m.v is negative, which indicates to the owner of the + // lock that it is contended, which ill force it to try to wake + // someone up when it releases the mutex. + if v := atomic.LoadInt32(&m.v); v >= 0 && atomic.SwapInt32(&m.v, -1) == 1 { + if s != nil { + s.SleepFinish(true) + } + return true + } + + // Wait for the owner to wake us up before trying again, or for + // the wait to be aborted by the provided channel. + select { + case <-m.ch: + case <-c: + // s must be non-nil, otherwise c would be nil and we'd + // never reach this path. + s.SleepFinish(false) + return false + } + } +} + +// Unlock releases the mutex. +func (m *AbortableMutex) Unlock() { + if atomic.SwapInt32(&m.v, 1) == 0 { + // There were no pending waiters. + return + } + + // Wake some waiter up. + select { + case m.ch <- struct{}{}: + default: + } +} diff --git a/pkg/amutex/amutex_test.go b/pkg/amutex/amutex_test.go new file mode 100644 index 000000000..876e47b19 --- /dev/null +++ b/pkg/amutex/amutex_test.go @@ -0,0 +1,93 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package amutex + +import ( + "sync" + "testing" + "time" +) + +type sleeper struct { + ch chan struct{} +} + +func (s *sleeper) SleepStart() <-chan struct{} { + return s.ch +} + +func (*sleeper) SleepFinish(bool) { +} + +func TestMutualExclusion(t *testing.T) { + var m AbortableMutex + m.Init() + + // Test mutual exclusion by running "gr" goroutines concurrently, and + // have each one increment a counter "iters" times within the critical + // section established by the mutex. + // + // If at the end of the counter is not gr * iters, then we know that + // goroutines ran concurrently within the critical section. + // + // If one of the goroutines doesn't complete, it's likely a bug that + // causes to to wait forever. + const gr = 1000 + const iters = 100000 + v := 0 + var wg sync.WaitGroup + for i := 0; i < gr; i++ { + wg.Add(1) + go func() { + for j := 0; j < iters; j++ { + m.Lock(nil) + v++ + m.Unlock() + } + wg.Done() + }() + } + + wg.Wait() + + if v != gr*iters { + t.Fatalf("Bad count: got %v, want %v", v, gr*iters) + } +} + +func TestAbortWait(t *testing.T) { + var s sleeper + var m AbortableMutex + m.Init() + + // Lock the mutex. + m.Lock(&s) + + // Lock again, but this time cancel after 500ms. + s.ch = make(chan struct{}, 1) + go func() { + time.Sleep(500 * time.Millisecond) + s.ch <- struct{}{} + }() + if v := m.Lock(&s); v { + t.Fatalf("Lock succeeded when it should have failed") + } + + // Lock again, but cancel right away. + s.ch <- struct{}{} + if v := m.Lock(&s); v { + t.Fatalf("Lock succeeded when it should have failed") + } +} |