summaryrefslogtreecommitdiffhomepage
path: root/pkg/flipcall
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/flipcall')
-rw-r--r--pkg/flipcall/BUILD31
-rw-r--r--pkg/flipcall/endpoint_futex.go45
-rw-r--r--pkg/flipcall/endpoint_unsafe.go238
-rw-r--r--pkg/flipcall/flipcall.go32
-rw-r--r--pkg/flipcall/flipcall_example_test.go106
-rw-r--r--pkg/flipcall/flipcall_test.go211
-rw-r--r--pkg/flipcall/futex_linux.go94
-rw-r--r--pkg/flipcall/packet_window_allocator.go166
8 files changed, 923 insertions, 0 deletions
diff --git a/pkg/flipcall/BUILD b/pkg/flipcall/BUILD
new file mode 100644
index 000000000..7126fc45f
--- /dev/null
+++ b/pkg/flipcall/BUILD
@@ -0,0 +1,31 @@
+load("//tools/go_stateify:defs.bzl", "go_library", "go_test")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "flipcall",
+ srcs = [
+ "endpoint_futex.go",
+ "endpoint_unsafe.go",
+ "flipcall.go",
+ "futex_linux.go",
+ "packet_window_allocator.go",
+ ],
+ importpath = "gvisor.dev/gvisor/pkg/flipcall",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/abi/linux",
+ "//pkg/log",
+ "//pkg/memutil",
+ ],
+)
+
+go_test(
+ name = "flipcall_test",
+ size = "small",
+ srcs = [
+ "flipcall_example_test.go",
+ "flipcall_test.go",
+ ],
+ embed = [":flipcall"],
+)
diff --git a/pkg/flipcall/endpoint_futex.go b/pkg/flipcall/endpoint_futex.go
new file mode 100644
index 000000000..5cab02b1d
--- /dev/null
+++ b/pkg/flipcall/endpoint_futex.go
@@ -0,0 +1,45 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "fmt"
+)
+
+type endpointControlState struct{}
+
+func (ep *Endpoint) initControlState(ctrlMode ControlMode) error {
+ if ctrlMode != ControlModeFutex {
+ return fmt.Errorf("unsupported control mode: %v", ctrlMode)
+ }
+ return nil
+}
+
+func (ep *Endpoint) doRoundTrip() error {
+ return ep.doFutexRoundTrip()
+}
+
+func (ep *Endpoint) doWaitFirst() error {
+ return ep.doFutexWaitFirst()
+}
+
+func (ep *Endpoint) doNotifyLast() error {
+ return ep.doFutexNotifyLast()
+}
+
+// Preconditions: ep.isShutdown() == true.
+func (ep *Endpoint) interruptForShutdown() {
+ ep.doFutexInterruptForShutdown()
+}
diff --git a/pkg/flipcall/endpoint_unsafe.go b/pkg/flipcall/endpoint_unsafe.go
new file mode 100644
index 000000000..8319955e0
--- /dev/null
+++ b/pkg/flipcall/endpoint_unsafe.go
@@ -0,0 +1,238 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "fmt"
+ "math"
+ "reflect"
+ "sync/atomic"
+ "syscall"
+ "unsafe"
+)
+
+// An Endpoint provides the ability to synchronously transfer data and control
+// to a connected peer Endpoint, which may be in another process.
+//
+// Since the Endpoint control transfer model is synchronous, at any given time
+// one Endpoint "has control" (designated the *active* Endpoint), and the other
+// is "waiting for control" (designated the *inactive* Endpoint). Users of the
+// flipcall package arbitrarily designate one Endpoint as initially-active, and
+// the other as initially-inactive; in a client/server protocol, the client
+// Endpoint is usually initially-active (able to send a request) and the server
+// Endpoint is usually initially-inactive (waiting for a request). The
+// initially-active Endpoint writes data to be sent to Endpoint.Data(), and
+// then synchronously transfers control to the inactive Endpoint by calling
+// Endpoint.SendRecv(), becoming the inactive Endpoint in the process. The
+// initially-inactive Endpoint waits for control by calling
+// Endpoint.RecvFirst(); receiving control causes it to become the active
+// Endpoint. After this, the protocol is symmetric: the active Endpoint reads
+// data sent by the peer by reading from Endpoint.Data(), writes data to be
+// sent to the peer into Endpoint.Data(), and then calls Endpoint.SendRecv() to
+// exchange roles with the peer, which blocks until the peer has done the same.
+type Endpoint struct {
+ // shutdown is non-zero if Endpoint.Shutdown() has been called. shutdown is
+ // accessed using atomic memory operations.
+ shutdown uint32
+
+ // dataCap is the size of the datagram part of the packet window in bytes.
+ // dataCap is immutable.
+ dataCap uint32
+
+ // packet is the beginning of the packet window. packet is immutable.
+ packet unsafe.Pointer
+
+ ctrl endpointControlState
+}
+
+// Init must be called on zero-value Endpoints before first use. If it
+// succeeds, Destroy() must be called once the Endpoint is no longer in use.
+//
+// ctrlMode specifies how connected Endpoints will exchange control. Both
+// connected Endpoints must specify the same value for ctrlMode.
+//
+// pwd represents the packet window used to exchange data with the peer
+// Endpoint. FD may differ between Endpoints if they are in different
+// processes, but must represent the same file. The packet window must
+// initially be filled with zero bytes.
+func (ep *Endpoint) Init(ctrlMode ControlMode, pwd PacketWindowDescriptor) error {
+ if pwd.Length < pageSize {
+ return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize)
+ }
+ if pwd.Length > math.MaxUint32 {
+ return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, math.MaxUint32)
+ }
+ m, _, e := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset))
+ if e != 0 {
+ return fmt.Errorf("failed to mmap packet window: %v", e)
+ }
+ ep.dataCap = uint32(pwd.Length) - uint32(packetHeaderBytes)
+ ep.packet = (unsafe.Pointer)(m)
+ if err := ep.initControlState(ctrlMode); err != nil {
+ ep.unmapPacket()
+ return err
+ }
+ return nil
+}
+
+// NewEndpoint is a convenience function that returns an initialized Endpoint
+// allocated on the heap.
+func NewEndpoint(ctrlMode ControlMode, pwd PacketWindowDescriptor) (*Endpoint, error) {
+ var ep Endpoint
+ if err := ep.Init(ctrlMode, pwd); err != nil {
+ return nil, err
+ }
+ return &ep, nil
+}
+
+func (ep *Endpoint) unmapPacket() {
+ syscall.Syscall(syscall.SYS_MUNMAP, uintptr(ep.packet), uintptr(ep.dataCap)+packetHeaderBytes, 0)
+ ep.dataCap = 0
+ ep.packet = nil
+}
+
+// Destroy releases resources owned by ep. No other Endpoint methods may be
+// called after Destroy.
+func (ep *Endpoint) Destroy() {
+ ep.unmapPacket()
+}
+
+// Packets consist of an 8-byte header followed by an arbitrarily-sized
+// datagram. The header consists of:
+//
+// - A 4-byte native-endian sequence number, which is incremented by the active
+// Endpoint after it finishes writing to the packet window. The sequence number
+// is needed to handle spurious wakeups.
+//
+// - A 4-byte native-endian datagram length in bytes.
+const (
+ sizeofUint32 = unsafe.Sizeof(uint32(0))
+ packetHeaderBytes = 2 * sizeofUint32
+)
+
+func (ep *Endpoint) seq() *uint32 {
+ return (*uint32)(ep.packet)
+}
+
+func (ep *Endpoint) dataLen() *uint32 {
+ return (*uint32)((unsafe.Pointer)(uintptr(ep.packet) + sizeofUint32))
+}
+
+// DataCap returns the maximum datagram size supported by ep in bytes.
+func (ep *Endpoint) DataCap() uint32 {
+ return ep.dataCap
+}
+
+func (ep *Endpoint) data() unsafe.Pointer {
+ return unsafe.Pointer(uintptr(ep.packet) + packetHeaderBytes)
+}
+
+// Data returns the datagram part of ep's packet window as a byte slice.
+//
+// Note that the packet window is shared with the potentially-untrusted peer
+// Endpoint, which may concurrently mutate the contents of the packet window.
+// Thus:
+//
+// - Readers must not assume that two reads of the same byte in Data() will
+// return the same result. In other words, readers should read any given byte
+// in Data() at most once.
+//
+// - Writers must not assume that they will read back the same data that they
+// have written. In other words, writers should avoid reading from Data() at
+// all.
+func (ep *Endpoint) Data() []byte {
+ var bs []byte
+ bsReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&bs))
+ bsReflect.Data = uintptr(ep.data())
+ bsReflect.Len = int(ep.DataCap())
+ bsReflect.Cap = bsReflect.Len
+ return bs
+}
+
+// SendRecv transfers control to the peer Endpoint, causing its call to
+// Endpoint.SendRecv() or Endpoint.RecvFirst() to return with the given
+// datagram length, then blocks until the peer Endpoint calls
+// Endpoint.SendRecv() or Endpoint.SendLast().
+//
+// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has
+// returned an error. ep.SendLast() has never been called.
+func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) {
+ dataCap := ep.DataCap()
+ if dataLen > dataCap {
+ return 0, fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap)
+ }
+ atomic.StoreUint32(ep.dataLen(), dataLen)
+ if err := ep.doRoundTrip(); err != nil {
+ return 0, err
+ }
+ recvDataLen := atomic.LoadUint32(ep.dataLen())
+ if recvDataLen > dataCap {
+ return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap)
+ }
+ return recvDataLen, nil
+}
+
+// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then
+// returns the datagram length specified by that call.
+//
+// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never
+// been called.
+func (ep *Endpoint) RecvFirst() (uint32, error) {
+ if err := ep.doWaitFirst(); err != nil {
+ return 0, err
+ }
+ recvDataLen := atomic.LoadUint32(ep.dataLen())
+ if dataCap := ep.DataCap(); recvDataLen > dataCap {
+ return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap)
+ }
+ return recvDataLen, nil
+}
+
+// SendLast causes the peer Endpoint's call to Endpoint.SendRecv() or
+// Endpoint.RecvFirst() to return with the given datagram length.
+//
+// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has
+// returned an error. ep.SendLast() has never been called.
+func (ep *Endpoint) SendLast(dataLen uint32) error {
+ dataCap := ep.DataCap()
+ if dataLen > dataCap {
+ return fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap)
+ }
+ atomic.StoreUint32(ep.dataLen(), dataLen)
+ if err := ep.doNotifyLast(); err != nil {
+ return err
+ }
+ return nil
+}
+
+// Shutdown causes concurrent and future calls to ep.SendRecv(),
+// ep.RecvFirst(), and ep.SendLast() to unblock and return errors. It does not
+// wait for concurrent calls to return.
+func (ep *Endpoint) Shutdown() {
+ if atomic.SwapUint32(&ep.shutdown, 1) == 0 {
+ ep.interruptForShutdown()
+ }
+}
+
+func (ep *Endpoint) isShutdown() bool {
+ return atomic.LoadUint32(&ep.shutdown) != 0
+}
+
+type endpointShutdownError struct{}
+
+// Error implements error.Error.
+func (endpointShutdownError) Error() string {
+ return "Endpoint.Shutdown() has been called"
+}
diff --git a/pkg/flipcall/flipcall.go b/pkg/flipcall/flipcall.go
new file mode 100644
index 000000000..79a1e418a
--- /dev/null
+++ b/pkg/flipcall/flipcall.go
@@ -0,0 +1,32 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package flipcall implements a protocol providing Fast Local Interprocess
+// Procedure Calls.
+package flipcall
+
+// ControlMode defines how control is exchanged across a connection.
+type ControlMode uint8
+
+const (
+ // ControlModeInvalid is invalid, and exists so that ControlMode fields in
+ // structs must be explicitly initialized.
+ ControlModeInvalid ControlMode = iota
+
+ // ControlModeFutex uses shared futex operations on packet control words.
+ ControlModeFutex
+
+ // controlModeCount is the number of ControlModes in this list.
+ controlModeCount
+)
diff --git a/pkg/flipcall/flipcall_example_test.go b/pkg/flipcall/flipcall_example_test.go
new file mode 100644
index 000000000..572a1f119
--- /dev/null
+++ b/pkg/flipcall/flipcall_example_test.go
@@ -0,0 +1,106 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "bytes"
+ "fmt"
+)
+
+func Example() {
+ const (
+ reqPrefix = "request "
+ respPrefix = "response "
+ count = 3
+ maxMessageLen = len(respPrefix) + 1 // 1 digit
+ )
+
+ pwa, err := NewPacketWindowAllocator()
+ if err != nil {
+ panic(err)
+ }
+ defer pwa.Destroy()
+ pwd, err := pwa.Allocate(PacketWindowLengthForDataCap(uint32(maxMessageLen)))
+ if err != nil {
+ panic(err)
+ }
+ clientEP, err := NewEndpoint(ControlModeFutex, pwd)
+ if err != nil {
+ panic(err)
+ }
+ defer clientEP.Destroy()
+ serverEP, err := NewEndpoint(ControlModeFutex, pwd)
+ if err != nil {
+ panic(err)
+ }
+ defer serverEP.Destroy()
+
+ serverDone := make(chan struct{})
+ go func() {
+ defer func() { serverDone <- struct{}{} }()
+ i := 0
+ var buf bytes.Buffer
+ // wait for first request
+ n, err := serverEP.RecvFirst()
+ if err != nil {
+ return
+ }
+ for {
+ // read request
+ buf.Reset()
+ buf.Write(serverEP.Data()[:n])
+ fmt.Println(buf.String())
+ // write response
+ buf.Reset()
+ fmt.Fprintf(&buf, "%s%d", respPrefix, i)
+ copy(serverEP.Data(), buf.Bytes())
+ // send response and wait for next request
+ n, err = serverEP.SendRecv(uint32(buf.Len()))
+ if err != nil {
+ return
+ }
+ i++
+ }
+ }()
+ defer func() {
+ serverEP.Shutdown()
+ <-serverDone
+ }()
+
+ var buf bytes.Buffer
+ for i := 0; i < count; i++ {
+ // write request
+ buf.Reset()
+ fmt.Fprintf(&buf, "%s%d", reqPrefix, i)
+ copy(clientEP.Data(), buf.Bytes())
+ // send request and wait for response
+ n, err := clientEP.SendRecv(uint32(buf.Len()))
+ if err != nil {
+ panic(err)
+ }
+ // read response
+ buf.Reset()
+ buf.Write(clientEP.Data()[:n])
+ fmt.Println(buf.String())
+ }
+
+ // Output:
+ // request 0
+ // response 0
+ // request 1
+ // response 1
+ // request 2
+ // response 2
+}
diff --git a/pkg/flipcall/flipcall_test.go b/pkg/flipcall/flipcall_test.go
new file mode 100644
index 000000000..20d3002f0
--- /dev/null
+++ b/pkg/flipcall/flipcall_test.go
@@ -0,0 +1,211 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "testing"
+ "time"
+)
+
+var testPacketWindowSize = pageSize
+
+func testSendRecv(t *testing.T, ctrlMode ControlMode) {
+ pwa, err := NewPacketWindowAllocator()
+ if err != nil {
+ t.Fatalf("failed to create PacketWindowAllocator: %v", err)
+ }
+ defer pwa.Destroy()
+ pwd, err := pwa.Allocate(testPacketWindowSize)
+ if err != nil {
+ t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
+ }
+
+ sendEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ t.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer sendEP.Destroy()
+ recvEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ t.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer recvEP.Destroy()
+
+ otherThreadDone := make(chan struct{})
+ go func() {
+ defer func() { otherThreadDone <- struct{}{} }()
+ t.Logf("initially-inactive Endpoint waiting for packet 1")
+ if _, err := recvEP.RecvFirst(); err != nil {
+ t.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err)
+ }
+ t.Logf("initially-inactive Endpoint got packet 1, sending packet 2 and waiting for packet 3")
+ if _, err := recvEP.SendRecv(0); err != nil {
+ t.Fatalf("initially-inactive Endpoint.SendRecv() failed: %v", err)
+ }
+ t.Logf("initially-inactive Endpoint got packet 3")
+ }()
+ defer func() {
+ t.Logf("waiting for initially-inactive Endpoint goroutine to complete")
+ <-otherThreadDone
+ }()
+
+ t.Logf("initially-active Endpoint sending packet 1 and waiting for packet 2")
+ if _, err := sendEP.SendRecv(0); err != nil {
+ t.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err)
+ }
+ t.Logf("initially-active Endpoint got packet 2, sending packet 3")
+ if err := sendEP.SendLast(0); err != nil {
+ t.Fatalf("initially-active Endpoint.SendLast() failed: %v", err)
+ }
+}
+
+func TestFutexSendRecv(t *testing.T) {
+ testSendRecv(t, ControlModeFutex)
+}
+
+func testRecvFirstShutdown(t *testing.T, ctrlMode ControlMode) {
+ pwa, err := NewPacketWindowAllocator()
+ if err != nil {
+ t.Fatalf("failed to create PacketWindowAllocator: %v", err)
+ }
+ defer pwa.Destroy()
+ pwd, err := pwa.Allocate(testPacketWindowSize)
+ if err != nil {
+ t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
+ }
+
+ ep, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ t.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer ep.Destroy()
+
+ otherThreadDone := make(chan struct{})
+ go func() {
+ defer func() { otherThreadDone <- struct{}{} }()
+ _, err := ep.RecvFirst()
+ if err == nil {
+ t.Errorf("Endpoint.RecvFirst() succeeded unexpectedly")
+ }
+ }()
+
+ time.Sleep(time.Second) // to ensure ep.RecvFirst() has blocked
+ ep.Shutdown()
+ <-otherThreadDone
+}
+
+func TestFutexRecvFirstShutdown(t *testing.T) {
+ testRecvFirstShutdown(t, ControlModeFutex)
+}
+
+func testSendRecvShutdown(t *testing.T, ctrlMode ControlMode) {
+ pwa, err := NewPacketWindowAllocator()
+ if err != nil {
+ t.Fatalf("failed to create PacketWindowAllocator: %v", err)
+ }
+ defer pwa.Destroy()
+ pwd, err := pwa.Allocate(testPacketWindowSize)
+ if err != nil {
+ t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
+ }
+
+ sendEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ t.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer sendEP.Destroy()
+ recvEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ t.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer recvEP.Destroy()
+
+ otherThreadDone := make(chan struct{})
+ go func() {
+ defer func() { otherThreadDone <- struct{}{} }()
+ if _, err := recvEP.RecvFirst(); err != nil {
+ t.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err)
+ }
+ if _, err := recvEP.SendRecv(0); err == nil {
+ t.Errorf("initially-inactive Endpoint.SendRecv() succeeded unexpectedly")
+ }
+ }()
+
+ if _, err := sendEP.SendRecv(0); err != nil {
+ t.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err)
+ }
+ time.Sleep(time.Second) // to ensure recvEP.SendRecv() has blocked
+ recvEP.Shutdown()
+ <-otherThreadDone
+}
+
+func TestFutexSendRecvShutdown(t *testing.T) {
+ testSendRecvShutdown(t, ControlModeFutex)
+}
+
+func benchmarkSendRecv(b *testing.B, ctrlMode ControlMode) {
+ pwa, err := NewPacketWindowAllocator()
+ if err != nil {
+ b.Fatalf("failed to create PacketWindowAllocator: %v", err)
+ }
+ defer pwa.Destroy()
+ pwd, err := pwa.Allocate(testPacketWindowSize)
+ if err != nil {
+ b.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
+ }
+
+ sendEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ b.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer sendEP.Destroy()
+ recvEP, err := NewEndpoint(ctrlMode, pwd)
+ if err != nil {
+ b.Fatalf("failed to create Endpoint: %v", err)
+ }
+ defer recvEP.Destroy()
+
+ otherThreadDone := make(chan struct{})
+ go func() {
+ defer func() { otherThreadDone <- struct{}{} }()
+ if b.N == 0 {
+ return
+ }
+ if _, err := recvEP.RecvFirst(); err != nil {
+ b.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err)
+ }
+ for i := 1; i < b.N; i++ {
+ if _, err := recvEP.SendRecv(0); err != nil {
+ b.Fatalf("initially-inactive Endpoint.SendRecv() failed: %v", err)
+ }
+ }
+ if err := recvEP.SendLast(0); err != nil {
+ b.Fatalf("initially-inactive Endpoint.SendLast() failed: %v", err)
+ }
+ }()
+ defer func() { <-otherThreadDone }()
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if _, err := sendEP.SendRecv(0); err != nil {
+ b.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err)
+ }
+ }
+ b.StopTimer()
+}
+
+func BenchmarkFutexSendRecv(b *testing.B) {
+ benchmarkSendRecv(b, ControlModeFutex)
+}
diff --git a/pkg/flipcall/futex_linux.go b/pkg/flipcall/futex_linux.go
new file mode 100644
index 000000000..3f592ad16
--- /dev/null
+++ b/pkg/flipcall/futex_linux.go
@@ -0,0 +1,94 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+package flipcall
+
+import (
+ "fmt"
+ "math"
+ "sync/atomic"
+ "syscall"
+
+ "gvisor.dev/gvisor/pkg/abi/linux"
+ "gvisor.dev/gvisor/pkg/log"
+)
+
+func (ep *Endpoint) doFutexRoundTrip() error {
+ ourSeq, err := ep.doFutexNotifySeq()
+ if err != nil {
+ return err
+ }
+ return ep.doFutexWaitSeq(ourSeq)
+}
+
+func (ep *Endpoint) doFutexWaitFirst() error {
+ return ep.doFutexWaitSeq(0)
+}
+
+func (ep *Endpoint) doFutexNotifyLast() error {
+ _, err := ep.doFutexNotifySeq()
+ return err
+}
+
+func (ep *Endpoint) doFutexNotifySeq() (uint32, error) {
+ ourSeq := atomic.AddUint32(ep.seq(), 1)
+ if err := ep.futexWake(1); err != nil {
+ return ourSeq, fmt.Errorf("failed to FUTEX_WAKE peer Endpoint: %v", err)
+ }
+ return ourSeq, nil
+}
+
+func (ep *Endpoint) doFutexWaitSeq(prevSeq uint32) error {
+ nextSeq := prevSeq + 1
+ for {
+ if ep.isShutdown() {
+ return endpointShutdownError{}
+ }
+ if err := ep.futexWait(prevSeq); err != nil {
+ return fmt.Errorf("failed to FUTEX_WAIT for peer Endpoint: %v", err)
+ }
+ seq := atomic.LoadUint32(ep.seq())
+ if seq == nextSeq {
+ return nil
+ }
+ if seq != prevSeq {
+ return fmt.Errorf("invalid packet sequence number %d (expected %d or %d)", seq, prevSeq, nextSeq)
+ }
+ }
+}
+
+func (ep *Endpoint) doFutexInterruptForShutdown() {
+ // Wake MaxInt32 threads to prevent a malicious or broken peer from
+ // swallowing our wakeup by FUTEX_WAITing from multiple threads.
+ if err := ep.futexWake(math.MaxInt32); err != nil {
+ log.Warningf("failed to FUTEX_WAKE Endpoint: %v", err)
+ }
+}
+
+func (ep *Endpoint) futexWake(numThreads int32) error {
+ if _, _, e := syscall.RawSyscall(syscall.SYS_FUTEX, uintptr(ep.packet), linux.FUTEX_WAKE, uintptr(numThreads)); e != 0 {
+ return e
+ }
+ return nil
+}
+
+func (ep *Endpoint) futexWait(seq uint32) error {
+ _, _, e := syscall.Syscall6(syscall.SYS_FUTEX, uintptr(ep.packet), linux.FUTEX_WAIT, uintptr(seq), 0, 0, 0)
+ if e != 0 && e != syscall.EAGAIN && e != syscall.EINTR {
+ return e
+ }
+ return nil
+}
diff --git a/pkg/flipcall/packet_window_allocator.go b/pkg/flipcall/packet_window_allocator.go
new file mode 100644
index 000000000..7b455b24d
--- /dev/null
+++ b/pkg/flipcall/packet_window_allocator.go
@@ -0,0 +1,166 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "fmt"
+ "math/bits"
+ "os"
+ "syscall"
+
+ "gvisor.dev/gvisor/pkg/abi/linux"
+ "gvisor.dev/gvisor/pkg/memutil"
+)
+
+var (
+ pageSize = os.Getpagesize()
+ pageMask = pageSize - 1
+)
+
+func init() {
+ if bits.OnesCount(uint(pageSize)) != 1 {
+ // This is depended on by roundUpToPage().
+ panic(fmt.Sprintf("system page size (%d) is not a power of 2", pageSize))
+ }
+ if uintptr(pageSize) < packetHeaderBytes {
+ // This is required since Endpoint.Init() imposes a minimum packet
+ // window size of 1 page.
+ panic(fmt.Sprintf("system page size (%d) is less than packet header size (%d)", pageSize, packetHeaderBytes))
+ }
+}
+
+// PacketWindowDescriptor represents a packet window, a range of pages in a
+// shared memory file that is used to exchange packets between partner
+// Endpoints.
+type PacketWindowDescriptor struct {
+ // FD is the file descriptor representing the shared memory file.
+ FD int
+
+ // Offset is the offset into the shared memory file at which the packet
+ // window begins.
+ Offset int64
+
+ // Length is the size of the packet window in bytes.
+ Length int
+}
+
+// PacketWindowLengthForDataCap returns the minimum packet window size required
+// to accommodate datagrams of the given size in bytes.
+func PacketWindowLengthForDataCap(dataCap uint32) int {
+ return roundUpToPage(int(dataCap) + int(packetHeaderBytes))
+}
+
+func roundUpToPage(x int) int {
+ return (x + pageMask) &^ pageMask
+}
+
+// A PacketWindowAllocator owns a shared memory file, and allocates packet
+// windows from it.
+type PacketWindowAllocator struct {
+ fd int
+ nextAlloc int64
+ fileSize int64
+}
+
+// Init must be called on zero-value PacketWindowAllocators before first use.
+// If it succeeds, Destroy() must be called once the PacketWindowAllocator is
+// no longer in use.
+func (pwa *PacketWindowAllocator) Init() error {
+ fd, err := memutil.CreateMemFD("flipcall_packet_windows", linux.MFD_CLOEXEC|linux.MFD_ALLOW_SEALING)
+ if err != nil {
+ return fmt.Errorf("failed to create memfd: %v", err)
+ }
+ // Apply F_SEAL_SHRINK to prevent either party from causing SIGBUS in the
+ // other by truncating the file, and F_SEAL_SEAL to prevent either party
+ // from applying F_SEAL_GROW or F_SEAL_WRITE.
+ if _, _, e := syscall.RawSyscall(syscall.SYS_FCNTL, uintptr(fd), linux.F_ADD_SEALS, linux.F_SEAL_SHRINK|linux.F_SEAL_SEAL); e != 0 {
+ syscall.Close(fd)
+ return fmt.Errorf("failed to apply memfd seals: %v", e)
+ }
+ pwa.fd = fd
+ return nil
+}
+
+// NewPacketWindowAllocator is a convenience function that returns an
+// initialized PacketWindowAllocator allocated on the heap.
+func NewPacketWindowAllocator() (*PacketWindowAllocator, error) {
+ var pwa PacketWindowAllocator
+ if err := pwa.Init(); err != nil {
+ return nil, err
+ }
+ return &pwa, nil
+}
+
+// Destroy releases resources owned by pwa. This invalidates file descriptors
+// previously returned by pwa.FD() and pwd.Allocate().
+func (pwa *PacketWindowAllocator) Destroy() {
+ syscall.Close(pwa.fd)
+}
+
+// FD represents the file descriptor of the shared memory file backing pwa.
+func (pwa *PacketWindowAllocator) FD() int {
+ return pwa.fd
+}
+
+// Allocate allocates a new packet window of at least the given size and
+// returns a PacketWindowDescriptor representing it.
+//
+// Preconditions: size > 0.
+func (pwa *PacketWindowAllocator) Allocate(size int) (PacketWindowDescriptor, error) {
+ if size <= 0 {
+ return PacketWindowDescriptor{}, fmt.Errorf("invalid size: %d", size)
+ }
+ // Page-align size to ensure that pwa.nextAlloc remains page-aligned.
+ size = roundUpToPage(size)
+ if size <= 0 {
+ return PacketWindowDescriptor{}, fmt.Errorf("size %d overflows after rounding up to page size", size)
+ }
+ end := pwa.nextAlloc + int64(size) // overflow checked by ensureFileSize
+ if err := pwa.ensureFileSize(end); err != nil {
+ return PacketWindowDescriptor{}, err
+ }
+ start := pwa.nextAlloc
+ pwa.nextAlloc = end
+ return PacketWindowDescriptor{
+ FD: pwa.fd,
+ Offset: start,
+ Length: size,
+ }, nil
+}
+
+func (pwa *PacketWindowAllocator) ensureFileSize(min int64) error {
+ if min <= 0 {
+ return fmt.Errorf("file size would overflow")
+ }
+ if pwa.fileSize >= min {
+ return nil
+ }
+ newSize := 2 * pwa.fileSize
+ if newSize == 0 {
+ newSize = int64(pageSize)
+ }
+ for newSize < min {
+ newNewSize := newSize * 2
+ if newNewSize <= 0 {
+ return fmt.Errorf("file size would overflow")
+ }
+ newSize = newNewSize
+ }
+ if err := syscall.Ftruncate(pwa.fd, newSize); err != nil {
+ return fmt.Errorf("ftruncate failed: %v", err)
+ }
+ pwa.fileSize = newSize
+ return nil
+}