summaryrefslogtreecommitdiffhomepage
path: root/pkg/flipcall/ctrl_futex.go
diff options
context:
space:
mode:
authorJamie Liu <jamieliu@google.com>2019-07-31 12:54:49 -0700
committergVisor bot <gvisor-bot@google.com>2019-07-31 12:56:04 -0700
commitcbe145247a748177e62b268f402cbe76e5b1ca35 (patch)
tree079949c800a6f8f50f242f559a302f73f9c8fa7e /pkg/flipcall/ctrl_futex.go
parentcf2b2d97d512a91261f72abe40b163c61d52705f (diff)
Flipcall refinements.
Note that some of these changes affect the protocol in backward-incompatible ways. - Replace use of "initially-active" and "initially-inactive" with "client" and "server" respectively for clarity. - Fix a race condition involving Endpoint.Shutdown() by repeatedly invoking FUTEX_WAKE until it is confirmed that no local thread is blocked in FUTEX_WAIT. - Drop flipcall.ControlMode. PiperOrigin-RevId: 260981382
Diffstat (limited to 'pkg/flipcall/ctrl_futex.go')
-rw-r--r--pkg/flipcall/ctrl_futex.go146
1 files changed, 146 insertions, 0 deletions
diff --git a/pkg/flipcall/ctrl_futex.go b/pkg/flipcall/ctrl_futex.go
new file mode 100644
index 000000000..865b6f640
--- /dev/null
+++ b/pkg/flipcall/ctrl_futex.go
@@ -0,0 +1,146 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flipcall
+
+import (
+ "encoding/json"
+ "fmt"
+ "math"
+ "sync/atomic"
+
+ "gvisor.dev/gvisor/pkg/log"
+)
+
+type endpointControlImpl struct {
+ state int32
+}
+
+// Bits in endpointControlImpl.state.
+const (
+ epsBlocked = 1 << iota
+ epsShutdown
+)
+
+func (ep *Endpoint) ctrlInit(opts ...EndpointOption) error {
+ if len(opts) != 0 {
+ return fmt.Errorf("unknown EndpointOption: %T", opts[0])
+ }
+ return nil
+}
+
+type ctrlHandshakeRequest struct{}
+
+type ctrlHandshakeResponse struct{}
+
+func (ep *Endpoint) ctrlConnect() error {
+ if err := ep.enterFutexWait(); err != nil {
+ return err
+ }
+ _, err := ep.futexConnect(&ctrlHandshakeRequest{})
+ ep.exitFutexWait()
+ return err
+}
+
+func (ep *Endpoint) ctrlWaitFirst() error {
+ if err := ep.enterFutexWait(); err != nil {
+ return err
+ }
+ defer ep.exitFutexWait()
+
+ // Wait for the handshake request.
+ if err := ep.futexSwitchFromPeer(); err != nil {
+ return err
+ }
+
+ // Read the handshake request.
+ reqLen := atomic.LoadUint32(ep.dataLen())
+ if reqLen > ep.dataCap {
+ return fmt.Errorf("invalid handshake request length %d (maximum %d)", reqLen, ep.dataCap)
+ }
+ var req ctrlHandshakeRequest
+ if err := json.NewDecoder(ep.NewReader(reqLen)).Decode(&req); err != nil {
+ return fmt.Errorf("error reading handshake request: %v", err)
+ }
+
+ // Write the handshake response.
+ w := ep.NewWriter()
+ if err := json.NewEncoder(w).Encode(ctrlHandshakeResponse{}); err != nil {
+ return fmt.Errorf("error writing handshake response: %v", err)
+ }
+ *ep.dataLen() = w.Len()
+
+ // Return control to the client.
+ if err := ep.futexSwitchToPeer(); err != nil {
+ return err
+ }
+
+ // Wait for the first non-handshake message.
+ return ep.futexSwitchFromPeer()
+}
+
+func (ep *Endpoint) ctrlRoundTrip() error {
+ if err := ep.futexSwitchToPeer(); err != nil {
+ return err
+ }
+ if err := ep.enterFutexWait(); err != nil {
+ return err
+ }
+ err := ep.futexSwitchFromPeer()
+ ep.exitFutexWait()
+ return err
+}
+
+func (ep *Endpoint) ctrlWakeLast() error {
+ return ep.futexSwitchToPeer()
+}
+
+func (ep *Endpoint) enterFutexWait() error {
+ switch eps := atomic.AddInt32(&ep.ctrl.state, epsBlocked); eps {
+ case epsBlocked:
+ return nil
+ case epsBlocked | epsShutdown:
+ atomic.AddInt32(&ep.ctrl.state, -epsBlocked)
+ return shutdownError{}
+ default:
+ // Most likely due to ep.enterFutexWait() being called concurrently
+ // from multiple goroutines.
+ panic(fmt.Sprintf("invalid flipcall.Endpoint.ctrl.state before flipcall.Endpoint.enterFutexWait(): %v", eps-epsBlocked))
+ }
+}
+
+func (ep *Endpoint) exitFutexWait() {
+ atomic.AddInt32(&ep.ctrl.state, -epsBlocked)
+}
+
+func (ep *Endpoint) ctrlShutdown() {
+ // Set epsShutdown to ensure that future calls to ep.enterFutexWait() fail.
+ if atomic.AddInt32(&ep.ctrl.state, epsShutdown)&epsBlocked != 0 {
+ // Wake the blocked thread. This must loop because it's possible that
+ // FUTEX_WAKE occurs after the waiter sets epsBlocked, but before it
+ // blocks in FUTEX_WAIT.
+ for {
+ // Wake MaxInt32 threads to prevent a broken or malicious peer from
+ // swallowing our wakeup by FUTEX_WAITing from multiple threads.
+ if err := ep.futexWakeConnState(math.MaxInt32); err != nil {
+ log.Warningf("failed to FUTEX_WAKE Endpoints: %v", err)
+ break
+ }
+ yieldThread()
+ if atomic.LoadInt32(&ep.ctrl.state)&epsBlocked == 0 {
+ break
+ }
+ }
+ }
+}