Diffstat (limited to 'pkg/lisafs/client.go')
-rw-r--r--  pkg/lisafs/client.go  55
1 file changed, 55 insertions(+), 0 deletions(-)
diff --git a/pkg/lisafs/client.go b/pkg/lisafs/client.go
index c99f8c73d..ccf1b9f72 100644
--- a/pkg/lisafs/client.go
+++ b/pkg/lisafs/client.go
@@ -20,12 +20,19 @@ import (
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/cleanup"
+ "gvisor.dev/gvisor/pkg/context"
"gvisor.dev/gvisor/pkg/flipcall"
"gvisor.dev/gvisor/pkg/log"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/unet"
)
+const (
+ // fdsToCloseBatchSize is the number of FDs to close that are batched before
+ // a Close RPC is made to close them all. fdsToCloseBatchSize is immutable.
+ fdsToCloseBatchSize = 100
+)
+
// Client helps manage a connection to the lisafs server and pass messages
// efficiently. There is a 1:1 mapping between a Connection and a Client.
type Client struct {
@@ -53,6 +60,12 @@ type Client struct {
// maxMessageSize is the maximum payload length (in bytes) that can be sent.
// It is initialized on Mount and is immutable.
maxMessageSize uint32
+
+ // fdsToClose tracks the FDs to close. It caches FDs that are no longer in
+ // use by the client so that they can be closed in one batched Close RPC. It
+ // is not preserved across checkpoint/restore because FDIDs are not
+ // preserved. fdsMu protects fdsToClose.
+ fdsMu sync.Mutex
+ fdsToClose []FDID
}
// NewClient creates a new client for communication with the server. It mounts
@@ -66,6 +79,7 @@ func NewClient(sock *unet.Socket, mountPath string) (*Client, *Inode, error) {
channels: make([]*channel, 0, maxChans),
availableChannels: make([]*channel, 0, maxChans),
maxMessageSize: 1 << 20, // 1 MB for now.
+ fdsToClose: make([]FDID, 0, fdsToCloseBatchSize),
}
// Start a goroutine to check socket health. This goroutine is also
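For orientation, here is a hypothetical caller-side sketch (not part of this patch) of constructing a Client from an already-connected socket FD, based on the NewClient signature shown in this diff; the sockFD parameter, the "/" mount path, and the use of unet.NewSocket are assumptions for illustration.

package example

import (
	"gvisor.dev/gvisor/pkg/lisafs"
	"gvisor.dev/gvisor/pkg/unet"
)

// connectClient wraps an already-connected socket FD and mounts the lisafs
// server's filesystem over that connection, returning the client and the
// root directory's Inode. This is a sketch, not code from this change.
func connectClient(sockFD int) (*lisafs.Client, *lisafs.Inode, error) {
	sock, err := unet.NewSocket(sockFD)
	if err != nil {
		return nil, nil, err
	}
	// Per the hunk above, NewClient also preallocates fdsToClose with
	// capacity fdsToCloseBatchSize.
	return lisafs.NewClient(sock, "/")
}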
@@ -245,6 +259,47 @@ func (c *Client) IsSupported(m MID) bool {
return int(m) < len(c.supported) && c.supported[m]
}
+// CloseFDBatched either queues the passed FD to be closed or makes a batch
+// RPC to close all the accumulated FDs-to-close.
+func (c *Client) CloseFDBatched(ctx context.Context, fd FDID) {
+ c.fdsMu.Lock()
+ c.fdsToClose = append(c.fdsToClose, fd)
+ if len(c.fdsToClose) < fdsToCloseBatchSize {
+ c.fdsMu.Unlock()
+ return
+ }
+
+ // Flush the cache. We should not hold fdsMu while making an RPC, so be sure
+ // to copy the fdsToClose to another buffer before unlocking fdsMu.
+ var toCloseArr [fdsToCloseBatchSize]FDID
+ toClose := toCloseArr[:len(c.fdsToClose)]
+ copy(toClose, c.fdsToClose)
+
+ // Clear fdsToClose so other FDIDs can be appended.
+ c.fdsToClose = c.fdsToClose[:0]
+ c.fdsMu.Unlock()
+
+ req := CloseReq{FDs: toClose}
+ ctx.UninterruptibleSleepStart(false)
+ err := c.SndRcvMessage(Close, uint32(req.SizeBytes()), req.MarshalBytes, NoopUnmarshal, nil)
+ ctx.UninterruptibleSleepFinish(false)
+ if err != nil {
+ log.Warningf("lisafs: batch closing FDs returned error: %v", err)
+ }
+}
+
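As an illustration only (not part of this patch), a sketch of how a caller-side wrapper around a server-provided FD might use CloseFDBatched when the last reference to that FD is dropped; the clientFile type and its Release method are hypothetical names.

package example

import (
	"gvisor.dev/gvisor/pkg/context"
	"gvisor.dev/gvisor/pkg/lisafs"
)

// clientFile is a hypothetical pairing of a Client with one FDID it owns.
type clientFile struct {
	client *lisafs.Client
	fd     lisafs.FDID
}

// Release queues fd for closure. At most one Close RPC is issued per
// fdsToCloseBatchSize released FDs, amortizing the per-FD RPC cost.
func (f *clientFile) Release(ctx context.Context) {
	f.client.CloseFDBatched(ctx, f.fd)
}

Note that CloseFDBatched copies the queued FDIDs into a fixed-size array before releasing fdsMu, so the Close RPC itself never runs under the lock.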
+// SyncFDs makes a Fsync RPC to sync multiple FDs.
+func (c *Client) SyncFDs(ctx context.Context, fds []FDID) error {
+ if len(fds) == 0 {
+ return nil
+ }
+ req := FsyncReq{FDs: fds}
+ ctx.UninterruptibleSleepStart(false)
+ err := c.SndRcvMessage(FSync, uint32(req.SizeBytes()), req.MarshalBytes, NoopUnmarshal, nil)
+ ctx.UninterruptibleSleepFinish(false)
+ return err
+}
+
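Similarly, a hypothetical sketch (not from this patch) of flushing a caller-maintained set of dirty FDs with a single Fsync RPC; the dirty map is an assumption for illustration.

package example

import (
	"gvisor.dev/gvisor/pkg/context"
	"gvisor.dev/gvisor/pkg/lisafs"
)

// syncDirty issues one Fsync RPC covering every FDID in dirty.
func syncDirty(ctx context.Context, c *lisafs.Client, dirty map[lisafs.FDID]struct{}) error {
	fds := make([]lisafs.FDID, 0, len(dirty))
	for fd := range dirty {
		fds = append(fds, fd)
	}
	// SyncFDs returns nil for an empty slice, so callers need not
	// special-case a clean state.
	return c.SyncFDs(ctx, fds)
}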
// SndRcvMessage invokes reqMarshal to marshal the request onto the payload
// buffer, wakes up the server to process the request, waits for the response
// and invokes respUnmarshal with the response payload. respFDs is populated