diff options
Diffstat (limited to 'pkg/flipcall/ctrl_futex.go')
-rw-r--r-- | pkg/flipcall/ctrl_futex.go | 83 |
1 files changed, 58 insertions, 25 deletions
diff --git a/pkg/flipcall/ctrl_futex.go b/pkg/flipcall/ctrl_futex.go index e7c3a3a0b..2e8452a02 100644 --- a/pkg/flipcall/ctrl_futex.go +++ b/pkg/flipcall/ctrl_futex.go @@ -40,17 +40,41 @@ func (ep *Endpoint) ctrlInit(opts ...EndpointOption) error { return nil } -type ctrlHandshakeRequest struct{} - -type ctrlHandshakeResponse struct{} - func (ep *Endpoint) ctrlConnect() error { if err := ep.enterFutexWait(); err != nil { return err } - _, err := ep.futexConnect(&ctrlHandshakeRequest{}) - ep.exitFutexWait() - return err + defer ep.exitFutexWait() + + // Write the connection request. + w := ep.NewWriter() + if err := json.NewEncoder(w).Encode(struct{}{}); err != nil { + return fmt.Errorf("error writing connection request: %v", err) + } + *ep.dataLen() = w.Len() + + // Exchange control with the server. + if err := ep.futexSetPeerActive(); err != nil { + return err + } + if err := ep.futexWakePeer(); err != nil { + return err + } + if err := ep.futexWaitUntilActive(); err != nil { + return err + } + + // Read the connection response. + var resp struct{} + respLen := atomic.LoadUint32(ep.dataLen()) + if respLen > ep.dataCap { + return fmt.Errorf("invalid connection response length %d (maximum %d)", respLen, ep.dataCap) + } + if err := json.NewDecoder(ep.NewReader(respLen)).Decode(&resp); err != nil { + return fmt.Errorf("error reading connection response: %v", err) + } + + return nil } func (ep *Endpoint) ctrlWaitFirst() error { @@ -59,52 +83,61 @@ func (ep *Endpoint) ctrlWaitFirst() error { } defer ep.exitFutexWait() - // Wait for the handshake request. - if err := ep.futexSwitchFromPeer(); err != nil { + // Wait for the connection request. + if err := ep.futexWaitUntilActive(); err != nil { return err } - // Read the handshake request. + // Read the connection request. reqLen := atomic.LoadUint32(ep.dataLen()) if reqLen > ep.dataCap { - return fmt.Errorf("invalid handshake request length %d (maximum %d)", reqLen, ep.dataCap) + return fmt.Errorf("invalid connection request length %d (maximum %d)", reqLen, ep.dataCap) } - var req ctrlHandshakeRequest + var req struct{} if err := json.NewDecoder(ep.NewReader(reqLen)).Decode(&req); err != nil { - return fmt.Errorf("error reading handshake request: %v", err) + return fmt.Errorf("error reading connection request: %v", err) } - // Write the handshake response. + // Write the connection response. w := ep.NewWriter() - if err := json.NewEncoder(w).Encode(ctrlHandshakeResponse{}); err != nil { - return fmt.Errorf("error writing handshake response: %v", err) + if err := json.NewEncoder(w).Encode(struct{}{}); err != nil { + return fmt.Errorf("error writing connection response: %v", err) } *ep.dataLen() = w.Len() // Return control to the client. raceBecomeInactive() - if err := ep.futexSwitchToPeer(); err != nil { + if err := ep.futexSetPeerActive(); err != nil { + return err + } + if err := ep.futexWakePeer(); err != nil { return err } - // Wait for the first non-handshake message. - return ep.futexSwitchFromPeer() + // Wait for the first non-connection message. + return ep.futexWaitUntilActive() } func (ep *Endpoint) ctrlRoundTrip() error { - if err := ep.futexSwitchToPeer(); err != nil { + if err := ep.enterFutexWait(); err != nil { return err } - if err := ep.enterFutexWait(); err != nil { + defer ep.exitFutexWait() + + if err := ep.futexSetPeerActive(); err != nil { return err } - err := ep.futexSwitchFromPeer() - ep.exitFutexWait() - return err + if err := ep.futexWakePeer(); err != nil { + return err + } + return ep.futexWaitUntilActive() } func (ep *Endpoint) ctrlWakeLast() error { - return ep.futexSwitchToPeer() + if err := ep.futexSetPeerActive(); err != nil { + return err + } + return ep.futexWakePeer() } func (ep *Endpoint) enterFutexWait() error { |