summaryrefslogtreecommitdiffhomepage
path: root/pkg/flipcall/endpoint_unsafe.go
blob: 8319955e083abaf9f70063dd97617ebfcbc660d4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
// Copyright 2019 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flipcall

import (
	"fmt"
	"math"
	"reflect"
	"sync/atomic"
	"syscall"
	"unsafe"
)

// An Endpoint provides the ability to synchronously transfer data and control
// to a connected peer Endpoint, which may be in another process.
//
// Since the Endpoint control transfer model is synchronous, at any given time
// one Endpoint "has control" (designated the *active* Endpoint), and the other
// is "waiting for control" (designated the *inactive* Endpoint). Users of the
// flipcall package arbitrarily designate one Endpoint as initially-active, and
// the other as initially-inactive; in a client/server protocol, the client
// Endpoint is usually initially-active (able to send a request) and the server
// Endpoint is usually initially-inactive (waiting for a request). The
// initially-active Endpoint writes data to be sent to Endpoint.Data(), and
// then synchronously transfers control to the inactive Endpoint by calling
// Endpoint.SendRecv(), becoming the inactive Endpoint in the process. The
// initially-inactive Endpoint waits for control by calling
// Endpoint.RecvFirst(); receiving control causes it to become the active
// Endpoint. After this, the protocol is symmetric: the active Endpoint reads
// data sent by the peer by reading from Endpoint.Data(), writes data to be
// sent to the peer into Endpoint.Data(), and then calls Endpoint.SendRecv() to
// exchange roles with the peer, which blocks until the peer has done the same.
//
// A zero-value Endpoint must be initialized with Init() (or obtained from
// NewEndpoint()) before use, and released with Destroy() afterwards.
type Endpoint struct {
	// shutdown is non-zero if Endpoint.Shutdown() has been called. shutdown is
	// accessed using atomic memory operations.
	shutdown uint32

	// dataCap is the size of the datagram part of the packet window in bytes,
	// i.e. the mapped window length minus the packet header. dataCap is
	// immutable between Init() and Destroy(); unmapPacket() resets it to 0.
	dataCap uint32

	// packet is the beginning of the packet window, as returned by mmap in
	// Init(). packet is immutable between Init() and Destroy(); unmapPacket()
	// resets it to nil.
	packet unsafe.Pointer

	// ctrl holds the state used to exchange control with the peer.
	// NOTE(review): endpointControlState is declared outside this file;
	// presumably it is set up by initControlState() during Init() - confirm.
	ctrl endpointControlState
}

// Init must be called on zero-value Endpoints before first use. It maps the
// packet window described by pwd into the address space and then initializes
// the control state for ctrlMode. If it succeeds, Destroy() must be called
// once the Endpoint is no longer in use; if control state initialization
// fails, the mapping created by Init has already been released.
//
// ctrlMode specifies how connected Endpoints will exchange control. Both
// connected Endpoints must specify the same value for ctrlMode.
//
// pwd represents the packet window used to exchange data with the peer
// Endpoint. FD may differ between Endpoints if they are in different
// processes, but must represent the same file. The packet window must
// initially be filled with zero bytes.
//
// Init returns an error if pwd.Length is less than pageSize or greater than
// math.MaxUint32, if the mmap system call fails, or if initControlState
// fails.
func (ep *Endpoint) Init(ctrlMode ControlMode, pwd PacketWindowDescriptor) error {
	if pwd.Length < pageSize {
		return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize)
	}
	if pwd.Length > math.MaxUint32 {
		// The constant is given an explicit type here: an untyped constant
		// passed as an interface argument defaults to int, and
		// math.MaxUint32 does not fit in a 32-bit int, which would be a
		// compile-time error on such platforms. The formatted output is
		// unchanged.
		return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, uint32(math.MaxUint32))
	}
	// Map the window shared and read/write so that stores made through it are
	// visible to the peer mapping the same file.
	m, _, e := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset))
	if e != 0 {
		return fmt.Errorf("failed to mmap packet window: %v", e)
	}
	// The packet header occupies the start of the window; everything after it
	// is available for the datagram. The length checks above guarantee that
	// neither the conversion nor the subtraction can wrap.
	ep.dataCap = uint32(pwd.Length) - uint32(packetHeaderBytes)
	ep.packet = (unsafe.Pointer)(m)
	if err := ep.initControlState(ctrlMode); err != nil {
		// Do not leak the mapping when the Endpoint cannot be used.
		ep.unmapPacket()
		return err
	}
	return nil
}

