summaryrefslogtreecommitdiffhomepage
path: root/pkg/flipcall
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/flipcall')
-rw-r--r--pkg/flipcall/BUILD4
-rw-r--r--pkg/flipcall/ctrl_futex.go3
-rw-r--r--pkg/flipcall/flipcall.go20
-rw-r--r--pkg/flipcall/flipcall_test.go19
-rw-r--r--pkg/flipcall/flipcall_unsafe.go18
-rw-r--r--pkg/flipcall/futex_linux.go6
6 files changed, 60 insertions, 10 deletions
diff --git a/pkg/flipcall/BUILD b/pkg/flipcall/BUILD
index bd1d614b6..e590a71ba 100644
--- a/pkg/flipcall/BUILD
+++ b/pkg/flipcall/BUILD
@@ -1,4 +1,5 @@
-load("//tools/go_stateify:defs.bzl", "go_library", "go_test")
+load("//tools/go_stateify:defs.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_test")
package(licenses = ["notice"])
@@ -18,6 +19,7 @@ go_library(
"//pkg/abi/linux",
"//pkg/log",
"//pkg/memutil",
+ "//pkg/syncutil",
],
)
diff --git a/pkg/flipcall/ctrl_futex.go b/pkg/flipcall/ctrl_futex.go
index d59159912..e7c3a3a0b 100644
--- a/pkg/flipcall/ctrl_futex.go
+++ b/pkg/flipcall/ctrl_futex.go
@@ -82,6 +82,7 @@ func (ep *Endpoint) ctrlWaitFirst() error {
*ep.dataLen() = w.Len()
// Return control to the client.
+ raceBecomeInactive()
if err := ep.futexSwitchToPeer(); err != nil {
return err
}
@@ -112,7 +113,7 @@ func (ep *Endpoint) enterFutexWait() error {
return nil
case epsBlocked | epsShutdown:
atomic.AddInt32(&ep.ctrl.state, -epsBlocked)
- return shutdownError{}
+ return ShutdownError{}
default:
// Most likely due to ep.enterFutexWait() being called concurrently
// from multiple goroutines.
diff --git a/pkg/flipcall/flipcall.go b/pkg/flipcall/flipcall.go
index 991018684..3cdb576e1 100644
--- a/pkg/flipcall/flipcall.go
+++ b/pkg/flipcall/flipcall.go
@@ -136,8 +136,8 @@ func (ep *Endpoint) unmapPacket() {
// Shutdown causes concurrent and future calls to ep.Connect(), ep.SendRecv(),
// ep.RecvFirst(), and ep.SendLast(), as well as the same calls in the peer
-// Endpoint, to unblock and return errors. It does not wait for concurrent
-// calls to return. Successive calls to Shutdown have no effect.
+// Endpoint, to unblock and return ShutdownErrors. It does not wait for
+// concurrent calls to return. Successive calls to Shutdown have no effect.
//
// Shutdown is the only Endpoint method that may be called concurrently with
// other methods on the same Endpoint.
@@ -154,10 +154,12 @@ func (ep *Endpoint) isShutdownLocally() bool {
return atomic.LoadUint32(&ep.shutdown) != 0
}
-type shutdownError struct{}
+// ShutdownError is returned by most Endpoint methods after Endpoint.Shutdown()
+// has been called.
+type ShutdownError struct{}
// Error implements error.Error.
-func (shutdownError) Error() string {
+func (ShutdownError) Error() string {
return "flipcall connection shutdown"
}
@@ -180,7 +182,11 @@ const (
// Preconditions: ep is a client Endpoint. ep.Connect(), ep.RecvFirst(),
// ep.SendRecv(), and ep.SendLast() have never been called.
func (ep *Endpoint) Connect() error {
- return ep.ctrlConnect()
+ err := ep.ctrlConnect()
+ if err == nil {
+ raceBecomeActive()
+ }
+ return err
}
// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then
@@ -192,6 +198,7 @@ func (ep *Endpoint) RecvFirst() (uint32, error) {
if err := ep.ctrlWaitFirst(); err != nil {
return 0, err
}
+ raceBecomeActive()
recvDataLen := atomic.LoadUint32(ep.dataLen())
if recvDataLen > ep.dataCap {
return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, ep.dataCap)
@@ -218,9 +225,11 @@ func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) {
// after ep.ctrlRoundTrip(), so if the peer is mutating it concurrently then
// they can only shoot themselves in the foot.
*ep.dataLen() = dataLen
+ raceBecomeInactive()
if err := ep.ctrlRoundTrip(); err != nil {
return 0, err
}
+ raceBecomeActive()
recvDataLen := atomic.LoadUint32(ep.dataLen())
if recvDataLen > ep.dataCap {
return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, ep.dataCap)
@@ -240,6 +249,7 @@ func (ep *Endpoint) SendLast(dataLen uint32) error {
panic(fmt.Sprintf("attempting to send packet with datagram length %d (maximum %d)", dataLen, ep.dataCap))
}
*ep.dataLen() = dataLen
+ raceBecomeInactive()
if err := ep.ctrlWakeLast(); err != nil {
return err
}
diff --git a/pkg/flipcall/flipcall_test.go b/pkg/flipcall/flipcall_test.go
index 435e4eeae..168a487ec 100644
--- a/pkg/flipcall/flipcall_test.go
+++ b/pkg/flipcall/flipcall_test.go
@@ -62,6 +62,9 @@ func (c *testConnection) destroy() {
}
func testSendRecv(t *testing.T, c *testConnection) {
+ // This shared variable is used to confirm that synchronization between
+ // flipcall endpoints is visible to the Go race detector.
+ state := 0
var serverRun sync.WaitGroup
serverRun.Add(1)
go func() {
@@ -71,11 +74,19 @@ func testSendRecv(t *testing.T, c *testConnection) {
t.Errorf("server Endpoint.RecvFirst() failed: %v", err)
return
}
+ state++
+ if state != 2 {
+ t.Errorf("shared state counter: got %d, wanted 2", state)
+ }
t.Logf("server Endpoint got packet 1, sending packet 2 and waiting for packet 3")
if _, err := c.serverEP.SendRecv(0); err != nil {
t.Errorf("server Endpoint.SendRecv() failed: %v", err)
return
}
+ state++
+ if state != 4 {
+ t.Errorf("shared state counter: got %d, wanted 4", state)
+ }
t.Logf("server Endpoint got packet 3")
}()
defer func() {
@@ -89,10 +100,18 @@ func testSendRecv(t *testing.T, c *testConnection) {
if err := c.clientEP.Connect(); err != nil {
t.Fatalf("client Endpoint.Connect() failed: %v", err)
}
+ state++
+ if state != 1 {
+ t.Errorf("shared state counter: got %d, wanted 1", state)
+ }
t.Logf("client Endpoint sending packet 1 and waiting for packet 2")
if _, err := c.clientEP.SendRecv(0); err != nil {
t.Fatalf("client Endpoint.SendRecv() failed: %v", err)
}
+ state++
+ if state != 3 {
+ t.Errorf("shared state counter: got %d, wanted 3", state)
+ }
t.Logf("client Endpoint got packet 2, sending packet 3")
if err := c.clientEP.SendLast(0); err != nil {
t.Fatalf("client Endpoint.SendLast() failed: %v", err)
diff --git a/pkg/flipcall/flipcall_unsafe.go b/pkg/flipcall/flipcall_unsafe.go
index 73e6eef29..27b8939fc 100644
--- a/pkg/flipcall/flipcall_unsafe.go
+++ b/pkg/flipcall/flipcall_unsafe.go
@@ -17,6 +17,8 @@ package flipcall
import (
"reflect"
"unsafe"
+
+ "gvisor.dev/gvisor/pkg/syncutil"
)
// Packets consist of a 16-byte header followed by an arbitrarily-sized
@@ -67,3 +69,19 @@ func (ep *Endpoint) Data() []byte {
bsReflect.Cap = int(ep.dataCap)
return bs
}
+
+// ioSync is a dummy variable used to indicate synchronization to the Go race
+// detector. Compare syscall.ioSync.
+var ioSync int64
+
+func raceBecomeActive() {
+ if syncutil.RaceEnabled {
+ syncutil.RaceAcquire((unsafe.Pointer)(&ioSync))
+ }
+}
+
+func raceBecomeInactive() {
+ if syncutil.RaceEnabled {
+ syncutil.RaceReleaseMerge((unsafe.Pointer)(&ioSync))
+ }
+}
diff --git a/pkg/flipcall/futex_linux.go b/pkg/flipcall/futex_linux.go
index b127a2bbb..168c1ccff 100644
--- a/pkg/flipcall/futex_linux.go
+++ b/pkg/flipcall/futex_linux.go
@@ -61,7 +61,7 @@ func (ep *Endpoint) futexSwitchToPeer() error {
if !atomic.CompareAndSwapUint32(ep.connState(), ep.activeState, ep.inactiveState) {
switch cs := atomic.LoadUint32(ep.connState()); cs {
case csShutdown:
- return shutdownError{}
+ return ShutdownError{}
default:
return fmt.Errorf("unexpected connection state before FUTEX_WAKE: %v", cs)
}
@@ -81,14 +81,14 @@ func (ep *Endpoint) futexSwitchFromPeer() error {
return nil
case ep.inactiveState:
if ep.isShutdownLocally() {
- return shutdownError{}
+ return ShutdownError{}
}
if err := ep.futexWaitConnState(ep.inactiveState); err != nil {
return fmt.Errorf("failed to FUTEX_WAIT for peer Endpoint: %v", err)
}
continue
case csShutdown:
- return shutdownError{}
+ return ShutdownError{}
default:
return fmt.Errorf("unexpected connection state before FUTEX_WAIT: %v", cs)
}