diff options
Diffstat (limited to 'pkg/sentry/fsimpl/fuse/connection.go')
-rw-r--r-- | pkg/sentry/fsimpl/fuse/connection.go | 348 |
1 files changed, 241 insertions, 107 deletions
diff --git a/pkg/sentry/fsimpl/fuse/connection.go b/pkg/sentry/fsimpl/fuse/connection.go index a7402c149..eb1d1a2b7 100644 --- a/pkg/sentry/fsimpl/fuse/connection.go +++ b/pkg/sentry/fsimpl/fuse/connection.go @@ -15,17 +15,31 @@ package fuse import ( + "errors" + "fmt" "sync" + "sync/atomic" + "syscall" "gvisor.dev/gvisor/pkg/abi/linux" "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/log" + "gvisor.dev/gvisor/pkg/marshal" "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" "gvisor.dev/gvisor/pkg/sentry/vfs" "gvisor.dev/gvisor/pkg/syserror" "gvisor.dev/gvisor/pkg/waiter" ) +// maxActiveRequestsDefault is the default setting controlling the upper bound +// on the number of active requests at any given time. +const maxActiveRequestsDefault = 10000 + +// Ordinary requests have even IDs, while interrupts IDs are odd. +// Used to increment the unique ID for each FUSE request. +var reqIDStep uint64 = 2 + const ( // fuseDefaultMaxBackground is the default value for MaxBackground. fuseDefaultMaxBackground = 12 @@ -38,36 +52,43 @@ const ( fuseDefaultMaxPagesPerReq = 32 ) +// Request represents a FUSE operation request that hasn't been sent to the +// server yet. +// +// +stateify savable +type Request struct { + requestEntry + + id linux.FUSEOpID + hdr *linux.FUSEHeaderIn + data []byte +} + +// Response represents an actual response from the server, including the +// response payload. +// +// +stateify savable +type Response struct { + opcode linux.FUSEOpcode + hdr linux.FUSEHeaderOut + data []byte +} + // connection is the struct by which the sentry communicates with the FUSE server daemon. -// Lock order: -// - conn.fd.mu -// - conn.mu -// - conn.asyncMu type connection struct { fd *DeviceFD - // mu protects access to struct memebers. - mu sync.Mutex - - // attributeVersion is the version of connection's attributes. - attributeVersion uint64 - - // We target FUSE 7.23. // The following FUSE_INIT flags are currently unsupported by this implementation: + // - FUSE_ATOMIC_O_TRUNC: requires open(..., O_TRUNC) // - FUSE_EXPORT_SUPPORT + // - FUSE_HANDLE_KILLPRIV // - FUSE_POSIX_LOCKS: requires POSIX locks // - FUSE_FLOCK_LOCKS: requires POSIX locks // - FUSE_AUTO_INVAL_DATA: requires page caching eviction + // - FUSE_EXPLICIT_INVAL_DATA: requires page caching eviction // - FUSE_DO_READDIRPLUS/FUSE_READDIRPLUS_AUTO: requires FUSE_READDIRPLUS implementation // - FUSE_ASYNC_DIO - // - FUSE_PARALLEL_DIROPS (7.25) - // - FUSE_HANDLE_KILLPRIV (7.26) - // - FUSE_POSIX_ACL: affects defaultPermissions, posixACL, xattr handler (7.26) - // - FUSE_ABORT_ERROR (7.27) - // - FUSE_CACHE_SYMLINKS (7.28) - // - FUSE_NO_OPENDIR_SUPPORT (7.29) - // - FUSE_EXPLICIT_INVAL_DATA: requires page caching eviction (7.30) - // - FUSE_MAP_ALIGNMENT (7.31) + // - FUSE_POSIX_ACL: affects defaultPermissions, posixACL, xattr handler // initialized after receiving FUSE_INIT reply. // Until it's set, suspend sending FUSE requests. @@ -77,6 +98,10 @@ type connection struct { // initializedChan is used to block requests before initialization. initializedChan chan struct{} + // blocked when there are too many outstading backgrounds requests (NumBackground == MaxBackground). + // TODO(gvisor.dev/issue/3185): update the numBackground accordingly; use a channel to block. + blocked bool + // connected (connection established) when a new FUSE file system is created. // Set to false when: // umount, @@ -84,55 +109,48 @@ type connection struct { // device release. connected bool + // aborted via sysfs. + // TODO(gvisor.dev/issue/3185): abort all queued requests. + aborted bool + // connInitError if FUSE_INIT encountered error (major version mismatch). // Only set in INIT. connInitError bool // connInitSuccess if FUSE_INIT is successful. // Only set in INIT. - // Used for destory (not yet implemented). + // Used for destory. connInitSuccess bool - // aborted via sysfs, and will send ECONNABORTED to read after disconnection (instead of ENODEV). - // Set only if abortErr is true and via fuse control fs (not yet implemented). - // TODO(gvisor.dev/issue/3525): set this to true when user aborts. - aborted bool + // TODO(gvisor.dev/issue/3185): All the queue logic are working in progress. - // numWating is the number of requests waiting to be - // sent to FUSE device or being processed by FUSE daemon. - numWaiting uint32 + // NumberBackground is the number of requests in the background. + numBackground uint16 - // Terminology note: - // - // - `asyncNumMax` is the `MaxBackground` in the FUSE_INIT_IN struct. - // - // - `asyncCongestionThreshold` is the `CongestionThreshold` in the FUSE_INIT_IN struct. - // - // We call the "background" requests in unix term as async requests. - // The "async requests" in unix term is our async requests that expect a reply, - // i.e. `!requestOptions.noReply` + // congestionThreshold for NumBackground. + // Negotiated in FUSE_INIT. + congestionThreshold uint16 + + // maxBackground is the maximum number of NumBackground. + // Block connection when it is reached. + // Negotiated in FUSE_INIT. + maxBackground uint16 - // asyncMu protects the async request fields. - asyncMu sync.Mutex + // numActiveBackground is the number of requests in background and has being marked as active. + numActiveBackground uint16 - // asyncNum is the number of async requests. - // Protected by asyncMu. - asyncNum uint16 + // numWating is the number of requests waiting for completion. + numWaiting uint32 - // asyncCongestionThreshold the number of async requests. - // Negotiated in FUSE_INIT as "CongestionThreshold". - // TODO(gvisor.dev/issue/3529): add congestion control. - // Protected by asyncMu. - asyncCongestionThreshold uint16 + // TODO(gvisor.dev/issue/3185): BgQueue + // some queue for background queued requests. - // asyncNumMax is the maximum number of asyncNum. - // Connection blocks the async requests when it is reached. - // Negotiated in FUSE_INIT as "MaxBackground". - // Protected by asyncMu. - asyncNumMax uint16 + // bgLock protects: + // MaxBackground, CongestionThreshold, NumBackground, + // NumActiveBackground, BgQueue, Blocked. + bgLock sync.Mutex // maxRead is the maximum size of a read buffer in in bytes. - // Initialized from a fuse fs parameter. maxRead uint32 // maxWrite is the maximum size of a write buffer in bytes. @@ -147,20 +165,23 @@ type connection struct { // Negotiated and only set in INIT. minor uint32 - // atomicOTrunc is true when FUSE does not send a separate SETATTR request - // before open with O_TRUNC flag. - // Negotiated and only set in INIT. - atomicOTrunc bool - // asyncRead if read pages asynchronously. // Negotiated and only set in INIT. asyncRead bool + // abortErr is true if kernel need to return an unique read error after abort. + // Negotiated and only set in INIT. + abortErr bool + // writebackCache is true for write-back cache policy, // false for write-through policy. // Negotiated and only set in INIT. writebackCache bool + // cacheSymlinks if filesystem needs to cache READLINK responses in page cache. + // Negotiated and only set in INIT. + cacheSymlinks bool + // bigWrites if doing multi-page cached writes. // Negotiated and only set in INIT. bigWrites bool @@ -168,70 +189,116 @@ type connection struct { // dontMask if filestestem does not apply umask to creation modes. // Negotiated in INIT. dontMask bool - - // noOpen if FUSE server doesn't support open operation. - // This flag only influence performance, not correctness of the program. - noOpen bool } // newFUSEConnection creates a FUSE connection to fd. -func newFUSEConnection(_ context.Context, fd *vfs.FileDescription, opts *filesystemOptions) (*connection, error) { +func newFUSEConnection(_ context.Context, fd *vfs.FileDescription, maxInFlightRequests uint64) (*connection, error) { // Mark the device as ready so it can be used. /dev/fuse can only be used if the FD was used to // mount a FUSE filesystem. fuseFD := fd.Impl().(*DeviceFD) + fuseFD.mounted = true // Create the writeBuf for the header to be stored in. hdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes()) fuseFD.writeBuf = make([]byte, hdrLen) fuseFD.completions = make(map[linux.FUSEOpID]*futureResponse) - fuseFD.fullQueueCh = make(chan struct{}, opts.maxActiveRequests) + fuseFD.fullQueueCh = make(chan struct{}, maxInFlightRequests) fuseFD.writeCursor = 0 return &connection{ - fd: fuseFD, - asyncNumMax: fuseDefaultMaxBackground, - asyncCongestionThreshold: fuseDefaultCongestionThreshold, - maxRead: opts.maxRead, - maxPages: fuseDefaultMaxPagesPerReq, - initializedChan: make(chan struct{}), - connected: true, + fd: fuseFD, + maxBackground: fuseDefaultMaxBackground, + congestionThreshold: fuseDefaultCongestionThreshold, + maxPages: fuseDefaultMaxPagesPerReq, + initializedChan: make(chan struct{}), + connected: true, }, nil } -// CallAsync makes an async (aka background) request. -// It's a simple wrapper around Call(). -func (conn *connection) CallAsync(t *kernel.Task, r *Request) error { - r.async = true - _, err := conn.Call(t, r) - return err +// SetInitialized atomically sets the connection as initialized. +func (conn *connection) SetInitialized() { + // Unblock the requests sent before INIT. + close(conn.initializedChan) + + // Close the channel first to avoid the non-atomic situation + // where conn.initialized is true but there are + // tasks being blocked on the channel. + // And it prevents the newer tasks from gaining + // unnecessary higher chance to be issued before the blocked one. + + atomic.StoreInt32(&(conn.initialized), int32(1)) } -// Call makes a request to the server. -// Block before the connection is initialized. -// When the Request is FUSE_INIT, it will not be blocked before initialization. -// Task should never be nil. -// -// For a sync request, it blocks the invoking task until -// a server responds with a response. -// -// For an async request (that do not expect a response immediately), -// it returns directly unless being blocked either before initialization -// or when there are too many async requests ongoing. -// -// Example for async request: -// init, readahead, write, async read/write, fuse_notify_reply, -// non-sync release, interrupt, forget. -// -// The forget request does not have a reply, -// as documented in include/uapi/linux/fuse.h:FUSE_FORGET. +// IsInitialized atomically check if the connection is initialized. +// pairs with SetInitialized(). +func (conn *connection) Initialized() bool { + return atomic.LoadInt32(&(conn.initialized)) != 0 +} + +// NewRequest creates a new request that can be sent to the FUSE server. +func (conn *connection) NewRequest(creds *auth.Credentials, pid uint32, ino uint64, opcode linux.FUSEOpcode, payload marshal.Marshallable) (*Request, error) { + conn.fd.mu.Lock() + defer conn.fd.mu.Unlock() + conn.fd.nextOpID += linux.FUSEOpID(reqIDStep) + + hdrLen := (*linux.FUSEHeaderIn)(nil).SizeBytes() + hdr := linux.FUSEHeaderIn{ + Len: uint32(hdrLen + payload.SizeBytes()), + Opcode: opcode, + Unique: conn.fd.nextOpID, + NodeID: ino, + UID: uint32(creds.EffectiveKUID), + GID: uint32(creds.EffectiveKGID), + PID: pid, + } + + buf := make([]byte, hdr.Len) + hdr.MarshalUnsafe(buf[:hdrLen]) + payload.MarshalUnsafe(buf[hdrLen:]) + + return &Request{ + id: hdr.Unique, + hdr: &hdr, + data: buf, + }, nil +} + +// Call makes a request to the server and blocks the invoking task until a +// server responds with a response. Task should never be nil. +// Requests will not be sent before the connection is initialized. +// For async tasks, use CallAsync(). func (conn *connection) Call(t *kernel.Task, r *Request) (*Response, error) { // Block requests sent before connection is initalized. - if !conn.Initialized() && r.hdr.Opcode != linux.FUSE_INIT { + if !conn.Initialized() { if err := t.Block(conn.initializedChan); err != nil { return nil, err } } + return conn.call(t, r) +} + +// CallAsync makes an async (aka background) request. +// Those requests either do not expect a response (e.g. release) or +// the response should be handled by others (e.g. init). +// Return immediately unless the connection is blocked (before initialization). +// Async call example: init, release, forget, aio, interrupt. +// When the Request is FUSE_INIT, it will not be blocked before initialization. +func (conn *connection) CallAsync(t *kernel.Task, r *Request) error { + // Block requests sent before connection is initalized. + if !conn.Initialized() && r.hdr.Opcode != linux.FUSE_INIT { + if err := t.Block(conn.initializedChan); err != nil { + return err + } + } + + // This should be the only place that invokes call() with a nil task. + _, err := conn.call(nil, r) + return err +} + +// call makes a call without blocking checks. +func (conn *connection) call(t *kernel.Task, r *Request) (*Response, error) { if !conn.connected { return nil, syserror.ENOTCONN } @@ -248,6 +315,31 @@ func (conn *connection) Call(t *kernel.Task, r *Request) (*Response, error) { return fut.resolve(t) } +// Error returns the error of the FUSE call. +func (r *Response) Error() error { + errno := r.hdr.Error + if errno >= 0 { + return nil + } + + sysErrNo := syscall.Errno(-errno) + return error(sysErrNo) +} + +// UnmarshalPayload unmarshals the response data into m. +func (r *Response) UnmarshalPayload(m marshal.Marshallable) error { + hdrLen := r.hdr.SizeBytes() + haveDataLen := r.hdr.Len - uint32(hdrLen) + wantDataLen := uint32(m.SizeBytes()) + + if haveDataLen < wantDataLen { + return fmt.Errorf("payload too small. Minimum data lenth required: %d, but got data length %d", wantDataLen, haveDataLen) + } + + m.UnmarshalUnsafe(r.data[hdrLen:]) + return nil +} + // callFuture makes a request to the server and returns a future response. // Call resolve() when the response needs to be fulfilled. func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, error) { @@ -266,6 +358,11 @@ func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, // if there are always too many ongoing requests all the time. The // supported maxActiveRequests setting should be really high to avoid this. for conn.fd.numActiveRequests == conn.fd.fs.opts.maxActiveRequests { + if t == nil { + // Since there is no task that is waiting. We must error out. + return nil, errors.New("FUSE request queue full") + } + log.Infof("Blocking request %v from being queued. Too many active requests: %v", r.id, conn.fd.numActiveRequests) conn.fd.mu.Unlock() @@ -281,19 +378,9 @@ func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, // callFutureLocked makes a request to the server and returns a future response. func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureResponse, error) { - // Check connected again holding conn.mu. - conn.mu.Lock() - if !conn.connected { - conn.mu.Unlock() - // we checked connected before, - // this must be due to aborted connection. - return nil, syserror.ECONNABORTED - } - conn.mu.Unlock() - conn.fd.queue.PushBack(r) - conn.fd.numActiveRequests++ - fut := newFutureResponse(r) + conn.fd.numActiveRequests += 1 + fut := newFutureResponse(r.hdr.Opcode) conn.fd.completions[r.id] = fut // Signal the readers that there is something to read. @@ -301,3 +388,50 @@ func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureRes return fut, nil } + +// futureResponse represents an in-flight request, that may or may not have +// completed yet. Convert it to a resolved Response by calling Resolve, but note +// that this may block. +// +// +stateify savable +type futureResponse struct { + opcode linux.FUSEOpcode + ch chan struct{} + hdr *linux.FUSEHeaderOut + data []byte +} + +// newFutureResponse creates a future response to a FUSE request. +func newFutureResponse(opcode linux.FUSEOpcode) *futureResponse { + return &futureResponse{ + opcode: opcode, + ch: make(chan struct{}), + } +} + +// resolve blocks the task until the server responds to its corresponding request, +// then returns a resolved response. +func (f *futureResponse) resolve(t *kernel.Task) (*Response, error) { + // If there is no Task associated with this request - then we don't try to resolve + // the response. Instead, the task writing the response (proxy to the server) will + // process the response on our behalf. + if t == nil { + log.Infof("fuse.Response.resolve: Not waiting on a response from server.") + return nil, nil + } + + if err := t.Block(f.ch); err != nil { + return nil, err + } + + return f.getResponse(), nil +} + +// getResponse creates a Response from the data the futureResponse has. +func (f *futureResponse) getResponse() *Response { + return &Response{ + opcode: f.opcode, + hdr: *f.hdr, + data: f.data, + } +} |