summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--pkg/flipcall/BUILD5
-rw-r--r--pkg/flipcall/ctrl_futex.go146
-rw-r--r--pkg/flipcall/endpoint_futex.go45
-rw-r--r--pkg/flipcall/endpoint_unsafe.go238
-rw-r--r--pkg/flipcall/flipcall.go219
-rw-r--r--pkg/flipcall/flipcall_example_test.go20
-rw-r--r--pkg/flipcall/flipcall_test.go299
-rw-r--r--pkg/flipcall/flipcall_unsafe.go69
-rw-r--r--pkg/flipcall/futex_linux.go103
-rw-r--r--pkg/flipcall/io.go113
-rw-r--r--pkg/flipcall/packet_window_allocator.go6
11 files changed, 786 insertions, 477 deletions
diff --git a/pkg/flipcall/BUILD b/pkg/flipcall/BUILD
index 7126fc45f..bd1d614b6 100644
--- a/pkg/flipcall/BUILD
+++ b/pkg/flipcall/BUILD
@@ -5,10 +5,11 @@ package(licenses = ["notice"])
go_library(
name = "flipcall",
srcs = [
- "endpoint_futex.go",
- "endpoint_unsafe.go",
+ "ctrl_futex.go",
"flipcall.go",
+ "flipcall_unsafe.go",
"futex_linux.go",
+ "io.go",
"packet_window_allocator.go",
],
importpath = "gvisor.dev/gvisor/pkg/flipcall",
diff --git a/pkg/flipcall/ctrl_futex.go b/pkg/flipcall/ctrl_futex.go
new file mode 100644
index 000000000..865b6f640
--- /dev/null
+++ b/pkg/flipcall/ctrl_futex.go
@@ -0,0 +1,146 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "encoding/json"
+ "fmt"
+ "math"
+ "sync/atomic"
+
+ "gvisor.dev/gvisor/pkg/log"
+)
+
+type endpointControlImpl struct {
+ state int32
+}
+
+// Bits in endpointControlImpl.state.
+const (
+ epsBlocked = 1 << iota
+ epsShutdown
+)
+
+func (ep *Endpoint) ctrlInit(opts ...EndpointOption) error {
+ if len(opts) != 0 {
+ return fmt.Errorf("unknown EndpointOption: %T", opts[0])
+ }
+ return nil
+}
+
+type ctrlHandshakeRequest struct{}
+
+type ctrlHandshakeResponse struct{}
+
+func (ep *Endpoint) ctrlConnect() error {
+ if err := ep.enterFutexWait(); err != nil {
+ return err
+ }
+ _, err := ep.futexConnect(&ctrlHandshakeRequest{})
+ ep.exitFutexWait()
+ return err
+}
+
+func (ep *Endpoint) ctrlWaitFirst() error {
+ if err := ep.enterFutexWait(); err != nil {
+ return err
+ }
+ defer ep.exitFutexWait()
+
+ // Wait for the handshake request.
+ if err := ep.futexSwitchFromPeer(); err != nil {
+ return err
+ }
+
+ // Read the handshake request.
+ reqLen := atomic.LoadUint32(ep.dataLen())
+ if reqLen > ep.dataCap {
+ return fmt.Errorf("invalid handshake request length %d (maximum %d)", reqLen, ep.dataCap)
+ }
+ var req ctrlHandshakeRequest
+ if err := json.NewDecoder(ep.NewReader(reqLen)).Decode(&req); err != nil {
+ return fmt.Errorf("error reading handshake request: %v", err)
+ }
+
+ // Write the handshake response.
+ w := ep.NewWriter()
+ if err := json.NewEncoder(w).Encode(ctrlHandshakeResponse{}); err != nil {
+ return fmt.Errorf("error writing handshake response: %v", err)
+ }
+ *ep.dataLen() = w.Len()
+
+ // Return control to the client.
+ if err := ep.futexSwitchToPeer(); err != nil {
+ return err
+ }
+
+ // Wait for the first non-handshake message.
+ return ep.futexSwitchFromPeer()
+}
+
+func (ep *Endpoint) ctrlRoundTrip() error {
+ if err := ep.futexSwitchToPeer(); err != nil {
+ return err
+ }
+ if err := ep.enterFutexWait(); err != nil {
+ return err
+ }
+ err := ep.futexSwitchFromPeer()
+ ep.exitFutexWait()
+ return err
+}
+
+func (ep *Endpoint) ctrlWakeLast() error {
+ return ep.futexSwitchToPeer()
+}
+
+func (ep *Endpoint) enterFutexWait() error {
+ switch eps := atomic.AddInt32(&ep.ctrl.state, epsBlocked); eps {
+ case epsBlocked:
+ return nil
+ case epsBlocked | epsShutdown:
+ atomic.AddInt32(&ep.ctrl.state, -epsBlocked)
+ return shutdownError{}
+ default:
+ // Most likely due to ep.enterFutexWait() being called concurrently
+ // from multiple goroutines.
+ panic(fmt.Sprintf("invalid flipcall.Endpoint.ctrl.state before flipcall.Endpoint.enterFutexWait(): %v", eps-epsBlocked))
+ }
+}
+
+func (ep *Endpoint) exitFutexWait() {
+ atomic.AddInt32(&ep.ctrl.state, -epsBlocked)
+}
+
+func (ep *Endpoint) ctrlShutdown() {
+ // Set epsShutdown to ensure that future calls to ep.enterFutexWait() fail.
+ if atomic.AddInt32(&ep.ctrl.state, epsShutdown)&epsBlocked != 0 {
+ // Wake the blocked thread. This must loop because it's possible that
+ // FUTEX_WAKE occurs after the waiter sets epsBlocked, but before it
+ // blocks in FUTEX_WAIT.
+ for {
+ // Wake MaxInt32 threads to prevent a broken or malicious peer from
+ // swallowing our wakeup by FUTEX_WAITing from multiple threads.
+ if err := ep.futexWakeConnState(math.MaxInt32); err != nil {
+ log.Warningf("failed to FUTEX_WAKE Endpoints: %v", err)
+ break
+ }
+ yieldThread()
+ if atomic.LoadInt32(&ep.ctrl.state)&epsBlocked == 0 {
+ break
+ }
+ }
+ }
+}
diff --git a/pkg/flipcall/endpoint_futex.go b/pkg/flipcall/endpoint_futex.go
deleted file mode 100644
index 5cab02b1d..000000000
--- a/pkg/flipcall/endpoint_futex.go
+++ /dev/null
@@ -1,45 +0,0 @@
-// Copyright 2019 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package flipcall
-
-import (
- "fmt"
-)
-
-type endpointControlState struct{}
-
-func (ep *Endpoint) initControlState(ctrlMode ControlMode) error {
- if ctrlMode != ControlModeFutex {
- return fmt.Errorf("unsupported control mode: %v", ctrlMode)
- }
- return nil
-}
-
-func (ep *Endpoint) doRoundTrip() error {
- return ep.doFutexRoundTrip()
-}
-
-func (ep *Endpoint) doWaitFirst() error {
- return ep.doFutexWaitFirst()
-}
-
-func (ep *Endpoint) doNotifyLast() error {
- return ep.doFutexNotifyLast()
-}
-
-// Preconditions: ep.isShutdown() == true.
-func (ep *Endpoint) interruptForShutdown() {
- ep.doFutexInterruptForShutdown()
-}
diff --git a/pkg/flipcall/endpoint_unsafe.go b/pkg/flipcall/endpoint_unsafe.go
deleted file mode 100644
index 8319955e0..000000000
--- a/pkg/flipcall/endpoint_unsafe.go
+++ /dev/null
@@ -1,238 +0,0 @@
-// Copyright 2019 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package flipcall
-
-import (
- "fmt"
- "math"
- "reflect"
- "sync/atomic"
- "syscall"
- "unsafe"
-)
-
-// An Endpoint provides the ability to synchronously transfer data and control
-// to a connected peer Endpoint, which may be in another process.
-//
-// Since the Endpoint control transfer model is synchronous, at any given time
-// one Endpoint "has control" (designated the *active* Endpoint), and the other
-// is "waiting for control" (designated the *inactive* Endpoint). Users of the
-// flipcall package arbitrarily designate one Endpoint as initially-active, and
-// the other as initially-inactive; in a client/server protocol, the client
-// Endpoint is usually initially-active (able to send a request) and the server
-// Endpoint is usually initially-inactive (waiting for a request). The
-// initially-active Endpoint writes data to be sent to Endpoint.Data(), and
-// then synchronously transfers control to the inactive Endpoint by calling
-// Endpoint.SendRecv(), becoming the inactive Endpoint in the process. The
-// initially-inactive Endpoint waits for control by calling
-// Endpoint.RecvFirst(); receiving control causes it to become the active
-// Endpoint. After this, the protocol is symmetric: the active Endpoint reads
-// data sent by the peer by reading from Endpoint.Data(), writes data to be
-// sent to the peer into Endpoint.Data(), and then calls Endpoint.SendRecv() to
-// exchange roles with the peer, which blocks until the peer has done the same.
-type Endpoint struct {
- // shutdown is non-zero if Endpoint.Shutdown() has been called. shutdown is
- // accessed using atomic memory operations.
- shutdown uint32
-
- // dataCap is the size of the datagram part of the packet window in bytes.
- // dataCap is immutable.
- dataCap uint32
-
- // packet is the beginning of the packet window. packet is immutable.
- packet unsafe.Pointer
-
- ctrl endpointControlState
-}
-
-// Init must be called on zero-value Endpoints before first use. If it
-// succeeds, Destroy() must be called once the Endpoint is no longer in use.
-//
-// ctrlMode specifies how connected Endpoints will exchange control. Both
-// connected Endpoints must specify the same value for ctrlMode.
-//
-// pwd represents the packet window used to exchange data with the peer
-// Endpoint. FD may differ between Endpoints if they are in different
-// processes, but must represent the same file. The packet window must
-// initially be filled with zero bytes.
-func (ep *Endpoint) Init(ctrlMode ControlMode, pwd PacketWindowDescriptor) error {
- if pwd.Length < pageSize {
- return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize)
- }
- if pwd.Length > math.MaxUint32 {
- return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, math.MaxUint32)
- }
- m, _, e := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset))
- if e != 0 {
- return fmt.Errorf("failed to mmap packet window: %v", e)
- }
- ep.dataCap = uint32(pwd.Length) - uint32(packetHeaderBytes)
- ep.packet = (unsafe.Pointer)(m)
- if err := ep.initControlState(ctrlMode); err != nil {
- ep.unmapPacket()
- return err
- }
- return nil
-}
-
-// NewEndpoint is a convenience function that returns an initialized Endpoint
-// allocated on the heap.
-func NewEndpoint(ctrlMode ControlMode, pwd PacketWindowDescriptor) (*Endpoint, error) {
- var ep Endpoint
- if err := ep.Init(ctrlMode, pwd); err != nil {
- return nil, err
- }
- return &ep, nil
-}
-
-func (ep *Endpoint) unmapPacket() {
- syscall.Syscall(syscall.SYS_MUNMAP, uintptr(ep.packet), uintptr(ep.dataCap)+packetHeaderBytes, 0)
- ep.dataCap = 0
- ep.packet = nil
-}
-
-// Destroy releases resources owned by ep. No other Endpoint methods may be
-// called after Destroy.
-func (ep *Endpoint) Destroy() {
- ep.unmapPacket()
-}
-
-// Packets consist of an 8-byte header followed by an arbitrarily-sized
-// datagram. The header consists of:
-//
-// - A 4-byte native-endian sequence number, which is incremented by the active
-// Endpoint after it finishes writing to the packet window. The sequence number
-// is needed to handle spurious wakeups.
-//
-// - A 4-byte native-endian datagram length in bytes.
-const (
- sizeofUint32 = unsafe.Sizeof(uint32(0))
- packetHeaderBytes = 2 * sizeofUint32
-)
-
-func (ep *Endpoint) seq() *uint32 {
- return (*uint32)(ep.packet)
-}
-
-func (ep *Endpoint) dataLen() *uint32 {
- return (*uint32)((unsafe.Pointer)(uintptr(ep.packet) + sizeofUint32))
-}
-
-// DataCap returns the maximum datagram size supported by ep in bytes.
-func (ep *Endpoint) DataCap() uint32 {
- return ep.dataCap
-}
-
-func (ep *Endpoint) data() unsafe.Pointer {
- return unsafe.Pointer(uintptr(ep.packet) + packetHeaderBytes)
-}
-
-// Data returns the datagram part of ep's packet window as a byte slice.
-//
-// Note that the packet window is shared with the potentially-untrusted peer
-// Endpoint, which may concurrently mutate the contents of the packet window.
-// Thus:
-//
-// - Readers must not assume that two reads of the same byte in Data() will
-// return the same result. In other words, readers should read any given byte
-// in Data() at most once.
-//
-// - Writers must not assume that they will read back the same data that they
-// have written. In other words, writers should avoid reading from Data() at
-// all.
-func (ep *Endpoint) Data() []byte {
- var bs []byte
- bsReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&bs))
- bsReflect.Data = uintptr(ep.data())
- bsReflect.Len = int(ep.DataCap())
- bsReflect.Cap = bsReflect.Len
- return bs
-}
-
-// SendRecv transfers control to the peer Endpoint, causing its call to
-// Endpoint.SendRecv() or Endpoint.RecvFirst() to return with the given
-// datagram length, then blocks until the peer Endpoint calls
-// Endpoint.SendRecv() or Endpoint.SendLast().
-//
-// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has
-// returned an error. ep.SendLast() has never been called.
-func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) {
- dataCap := ep.DataCap()
- if dataLen > dataCap {
- return 0, fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap)
- }
- atomic.StoreUint32(ep.dataLen(), dataLen)
- if err := ep.doRoundTrip(); err != nil {
- return 0, err
- }
- recvDataLen := atomic.LoadUint32(ep.dataLen())
- if recvDataLen > dataCap {
- return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap)
- }
- return recvDataLen, nil
-}
-
-// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then
-// returns the datagram length specified by that call.
-//
-// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never
-// been called.
-func (ep *Endpoint) RecvFirst() (uint32, error) {
- if err := ep.doWaitFirst(); err != nil {
- return 0, err
- }
- recvDataLen := atomic.LoadUint32(ep.dataLen())
- if dataCap := ep.DataCap(); recvDataLen > dataCap {
- return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap)
- }
- return recvDataLen, nil
-}
-
-// SendLast causes the peer Endpoint's call to Endpoint.SendRecv() or
-// Endpoint.RecvFirst() to return with the given datagram length.
-//
-// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has
-// returned an error. ep.SendLast() has never been called.
-func (ep *Endpoint) SendLast(dataLen uint32) error {
- dataCap := ep.DataCap()
- if dataLen > dataCap {
- return fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap)
- }
- atomic.StoreUint32(ep.dataLen(), dataLen)
- if err := ep.doNotifyLast(); err != nil {
- return err
- }
- return nil
-}
-
-// Shutdown causes concurrent and future calls to ep.SendRecv(),
-// ep.RecvFirst(), and ep.SendLast() to unblock and return errors. It does not
-// wait for concurrent calls to return.
-func (ep *Endpoint) Shutdown() {
- if atomic.SwapUint32(&ep.shutdown, 1) == 0 {
- ep.interruptForShutdown()
- }
-}
-
-func (ep *Endpoint) isShutdown() bool {
- return atomic.LoadUint32(&ep.shutdown) != 0
-}
-
-type endpointShutdownError struct{}
-
-// Error implements error.Error.
-func (endpointShutdownError) Error() string {
- return "Endpoint.Shutdown() has been called"
-}
diff --git a/pkg/flipcall/flipcall.go b/pkg/flipcall/flipcall.go
index 79a1e418a..5c9212c33 100644
--- a/pkg/flipcall/flipcall.go
+++ b/pkg/flipcall/flipcall.go
@@ -13,20 +13,217 @@
// limitations under the License.
// Package flipcall implements a protocol providing Fast Local Interprocess
-// Procedure Calls.
+// Procedure Calls between mutually-distrusting processes.
package flipcall
-// ControlMode defines how control is exchanged across a connection.
-type ControlMode uint8
+import (
+ "fmt"
+ "math"
+ "sync/atomic"
+ "syscall"
+)
-const (
- // ControlModeInvalid is invalid, and exists so that ControlMode fields in
- // structs must be explicitly initialized.
- ControlModeInvalid ControlMode = iota
+// An Endpoint provides the ability to synchronously transfer data and control
+// to a connected peer Endpoint, which may be in another process.
+//
+// Since the Endpoint control transfer model is synchronous, at any given time
+// one Endpoint "has control" (designated the active Endpoint), and the other
+// is "waiting for control" (designated the inactive Endpoint). Users of the
+// flipcall package designate one Endpoint as the client, which is initially
+// active, and the other as the server, which is initially inactive. See
+// flipcall_example_test.go for usage.
+type Endpoint struct {
+ // packet is a pointer to the beginning of the packet window. (Since this
+ // is a raw OS memory mapping and not a Go object, it does not need to be
+ // represented as an unsafe.Pointer.) packet is immutable.
+ packet uintptr
+
+ // dataCap is the size of the datagram part of the packet window in bytes.
+ // dataCap is immutable.
+ dataCap uint32
+
+ // shutdown is non-zero if Endpoint.Shutdown() has been called, or if the
+ // Endpoint has acknowledged shutdown initiated by the peer. shutdown is
+ // accessed using atomic memory operations.
+ shutdown uint32
+
+ // activeState is csClientActive if this is a client Endpoint and
+ // csServerActive if this is a server Endpoint.
+ activeState uint32
+
+ // inactiveState is csServerActive if this is a client Endpoint and
+ // csClientActive if this is a server Endpoint.
+ inactiveState uint32
+
+ ctrl endpointControlImpl
+}
+
+// Init must be called on zero-value Endpoints before first use. If it
+// succeeds, ep.Destroy() must be called once the Endpoint is no longer in use.
+//
+// pwd represents the packet window used to exchange data with the peer
+// Endpoint. FD may differ between Endpoints if they are in different
+// processes, but must represent the same file. The packet window must
+// initially be filled with zero bytes.
+func (ep *Endpoint) Init(pwd PacketWindowDescriptor, opts ...EndpointOption) error {
+ if pwd.Length < pageSize {
+ return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize)
+ }
+ if pwd.Length > math.MaxUint32 {
+ return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, math.MaxUint32)
+ }
+ m, _, e := syscall.RawSyscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset))
+ if e != 0 {
+ return fmt.Errorf("failed to mmap packet window: %v", e)
+ }
+ ep.packet = m
+ ep.dataCap = uint32(pwd.Length) - uint32(PacketHeaderBytes)
+ // These will be overwritten by ep.Connect() for client Endpoints.
+ ep.activeState = csServerActive
+ ep.inactiveState = csClientActive
+ if err := ep.ctrlInit(opts...); err != nil {
+ ep.unmapPacket()
+ return err
+ }
+ return nil
+}
+
+// NewEndpoint is a convenience function that returns an initialized Endpoint
+// allocated on the heap.
+func NewEndpoint(pwd PacketWindowDescriptor, opts ...EndpointOption) (*Endpoint, error) {
+ var ep Endpoint
+ if err := ep.Init(pwd, opts...); err != nil {
+ return nil, err
+ }
+ return &ep, nil
+}
+
+// An EndpointOption configures an Endpoint.
+type EndpointOption interface {
+ isEndpointOption()
+}
+
+// Destroy releases resources owned by ep. No other Endpoint methods may be
+// called after Destroy.
+func (ep *Endpoint) Destroy() {
+ ep.unmapPacket()
+}
+
+func (ep *Endpoint) unmapPacket() {
+ syscall.RawSyscall(syscall.SYS_MUNMAP, ep.packet, uintptr(ep.dataCap)+PacketHeaderBytes, 0)
+ ep.packet = 0
+}
- // ControlModeFutex uses shared futex operations on packet control words.
- ControlModeFutex
+// Shutdown causes concurrent and future calls to ep.Connect(), ep.SendRecv(),
+// ep.RecvFirst(), and ep.SendLast() to unblock and return errors. It does not
+// wait for concurrent calls to return. The effect of Shutdown on the peer
+// Endpoint is unspecified. Successive calls to Shutdown have no effect.
+//
+// Shutdown is the only Endpoint method that may be called concurrently with
+// other methods on the same Endpoint.
+func (ep *Endpoint) Shutdown() {
+ if atomic.SwapUint32(&ep.shutdown, 1) != 0 {
+ // ep.Shutdown() has previously been called.
+ return
+ }
+ ep.ctrlShutdown()
+}
+
+// isShutdownLocally returns true if ep.Shutdown() has been called.
+func (ep *Endpoint) isShutdownLocally() bool {
+ return atomic.LoadUint32(&ep.shutdown) != 0
+}
+
+type shutdownError struct{}
+
+// Error implements error.Error.
+func (shutdownError) Error() string {
+ return "flipcall connection shutdown"
+}
- // controlModeCount is the number of ControlModes in this list.
- controlModeCount
+// DataCap returns the maximum datagram size supported by ep. Equivalently,
+// DataCap returns len(ep.Data()).
+func (ep *Endpoint) DataCap() uint32 {
+ return ep.dataCap
+}
+
+// Connection state.
+const (
+ // The client is, by definition, initially active, so this must be 0.
+ csClientActive = 0
+ csServerActive = 1
)
+
+// Connect designates ep as a client Endpoint and blocks until the peer
+// Endpoint has called Endpoint.RecvFirst().
+//
+// Preconditions: ep.Connect(), ep.RecvFirst(), ep.SendRecv(), and
+// ep.SendLast() have never been called.
+func (ep *Endpoint) Connect() error {
+ ep.activeState = csClientActive
+ ep.inactiveState = csServerActive
+ return ep.ctrlConnect()
+}
+
+// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then
+// returns the datagram length specified by that call.
+//
+// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never
+// been called.
+func (ep *Endpoint) RecvFirst() (uint32, error) {
+ if err := ep.ctrlWaitFirst(); err != nil {
+ return 0, err
+ }
+ recvDataLen := atomic.LoadUint32(ep.dataLen())
+ if recvDataLen > ep.dataCap {
+ return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, ep.dataCap)
+ }
+ return recvDataLen, nil
+}
+
+// SendRecv transfers control to the peer Endpoint, causing its call to
+// Endpoint.SendRecv() or Endpoint.RecvFirst() to return with the given
+// datagram length, then blocks until the peer Endpoint calls
+// Endpoint.SendRecv() or Endpoint.SendLast().
+//
+// Preconditions: dataLen <= ep.DataCap(). No previous call to ep.SendRecv() or
+// ep.RecvFirst() has returned an error. ep.SendLast() has never been called.
+// If ep is a client Endpoint, ep.Connect() has previously been called and
+// returned nil.
+func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) {
+ if dataLen > ep.dataCap {
+ panic(fmt.Sprintf("attempting to send packet with datagram length %d (maximum %d)", dataLen, ep.dataCap))
+ }
+ // This store can safely be non-atomic: Under correct operation we should
+ // be the only thread writing ep.dataLen(), and ep.ctrlRoundTrip() will
+ // synchronize with the receiver. We will not read from ep.dataLen() until
+ // after ep.ctrlRoundTrip(), so if the peer is mutating it concurrently then
+ // they can only shoot themselves in the foot.
+ *ep.dataLen() = dataLen
+ if err := ep.ctrlRoundTrip(); err != nil {
+ return 0, err
+ }
+ recvDataLen := atomic.LoadUint32(ep.dataLen())
+ if recvDataLen > ep.dataCap {
+ return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, ep.dataCap)
+ }
+ return recvDataLen, nil
+}
+
+// SendLast causes the peer Endpoint's call to Endpoint.SendRecv() or
+// Endpoint.RecvFirst() to return with the given datagram length.
+//
+// Preconditions: dataLen <= ep.DataCap(). No previous call to ep.SendRecv() or
+// ep.RecvFirst() has returned an error. ep.SendLast() has never been called.
+// If ep is a client Endpoint, ep.Connect() has previously been called and
+// returned nil.
+func (ep *Endpoint) SendLast(dataLen uint32) error {
+ if dataLen > ep.dataCap {
+ panic(fmt.Sprintf("attempting to send packet with datagram length %d (maximum %d)", dataLen, ep.dataCap))
+ }
+ *ep.dataLen() = dataLen
+ if err := ep.ctrlWakeLast(); err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/pkg/flipcall/flipcall_example_test.go b/pkg/flipcall/flipcall_example_test.go
index 572a1f119..edb6a8bef 100644
--- a/pkg/flipcall/flipcall_example_test.go
+++ b/pkg/flipcall/flipcall_example_test.go
@@ -17,6 +17,7 @@ package flipcall
import (
"bytes"
"fmt"
+ "sync"
)
func Example() {
@@ -36,20 +37,21 @@ func Example() {
if err != nil {
panic(err)
}
- clientEP, err := NewEndpoint(ControlModeFutex, pwd)
- if err != nil {
+ var clientEP Endpoint
+ if err := clientEP.Init(pwd); err != nil {
panic(err)
}
defer clientEP.Destroy()
- serverEP, err := NewEndpoint(ControlModeFutex, pwd)
- if err != nil {
+ var serverEP Endpoint
+ if err := serverEP.Init(pwd); err != nil {
panic(err)
}
defer serverEP.Destroy()
- serverDone := make(chan struct{})
+ var serverRun sync.WaitGroup
+ serverRun.Add(1)
go func() {
- defer func() { serverDone <- struct{}{} }()
+ defer serverRun.Done()
i := 0
var buf bytes.Buffer
// wait for first request
@@ -76,9 +78,13 @@ func Example() {
}()
defer func() {
serverEP.Shutdown()
- <-serverDone
+ serverRun.Wait()
}()
+ // establish connection as client
+ if err := clientEP.Connect(); err != nil {
+ panic(err)
+ }
var buf bytes.Buffer
for i := 0; i < count; i++ {
// write request
diff --git a/pkg/flipcall/flipcall_test.go b/pkg/flipcall/flipcall_test.go
index 20d3002f0..da9d736ab 100644
--- a/pkg/flipcall/flipcall_test.go
+++ b/pkg/flipcall/flipcall_test.go
@@ -15,197 +15,240 @@
package flipcall
import (
+ "runtime"
+ "sync"
"testing"
"time"
)
var testPacketWindowSize = pageSize
-func testSendRecv(t *testing.T, ctrlMode ControlMode) {
- pwa, err := NewPacketWindowAllocator()
- if err != nil {
- t.Fatalf("failed to create PacketWindowAllocator: %v", err)
+type testConnection struct {
+ pwa PacketWindowAllocator
+ clientEP Endpoint
+ serverEP Endpoint
+}
+
+func newTestConnectionWithOptions(tb testing.TB, clientOpts, serverOpts []EndpointOption) *testConnection {
+ c := &testConnection{}
+ if err := c.pwa.Init(); err != nil {
+ tb.Fatalf("failed to create PacketWindowAllocator: %v", err)
}
- defer pwa.Destroy()
- pwd, err := pwa.Allocate(testPacketWindowSize)
+ pwd, err := c.pwa.Allocate(testPacketWindowSize)
if err != nil {
- t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
+ c.pwa.Destroy()
+ tb.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
}
-
- sendEP, err := NewEndpoint(ctrlMode, pwd)
- if err != nil {
- t.Fatalf("failed to create Endpoint: %v", err)
+ if err := c.clientEP.Init(pwd, clientOpts...); err != nil {
+ c.pwa.Destroy()
+ tb.Fatalf("failed to create client Endpoint: %v", err)
}
- defer sendEP.Destroy()
- recvEP, err := NewEndpoint(ctrlMode, pwd)
- if err != nil {
- t.Fatalf("failed to create Endpoint: %v", err)
+ if err := c.serverEP.Init(pwd, serverOpts...); err != nil {
+ c.pwa.Destroy()
+ c.clientEP.Destroy()
+ tb.Fatalf("failed to create server Endpoint: %v", err)
}
- defer recvEP.Destroy()
+ return c
+}
- otherThreadDone := make(chan struct{})
+func newTestConnection(tb testing.TB) *testConnection {
+ return newTestConnectionWithOptions(tb, nil, nil)
+}
+
+func (c *testConnection) destroy() {
+ c.pwa.Destroy()
+ c.clientEP.Destroy()
+ c.serverEP.Destroy()
+}
+
+func testSendRecv(t *testing.T, c *testConnection) {
+ var serverRun sync.WaitGroup
+ serverRun.Add(1)
go func() {
- defer func() { otherThreadDone <- struct{}{} }()
- t.Logf("initially-inactive Endpoint waiting for packet 1")
- if _, err := recvEP.RecvFirst(); err != nil {
- t.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err)
+ defer serverRun.Done()
+ t.Logf("server Endpoint waiting for packet 1")
+ if _, err := c.serverEP.RecvFirst(); err != nil {
+ t.Fatalf("server Endpoint.RecvFirst() failed: %v", err)
}
- t.Logf("initially-inactive Endpoint got packet 1, sending packet 2 and waiting for packet 3")
- if _, err := recvEP.SendRecv(0); err != nil {
- t.Fatalf("initially-inactive Endpoint.SendRecv() failed: %v", err)
+ t.Logf("server Endpoint got packet 1, sending packet 2 and waiting for packet 3")
+ if _, err := c.serverEP.SendRecv(0); err != nil {
+ t.Fatalf("server Endpoint.SendRecv() failed: %v", err)
}
- t.Logf("initially-inactive Endpoint got packet 3")
+ t.Logf("server Endpoint got packet 3")
}()
defer func() {
- t.Logf("waiting for initially-inactive Endpoint goroutine to complete")
- <-otherThreadDone
+ // Ensure that the server goroutine is cleaned up before
+ // c.serverEP.Destroy(), even if the test fails.
+ c.serverEP.Shutdown()
+ serverRun.Wait()
}()
- t.Logf("initially-active Endpoint sending packet 1 and waiting for packet 2")
- if _, err := sendEP.SendRecv(0); err != nil {
- t.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err)
+ t.Logf("client Endpoint establishing connection")
+ if err := c.clientEP.Connect(); err != nil {
+ t.Fatalf("client Endpoint.Connect() failed: %v", err)
}
- t.Logf("initially-active Endpoint got packet 2, sending packet 3")
- if err := sendEP.SendLast(0); err != nil {
- t.Fatalf("initially-active Endpoint.SendLast() failed: %v", err)
+ t.Logf("client Endpoint sending packet 1 and waiting for packet 2")
+ if _, err := c.clientEP.SendRecv(0); err != nil {
+ t.Fatalf("client Endpoint.SendRecv() failed: %v", err)
}
+ t.Logf("client Endpoint got packet 2, sending packet 3")
+ if err := c.clientEP.SendLast(0); err != nil {
+ t.Fatalf("client Endpoint.SendLast() failed: %v", err)
+ }
+ t.Logf("waiting for server goroutine to complete")
+ serverRun.Wait()
}
-func TestFutexSendRecv(t *testing.T) {
- testSendRecv(t, ControlModeFutex)
+func TestSendRecv(t *testing.T) {
+ c := newTestConnection(t)
+ defer c.destroy()
+ testSendRecv(t, c)
}
-func testRecvFirstShutdown(t *testing.T, ctrlMode ControlMode) {
- pwa, err := NewPacketWindowAllocator()
- if err != nil {
- t.Fatalf("failed to create PacketWindowAllocator: %v", err)
- }
- defer pwa.Destroy()
- pwd, err := pwa.Allocate(testPacketWindowSize)
- if err != nil {
- t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
- }
+func testShutdownConnect(t *testing.T, c *testConnection) {
+ var clientRun sync.WaitGroup
+ clientRun.Add(1)
+ go func() {
+ defer clientRun.Done()
+ if err := c.clientEP.Connect(); err == nil {
+ t.Errorf("client Endpoint.Connect() succeeded unexpectedly")
+ }
+ }()
+ time.Sleep(time.Second) // to allow c.clientEP.Connect() to block
+ c.clientEP.Shutdown()
+ clientRun.Wait()
+}
- ep, err := NewEndpoint(ctrlMode, pwd)
- if err != nil {
- t.Fatalf("failed to create Endpoint: %v", err)
- }
- defer ep.Destroy()
+func TestShutdownConnect(t *testing.T) {
+ c := newTestConnection(t)
+ defer c.destroy()
+ testShutdownConnect(t, c)
+}
- otherThreadDone := make(chan struct{})
+func testShutdownRecvFirstBeforeConnect(t *testing.T, c *testConnection) {
+ var serverRun sync.WaitGroup
+ serverRun.Add(1)
go func() {
- defer func() { otherThreadDone <- struct{}{} }()
- _, err := ep.RecvFirst()
+ defer serverRun.Done()
+ _, err := c.serverEP.RecvFirst()
if err == nil {
- t.Errorf("Endpoint.RecvFirst() succeeded unexpectedly")
+ t.Errorf("server Endpoint.RecvFirst() succeeded unexpectedly")
}
}()
-
- time.Sleep(time.Second) // to ensure ep.RecvFirst() has blocked
- ep.Shutdown()
- <-otherThreadDone
+ time.Sleep(time.Second) // to allow c.serverEP.RecvFirst() to block
+ c.serverEP.Shutdown()
+ serverRun.Wait()
}
-func TestFutexRecvFirstShutdown(t *testing.T) {
- testRecvFirstShutdown(t, ControlModeFutex)
+func TestShutdownRecvFirstBeforeConnect(t *testing.T) {
+ c := newTestConnection(t)
+ defer c.destroy()
+ testShutdownRecvFirstBeforeConnect(t, c)
}
-func testSendRecvShutdown(t *testing.T, ctrlMode ControlMode) {
- pwa, err := NewPacketWindowAllocator()
- if err != nil {
- t.Fatalf("failed to create PacketWindowAllocator: %v", err)
- }
- defer pwa.Destroy()
- pwd, err := pwa.Allocate(testPacketWindowSize)
- if err != nil {
- t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
- }
-
- sendEP, err := NewEndpoint(ctrlMode, pwd)
- if err != nil {
- t.Fatalf("failed to create Endpoint: %v", err)
- }
- defer sendEP.Destroy()
- recvEP, err := NewEndpoint(ctrlMode, pwd)
- if err != nil {
- t.Fatalf("failed to create Endpoint: %v", err)
- }
- defer recvEP.Destroy()
-
- otherThreadDone := make(chan struct{})
+func testShutdownRecvFirstAfterConnect(t *testing.T, c *testConnection) {
+ var serverRun sync.WaitGroup
+ serverRun.Add(1)
go func() {
- defer func() { otherThreadDone <- struct{}{} }()
- if _, err := recvEP.RecvFirst(); err != nil {
- t.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err)
- }
- if _, err := recvEP.SendRecv(0); err == nil {
- t.Errorf("initially-inactive Endpoint.SendRecv() succeeded unexpectedly")
+ defer serverRun.Done()
+ if _, err := c.serverEP.RecvFirst(); err == nil {
+ t.Fatalf("server Endpoint.RecvFirst() succeeded unexpectedly")
}
}()
-
- if _, err := sendEP.SendRecv(0); err != nil {
- t.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err)
+ defer func() {
+ // Ensure that the server goroutine is cleaned up before
+ // c.serverEP.Destroy(), even if the test fails.
+ c.serverEP.Shutdown()
+ serverRun.Wait()
+ }()
+ if err := c.clientEP.Connect(); err != nil {
+ t.Fatalf("client Endpoint.Connect() failed: %v", err)
}
- time.Sleep(time.Second) // to ensure recvEP.SendRecv() has blocked
- recvEP.Shutdown()
- <-otherThreadDone
+ c.serverEP.Shutdown()
+ serverRun.Wait()
}
-func TestFutexSendRecvShutdown(t *testing.T) {
- testSendRecvShutdown(t, ControlModeFutex)
+func TestShutdownRecvFirstAfterConnect(t *testing.T) {
+ c := newTestConnection(t)
+ defer c.destroy()
+ testShutdownRecvFirstAfterConnect(t, c)
}
-func benchmarkSendRecv(b *testing.B, ctrlMode ControlMode) {
- pwa, err := NewPacketWindowAllocator()
- if err != nil {
- b.Fatalf("failed to create PacketWindowAllocator: %v", err)
+func testShutdownSendRecv(t *testing.T, c *testConnection) {
+ var serverRun sync.WaitGroup
+ serverRun.Add(1)
+ go func() {
+ defer serverRun.Done()
+ if _, err := c.serverEP.RecvFirst(); err != nil {
+ t.Fatalf("server Endpoint.RecvFirst() failed: %v", err)
+ }
+ if _, err := c.serverEP.SendRecv(0); err == nil {
+ t.Errorf("server Endpoint.SendRecv() succeeded unexpectedly")
+ }
+ }()
+ defer func() {
+ // Ensure that the server goroutine is cleaned up before
+ // c.serverEP.Destroy(), even if the test fails.
+ c.serverEP.Shutdown()
+ serverRun.Wait()
+ }()
+ if err := c.clientEP.Connect(); err != nil {
+ t.Fatalf("client Endpoint.Connect() failed: %v", err)
}
- defer pwa.Destroy()
- pwd, err := pwa.Allocate(testPacketWindowSize)
- if err != nil {
- b.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err)
+ if _, err := c.clientEP.SendRecv(0); err != nil {
+ t.Fatalf("client Endpoint.SendRecv() failed: %v", err)
}
+ time.Sleep(time.Second) // to allow serverEP.SendRecv() to block
+ c.serverEP.Shutdown()
+ serverRun.Wait()
+}
- sendEP, err := NewEndpoint(ctrlMode, pwd)
- if err != nil {
- b.Fatalf("failed to create Endpoint: %v", err)
- }
- defer sendEP.Destroy()
- recvEP, err := NewEndpoint(ctrlMode, pwd)
- if err != nil {
- b.Fatalf("failed to create Endpoint: %v", err)
- }
- defer recvEP.Destroy()
+func TestShutdownSendRecv(t *testing.T) {
+ c := newTestConnection(t)
+ defer c.destroy()
+ testShutdownSendRecv(t, c)
+}
- otherThreadDone := make(chan struct{})
+func benchmarkSendRecv(b *testing.B, c *testConnection) {
+ var serverRun sync.WaitGroup
+ serverRun.Add(1)
go func() {
- defer func() { otherThreadDone <- struct{}{} }()
+ defer serverRun.Done()
if b.N == 0 {
return
}
- if _, err := recvEP.RecvFirst(); err != nil {
- b.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err)
+ if _, err := c.serverEP.RecvFirst(); err != nil {
+ b.Fatalf("server Endpoint.RecvFirst() failed: %v", err)
}
for i := 1; i < b.N; i++ {
- if _, err := recvEP.SendRecv(0); err != nil {
- b.Fatalf("initially-inactive Endpoint.SendRecv() failed: %v", err)
+ if _, err := c.serverEP.SendRecv(0); err != nil {
+ b.Fatalf("server Endpoint.SendRecv() failed: %v", err)
}
}
- if err := recvEP.SendLast(0); err != nil {
- b.Fatalf("initially-inactive Endpoint.SendLast() failed: %v", err)
+ if err := c.serverEP.SendLast(0); err != nil {
+ b.Fatalf("server Endpoint.SendLast() failed: %v", err)
}
}()
- defer func() { <-otherThreadDone }()
+ defer func() {
+ c.serverEP.Shutdown()
+ serverRun.Wait()
+ }()
+ if err := c.clientEP.Connect(); err != nil {
+ b.Fatalf("client Endpoint.Connect() failed: %v", err)
+ }
+ runtime.GC()
b.ResetTimer()
for i := 0; i < b.N; i++ {
- if _, err := sendEP.SendRecv(0); err != nil {
- b.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err)
+ if _, err := c.clientEP.SendRecv(0); err != nil {
+ b.Fatalf("client Endpoint.SendRecv() failed: %v", err)
}
}
b.StopTimer()
}
-func BenchmarkFutexSendRecv(b *testing.B) {
- benchmarkSendRecv(b, ControlModeFutex)
+func BenchmarkSendRecv(b *testing.B) {
+ c := newTestConnection(b)
+ defer c.destroy()
+ benchmarkSendRecv(b, c)
}
diff --git a/pkg/flipcall/flipcall_unsafe.go b/pkg/flipcall/flipcall_unsafe.go
new file mode 100644
index 000000000..7c8977893
--- /dev/null
+++ b/pkg/flipcall/flipcall_unsafe.go
@@ -0,0 +1,69 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "reflect"
+ "unsafe"
+)
+
+// Packets consist of an 8-byte header followed by an arbitrarily-sized
+// datagram. The header consists of:
+//
+// - A 4-byte native-endian connection state.
+//
+// - A 4-byte native-endian datagram length in bytes.
+const (
+ sizeofUint32 = unsafe.Sizeof(uint32(0))
+
+ // PacketHeaderBytes is the size of a flipcall packet header in bytes. The
+ // maximum datagram size supported by a flipcall connection is equal to the
+ // length of the packet window minus PacketHeaderBytes.
+ //
+ // PacketHeaderBytes is exported to support its use in constant
+ // expressions. Non-constant expressions may prefer to use
+ // PacketWindowLengthForDataCap().
+ PacketHeaderBytes = 2 * sizeofUint32
+)
+
+func (ep *Endpoint) connState() *uint32 {
+ return (*uint32)((unsafe.Pointer)(ep.packet))
+}
+
+func (ep *Endpoint) dataLen() *uint32 {
+ return (*uint32)((unsafe.Pointer)(ep.packet + sizeofUint32))
+}
+
+// Data returns the datagram part of ep's packet window as a byte slice.
+//
+// Note that the packet window is shared with the potentially-untrusted peer
+// Endpoint, which may concurrently mutate the contents of the packet window.
+// Thus:
+//
+// - Readers must not assume that two reads of the same byte in Data() will
+// return the same result. In other words, readers should read any given byte
+// in Data() at most once.
+//
+// - Writers must not assume that they will read back the same data that they
+// have written. In other words, writers should avoid reading from Data() at
+// all.
+func (ep *Endpoint) Data() []byte {
+ var bs []byte
+ bsReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&bs))
+ bsReflect.Data = ep.packet + PacketHeaderBytes
+ bsReflect.Len = int(ep.dataCap)
+ bsReflect.Cap = int(ep.dataCap)
+ return bs
+}
diff --git a/pkg/flipcall/futex_linux.go b/pkg/flipcall/futex_linux.go
index 3f592ad16..e7dd812b3 100644
--- a/pkg/flipcall/futex_linux.go
+++ b/pkg/flipcall/futex_linux.go
@@ -17,78 +17,95 @@
package flipcall
import (
+ "encoding/json"
"fmt"
- "math"
+ "runtime"
"sync/atomic"
"syscall"
"gvisor.dev/gvisor/pkg/abi/linux"
- "gvisor.dev/gvisor/pkg/log"
)
-func (ep *Endpoint) doFutexRoundTrip() error {
- ourSeq, err := ep.doFutexNotifySeq()
- if err != nil {
- return err
+func (ep *Endpoint) futexConnect(req *ctrlHandshakeRequest) (ctrlHandshakeResponse, error) {
+ var resp ctrlHandshakeResponse
+
+ // Write the handshake request.
+ w := ep.NewWriter()
+ if err := json.NewEncoder(w).Encode(req); err != nil {
+ return resp, fmt.Errorf("error writing handshake request: %v", err)
}
- return ep.doFutexWaitSeq(ourSeq)
-}
+ *ep.dataLen() = w.Len()
-func (ep *Endpoint) doFutexWaitFirst() error {
- return ep.doFutexWaitSeq(0)
-}
+ // Exchange control with the server.
+ if err := ep.futexSwitchToPeer(); err != nil {
+ return resp, err
+ }
+ if err := ep.futexSwitchFromPeer(); err != nil {
+ return resp, err
+ }
-func (ep *Endpoint) doFutexNotifyLast() error {
- _, err := ep.doFutexNotifySeq()
- return err
+ // Read the handshake response.
+ respLen := atomic.LoadUint32(ep.dataLen())
+ if respLen > ep.dataCap {
+ return resp, fmt.Errorf("invalid handshake response length %d (maximum %d)", respLen, ep.dataCap)
+ }
+ if err := json.NewDecoder(ep.NewReader(respLen)).Decode(&resp); err != nil {
+ return resp, fmt.Errorf("error reading handshake response: %v", err)
+ }
+
+ return resp, nil
}
-func (ep *Endpoint) doFutexNotifySeq() (uint32, error) {
- ourSeq := atomic.AddUint32(ep.seq(), 1)
- if err := ep.futexWake(1); err != nil {
- return ourSeq, fmt.Errorf("failed to FUTEX_WAKE peer Endpoint: %v", err)
+func (ep *Endpoint) futexSwitchToPeer() error {
+ // Update connection state to indicate that the peer should be active.
+ if !atomic.CompareAndSwapUint32(ep.connState(), ep.activeState, ep.inactiveState) {
+ return fmt.Errorf("unexpected connection state before FUTEX_WAKE: %v", atomic.LoadUint32(ep.connState()))
}
- return ourSeq, nil
+
+ // Wake the peer's Endpoint.futexSwitchFromPeer().
+ if err := ep.futexWakeConnState(1); err != nil {
+ return fmt.Errorf("failed to FUTEX_WAKE peer Endpoint: %v", err)
+ }
+ return nil
}
-func (ep *Endpoint) doFutexWaitSeq(prevSeq uint32) error {
- nextSeq := prevSeq + 1
+func (ep *Endpoint) futexSwitchFromPeer() error {
for {
- if ep.isShutdown() {
- return endpointShutdownError{}
- }
- if err := ep.futexWait(prevSeq); err != nil {
- return fmt.Errorf("failed to FUTEX_WAIT for peer Endpoint: %v", err)
- }
- seq := atomic.LoadUint32(ep.seq())
- if seq == nextSeq {
+ switch cs := atomic.LoadUint32(ep.connState()); cs {
+ case ep.activeState:
return nil
+ case ep.inactiveState:
+ // Continue to FUTEX_WAIT.
+ default:
+ return fmt.Errorf("unexpected connection state before FUTEX_WAIT: %v", cs)
}
- if seq != prevSeq {
- return fmt.Errorf("invalid packet sequence number %d (expected %d or %d)", seq, prevSeq, nextSeq)
+ if ep.isShutdownLocally() {
+ return shutdownError{}
+ }
+ if err := ep.futexWaitConnState(ep.inactiveState); err != nil {
+ return fmt.Errorf("failed to FUTEX_WAIT for peer Endpoint: %v", err)
}
}
}
-func (ep *Endpoint) doFutexInterruptForShutdown() {
- // Wake MaxInt32 threads to prevent a malicious or broken peer from
- // swallowing our wakeup by FUTEX_WAITing from multiple threads.
- if err := ep.futexWake(math.MaxInt32); err != nil {
- log.Warningf("failed to FUTEX_WAKE Endpoint: %v", err)
- }
-}
-
-func (ep *Endpoint) futexWake(numThreads int32) error {
- if _, _, e := syscall.RawSyscall(syscall.SYS_FUTEX, uintptr(ep.packet), linux.FUTEX_WAKE, uintptr(numThreads)); e != 0 {
+func (ep *Endpoint) futexWakeConnState(numThreads int32) error {
+ if _, _, e := syscall.RawSyscall(syscall.SYS_FUTEX, ep.packet, linux.FUTEX_WAKE, uintptr(numThreads)); e != 0 {
return e
}
return nil
}
-func (ep *Endpoint) futexWait(seq uint32) error {
- _, _, e := syscall.Syscall6(syscall.SYS_FUTEX, uintptr(ep.packet), linux.FUTEX_WAIT, uintptr(seq), 0, 0, 0)
+func (ep *Endpoint) futexWaitConnState(curState uint32) error {
+ _, _, e := syscall.Syscall6(syscall.SYS_FUTEX, ep.packet, linux.FUTEX_WAIT, uintptr(curState), 0, 0, 0)
if e != 0 && e != syscall.EAGAIN && e != syscall.EINTR {
return e
}
return nil
}
+
+func yieldThread() {
+ syscall.Syscall(syscall.SYS_SCHED_YIELD, 0, 0, 0)
+ // The thread we're trying to yield to may be waiting for a Go runtime P.
+ // runtime.Gosched() will hand off ours if necessary.
+ runtime.Gosched()
+}
diff --git a/pkg/flipcall/io.go b/pkg/flipcall/io.go
new file mode 100644
index 000000000..85e40b932
--- /dev/null
+++ b/pkg/flipcall/io.go
@@ -0,0 +1,113 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "fmt"
+ "io"
+)
+
+// DatagramReader implements io.Reader by reading a datagram from an Endpoint's
+// packet window. Its use is optional; users that can use Endpoint.Data() more
+// efficiently are advised to do so.
+type DatagramReader struct {
+ ep *Endpoint
+ off uint32
+ end uint32
+}
+
+// Init must be called on zero-value DatagramReaders before first use.
+//
+// Preconditions: dataLen is 0, or was returned by a previous call to
+// ep.RecvFirst() or ep.SendRecv().
+func (r *DatagramReader) Init(ep *Endpoint, dataLen uint32) {
+ r.ep = ep
+ r.Reset(dataLen)
+}
+
+// Reset causes r to begin reading a new datagram of the given length from the
+// associated Endpoint.
+//
+// Preconditions: dataLen is 0, or was returned by a previous call to the
+// associated Endpoint's RecvFirst() or SendRecv() methods.
+func (r *DatagramReader) Reset(dataLen uint32) {
+ if dataLen > r.ep.dataCap {
+ panic(fmt.Sprintf("invalid dataLen (%d) > ep.dataCap (%d)", dataLen, r.ep.dataCap))
+ }
+ r.off = 0
+ r.end = dataLen
+}
+
+// NewReader is a convenience function that returns an initialized
+// DatagramReader allocated on the heap.
+//
+// Preconditions: dataLen was returned by a previous call to ep.RecvFirst() or
+// ep.SendRecv().
+func (ep *Endpoint) NewReader(dataLen uint32) *DatagramReader {
+ r := &DatagramReader{}
+ r.Init(ep, dataLen)
+ return r
+}
+
+// Read implements io.Reader.Read.
+func (r *DatagramReader) Read(dst []byte) (int, error) {
+ n := copy(dst, r.ep.Data()[r.off:r.end])
+ r.off += uint32(n)
+ if r.off == r.end {
+ return n, io.EOF
+ }
+ return n, nil
+}
+
+// DatagramWriter implements io.Writer by writing a datagram to an Endpoint's
+// packet window. Its use is optional; users that can use Endpoint.Data() more
+// efficiently are advised to do so.
+type DatagramWriter struct {
+ ep *Endpoint
+ off uint32
+}
+
+// Init must be called on zero-value DatagramWriters before first use.
+func (w *DatagramWriter) Init(ep *Endpoint) {
+ w.ep = ep
+}
+
+// Reset causes w to begin writing a new datagram to the associated Endpoint.
+func (w *DatagramWriter) Reset() {
+ w.off = 0
+}
+
+// NewWriter is a convenience function that returns an initialized
+// DatagramWriter allocated on the heap.
+func (ep *Endpoint) NewWriter() *DatagramWriter {
+ w := &DatagramWriter{}
+ w.Init(ep)
+ return w
+}
+
+// Write implements io.Writer.Write.
+func (w *DatagramWriter) Write(src []byte) (int, error) {
+ n := copy(w.ep.Data()[w.off:w.ep.dataCap], src)
+ w.off += uint32(n)
+ if n != len(src) {
+ return n, fmt.Errorf("datagram would exceed maximum size of %d bytes", w.ep.dataCap)
+ }
+ return n, nil
+}
+
+// Len returns the length of the written datagram.
+func (w *DatagramWriter) Len() uint32 {
+ return w.off
+}
diff --git a/pkg/flipcall/packet_window_allocator.go b/pkg/flipcall/packet_window_allocator.go
index 7b455b24d..ccb918fab 100644
--- a/pkg/flipcall/packet_window_allocator.go
+++ b/pkg/flipcall/packet_window_allocator.go
@@ -34,10 +34,10 @@ func init() {
// This is depended on by roundUpToPage().
panic(fmt.Sprintf("system page size (%d) is not a power of 2", pageSize))
}
- if uintptr(pageSize) < packetHeaderBytes {
+ if uintptr(pageSize) < PacketHeaderBytes {
// This is required since Endpoint.Init() imposes a minimum packet
// window size of 1 page.
- panic(fmt.Sprintf("system page size (%d) is less than packet header size (%d)", pageSize, packetHeaderBytes))
+ panic(fmt.Sprintf("system page size (%d) is less than packet header size (%d)", pageSize, PacketHeaderBytes))
}
}
@@ -59,7 +59,7 @@ type PacketWindowDescriptor struct {
// PacketWindowLengthForDataCap returns the minimum packet window size required
// to accommodate datagrams of the given size in bytes.
func PacketWindowLengthForDataCap(dataCap uint32) int {
- return roundUpToPage(int(dataCap) + int(packetHeaderBytes))
+ return roundUpToPage(int(dataCap) + int(PacketHeaderBytes))
}
func roundUpToPage(x int) int {