summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJinmou Li <jinmli@google.com>2020-09-03 18:41:26 +0000
committerAndrei Vagin <avagin@gmail.com>2020-09-16 12:19:30 -0700
commitf1219ec5f17b10f18d7d5e6605ea27ecba40c409 (patch)
tree534694b19326b01ec837e3c90fe7c65bbd48da6f
parent2fbbe3b76864bc5b42f05d84166008b25e599bdb (diff)
Refactor FUSE connection for readability and structure
This change decouples the code that is weakly tied to the connection struct from connection.go, rename variables and files with more meaningful choices, adds detailed comments, explains lock orders, and adds other minor improvement to make the existing FUSE code more readable and more organized. Purpose is to avoid too much code in one file and provide better structure for the future commits.
-rw-r--r--pkg/sentry/fsimpl/fuse/BUILD2
-rw-r--r--pkg/sentry/fsimpl/fuse/connection.go264
-rw-r--r--pkg/sentry/fsimpl/fuse/connection_control.go (renamed from pkg/sentry/fsimpl/fuse/init.go)98
-rw-r--r--pkg/sentry/fsimpl/fuse/dev_test.go2
-rw-r--r--pkg/sentry/fsimpl/fuse/fusefs.go25
-rw-r--r--pkg/sentry/fsimpl/fuse/request_response.go150
6 files changed, 273 insertions, 268 deletions
diff --git a/pkg/sentry/fsimpl/fuse/BUILD b/pkg/sentry/fsimpl/fuse/BUILD
index 5085d0521..5b60b1277 100644
--- a/pkg/sentry/fsimpl/fuse/BUILD
+++ b/pkg/sentry/fsimpl/fuse/BUILD
@@ -30,11 +30,11 @@ go_library(
name = "fuse",
srcs = [
"connection.go",
+ "connection_control.go",
"dev.go",
"directory.go",
"file.go",
"fusefs.go",
- "init.go",
"inode_refs.go",
"read_write.go",
"register.go",
diff --git a/pkg/sentry/fsimpl/fuse/connection.go b/pkg/sentry/fsimpl/fuse/connection.go
index b3f981459..880956c75 100644
--- a/pkg/sentry/fsimpl/fuse/connection.go
+++ b/pkg/sentry/fsimpl/fuse/connection.go
@@ -16,32 +16,18 @@ package fuse
import (
"errors"
- "fmt"
"sync"
- "sync/atomic"
- "syscall"
-
- "gvisor.dev/gvisor/tools/go_marshal/marshal"
"gvisor.dev/gvisor/pkg/abi/linux"
"gvisor.dev/gvisor/pkg/context"
"gvisor.dev/gvisor/pkg/log"
"gvisor.dev/gvisor/pkg/marshal"
"gvisor.dev/gvisor/pkg/sentry/kernel"
- "gvisor.dev/gvisor/pkg/sentry/kernel/auth"
"gvisor.dev/gvisor/pkg/sentry/vfs"
"gvisor.dev/gvisor/pkg/syserror"
"gvisor.dev/gvisor/pkg/waiter"
)
-// maxActiveRequestsDefault is the default setting controlling the upper bound
-// on the number of active requests at any given time.
-const maxActiveRequestsDefault = 10000
-
-// Ordinary requests have even IDs, while interrupts IDs are odd.
-// Used to increment the unique ID for each FUSE request.
-var reqIDStep uint64 = 2
-
const (
// fuseDefaultMaxBackground is the default value for MaxBackground.
fuseDefaultMaxBackground = 12
@@ -54,33 +40,11 @@ const (
fuseDefaultMaxPagesPerReq = 32
)
-// Request represents a FUSE operation request that hasn't been sent to the
-// server yet.
-//
-// +stateify savable
-type Request struct {
- requestEntry
-
- id linux.FUSEOpID
- hdr *linux.FUSEHeaderIn
- data []byte
-
- // payload for this request: extra bytes to write after
- // the data slice. Used by FUSE_WRITE.
- payload []byte
-}
-
-// Response represents an actual response from the server, including the
-// response payload.
-//
-// +stateify savable
-type Response struct {
- opcode linux.FUSEOpcode
- hdr linux.FUSEHeaderOut
- data []byte
-}
-
// connection is the struct by which the sentry communicates with the FUSE server daemon.
+// Lock order:
+// - conn.fd.mu
+// - conn.mu
+// - conn.asyncMu
type connection struct {
fd *DeviceFD
@@ -115,10 +79,6 @@ type connection struct {
// initializedChan is used to block requests before initialization.
initializedChan chan struct{}
- // blocked when there are too many outstading backgrounds requests (NumBackground == MaxBackground).
- // TODO(gvisor.dev/issue/3185): update the numBackground accordingly; use a channel to block.
- blocked bool
-
// connected (connection established) when a new FUSE file system is created.
// Set to false when:
// umount,
@@ -126,51 +86,52 @@ type connection struct {
// device release.
connected bool
- // aborted via sysfs.
- // TODO(gvisor.dev/issue/3185): abort all queued requests.
- aborted bool
-
- // atomicOTrunc is true when FUSE does not send a separate SETATTR request
- // before open with O_TRUNC flag.
- // Negotiated and only set in INIT.
- atomicOTrunc bool
-
// connInitError if FUSE_INIT encountered error (major version mismatch).
// Only set in INIT.
connInitError bool
// connInitSuccess if FUSE_INIT is successful.
// Only set in INIT.
- // Used for destory.
+ // Used for destory (not yet implemented).
connInitSuccess bool
- // TODO(gvisor.dev/issue/3185): All the queue logic are working in progress.
-
- // NumberBackground is the number of requests in the background.
- numBackground uint16
+ // aborted via sysfs, and will send ECONNABORTED to read after disconnection (instead of ENODEV).
+ // Set only if abortErr is true and via fuse control fs (not yet implemented).
+ // TODO(gvisor.dev/issue/3525): set this to true when user aborts.
+ aborted bool
- // congestionThreshold for NumBackground.
- // Negotiated in FUSE_INIT.
- congestionThreshold uint16
+ // numWating is the number of requests waiting to be
+ // sent to FUSE device or being processed by FUSE daemon.
+ numWaiting uint32
- // maxBackground is the maximum number of NumBackground.
- // Block connection when it is reached.
- // Negotiated in FUSE_INIT.
- maxBackground uint16
+ // Terminology note:
+ //
+ // - `asyncNumMax` is the `MaxBackground` in the FUSE_INIT_IN struct.
+ //
+ // - `asyncCongestionThreshold` is the `CongestionThreshold` in the FUSE_INIT_IN struct.
+ //
+ // We call the "background" requests in unix term as async requests.
+ // The "async requests" in unix term is our async requests that expect a reply,
+ // i.e. `!requestOptions.noReply`
- // numActiveBackground is the number of requests in background and has being marked as active.
- numActiveBackground uint16
+ // asyncMu protects the async request fields.
+ asyncMu sync.Mutex
- // numWating is the number of requests waiting for completion.
- numWaiting uint32
+ // asyncNum is the number of async requests.
+ // Protected by asyncMu.
+ asyncNum uint16
- // TODO(gvisor.dev/issue/3185): BgQueue
- // some queue for background queued requests.
+ // asyncCongestionThreshold the number of async requests.
+ // Negotiated in FUSE_INIT as "CongestionThreshold".
+ // TODO(gvisor.dev/issue/3529): add congestion control.
+ // Protected by asyncMu.
+ asyncCongestionThreshold uint16
- // bgLock protects:
- // MaxBackground, CongestionThreshold, NumBackground,
- // NumActiveBackground, BgQueue, Blocked.
- bgLock sync.Mutex
+ // asyncNumMax is the maximum number of asyncNum.
+ // Connection blocks the async requests when it is reached.
+ // Negotiated in FUSE_INIT as "MaxBackground".
+ // Protected by asyncMu.
+ asyncNumMax uint16
// maxRead is the maximum size of a read buffer in in bytes.
// Initialized from a fuse fs parameter.
@@ -188,6 +149,11 @@ type connection struct {
// Negotiated and only set in INIT.
minor uint32
+ // atomicOTrunc is true when FUSE does not send a separate SETATTR request
+ // before open with O_TRUNC flag.
+ // Negotiated and only set in INIT.
+ atomicOTrunc bool
+
// asyncRead if read pages asynchronously.
// Negotiated and only set in INIT.
asyncRead bool
@@ -225,63 +191,13 @@ func newFUSEConnection(_ context.Context, fd *vfs.FileDescription, opts *filesys
fuseFD.writeCursor = 0
return &connection{
- fd: fuseFD,
- maxBackground: fuseDefaultMaxBackground,
- congestionThreshold: fuseDefaultCongestionThreshold,
- maxRead: opts.maxRead,
- maxPages: fuseDefaultMaxPagesPerReq,
- initializedChan: make(chan struct{}),
- connected: true,
- }, nil
-}
-
-// SetInitialized atomically sets the connection as initialized.
-func (conn *connection) SetInitialized() {
- // Unblock the requests sent before INIT.
- close(conn.initializedChan)
-
- // Close the channel first to avoid the non-atomic situation
- // where conn.initialized is true but there are
- // tasks being blocked on the channel.
- // And it prevents the newer tasks from gaining
- // unnecessary higher chance to be issued before the blocked one.
-
- atomic.StoreInt32(&(conn.initialized), int32(1))
-}
-
-// IsInitialized atomically check if the connection is initialized.
-// pairs with SetInitialized().
-func (conn *connection) Initialized() bool {
- return atomic.LoadInt32(&(conn.initialized)) != 0
-}
-
-// NewRequest creates a new request that can be sent to the FUSE server.
-func (conn *connection) NewRequest(creds *auth.Credentials, pid uint32, ino uint64, opcode linux.FUSEOpcode, payload marshal.Marshallable) (*Request, error) {
- conn.fd.mu.Lock()
- defer conn.fd.mu.Unlock()
- conn.fd.nextOpID += linux.FUSEOpID(reqIDStep)
-
- hdrLen := (*linux.FUSEHeaderIn)(nil).SizeBytes()
- hdr := linux.FUSEHeaderIn{
- Len: uint32(hdrLen + payload.SizeBytes()),
- Opcode: opcode,
- Unique: conn.fd.nextOpID,
- NodeID: ino,
- UID: uint32(creds.EffectiveKUID),
- GID: uint32(creds.EffectiveKGID),
- PID: pid,
- }
-
- buf := make([]byte, hdr.Len)
-
- // TODO(gVisor.dev/3698): Use the unsafe version once go_marshal is safe to use again.
- hdr.MarshalBytes(buf[:hdrLen])
- payload.MarshalBytes(buf[hdrLen:])
-
- return &Request{
- id: hdr.Unique,
- hdr: &hdr,
- data: buf,
+ fd: fuseFD,
+ asyncNumMax: fuseDefaultMaxBackground,
+ asyncCongestionThreshold: fuseDefaultCongestionThreshold,
+ maxRead: opts.maxRead,
+ maxPages: fuseDefaultMaxPagesPerReq,
+ initializedChan: make(chan struct{}),
+ connected: true,
}, nil
}
@@ -337,43 +253,6 @@ func (conn *connection) call(t *kernel.Task, r *Request) (*Response, error) {
return fut.resolve(t)
}
-// Error returns the error of the FUSE call.
-func (r *Response) Error() error {
- errno := r.hdr.Error
- if errno >= 0 {
- return nil
- }
-
- sysErrNo := syscall.Errno(-errno)
- return error(sysErrNo)
-}
-
-// DataLen returns the size of the response without the header.
-func (r *Response) DataLen() uint32 {
- return r.hdr.Len - uint32(r.hdr.SizeBytes())
-}
-
-// UnmarshalPayload unmarshals the response data into m.
-func (r *Response) UnmarshalPayload(m marshal.Marshallable) error {
- hdrLen := r.hdr.SizeBytes()
- haveDataLen := r.hdr.Len - uint32(hdrLen)
- wantDataLen := uint32(m.SizeBytes())
-
- if haveDataLen < wantDataLen {
- return fmt.Errorf("payload too small. Minimum data lenth required: %d, but got data length %d", wantDataLen, haveDataLen)
- }
-
- // The response data is empty unless there is some payload. And so, doesn't
- // need to be unmarshalled.
- if r.data == nil {
- return nil
- }
-
- // TODO(gVisor.dev/3698): Use the unsafe version once go_marshal is safe to use again.
- m.UnmarshalBytes(r.data[hdrLen:])
- return nil
-}
-
// callFuture makes a request to the server and returns a future response.
// Call resolve() when the response needs to be fulfilled.
func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, error) {
@@ -422,50 +301,3 @@ func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureRes
return fut, nil
}
-
-// futureResponse represents an in-flight request, that may or may not have
-// completed yet. Convert it to a resolved Response by calling Resolve, but note
-// that this may block.
-//
-// +stateify savable
-type futureResponse struct {
- opcode linux.FUSEOpcode
- ch chan struct{}
- hdr *linux.FUSEHeaderOut
- data []byte
-}
-
-// newFutureResponse creates a future response to a FUSE request.
-func newFutureResponse(opcode linux.FUSEOpcode) *futureResponse {
- return &futureResponse{
- opcode: opcode,
- ch: make(chan struct{}),
- }
-}
-
-// resolve blocks the task until the server responds to its corresponding request,
-// then returns a resolved response.
-func (f *futureResponse) resolve(t *kernel.Task) (*Response, error) {
- // If there is no Task associated with this request - then we don't try to resolve
- // the response. Instead, the task writing the response (proxy to the server) will
- // process the response on our behalf.
- if t == nil {
- log.Infof("fuse.Response.resolve: Not waiting on a response from server.")
- return nil, nil
- }
-
- if err := t.Block(f.ch); err != nil {
- return nil, err
- }
-
- return f.getResponse(), nil
-}
-
-// getResponse creates a Response from the data the futureResponse has.
-func (f *futureResponse) getResponse() *Response {
- return &Response{
- opcode: f.opcode,
- hdr: *f.hdr,
- data: f.data,
- }
-}
diff --git a/pkg/sentry/fsimpl/fuse/init.go b/pkg/sentry/fsimpl/fuse/connection_control.go
index a47309b6e..d84d9caf2 100644
--- a/pkg/sentry/fsimpl/fuse/init.go
+++ b/pkg/sentry/fsimpl/fuse/connection_control.go
@@ -15,6 +15,8 @@
package fuse
import (
+ "sync/atomic"
+
"gvisor.dev/gvisor/pkg/abi/linux"
"gvisor.dev/gvisor/pkg/sentry/kernel/auth"
)
@@ -50,6 +52,26 @@ var (
MaxUserCongestionThreshold uint16 = fuseDefaultCongestionThreshold
)
+// SetInitialized atomically sets the connection as initialized.
+func (conn *connection) SetInitialized() {
+ // Unblock the requests sent before INIT.
+ close(conn.initializedChan)
+
+ // Close the channel first to avoid the non-atomic situation
+ // where conn.initialized is true but there are
+ // tasks being blocked on the channel.
+ // And it prevents the newer tasks from gaining
+ // unnecessary higher chance to be issued before the blocked one.
+
+ atomic.StoreInt32(&(conn.initialized), int32(1))
+}
+
+// IsInitialized atomically check if the connection is initialized.
+// pairs with SetInitialized().
+func (conn *connection) Initialized() bool {
+ return atomic.LoadInt32(&(conn.initialized)) != 0
+}
+
// InitSend sends a FUSE_INIT request.
func (conn *connection) InitSend(creds *auth.Credentials, pid uint32) error {
in := linux.FUSEInitIn{
@@ -85,15 +107,15 @@ func (conn *connection) InitRecv(res *Response, hasSysAdminCap bool) error {
}
// Process the FUSE_INIT reply from the FUSE server.
+// It tries to acquire the conn.asyncMu lock if minor version is newer than 13.
func (conn *connection) initProcessReply(out *linux.FUSEInitOut, hasSysAdminCap bool) error {
+ // No matter error or not, always set initialzied.
+ // to unblock the blocked requests.
+ defer conn.SetInitialized()
+
// No support for old major fuse versions.
if out.Major != linux.FUSE_KERNEL_VERSION {
conn.connInitError = true
-
- // Set the connection as initialized and unblock the blocked requests
- // (i.e. return error for them).
- conn.SetInitialized()
-
return nil
}
@@ -101,29 +123,14 @@ func (conn *connection) initProcessReply(out *linux.FUSEInitOut, hasSysAdminCap
conn.connInitSuccess = true
conn.minor = out.Minor
- // No support for limits before minor version 13.
- if out.Minor >= 13 {
- conn.bgLock.Lock()
-
- if out.MaxBackground > 0 {
- conn.maxBackground = out.MaxBackground
-
- if !hasSysAdminCap &&
- conn.maxBackground > MaxUserBackgroundRequest {
- conn.maxBackground = MaxUserBackgroundRequest
- }
- }
-
- if out.CongestionThreshold > 0 {
- conn.congestionThreshold = out.CongestionThreshold
-
- if !hasSysAdminCap &&
- conn.congestionThreshold > MaxUserCongestionThreshold {
- conn.congestionThreshold = MaxUserCongestionThreshold
- }
- }
-
- conn.bgLock.Unlock()
+ // No support for negotiating MaxWrite before minor version 5.
+ if out.Minor >= 5 {
+ conn.maxWrite = out.MaxWrite
+ } else {
+ conn.maxWrite = fuseMinMaxWrite
+ }
+ if conn.maxWrite < fuseMinMaxWrite {
+ conn.maxWrite = fuseMinMaxWrite
}
// No support for the following flags before minor version 6.
@@ -147,19 +154,30 @@ func (conn *connection) initProcessReply(out *linux.FUSEInitOut, hasSysAdminCap
}
}
- // No support for negotiating MaxWrite before minor version 5.
- if out.Minor >= 5 {
- conn.maxWrite = out.MaxWrite
- } else {
- conn.maxWrite = fuseMinMaxWrite
- }
- if conn.maxWrite < fuseMinMaxWrite {
- conn.maxWrite = fuseMinMaxWrite
- }
+ // No support for limits before minor version 13.
+ if out.Minor >= 13 {
+ conn.asyncMu.Lock()
+
+ if out.MaxBackground > 0 {
+ conn.asyncNumMax = out.MaxBackground
+
+ if !hasSysAdminCap &&
+ conn.asyncNumMax > MaxUserBackgroundRequest {
+ conn.asyncNumMax = MaxUserBackgroundRequest
+ }
+ }
- // Set connection as initialized and unblock the requests
- // issued before init.
- conn.SetInitialized()
+ if out.CongestionThreshold > 0 {
+ conn.asyncCongestionThreshold = out.CongestionThreshold
+
+ if !hasSysAdminCap &&
+ conn.asyncCongestionThreshold > MaxUserCongestionThreshold {
+ conn.asyncCongestionThreshold = MaxUserCongestionThreshold
+ }
+ }
+
+ conn.asyncMu.Unlock()
+ }
return nil
}
diff --git a/pkg/sentry/fsimpl/fuse/dev_test.go b/pkg/sentry/fsimpl/fuse/dev_test.go
index c18921dd8..59928d75e 100644
--- a/pkg/sentry/fsimpl/fuse/dev_test.go
+++ b/pkg/sentry/fsimpl/fuse/dev_test.go
@@ -369,7 +369,7 @@ func newTestConnection(system *testutil.System, k *kernel.Kernel, maxActiveReque
fsopts := filesystemOptions{
maxActiveRequests: maxActiveRequests,
}
- fs, err := NewFUSEFilesystem(system.Ctx, 0, &fsopts, &fuseDev.vfsfd)
+ fs, err := newFUSEFilesystem(system.Ctx, 0, &fsopts, &fuseDev.vfsfd)
if err != nil {
return nil, nil, err
}
diff --git a/pkg/sentry/fsimpl/fuse/fusefs.go b/pkg/sentry/fsimpl/fuse/fusefs.go
index 572245303..30725182a 100644
--- a/pkg/sentry/fsimpl/fuse/fusefs.go
+++ b/pkg/sentry/fsimpl/fuse/fusefs.go
@@ -35,6 +35,10 @@ import (
// Name is the default filesystem name.
const Name = "fuse"
+// maxActiveRequestsDefault is the default setting controlling the upper bound
+// on the number of active requests at any given time.
+const maxActiveRequestsDefault = 10000
+
// FilesystemType implements vfs.FilesystemType.
type FilesystemType struct{}
@@ -168,12 +172,12 @@ func (fsType FilesystemType) GetFilesystem(ctx context.Context, vfsObj *vfs.Virt
// Check for unparsed options.
if len(mopts) != 0 {
- log.Warningf("%s.GetFilesystem: unknown options: %v", fsType.Name(), mopts)
+ log.Warningf("%s.GetFilesystem: unsupported or unknown options: %v", fsType.Name(), mopts)
return nil, nil, syserror.EINVAL
}
// Create a new FUSE filesystem.
- fs, err := NewFUSEFilesystem(ctx, devMinor, &fsopts, fuseFd)
+ fs, err := newFUSEFilesystem(ctx, devMinor, &fsopts, fuseFd)
if err != nil {
log.Warningf("%s.NewFUSEFilesystem: failed with error: %v", fsType.Name(), err)
return nil, nil, err
@@ -194,21 +198,22 @@ func (fsType FilesystemType) GetFilesystem(ctx context.Context, vfsObj *vfs.Virt
return fs.VFSFilesystem(), root.VFSDentry(), nil
}
-// NewFUSEFilesystem creates a new FUSE filesystem.
-func NewFUSEFilesystem(ctx context.Context, devMinor uint32, opts *filesystemOptions, device *vfs.FileDescription) (*filesystem, error) {
- fs := &filesystem{
- devMinor: devMinor,
- opts: opts,
- }
-
+// newFUSEFilesystem creates a new FUSE filesystem.
+func newFUSEFilesystem(ctx context.Context, devMinor uint32, opts *filesystemOptions, device *vfs.FileDescription) (*filesystem, error) {
conn, err := newFUSEConnection(ctx, device, opts)
if err != nil {
log.Warningf("fuse.NewFUSEFilesystem: NewFUSEConnection failed with error: %v", err)
return nil, syserror.EINVAL
}
- fs.conn = conn
fuseFD := device.Impl().(*DeviceFD)
+
+ fs := &filesystem{
+ devMinor: devMinor,
+ opts: opts,
+ conn: conn,
+ }
+
fuseFD.fs = fs
return fs, nil
diff --git a/pkg/sentry/fsimpl/fuse/request_response.go b/pkg/sentry/fsimpl/fuse/request_response.go
index a69b21221..648eaf263 100644
--- a/pkg/sentry/fsimpl/fuse/request_response.go
+++ b/pkg/sentry/fsimpl/fuse/request_response.go
@@ -15,7 +15,13 @@
package fuse
import (
+ "fmt"
+ "syscall"
+
"gvisor.dev/gvisor/pkg/abi/linux"
+ "gvisor.dev/gvisor/pkg/log"
+ "gvisor.dev/gvisor/pkg/sentry/kernel"
+ "gvisor.dev/gvisor/pkg/sentry/kernel/auth"
"gvisor.dev/gvisor/pkg/usermem"
"gvisor.dev/gvisor/tools/go_marshal/marshal"
)
@@ -71,3 +77,147 @@ func (r *fuseInitRes) UnmarshalBytes(src []byte) {
func (r *fuseInitRes) SizeBytes() int {
return int(r.initLen)
}
+
+// Ordinary requests have even IDs, while interrupts IDs are odd.
+// Used to increment the unique ID for each FUSE request.
+var reqIDStep uint64 = 2
+
+// Request represents a FUSE operation request that hasn't been sent to the
+// server yet.
+//
+// +stateify savable
+type Request struct {
+ requestEntry
+
+ id linux.FUSEOpID
+ hdr *linux.FUSEHeaderIn
+ data []byte
+
+ // payload for this request: extra bytes to write after
+ // the data slice. Used by FUSE_WRITE.
+ payload []byte
+}
+
+// NewRequest creates a new request that can be sent to the FUSE server.
+func (conn *connection) NewRequest(creds *auth.Credentials, pid uint32, ino uint64, opcode linux.FUSEOpcode, payload marshal.Marshallable) (*Request, error) {
+ conn.fd.mu.Lock()
+ defer conn.fd.mu.Unlock()
+ conn.fd.nextOpID += linux.FUSEOpID(reqIDStep)
+
+ hdrLen := (*linux.FUSEHeaderIn)(nil).SizeBytes()
+ hdr := linux.FUSEHeaderIn{
+ Len: uint32(hdrLen + payload.SizeBytes()),
+ Opcode: opcode,
+ Unique: conn.fd.nextOpID,
+ NodeID: ino,
+ UID: uint32(creds.EffectiveKUID),
+ GID: uint32(creds.EffectiveKGID),
+ PID: pid,
+ }
+
+ buf := make([]byte, hdr.Len)
+
+ // TODO(gVisor.dev/3698): Use the unsafe version once go_marshal is safe to use again.
+ hdr.MarshalBytes(buf[:hdrLen])
+ payload.MarshalBytes(buf[hdrLen:])
+
+ return &Request{
+ id: hdr.Unique,
+ hdr: &hdr,
+ data: buf,
+ }, nil
+}
+
+// futureResponse represents an in-flight request, that may or may not have
+// completed yet. Convert it to a resolved Response by calling Resolve, but note
+// that this may block.
+//
+// +stateify savable
+type futureResponse struct {
+ opcode linux.FUSEOpcode
+ ch chan struct{}
+ hdr *linux.FUSEHeaderOut
+ data []byte
+}
+
+// newFutureResponse creates a future response to a FUSE request.
+func newFutureResponse(opcode linux.FUSEOpcode) *futureResponse {
+ return &futureResponse{
+ opcode: opcode,
+ ch: make(chan struct{}),
+ }
+}
+
+// resolve blocks the task until the server responds to its corresponding request,
+// then returns a resolved response.
+func (f *futureResponse) resolve(t *kernel.Task) (*Response, error) {
+ // If there is no Task associated with this request - then we don't try to resolve
+ // the response. Instead, the task writing the response (proxy to the server) will
+ // process the response on our behalf.
+ if t == nil {
+ log.Infof("fuse.Response.resolve: Not waiting on a response from server.")
+ return nil, nil
+ }
+
+ if err := t.Block(f.ch); err != nil {
+ return nil, err
+ }
+
+ return f.getResponse(), nil
+}
+
+// getResponse creates a Response from the data the futureResponse has.
+func (f *futureResponse) getResponse() *Response {
+ return &Response{
+ opcode: f.opcode,
+ hdr: *f.hdr,
+ data: f.data,
+ }
+}
+
+// Response represents an actual response from the server, including the
+// response payload.
+//
+// +stateify savable
+type Response struct {
+ opcode linux.FUSEOpcode
+ hdr linux.FUSEHeaderOut
+ data []byte
+}
+
+// Error returns the error of the FUSE call.
+func (r *Response) Error() error {
+ errno := r.hdr.Error
+ if errno >= 0 {
+ return nil
+ }
+
+ sysErrNo := syscall.Errno(-errno)
+ return error(sysErrNo)
+}
+
+// DataLen returns the size of the response without the header.
+func (r *Response) DataLen() uint32 {
+ return r.hdr.Len - uint32(r.hdr.SizeBytes())
+}
+
+// UnmarshalPayload unmarshals the response data into m.
+func (r *Response) UnmarshalPayload(m marshal.Marshallable) error {
+ hdrLen := r.hdr.SizeBytes()
+ haveDataLen := r.hdr.Len - uint32(hdrLen)
+ wantDataLen := uint32(m.SizeBytes())
+
+ if haveDataLen < wantDataLen {
+ return fmt.Errorf("payload too small. Minimum data lenth required: %d, but got data length %d", wantDataLen, haveDataLen)
+ }
+
+ // The response data is empty unless there is some payload. And so, doesn't
+ // need to be unmarshalled.
+ if r.data == nil {
+ return nil
+ }
+
+ // TODO(gVisor.dev/3698): Use the unsafe version once go_marshal is safe to use again.
+ m.UnmarshalBytes(r.data[hdrLen:])
+ return nil
+}