summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJamie Liu <jamieliu@google.com>2019-06-21 14:45:57 -0700
committergVisor bot <gvisor-bot@google.com>2019-06-21 14:47:04 -0700
commite806466fc5a1331df1e643226297064e0eb31075 (patch)
treea6646ae458821908ff98a39e9f171d2cdceb699f
parent5ba16d51a950d55684c0348a9445784363467c9c (diff)
Add //pkg/flipcall.
Flipcall is a (conceptually) simple local-only RPC mechanism. Compared to unet, Flipcall does not support passing FDs (support for which will be provided out of band by another package), requires users to establish connections manually, and requires user management of concurrency since each connected Endpoint pair supports only a single RPC at a time; however, it improves performance by using shared memory for data (reducing memory copies) and using futexes for control signaling (which is much cheaper than sendto/recvfrom/sendmsg/recvmsg). PiperOrigin-RevId: 254471986
-rw-r--r--pkg/flipcall/BUILD31
-rw-r--r--pkg/flipcall/endpoint_futex.go45
-rw-r--r--pkg/flipcall/endpoint_unsafe.go238
-rw-r--r--pkg/flipcall/flipcall.go32
-rw-r--r--pkg/flipcall/flipcall_example_test.go106
-rw-r--r--pkg/flipcall/flipcall_test.go211
-rw-r--r--pkg/flipcall/futex_linux.go94
-rw-r--r--pkg/flipcall/packet_window_allocator.go166
8 files changed, 923 insertions, 0 deletions
diff --git a/pkg/flipcall/BUILD b/pkg/flipcall/BUILD
new file mode 100644
index 000000000..7126fc45f
--- /dev/null
+++ b/pkg/flipcall/BUILD
@@ -0,0 +1,31 @@
+load("//tools/go_stateify:defs.bzl", "go_library", "go_test")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "flipcall",
+ srcs = [
+ "endpoint_futex.go",
+ "endpoint_unsafe.go",
+ "flipcall.go",
+ "futex_linux.go",
+ "packet_window_allocator.go",
+ ],
+ importpath = "gvisor.dev/gvisor/pkg/flipcall",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/abi/linux",
+ "//pkg/log",
+ "//pkg/memutil",
+ ],
+)
+
+go_test(
+ name = "flipcall_test",
+ size = "small",
+ srcs = [
+ "flipcall_example_test.go",
+ "flipcall_test.go",
+ ],
+ embed = [":flipcall"],
+)
diff --git a/pkg/flipcall/endpoint_futex.go b/pkg/flipcall/endpoint_futex.go
new file mode 100644
index 000000000..5cab02b1d
--- /dev/null
+++ b/pkg/flipcall/endpoint_futex.go
@@ -0,0 +1,45 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "fmt"
+)
+
+type endpointControlState struct{}
+
+func (ep *Endpoint) initControlState(ctrlMode ControlMode) error {
+ if ctrlMode != ControlModeFutex {
+ return fmt.Errorf("unsupported control mode: %v", ctrlMode)
+ }
+ return nil
+}
+
+func (ep *Endpoint) doRoundTrip() error {
+ return ep.doFutexRoundTrip()
+}
+
+func (ep *Endpoint) doWaitFirst() error {
+ return ep.doFutexWaitFirst()
+}
+
+func (ep *Endpoint) doNotifyLast() error {
+ return ep.doFutexNotifyLast()
+}
+
+// Preconditions: ep.isShutdown() == true.
+func (ep *Endpoint) interruptForShutdown() {
+ ep.doFutexInterruptForShutdown()
+}
diff --git a/pkg/flipcall/endpoint_unsafe.go b/pkg/flipcall/endpoint_unsafe.go
new file mode 100644
index 000000000..8319955e0
--- /dev/null
+++ b/pkg/flipcall/endpoint_unsafe.go
@@ -0,0 +1,238 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "fmt"
+ "math"
+ "reflect"
+ "sync/atomic"
+ "syscall"
+ "unsafe"
+)
+
+// An Endpoint provides the ability to synchronously transfer data and control
+// to a connected peer Endpoint, which may be in another process.
+//
+// Since the Endpoint control transfer model is synchronous, at any given time
+// one Endpoint "has control" (designated the *active* Endpoint), and the other
+// is "waiting for control" (designated the *inactive* Endpoint). Users of the
+// flipcall package arbitrarily designate one Endpoint as initially-active, and
+// the other as initially-inactive; in a client/server protocol, the client
+// Endpoint is usually initially-active (able to send a request) and the server
+// Endpoint is usually initially-inactive (waiting for a request). The
+// initially-active Endpoint writes data to be sent to Endpoint.Data(), and
+// then synchronously transfers control to the inactive Endpoint by calling
+// Endpoint.SendRecv(), becoming the inactive Endpoint in the process. The
+// initially-inactive Endpoint waits for control by calling
+// Endpoint.RecvFirst(); receiving control causes it to become the active
+// Endpoint. After this, the protocol is symmetric: the active Endpoint reads
+// data sent by the peer by reading from Endpoint.Data(), writes data to be
+// sent to the peer into Endpoint.Data(), and then calls Endpoint.SendRecv() to
+// exchange roles with the peer, which blocks until the peer has done the same.
+type Endpoint struct {
+ // shutdown is non-zero if Endpoint.Shutdown() has been called. shutdown is
+ // accessed using atomic memory operations.
+ shutdown uint32
+
+ // dataCap is the size of the datagram part of the packet window in bytes.
+ // dataCap is immutable.
+ dataCap uint32
+
+ // packet is the beginning of the packet window. packet is immutable.
+ packet unsafe.Pointer
+
+ ctrl endpointControlState
+}
+
+// Init must be called on zero-value Endpoints before first use. If it
+// succeeds, Destroy() must be called once the Endpoint is no longer in use.
+//
+// ctrlMode specifies how connected Endpoints will exchange control. Both
+// connected Endpoints must specify the same value for ctrlMode.
+//
+// pwd represents the packet window used to exchange data with the peer
+// Endpoint. FD may differ between Endpoints if they are in different
+// processes, but must represent the same file. The packet window must
+// initially be filled with zero bytes.
+func (ep *Endpoint) Init(ctrlMode ControlMode, pwd PacketWindowDescriptor) error {
+ if pwd.Length < pageSize {
+ return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize)
+ }
+ if pwd.Length > math.MaxUint32 {
+ return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, math.MaxUint32)
+ }
+ m, _, e := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset))
+ if e != 0 {
+ return fmt.Errorf("failed to mmap packet window: %v", e)
+ }
+ ep.dataCap = uint32(pwd.Length) - uint32(packetHeaderBytes)
+ ep.packet = (unsafe.Pointer)(m)
+ if err := ep.initControlState(ctrlMode); err != nil {
+ ep.unmapPacket()
+ return err
+ }
+ return nil
+}
+
+// NewEndpoint is a convenience function that returns an initialized Endpoint
+// allocated on the heap.
+func NewEndpoint(ctrlMode ControlMode, pwd PacketWindowDescriptor) (*Endpoint, error) {
+ var ep Endpoint
+ if err := ep.Init(ctrlMode, pwd); err != nil {
+ return nil, err
+ }
+ return &ep, nil
+}
+
+func (ep *Endpoint) unmapPacket() {
+ syscall.Syscall(syscall.SYS_MUNMAP, uintptr(ep.packet), uintptr(ep.dataCap)+packetHeaderBytes, 0)
+ ep.dataCap = 0
+ ep.packet = nil
+}
+
+// Destroy releases resources owned by ep. No other Endpoint methods may be
+// called after Destroy.
+func (ep *Endpoint) Destroy() {
+ ep.unmapPacket()
+}
+
+// Packets consist of an 8-byte header followed by an arbitrarily-sized
+// datagram. The header consists of:
+//
+// - A 4-byte native-endian sequence number, which is incremented by the active
+// Endpoint after it finishes writing to the packet window. The sequence number
+// is needed to handle spurious wakeups.
+//
+// - A 4-byte native-endian datagram length in bytes.
+const (
+ sizeofUint32 = unsafe.Sizeof(uint32(0))
+ packetHeaderBytes = 2 * sizeofUint32
+)
+
+func (ep *Endpoint) seq() *uint32 {
+ return (*uint32)(ep.packet)
+}
+
+func (ep *Endpoint) dataLen() *uint32 {
+ return (*uint32)((unsafe.Pointer)(uintptr(ep.packet) + sizeofUint32))
+}
+
+// DataCap returns the maximum datagram size supported by ep in bytes.
+func (ep *Endpoint) DataCap() uint32 {
+ return ep.dataCap
+}
+
+func (ep *Endpoint) data() unsafe.Pointer {
+ return unsafe.Pointer(uintptr(ep.packet) + packetHeaderBytes)
+}
+
+// Data returns the datagram part of ep's packet window as a byte slice.
+//
+// Note that the packet window is shared with the potentially-untrusted peer
+// Endpoint, which may concurrently mutate the contents of the packet window.
+// Thus:
+//
+// - Readers must not assume that two reads of the same byte in Data() will
+// return the same result. In other words, readers should read any given byte
+// in Data() at most once.
+//
+// - Writers must not assume that they will read back the same data that they
+// have written. In other words, writers should avoid reading from Data() at
+// all.
+func (ep *Endpoint) Data() []byte {
+ var bs []byte
+ bsReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&bs))
+ bsReflect.Data = uintptr(ep.data())
+ bsReflect.Len = int(ep.DataCap())
+ bsReflect.Cap = bsReflect.Len
+ return bs
+}
+
+// SendRecv transfers control to the peer Endpoint, causing its call to
+// Endpoint.SendRecv() or Endpoint.RecvFirst() to return with the given
+// datagram length, then blocks until the peer Endpoint calls
+// Endpoint.SendRecv() or Endpoint.SendLast().
+//
+// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has
+// returned an error. ep.SendLast() has never been called.
+func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) {
+ dataCap := ep.DataCap()
+ if dataLen > dataCap {
+ return 0, fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap)
+ }
+ atomic.StoreUint32(ep.dataLen(), dataLen)
+ if err := ep.doRoundTrip(); err != nil {
+ return 0, err
+ }
+ recvDataLen := atomic.LoadUint32(ep.dataLen())
+ if recvDataLen > dataCap {
+ return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap)
+ }
+ return recvDataLen, nil
+}
+
+// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then
+// returns the datagram length specified by that call.
+//
+// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never
+// been called.
+func (ep *Endpoint) RecvFirst() (uint32, error) {
+ if err := ep.doWaitFirst(); err != nil {
+ return 0, err
+ }
+ recvDataLen := atomic.LoadUint32(ep.dataLen())
+ if dataCap := ep.DataCap(); recvDataLen > dataCap {
+ return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap)
+ }
+ return recvDataLen, nil
+}
+
+// SendLast causes the peer Endpoint's call to Endpoint.SendRecv() or
+// Endpoint.RecvFirst() to return with the given datagram length.
+//
+// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has
+// returned an error. ep.SendLast() has never been called.
+func (ep *Endpoint) SendLast(dataLen uint32) error {
+ dataCap := ep.DataCap()
+ if dataLen > dataCap {
+ return fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap)
+ }
+ atomic.StoreUint32(ep.dataLen(), dataLen)
+ if err := ep.doNotifyLast(); err != nil {
+ return err
+ }
+ return nil
+}
+
+// Shutdown causes concurrent and future calls to ep.SendRecv(),
+// ep.RecvFirst(), and ep.SendLast() to unblock and return errors. It does not
+// wait for concurrent calls to return.
+func (ep *Endpoint) Shutdown() {
+ if atomic.SwapUint32(&ep.shutdown, 1) == 0 {
+ ep.interruptForShutdown()
+ }
+}
+
+func (ep *Endpoint) isShutdown() bool {
+ return atomic.LoadUint32(&ep.shutdown) != 0
+}
+
+type endpointShutdownError struct{}
+
+// Error implements error.Error.
+func (endpointShutdownError) Error() string {
+ return "Endpoint.Shutdown() has been called"
+}
diff --git a/pkg/flipcall/flipcall.go b/pkg/flipcall/flipcall.go
new file mode 100644
index 000000000..79a1e418a
--- /dev/null
+++ b/pkg/flipcall/flipcall.go
@@ -0,0 +1,32 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package flipcall implements a protocol providing Fast Local Interprocess
+// Procedure Calls.
+package flipcall
+
+// ControlMode defines how control is exchanged across a connection.
+type ControlMode uint8
+
+const (
+ // ControlModeInvalid is invalid, and exists so that ControlMode fields in
+ // structs must be explicitly initialized.
+ ControlModeInvalid ControlMode = iota
+
+ // ControlModeFutex uses shared futex operations on packet control words.
+ ControlModeFutex
+
+ // controlModeCount is the number of ControlModes in this list.
+ controlModeCount
+)
diff --git a/pkg/flipcall/flipcall_example_test.go b/pkg/flipcall/flipcall_example_test.go
new file mode 100644
index 000000000..572a1f119
--- /dev/null
+++ b/pkg/flipcall/flipcall_example_test.go
@@ -0,0 +1,106 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "bytes"
+ "fmt"
+)
+
+func Example() {
+ const (
+ reqPrefix = "request "
+ respPrefix = "response "
+ count = 3
+ maxMessageLen = len(respPrefix) + 1 // 1 digit
+ )
+
+ pwa, err := NewPacketWindowAllocator()
+ if err != nil {
+ panic(err)
+ }
+ defer pwa.Destroy()
+ pwd, err := pwa.Allocate(PacketWindowLengthForDataCap(uint32(maxMessageLen)))
+ if err != nil {
+ panic(err)
+ }
+ clientEP, err := NewEndpoint(ControlModeFutex, pwd)
+ if err != nil {
+ panic(err)
+ }
+ defer clientEP.Destroy()
+ serverEP, err := NewEndpoint(ControlModeFutex, pwd)
+ if err != nil {
+ panic(err)
+ }
+ defer serverEP.Destroy()
+
+ serverDone := make(chan struct{})
+ go func() {
+ defer func() { serverDone <- struct{}{} }()
+ i := 0
+ var buf bytes.Buffer
+ // wait for first request
+ n, err := serverEP.RecvFirst()
+ if err != nil {
+ return
+ }
+ for {
+ // read request
+ buf.Reset()
+ buf.Write(serverEP.Data()[:n])
+ fmt.Println(buf.String())
+ // write response
+ buf.Reset()
+ fmt.Fprintf(&buf, "%s%d", respPrefix, i)
+ copy(serverEP.Data(), buf.Bytes())
+ // send response and wait for next request
+ n, err = serverEP.SendRecv(uint32(buf.Len()))
+ if err != nil {
+ return
+ }
+ i++
+ }
+ }()
+ defer func() {
+ serverEP.Shutdown()
+ <-serverDone
+ }()
+
+ var buf bytes.Buffer
+ for i := 0; i < count; i++ {
+ // write request
+ buf.Reset()
+ fmt.Fprintf(&buf, "%s%d", reqPrefix, i)
+ copy(clientEP.Data(), buf.Bytes())
+ // send request and wait for response
+ n, err := clientEP.SendRecv(uint32(buf.Len()))
+ if err != nil {
+ panic(err)
+ }
+ // read response
+ buf.Reset()
+ buf.Write(clientEP.Data()[:n])
+ fmt.Println(buf.String())
+ }
+
+ // Output:
+ // request 0
+ // response 0
+ // request 1
+ // response 1
+ // request 2
+ // response 2
+}
diff --git a/pkg/flipcall/flipcall_test.go b/pkg/flipcall/flipcall_test.go
new file mode 100644
index 000000000..20d3002f0
--- /dev/null
+++ b/pkg/flipcall/flipcall_test.go
@@ -0,0 +1,211 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "testing"
+ "time"
+)
+
+var testPacketWindowSize = pageSize
+
+func testSendRecv(t *testing.T, ctrlMode ControlMode) {
+ pwa, err := NewPacketWindowAllocator()
+ if err != nil {
+ t.Fatalf("failed to create PacketWindowAllocator: %v", err)
+ }
+ defer pwa.Destroy()
+ pwd, err := pwa.Allocate(testPacketWindowSize)
+ if err != nil {
+ t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
+ }
+
+ sendEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ t.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer sendEP.Destroy()
+ recvEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ t.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer recvEP.Destroy()
+
+ otherThreadDone := make(chan struct{})
+ go func() {
+ defer func() { otherThreadDone <- struct{}{} }()
+ t.Logf("initially-inactive Endpoint waiting for packet 1")
+ if _, err := recvEP.RecvFirst(); err != nil {
+ t.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err)
+ }
+ t.Logf("initially-inactive Endpoint got packet 1, sending packet 2 and waiting for packet 3")
+ if _, err := recvEP.SendRecv(0); err != nil {
+ t.Fatalf("initially-inactive Endpoint.SendRecv() failed: %v", err)
+ }
+ t.Logf("initially-inactive Endpoint got packet 3")
+ }()
+ defer func() {
+ t.Logf("waiting for initially-inactive Endpoint goroutine to complete")
+ <-otherThreadDone
+ }()
+
+ t.Logf("initially-active Endpoint sending packet 1 and waiting for packet 2")
+ if _, err := sendEP.SendRecv(0); err != nil {
+ t.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err)
+ }
+ t.Logf("initially-active Endpoint got packet 2, sending packet 3")
+ if err := sendEP.SendLast(0); err != nil {
+ t.Fatalf("initially-active Endpoint.SendLast() failed: %v", err)
+ }
+}
+
+func TestFutexSendRecv(t *testing.T) {
+ testSendRecv(t, ControlModeFutex)
+}
+
+func testRecvFirstShutdown(t *testing.T, ctrlMode ControlMode) {
+ pwa, err := NewPacketWindowAllocator()
+ if err != nil {
+ t.Fatalf("failed to create PacketWindowAllocator: %v", err)
+ }
+ defer pwa.Destroy()
+ pwd, err := pwa.Allocate(testPacketWindowSize)
+ if err != nil {
+ t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
+ }
+
+ ep, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ t.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer ep.Destroy()
+
+ otherThreadDone := make(chan struct{})
+ go func() {
+ defer func() { otherThreadDone <- struct{}{} }()
+ _, err := ep.RecvFirst()
+ if err == nil {
+ t.Errorf("Endpoint.RecvFirst() succeeded unexpectedly")
+ }
+ }()
+
+ time.Sleep(time.Second) // to ensure ep.RecvFirst() has blocked
+ ep.Shutdown()
+ <-otherThreadDone
+}
+
+func TestFutexRecvFirstShutdown(t *testing.T) {
+ testRecvFirstShutdown(t, ControlModeFutex)
+}
+
+func testSendRecvShutdown(t *testing.T, ctrlMode ControlMode) {
+ pwa, err := NewPacketWindowAllocator()
+ if err != nil {
+ t.Fatalf("failed to create PacketWindowAllocator: %v", err)
+ }
+ defer pwa.Destroy()
+ pwd, err := pwa.Allocate(testPacketWindowSize)
+ if err != nil {
+ t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
+ }
+
+ sendEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ t.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer sendEP.Destroy()
+ recvEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ t.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer recvEP.Destroy()
+
+ otherThreadDone := make(chan struct{})
+ go func() {
+ defer func() { otherThreadDone <- struct{}{} }()
+ if _, err := recvEP.RecvFirst(); err != nil {
+ t.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err)
+ }
+ if _, err := recvEP.SendRecv(0); err == nil {
+ t.Errorf("initially-inactive Endpoint.SendRecv() succeeded unexpectedly")
+ }
+ }()
+
+ if _, err := sendEP.SendRecv(0); err != nil {
+ t.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err)
+ }
+ time.Sleep(time.Second) // to ensure recvEP.SendRecv() has blocked
+ recvEP.Shutdown()
+ <-otherThreadDone
+}
+
+func TestFutexSendRecvShutdown(t *testing.T) {
+ testSendRecvShutdown(t, ControlModeFutex)
+}
+
+func benchmarkSendRecv(b *testing.B, ctrlMode ControlMode) {
+ pwa, err := NewPacketWindowAllocator()
+ if err != nil {
+ b.Fatalf("failed to create PacketWindowAllocator: %v", err)
+ }
+ defer pwa.Destroy()
+ pwd, err := pwa.Allocate(testPacketWindowSize)
+ if err != nil {
+ b.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
+ }
+
+ sendEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ b.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer sendEP.Destroy()
+ recvEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ b.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer recvEP.Destroy()
+
+ otherThreadDone := make(chan struct{})
+ go func() {
+ defer func() { otherThreadDone <- struct{}{} }()
+ if b.N == 0 {
+ return
+ }
+ if _, err := recvEP.RecvFirst(); err != nil {
+ b.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err)
+ }
+ for i := 1; i < b.N; i++ {
+ if _, err := recvEP.SendRecv(0); err != nil {
+ b.Fatalf("initially-inactive Endpoint.SendRecv() failed: %v", err)
+ }
+ }
+ if err := recvEP.SendLast(0); err != nil {
+ b.Fatalf("initially-inactive Endpoint.SendLast() failed: %v", err)
+ }
+ }()
+ defer func() { <-otherThreadDone }()
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if _, err := sendEP.SendRecv(0); err != nil {
+ b.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err)
+ }
+ }
+ b.StopTimer()
+}
+
+func BenchmarkFutexSendRecv(b *testing.B) {
+ benchmarkSendRecv(b, ControlModeFutex)
+}
diff --git a/pkg/flipcall/futex_linux.go b/pkg/flipcall/futex_linux.go
new file mode 100644
index 000000000..3f592ad16
--- /dev/null
+++ b/pkg/flipcall/futex_linux.go
@@ -0,0 +1,94 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+package flipcall
+
+import (
+ "fmt"
+ "math"
+ "sync/atomic"
+ "syscall"
+
+ "gvisor.dev/gvisor/pkg/abi/linux"
+ "gvisor.dev/gvisor/pkg/log"
+)
+
+func (ep *Endpoint) doFutexRoundTrip() error {
+ ourSeq, err := ep.doFutexNotifySeq()
+ if err != nil {
+ return err
+ }
+ return ep.doFutexWaitSeq(ourSeq)
+}
+
+func (ep *Endpoint) doFutexWaitFirst() error {
+ return ep.doFutexWaitSeq(0)
+}
+
+func (ep *Endpoint) doFutexNotifyLast() error {
+ _, err := ep.doFutexNotifySeq()
+ return err
+}
+
+func (ep *Endpoint) doFutexNotifySeq() (uint32, error) {
+ ourSeq := atomic.AddUint32(ep.seq(), 1)
+ if err := ep.futexWake(1); err != nil {
+ return ourSeq, fmt.Errorf("failed to FUTEX_WAKE peer Endpoint: %v", err)
+ }
+ return ourSeq, nil
+}
+
+func (ep *Endpoint) doFutexWaitSeq(prevSeq uint32) error {
+ nextSeq := prevSeq + 1
+ for {
+ if ep.isShutdown() {
+ return endpointShutdownError{}
+ }
+ if err := ep.futexWait(prevSeq); err != nil {
+ return fmt.Errorf("failed to FUTEX_WAIT for peer Endpoint: %v", err)
+ }
+ seq := atomic.LoadUint32(ep.seq())
+ if seq == nextSeq {
+ return nil
+ }
+ if seq != prevSeq {
+ return fmt.Errorf("invalid packet sequence number %d (expected %d or %d)", seq, prevSeq, nextSeq)
+ }
+ }
+}
+
+func (ep *Endpoint) doFutexInterruptForShutdown() {
+ // Wake MaxInt32 threads to prevent a malicious or broken peer from
+ // swallowing our wakeup by FUTEX_WAITing from multiple threads.
+ if err := ep.futexWake(math.MaxInt32); err != nil {
+ log.Warningf("failed to FUTEX_WAKE Endpoint: %v", err)
+ }
+}
+
+func (ep *Endpoint) futexWake(numThreads int32) error {
+ if _, _, e := syscall.RawSyscall(syscall.SYS_FUTEX, uintptr(ep.packet), linux.FUTEX_WAKE, uintptr(numThreads)); e != 0 {
+ return e
+ }
+ return nil
+}
+
+func (ep *Endpoint) futexWait(seq uint32) error {
+ _, _, e := syscall.Syscall6(syscall.SYS_FUTEX, uintptr(ep.packet), linux.FUTEX_WAIT, uintptr(seq), 0, 0, 0)
+ if e != 0 && e != syscall.EAGAIN && e != syscall.EINTR {
+ return e
+ }
+ return nil
+}
diff --git a/pkg/flipcall/packet_window_allocator.go b/pkg/flipcall/packet_window_allocator.go
new file mode 100644
index 000000000..7b455b24d
--- /dev/null
+++ b/pkg/flipcall/packet_window_allocator.go
@@ -0,0 +1,166 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "fmt"
+ "math/bits"
+ "os"
+ "syscall"
+
+ "gvisor.dev/gvisor/pkg/abi/linux"
+ "gvisor.dev/gvisor/pkg/memutil"
+)
+
+var (
+ pageSize = os.Getpagesize()
+ pageMask = pageSize - 1
+)
+
+func init() {
+ if bits.OnesCount(uint(pageSize)) != 1 {
+ // This is depended on by roundUpToPage().
+ panic(fmt.Sprintf("system page size (%d) is not a power of 2", pageSize))
+ }
+ if uintptr(pageSize) < packetHeaderBytes {
+ // This is required since Endpoint.Init() imposes a minimum packet
+ // window size of 1 page.
+ panic(fmt.Sprintf("system page size (%d) is less than packet header size (%d)", pageSize, packetHeaderBytes))
+ }
+}
+
+// PacketWindowDescriptor represents a packet window, a range of pages in a
+// shared memory file that is used to exchange packets between partner
+// Endpoints.
+type PacketWindowDescriptor struct {
+ // FD is the file descriptor representing the shared memory file.
+ FD int
+
+ // Offset is the offset into the shared memory file at which the packet
+ // window begins.
+ Offset int64
+
+ // Length is the size of the packet window in bytes.
+ Length int
+}
+
+// PacketWindowLengthForDataCap returns the minimum packet window size required
+// to accommodate datagrams of the given size in bytes.
+func PacketWindowLengthForDataCap(dataCap uint32) int {
+ return roundUpToPage(int(dataCap) + int(packetHeaderBytes))
+}
+
+func roundUpToPage(x int) int {
+ return (x + pageMask) &^ pageMask
+}
+
+// A PacketWindowAllocator owns a shared memory file, and allocates packet
+// windows from it.
+type PacketWindowAllocator struct {
+ fd int
+ nextAlloc int64
+ fileSize int64
+}
+
+// Init must be called on zero-value PacketWindowAllocators before first use.
+// If it succeeds, Destroy() must be called once the PacketWindowAllocator is
+// no longer in use.
+func (pwa *PacketWindowAllocator) Init() error {
+ fd, err := memutil.CreateMemFD("flipcall_packet_windows", linux.MFD_CLOEXEC|linux.MFD_ALLOW_SEALING)
+ if err != nil {
+ return fmt.Errorf("failed to create memfd: %v", err)
+ }
+ // Apply F_SEAL_SHRINK to prevent either party from causing SIGBUS in the
+ // other by truncating the file, and F_SEAL_SEAL to prevent either party
+ // from applying F_SEAL_GROW or F_SEAL_WRITE.
+ if _, _, e := syscall.RawSyscall(syscall.SYS_FCNTL, uintptr(fd), linux.F_ADD_SEALS, linux.F_SEAL_SHRINK|linux.F_SEAL_SEAL); e != 0 {
+ syscall.Close(fd)
+ return fmt.Errorf("failed to apply memfd seals: %v", e)
+ }
+ pwa.fd = fd
+ return nil
+}
+
+// NewPacketWindowAllocator is a convenience function that returns an
+// initialized PacketWindowAllocator allocated on the heap.
+func NewPacketWindowAllocator() (*PacketWindowAllocator, error) {
+ var pwa PacketWindowAllocator
+ if err := pwa.Init(); err != nil {
+ return nil, err
+ }
+ return &pwa, nil
+}
+
+// Destroy releases resources owned by pwa. This invalidates file descriptors
+// previously returned by pwa.FD() and pwd.Allocate().
+func (pwa *PacketWindowAllocator) Destroy() {
+ syscall.Close(pwa.fd)
+}
+
+// FD represents the file descriptor of the shared memory file backing pwa.
+func (pwa *PacketWindowAllocator) FD() int {
+ return pwa.fd
+}
+
+// Allocate allocates a new packet window of at least the given size and
+// returns a PacketWindowDescriptor representing it.
+//
+// Preconditions: size > 0.
+func (pwa *PacketWindowAllocator) Allocate(size int) (PacketWindowDescriptor, error) {
+ if size <= 0 {
+ return PacketWindowDescriptor{}, fmt.Errorf("invalid size: %d", size)
+ }
+ // Page-align size to ensure that pwa.nextAlloc remains page-aligned.
+ size = roundUpToPage(size)
+ if size <= 0 {
+ return PacketWindowDescriptor{}, fmt.Errorf("size %d overflows after rounding up to page size", size)
+ }
+ end := pwa.nextAlloc + int64(size) // overflow checked by ensureFileSize
+ if err := pwa.ensureFileSize(end); err != nil {
+ return PacketWindowDescriptor{}, err
+ }
+ start := pwa.nextAlloc
+ pwa.nextAlloc = end
+ return PacketWindowDescriptor{
+ FD: pwa.fd,
+ Offset: start,
+ Length: size,
+ }, nil
+}
+
+func (pwa *PacketWindowAllocator) ensureFileSize(min int64) error {
+ if min <= 0 {
+ return fmt.Errorf("file size would overflow")
+ }
+ if pwa.fileSize >= min {
+ return nil
+ }
+ newSize := 2 * pwa.fileSize
+ if newSize == 0 {
+ newSize = int64(pageSize)
+ }
+ for newSize < min {
+ newNewSize := newSize * 2
+ if newNewSize <= 0 {
+ return fmt.Errorf("file size would overflow")
+ }
+ newSize = newNewSize
+ }
+ if err := syscall.Ftruncate(pwa.fd, newSize); err != nil {
+ return fmt.Errorf("ftruncate failed: %v", err)
+ }
+ pwa.fileSize = newSize
+ return nil
+}