From e806466fc5a1331df1e643226297064e0eb31075 Mon Sep 17 00:00:00 2001 From: Jamie Liu Date: Fri, 21 Jun 2019 14:45:57 -0700 Subject: Add //pkg/flipcall. Flipcall is a (conceptually) simple local-only RPC mechanism. Compared to unet, Flipcall does not support passing FDs (support for which will be provided out of band by another package), requires users to establish connections manually, and requires user management of concurrency since each connected Endpoint pair supports only a single RPC at a time; however, it improves performance by using shared memory for data (reducing memory copies) and using futexes for control signaling (which is much cheaper than sendto/recvfrom/sendmsg/recvmsg). PiperOrigin-RevId: 254471986 --- pkg/flipcall/endpoint_unsafe.go | 238 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 pkg/flipcall/endpoint_unsafe.go (limited to 'pkg/flipcall/endpoint_unsafe.go') diff --git a/pkg/flipcall/endpoint_unsafe.go b/pkg/flipcall/endpoint_unsafe.go new file mode 100644 index 000000000..8319955e0 --- /dev/null +++ b/pkg/flipcall/endpoint_unsafe.go @@ -0,0 +1,238 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "fmt" + "math" + "reflect" + "sync/atomic" + "syscall" + "unsafe" +) + +// An Endpoint provides the ability to synchronously transfer data and control +// to a connected peer Endpoint, which may be in another process. 
//
// Since the Endpoint control transfer model is synchronous, at any given time
// one Endpoint "has control" (designated the *active* Endpoint), and the other
// is "waiting for control" (designated the *inactive* Endpoint). Users of the
// flipcall package arbitrarily designate one Endpoint as initially-active, and
// the other as initially-inactive; in a client/server protocol, the client
// Endpoint is usually initially-active (able to send a request) and the server
// Endpoint is usually initially-inactive (waiting for a request). The
// initially-active Endpoint writes data to be sent to Endpoint.Data(), and
// then synchronously transfers control to the inactive Endpoint by calling
// Endpoint.SendRecv(), becoming the inactive Endpoint in the process. The
// initially-inactive Endpoint waits for control by calling
// Endpoint.RecvFirst(); receiving control causes it to become the active
// Endpoint. After this, the protocol is symmetric: the active Endpoint reads
// data sent by the peer by reading from Endpoint.Data(), writes data to be
// sent to the peer into Endpoint.Data(), and then calls Endpoint.SendRecv() to
// exchange roles with the peer, which blocks until the peer has done the same.
type Endpoint struct {
	// shutdown is non-zero if Endpoint.Shutdown() has been called. shutdown is
	// accessed using atomic memory operations.
	shutdown uint32

	// dataCap is the size of the datagram part of the packet window in bytes.
	// dataCap is immutable between Init (which sets it to the window length
	// minus the packet header size) and the unmapping of the window (which
	// resets it to 0).
	dataCap uint32

	// packet is the beginning of the packet window, i.e. the address returned
	// by mmap in Init. packet is immutable until the window is unmapped, at
	// which point it is reset to nil.
	packet unsafe.Pointer

	// ctrl is the control-signaling state for this Endpoint.
	// NOTE(review): presumably set up by initControlState (called from Init)
	// according to the ControlMode; confirm against the definition of
	// endpointControlState.
	ctrl endpointControlState
}