// NewEndpoint allocates an Endpoint and initializes it by calling Init with
// the given ctrlMode and pwd. If Init fails, NewEndpoint returns a nil
// Endpoint together with Init's error.
func NewEndpoint(ctrlMode ControlMode, pwd PacketWindowDescriptor) (*Endpoint, error) {
	ep := new(Endpoint)
	err := ep.Init(ctrlMode, pwd)
	if err != nil {
		return nil, err
	}
	return ep, nil
}

// unmapPacket unmaps the packet window and clears the fields describing it.
// The result of the munmap system call is not checked.
func (ep *Endpoint) unmapPacket() {
	// dataCap excludes the packet header, so add the header back to obtain the
	// length of the mapping.
	windowLen := uintptr(ep.dataCap) + packetHeaderBytes
	// The pointer-to-uintptr conversion stays inside the call expression, as
	// the unsafe package requires for system calls.
	syscall.Syscall(syscall.SYS_MUNMAP, uintptr(ep.packet), windowLen, 0)
	ep.packet = nil
	ep.dataCap = 0
}

// Destroy releases resources owned by ep by unmapping its packet window. No
// other Endpoint methods may be called after Destroy.
func (ep *Endpoint) Destroy() {
	ep.unmapPacket()
}

// Every packet begins with an 8-byte header, which is followed by a datagram
// of arbitrary size. The header holds two fields:
//
// - A 4-byte native-endian sequence number, which the active Endpoint
// increments once it has finished writing to the packet window. The sequence
// number exists so that spurious wakeups can be handled.
//
// - A 4-byte native-endian datagram length in bytes.
const (
	// sizeofUint32 is the size in bytes of a single header field.
	sizeofUint32 = unsafe.Sizeof(uint32(0))

	// packetHeaderBytes is the combined size of the two header fields.
	packetHeaderBytes = sizeofUint32 + sizeofUint32
)

// seq returns a pointer to the packet's sequence number, which is the uint32
// located at the very start of the packet window.
func (ep *Endpoint) seq() *uint32 {
	return (*uint32)(ep.packet)
}

// dataLen returns a pointer to the packet's datagram length, which is the
// uint32 located sizeofUint32 bytes past the start of the packet window.
func (ep *Endpoint) dataLen() *uint32 {
	// The address arithmetic and the conversion back to unsafe.Pointer are
	// kept in a single expression, as the unsafe package requires.
	field := unsafe.Pointer(uintptr(ep.packet) + sizeofUint32)
	return (*uint32)(field)
}

// DataCap returns the maximum datagram size supported by ep in bytes. This is
// also the length of the slice returned by Data().
func (ep *Endpoint) DataCap() uint32 {
	return ep.dataCap
}

// data returns a pointer to the first byte of the datagram, which starts
// packetHeaderBytes past the beginning of the packet window.
func (ep *Endpoint) data() unsafe.Pointer {
	// The arithmetic must remain in the same expression as the conversion
	// back to unsafe.Pointer.
	return unsafe.Pointer(uintptr(ep.packet) + packetHeaderBytes)
}

