summaryrefslogtreecommitdiffhomepage
path: root/pkg/sentry/fsimpl/fuse/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/sentry/fsimpl/fuse/connection.go')
-rw-r--r--pkg/sentry/fsimpl/fuse/connection.go348
1 files changed, 241 insertions, 107 deletions
diff --git a/pkg/sentry/fsimpl/fuse/connection.go b/pkg/sentry/fsimpl/fuse/connection.go
index a7402c149..6df2728ab 100644
--- a/pkg/sentry/fsimpl/fuse/connection.go
+++ b/pkg/sentry/fsimpl/fuse/connection.go
@@ -15,17 +15,31 @@
package fuse
import (
+ "errors"
+ "fmt"
"sync"
+ "sync/atomic"
+ "syscall"
"gvisor.dev/gvisor/pkg/abi/linux"
"gvisor.dev/gvisor/pkg/context"
"gvisor.dev/gvisor/pkg/log"
"gvisor.dev/gvisor/pkg/sentry/kernel"
+ "gvisor.dev/gvisor/pkg/sentry/kernel/auth"
"gvisor.dev/gvisor/pkg/sentry/vfs"
"gvisor.dev/gvisor/pkg/syserror"
"gvisor.dev/gvisor/pkg/waiter"
+ "gvisor.dev/gvisor/tools/go_marshal/marshal"
)
+// maxActiveRequestsDefault is the default setting controlling the upper bound
+// on the number of active requests at any given time.
+const maxActiveRequestsDefault = 10000
+
+// Ordinary requests have even IDs, while interrupts IDs are odd.
+// Used to increment the unique ID for each FUSE request.
+var reqIDStep uint64 = 2
+
const (
// fuseDefaultMaxBackground is the default value for MaxBackground.
fuseDefaultMaxBackground = 12
@@ -38,36 +52,43 @@ const (
fuseDefaultMaxPagesPerReq = 32
)
+// Request represents a FUSE operation request that hasn't been sent to the
+// server yet.
+//
+// +stateify savable
+type Request struct {
+ requestEntry
+
+ id linux.FUSEOpID
+ hdr *linux.FUSEHeaderIn
+ data []byte
+}
+
+// Response represents an actual response from the server, including the
+// response payload.
+//
+// +stateify savable
+type Response struct {
+ opcode linux.FUSEOpcode
+ hdr linux.FUSEHeaderOut
+ data []byte
+}
+
// connection is the struct by which the sentry communicates with the FUSE server daemon.
-// Lock order:
-// - conn.fd.mu
-// - conn.mu
-// - conn.asyncMu
type connection struct {
fd *DeviceFD
- // mu protects access to struct memebers.
- mu sync.Mutex
-
- // attributeVersion is the version of connection's attributes.
- attributeVersion uint64
-
- // We target FUSE 7.23.
// The following FUSE_INIT flags are currently unsupported by this implementation:
+ // - FUSE_ATOMIC_O_TRUNC: requires open(..., O_TRUNC)
// - FUSE_EXPORT_SUPPORT
+ // - FUSE_HANDLE_KILLPRIV
// - FUSE_POSIX_LOCKS: requires POSIX locks
// - FUSE_FLOCK_LOCKS: requires POSIX locks
// - FUSE_AUTO_INVAL_DATA: requires page caching eviction
+ // - FUSE_EXPLICIT_INVAL_DATA: requires page caching eviction
// - FUSE_DO_READDIRPLUS/FUSE_READDIRPLUS_AUTO: requires FUSE_READDIRPLUS implementation
// - FUSE_ASYNC_DIO
- // - FUSE_PARALLEL_DIROPS (7.25)
- // - FUSE_HANDLE_KILLPRIV (7.26)
- // - FUSE_POSIX_ACL: affects defaultPermissions, posixACL, xattr handler (7.26)
- // - FUSE_ABORT_ERROR (7.27)
- // - FUSE_CACHE_SYMLINKS (7.28)
- // - FUSE_NO_OPENDIR_SUPPORT (7.29)
- // - FUSE_EXPLICIT_INVAL_DATA: requires page caching eviction (7.30)
- // - FUSE_MAP_ALIGNMENT (7.31)
+ // - FUSE_POSIX_ACL: affects defaultPermissions, posixACL, xattr handler
// initialized after receiving FUSE_INIT reply.
// Until it's set, suspend sending FUSE requests.
@@ -77,6 +98,10 @@ type connection struct {
// initializedChan is used to block requests before initialization.
initializedChan chan struct{}
+ // blocked when there are too many outstading backgrounds requests (NumBackground == MaxBackground).
+ // TODO(gvisor.dev/issue/3185): update the numBackground accordingly; use a channel to block.
+ blocked bool
+
// connected (connection established) when a new FUSE file system is created.
// Set to false when:
// umount,
@@ -84,55 +109,48 @@ type connection struct {
// device release.
connected bool
+ // aborted via sysfs.
+ // TODO(gvisor.dev/issue/3185): abort all queued requests.
+ aborted bool
+
// connInitError if FUSE_INIT encountered error (major version mismatch).
// Only set in INIT.
connInitError bool
// connInitSuccess if FUSE_INIT is successful.
// Only set in INIT.
- // Used for destory (not yet implemented).
+ // Used for destory.
connInitSuccess bool
- // aborted via sysfs, and will send ECONNABORTED to read after disconnection (instead of ENODEV).
- // Set only if abortErr is true and via fuse control fs (not yet implemented).
- // TODO(gvisor.dev/issue/3525): set this to true when user aborts.
- aborted bool
+ // TODO(gvisor.dev/issue/3185): All the queue logic are working in progress.
- // numWating is the number of requests waiting to be
- // sent to FUSE device or being processed by FUSE daemon.
- numWaiting uint32
+ // NumberBackground is the number of requests in the background.
+ numBackground uint16
- // Terminology note:
- //
- // - `asyncNumMax` is the `MaxBackground` in the FUSE_INIT_IN struct.
- //
- // - `asyncCongestionThreshold` is the `CongestionThreshold` in the FUSE_INIT_IN struct.
- //
- // We call the "background" requests in unix term as async requests.
- // The "async requests" in unix term is our async requests that expect a reply,
- // i.e. `!requestOptions.noReply`
+ // congestionThreshold for NumBackground.
+ // Negotiated in FUSE_INIT.
+ congestionThreshold uint16
+
+ // maxBackground is the maximum number of NumBackground.
+ // Block connection when it is reached.
+ // Negotiated in FUSE_INIT.
+ maxBackground uint16
- // asyncMu protects the async request fields.
- asyncMu sync.Mutex
+ // numActiveBackground is the number of requests in background and has being marked as active.
+ numActiveBackground uint16
- // asyncNum is the number of async requests.
- // Protected by asyncMu.
- asyncNum uint16
+ // numWating is the number of requests waiting for completion.
+ numWaiting uint32
- // asyncCongestionThreshold the number of async requests.
- // Negotiated in FUSE_INIT as "CongestionThreshold".
- // TODO(gvisor.dev/issue/3529): add congestion control.
- // Protected by asyncMu.
- asyncCongestionThreshold uint16
+ // TODO(gvisor.dev/issue/3185): BgQueue
+ // some queue for background queued requests.
- // asyncNumMax is the maximum number of asyncNum.
- // Connection blocks the async requests when it is reached.
- // Negotiated in FUSE_INIT as "MaxBackground".
- // Protected by asyncMu.
- asyncNumMax uint16
+ // bgLock protects:
+ // MaxBackground, CongestionThreshold, NumBackground,
+ // NumActiveBackground, BgQueue, Blocked.
+ bgLock sync.Mutex
// maxRead is the maximum size of a read buffer in in bytes.
- // Initialized from a fuse fs parameter.
maxRead uint32
// maxWrite is the maximum size of a write buffer in bytes.
@@ -147,20 +165,23 @@ type connection struct {
// Negotiated and only set in INIT.
minor uint32
- // atomicOTrunc is true when FUSE does not send a separate SETATTR request
- // before open with O_TRUNC flag.
- // Negotiated and only set in INIT.
- atomicOTrunc bool
-
// asyncRead if read pages asynchronously.
// Negotiated and only set in INIT.
asyncRead bool
+ // abortErr is true if kernel need to return an unique read error after abort.
+ // Negotiated and only set in INIT.
+ abortErr bool
+
// writebackCache is true for write-back cache policy,
// false for write-through policy.
// Negotiated and only set in INIT.
writebackCache bool
+ // cacheSymlinks if filesystem needs to cache READLINK responses in page cache.
+ // Negotiated and only set in INIT.
+ cacheSymlinks bool
+
// bigWrites if doing multi-page cached writes.
// Negotiated and only set in INIT.
bigWrites bool
@@ -168,70 +189,116 @@ type connection struct {
// dontMask if filestestem does not apply umask to creation modes.
// Negotiated in INIT.
dontMask bool
-
- // noOpen if FUSE server doesn't support open operation.
- // This flag only influence performance, not correctness of the program.
- noOpen bool
}
// newFUSEConnection creates a FUSE connection to fd.
-func newFUSEConnection(_ context.Context, fd *vfs.FileDescription, opts *filesystemOptions) (*connection, error) {
+func newFUSEConnection(_ context.Context, fd *vfs.FileDescription, maxInFlightRequests uint64) (*connection, error) {
// Mark the device as ready so it can be used. /dev/fuse can only be used if the FD was used to
// mount a FUSE filesystem.
fuseFD := fd.Impl().(*DeviceFD)
+ fuseFD.mounted = true
// Create the writeBuf for the header to be stored in.
hdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes())
fuseFD.writeBuf = make([]byte, hdrLen)
fuseFD.completions = make(map[linux.FUSEOpID]*futureResponse)
- fuseFD.fullQueueCh = make(chan struct{}, opts.maxActiveRequests)
+ fuseFD.fullQueueCh = make(chan struct{}, maxInFlightRequests)
fuseFD.writeCursor = 0
return &connection{
- fd: fuseFD,
- asyncNumMax: fuseDefaultMaxBackground,
- asyncCongestionThreshold: fuseDefaultCongestionThreshold,
- maxRead: opts.maxRead,
- maxPages: fuseDefaultMaxPagesPerReq,
- initializedChan: make(chan struct{}),
- connected: true,
+ fd: fuseFD,
+ maxBackground: fuseDefaultMaxBackground,
+ congestionThreshold: fuseDefaultCongestionThreshold,
+ maxPages: fuseDefaultMaxPagesPerReq,
+ initializedChan: make(chan struct{}),
+ connected: true,
}, nil
}
-// CallAsync makes an async (aka background) request.
-// It's a simple wrapper around Call().
-func (conn *connection) CallAsync(t *kernel.Task, r *Request) error {
- r.async = true
- _, err := conn.Call(t, r)
- return err
+// SetInitialized atomically sets the connection as initialized.
+func (conn *connection) SetInitialized() {
+ // Unblock the requests sent before INIT.
+ close(conn.initializedChan)
+
+ // Close the channel first to avoid the non-atomic situation
+ // where conn.initialized is true but there are
+ // tasks being blocked on the channel.
+ // And it prevents the newer tasks from gaining
+ // unnecessary higher chance to be issued before the blocked one.
+
+ atomic.StoreInt32(&(conn.initialized), int32(1))
}
-// Call makes a request to the server.
-// Block before the connection is initialized.
-// When the Request is FUSE_INIT, it will not be blocked before initialization.
-// Task should never be nil.
-//
-// For a sync request, it blocks the invoking task until
-// a server responds with a response.
-//
-// For an async request (that do not expect a response immediately),
-// it returns directly unless being blocked either before initialization
-// or when there are too many async requests ongoing.
-//
-// Example for async request:
-// init, readahead, write, async read/write, fuse_notify_reply,
-// non-sync release, interrupt, forget.
-//
-// The forget request does not have a reply,
-// as documented in include/uapi/linux/fuse.h:FUSE_FORGET.
+// IsInitialized atomically check if the connection is initialized.
+// pairs with SetInitialized().
+func (conn *connection) Initialized() bool {
+ return atomic.LoadInt32(&(conn.initialized)) != 0
+}
+
+// NewRequest creates a new request that can be sent to the FUSE server.
+func (conn *connection) NewRequest(creds *auth.Credentials, pid uint32, ino uint64, opcode linux.FUSEOpcode, payload marshal.Marshallable) (*Request, error) {
+ conn.fd.mu.Lock()
+ defer conn.fd.mu.Unlock()
+ conn.fd.nextOpID += linux.FUSEOpID(reqIDStep)
+
+ hdrLen := (*linux.FUSEHeaderIn)(nil).SizeBytes()
+ hdr := linux.FUSEHeaderIn{
+ Len: uint32(hdrLen + payload.SizeBytes()),
+ Opcode: opcode,
+ Unique: conn.fd.nextOpID,
+ NodeID: ino,
+ UID: uint32(creds.EffectiveKUID),
+ GID: uint32(creds.EffectiveKGID),
+ PID: pid,
+ }
+
+ buf := make([]byte, hdr.Len)
+ hdr.MarshalUnsafe(buf[:hdrLen])
+ payload.MarshalUnsafe(buf[hdrLen:])
+
+ return &Request{
+ id: hdr.Unique,
+ hdr: &hdr,
+ data: buf,
+ }, nil
+}
+
+// Call makes a request to the server and blocks the invoking task until a
+// server responds with a response. Task should never be nil.
+// Requests will not be sent before the connection is initialized.
+// For async tasks, use CallAsync().
func (conn *connection) Call(t *kernel.Task, r *Request) (*Response, error) {
// Block requests sent before connection is initalized.
- if !conn.Initialized() && r.hdr.Opcode != linux.FUSE_INIT {
+ if !conn.Initialized() {
if err := t.Block(conn.initializedChan); err != nil {
return nil, err
}
}
+ return conn.call(t, r)
+}
+
+// CallAsync makes an async (aka background) request.
+// Those requests either do not expect a response (e.g. release) or
+// the response should be handled by others (e.g. init).
+// Return immediately unless the connection is blocked (before initialization).
+// Async call example: init, release, forget, aio, interrupt.
+// When the Request is FUSE_INIT, it will not be blocked before initialization.
+func (conn *connection) CallAsync(t *kernel.Task, r *Request) error {
+ // Block requests sent before connection is initalized.
+ if !conn.Initialized() && r.hdr.Opcode != linux.FUSE_INIT {
+ if err := t.Block(conn.initializedChan); err != nil {
+ return err
+ }
+ }
+
+ // This should be the only place that invokes call() with a nil task.
+ _, err := conn.call(nil, r)
+ return err
+}
+
+// call makes a call without blocking checks.
+func (conn *connection) call(t *kernel.Task, r *Request) (*Response, error) {
if !conn.connected {
return nil, syserror.ENOTCONN
}
@@ -248,6 +315,31 @@ func (conn *connection) Call(t *kernel.Task, r *Request) (*Response, error) {
return fut.resolve(t)
}
+// Error returns the error of the FUSE call.
+func (r *Response) Error() error {
+ errno := r.hdr.Error
+ if errno >= 0 {
+ return nil
+ }
+
+ sysErrNo := syscall.Errno(-errno)
+ return error(sysErrNo)
+}
+
+// UnmarshalPayload unmarshals the response data into m.
+func (r *Response) UnmarshalPayload(m marshal.Marshallable) error {
+ hdrLen := r.hdr.SizeBytes()
+ haveDataLen := r.hdr.Len - uint32(hdrLen)
+ wantDataLen := uint32(m.SizeBytes())
+
+ if haveDataLen < wantDataLen {
+ return fmt.Errorf("payload too small. Minimum data lenth required: %d, but got data length %d", wantDataLen, haveDataLen)
+ }
+
+ m.UnmarshalUnsafe(r.data[hdrLen:])
+ return nil
+}
+
// callFuture makes a request to the server and returns a future response.
// Call resolve() when the response needs to be fulfilled.
func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, error) {
@@ -266,6 +358,11 @@ func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse,
// if there are always too many ongoing requests all the time. The
// supported maxActiveRequests setting should be really high to avoid this.
for conn.fd.numActiveRequests == conn.fd.fs.opts.maxActiveRequests {
+ if t == nil {
+ // Since there is no task that is waiting. We must error out.
+ return nil, errors.New("FUSE request queue full")
+ }
+
log.Infof("Blocking request %v from being queued. Too many active requests: %v",
r.id, conn.fd.numActiveRequests)
conn.fd.mu.Unlock()
@@ -281,19 +378,9 @@ func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse,
// callFutureLocked makes a request to the server and returns a future response.
func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureResponse, error) {
- // Check connected again holding conn.mu.
- conn.mu.Lock()
- if !conn.connected {
- conn.mu.Unlock()
- // we checked connected before,
- // this must be due to aborted connection.
- return nil, syserror.ECONNABORTED
- }
- conn.mu.Unlock()
-
conn.fd.queue.PushBack(r)
- conn.fd.numActiveRequests++
- fut := newFutureResponse(r)
+ conn.fd.numActiveRequests += 1
+ fut := newFutureResponse(r.hdr.Opcode)
conn.fd.completions[r.id] = fut
// Signal the readers that there is something to read.
@@ -301,3 +388,50 @@ func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureRes
return fut, nil
}
+
+// futureResponse represents an in-flight request, that may or may not have
+// completed yet. Convert it to a resolved Response by calling Resolve, but note
+// that this may block.
+//
+// +stateify savable
+type futureResponse struct {
+ opcode linux.FUSEOpcode
+ ch chan struct{}
+ hdr *linux.FUSEHeaderOut
+ data []byte
+}
+
+// newFutureResponse creates a future response to a FUSE request.
+func newFutureResponse(opcode linux.FUSEOpcode) *futureResponse {
+ return &futureResponse{
+ opcode: opcode,
+ ch: make(chan struct{}),
+ }
+}
+
+// resolve blocks the task until the server responds to its corresponding request,
+// then returns a resolved response.
+func (f *futureResponse) resolve(t *kernel.Task) (*Response, error) {
+ // If there is no Task associated with this request - then we don't try to resolve
+ // the response. Instead, the task writing the response (proxy to the server) will
+ // process the response on our behalf.
+ if t == nil {
+ log.Infof("fuse.Response.resolve: Not waiting on a response from server.")
+ return nil, nil
+ }
+
+ if err := t.Block(f.ch); err != nil {
+ return nil, err
+ }
+
+ return f.getResponse(), nil
+}
+
+// getResponse creates a Response from the data the futureResponse has.
+func (f *futureResponse) getResponse() *Response {
+ return &Response{
+ opcode: f.opcode,
+ hdr: *f.hdr,
+ data: f.data,
+ }
+}