diff options
-rw-r--r-- | pkg/flipcall/BUILD | 5 | ||||
-rw-r--r-- | pkg/flipcall/ctrl_futex.go | 146 | ||||
-rw-r--r-- | pkg/flipcall/endpoint_futex.go | 45 | ||||
-rw-r--r-- | pkg/flipcall/endpoint_unsafe.go | 238 | ||||
-rw-r--r-- | pkg/flipcall/flipcall.go | 219 | ||||
-rw-r--r-- | pkg/flipcall/flipcall_example_test.go | 20 | ||||
-rw-r--r-- | pkg/flipcall/flipcall_test.go | 299 | ||||
-rw-r--r-- | pkg/flipcall/flipcall_unsafe.go | 69 | ||||
-rw-r--r-- | pkg/flipcall/futex_linux.go | 103 | ||||
-rw-r--r-- | pkg/flipcall/io.go | 113 | ||||
-rw-r--r-- | pkg/flipcall/packet_window_allocator.go | 6 |
11 files changed, 786 insertions, 477 deletions
diff --git a/pkg/flipcall/BUILD b/pkg/flipcall/BUILD index 7126fc45f..bd1d614b6 100644 --- a/pkg/flipcall/BUILD +++ b/pkg/flipcall/BUILD @@ -5,10 +5,11 @@ package(licenses = ["notice"]) go_library( name = "flipcall", srcs = [ - "endpoint_futex.go", - "endpoint_unsafe.go", + "ctrl_futex.go", "flipcall.go", + "flipcall_unsafe.go", "futex_linux.go", + "io.go", "packet_window_allocator.go", ], importpath = "gvisor.dev/gvisor/pkg/flipcall", diff --git a/pkg/flipcall/ctrl_futex.go b/pkg/flipcall/ctrl_futex.go new file mode 100644 index 000000000..865b6f640 --- /dev/null +++ b/pkg/flipcall/ctrl_futex.go @@ -0,0 +1,146 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "encoding/json" + "fmt" + "math" + "sync/atomic" + + "gvisor.dev/gvisor/pkg/log" +) + +type endpointControlImpl struct { + state int32 +} + +// Bits in endpointControlImpl.state. +const ( + epsBlocked = 1 << iota + epsShutdown +) + +func (ep *Endpoint) ctrlInit(opts ...EndpointOption) error { + if len(opts) != 0 { + return fmt.Errorf("unknown EndpointOption: %T", opts[0]) + } + return nil +} + +type ctrlHandshakeRequest struct{} + +type ctrlHandshakeResponse struct{} + +func (ep *Endpoint) ctrlConnect() error { + if err := ep.enterFutexWait(); err != nil { + return err + } + _, err := ep.futexConnect(&ctrlHandshakeRequest{}) + ep.exitFutexWait() + return err +} + +func (ep *Endpoint) ctrlWaitFirst() error { + if err := ep.enterFutexWait(); err != nil { + return err + } + defer ep.exitFutexWait() + + // Wait for the handshake request. + if err := ep.futexSwitchFromPeer(); err != nil { + return err + } + + // Read the handshake request. + reqLen := atomic.LoadUint32(ep.dataLen()) + if reqLen > ep.dataCap { + return fmt.Errorf("invalid handshake request length %d (maximum %d)", reqLen, ep.dataCap) + } + var req ctrlHandshakeRequest + if err := json.NewDecoder(ep.NewReader(reqLen)).Decode(&req); err != nil { + return fmt.Errorf("error reading handshake request: %v", err) + } + + // Write the handshake response. + w := ep.NewWriter() + if err := json.NewEncoder(w).Encode(ctrlHandshakeResponse{}); err != nil { + return fmt.Errorf("error writing handshake response: %v", err) + } + *ep.dataLen() = w.Len() + + // Return control to the client. + if err := ep.futexSwitchToPeer(); err != nil { + return err + } + + // Wait for the first non-handshake message. + return ep.futexSwitchFromPeer() +} + +func (ep *Endpoint) ctrlRoundTrip() error { + if err := ep.futexSwitchToPeer(); err != nil { + return err + } + if err := ep.enterFutexWait(); err != nil { + return err + } + err := ep.futexSwitchFromPeer() + ep.exitFutexWait() + return err +} + +func (ep *Endpoint) ctrlWakeLast() error { + return ep.futexSwitchToPeer() +} + +func (ep *Endpoint) enterFutexWait() error { + switch eps := atomic.AddInt32(&ep.ctrl.state, epsBlocked); eps { + case epsBlocked: + return nil + case epsBlocked | epsShutdown: + atomic.AddInt32(&ep.ctrl.state, -epsBlocked) + return shutdownError{} + default: + // Most likely due to ep.enterFutexWait() being called concurrently + // from multiple goroutines. + panic(fmt.Sprintf("invalid flipcall.Endpoint.ctrl.state before flipcall.Endpoint.enterFutexWait(): %v", eps-epsBlocked)) + } +} + +func (ep *Endpoint) exitFutexWait() { + atomic.AddInt32(&ep.ctrl.state, -epsBlocked) +} + +func (ep *Endpoint) ctrlShutdown() { + // Set epsShutdown to ensure that future calls to ep.enterFutexWait() fail. + if atomic.AddInt32(&ep.ctrl.state, epsShutdown)&epsBlocked != 0 { + // Wake the blocked thread. This must loop because it's possible that + // FUTEX_WAKE occurs after the waiter sets epsBlocked, but before it + // blocks in FUTEX_WAIT. + for { + // Wake MaxInt32 threads to prevent a broken or malicious peer from + // swallowing our wakeup by FUTEX_WAITing from multiple threads. + if err := ep.futexWakeConnState(math.MaxInt32); err != nil { + log.Warningf("failed to FUTEX_WAKE Endpoints: %v", err) + break + } + yieldThread() + if atomic.LoadInt32(&ep.ctrl.state)&epsBlocked == 0 { + break + } + } + } +} diff --git a/pkg/flipcall/endpoint_futex.go b/pkg/flipcall/endpoint_futex.go deleted file mode 100644 index 5cab02b1d..000000000 --- a/pkg/flipcall/endpoint_futex.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2019 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package flipcall - -import ( - "fmt" -) - -type endpointControlState struct{} - -func (ep *Endpoint) initControlState(ctrlMode ControlMode) error { - if ctrlMode != ControlModeFutex { - return fmt.Errorf("unsupported control mode: %v", ctrlMode) - } - return nil -} - -func (ep *Endpoint) doRoundTrip() error { - return ep.doFutexRoundTrip() -} - -func (ep *Endpoint) doWaitFirst() error { - return ep.doFutexWaitFirst() -} - -func (ep *Endpoint) doNotifyLast() error { - return ep.doFutexNotifyLast() -} - -// Preconditions: ep.isShutdown() == true. -func (ep *Endpoint) interruptForShutdown() { - ep.doFutexInterruptForShutdown() -} diff --git a/pkg/flipcall/endpoint_unsafe.go b/pkg/flipcall/endpoint_unsafe.go deleted file mode 100644 index 8319955e0..000000000 --- a/pkg/flipcall/endpoint_unsafe.go +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright 2019 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package flipcall - -import ( - "fmt" - "math" - "reflect" - "sync/atomic" - "syscall" - "unsafe" -) - -// An Endpoint provides the ability to synchronously transfer data and control -// to a connected peer Endpoint, which may be in another process. -// -// Since the Endpoint control transfer model is synchronous, at any given time -// one Endpoint "has control" (designated the *active* Endpoint), and the other -// is "waiting for control" (designated the *inactive* Endpoint). Users of the -// flipcall package arbitrarily designate one Endpoint as initially-active, and -// the other as initially-inactive; in a client/server protocol, the client -// Endpoint is usually initially-active (able to send a request) and the server -// Endpoint is usually initially-inactive (waiting for a request). The -// initially-active Endpoint writes data to be sent to Endpoint.Data(), and -// then synchronously transfers control to the inactive Endpoint by calling -// Endpoint.SendRecv(), becoming the inactive Endpoint in the process. The -// initially-inactive Endpoint waits for control by calling -// Endpoint.RecvFirst(); receiving control causes it to become the active -// Endpoint. After this, the protocol is symmetric: the active Endpoint reads -// data sent by the peer by reading from Endpoint.Data(), writes data to be -// sent to the peer into Endpoint.Data(), and then calls Endpoint.SendRecv() to -// exchange roles with the peer, which blocks until the peer has done the same. -type Endpoint struct { - // shutdown is non-zero if Endpoint.Shutdown() has been called. shutdown is - // accessed using atomic memory operations. - shutdown uint32 - - // dataCap is the size of the datagram part of the packet window in bytes. - // dataCap is immutable. - dataCap uint32 - - // packet is the beginning of the packet window. packet is immutable. - packet unsafe.Pointer - - ctrl endpointControlState -} - -// Init must be called on zero-value Endpoints before first use. If it -// succeeds, Destroy() must be called once the Endpoint is no longer in use. -// -// ctrlMode specifies how connected Endpoints will exchange control. Both -// connected Endpoints must specify the same value for ctrlMode. -// -// pwd represents the packet window used to exchange data with the peer -// Endpoint. FD may differ between Endpoints if they are in different -// processes, but must represent the same file. The packet window must -// initially be filled with zero bytes. -func (ep *Endpoint) Init(ctrlMode ControlMode, pwd PacketWindowDescriptor) error { - if pwd.Length < pageSize { - return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize) - } - if pwd.Length > math.MaxUint32 { - return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, math.MaxUint32) - } - m, _, e := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset)) - if e != 0 { - return fmt.Errorf("failed to mmap packet window: %v", e) - } - ep.dataCap = uint32(pwd.Length) - uint32(packetHeaderBytes) - ep.packet = (unsafe.Pointer)(m) - if err := ep.initControlState(ctrlMode); err != nil { - ep.unmapPacket() - return err - } - return nil -} - -// NewEndpoint is a convenience function that returns an initialized Endpoint -// allocated on the heap. -func NewEndpoint(ctrlMode ControlMode, pwd PacketWindowDescriptor) (*Endpoint, error) { - var ep Endpoint - if err := ep.Init(ctrlMode, pwd); err != nil { - return nil, err - } - return &ep, nil -} - -func (ep *Endpoint) unmapPacket() { - syscall.Syscall(syscall.SYS_MUNMAP, uintptr(ep.packet), uintptr(ep.dataCap)+packetHeaderBytes, 0) - ep.dataCap = 0 - ep.packet = nil -} - -// Destroy releases resources owned by ep. No other Endpoint methods may be -// called after Destroy. -func (ep *Endpoint) Destroy() { - ep.unmapPacket() -} - -// Packets consist of an 8-byte header followed by an arbitrarily-sized -// datagram. The header consists of: -// -// - A 4-byte native-endian sequence number, which is incremented by the active -// Endpoint after it finishes writing to the packet window. The sequence number -// is needed to handle spurious wakeups. -// -// - A 4-byte native-endian datagram length in bytes. -const ( - sizeofUint32 = unsafe.Sizeof(uint32(0)) - packetHeaderBytes = 2 * sizeofUint32 -) - -func (ep *Endpoint) seq() *uint32 { - return (*uint32)(ep.packet) -} - -func (ep *Endpoint) dataLen() *uint32 { - return (*uint32)((unsafe.Pointer)(uintptr(ep.packet) + sizeofUint32)) -} - -// DataCap returns the maximum datagram size supported by ep in bytes. -func (ep *Endpoint) DataCap() uint32 { - return ep.dataCap -} - -func (ep *Endpoint) data() unsafe.Pointer { - return unsafe.Pointer(uintptr(ep.packet) + packetHeaderBytes) -} - -// Data returns the datagram part of ep's packet window as a byte slice. -// -// Note that the packet window is shared with the potentially-untrusted peer -// Endpoint, which may concurrently mutate the contents of the packet window. -// Thus: -// -// - Readers must not assume that two reads of the same byte in Data() will -// return the same result. In other words, readers should read any given byte -// in Data() at most once. -// -// - Writers must not assume that they will read back the same data that they -// have written. In other words, writers should avoid reading from Data() at -// all. -func (ep *Endpoint) Data() []byte { - var bs []byte - bsReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&bs)) - bsReflect.Data = uintptr(ep.data()) - bsReflect.Len = int(ep.DataCap()) - bsReflect.Cap = bsReflect.Len - return bs -} - -// SendRecv transfers control to the peer Endpoint, causing its call to -// Endpoint.SendRecv() or Endpoint.RecvFirst() to return with the given -// datagram length, then blocks until the peer Endpoint calls -// Endpoint.SendRecv() or Endpoint.SendLast(). -// -// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has -// returned an error. ep.SendLast() has never been called. -func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) { - dataCap := ep.DataCap() - if dataLen > dataCap { - return 0, fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap) - } - atomic.StoreUint32(ep.dataLen(), dataLen) - if err := ep.doRoundTrip(); err != nil { - return 0, err - } - recvDataLen := atomic.LoadUint32(ep.dataLen()) - if recvDataLen > dataCap { - return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap) - } - return recvDataLen, nil -} - -// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then -// returns the datagram length specified by that call. -// -// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never -// been called. -func (ep *Endpoint) RecvFirst() (uint32, error) { - if err := ep.doWaitFirst(); err != nil { - return 0, err - } - recvDataLen := atomic.LoadUint32(ep.dataLen()) - if dataCap := ep.DataCap(); recvDataLen > dataCap { - return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap) - } - return recvDataLen, nil -} - -// SendLast causes the peer Endpoint's call to Endpoint.SendRecv() or -// Endpoint.RecvFirst() to return with the given datagram length. -// -// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has -// returned an error. ep.SendLast() has never been called. -func (ep *Endpoint) SendLast(dataLen uint32) error { - dataCap := ep.DataCap() - if dataLen > dataCap { - return fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap) - } - atomic.StoreUint32(ep.dataLen(), dataLen) - if err := ep.doNotifyLast(); err != nil { - return err - } - return nil -} - -// Shutdown causes concurrent and future calls to ep.SendRecv(), -// ep.RecvFirst(), and ep.SendLast() to unblock and return errors. It does not -// wait for concurrent calls to return. -func (ep *Endpoint) Shutdown() { - if atomic.SwapUint32(&ep.shutdown, 1) == 0 { - ep.interruptForShutdown() - } -} - -func (ep *Endpoint) isShutdown() bool { - return atomic.LoadUint32(&ep.shutdown) != 0 -} - -type endpointShutdownError struct{} - -// Error implements error.Error. -func (endpointShutdownError) Error() string { - return "Endpoint.Shutdown() has been called" -} diff --git a/pkg/flipcall/flipcall.go b/pkg/flipcall/flipcall.go index 79a1e418a..5c9212c33 100644 --- a/pkg/flipcall/flipcall.go +++ b/pkg/flipcall/flipcall.go @@ -13,20 +13,217 @@ // limitations under the License. // Package flipcall implements a protocol providing Fast Local Interprocess -// Procedure Calls. +// Procedure Calls between mutually-distrusting processes. package flipcall -// ControlMode defines how control is exchanged across a connection. -type ControlMode uint8 +import ( + "fmt" + "math" + "sync/atomic" + "syscall" +) -const ( - // ControlModeInvalid is invalid, and exists so that ControlMode fields in - // structs must be explicitly initialized. - ControlModeInvalid ControlMode = iota +// An Endpoint provides the ability to synchronously transfer data and control +// to a connected peer Endpoint, which may be in another process. +// +// Since the Endpoint control transfer model is synchronous, at any given time +// one Endpoint "has control" (designated the active Endpoint), and the other +// is "waiting for control" (designated the inactive Endpoint). Users of the +// flipcall package designate one Endpoint as the client, which is initially +// active, and the other as the server, which is initially inactive. See +// flipcall_example_test.go for usage. +type Endpoint struct { + // packet is a pointer to the beginning of the packet window. (Since this + // is a raw OS memory mapping and not a Go object, it does not need to be + // represented as an unsafe.Pointer.) packet is immutable. + packet uintptr + + // dataCap is the size of the datagram part of the packet window in bytes. + // dataCap is immutable. + dataCap uint32 + + // shutdown is non-zero if Endpoint.Shutdown() has been called, or if the + // Endpoint has acknowledged shutdown initiated by the peer. shutdown is + // accessed using atomic memory operations. + shutdown uint32 + + // activeState is csClientActive if this is a client Endpoint and + // csServerActive if this is a server Endpoint. + activeState uint32 + + // inactiveState is csServerActive if this is a client Endpoint and + // csClientActive if this is a server Endpoint. + inactiveState uint32 + + ctrl endpointControlImpl +} + +// Init must be called on zero-value Endpoints before first use. If it +// succeeds, ep.Destroy() must be called once the Endpoint is no longer in use. +// +// pwd represents the packet window used to exchange data with the peer +// Endpoint. FD may differ between Endpoints if they are in different +// processes, but must represent the same file. The packet window must +// initially be filled with zero bytes. +func (ep *Endpoint) Init(pwd PacketWindowDescriptor, opts ...EndpointOption) error { + if pwd.Length < pageSize { + return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize) + } + if pwd.Length > math.MaxUint32 { + return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, math.MaxUint32) + } + m, _, e := syscall.RawSyscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset)) + if e != 0 { + return fmt.Errorf("failed to mmap packet window: %v", e) + } + ep.packet = m + ep.dataCap = uint32(pwd.Length) - uint32(PacketHeaderBytes) + // These will be overwritten by ep.Connect() for client Endpoints. + ep.activeState = csServerActive + ep.inactiveState = csClientActive + if err := ep.ctrlInit(opts...); err != nil { + ep.unmapPacket() + return err + } + return nil +} + +// NewEndpoint is a convenience function that returns an initialized Endpoint +// allocated on the heap. +func NewEndpoint(pwd PacketWindowDescriptor, opts ...EndpointOption) (*Endpoint, error) { + var ep Endpoint + if err := ep.Init(pwd, opts...); err != nil { + return nil, err + } + return &ep, nil +} + +// An EndpointOption configures an Endpoint. +type EndpointOption interface { + isEndpointOption() +} + +// Destroy releases resources owned by ep. No other Endpoint methods may be +// called after Destroy. +func (ep *Endpoint) Destroy() { + ep.unmapPacket() +} + +func (ep *Endpoint) unmapPacket() { + syscall.RawSyscall(syscall.SYS_MUNMAP, ep.packet, uintptr(ep.dataCap)+PacketHeaderBytes, 0) + ep.packet = 0 +} - // ControlModeFutex uses shared futex operations on packet control words. - ControlModeFutex +// Shutdown causes concurrent and future calls to ep.Connect(), ep.SendRecv(), +// ep.RecvFirst(), and ep.SendLast() to unblock and return errors. It does not +// wait for concurrent calls to return. The effect of Shutdown on the peer +// Endpoint is unspecified. Successive calls to Shutdown have no effect. +// +// Shutdown is the only Endpoint method that may be called concurrently with +// other methods on the same Endpoint. +func (ep *Endpoint) Shutdown() { + if atomic.SwapUint32(&ep.shutdown, 1) != 0 { + // ep.Shutdown() has previously been called. + return + } + ep.ctrlShutdown() +} + +// isShutdownLocally returns true if ep.Shutdown() has been called. +func (ep *Endpoint) isShutdownLocally() bool { + return atomic.LoadUint32(&ep.shutdown) != 0 +} + +type shutdownError struct{} + +// Error implements error.Error. +func (shutdownError) Error() string { + return "flipcall connection shutdown" +} - // controlModeCount is the number of ControlModes in this list. - controlModeCount +// DataCap returns the maximum datagram size supported by ep. Equivalently, +// DataCap returns len(ep.Data()). +func (ep *Endpoint) DataCap() uint32 { + return ep.dataCap +} + +// Connection state. +const ( + // The client is, by definition, initially active, so this must be 0. + csClientActive = 0 + csServerActive = 1 ) + +// Connect designates ep as a client Endpoint and blocks until the peer +// Endpoint has called Endpoint.RecvFirst(). +// +// Preconditions: ep.Connect(), ep.RecvFirst(), ep.SendRecv(), and +// ep.SendLast() have never been called. +func (ep *Endpoint) Connect() error { + ep.activeState = csClientActive + ep.inactiveState = csServerActive + return ep.ctrlConnect() +} + +// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then +// returns the datagram length specified by that call. +// +// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never +// been called. +func (ep *Endpoint) RecvFirst() (uint32, error) { + if err := ep.ctrlWaitFirst(); err != nil { + return 0, err + } + recvDataLen := atomic.LoadUint32(ep.dataLen()) + if recvDataLen > ep.dataCap { + return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, ep.dataCap) + } + return recvDataLen, nil +} + +// SendRecv transfers control to the peer Endpoint, causing its call to +// Endpoint.SendRecv() or Endpoint.RecvFirst() to return with the given +// datagram length, then blocks until the peer Endpoint calls +// Endpoint.SendRecv() or Endpoint.SendLast(). +// +// Preconditions: dataLen <= ep.DataCap(). No previous call to ep.SendRecv() or +// ep.RecvFirst() has returned an error. ep.SendLast() has never been called. +// If ep is a client Endpoint, ep.Connect() has previously been called and +// returned nil. +func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) { + if dataLen > ep.dataCap { + panic(fmt.Sprintf("attempting to send packet with datagram length %d (maximum %d)", dataLen, ep.dataCap)) + } + // This store can safely be non-atomic: Under correct operation we should + // be the only thread writing ep.dataLen(), and ep.ctrlRoundTrip() will + // synchronize with the receiver. We will not read from ep.dataLen() until + // after ep.ctrlRoundTrip(), so if the peer is mutating it concurrently then + // they can only shoot themselves in the foot. + *ep.dataLen() = dataLen + if err := ep.ctrlRoundTrip(); err != nil { + return 0, err + } + recvDataLen := atomic.LoadUint32(ep.dataLen()) + if recvDataLen > ep.dataCap { + return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, ep.dataCap) + } + return recvDataLen, nil +} + +// SendLast causes the peer Endpoint's call to Endpoint.SendRecv() or +// Endpoint.RecvFirst() to return with the given datagram length. +// +// Preconditions: dataLen <= ep.DataCap(). No previous call to ep.SendRecv() or +// ep.RecvFirst() has returned an error. ep.SendLast() has never been called. +// If ep is a client Endpoint, ep.Connect() has previously been called and +// returned nil. +func (ep *Endpoint) SendLast(dataLen uint32) error { + if dataLen > ep.dataCap { + panic(fmt.Sprintf("attempting to send packet with datagram length %d (maximum %d)", dataLen, ep.dataCap)) + } + *ep.dataLen() = dataLen + if err := ep.ctrlWakeLast(); err != nil { + return err + } + return nil +} diff --git a/pkg/flipcall/flipcall_example_test.go b/pkg/flipcall/flipcall_example_test.go index 572a1f119..edb6a8bef 100644 --- a/pkg/flipcall/flipcall_example_test.go +++ b/pkg/flipcall/flipcall_example_test.go @@ -17,6 +17,7 @@ package flipcall import ( "bytes" "fmt" + "sync" ) func Example() { @@ -36,20 +37,21 @@ func Example() { if err != nil { panic(err) } - clientEP, err := NewEndpoint(ControlModeFutex, pwd) - if err != nil { + var clientEP Endpoint + if err := clientEP.Init(pwd); err != nil { panic(err) } defer clientEP.Destroy() - serverEP, err := NewEndpoint(ControlModeFutex, pwd) - if err != nil { + var serverEP Endpoint + if err := serverEP.Init(pwd); err != nil { panic(err) } defer serverEP.Destroy() - serverDone := make(chan struct{}) + var serverRun sync.WaitGroup + serverRun.Add(1) go func() { - defer func() { serverDone <- struct{}{} }() + defer serverRun.Done() i := 0 var buf bytes.Buffer // wait for first request @@ -76,9 +78,13 @@ func Example() { }() defer func() { serverEP.Shutdown() - <-serverDone + serverRun.Wait() }() + // establish connection as client + if err := clientEP.Connect(); err != nil { + panic(err) + } var buf bytes.Buffer for i := 0; i < count; i++ { // write request diff --git a/pkg/flipcall/flipcall_test.go b/pkg/flipcall/flipcall_test.go index 20d3002f0..da9d736ab 100644 --- a/pkg/flipcall/flipcall_test.go +++ b/pkg/flipcall/flipcall_test.go @@ -15,197 +15,240 @@ package flipcall import ( + "runtime" + "sync" "testing" "time" ) var testPacketWindowSize = pageSize -func testSendRecv(t *testing.T, ctrlMode ControlMode) { - pwa, err := NewPacketWindowAllocator() - if err != nil { - t.Fatalf("failed to create PacketWindowAllocator: %v", err) +type testConnection struct { + pwa PacketWindowAllocator + clientEP Endpoint + serverEP Endpoint +} + +func newTestConnectionWithOptions(tb testing.TB, clientOpts, serverOpts []EndpointOption) *testConnection { + c := &testConnection{} + if err := c.pwa.Init(); err != nil { + tb.Fatalf("failed to create PacketWindowAllocator: %v", err) } - defer pwa.Destroy() - pwd, err := pwa.Allocate(testPacketWindowSize) + pwd, err := c.pwa.Allocate(testPacketWindowSize) if err != nil { - t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err) + c.pwa.Destroy() + tb.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err) } - - sendEP, err := NewEndpoint(ctrlMode, pwd) - if err != nil { - t.Fatalf("failed to create Endpoint: %v", err) + if err := c.clientEP.Init(pwd, clientOpts...); err != nil { + c.pwa.Destroy() + tb.Fatalf("failed to create client Endpoint: %v", err) } - defer sendEP.Destroy() - recvEP, err := NewEndpoint(ctrlMode, pwd) - if err != nil { - t.Fatalf("failed to create Endpoint: %v", err) + if err := c.serverEP.Init(pwd, serverOpts...); err != nil { + c.pwa.Destroy() + c.clientEP.Destroy() + tb.Fatalf("failed to create server Endpoint: %v", err) } - defer recvEP.Destroy() + return c +} - otherThreadDone := make(chan struct{}) +func newTestConnection(tb testing.TB) *testConnection { + return newTestConnectionWithOptions(tb, nil, nil) +} + +func (c *testConnection) destroy() { + c.pwa.Destroy() + c.clientEP.Destroy() + c.serverEP.Destroy() +} + +func testSendRecv(t *testing.T, c *testConnection) { + var serverRun sync.WaitGroup + serverRun.Add(1) go func() { - defer func() { otherThreadDone <- struct{}{} }() - t.Logf("initially-inactive Endpoint waiting for packet 1") - if _, err := recvEP.RecvFirst(); err != nil { - t.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err) + defer serverRun.Done() + t.Logf("server Endpoint waiting for packet 1") + if _, err := c.serverEP.RecvFirst(); err != nil { + t.Fatalf("server Endpoint.RecvFirst() failed: %v", err) } - t.Logf("initially-inactive Endpoint got packet 1, sending packet 2 and waiting for packet 3") - if _, err := recvEP.SendRecv(0); err != nil { - t.Fatalf("initially-inactive Endpoint.SendRecv() failed: %v", err) + t.Logf("server Endpoint got packet 1, sending packet 2 and waiting for packet 3") + if _, err := c.serverEP.SendRecv(0); err != nil { + t.Fatalf("server Endpoint.SendRecv() failed: %v", err) } - t.Logf("initially-inactive Endpoint got packet 3") + t.Logf("server Endpoint got packet 3") }() defer func() { - t.Logf("waiting for initially-inactive Endpoint goroutine to complete") - <-otherThreadDone + // Ensure that the server goroutine is cleaned up before + // c.serverEP.Destroy(), even if the test fails. + c.serverEP.Shutdown() + serverRun.Wait() }() - t.Logf("initially-active Endpoint sending packet 1 and waiting for packet 2") - if _, err := sendEP.SendRecv(0); err != nil { - t.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err) + t.Logf("client Endpoint establishing connection") + if err := c.clientEP.Connect(); err != nil { + t.Fatalf("client Endpoint.Connect() failed: %v", err) } - t.Logf("initially-active Endpoint got packet 2, sending packet 3") - if err := sendEP.SendLast(0); err != nil { - t.Fatalf("initially-active Endpoint.SendLast() failed: %v", err) + t.Logf("client Endpoint sending packet 1 and waiting for packet 2") + if _, err := c.clientEP.SendRecv(0); err != nil { + t.Fatalf("client Endpoint.SendRecv() failed: %v", err) } + t.Logf("client Endpoint got packet 2, sending packet 3") + if err := c.clientEP.SendLast(0); err != nil { + t.Fatalf("client Endpoint.SendLast() failed: %v", err) + } + t.Logf("waiting for server goroutine to complete") + serverRun.Wait() } -func TestFutexSendRecv(t *testing.T) { - testSendRecv(t, ControlModeFutex) +func TestSendRecv(t *testing.T) { + c := newTestConnection(t) + defer c.destroy() + testSendRecv(t, c) } -func testRecvFirstShutdown(t *testing.T, ctrlMode ControlMode) { - pwa, err := NewPacketWindowAllocator() - if err != nil { - t.Fatalf("failed to create PacketWindowAllocator: %v", err) - } - defer pwa.Destroy() - pwd, err := pwa.Allocate(testPacketWindowSize) - if err != nil { - t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err) - } +func testShutdownConnect(t *testing.T, c *testConnection) { + var clientRun sync.WaitGroup + clientRun.Add(1) + go func() { + defer clientRun.Done() + if err := c.clientEP.Connect(); err == nil { + t.Errorf("client Endpoint.Connect() succeeded unexpectedly") + } + }() + time.Sleep(time.Second) // to allow c.clientEP.Connect() to block + c.clientEP.Shutdown() + clientRun.Wait() +} - ep, err := NewEndpoint(ctrlMode, pwd) - if err != nil { - t.Fatalf("failed to create Endpoint: %v", err) - } - defer ep.Destroy() +func TestShutdownConnect(t *testing.T) { + c := newTestConnection(t) + defer c.destroy() + testShutdownConnect(t, c) +} - otherThreadDone := make(chan struct{}) +func testShutdownRecvFirstBeforeConnect(t *testing.T, c *testConnection) { + var serverRun sync.WaitGroup + serverRun.Add(1) go func() { - defer func() { otherThreadDone <- struct{}{} }() - _, err := ep.RecvFirst() + defer serverRun.Done() + _, err := c.serverEP.RecvFirst() if err == nil { - t.Errorf("Endpoint.RecvFirst() succeeded unexpectedly") + t.Errorf("server Endpoint.RecvFirst() succeeded unexpectedly") } }() - - time.Sleep(time.Second) // to ensure ep.RecvFirst() has blocked - ep.Shutdown() - <-otherThreadDone + time.Sleep(time.Second) // to allow c.serverEP.RecvFirst() to block + c.serverEP.Shutdown() + serverRun.Wait() } -func TestFutexRecvFirstShutdown(t *testing.T) { - testRecvFirstShutdown(t, ControlModeFutex) +func TestShutdownRecvFirstBeforeConnect(t *testing.T) { + c := newTestConnection(t) + defer c.destroy() + testShutdownRecvFirstBeforeConnect(t, c) } -func testSendRecvShutdown(t *testing.T, ctrlMode ControlMode) { - pwa, err := NewPacketWindowAllocator() - if err != nil { - t.Fatalf("failed to create PacketWindowAllocator: %v", err) - } - defer pwa.Destroy() - pwd, err := pwa.Allocate(testPacketWindowSize) - if err != nil { - t.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err) - } - - sendEP, err := NewEndpoint(ctrlMode, pwd) - if err != nil { - t.Fatalf("failed to create Endpoint: %v", err) - } - defer sendEP.Destroy() - recvEP, err := NewEndpoint(ctrlMode, pwd) - if err != nil { - t.Fatalf("failed to create Endpoint: %v", err) - } - defer recvEP.Destroy() - - otherThreadDone := make(chan struct{}) +func testShutdownRecvFirstAfterConnect(t *testing.T, c *testConnection) { + var serverRun sync.WaitGroup + serverRun.Add(1) go func() { - defer func() { otherThreadDone <- struct{}{} }() - if _, err := recvEP.RecvFirst(); err != nil { - t.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err) - } - if _, err := recvEP.SendRecv(0); err == nil { - t.Errorf("initially-inactive Endpoint.SendRecv() succeeded unexpectedly") + defer serverRun.Done() + if _, err := c.serverEP.RecvFirst(); err == nil { + t.Fatalf("server Endpoint.RecvFirst() succeeded unexpectedly") } }() - - if _, err := sendEP.SendRecv(0); err != nil { - t.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err) + defer func() { + // Ensure that the server goroutine is cleaned up before + // c.serverEP.Destroy(), even if the test fails. + c.serverEP.Shutdown() + serverRun.Wait() + }() + if err := c.clientEP.Connect(); err != nil { + t.Fatalf("client Endpoint.Connect() failed: %v", err) } - time.Sleep(time.Second) // to ensure recvEP.SendRecv() has blocked - recvEP.Shutdown() - <-otherThreadDone + c.serverEP.Shutdown() + serverRun.Wait() } -func TestFutexSendRecvShutdown(t *testing.T) { - testSendRecvShutdown(t, ControlModeFutex) +func TestShutdownRecvFirstAfterConnect(t *testing.T) { + c := newTestConnection(t) + defer c.destroy() + testShutdownRecvFirstAfterConnect(t, c) } -func benchmarkSendRecv(b *testing.B, ctrlMode ControlMode) { - pwa, err := NewPacketWindowAllocator() - if err != nil { - b.Fatalf("failed to create PacketWindowAllocator: %v", err) +func testShutdownSendRecv(t *testing.T, c *testConnection) { + var serverRun sync.WaitGroup + serverRun.Add(1) + go func() { + defer serverRun.Done() + if _, err := c.serverEP.RecvFirst(); err != nil { + t.Fatalf("server Endpoint.RecvFirst() failed: %v", err) + } + if _, err := c.serverEP.SendRecv(0); err == nil { + t.Errorf("server Endpoint.SendRecv() succeeded unexpectedly") + } + }() + defer func() { + // Ensure that the server goroutine is cleaned up before + // c.serverEP.Destroy(), even if the test fails. + c.serverEP.Shutdown() + serverRun.Wait() + }() + if err := c.clientEP.Connect(); err != nil { + t.Fatalf("client Endpoint.Connect() failed: %v", err) } - defer pwa.Destroy() - pwd, err := pwa.Allocate(testPacketWindowSize) - if err != nil { - b.Fatalf("PacketWindowAllocator.Allocate() failed: %v", err) + if _, err := c.clientEP.SendRecv(0); err != nil { + t.Fatalf("client Endpoint.SendRecv() failed: %v", err) } + time.Sleep(time.Second) // to allow serverEP.SendRecv() to block + c.serverEP.Shutdown() + serverRun.Wait() +} - sendEP, err := NewEndpoint(ctrlMode, pwd) - if err != nil { - b.Fatalf("failed to create Endpoint: %v", err) - } - defer sendEP.Destroy() - recvEP, err := NewEndpoint(ctrlMode, pwd) - if err != nil { - b.Fatalf("failed to create Endpoint: %v", err) - } - defer recvEP.Destroy() +func TestShutdownSendRecv(t *testing.T) { + c := newTestConnection(t) + defer c.destroy() + testShutdownSendRecv(t, c) +} - otherThreadDone := make(chan struct{}) +func benchmarkSendRecv(b *testing.B, c *testConnection) { + var serverRun sync.WaitGroup + serverRun.Add(1) go func() { - defer func() { otherThreadDone <- struct{}{} }() + defer serverRun.Done() if b.N == 0 { return } - if _, err := recvEP.RecvFirst(); err != nil { - b.Fatalf("initially-inactive Endpoint.RecvFirst() failed: %v", err) + if _, err := c.serverEP.RecvFirst(); err != nil { + b.Fatalf("server Endpoint.RecvFirst() failed: %v", err) } for i := 1; i < b.N; i++ { - if _, err := recvEP.SendRecv(0); err != nil { - b.Fatalf("initially-inactive Endpoint.SendRecv() failed: %v", err) + if _, err := c.serverEP.SendRecv(0); err != nil { + b.Fatalf("server Endpoint.SendRecv() failed: %v", err) } } - if err := recvEP.SendLast(0); err != nil { - b.Fatalf("initially-inactive Endpoint.SendLast() failed: %v", err) + if err := c.serverEP.SendLast(0); err != nil { + b.Fatalf("server Endpoint.SendLast() failed: %v", err) } }() - defer func() { <-otherThreadDone }() + defer func() { + c.serverEP.Shutdown() + serverRun.Wait() + }() + if err := c.clientEP.Connect(); err != nil { + b.Fatalf("client Endpoint.Connect() failed: %v", err) + } + runtime.GC() b.ResetTimer() for i := 0; i < b.N; i++ { - if _, err := sendEP.SendRecv(0); err != nil { - b.Fatalf("initially-active Endpoint.SendRecv() failed: %v", err) + if _, err := c.clientEP.SendRecv(0); err != nil { + b.Fatalf("client Endpoint.SendRecv() failed: %v", err) } } b.StopTimer() } -func BenchmarkFutexSendRecv(b *testing.B) { - benchmarkSendRecv(b, ControlModeFutex) +func BenchmarkSendRecv(b *testing.B) { + c := newTestConnection(b) + defer c.destroy() + benchmarkSendRecv(b, c) } diff --git a/pkg/flipcall/flipcall_unsafe.go b/pkg/flipcall/flipcall_unsafe.go new file mode 100644 index 000000000..7c8977893 --- /dev/null +++ b/pkg/flipcall/flipcall_unsafe.go @@ -0,0 +1,69 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "reflect" + "unsafe" +) + +// Packets consist of an 8-byte header followed by an arbitrarily-sized +// datagram. The header consists of: +// +// - A 4-byte native-endian connection state. +// +// - A 4-byte native-endian datagram length in bytes. +const ( + sizeofUint32 = unsafe.Sizeof(uint32(0)) + + // PacketHeaderBytes is the size of a flipcall packet header in bytes. The + // maximum datagram size supported by a flipcall connection is equal to the + // length of the packet window minus PacketHeaderBytes. + // + // PacketHeaderBytes is exported to support its use in constant + // expressions. Non-constant expressions may prefer to use + // PacketWindowLengthForDataCap(). + PacketHeaderBytes = 2 * sizeofUint32 +) + +func (ep *Endpoint) connState() *uint32 { + return (*uint32)((unsafe.Pointer)(ep.packet)) +} + +func (ep *Endpoint) dataLen() *uint32 { + return (*uint32)((unsafe.Pointer)(ep.packet + sizeofUint32)) +} + +// Data returns the datagram part of ep's packet window as a byte slice. +// +// Note that the packet window is shared with the potentially-untrusted peer +// Endpoint, which may concurrently mutate the contents of the packet window. +// Thus: +// +// - Readers must not assume that two reads of the same byte in Data() will +// return the same result. In other words, readers should read any given byte +// in Data() at most once. +// +// - Writers must not assume that they will read back the same data that they +// have written. In other words, writers should avoid reading from Data() at +// all. +func (ep *Endpoint) Data() []byte { + var bs []byte + bsReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&bs)) + bsReflect.Data = ep.packet + PacketHeaderBytes + bsReflect.Len = int(ep.dataCap) + bsReflect.Cap = int(ep.dataCap) + return bs +} diff --git a/pkg/flipcall/futex_linux.go b/pkg/flipcall/futex_linux.go index 3f592ad16..e7dd812b3 100644 --- a/pkg/flipcall/futex_linux.go +++ b/pkg/flipcall/futex_linux.go @@ -17,78 +17,95 @@ package flipcall import ( + "encoding/json" "fmt" - "math" + "runtime" "sync/atomic" "syscall" "gvisor.dev/gvisor/pkg/abi/linux" - "gvisor.dev/gvisor/pkg/log" ) -func (ep *Endpoint) doFutexRoundTrip() error { - ourSeq, err := ep.doFutexNotifySeq() - if err != nil { - return err +func (ep *Endpoint) futexConnect(req *ctrlHandshakeRequest) (ctrlHandshakeResponse, error) { + var resp ctrlHandshakeResponse + + // Write the handshake request. + w := ep.NewWriter() + if err := json.NewEncoder(w).Encode(req); err != nil { + return resp, fmt.Errorf("error writing handshake request: %v", err) } - return ep.doFutexWaitSeq(ourSeq) -} + *ep.dataLen() = w.Len() -func (ep *Endpoint) doFutexWaitFirst() error { - return ep.doFutexWaitSeq(0) -} + // Exchange control with the server. + if err := ep.futexSwitchToPeer(); err != nil { + return resp, err + } + if err := ep.futexSwitchFromPeer(); err != nil { + return resp, err + } -func (ep *Endpoint) doFutexNotifyLast() error { - _, err := ep.doFutexNotifySeq() - return err + // Read the handshake response. + respLen := atomic.LoadUint32(ep.dataLen()) + if respLen > ep.dataCap { + return resp, fmt.Errorf("invalid handshake response length %d (maximum %d)", respLen, ep.dataCap) + } + if err := json.NewDecoder(ep.NewReader(respLen)).Decode(&resp); err != nil { + return resp, fmt.Errorf("error reading handshake response: %v", err) + } + + return resp, nil } -func (ep *Endpoint) doFutexNotifySeq() (uint32, error) { - ourSeq := atomic.AddUint32(ep.seq(), 1) - if err := ep.futexWake(1); err != nil { - return ourSeq, fmt.Errorf("failed to FUTEX_WAKE peer Endpoint: %v", err) +func (ep *Endpoint) futexSwitchToPeer() error { + // Update connection state to indicate that the peer should be active. + if !atomic.CompareAndSwapUint32(ep.connState(), ep.activeState, ep.inactiveState) { + return fmt.Errorf("unexpected connection state before FUTEX_WAKE: %v", atomic.LoadUint32(ep.connState())) } - return ourSeq, nil + + // Wake the peer's Endpoint.futexSwitchFromPeer(). + if err := ep.futexWakeConnState(1); err != nil { + return fmt.Errorf("failed to FUTEX_WAKE peer Endpoint: %v", err) + } + return nil } -func (ep *Endpoint) doFutexWaitSeq(prevSeq uint32) error { - nextSeq := prevSeq + 1 +func (ep *Endpoint) futexSwitchFromPeer() error { for { - if ep.isShutdown() { - return endpointShutdownError{} - } - if err := ep.futexWait(prevSeq); err != nil { - return fmt.Errorf("failed to FUTEX_WAIT for peer Endpoint: %v", err) - } - seq := atomic.LoadUint32(ep.seq()) - if seq == nextSeq { + switch cs := atomic.LoadUint32(ep.connState()); cs { + case ep.activeState: return nil + case ep.inactiveState: + // Continue to FUTEX_WAIT. + default: + return fmt.Errorf("unexpected connection state before FUTEX_WAIT: %v", cs) } - if seq != prevSeq { - return fmt.Errorf("invalid packet sequence number %d (expected %d or %d)", seq, prevSeq, nextSeq) + if ep.isShutdownLocally() { + return shutdownError{} + } + if err := ep.futexWaitConnState(ep.inactiveState); err != nil { + return fmt.Errorf("failed to FUTEX_WAIT for peer Endpoint: %v", err) } } } -func (ep *Endpoint) doFutexInterruptForShutdown() { - // Wake MaxInt32 threads to prevent a malicious or broken peer from - // swallowing our wakeup by FUTEX_WAITing from multiple threads. - if err := ep.futexWake(math.MaxInt32); err != nil { - log.Warningf("failed to FUTEX_WAKE Endpoint: %v", err) - } -} - -func (ep *Endpoint) futexWake(numThreads int32) error { - if _, _, e := syscall.RawSyscall(syscall.SYS_FUTEX, uintptr(ep.packet), linux.FUTEX_WAKE, uintptr(numThreads)); e != 0 { +func (ep *Endpoint) futexWakeConnState(numThreads int32) error { + if _, _, e := syscall.RawSyscall(syscall.SYS_FUTEX, ep.packet, linux.FUTEX_WAKE, uintptr(numThreads)); e != 0 { return e } return nil } -func (ep *Endpoint) futexWait(seq uint32) error { - _, _, e := syscall.Syscall6(syscall.SYS_FUTEX, uintptr(ep.packet), linux.FUTEX_WAIT, uintptr(seq), 0, 0, 0) +func (ep *Endpoint) futexWaitConnState(curState uint32) error { + _, _, e := syscall.Syscall6(syscall.SYS_FUTEX, ep.packet, linux.FUTEX_WAIT, uintptr(curState), 0, 0, 0) if e != 0 && e != syscall.EAGAIN && e != syscall.EINTR { return e } return nil } + +func yieldThread() { + syscall.Syscall(syscall.SYS_SCHED_YIELD, 0, 0, 0) + // The thread we're trying to yield to may be waiting for a Go runtime P. + // runtime.Gosched() will hand off ours if necessary. + runtime.Gosched() +} diff --git a/pkg/flipcall/io.go b/pkg/flipcall/io.go new file mode 100644 index 000000000..85e40b932 --- /dev/null +++ b/pkg/flipcall/io.go @@ -0,0 +1,113 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "fmt" + "io" +) + +// DatagramReader implements io.Reader by reading a datagram from an Endpoint's +// packet window. Its use is optional; users that can use Endpoint.Data() more +// efficiently are advised to do so. +type DatagramReader struct { + ep *Endpoint + off uint32 + end uint32 +} + +// Init must be called on zero-value DatagramReaders before first use. +// +// Preconditions: dataLen is 0, or was returned by a previous call to +// ep.RecvFirst() or ep.SendRecv(). +func (r *DatagramReader) Init(ep *Endpoint, dataLen uint32) { + r.ep = ep + r.Reset(dataLen) +} + +// Reset causes r to begin reading a new datagram of the given length from the +// associated Endpoint. +// +// Preconditions: dataLen is 0, or was returned by a previous call to the +// associated Endpoint's RecvFirst() or SendRecv() methods. +func (r *DatagramReader) Reset(dataLen uint32) { + if dataLen > r.ep.dataCap { + panic(fmt.Sprintf("invalid dataLen (%d) > ep.dataCap (%d)", dataLen, r.ep.dataCap)) + } + r.off = 0 + r.end = dataLen +} + +// NewReader is a convenience function that returns an initialized +// DatagramReader allocated on the heap. +// +// Preconditions: dataLen was returned by a previous call to ep.RecvFirst() or +// ep.SendRecv(). +func (ep *Endpoint) NewReader(dataLen uint32) *DatagramReader { + r := &DatagramReader{} + r.Init(ep, dataLen) + return r +} + +// Read implements io.Reader.Read. +func (r *DatagramReader) Read(dst []byte) (int, error) { + n := copy(dst, r.ep.Data()[r.off:r.end]) + r.off += uint32(n) + if r.off == r.end { + return n, io.EOF + } + return n, nil +} + +// DatagramWriter implements io.Writer by writing a datagram to an Endpoint's +// packet window. Its use is optional; users that can use Endpoint.Data() more +// efficiently are advised to do so. +type DatagramWriter struct { + ep *Endpoint + off uint32 +} + +// Init must be called on zero-value DatagramWriters before first use. +func (w *DatagramWriter) Init(ep *Endpoint) { + w.ep = ep +} + +// Reset causes w to begin writing a new datagram to the associated Endpoint. +func (w *DatagramWriter) Reset() { + w.off = 0 +} + +// NewWriter is a convenience function that returns an initialized +// DatagramWriter allocated on the heap. +func (ep *Endpoint) NewWriter() *DatagramWriter { + w := &DatagramWriter{} + w.Init(ep) + return w +} + +// Write implements io.Writer.Write. +func (w *DatagramWriter) Write(src []byte) (int, error) { + n := copy(w.ep.Data()[w.off:w.ep.dataCap], src) + w.off += uint32(n) + if n != len(src) { + return n, fmt.Errorf("datagram would exceed maximum size of %d bytes", w.ep.dataCap) + } + return n, nil +} + +// Len returns the length of the written datagram. +func (w *DatagramWriter) Len() uint32 { + return w.off +} diff --git a/pkg/flipcall/packet_window_allocator.go b/pkg/flipcall/packet_window_allocator.go index 7b455b24d..ccb918fab 100644 --- a/pkg/flipcall/packet_window_allocator.go +++ b/pkg/flipcall/packet_window_allocator.go @@ -34,10 +34,10 @@ func init() { // This is depended on by roundUpToPage(). panic(fmt.Sprintf("system page size (%d) is not a power of 2", pageSize)) } - if uintptr(pageSize) < packetHeaderBytes { + if uintptr(pageSize) < PacketHeaderBytes { // This is required since Endpoint.Init() imposes a minimum packet // window size of 1 page. - panic(fmt.Sprintf("system page size (%d) is less than packet header size (%d)", pageSize, packetHeaderBytes)) + panic(fmt.Sprintf("system page size (%d) is less than packet header size (%d)", pageSize, PacketHeaderBytes)) } } @@ -59,7 +59,7 @@ type PacketWindowDescriptor struct { // PacketWindowLengthForDataCap returns the minimum packet window size required // to accommodate datagrams of the given size in bytes. func PacketWindowLengthForDataCap(dataCap uint32) int { - return roundUpToPage(int(dataCap) + int(packetHeaderBytes)) + return roundUpToPage(int(dataCap) + int(PacketHeaderBytes)) } func roundUpToPage(x int) int { |