// Data exposes the datagram part of ep's packet window as a byte slice whose
// length and capacity both equal DataCap(). The slice aliases the packet
// window itself; no bytes are copied.
//
// The packet window is shared with the peer Endpoint, which is potentially
// untrusted and may mutate the window's contents concurrently. Consequently:
//
// - Readers must not assume that reading the same byte of Data() twice yields
// the same value. In other words, readers should read any given byte in
// Data() at most once.
//
// - Writers must not assume that data they have written can be read back
// unchanged. In other words, writers should avoid reading from Data() at all.
func (ep *Endpoint) Data() []byte {
	var window []byte
	size := int(ep.DataCap())
	// Fill in the slice header by hand so that it points directly at the
	// datagram area of the mapping.
	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&window))
	hdr.Data = uintptr(ep.data())
	hdr.Len = size
	hdr.Cap = size
	return window
}

// SendRecv hands control to the peer Endpoint, so that the peer's call to
// Endpoint.SendRecv() or Endpoint.RecvFirst() returns with the given datagram
// length. It then blocks until the peer Endpoint calls Endpoint.SendRecv() or
// Endpoint.SendLast(), and returns the datagram length the peer specified.
//
// An error is returned, without transferring control, if dataLen exceeds
// DataCap(). An error is also returned if the round trip fails or if the
// length received from the peer exceeds DataCap().
//
// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has
// returned an error. ep.SendLast() has never been called.
func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) {
	limit := ep.DataCap()
	if dataLen > limit {
		return 0, fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, limit)
	}
	// Publish the outgoing length in the header before giving up control.
	atomic.StoreUint32(ep.dataLen(), dataLen)
	err := ep.doRoundTrip()
	if err != nil {
		return 0, err
	}
	// The header is written by the peer, so load the length exactly once and
	// validate it before handing it to the caller.
	received := atomic.LoadUint32(ep.dataLen())
	if received <= limit {
		return received, nil
	}
	return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", received, limit)
}

// RecvFirst waits for the peer Endpoint to call Endpoint.SendRecv() and then
// returns the datagram length given to that call.
//
// An error is returned if waiting fails or if the length received from the
// peer exceeds DataCap().
//
// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never
// been called.
func (ep *Endpoint) RecvFirst() (uint32, error) {
	err := ep.doWaitFirst()
	if err != nil {
		return 0, err
	}
	// The header is written by the peer, so load the length exactly once and
	// validate it before handing it to the caller.
	received := atomic.LoadUint32(ep.dataLen())
	limit := ep.DataCap()
	if received <= limit {
		return received, nil
	}
	return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", received, limit)
}

// SendLast makes the peer Endpoint's call to Endpoint.SendRecv() or
// Endpoint.RecvFirst() return with the given datagram length. Unlike
// SendRecv, it does not return a datagram length from the peer.
//
// An error is returned, without notifying the peer, if dataLen exceeds
// DataCap(); otherwise the result of the notification is returned.
//
// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has
// returned an error. ep.SendLast() has never been called.
func (ep *Endpoint) SendLast(dataLen uint32) error {
	limit := ep.DataCap()
	if dataLen > limit {
		return fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, limit)
	}
	// Publish the outgoing length in the header before notifying the peer.
	atomic.StoreUint32(ep.dataLen(), dataLen)
	return ep.doNotifyLast()
}

// Shutdown makes concurrent and future calls to ep.SendRecv(),
// ep.RecvFirst(), and ep.SendLast() unblock and return errors. It does not
// wait for concurrent calls to return.
//
// Only the call that first sets the shutdown flag interrupts the Endpoint;
// any later call returns without doing anything further.
func (ep *Endpoint) Shutdown() {
	alreadyShutDown := atomic.SwapUint32(&ep.shutdown, 1) != 0
	if alreadyShutDown {
		return
	}
	ep.interruptForShutdown()
}

// isShutdown reports whether Shutdown() has been called on ep, by atomically
// loading the shutdown flag.
func (ep *Endpoint) isShutdown() bool {
	return atomic.LoadUint32(&ep.shutdown) != 0
}

// endpointShutdownError is an error value stating that Endpoint.Shutdown()
// has been called. It carries no data, so all values of this type are
// equal.
type endpointShutdownError struct{}

// Error implements error.Error.
func (endpointShutdownError) Error() string {
	const msg = "Endpoint.Shutdown() has been called"
	return msg
}