diff options
Diffstat (limited to 'pkg/flipcall/flipcall.go')
-rw-r--r-- | pkg/flipcall/flipcall.go | 72 |
1 files changed, 49 insertions, 23 deletions
diff --git a/pkg/flipcall/flipcall.go b/pkg/flipcall/flipcall.go index 5c9212c33..386cee42c 100644 --- a/pkg/flipcall/flipcall.go +++ b/pkg/flipcall/flipcall.go @@ -42,11 +42,6 @@ type Endpoint struct { // dataCap is immutable. dataCap uint32 - // shutdown is non-zero if Endpoint.Shutdown() has been called, or if the - // Endpoint has acknowledged shutdown initiated by the peer. shutdown is - // accessed using atomic memory operations. - shutdown uint32 - // activeState is csClientActive if this is a client Endpoint and // csServerActive if this is a server Endpoint. activeState uint32 @@ -55,9 +50,27 @@ type Endpoint struct { // csClientActive if this is a server Endpoint. inactiveState uint32 + // shutdown is non-zero if Endpoint.Shutdown() has been called, or if the + // Endpoint has acknowledged shutdown initiated by the peer. shutdown is + // accessed using atomic memory operations. + shutdown uint32 + ctrl endpointControlImpl } +// EndpointSide indicates which side of a connection an Endpoint belongs to. +type EndpointSide int + +const ( + // ClientSide indicates that an Endpoint is a client (initially-active; + // first method call should be Connect). + ClientSide EndpointSide = iota + + // ServerSide indicates that an Endpoint is a server (initially-inactive; + // first method call should be RecvFirst.) + ServerSide +) + // Init must be called on zero-value Endpoints before first use. If it // succeeds, ep.Destroy() must be called once the Endpoint is no longer in use. // @@ -65,7 +78,17 @@ type Endpoint struct { // Endpoint. FD may differ between Endpoints if they are in different // processes, but must represent the same file. The packet window must // initially be filled with zero bytes. -func (ep *Endpoint) Init(pwd PacketWindowDescriptor, opts ...EndpointOption) error { +func (ep *Endpoint) Init(side EndpointSide, pwd PacketWindowDescriptor, opts ...EndpointOption) error { + switch side { + case ClientSide: + ep.activeState = csClientActive + ep.inactiveState = csServerActive + case ServerSide: + ep.activeState = csServerActive + ep.inactiveState = csClientActive + default: + return fmt.Errorf("invalid EndpointSide: %v", side) + } if pwd.Length < pageSize { return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize) } @@ -78,9 +101,6 @@ func (ep *Endpoint) Init(pwd PacketWindowDescriptor, opts ...EndpointOption) err } ep.packet = m ep.dataCap = uint32(pwd.Length) - uint32(PacketHeaderBytes) - // These will be overwritten by ep.Connect() for client Endpoints. - ep.activeState = csServerActive - ep.inactiveState = csClientActive if err := ep.ctrlInit(opts...); err != nil { ep.unmapPacket() return err @@ -90,9 +110,9 @@ func (ep *Endpoint) Init(pwd PacketWindowDescriptor, opts ...EndpointOption) err // NewEndpoint is a convenience function that returns an initialized Endpoint // allocated on the heap. -func NewEndpoint(pwd PacketWindowDescriptor, opts ...EndpointOption) (*Endpoint, error) { +func NewEndpoint(side EndpointSide, pwd PacketWindowDescriptor, opts ...EndpointOption) (*Endpoint, error) { var ep Endpoint - if err := ep.Init(pwd, opts...); err != nil { + if err := ep.Init(side, pwd, opts...); err != nil { return nil, err } return &ep, nil @@ -115,9 +135,9 @@ func (ep *Endpoint) unmapPacket() { } // Shutdown causes concurrent and future calls to ep.Connect(), ep.SendRecv(), -// ep.RecvFirst(), and ep.SendLast() to unblock and return errors. It does not -// wait for concurrent calls to return. The effect of Shutdown on the peer -// Endpoint is unspecified. Successive calls to Shutdown have no effect. +// ep.RecvFirst(), and ep.SendLast(), as well as the same calls in the peer +// Endpoint, to unblock and return errors. It does not wait for concurrent +// calls to return. Successive calls to Shutdown have no effect. // // Shutdown is the only Endpoint method that may be called concurrently with // other methods on the same Endpoint. @@ -152,28 +172,31 @@ const ( // The client is, by definition, initially active, so this must be 0. csClientActive = 0 csServerActive = 1 + csShutdown = 2 ) -// Connect designates ep as a client Endpoint and blocks until the peer -// Endpoint has called Endpoint.RecvFirst(). +// Connect blocks until the peer Endpoint has called Endpoint.RecvFirst(). // -// Preconditions: ep.Connect(), ep.RecvFirst(), ep.SendRecv(), and -// ep.SendLast() have never been called. +// Preconditions: ep is a client Endpoint. ep.Connect(), ep.RecvFirst(), +// ep.SendRecv(), and ep.SendLast() have never been called. func (ep *Endpoint) Connect() error { - ep.activeState = csClientActive - ep.inactiveState = csServerActive - return ep.ctrlConnect() + err := ep.ctrlConnect() + if err == nil { + raceBecomeActive() + } + return err } // RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then // returns the datagram length specified by that call. // -// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never -// been called. +// Preconditions: ep is a server Endpoint. ep.SendRecv(), ep.RecvFirst(), and +// ep.SendLast() have never been called. func (ep *Endpoint) RecvFirst() (uint32, error) { if err := ep.ctrlWaitFirst(); err != nil { return 0, err } + raceBecomeActive() recvDataLen := atomic.LoadUint32(ep.dataLen()) if recvDataLen > ep.dataCap { return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, ep.dataCap) @@ -200,9 +223,11 @@ func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) { // after ep.ctrlRoundTrip(), so if the peer is mutating it concurrently then // they can only shoot themselves in the foot. *ep.dataLen() = dataLen + raceBecomeInactive() if err := ep.ctrlRoundTrip(); err != nil { return 0, err } + raceBecomeActive() recvDataLen := atomic.LoadUint32(ep.dataLen()) if recvDataLen > ep.dataCap { return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, ep.dataCap) @@ -222,6 +247,7 @@ func (ep *Endpoint) SendLast(dataLen uint32) error { panic(fmt.Sprintf("attempting to send packet with datagram length %d (maximum %d)", dataLen, ep.dataCap)) } *ep.dataLen() = dataLen + raceBecomeInactive() if err := ep.ctrlWakeLast(); err != nil { return err } |