summaryrefslogtreecommitdiffhomepage
path: root/pkg/p9/client.go
diff options
context:
space:
mode:
authorgVisor bot <gvisor-bot@google.com>2019-09-13 06:41:23 +0000
committergVisor bot <gvisor-bot@google.com>2019-09-13 06:41:23 +0000
commite14a0a36cb11bd6adb8b9ee79834eccdbd5ac15e (patch)
tree0b1ce89a674bf02c61ae4061a432e0a9eea3f4c6 /pkg/p9/client.go
parentc65b5a8d44c27127001097108af9032ad6bb4286 (diff)
parenta8834fc555539bd6b0b46936c4a79817812658ff (diff)
Merge release-20190806.1-143-ga8834fc (automated)
Diffstat (limited to 'pkg/p9/client.go')
-rw-r--r--pkg/p9/client.go280
1 files changed, 263 insertions, 17 deletions
diff --git a/pkg/p9/client.go b/pkg/p9/client.go
index 7dc20aeef..123f54e29 100644
--- a/pkg/p9/client.go
+++ b/pkg/p9/client.go
@@ -20,6 +20,8 @@ import (
"sync"
"syscall"
+ "golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/flipcall"
"gvisor.dev/gvisor/pkg/log"
"gvisor.dev/gvisor/pkg/unet"
)
@@ -77,6 +79,45 @@ type Client struct {
// fidPool is the collection of available fids.
fidPool pool
+ // messageSize is the maximum total size of a message.
+ messageSize uint32
+
+ // payloadSize is the maximum payload size of a read or write.
+ //
+ // For large reads and writes this means that the read or write is
+ // broken up into buffer-size/payloadSize requests.
+ payloadSize uint32
+
+ // version is the agreed upon version X of 9P2000.L.Google.X.
+ // version 0 implies 9P2000.L.
+ version uint32
+
+ // sendRecv is the transport function.
+ //
+ // This is determined dynamically based on whether or not the server
+ // supports flipcall channels (preferred as it is faster and more
+ // efficient, and does not require tags).
+ sendRecv func(message, message) error
+
+ // -- below corresponds to sendRecvChannel --
+
+ // channelsMu protects channels.
+ channelsMu sync.Mutex
+
+ // channelsWg is a wait group for active clients.
+ channelsWg sync.WaitGroup
+
+ // channels are the set of initialized IPCs channels.
+ channels []*channel
+
+ // inuse is set when the channels are actually in use.
+ //
+ // This is a fixed-size slice, and the entries will be nil when the
+ // corresponding channel is available.
+ inuse []*channel
+
+ // -- below corresponds to sendRecvLegacy --
+
// pending is the set of pending messages.
pending map[Tag]*response
pendingMu sync.Mutex
@@ -89,19 +130,6 @@ type Client struct {
// Whoever writes to this channel is permitted to call recv. When
// finished calling recv, this channel should be emptied.
recvr chan bool
-
- // messageSize is the maximum total size of a message.
- messageSize uint32
-
- // payloadSize is the maximum payload size of a read or write
- // request. For large reads and writes this means that the
- // read or write is broken up into buffer-size/payloadSize
- // requests.
- payloadSize uint32
-
- // version is the agreed upon version X of 9P2000.L.Google.X.
- // version 0 implies 9P2000.L.
- version uint32
}
// NewClient creates a new client. It performs a Tversion exchange with
@@ -138,8 +166,15 @@ func NewClient(socket *unet.Socket, messageSize uint32, version string) (*Client
return nil, ErrBadVersionString
}
for {
+ // Always exchange the version using the legacy version of the
+ // protocol. If the protocol supports flipcall, then we switch
+ // our sendRecv function to use that functionality. Otherwise,
+ // we stick to sendRecvLegacy.
rversion := Rversion{}
- err := c.sendRecv(&Tversion{Version: versionString(requested), MSize: messageSize}, &rversion)
+ err := c.sendRecvLegacy(&Tversion{
+ Version: versionString(requested),
+ MSize: messageSize,
+ }, &rversion)
// The server told us to try again with a lower version.
if err == syscall.EAGAIN {
@@ -165,9 +200,125 @@ func NewClient(socket *unet.Socket, messageSize uint32, version string) (*Client
c.version = version
break
}
+
+ // Can we switch to use the more advanced channels and create
+ // independent channels for communication? Prefer it if possible.
+ if versionSupportsFlipcall(c.version) {
+ // Attempt to initialize IPC-based communication.
+ for i := 0; i < channelsPerClient; i++ {
+ if err := c.openChannel(i); err != nil {
+ log.Warningf("error opening flipcall channel: %v", err)
+ break // Stop.
+ }
+ }
+ if len(c.channels) >= 1 {
+ // At least one channel created.
+ c.sendRecv = c.sendRecvChannel
+
+ // If we are using channels for communication, then we must poll
+ // for shutdown events on the main socket. If the socket happens
+ // to shutdown, then we will close the channels as well. This is
+ // necessary because channels can hang forever if the server dies
+ // while we're expecting a response.
+ go c.watch(socket) // S/R-SAFE: not relevant.
+ } else {
+ // Channel setup failed; fallback.
+ c.sendRecv = c.sendRecvLegacy
+ }
+ } else {
+ // No channels available: use the legacy mechanism.
+ c.sendRecv = c.sendRecvLegacy
+ }
+
return c, nil
}
+// watch watches the given socket and calls Close on hang up events.
+//
+// This is intended to be called as a goroutine.
+func (c *Client) watch(socket *unet.Socket) {
+ events := []unix.PollFd{
+ unix.PollFd{
+ Fd: int32(socket.FD()),
+ Events: unix.POLLHUP | unix.POLLRDHUP,
+ },
+ }
+
+ for {
+ // Wait for a shutdown event.
+ n, err := unix.Ppoll(events, nil, nil)
+ if n == 0 || err == syscall.EAGAIN {
+ continue
+ }
+ break
+ }
+
+ // Close everything down: this will kick all active clients off any
+ // pending requests. Note that Close must be safe to call concurrently,
+ // and multiple times (see Close below).
+ c.Close()
+}
+
+// openChannel attempts to open a client channel.
+//
+// Note that this function returns naked errors which should not be propagated
+// directly to a caller. It is expected that the errors will be logged and a
+// fallback path will be used instead.
+func (c *Client) openChannel(id int) error {
+ var (
+ rchannel0 Rchannel
+ rchannel1 Rchannel
+ res = new(channel)
+ )
+
+ // Open the data channel.
+ if err := c.sendRecvLegacy(&Tchannel{
+ ID: uint32(id),
+ Control: 0,
+ }, &rchannel0); err != nil {
+ return fmt.Errorf("error handling Tchannel message: %v", err)
+ }
+ if rchannel0.FilePayload() == nil {
+ return fmt.Errorf("missing file descriptor on primary channel")
+ }
+
+ // We don't need to hold this.
+ defer rchannel0.FilePayload().Close()
+
+ // Open the channel for file descriptors.
+ if err := c.sendRecvLegacy(&Tchannel{
+ ID: uint32(id),
+ Control: 1,
+ }, &rchannel1); err != nil {
+ return err
+ }
+ if rchannel1.FilePayload() == nil {
+ return fmt.Errorf("missing file descriptor on file descriptor channel")
+ }
+
+ // Construct the endpoints.
+ res.desc = flipcall.PacketWindowDescriptor{
+ FD: rchannel0.FilePayload().FD(),
+ Offset: int64(rchannel0.Offset),
+ Length: int(rchannel0.Length),
+ }
+ if err := res.data.Init(flipcall.ClientSide, res.desc); err != nil {
+ rchannel1.FilePayload().Close()
+ return err
+ }
+
+ // The fds channel owns the control payload, and it will be closed when
+ // the channel object is closed.
+ res.fds.Init(rchannel1.FilePayload().Release())
+
+ // Save the channel.
+ c.channelsMu.Lock()
+ defer c.channelsMu.Unlock()
+ c.channels = append(c.channels, res)
+ c.inuse = append(c.inuse, nil)
+ return nil
+}
+
// handleOne handles a single incoming message.
//
// This should only be called with the token from recvr. Note that the received
@@ -247,10 +398,10 @@ func (c *Client) waitAndRecv(done chan error) error {
}
}
-// sendRecv performs a roundtrip message exchange.
+// sendRecvLegacy performs a roundtrip message exchange.
//
// This is called by internal functions.
-func (c *Client) sendRecv(t message, r message) error {
+func (c *Client) sendRecvLegacy(t message, r message) error {
tag, ok := c.tagPool.Get()
if !ok {
return ErrOutOfTags
@@ -296,12 +447,107 @@ func (c *Client) sendRecv(t message, r message) error {
return nil
}
+// sendRecvChannel uses channels to send a message.
+func (c *Client) sendRecvChannel(t message, r message) error {
+ c.channelsMu.Lock()
+ if len(c.channels) == 0 {
+ // No channel available.
+ c.channelsMu.Unlock()
+ return c.sendRecvLegacy(t, r)
+ }
+
+ // Find the last used channel.
+ //
+ // Note that we must add one to the wait group while holding the
+ // channel mutex, in order for the Wait operation to be race-free
+ // below. The Wait operation shuts down all in use channels and
+ // waits for them to return, but must do so holding the mutex.
+ idx := len(c.channels) - 1
+ ch := c.channels[idx]
+ c.channels = c.channels[:idx]
+ c.inuse[idx] = ch
+ c.channelsWg.Add(1)
+ c.channelsMu.Unlock()
+
+ // Ensure that it's connected.
+ if !ch.connected {
+ ch.connected = true
+ if err := ch.data.Connect(); err != nil {
+ // The channel is unusable, so don't return it.
+ ch.Close()
+ c.channelsWg.Done()
+ return err
+ }
+ }
+
+ // Send the message.
+ err := ch.sendRecv(c, t, r)
+ if err != nil {
+ // On shutdown, we'll see ENOENT. This is a normal situation, and
+ // we shouldn't generate a spurious warning message in that case.
+ log.Debugf("error calling sendRecvChannel: %v", err)
+ }
+ c.channelsWg.Done()
+
+ // Return the channel.
+ //
+ // Note that we check the channel from the inuse slice here. This
+ // prevents a race where Close is called, which clears inuse, and
+ // means that we will not actually return the closed channel.
+ c.channelsMu.Lock()
+ if c.inuse[idx] != nil {
+ c.channels = append(c.channels, ch)
+ c.inuse[idx] = nil
+ }
+ c.channelsMu.Unlock()
+
+ return err
+}
+
// Version returns the negotiated 9P2000.L.Google version number.
func (c *Client) Version() uint32 {
return c.version
}
-// Close closes the underlying socket.
+// Close closes the underlying socket and channels.
+//
+// Because Close may be called asynchronously from watch, it must be
+// safe to call concurrently and multiple times.
func (c *Client) Close() error {
+ c.channelsMu.Lock()
+ defer c.channelsMu.Unlock()
+
+ // Close all inactive channels.
+ for _, ch := range c.channels {
+ ch.Shutdown()
+ ch.Close()
+ }
+ // Close all active channels.
+ for _, ch := range c.inuse {
+ if ch != nil {
+ log.Debugf("shutting down active channel@%p...", ch)
+ ch.Shutdown()
+ }
+ }
+
+ // Wait for active users.
+ c.channelsWg.Wait()
+
+ // Close all previously active channels.
+ for i, ch := range c.inuse {
+ if ch != nil {
+ ch.Close()
+
+ // Clear the inuse entry here so that it will not be returned
+ // to the channel slice, which is cleared below. See the
+ // comment at the end of sendRecvChannel.
+ c.inuse[i] = nil
+ }
+ }
+ c.channels = nil // Prevent use again.
+
+ // Close the main socket. Note that operation is safe to be called
+ // multiple times, unlikely the channel Close operations above, which
+ // we are careful to ensure aren't called twice.
return c.socket.Close()
}