diff options
Diffstat (limited to 'pkg/flipcall')
-rw-r--r-- | pkg/flipcall/BUILD | 31 | ||||
-rw-r--r-- | pkg/flipcall/endpoint_futex.go | 45 | ||||
-rw-r--r-- | pkg/flipcall/endpoint_unsafe.go | 238 | ||||
-rw-r--r-- | pkg/flipcall/flipcall.go | 32 | ||||
-rw-r--r-- | pkg/flipcall/flipcall_example_test.go | 106 | ||||
-rw-r--r-- | pkg/flipcall/flipcall_test.go | 211 | ||||
-rw-r--r-- | pkg/flipcall/futex_linux.go | 94 | ||||
-rw-r--r-- | pkg/flipcall/packet_window_allocator.go | 166 |
8 files changed, 923 insertions, 0 deletions
diff --git a/pkg/flipcall/BUILD b/pkg/flipcall/BUILD new file mode 100644 index 000000000..7126fc45f --- /dev/null +++ b/pkg/flipcall/BUILD @@ -0,0 +1,31 @@ +load("//tools/go_stateify:defs.bzl", "go_library", "go_test") + +package(licenses = ["notice"]) + +go_library( + name = "flipcall", + srcs = [ + "endpoint_futex.go", + "endpoint_unsafe.go", + "flipcall.go", + "futex_linux.go", + "packet_window_allocator.go", + ], + importpath = "gvisor.dev/gvisor/pkg/flipcall", + visibility = ["//visibility:public"], + deps = [ + "//pkg/abi/linux", + "//pkg/log", + "//pkg/memutil", + ], +) + +go_test( + name = "flipcall_test", + size = "small", + srcs = [ + "flipcall_example_test.go", + "flipcall_test.go", + ], + embed = [":flipcall"], +) diff --git a/pkg/flipcall/endpoint_futex.go b/pkg/flipcall/endpoint_futex.go new file mode 100644 index 000000000..5cab02b1d --- /dev/null +++ b/pkg/flipcall/endpoint_futex.go @@ -0,0 +1,45 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "fmt" +) + +type endpointControlState struct{} + +func (ep *Endpoint) initControlState(ctrlMode ControlMode) error { + if ctrlMode != ControlModeFutex { + return fmt.Errorf("unsupported control mode: %v", ctrlMode) + } + return nil +} + +func (ep *Endpoint) doRoundTrip() error { + return ep.doFutexRoundTrip() +} + +func (ep *Endpoint) doWaitFirst() error { + return ep.doFutexWaitFirst() +} + +func (ep *Endpoint) doNotifyLast() error { + return ep.doFutexNotifyLast() +} + +// Preconditions: ep.isShutdown() == true. +func (ep *Endpoint) interruptForShutdown() { + ep.doFutexInterruptForShutdown() +} diff --git a/pkg/flipcall/endpoint_unsafe.go b/pkg/flipcall/endpoint_unsafe.go new file mode 100644 index 000000000..8319955e0 --- /dev/null +++ b/pkg/flipcall/endpoint_unsafe.go @@ -0,0 +1,238 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "fmt" + "math" + "reflect" + "sync/atomic" + "syscall" + "unsafe" +) + +// An Endpoint provides the ability to synchronously transfer data and control +// to a connected peer Endpoint, which may be in another process. +// +// Since the Endpoint control transfer model is synchronous, at any given time +// one Endpoint "has control" (designated the *active* Endpoint), and the other +// is "waiting for control" (designated the *inactive* Endpoint). Users of the +// flipcall package arbitrarily designate one Endpoint as initially-active, and +// the other as initially-inactive; in a client/server protocol, the client +// Endpoint is usually initially-active (able to send a request) and the server +// Endpoint is usually initially-inactive (waiting for a request). The +// initially-active Endpoint writes data to be sent to Endpoint.Data(), and +// then synchronously transfers control to the inactive Endpoint by calling +// Endpoint.SendRecv(), becoming the inactive Endpoint in the process. The +// initially-inactive Endpoint waits for control by calling +// Endpoint.RecvFirst(); receiving control causes it to become the active +// Endpoint. After this, the protocol is symmetric: the active Endpoint reads +// data sent by the peer by reading from Endpoint.Data(), writes data to be +// sent to the peer into Endpoint.Data(), and then calls Endpoint.SendRecv() to +// exchange roles with the peer, which blocks until the peer has done the same. +type Endpoint struct { + // shutdown is non-zero if Endpoint.Shutdown() has been called. shutdown is + // accessed using atomic memory operations. + shutdown uint32 + + // dataCap is the size of the datagram part of the packet window in bytes. + // dataCap is immutable. + dataCap uint32 + + // packet is the beginning of the packet window. packet is immutable. + packet unsafe.Pointer + + ctrl endpointControlState +} + +// Init must be called on zero-value Endpoints before first use. If it +// succeeds, Destroy() must be called once the Endpoint is no longer in use. +// +// ctrlMode specifies how connected Endpoints will exchange control. Both +// connected Endpoints must specify the same value for ctrlMode. +// +// pwd represents the packet window used to exchange data with the peer +// Endpoint. FD may differ between Endpoints if they are in different +// processes, but must represent the same file. The packet window must +// initially be filled with zero bytes. +func (ep *Endpoint) Init(ctrlMode ControlMode, pwd PacketWindowDescriptor) error { + if pwd.Length < pageSize { + return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize) + } + if pwd.Length > math.MaxUint32 { + return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, math.MaxUint32) + } + m, _, e := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset)) + if e != 0 { + return fmt.Errorf("failed to mmap packet window: %v", e) + } + ep.dataCap = uint32(pwd.Length) - uint32(packetHeaderBytes) + ep.packet = (unsafe.Pointer)(m) + if err := ep.initControlState(ctrlMode); err != nil { + ep.unmapPacket() + return err + } + return nil +} + +// NewEndpoint is a convenience function that returns an initialized Endpoint +// allocated on the heap. +func NewEndpoint(ctrlMode ControlMode, pwd PacketWindowDescriptor) (*Endpoint, error) { + var ep Endpoint + if err := ep.Init(ctrlMode, pwd); err != nil { + return nil, err + } + return &ep, nil +} + +func (ep *Endpoint) unmapPacket() { + syscall.Syscall(syscall.SYS_MUNMAP, uintptr(ep.packet), uintptr(ep.dataCap)+packetHeaderBytes, 0) + ep.dataCap = 0 + ep.packet = nil +} + +// Destroy releases resources owned by ep. No other Endpoint methods may be +// called after Destroy. +func (ep *Endpoint) Destroy() { + ep.unmapPacket() +} + +// Packets consist of an 8-byte header followed by an arbitrarily-sized +// datagram. The header consists of: +// +// - A 4-byte native-endian sequence number, which is incremented by the active +// Endpoint after it finishes writing to the packet window. The sequence number +// is needed to handle spurious wakeups. +// +// - A 4-byte native-endian datagram length in bytes. +const ( + sizeofUint32 = unsafe.Sizeof(uint32(0)) + packetHeaderBytes = 2 * sizeofUint32 +) + +func (ep *Endpoint) seq() *uint32 { + return (*uint32)(ep.packet) +} + +func (ep *Endpoint) dataLen() *uint32 { + return (*uint32)((unsafe.Pointer)(uintptr(ep.packet) + sizeofUint32)) +} + +// DataCap returns the maximum datagram size supported by ep in bytes. +func (ep *Endpoint) DataCap() uint32 { + return ep.dataCap +} + +func (ep *Endpoint) data() unsafe.Pointer { + return unsafe.Pointer(uintptr(ep.packet) + packetHeaderBytes) +} + +// Data returns the datagram part of ep's packet window as a byte slice. +// +// Note that the packet window is shared with the potentially-untrusted peer +// Endpoint, which may concurrently mutate the contents of the packet window. +// Thus: +// +// - Readers must not assume that two reads of the same byte in Data() will +// return the same result. In other words, readers should read any given byte +// in Data() at most once. +// +// - Writers must not assume that they will read back the same data that they +// have written. In other words, writers should avoid reading from Data() at +// all. +func (ep *Endpoint) Data() []byte { + var bs []byte + bsReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&bs)) + bsReflect.Data = uintptr(ep.data()) + bsReflect.Len = int(ep.DataCap()) + bsReflect.Cap = bsReflect.Len + return bs +} + +// SendRecv transfers control to the peer Endpoint, causing its call to +// Endpoint.SendRecv() or Endpoint.RecvFirst() to return with the given +// datagram length, then blocks until the peer Endpoint calls +// Endpoint.SendRecv() or Endpoint.SendLast(). +// +// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has +// returned an error. ep.SendLast() has never been called. +func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) { + dataCap := ep.DataCap() + if dataLen > dataCap { + return 0, fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap) + } + atomic.StoreUint32(ep.dataLen(), dataLen) + if err := ep.doRoundTrip(); err != nil { + return 0, err + } + recvDataLen := atomic.LoadUint32(ep.dataLen()) + if recvDataLen > dataCap { + return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap) + } + return recvDataLen, nil +} + +// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then +// returns the datagram length specified by that call. +// +// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never +// been called. +func (ep *Endpoint) RecvFirst() (uint32, error) { + if err := ep.doWaitFirst(); err != nil { + return 0, err + } + recvDataLen := atomic.LoadUint32(ep.dataLen()) + if dataCap := ep.DataCap(); recvDataLen > dataCap { + return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap) + } + return recvDataLen, nil +} + +// SendLast causes the peer Endpoint's call to Endpoint.SendRecv() or +// Endpoint.RecvFirst() to return with the given datagram length. +// +// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has +// returned an error. ep.SendLast() has never been called. +func (ep *Endpoint) SendLast(dataLen uint32) error { + dataCap := ep.DataCap() + if dataLen > dataCap { + return fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap) + } + atomic.StoreUint32(ep.dataLen(), dataLen) + if err := ep.doNotifyLast(); err != nil { + return err + } + return nil +} + +// Shutdown causes concurrent and future calls to ep.SendRecv(), +// ep.RecvFirst(), and ep.SendLast() to unblock and return errors. It does not +// wait for concurrent calls to return. +func (ep *Endpoint) Shutdown() { + if atomic.SwapUint32(&ep.shutdown, 1) == 0 { + ep.interruptForShutdown() + } +} + +func (ep *Endpoint) isShutdown() bool { + return atomic.LoadUint32(&ep.shutdown) != 0 +} + +type endpointShutdownError struct{} + +// Error implements error.Error. +func (endpointShutdownError) Error() string { + return "Endpoint.Shutdown() has been called" +} diff --git a/pkg/flipcall/flipcall.go b/pkg/flipcall/flipcall.go new file mode 100644 index 000000000..79a1e418a --- /dev/null +++ b/pkg/flipcall/flipcall.go @@ -0,0 +1,32 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package flipcall implements a protocol providing Fast Local Interprocess +// Procedure Calls. +package flipcall + +// ControlMode defines how control is exchanged across a connection. +type ControlMode uint8 + +const ( + // ControlModeInvalid is invalid, and exists so that ControlMode fields in + // structs must be explicitly initialized. + ControlModeInvalid ControlMode = iota + + // ControlModeFutex uses shared futex operations on packet control words. + ControlModeFutex + + // controlModeCount is the number of ControlModes in this list. + controlModeCount +) diff --git a/pkg/flipcall/flipcall_example_test.go b/pkg/flipcall/flipcall_example_test.go new file mode 100644 index 000000000..572a1f119 --- /dev/null +++ b/pkg/flipcall/flipcall_example_test.go @@ -0,0 +1,106 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "bytes" + "fmt" +) + +func Example() { + const ( + reqPrefix = "request " + respPrefix = "response " + count = 3 + maxMessageLen = len(respPrefix) + 1 // 1 digit + ) + + pwa, err := NewPacketWindowAllocator() + if err != nil { + panic(err) + } + defer pwa.Destroy() + pwd, err := pwa.Allocate(PacketWindowLengthForDataCap(uint32(maxMessageLen))) + if err != nil { + panic(err) + } + clientEP, err := NewEndpoint(ControlModeFutex, pwd) + if err != nil { + panic(err) + } + defer clientEP.Destroy() + serverEP, err := NewEndpoint(ControlModeFutex, pwd) + if err != nil { + panic(err) + } + defer serverEP.Destroy() + + serverDone := make(chan struct{}) + go func() { + defer func() { serverDone <- struct{}{} }() + i := 0 + var buf bytes.Buffer + // wait for first request + n, err := serverEP.RecvFirst() + if err != nil { + return + } + for { + // read request + buf.Reset() + buf.Write(serverEP.Data()[:n]) + fmt.Println(buf.String()) + // write response + buf.Reset() + fmt.Fprintf(&buf, "%s%d", respPrefix, i) + copy(serverEP.Data(), buf.Bytes()) + // send response and wait for next request + n, err = serverEP.SendRecv(uint32(buf.Len())) + if err != nil { + return + } + i++ + } + }() + defer func() { + serverEP.Shutdown() + <-serverDone + }() + + var buf bytes.Buffer + for i := 0; i < count; i++ { + // write request + buf.Reset() + fmt.Fprintf(&buf, "%s%d", reqPrefix, i) + copy(clientEP.Data(), buf.Bytes()) + // send request and wait for response + n, err := clientEP.SendRecv(uint32(buf.Len())) + if err != nil { + panic(err) + } + // read response + buf.Reset() + buf.Write(clientEP.Data()[:n]) + fmt.Println(buf.String()) + } + + // Output: + // request 0 + // response 0 + // request 1 + // response 1 + // request 2 + // response 2 +} diff --git a/pkg/flipcall/flipcall_test.go b/pkg/flipcall/flipcall_test.go new file mode 100644 index 000000000..20d3002f0 --- /dev/null +++ b/pkg/flipcall/flipcall_test.go @@ -0,0 +1,211 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "testing" + "time" +) + +var testPacketWindowSize = pageSize + +func testSendRecv(t *testing.T, ctrlMode ControlMode) { + pwa, err := NewPacketWindowAllocator() + if err != nil { + t.Fatalf("failed to create PacketWindowAllocator: %v", err) + } + defer pwa.Destroy() + pwd, err := pwa.Allocate(testPacketWindowSize) + if err != nil { + t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err) + } + + sendEP, err := NewEndpoint(ctrlMode, pwd) + if err != nil { + t.Fatalf("failed to create Endpoint: %v", err) + } + defer sendEP.Destroy() + recvEP, err := NewEndpoint(ctrlMode, pwd) + if err != nil { + t.Fatalf("failed to create Endpoint: %v", err) + } + defer recvEP.Destroy() + + otherThreadDone := make(chan struct{}) + go func() { + defer func() { otherThreadDone <- struct{}{} }() + t.Logf("initially-inactive Endpoint waiting for packet 1") + if _, err := recvEP.RecvFirst(); err != nil { + t.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err) + } + t.Logf("initially-inactive Endpoint got packet 1, sending packet 2 and waiting for packet 3") + if _, err := recvEP.SendRecv(0); err != nil { + t.Fatalf("initially-inactive Endpoint.SendRecv() failed: %v", err) + } + t.Logf("initially-inactive Endpoint got packet 3") + }() + defer func() { + t.Logf("waiting for initially-inactive Endpoint goroutine to complete") + <-otherThreadDone + }() + + t.Logf("initially-active Endpoint sending packet 1 and waiting for packet 2") + if _, err := sendEP.SendRecv(0); err != nil { + t.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err) + } + t.Logf("initially-active Endpoint got packet 2, sending packet 3") + if err := sendEP.SendLast(0); err != nil { + t.Fatalf("initially-active Endpoint.SendLast() failed: %v", err) + } +} + +func TestFutexSendRecv(t *testing.T) { + testSendRecv(t, ControlModeFutex) +} + +func testRecvFirstShutdown(t *testing.T, ctrlMode ControlMode) { + pwa, err := NewPacketWindowAllocator() + if err != nil { + t.Fatalf("failed to create PacketWindowAllocator: %v", err) + } + defer pwa.Destroy() + pwd, err := pwa.Allocate(testPacketWindowSize) + if err != nil { + t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err) + } + + ep, err := NewEndpoint(ctrlMode, pwd) + if err != nil { + t.Fatalf("failed to create Endpoint: %v", err) + } + defer ep.Destroy() + + otherThreadDone := make(chan struct{}) + go func() { + defer func() { otherThreadDone <- struct{}{} }() + _, err := ep.RecvFirst() + if err == nil { + t.Errorf("Endpoint.RecvFirst() succeeded unexpectedly") + } + }() + + time.Sleep(time.Second) // to ensure ep.RecvFirst() has blocked + ep.Shutdown() + <-otherThreadDone +} + +func TestFutexRecvFirstShutdown(t *testing.T) { + testRecvFirstShutdown(t, ControlModeFutex) +} + +func testSendRecvShutdown(t *testing.T, ctrlMode ControlMode) { + pwa, err := NewPacketWindowAllocator() + if err != nil { + t.Fatalf("failed to create PacketWindowAllocator: %v", err) + } + defer pwa.Destroy() + pwd, err := pwa.Allocate(testPacketWindowSize) + if err != nil { + t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err) + } + + sendEP, err := NewEndpoint(ctrlMode, pwd) + if err != nil { + t.Fatalf("failed to create Endpoint: %v", err) + } + defer sendEP.Destroy() + recvEP, err := NewEndpoint(ctrlMode, pwd) + if err != nil { + t.Fatalf("failed to create Endpoint: %v", err) + } + defer recvEP.Destroy() + + otherThreadDone := make(chan struct{}) + go func() { + defer func() { otherThreadDone <- struct{}{} }() + if _, err := recvEP.RecvFirst(); err != nil { + t.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err) + } + if _, err := recvEP.SendRecv(0); err == nil { + t.Errorf("initially-inactive Endpoint.SendRecv() succeeded unexpectedly") + } + }() + + if _, err := sendEP.SendRecv(0); err != nil { + t.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err) + } + time.Sleep(time.Second) // to ensure recvEP.SendRecv() has blocked + recvEP.Shutdown() + <-otherThreadDone +} + +func TestFutexSendRecvShutdown(t *testing.T) { + testSendRecvShutdown(t, ControlModeFutex) +} + +func benchmarkSendRecv(b *testing.B, ctrlMode ControlMode) { + pwa, err := NewPacketWindowAllocator() + if err != nil { + b.Fatalf("failed to create PacketWindowAllocator: %v", err) + } + defer pwa.Destroy() + pwd, err := pwa.Allocate(testPacketWindowSize) + if err != nil { + b.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err) + } + + sendEP, err := NewEndpoint(ctrlMode, pwd) + if err != nil { + b.Fatalf("failed to create Endpoint: %v", err) + } + defer sendEP.Destroy() + recvEP, err := NewEndpoint(ctrlMode, pwd) + if err != nil { + b.Fatalf("failed to create Endpoint: %v", err) + } + defer recvEP.Destroy() + + otherThreadDone := make(chan struct{}) + go func() { + defer func() { otherThreadDone <- struct{}{} }() + if b.N == 0 { + return + } + if _, err := recvEP.RecvFirst(); err != nil { + b.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err) + } + for i := 1; i < b.N; i++ { + if _, err := recvEP.SendRecv(0); err != nil { + b.Fatalf("initially-inactive Endpoint.SendRecv() failed: %v", err) + } + } + if err := recvEP.SendLast(0); err != nil { + b.Fatalf("initially-inactive Endpoint.SendLast() failed: %v", err) + } + }() + defer func() { <-otherThreadDone }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := sendEP.SendRecv(0); err != nil { + b.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err) + } + } + b.StopTimer() +} + +func BenchmarkFutexSendRecv(b *testing.B) { + benchmarkSendRecv(b, ControlModeFutex) +} diff --git a/pkg/flipcall/futex_linux.go b/pkg/flipcall/futex_linux.go new file mode 100644 index 000000000..3f592ad16 --- /dev/null +++ b/pkg/flipcall/futex_linux.go @@ -0,0 +1,94 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package flipcall + +import ( + "fmt" + "math" + "sync/atomic" + "syscall" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/log" +) + +func (ep *Endpoint) doFutexRoundTrip() error { + ourSeq, err := ep.doFutexNotifySeq() + if err != nil { + return err + } + return ep.doFutexWaitSeq(ourSeq) +} + +func (ep *Endpoint) doFutexWaitFirst() error { + return ep.doFutexWaitSeq(0) +} + +func (ep *Endpoint) doFutexNotifyLast() error { + _, err := ep.doFutexNotifySeq() + return err +} + +func (ep *Endpoint) doFutexNotifySeq() (uint32, error) { + ourSeq := atomic.AddUint32(ep.seq(), 1) + if err := ep.futexWake(1); err != nil { + return ourSeq, fmt.Errorf("failed to FUTEX_WAKE peer Endpoint: %v", err) + } + return ourSeq, nil +} + +func (ep *Endpoint) doFutexWaitSeq(prevSeq uint32) error { + nextSeq := prevSeq + 1 + for { + if ep.isShutdown() { + return endpointShutdownError{} + } + if err := ep.futexWait(prevSeq); err != nil { + return fmt.Errorf("failed to FUTEX_WAIT for peer Endpoint: %v", err) + } + seq := atomic.LoadUint32(ep.seq()) + if seq == nextSeq { + return nil + } + if seq != prevSeq { + return fmt.Errorf("invalid packet sequence number %d (expected %d or %d)", seq, prevSeq, nextSeq) + } + } +} + +func (ep *Endpoint) doFutexInterruptForShutdown() { + // Wake MaxInt32 threads to prevent a malicious or broken peer from + // swallowing our wakeup by FUTEX_WAITing from multiple threads. + if err := ep.futexWake(math.MaxInt32); err != nil { + log.Warningf("failed to FUTEX_WAKE Endpoint: %v", err) + } +} + +func (ep *Endpoint) futexWake(numThreads int32) error { + if _, _, e := syscall.RawSyscall(syscall.SYS_FUTEX, uintptr(ep.packet), linux.FUTEX_WAKE, uintptr(numThreads)); e != 0 { + return e + } + return nil +} + +func (ep *Endpoint) futexWait(seq uint32) error { + _, _, e := syscall.Syscall6(syscall.SYS_FUTEX, uintptr(ep.packet), linux.FUTEX_WAIT, uintptr(seq), 0, 0, 0) + if e != 0 && e != syscall.EAGAIN && e != syscall.EINTR { + return e + } + return nil +} diff --git a/pkg/flipcall/packet_window_allocator.go b/pkg/flipcall/packet_window_allocator.go new file mode 100644 index 000000000..7b455b24d --- /dev/null +++ b/pkg/flipcall/packet_window_allocator.go @@ -0,0 +1,166 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "fmt" + "math/bits" + "os" + "syscall" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/memutil" +) + +var ( + pageSize = os.Getpagesize() + pageMask = pageSize - 1 +) + +func init() { + if bits.OnesCount(uint(pageSize)) != 1 { + // This is depended on by roundUpToPage(). + panic(fmt.Sprintf("system page size (%d) is not a power of 2", pageSize)) + } + if uintptr(pageSize) < packetHeaderBytes { + // This is required since Endpoint.Init() imposes a minimum packet + // window size of 1 page. + panic(fmt.Sprintf("system page size (%d) is less than packet header size (%d)", pageSize, packetHeaderBytes)) + } +} + +// PacketWindowDescriptor represents a packet window, a range of pages in a +// shared memory file that is used to exchange packets between partner +// Endpoints. +type PacketWindowDescriptor struct { + // FD is the file descriptor representing the shared memory file. + FD int + + // Offset is the offset into the shared memory file at which the packet + // window begins. + Offset int64 + + // Length is the size of the packet window in bytes. + Length int +} + +// PacketWindowLengthForDataCap returns the minimum packet window size required +// to accommodate datagrams of the given size in bytes. +func PacketWindowLengthForDataCap(dataCap uint32) int { + return roundUpToPage(int(dataCap) + int(packetHeaderBytes)) +} + +func roundUpToPage(x int) int { + return (x + pageMask) &^ pageMask +} + +// A PacketWindowAllocator owns a shared memory file, and allocates packet +// windows from it. +type PacketWindowAllocator struct { + fd int + nextAlloc int64 + fileSize int64 +} + +// Init must be called on zero-value PacketWindowAllocators before first use. +// If it succeeds, Destroy() must be called once the PacketWindowAllocator is +// no longer in use. +func (pwa *PacketWindowAllocator) Init() error { + fd, err := memutil.CreateMemFD("flipcall_packet_windows", linux.MFD_CLOEXEC|linux.MFD_ALLOW_SEALING) + if err != nil { + return fmt.Errorf("failed to create memfd: %v", err) + } + // Apply F_SEAL_SHRINK to prevent either party from causing SIGBUS in the + // other by truncating the file, and F_SEAL_SEAL to prevent either party + // from applying F_SEAL_GROW or F_SEAL_WRITE. + if _, _, e := syscall.RawSyscall(syscall.SYS_FCNTL, uintptr(fd), linux.F_ADD_SEALS, linux.F_SEAL_SHRINK|linux.F_SEAL_SEAL); e != 0 { + syscall.Close(fd) + return fmt.Errorf("failed to apply memfd seals: %v", e) + } + pwa.fd = fd + return nil +} + +// NewPacketWindowAllocator is a convenience function that returns an +// initialized PacketWindowAllocator allocated on the heap. +func NewPacketWindowAllocator() (*PacketWindowAllocator, error) { + var pwa PacketWindowAllocator + if err := pwa.Init(); err != nil { + return nil, err + } + return &pwa, nil +} + +// Destroy releases resources owned by pwa. This invalidates file descriptors +// previously returned by pwa.FD() and pwd.Allocate(). +func (pwa *PacketWindowAllocator) Destroy() { + syscall.Close(pwa.fd) +} + +// FD represents the file descriptor of the shared memory file backing pwa. +func (pwa *PacketWindowAllocator) FD() int { + return pwa.fd +} + +// Allocate allocates a new packet window of at least the given size and +// returns a PacketWindowDescriptor representing it. +// +// Preconditions: size > 0. +func (pwa *PacketWindowAllocator) Allocate(size int) (PacketWindowDescriptor, error) { + if size <= 0 { + return PacketWindowDescriptor{}, fmt.Errorf("invalid size: %d", size) + } + // Page-align size to ensure that pwa.nextAlloc remains page-aligned. + size = roundUpToPage(size) + if size <= 0 { + return PacketWindowDescriptor{}, fmt.Errorf("size %d overflows after rounding up to page size", size) + } + end := pwa.nextAlloc + int64(size) // overflow checked by ensureFileSize + if err := pwa.ensureFileSize(end); err != nil { + return PacketWindowDescriptor{}, err + } + start := pwa.nextAlloc + pwa.nextAlloc = end + return PacketWindowDescriptor{ + FD: pwa.fd, + Offset: start, + Length: size, + }, nil +} + +func (pwa *PacketWindowAllocator) ensureFileSize(min int64) error { + if min <= 0 { + return fmt.Errorf("file size would overflow") + } + if pwa.fileSize >= min { + return nil + } + newSize := 2 * pwa.fileSize + if newSize == 0 { + newSize = int64(pageSize) + } + for newSize < min { + newNewSize := newSize * 2 + if newNewSize <= 0 { + return fmt.Errorf("file size would overflow") + } + newSize = newNewSize + } + if err := syscall.Ftruncate(pwa.fd, newSize); err != nil { + return fmt.Errorf("ftruncate failed: %v", err) + } + pwa.fileSize = newSize + return nil +} |