diff options
author | Jamie Liu <jamieliu@google.com> | 2019-07-31 12:54:49 -0700 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2019-07-31 12:56:04 -0700 |
commit | cbe145247a748177e62b268f402cbe76e5b1ca35 (patch) | |
tree | 079949c800a6f8f50f242f559a302f73f9c8fa7e /pkg/flipcall/ctrl_futex.go | |
parent | cf2b2d97d512a91261f72abe40b163c61d52705f (diff) |
Flipcall refinements.
Note that some of these changes affect the protocol in
backward-incompatible ways.
- Replace use of "initially-active" and "initially-inactive" with
"client" and "server" respectively for clarity.
- Fix a race condition involving Endpoint.Shutdown() by repeatedly
invoking FUTEX_WAKE until it is confirmed that no local thread is
blocked in FUTEX_WAIT.
- Drop flipcall.ControlMode.
PiperOrigin-RevId: 260981382
Diffstat (limited to 'pkg/flipcall/ctrl_futex.go')
-rw-r--r-- | pkg/flipcall/ctrl_futex.go | 146 |
1 files changed, 146 insertions, 0 deletions
diff --git a/pkg/flipcall/ctrl_futex.go b/pkg/flipcall/ctrl_futex.go new file mode 100644 index 000000000..865b6f640 --- /dev/null +++ b/pkg/flipcall/ctrl_futex.go @@ -0,0 +1,146 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "encoding/json" + "fmt" + "math" + "sync/atomic" + + "gvisor.dev/gvisor/pkg/log" +) + +type endpointControlImpl struct { + state int32 +} + +// Bits in endpointControlImpl.state. +const ( + epsBlocked = 1 << iota + epsShutdown +) + +func (ep *Endpoint) ctrlInit(opts ...EndpointOption) error { + if len(opts) != 0 { + return fmt.Errorf("unknown EndpointOption: %T", opts[0]) + } + return nil +} + +type ctrlHandshakeRequest struct{} + +type ctrlHandshakeResponse struct{} + +func (ep *Endpoint) ctrlConnect() error { + if err := ep.enterFutexWait(); err != nil { + return err + } + _, err := ep.futexConnect(&ctrlHandshakeRequest{}) + ep.exitFutexWait() + return err +} + +func (ep *Endpoint) ctrlWaitFirst() error { + if err := ep.enterFutexWait(); err != nil { + return err + } + defer ep.exitFutexWait() + + // Wait for the handshake request. + if err := ep.futexSwitchFromPeer(); err != nil { + return err + } + + // Read the handshake request. + reqLen := atomic.LoadUint32(ep.dataLen()) + if reqLen > ep.dataCap { + return fmt.Errorf("invalid handshake request length %d (maximum %d)", reqLen, ep.dataCap) + } + var req ctrlHandshakeRequest + if err := json.NewDecoder(ep.NewReader(reqLen)).Decode(&req); err != nil { + return fmt.Errorf("error reading handshake request: %v", err) + } + + // Write the handshake response. + w := ep.NewWriter() + if err := json.NewEncoder(w).Encode(ctrlHandshakeResponse{}); err != nil { + return fmt.Errorf("error writing handshake response: %v", err) + } + *ep.dataLen() = w.Len() + + // Return control to the client. + if err := ep.futexSwitchToPeer(); err != nil { + return err + } + + // Wait for the first non-handshake message. + return ep.futexSwitchFromPeer() +} + +func (ep *Endpoint) ctrlRoundTrip() error { + if err := ep.futexSwitchToPeer(); err != nil { + return err + } + if err := ep.enterFutexWait(); err != nil { + return err + } + err := ep.futexSwitchFromPeer() + ep.exitFutexWait() + return err +} + +func (ep *Endpoint) ctrlWakeLast() error { + return ep.futexSwitchToPeer() +} + +func (ep *Endpoint) enterFutexWait() error { + switch eps := atomic.AddInt32(&ep.ctrl.state, epsBlocked); eps { + case epsBlocked: + return nil + case epsBlocked | epsShutdown: + atomic.AddInt32(&ep.ctrl.state, -epsBlocked) + return shutdownError{} + default: + // Most likely due to ep.enterFutexWait() being called concurrently + // from multiple goroutines. + panic(fmt.Sprintf("invalid flipcall.Endpoint.ctrl.state before flipcall.Endpoint.enterFutexWait(): %v", eps-epsBlocked)) + } +} + +func (ep *Endpoint) exitFutexWait() { + atomic.AddInt32(&ep.ctrl.state, -epsBlocked) +} + +func (ep *Endpoint) ctrlShutdown() { + // Set epsShutdown to ensure that future calls to ep.enterFutexWait() fail. + if atomic.AddInt32(&ep.ctrl.state, epsShutdown)&epsBlocked != 0 { + // Wake the blocked thread. This must loop because it's possible that + // FUTEX_WAKE occurs after the waiter sets epsBlocked, but before it + // blocks in FUTEX_WAIT. + for { + // Wake MaxInt32 threads to prevent a broken or malicious peer from + // swallowing our wakeup by FUTEX_WAITing from multiple threads. + if err := ep.futexWakeConnState(math.MaxInt32); err != nil { + log.Warningf("failed to FUTEX_WAKE Endpoints: %v", err) + break + } + yieldThread() + if atomic.LoadInt32(&ep.ctrl.state)&epsBlocked == 0 { + break + } + } + } +} |