// Init must be called on zero-value Endpoints before first use. If it
// succeeds, Destroy() must be called once the Endpoint is no longer in use.
//
// ctrlMode specifies how connected Endpoints will exchange control. Both
// connected Endpoints must specify the same value for ctrlMode.
+// +// pwd represents the packet window used to exchange data with the peer +// Endpoint. FD may differ between Endpoints if they are in different +// processes, but must represent the same file. The packet window must +// initially be filled with zero bytes. +func (ep *Endpoint) Init(ctrlMode ControlMode, pwd PacketWindowDescriptor) error { + if pwd.Length < pageSize { + return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize) + } + if pwd.Length > math.MaxUint32 { + return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, math.MaxUint32) + } + m, _, e := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset)) + if e != 0 { + return fmt.Errorf("failed to mmap packet window: %v", e) + } + ep.dataCap = uint32(pwd.Length) - uint32(packetHeaderBytes) + ep.packet = (unsafe.Pointer)(m) + if err := ep.initControlState(ctrlMode); err != nil { + ep.unmapPacket() + return err + } + return nil +} + +// NewEndpoint is a convenience function that returns an initialized Endpoint +// allocated on the heap. +func NewEndpoint(ctrlMode ControlMode, pwd PacketWindowDescriptor) (*Endpoint, error) { + var ep Endpoint + if err := ep.Init(ctrlMode, pwd); err != nil { + return nil, err + } + return &ep, nil +} + +func (ep *Endpoint) unmapPacket() { + syscall.Syscall(syscall.SYS_MUNMAP, uintptr(ep.packet), uintptr(ep.dataCap)+packetHeaderBytes, 0) + ep.dataCap = 0 + ep.packet = nil +} + +// Destroy releases resources owned by ep. No other Endpoint methods may be +// called after Destroy. +func (ep *Endpoint) Destroy() { + ep.unmapPacket() +} + +// Packets consist of an 8-byte header followed by an arbitrarily-sized +// datagram. The header consists of: +// +// - A 4-byte native-endian sequence number, which is incremented by the active +// Endpoint after it finishes writing to the packet window. 
The sequence number +// is needed to handle spurious wakeups. +// +// - A 4-byte native-endian datagram length in bytes. +const ( + sizeofUint32 = unsafe.Sizeof(uint32(0)) + packetHeaderBytes = 2 * sizeofUint32 +) + +func (ep *Endpoint) seq() *uint32 { + return (*uint32)(ep.packet) +} + +func (ep *Endpoint) dataLen() *uint32 { + return (*uint32)((unsafe.Pointer)(uintptr(ep.packet) + sizeofUint32)) +} + +// DataCap returns the maximum datagram size supported by ep in bytes. +func (ep *Endpoint) DataCap() uint32 { + return ep.dataCap +} + +func (ep *Endpoint) data() unsafe.Pointer { + return unsafe.Pointer(uintptr(ep.packet) + packetHeaderBytes) +} + +// Data returns the datagram part of ep's packet window as a byte slice. +// +// Note that the packet window is shared with the potentially-untrusted peer +// Endpoint, which may concurrently mutate the contents of the packet window. +// Thus: +// +// - Readers must not assume that two reads of the same byte in Data() will +// return the same result. In other words, readers should read any given byte +// in Data() at most once. +// +// - Writers must not assume that they will read back the same data that they +// have written. In other words, writers should avoid reading from Data() at +// all. +func (ep *Endpoint) Data() []byte { + var bs []byte + bsReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&bs)) + bsReflect.Data = uintptr(ep.data()) + bsReflect.Len = int(ep.DataCap()) + bsReflect.Cap = bsReflect.Len + return bs +} + +// SendRecv transfers control to the peer Endpoint, causing its call to +// Endpoint.SendRecv() or Endpoint.RecvFirst() to return with the given +// datagram length, then blocks until the peer Endpoint calls +// Endpoint.SendRecv() or Endpoint.SendLast(). +// +// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has +// returned an error. ep.SendLast() has never been called. 
+func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) { + dataCap := ep.DataCap() + if dataLen > dataCap { + return 0, fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap) + } + atomic.StoreUint32(ep.dataLen(), dataLen) + if err := ep.doRoundTrip(); err != nil { + return 0, err + } + recvDataLen := atomic.LoadUint32(ep.dataLen()) + if recvDataLen > dataCap { + return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap) + } + return recvDataLen, nil +} + +// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then +// returns the datagram length specified by that call. +// +// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never +// been called. +func (ep *Endpoint) RecvFirst() (uint32, error) { + if err := ep.doWaitFirst(); err != nil { + return 0, err + } + recvDataLen := atomic.LoadUint32(ep.dataLen()) + if dataCap := ep.DataCap(); recvDataLen > dataCap { + return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap) + } + return recvDataLen, nil +} + +// SendLast causes the peer Endpoint's call to Endpoint.SendRecv() or +// Endpoint.RecvFirst() to return with the given datagram length. +// +// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has +// returned an error. ep.SendLast() has never been called. +func (ep *Endpoint) SendLast(dataLen uint32) error { + dataCap := ep.DataCap() + if dataLen > dataCap { + return fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap) + } + atomic.StoreUint32(ep.dataLen(), dataLen) + if err := ep.doNotifyLast(); err != nil { + return err + } + return nil +} + +// Shutdown causes concurrent and future calls to ep.SendRecv(), +// ep.RecvFirst(), and ep.SendLast() to unblock and return errors. It does not +// wait for concurrent calls to return. 
+func (ep *Endpoint) Shutdown() { + if atomic.SwapUint32(&ep.shutdown, 1) == 0 { + ep.interruptForShutdown() + } +} + +func (ep *Endpoint) isShutdown() bool { + return atomic.LoadUint32(&ep.shutdown) != 0 +} + +type endpointShutdownError struct{} + +// Error implements error.Error. +func (endpointShutdownError) Error() string { + return "Endpoint.Shutdown() has been called" +} -- cgit v1.2.3