diff options
-rw-r--r-- | pkg/gate/BUILD | 22 | ||||
-rw-r--r-- | pkg/gate/gate.go | 134 | ||||
-rw-r--r-- | pkg/gate/gate_test.go | 192 | ||||
-rw-r--r-- | pkg/sync/BUILD | 3 | ||||
-rw-r--r-- | pkg/sync/gate_test.go | 129 | ||||
-rw-r--r-- | pkg/sync/gate_unsafe.go | 150 | ||||
-rw-r--r-- | pkg/sync/runtime_unsafe.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/link/waitable/BUILD | 2 | ||||
-rw-r--r-- | pkg/tcpip/link/waitable/waitable.go | 6 | ||||
-rw-r--r-- | pkg/unet/BUILD | 2 | ||||
-rw-r--r-- | pkg/unet/unet.go | 4 |
11 files changed, 294 insertions, 356 deletions
diff --git a/pkg/gate/BUILD b/pkg/gate/BUILD deleted file mode 100644 index dd3141143..000000000 --- a/pkg/gate/BUILD +++ /dev/null @@ -1,22 +0,0 @@ -load("//tools:defs.bzl", "go_library", "go_test") - -package(licenses = ["notice"]) - -go_library( - name = "gate", - srcs = [ - "gate.go", - ], - visibility = ["//visibility:public"], -) - -go_test( - name = "gate_test", - srcs = [ - "gate_test.go", - ], - deps = [ - ":gate", - "//pkg/sync", - ], -) diff --git a/pkg/gate/gate.go b/pkg/gate/gate.go deleted file mode 100644 index bda6aae09..000000000 --- a/pkg/gate/gate.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package gate provides a usage Gate synchronization primitive. -package gate - -import ( - "sync/atomic" -) - -const ( - // gateClosed is the bit set in the gate's user count to indicate that - // it has been closed. It is the MSB of the 32-bit field; the other 31 - // bits carry the actual count. - gateClosed = 0x80000000 -) - -// Gate is a synchronization primitive that allows concurrent goroutines to -// "enter" it as long as it hasn't been closed yet. Once it's been closed, -// goroutines cannot enter it anymore, but are allowed to leave, and the closer -// will be informed when all goroutines have left. -// -// Many goroutines are allowed to enter the gate concurrently, but only one is -// allowed to close it. -// -// This is similar to a r/w critical section, except that goroutines "entering" -// never block: they either enter immediately or fail to enter. The closer will -// block waiting for all goroutines currently inside the gate to leave. -// -// This function is implemented efficiently. On x86, only one interlocked -// operation is performed on enter, and one on leave. -// -// This is useful, for example, in cases when a goroutine is trying to clean up -// an object for which multiple goroutines have pointers. In such a case, users -// would be required to enter and leave the gates, and the cleaner would wait -// until all users are gone (and no new ones are allowed) before proceeding. -// -// Users: -// -// if !g.Enter() { -// // Gate is closed, we can't use the object. -// return -// } -// -// // Do something with object. -// [...] -// -// g.Leave() -// -// Closer: -// -// // Prevent new users from using the object, and wait for the existing -// // ones to complete. -// g.Close() -// -// // Clean up the object. -// [...] -// -type Gate struct { - userCount uint32 - done chan struct{} -} - -// Enter tries to enter the gate. It will succeed if it hasn't been closed yet, -// in which case the caller must eventually call Leave(). -// -// This function is thread-safe. -func (g *Gate) Enter() bool { - if g == nil { - return false - } - - for { - v := atomic.LoadUint32(&g.userCount) - if v&gateClosed != 0 { - return false - } - - if atomic.CompareAndSwapUint32(&g.userCount, v, v+1) { - return true - } - } -} - -// Leave leaves the gate. This must only be called after a successful call to -// Enter(). If the gate has been closed and this is the last one inside the -// gate, it will notify the closer that the gate is done. -// -// This function is thread-safe. -func (g *Gate) Leave() { - for { - v := atomic.LoadUint32(&g.userCount) - if v&^gateClosed == 0 { - panic("leaving a gate with zero usage count") - } - - if atomic.CompareAndSwapUint32(&g.userCount, v, v-1) { - if v == gateClosed+1 { - close(g.done) - } - return - } - } -} - -// Close closes the gate for entering, and waits until all goroutines [that are -// currently inside the gate] leave before returning. -// -// Only one goroutine can call this function. -func (g *Gate) Close() { - for { - v := atomic.LoadUint32(&g.userCount) - if v&^gateClosed != 0 && g.done == nil { - g.done = make(chan struct{}) - } - if atomic.CompareAndSwapUint32(&g.userCount, v, v|gateClosed) { - if v&^gateClosed != 0 { - <-g.done - } - return - } - } -} diff --git a/pkg/gate/gate_test.go b/pkg/gate/gate_test.go deleted file mode 100644 index 316015e06..000000000 --- a/pkg/gate/gate_test.go +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package gate_test - -import ( - "runtime" - "testing" - "time" - - "gvisor.dev/gvisor/pkg/gate" - "gvisor.dev/gvisor/pkg/sync" -) - -func TestBasicEnter(t *testing.T) { - var g gate.Gate - - if !g.Enter() { - t.Fatalf("Failed to enter when it should be allowed") - } - - g.Leave() - - g.Close() - - if g.Enter() { - t.Fatalf("Allowed to enter when it should fail") - } -} - -func enterFunc(t *testing.T, g *gate.Gate, enter, leave, reenter chan struct{}, done1, done2, done3 *sync.WaitGroup) { - // Wait until instructed to enter. - <-enter - if !g.Enter() { - t.Errorf("Failed to enter when it should be allowed") - } - - done1.Done() - - // Wait until instructed to leave. - <-leave - g.Leave() - - done2.Done() - - // Wait until instructed to reenter. - <-reenter - if g.Enter() { - t.Errorf("Allowed to enter when it should fail") - } - done3.Done() -} - -func TestConcurrentEnter(t *testing.T) { - var g gate.Gate - var done1, done2, done3 sync.WaitGroup - - // Create 1000 worker goroutines. - enter := make(chan struct{}) - leave := make(chan struct{}) - reenter := make(chan struct{}) - done1.Add(1000) - done2.Add(1000) - done3.Add(1000) - for i := 0; i < 1000; i++ { - go enterFunc(t, &g, enter, leave, reenter, &done1, &done2, &done3) - } - - // Tell them all to enter, then leave. - close(enter) - done1.Wait() - - close(leave) - done2.Wait() - - // Close the gate, then have the workers try to enter again. - g.Close() - close(reenter) - done3.Wait() -} - -func closeFunc(g *gate.Gate, done chan struct{}) { - g.Close() - close(done) -} - -func TestCloseWaits(t *testing.T) { - var g gate.Gate - - // Enter 10 times. - for i := 0; i < 10; i++ { - if !g.Enter() { - t.Fatalf("Failed to enter when it should be allowed") - } - } - - // Launch closer. Check that it doesn't complete. - done := make(chan struct{}) - go closeFunc(&g, done) - - for i := 0; i < 10; i++ { - select { - case <-done: - t.Fatalf("Close function completed too soon") - case <-time.After(100 * time.Millisecond): - } - - g.Leave() - } - - // Now the closer must complete. - <-done -} - -func TestMultipleSerialCloses(t *testing.T) { - var g gate.Gate - - // Enter 10 times. - for i := 0; i < 10; i++ { - if !g.Enter() { - t.Fatalf("Failed to enter when it should be allowed") - } - } - - // Launch closer. Check that it doesn't complete. - done := make(chan struct{}) - go closeFunc(&g, done) - - for i := 0; i < 10; i++ { - select { - case <-done: - t.Fatalf("Close function completed too soon") - case <-time.After(100 * time.Millisecond): - } - - g.Leave() - } - - // Now the closer must complete. - <-done - - // Close again should not block. - done = make(chan struct{}) - go closeFunc(&g, done) - - select { - case <-done: - case <-time.After(2 * time.Second): - t.Fatalf("Second Close is blocking") - } -} - -func worker(g *gate.Gate, done *sync.WaitGroup) { - for { - if !g.Enter() { - break - } - // Golang before v1.14 doesn't preempt busyloops. - runtime.Gosched() - g.Leave() - } - done.Done() -} - -func TestConcurrentAll(t *testing.T) { - var g gate.Gate - var done sync.WaitGroup - - // Launch 1000 goroutines to concurrently enter/leave. - done.Add(1000) - for i := 0; i < 1000; i++ { - go worker(&g, &done) - } - - // Wait for the goroutines to do some work, then close the gate. - time.Sleep(2 * time.Second) - g.Close() - - // Wait for all of them to complete. - done.Wait() -} diff --git a/pkg/sync/BUILD b/pkg/sync/BUILD index 2e2395807..b2c5229e7 100644 --- a/pkg/sync/BUILD +++ b/pkg/sync/BUILD @@ -52,6 +52,7 @@ go_library( "aliases.go", "checklocks_off_unsafe.go", "checklocks_on_unsafe.go", + "gate_unsafe.go", "goyield_go113_unsafe.go", "goyield_unsafe.go", "mutex_unsafe.go", @@ -69,6 +70,7 @@ go_library( stateify = False, visibility = ["//:sandbox"], deps = [ + "//pkg/gohacks", "//pkg/goid", ], ) @@ -77,6 +79,7 @@ go_test( name = "sync_test", size = "small", srcs = [ + "gate_test.go", "mutex_test.go", "rwmutex_test.go", "seqcount_test.go", diff --git a/pkg/sync/gate_test.go b/pkg/sync/gate_test.go new file mode 100644 index 000000000..82ce02b97 --- /dev/null +++ b/pkg/sync/gate_test.go @@ -0,0 +1,129 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "context" + "runtime" + "sync/atomic" + "testing" + "time" +) + +func TestGateBasic(t *testing.T) { + var g Gate + + if !g.Enter() { + t.Fatalf("Enter failed before Close") + } + g.Leave() + + g.Close() + if g.Enter() { + t.Fatalf("Enter succeeded after Close") + } +} + +func TestGateConcurrent(t *testing.T) { + // Each call to testGateConcurrentOnce tests behavior around a single call + // to Gate.Close, so run many short tests to increase the probability of + // flushing out any issues. + totalTime := 5 * time.Second + timePerTest := 20 * time.Millisecond + numTests := int(totalTime / timePerTest) + for i := 0; i < numTests; i++ { + testGateConcurrentOnce(t, timePerTest) + } +} + +func testGateConcurrentOnce(t *testing.T, d time.Duration) { + const numGoroutines = 1000 + + ctx, cancel := context.WithCancel(context.Background()) + var wg WaitGroup + defer func() { + cancel() + wg.Wait() + }() + + var g Gate + closeState := int32(0) // set to 1 before g.Close() and 2 after it returns + + // Start a large number of goroutines that repeatedly attempt to enter the + // gate and get the expected result. + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for ctx.Err() == nil { + closedBeforeEnter := atomic.LoadInt32(&closeState) == 2 + if g.Enter() { + closedBeforeLeave := atomic.LoadInt32(&closeState) == 2 + g.Leave() + if closedBeforeEnter { + t.Errorf("Enter succeeded after Close") + return + } + if closedBeforeLeave { + t.Errorf("Close returned before Leave") + return + } + } else { + if atomic.LoadInt32(&closeState) == 0 { + t.Errorf("Enter failed before Close") + return + } + } + // Go does not preempt busy loops until Go 1.14. + runtime.Gosched() + } + }() + } + + // Allow goroutines to enter the gate successfully for half of the test's + // duration, then close the gate and allow goroutines to fail to enter the + // gate for the remaining half. + time.Sleep(d / 2) + atomic.StoreInt32(&closeState, 1) + g.Close() + atomic.StoreInt32(&closeState, 2) + time.Sleep(d / 2) +} + +func BenchmarkGateEnterLeave(b *testing.B) { + var g Gate + for i := 0; i < b.N; i++ { + g.Enter() + g.Leave() + } +} + +func BenchmarkGateClose(b *testing.B) { + for i := 0; i < b.N; i++ { + var g Gate + g.Close() + } +} + +func BenchmarkGateEnterLeaveAsyncClose(b *testing.B) { + for i := 0; i < b.N; i++ { + var g Gate + g.Enter() + go func() { + g.Leave() + }() + g.Close() + } +} diff --git a/pkg/sync/gate_unsafe.go b/pkg/sync/gate_unsafe.go new file mode 100644 index 000000000..ae32287ef --- /dev/null +++ b/pkg/sync/gate_unsafe.go @@ -0,0 +1,150 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "fmt" + "math" + "sync/atomic" + "unsafe" + + "gvisor.dev/gvisor/pkg/gohacks" +) + +// Gate is a synchronization primitive that allows concurrent goroutines to +// "enter" it as long as it hasn't been closed yet. Once it's been closed, +// goroutines cannot enter it anymore, but are allowed to leave, and the closer +// will be informed when all goroutines have left. +// +// Gate is similar to WaitGroup: +// +// - Gate.Enter() is analogous to WaitGroup.Add(1), but may be called even if +// the Gate counter is 0 and fails if Gate.Close() has been called. +// +// - Gate.Leave() is equivalent to WaitGroup.Done(). +// +// - Gate.Close() is analogous to WaitGroup.Wait(), but also causes future +// calls to Gate.Enter() to fail and may only be called once, from a single +// goroutine. +// +// This is useful, for example, in cases when a goroutine is trying to clean up +// an object for which multiple goroutines have pointers. In such a case, users +// would be required to enter and leave the Gate, and the cleaner would wait +// until all users are gone (and no new ones are allowed) before proceeding. +// +// Users: +// +// if !g.Enter() { +// // Gate is closed, we can't use the object. +// return +// } +// +// // Do something with object. +// [...] +// +// g.Leave() +// +// Closer: +// +// // Prevent new users from using the object, and wait for the existing +// // ones to complete. +// g.Close() +// +// // Clean up the object. +// [...] +// +type Gate struct { + userCount int32 + closingG uintptr +} + +const preparingG = 1 + +// Enter tries to enter the gate. It will succeed if it hasn't been closed yet, +// in which case the caller must eventually call Leave(). +// +// This function is thread-safe. +func (g *Gate) Enter() bool { + if atomic.AddInt32(&g.userCount, 1) > 0 { + return true + } + g.leaveAfterFailedEnter() + return false +} + +// leaveAfterFailedEnter is identical to Leave, but is marked noinline to +// prevent it from being inlined into Enter, since as of this writing inlining +// Leave into Enter prevents Enter from being inlined into its callers. +//go:noinline +func (g *Gate) leaveAfterFailedEnter() { + if atomic.AddInt32(&g.userCount, -1) == math.MinInt32 { + g.leaveClosed() + } +} + +// Leave leaves the gate. This must only be called after a successful call to +// Enter(). If the gate has been closed and this is the last one inside the +// gate, it will notify the closer that the gate is done. +// +// This function is thread-safe. +func (g *Gate) Leave() { + if atomic.AddInt32(&g.userCount, -1) == math.MinInt32 { + g.leaveClosed() + } +} + +func (g *Gate) leaveClosed() { + if atomic.LoadUintptr(&g.closingG) == 0 { + return + } + if g := atomic.SwapUintptr(&g.closingG, 0); g > preparingG { + goready(g, 0) + } +} + +// Close closes the gate, causing future calls to Enter to fail, and waits +// until all goroutines that are currently inside the gate leave before +// returning. +// +// Only one goroutine can call this function. +func (g *Gate) Close() { + if atomic.LoadInt32(&g.userCount) == math.MinInt32 { + // The gate is already closed, with no goroutines inside. For legacy + // reasons, we have to allow Close to be called again in this case. + return + } + if v := atomic.AddInt32(&g.userCount, math.MinInt32); v == math.MinInt32 { + // userCount was already 0. + return + } else if v >= 0 { + panic("concurrent Close of sync.Gate") + } + + if g := atomic.SwapUintptr(&g.closingG, preparingG); g != 0 { + panic(fmt.Sprintf("invalid sync.Gate.closingG during Close: %#x", g)) + } + if atomic.LoadInt32(&g.userCount) == math.MinInt32 { + // The last call to Leave arrived while we were setting up closingG. + return + } + // WaitReasonSemacquire/TraceEvGoBlockSync are consistent with WaitGroup. + gopark(gateCommit, gohacks.Noescape(unsafe.Pointer(&g.closingG)), WaitReasonSemacquire, TraceEvGoBlockSync, 0) +} + +//go:norace +//go:nosplit +func gateCommit(g uintptr, closingG unsafe.Pointer) bool { + return RaceUncheckedAtomicCompareAndSwapUintptr((*uintptr)(closingG), preparingG, g) +} diff --git a/pkg/sync/runtime_unsafe.go b/pkg/sync/runtime_unsafe.go index 119ff832a..158985709 100644 --- a/pkg/sync/runtime_unsafe.go +++ b/pkg/sync/runtime_unsafe.go @@ -56,12 +56,16 @@ func goready(gp uintptr, traceskip int) // Values for the reason argument to gopark, from Go's src/runtime/runtime2.go. const ( - WaitReasonSelect uint8 = 9 + WaitReasonSelect uint8 = 9 + WaitReasonChanReceive uint8 = 14 + WaitReasonSemacquire uint8 = 18 ) // Values for the traceEv argument to gopark, from Go's src/runtime/trace.go. const ( + TraceEvGoBlockRecv byte = 23 TraceEvGoBlockSelect byte = 24 + TraceEvGoBlockSync byte = 25 ) // Rand32 returns a non-cryptographically-secure random uint32. diff --git a/pkg/tcpip/link/waitable/BUILD b/pkg/tcpip/link/waitable/BUILD index 9b4602c1b..b8d417b7d 100644 --- a/pkg/tcpip/link/waitable/BUILD +++ b/pkg/tcpip/link/waitable/BUILD @@ -9,7 +9,7 @@ go_library( ], visibility = ["//visibility:public"], deps = [ - "//pkg/gate", + "//pkg/sync", "//pkg/tcpip", "//pkg/tcpip/header", "//pkg/tcpip/stack", diff --git a/pkg/tcpip/link/waitable/waitable.go b/pkg/tcpip/link/waitable/waitable.go index 20259b285..ce5113746 100644 --- a/pkg/tcpip/link/waitable/waitable.go +++ b/pkg/tcpip/link/waitable/waitable.go @@ -22,7 +22,7 @@ package waitable import ( - "gvisor.dev/gvisor/pkg/gate" + "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/stack" @@ -30,10 +30,10 @@ import ( // Endpoint is a waitable link-layer endpoint. type Endpoint struct { - dispatchGate gate.Gate + dispatchGate sync.Gate dispatcher stack.NetworkDispatcher - writeGate gate.Gate + writeGate sync.Gate lower stack.LinkEndpoint } diff --git a/pkg/unet/BUILD b/pkg/unet/BUILD index a86501fa2..155d99a0d 100644 --- a/pkg/unet/BUILD +++ b/pkg/unet/BUILD @@ -10,7 +10,7 @@ go_library( ], visibility = ["//visibility:public"], deps = [ - "//pkg/gate", + "//pkg/sync", "@org_golang_x_sys//unix:go_default_library", ], ) diff --git a/pkg/unet/unet.go b/pkg/unet/unet.go index c976d7230..22dd40f21 100644 --- a/pkg/unet/unet.go +++ b/pkg/unet/unet.go @@ -23,7 +23,7 @@ import ( "sync/atomic" "syscall" - "gvisor.dev/gvisor/pkg/gate" + "gvisor.dev/gvisor/pkg/sync" ) // backlog is used for the listen request. @@ -67,7 +67,7 @@ func eventFD() (int, error) { // Socket is a connected unix domain socket. type Socket struct { // gate protects use of fd. - gate gate.Gate + gate sync.Gate // fd is the bound socket. // |