diff options
Diffstat (limited to 'pkg/p9/client.go')
-rw-r--r-- | pkg/p9/client.go | 280 |
1 files changed, 263 insertions, 17 deletions
diff --git a/pkg/p9/client.go b/pkg/p9/client.go index 7dc20aeef..123f54e29 100644 --- a/pkg/p9/client.go +++ b/pkg/p9/client.go @@ -20,6 +20,8 @@ import ( "sync" "syscall" + "golang.org/x/sys/unix" + "gvisor.dev/gvisor/pkg/flipcall" "gvisor.dev/gvisor/pkg/log" "gvisor.dev/gvisor/pkg/unet" ) @@ -77,6 +79,45 @@ type Client struct { // fidPool is the collection of available fids. fidPool pool + // messageSize is the maximum total size of a message. + messageSize uint32 + + // payloadSize is the maximum payload size of a read or write. + // + // For large reads and writes this means that the read or write is + // broken up into buffer-size/payloadSize requests. + payloadSize uint32 + + // version is the agreed upon version X of 9P2000.L.Google.X. + // version 0 implies 9P2000.L. + version uint32 + + // sendRecv is the transport function. + // + // This is determined dynamically based on whether or not the server + // supports flipcall channels (preferred as it is faster and more + // efficient, and does not require tags). + sendRecv func(message, message) error + + // -- below corresponds to sendRecvChannel -- + + // channelsMu protects channels. + channelsMu sync.Mutex + + // channelsWg is a wait group for active clients. + channelsWg sync.WaitGroup + + // channels are the set of initialized IPCs channels. + channels []*channel + + // inuse is set when the channels are actually in use. + // + // This is a fixed-size slice, and the entries will be nil when the + // corresponding channel is available. + inuse []*channel + + // -- below corresponds to sendRecvLegacy -- + // pending is the set of pending messages. pending map[Tag]*response pendingMu sync.Mutex @@ -89,19 +130,6 @@ type Client struct { // Whoever writes to this channel is permitted to call recv. When // finished calling recv, this channel should be emptied. recvr chan bool - - // messageSize is the maximum total size of a message. - messageSize uint32 - - // payloadSize is the maximum payload size of a read or write - // request. For large reads and writes this means that the - // read or write is broken up into buffer-size/payloadSize - // requests. - payloadSize uint32 - - // version is the agreed upon version X of 9P2000.L.Google.X. - // version 0 implies 9P2000.L. - version uint32 } // NewClient creates a new client. It performs a Tversion exchange with @@ -138,8 +166,15 @@ func NewClient(socket *unet.Socket, messageSize uint32, version string) (*Client return nil, ErrBadVersionString } for { + // Always exchange the version using the legacy version of the + // protocol. If the protocol supports flipcall, then we switch + // our sendRecv function to use that functionality. Otherwise, + // we stick to sendRecvLegacy. rversion := Rversion{} - err := c.sendRecv(&Tversion{Version: versionString(requested), MSize: messageSize}, &rversion) + err := c.sendRecvLegacy(&Tversion{ + Version: versionString(requested), + MSize: messageSize, + }, &rversion) // The server told us to try again with a lower version. if err == syscall.EAGAIN { @@ -165,9 +200,125 @@ func NewClient(socket *unet.Socket, messageSize uint32, version string) (*Client c.version = version break } + + // Can we switch to use the more advanced channels and create + // independent channels for communication? Prefer it if possible. + if versionSupportsFlipcall(c.version) { + // Attempt to initialize IPC-based communication. + for i := 0; i < channelsPerClient; i++ { + if err := c.openChannel(i); err != nil { + log.Warningf("error opening flipcall channel: %v", err) + break // Stop. + } + } + if len(c.channels) >= 1 { + // At least one channel created. + c.sendRecv = c.sendRecvChannel + + // If we are using channels for communication, then we must poll + // for shutdown events on the main socket. If the socket happens + // to shutdown, then we will close the channels as well. This is + // necessary because channels can hang forever if the server dies + // while we're expecting a response. + go c.watch(socket) // S/R-SAFE: not relevant. + } else { + // Channel setup failed; fallback. + c.sendRecv = c.sendRecvLegacy + } + } else { + // No channels available: use the legacy mechanism. + c.sendRecv = c.sendRecvLegacy + } + return c, nil } +// watch watches the given socket and calls Close on hang up events. +// +// This is intended to be called as a goroutine. +func (c *Client) watch(socket *unet.Socket) { + events := []unix.PollFd{ + unix.PollFd{ + Fd: int32(socket.FD()), + Events: unix.POLLHUP | unix.POLLRDHUP, + }, + } + + for { + // Wait for a shutdown event. + n, err := unix.Ppoll(events, nil, nil) + if n == 0 || err == syscall.EAGAIN { + continue + } + break + } + + // Close everything down: this will kick all active clients off any + // pending requests. Note that Close must be safe to call concurrently, + // and multiple times (see Close below). + c.Close() +} + +// openChannel attempts to open a client channel. +// +// Note that this function returns naked errors which should not be propagated +// directly to a caller. It is expected that the errors will be logged and a +// fallback path will be used instead. +func (c *Client) openChannel(id int) error { + var ( + rchannel0 Rchannel + rchannel1 Rchannel + res = new(channel) + ) + + // Open the data channel. + if err := c.sendRecvLegacy(&Tchannel{ + ID: uint32(id), + Control: 0, + }, &rchannel0); err != nil { + return fmt.Errorf("error handling Tchannel message: %v", err) + } + if rchannel0.FilePayload() == nil { + return fmt.Errorf("missing file descriptor on primary channel") + } + + // We don't need to hold this. + defer rchannel0.FilePayload().Close() + + // Open the channel for file descriptors. + if err := c.sendRecvLegacy(&Tchannel{ + ID: uint32(id), + Control: 1, + }, &rchannel1); err != nil { + return err + } + if rchannel1.FilePayload() == nil { + return fmt.Errorf("missing file descriptor on file descriptor channel") + } + + // Construct the endpoints. + res.desc = flipcall.PacketWindowDescriptor{ + FD: rchannel0.FilePayload().FD(), + Offset: int64(rchannel0.Offset), + Length: int(rchannel0.Length), + } + if err := res.data.Init(flipcall.ClientSide, res.desc); err != nil { + rchannel1.FilePayload().Close() + return err + } + + // The fds channel owns the control payload, and it will be closed when + // the channel object is closed. + res.fds.Init(rchannel1.FilePayload().Release()) + + // Save the channel. + c.channelsMu.Lock() + defer c.channelsMu.Unlock() + c.channels = append(c.channels, res) + c.inuse = append(c.inuse, nil) + return nil +} + // handleOne handles a single incoming message. // // This should only be called with the token from recvr. Note that the received @@ -247,10 +398,10 @@ func (c *Client) waitAndRecv(done chan error) error { } } -// sendRecv performs a roundtrip message exchange. +// sendRecvLegacy performs a roundtrip message exchange. // // This is called by internal functions. -func (c *Client) sendRecv(t message, r message) error { +func (c *Client) sendRecvLegacy(t message, r message) error { tag, ok := c.tagPool.Get() if !ok { return ErrOutOfTags @@ -296,12 +447,107 @@ func (c *Client) sendRecv(t message, r message) error { return nil } +// sendRecvChannel uses channels to send a message. +func (c *Client) sendRecvChannel(t message, r message) error { + c.channelsMu.Lock() + if len(c.channels) == 0 { + // No channel available. + c.channelsMu.Unlock() + return c.sendRecvLegacy(t, r) + } + + // Find the last used channel. + // + // Note that we must add one to the wait group while holding the + // channel mutex, in order for the Wait operation to be race-free + // below. The Wait operation shuts down all in use channels and + // waits for them to return, but must do so holding the mutex. + idx := len(c.channels) - 1 + ch := c.channels[idx] + c.channels = c.channels[:idx] + c.inuse[idx] = ch + c.channelsWg.Add(1) + c.channelsMu.Unlock() + + // Ensure that it's connected. + if !ch.connected { + ch.connected = true + if err := ch.data.Connect(); err != nil { + // The channel is unusable, so don't return it. + ch.Close() + c.channelsWg.Done() + return err + } + } + + // Send the message. + err := ch.sendRecv(c, t, r) + if err != nil { + // On shutdown, we'll see ENOENT. This is a normal situation, and + // we shouldn't generate a spurious warning message in that case. + log.Debugf("error calling sendRecvChannel: %v", err) + } + c.channelsWg.Done() + + // Return the channel. + // + // Note that we check the channel from the inuse slice here. This + // prevents a race where Close is called, which clears inuse, and + // means that we will not actually return the closed channel. + c.channelsMu.Lock() + if c.inuse[idx] != nil { + c.channels = append(c.channels, ch) + c.inuse[idx] = nil + } + c.channelsMu.Unlock() + + return err +} + // Version returns the negotiated 9P2000.L.Google version number. func (c *Client) Version() uint32 { return c.version } -// Close closes the underlying socket. +// Close closes the underlying socket and channels. +// +// Because Close may be called asynchronously from watch, it must be +// safe to call concurrently and multiple times. func (c *Client) Close() error { + c.channelsMu.Lock() + defer c.channelsMu.Unlock() + + // Close all inactive channels. + for _, ch := range c.channels { + ch.Shutdown() + ch.Close() + } + // Close all active channels. + for _, ch := range c.inuse { + if ch != nil { + log.Debugf("shutting down active channel@%p...", ch) + ch.Shutdown() + } + } + + // Wait for active users. + c.channelsWg.Wait() + + // Close all previously active channels. + for i, ch := range c.inuse { + if ch != nil { + ch.Close() + + // Clear the inuse entry here so that it will not be returned + // to the channel slice, which is cleared below. See the + // comment at the end of sendRecvChannel. + c.inuse[i] = nil + } + } + c.channels = nil // Prevent use again. + + // Close the main socket. Note that operation is safe to be called + // multiple times, unlikely the channel Close operations above, which + // we are careful to ensure aren't called twice. return c.socket.Close() } |