diff options
Diffstat (limited to 'pkg/sentry/fsimpl/fuse')
-rw-r--r-- | pkg/sentry/fsimpl/fuse/BUILD | 32 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/connection.go | 365 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/connection_control.go (renamed from pkg/sentry/fsimpl/fuse/init.go) | 171 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/connection_test.go | 117 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/dev.go | 208 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/dev_test.go | 105 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/directory.go | 105 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/file.go | 133 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/fusefs.go | 586 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/read_write.go | 244 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/regular_file.go | 230 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/request_response.go | 229 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/utils_test.go | 132 |
13 files changed, 2148 insertions, 509 deletions
diff --git a/pkg/sentry/fsimpl/fuse/BUILD b/pkg/sentry/fsimpl/fuse/BUILD index 999111deb..2158b1bbc 100644 --- a/pkg/sentry/fsimpl/fuse/BUILD +++ b/pkg/sentry/fsimpl/fuse/BUILD @@ -15,21 +15,42 @@ go_template_instance( }, ) +go_template_instance( + name = "inode_refs", + out = "inode_refs.go", + package = "fuse", + prefix = "inode", + template = "//pkg/refsvfs2:refs_template", + types = { + "T": "inode", + }, +) + go_library( name = "fuse", srcs = [ "connection.go", + "connection_control.go", "dev.go", + "directory.go", + "file.go", "fusefs.go", - "init.go", + "inode_refs.go", + "read_write.go", "register.go", + "regular_file.go", "request_list.go", + "request_response.go", ], visibility = ["//pkg/sentry:internal"], deps = [ "//pkg/abi/linux", "//pkg/context", "//pkg/log", + "//pkg/marshal", + "//pkg/refs", + "//pkg/refsvfs2", + "//pkg/safemem", "//pkg/sentry/fsimpl/devtmpfs", "//pkg/sentry/fsimpl/kernfs", "//pkg/sentry/kernel", @@ -39,7 +60,6 @@ go_library( "//pkg/syserror", "//pkg/usermem", "//pkg/waiter", - "//tools/go_marshal/marshal", "@org_golang_x_sys//unix:go_default_library", ], ) @@ -47,10 +67,15 @@ go_library( go_test( name = "fuse_test", size = "small", - srcs = ["dev_test.go"], + srcs = [ + "connection_test.go", + "dev_test.go", + "utils_test.go", + ], library = ":fuse", deps = [ "//pkg/abi/linux", + "//pkg/marshal", "//pkg/sentry/fsimpl/testutil", "//pkg/sentry/kernel", "//pkg/sentry/kernel/auth", @@ -58,6 +83,5 @@ go_test( "//pkg/syserror", "//pkg/usermem", "//pkg/waiter", - "//tools/go_marshal/marshal", ], ) diff --git a/pkg/sentry/fsimpl/fuse/connection.go b/pkg/sentry/fsimpl/fuse/connection.go index 6df2728ab..8ccda1264 100644 --- a/pkg/sentry/fsimpl/fuse/connection.go +++ b/pkg/sentry/fsimpl/fuse/connection.go @@ -15,31 +15,17 @@ package fuse import ( - "errors" - "fmt" "sync" - "sync/atomic" - "syscall" "gvisor.dev/gvisor/pkg/abi/linux" "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/log" "gvisor.dev/gvisor/pkg/sentry/kernel" - "gvisor.dev/gvisor/pkg/sentry/kernel/auth" "gvisor.dev/gvisor/pkg/sentry/vfs" "gvisor.dev/gvisor/pkg/syserror" "gvisor.dev/gvisor/pkg/waiter" - "gvisor.dev/gvisor/tools/go_marshal/marshal" ) -// maxActiveRequestsDefault is the default setting controlling the upper bound -// on the number of active requests at any given time. -const maxActiveRequestsDefault = 10000 - -// Ordinary requests have even IDs, while interrupts IDs are odd. -// Used to increment the unique ID for each FUSE request. -var reqIDStep uint64 = 2 - const ( // fuseDefaultMaxBackground is the default value for MaxBackground. fuseDefaultMaxBackground = 12 @@ -52,43 +38,39 @@ const ( fuseDefaultMaxPagesPerReq = 32 ) -// Request represents a FUSE operation request that hasn't been sent to the -// server yet. +// connection is the struct by which the sentry communicates with the FUSE server daemon. // -// +stateify savable -type Request struct { - requestEntry - - id linux.FUSEOpID - hdr *linux.FUSEHeaderIn - data []byte -} - -// Response represents an actual response from the server, including the -// response payload. +// Lock order: +// - conn.fd.mu +// - conn.mu +// - conn.asyncMu // // +stateify savable -type Response struct { - opcode linux.FUSEOpcode - hdr linux.FUSEHeaderOut - data []byte -} - -// connection is the struct by which the sentry communicates with the FUSE server daemon. type connection struct { fd *DeviceFD + // mu protects access to struct memebers. + mu sync.Mutex `state:"nosave"` + + // attributeVersion is the version of connection's attributes. + attributeVersion uint64 + + // We target FUSE 7.23. // The following FUSE_INIT flags are currently unsupported by this implementation: - // - FUSE_ATOMIC_O_TRUNC: requires open(..., O_TRUNC) // - FUSE_EXPORT_SUPPORT - // - FUSE_HANDLE_KILLPRIV // - FUSE_POSIX_LOCKS: requires POSIX locks // - FUSE_FLOCK_LOCKS: requires POSIX locks // - FUSE_AUTO_INVAL_DATA: requires page caching eviction - // - FUSE_EXPLICIT_INVAL_DATA: requires page caching eviction // - FUSE_DO_READDIRPLUS/FUSE_READDIRPLUS_AUTO: requires FUSE_READDIRPLUS implementation // - FUSE_ASYNC_DIO - // - FUSE_POSIX_ACL: affects defaultPermissions, posixACL, xattr handler + // - FUSE_PARALLEL_DIROPS (7.25) + // - FUSE_HANDLE_KILLPRIV (7.26) + // - FUSE_POSIX_ACL: affects defaultPermissions, posixACL, xattr handler (7.26) + // - FUSE_ABORT_ERROR (7.27) + // - FUSE_CACHE_SYMLINKS (7.28) + // - FUSE_NO_OPENDIR_SUPPORT (7.29) + // - FUSE_EXPLICIT_INVAL_DATA: requires page caching eviction (7.30) + // - FUSE_MAP_ALIGNMENT (7.31) // initialized after receiving FUSE_INIT reply. // Until it's set, suspend sending FUSE requests. @@ -96,11 +78,7 @@ type connection struct { initialized int32 // initializedChan is used to block requests before initialization. - initializedChan chan struct{} - - // blocked when there are too many outstading backgrounds requests (NumBackground == MaxBackground). - // TODO(gvisor.dev/issue/3185): update the numBackground accordingly; use a channel to block. - blocked bool + initializedChan chan struct{} `state:".(bool)"` // connected (connection established) when a new FUSE file system is created. // Set to false when: @@ -109,48 +87,55 @@ type connection struct { // device release. connected bool - // aborted via sysfs. - // TODO(gvisor.dev/issue/3185): abort all queued requests. - aborted bool - // connInitError if FUSE_INIT encountered error (major version mismatch). // Only set in INIT. connInitError bool // connInitSuccess if FUSE_INIT is successful. // Only set in INIT. - // Used for destory. + // Used for destory (not yet implemented). connInitSuccess bool - // TODO(gvisor.dev/issue/3185): All the queue logic are working in progress. - - // NumberBackground is the number of requests in the background. - numBackground uint16 + // aborted via sysfs, and will send ECONNABORTED to read after disconnection (instead of ENODEV). + // Set only if abortErr is true and via fuse control fs (not yet implemented). + // TODO(gvisor.dev/issue/3525): set this to true when user aborts. + aborted bool - // congestionThreshold for NumBackground. - // Negotiated in FUSE_INIT. - congestionThreshold uint16 + // numWating is the number of requests waiting to be + // sent to FUSE device or being processed by FUSE daemon. + numWaiting uint32 - // maxBackground is the maximum number of NumBackground. - // Block connection when it is reached. - // Negotiated in FUSE_INIT. - maxBackground uint16 + // Terminology note: + // + // - `asyncNumMax` is the `MaxBackground` in the FUSE_INIT_IN struct. + // + // - `asyncCongestionThreshold` is the `CongestionThreshold` in the FUSE_INIT_IN struct. + // + // We call the "background" requests in unix term as async requests. + // The "async requests" in unix term is our async requests that expect a reply, + // i.e. `!request.noReply` - // numActiveBackground is the number of requests in background and has being marked as active. - numActiveBackground uint16 + // asyncMu protects the async request fields. + asyncMu sync.Mutex `state:"nosave"` - // numWating is the number of requests waiting for completion. - numWaiting uint32 + // asyncNum is the number of async requests. + // Protected by asyncMu. + asyncNum uint16 - // TODO(gvisor.dev/issue/3185): BgQueue - // some queue for background queued requests. + // asyncCongestionThreshold the number of async requests. + // Negotiated in FUSE_INIT as "CongestionThreshold". + // TODO(gvisor.dev/issue/3529): add congestion control. + // Protected by asyncMu. + asyncCongestionThreshold uint16 - // bgLock protects: - // MaxBackground, CongestionThreshold, NumBackground, - // NumActiveBackground, BgQueue, Blocked. - bgLock sync.Mutex + // asyncNumMax is the maximum number of asyncNum. + // Connection blocks the async requests when it is reached. + // Negotiated in FUSE_INIT as "MaxBackground". + // Protected by asyncMu. + asyncNumMax uint16 // maxRead is the maximum size of a read buffer in in bytes. + // Initialized from a fuse fs parameter. maxRead uint32 // maxWrite is the maximum size of a write buffer in bytes. @@ -165,23 +150,20 @@ type connection struct { // Negotiated and only set in INIT. minor uint32 - // asyncRead if read pages asynchronously. + // atomicOTrunc is true when FUSE does not send a separate SETATTR request + // before open with O_TRUNC flag. // Negotiated and only set in INIT. - asyncRead bool + atomicOTrunc bool - // abortErr is true if kernel need to return an unique read error after abort. + // asyncRead if read pages asynchronously. // Negotiated and only set in INIT. - abortErr bool + asyncRead bool // writebackCache is true for write-back cache policy, // false for write-through policy. // Negotiated and only set in INIT. writebackCache bool - // cacheSymlinks if filesystem needs to cache READLINK responses in page cache. - // Negotiated and only set in INIT. - cacheSymlinks bool - // bigWrites if doing multi-page cached writes. // Negotiated and only set in INIT. bigWrites bool @@ -189,116 +171,86 @@ type connection struct { // dontMask if filestestem does not apply umask to creation modes. // Negotiated in INIT. dontMask bool + + // noOpen if FUSE server doesn't support open operation. + // This flag only influence performance, not correctness of the program. + noOpen bool +} + +func (conn *connection) saveInitializedChan() bool { + select { + case <-conn.initializedChan: + return true // Closed. + default: + return false // Not closed. + } +} + +func (conn *connection) loadInitializedChan(closed bool) { + conn.initializedChan = make(chan struct{}, 1) + if closed { + close(conn.initializedChan) + } } // newFUSEConnection creates a FUSE connection to fd. -func newFUSEConnection(_ context.Context, fd *vfs.FileDescription, maxInFlightRequests uint64) (*connection, error) { +func newFUSEConnection(_ context.Context, fd *vfs.FileDescription, opts *filesystemOptions) (*connection, error) { // Mark the device as ready so it can be used. /dev/fuse can only be used if the FD was used to // mount a FUSE filesystem. fuseFD := fd.Impl().(*DeviceFD) - fuseFD.mounted = true // Create the writeBuf for the header to be stored in. hdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes()) fuseFD.writeBuf = make([]byte, hdrLen) fuseFD.completions = make(map[linux.FUSEOpID]*futureResponse) - fuseFD.fullQueueCh = make(chan struct{}, maxInFlightRequests) + fuseFD.fullQueueCh = make(chan struct{}, opts.maxActiveRequests) fuseFD.writeCursor = 0 return &connection{ - fd: fuseFD, - maxBackground: fuseDefaultMaxBackground, - congestionThreshold: fuseDefaultCongestionThreshold, - maxPages: fuseDefaultMaxPagesPerReq, - initializedChan: make(chan struct{}), - connected: true, - }, nil -} - -// SetInitialized atomically sets the connection as initialized. -func (conn *connection) SetInitialized() { - // Unblock the requests sent before INIT. - close(conn.initializedChan) - - // Close the channel first to avoid the non-atomic situation - // where conn.initialized is true but there are - // tasks being blocked on the channel. - // And it prevents the newer tasks from gaining - // unnecessary higher chance to be issued before the blocked one. - - atomic.StoreInt32(&(conn.initialized), int32(1)) -} - -// IsInitialized atomically check if the connection is initialized. -// pairs with SetInitialized(). -func (conn *connection) Initialized() bool { - return atomic.LoadInt32(&(conn.initialized)) != 0 -} - -// NewRequest creates a new request that can be sent to the FUSE server. -func (conn *connection) NewRequest(creds *auth.Credentials, pid uint32, ino uint64, opcode linux.FUSEOpcode, payload marshal.Marshallable) (*Request, error) { - conn.fd.mu.Lock() - defer conn.fd.mu.Unlock() - conn.fd.nextOpID += linux.FUSEOpID(reqIDStep) - - hdrLen := (*linux.FUSEHeaderIn)(nil).SizeBytes() - hdr := linux.FUSEHeaderIn{ - Len: uint32(hdrLen + payload.SizeBytes()), - Opcode: opcode, - Unique: conn.fd.nextOpID, - NodeID: ino, - UID: uint32(creds.EffectiveKUID), - GID: uint32(creds.EffectiveKGID), - PID: pid, - } - - buf := make([]byte, hdr.Len) - hdr.MarshalUnsafe(buf[:hdrLen]) - payload.MarshalUnsafe(buf[hdrLen:]) - - return &Request{ - id: hdr.Unique, - hdr: &hdr, - data: buf, + fd: fuseFD, + asyncNumMax: fuseDefaultMaxBackground, + asyncCongestionThreshold: fuseDefaultCongestionThreshold, + maxRead: opts.maxRead, + maxPages: fuseDefaultMaxPagesPerReq, + initializedChan: make(chan struct{}), + connected: true, }, nil } -// Call makes a request to the server and blocks the invoking task until a -// server responds with a response. Task should never be nil. -// Requests will not be sent before the connection is initialized. -// For async tasks, use CallAsync(). -func (conn *connection) Call(t *kernel.Task, r *Request) (*Response, error) { - // Block requests sent before connection is initalized. - if !conn.Initialized() { - if err := t.Block(conn.initializedChan); err != nil { - return nil, err - } - } - - return conn.call(t, r) +// CallAsync makes an async (aka background) request. +// It's a simple wrapper around Call(). +func (conn *connection) CallAsync(t *kernel.Task, r *Request) error { + r.async = true + _, err := conn.Call(t, r) + return err } -// CallAsync makes an async (aka background) request. -// Those requests either do not expect a response (e.g. release) or -// the response should be handled by others (e.g. init). -// Return immediately unless the connection is blocked (before initialization). -// Async call example: init, release, forget, aio, interrupt. +// Call makes a request to the server. +// Block before the connection is initialized. // When the Request is FUSE_INIT, it will not be blocked before initialization. -func (conn *connection) CallAsync(t *kernel.Task, r *Request) error { +// Task should never be nil. +// +// For a sync request, it blocks the invoking task until +// a server responds with a response. +// +// For an async request (that do not expect a response immediately), +// it returns directly unless being blocked either before initialization +// or when there are too many async requests ongoing. +// +// Example for async request: +// init, readahead, write, async read/write, fuse_notify_reply, +// non-sync release, interrupt, forget. +// +// The forget request does not have a reply, +// as documented in include/uapi/linux/fuse.h:FUSE_FORGET. +func (conn *connection) Call(t *kernel.Task, r *Request) (*Response, error) { // Block requests sent before connection is initalized. if !conn.Initialized() && r.hdr.Opcode != linux.FUSE_INIT { if err := t.Block(conn.initializedChan); err != nil { - return err + return nil, err } } - // This should be the only place that invokes call() with a nil task. - _, err := conn.call(nil, r) - return err -} - -// call makes a call without blocking checks. -func (conn *connection) call(t *kernel.Task, r *Request) (*Response, error) { if !conn.connected { return nil, syserror.ENOTCONN } @@ -315,31 +267,6 @@ func (conn *connection) call(t *kernel.Task, r *Request) (*Response, error) { return fut.resolve(t) } -// Error returns the error of the FUSE call. -func (r *Response) Error() error { - errno := r.hdr.Error - if errno >= 0 { - return nil - } - - sysErrNo := syscall.Errno(-errno) - return error(sysErrNo) -} - -// UnmarshalPayload unmarshals the response data into m. -func (r *Response) UnmarshalPayload(m marshal.Marshallable) error { - hdrLen := r.hdr.SizeBytes() - haveDataLen := r.hdr.Len - uint32(hdrLen) - wantDataLen := uint32(m.SizeBytes()) - - if haveDataLen < wantDataLen { - return fmt.Errorf("payload too small. Minimum data lenth required: %d, but got data length %d", wantDataLen, haveDataLen) - } - - m.UnmarshalUnsafe(r.data[hdrLen:]) - return nil -} - // callFuture makes a request to the server and returns a future response. // Call resolve() when the response needs to be fulfilled. func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, error) { @@ -358,11 +285,6 @@ func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, // if there are always too many ongoing requests all the time. The // supported maxActiveRequests setting should be really high to avoid this. for conn.fd.numActiveRequests == conn.fd.fs.opts.maxActiveRequests { - if t == nil { - // Since there is no task that is waiting. We must error out. - return nil, errors.New("FUSE request queue full") - } - log.Infof("Blocking request %v from being queued. Too many active requests: %v", r.id, conn.fd.numActiveRequests) conn.fd.mu.Unlock() @@ -378,9 +300,19 @@ func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, // callFutureLocked makes a request to the server and returns a future response. func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureResponse, error) { + // Check connected again holding conn.mu. + conn.mu.Lock() + if !conn.connected { + conn.mu.Unlock() + // we checked connected before, + // this must be due to aborted connection. + return nil, syserror.ECONNABORTED + } + conn.mu.Unlock() + conn.fd.queue.PushBack(r) - conn.fd.numActiveRequests += 1 - fut := newFutureResponse(r.hdr.Opcode) + conn.fd.numActiveRequests++ + fut := newFutureResponse(r) conn.fd.completions[r.id] = fut // Signal the readers that there is something to read. @@ -388,50 +320,3 @@ func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureRes return fut, nil } - -// futureResponse represents an in-flight request, that may or may not have -// completed yet. Convert it to a resolved Response by calling Resolve, but note -// that this may block. -// -// +stateify savable -type futureResponse struct { - opcode linux.FUSEOpcode - ch chan struct{} - hdr *linux.FUSEHeaderOut - data []byte -} - -// newFutureResponse creates a future response to a FUSE request. -func newFutureResponse(opcode linux.FUSEOpcode) *futureResponse { - return &futureResponse{ - opcode: opcode, - ch: make(chan struct{}), - } -} - -// resolve blocks the task until the server responds to its corresponding request, -// then returns a resolved response. -func (f *futureResponse) resolve(t *kernel.Task) (*Response, error) { - // If there is no Task associated with this request - then we don't try to resolve - // the response. Instead, the task writing the response (proxy to the server) will - // process the response on our behalf. - if t == nil { - log.Infof("fuse.Response.resolve: Not waiting on a response from server.") - return nil, nil - } - - if err := t.Block(f.ch); err != nil { - return nil, err - } - - return f.getResponse(), nil -} - -// getResponse creates a Response from the data the futureResponse has. -func (f *futureResponse) getResponse() *Response { - return &Response{ - opcode: f.opcode, - hdr: *f.hdr, - data: f.data, - } -} diff --git a/pkg/sentry/fsimpl/fuse/init.go b/pkg/sentry/fsimpl/fuse/connection_control.go index 779c2bd3f..bfde78559 100644 --- a/pkg/sentry/fsimpl/fuse/init.go +++ b/pkg/sentry/fsimpl/fuse/connection_control.go @@ -15,7 +15,11 @@ package fuse import ( + "sync/atomic" + "syscall" + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/kernel/auth" ) @@ -29,9 +33,10 @@ const ( // Follow the same behavior as unix fuse implementation. fuseMaxTimeGranNs = 1000000000 - // Minimum value for MaxWrite. + // Minimum value for MaxWrite and MaxRead. // Follow the same behavior as unix fuse implementation. fuseMinMaxWrite = 4096 + fuseMinMaxRead = 4096 // Temporary default value for max readahead, 128kb. fuseDefaultMaxReadahead = 131072 @@ -49,6 +54,26 @@ var ( MaxUserCongestionThreshold uint16 = fuseDefaultCongestionThreshold ) +// SetInitialized atomically sets the connection as initialized. +func (conn *connection) SetInitialized() { + // Unblock the requests sent before INIT. + close(conn.initializedChan) + + // Close the channel first to avoid the non-atomic situation + // where conn.initialized is true but there are + // tasks being blocked on the channel. + // And it prevents the newer tasks from gaining + // unnecessary higher chance to be issued before the blocked one. + + atomic.StoreInt32(&(conn.initialized), int32(1)) +} + +// IsInitialized atomically check if the connection is initialized. +// pairs with SetInitialized(). +func (conn *connection) Initialized() bool { + return atomic.LoadInt32(&(conn.initialized)) != 0 +} + // InitSend sends a FUSE_INIT request. func (conn *connection) InitSend(creds *auth.Credentials, pid uint32) error { in := linux.FUSEInitIn{ @@ -70,29 +95,31 @@ func (conn *connection) InitSend(creds *auth.Credentials, pid uint32) error { } // InitRecv receives a FUSE_INIT reply and process it. +// +// Preconditions: conn.asyncMu must not be held if minor verion is newer than 13. func (conn *connection) InitRecv(res *Response, hasSysAdminCap bool) error { if err := res.Error(); err != nil { return err } - var out linux.FUSEInitOut - if err := res.UnmarshalPayload(&out); err != nil { + initRes := fuseInitRes{initLen: res.DataLen()} + if err := res.UnmarshalPayload(&initRes); err != nil { return err } - return conn.initProcessReply(&out, hasSysAdminCap) + return conn.initProcessReply(&initRes.initOut, hasSysAdminCap) } // Process the FUSE_INIT reply from the FUSE server. +// It tries to acquire the conn.asyncMu lock if minor version is newer than 13. func (conn *connection) initProcessReply(out *linux.FUSEInitOut, hasSysAdminCap bool) error { + // No matter error or not, always set initialzied. + // to unblock the blocked requests. + defer conn.SetInitialized() + // No support for old major fuse versions. if out.Major != linux.FUSE_KERNEL_VERSION { conn.connInitError = true - - // Set the connection as initialized and unblock the blocked requests - // (i.e. return error for them). - conn.SetInitialized() - return nil } @@ -100,29 +127,14 @@ func (conn *connection) initProcessReply(out *linux.FUSEInitOut, hasSysAdminCap conn.connInitSuccess = true conn.minor = out.Minor - // No support for limits before minor version 13. - if out.Minor >= 13 { - conn.bgLock.Lock() - - if out.MaxBackground > 0 { - conn.maxBackground = out.MaxBackground - - if !hasSysAdminCap && - conn.maxBackground > MaxUserBackgroundRequest { - conn.maxBackground = MaxUserBackgroundRequest - } - } - - if out.CongestionThreshold > 0 { - conn.congestionThreshold = out.CongestionThreshold - - if !hasSysAdminCap && - conn.congestionThreshold > MaxUserCongestionThreshold { - conn.congestionThreshold = MaxUserCongestionThreshold - } - } - - conn.bgLock.Unlock() + // No support for negotiating MaxWrite before minor version 5. + if out.Minor >= 5 { + conn.maxWrite = out.MaxWrite + } else { + conn.maxWrite = fuseMinMaxWrite + } + if conn.maxWrite < fuseMinMaxWrite { + conn.maxWrite = fuseMinMaxWrite } // No support for the following flags before minor version 6. @@ -131,8 +143,6 @@ func (conn *connection) initProcessReply(out *linux.FUSEInitOut, hasSysAdminCap conn.bigWrites = out.Flags&linux.FUSE_BIG_WRITES != 0 conn.dontMask = out.Flags&linux.FUSE_DONT_MASK != 0 conn.writebackCache = out.Flags&linux.FUSE_WRITEBACK_CACHE != 0 - conn.cacheSymlinks = out.Flags&linux.FUSE_CACHE_SYMLINKS != 0 - conn.abortErr = out.Flags&linux.FUSE_ABORT_ERROR != 0 // TODO(gvisor.dev/issue/3195): figure out how to use TimeGran (0 < TimeGran <= fuseMaxTimeGranNs). @@ -148,19 +158,90 @@ func (conn *connection) initProcessReply(out *linux.FUSEInitOut, hasSysAdminCap } } - // No support for negotiating MaxWrite before minor version 5. - if out.Minor >= 5 { - conn.maxWrite = out.MaxWrite - } else { - conn.maxWrite = fuseMinMaxWrite + // No support for limits before minor version 13. + if out.Minor >= 13 { + conn.asyncMu.Lock() + + if out.MaxBackground > 0 { + conn.asyncNumMax = out.MaxBackground + + if !hasSysAdminCap && + conn.asyncNumMax > MaxUserBackgroundRequest { + conn.asyncNumMax = MaxUserBackgroundRequest + } + } + + if out.CongestionThreshold > 0 { + conn.asyncCongestionThreshold = out.CongestionThreshold + + if !hasSysAdminCap && + conn.asyncCongestionThreshold > MaxUserCongestionThreshold { + conn.asyncCongestionThreshold = MaxUserCongestionThreshold + } + } + + conn.asyncMu.Unlock() } - if conn.maxWrite < fuseMinMaxWrite { - conn.maxWrite = fuseMinMaxWrite + + return nil +} + +// Abort this FUSE connection. +// It tries to acquire conn.fd.mu, conn.lock, conn.bgLock in order. +// All possible requests waiting or blocking will be aborted. +// +// Preconditions: conn.fd.mu is locked. +func (conn *connection) Abort(ctx context.Context) { + conn.mu.Lock() + conn.asyncMu.Lock() + + if !conn.connected { + conn.asyncMu.Unlock() + conn.mu.Unlock() + conn.fd.mu.Unlock() + return } - // Set connection as initialized and unblock the requests - // issued before init. - conn.SetInitialized() + conn.connected = false - return nil + // Empty the `fd.queue` that holds the requests + // not yet read by the FUSE daemon yet. + // These are a subset of the requests in `fuse.completion` map. + for !conn.fd.queue.Empty() { + req := conn.fd.queue.Front() + conn.fd.queue.Remove(req) + } + + var terminate []linux.FUSEOpID + + // 2. Collect the requests have not been sent to FUSE daemon, + // or have not received a reply. + for unique := range conn.fd.completions { + terminate = append(terminate, unique) + } + + // Release locks to avoid deadlock. + conn.asyncMu.Unlock() + conn.mu.Unlock() + + // 1. The requets blocked before initialization. + // Will reach call() `connected` check and return. + if !conn.Initialized() { + conn.SetInitialized() + } + + // 2. Terminate the requests collected above. + // Set ECONNABORTED error. + // sendError() will remove them from `fd.completion` map. + // Will enter the path of a normally received error. + for _, toTerminate := range terminate { + conn.fd.sendError(ctx, -int32(syscall.ECONNABORTED), toTerminate) + } + + // 3. The requests not yet written to FUSE device. + // Early terminate. + // Will reach callFutureLocked() `connected` check and return. + close(conn.fd.fullQueueCh) + + // TODO(gvisor.dev/issue/3528): Forget all pending forget reqs. } diff --git a/pkg/sentry/fsimpl/fuse/connection_test.go b/pkg/sentry/fsimpl/fuse/connection_test.go new file mode 100644 index 000000000..91d16c1cf --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/connection_test.go @@ -0,0 +1,117 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "math/rand" + "syscall" + "testing" + + "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" + "gvisor.dev/gvisor/pkg/syserror" +) + +// TestConnectionInitBlock tests if initialization +// correctly blocks and unblocks the connection. +// Since it's unfeasible to test kernelTask.Block() in unit test, +// the code in Call() are not tested here. +func TestConnectionInitBlock(t *testing.T) { + s := setup(t) + defer s.Destroy() + + k := kernel.KernelFromContext(s.Ctx) + + conn, _, err := newTestConnection(s, k, maxActiveRequestsDefault) + if err != nil { + t.Fatalf("newTestConnection: %v", err) + } + + select { + case <-conn.initializedChan: + t.Fatalf("initializedChan should be blocking before SetInitialized") + default: + } + + conn.SetInitialized() + + select { + case <-conn.initializedChan: + default: + t.Fatalf("initializedChan should not be blocking after SetInitialized") + } +} + +func TestConnectionAbort(t *testing.T) { + s := setup(t) + defer s.Destroy() + + k := kernel.KernelFromContext(s.Ctx) + creds := auth.CredentialsFromContext(s.Ctx) + task := kernel.TaskFromContext(s.Ctx) + + const numRequests uint64 = 256 + + conn, _, err := newTestConnection(s, k, numRequests) + if err != nil { + t.Fatalf("newTestConnection: %v", err) + } + + testObj := &testPayload{ + data: rand.Uint32(), + } + + var futNormal []*futureResponse + + for i := 0; i < int(numRequests); i++ { + req, err := conn.NewRequest(creds, uint32(i), uint64(i), 0, testObj) + if err != nil { + t.Fatalf("NewRequest creation failed: %v", err) + } + fut, err := conn.callFutureLocked(task, req) + if err != nil { + t.Fatalf("callFutureLocked failed: %v", err) + } + futNormal = append(futNormal, fut) + } + + conn.Abort(s.Ctx) + + // Abort should unblock the initialization channel. + // Note: no test requests are actually blocked on `conn.initializedChan`. + select { + case <-conn.initializedChan: + default: + t.Fatalf("initializedChan should not be blocking after SetInitialized") + } + + // Abort will return ECONNABORTED error to unblocked requests. + for _, fut := range futNormal { + if fut.getResponse().hdr.Error != -int32(syscall.ECONNABORTED) { + t.Fatalf("Incorrect error code received for aborted connection: %v", fut.getResponse().hdr.Error) + } + } + + // After abort, Call() should return directly with ENOTCONN. + req, err := conn.NewRequest(creds, 0, 0, 0, testObj) + if err != nil { + t.Fatalf("NewRequest creation failed: %v", err) + } + _, err = conn.Call(task, req) + if err != syserror.ENOTCONN { + t.Fatalf("Incorrect error code received for Call() after connection aborted") + } + +} diff --git a/pkg/sentry/fsimpl/fuse/dev.go b/pkg/sentry/fsimpl/fuse/dev.go index e522ff9a0..1b86a4b4c 100644 --- a/pkg/sentry/fsimpl/fuse/dev.go +++ b/pkg/sentry/fsimpl/fuse/dev.go @@ -19,7 +19,6 @@ import ( "gvisor.dev/gvisor/pkg/abi/linux" "gvisor.dev/gvisor/pkg/context" - "gvisor.dev/gvisor/pkg/log" "gvisor.dev/gvisor/pkg/sentry/kernel" "gvisor.dev/gvisor/pkg/sentry/kernel/auth" "gvisor.dev/gvisor/pkg/sentry/vfs" @@ -32,6 +31,8 @@ import ( const fuseDevMinor = 229 // fuseDevice implements vfs.Device for /dev/fuse. +// +// +stateify savable type fuseDevice struct{} // Open implements vfs.Device.Open. @@ -50,15 +51,14 @@ func (fuseDevice) Open(ctx context.Context, mnt *vfs.Mount, vfsd *vfs.Dentry, op } // DeviceFD implements vfs.FileDescriptionImpl for /dev/fuse. +// +// +stateify savable type DeviceFD struct { vfsfd vfs.FileDescription vfs.FileDescriptionDefaultImpl vfs.DentryMetadataFileDescriptionImpl vfs.NoLockFD - // mounted specifies whether a FUSE filesystem was mounted using the DeviceFD. - mounted bool - // nextOpID is used to create new requests. nextOpID linux.FUSEOpID @@ -83,7 +83,7 @@ type DeviceFD struct { writeCursorFR *futureResponse // mu protects all the queues, maps, buffers and cursors and nextOpID. - mu sync.Mutex + mu sync.Mutex `state:"nosave"` // waitQueue is used to notify interested parties when the device becomes // readable or writable. @@ -92,21 +92,36 @@ type DeviceFD struct { // fullQueueCh is a channel used to synchronize the readers with the writers. // Writers (inbound requests to the filesystem) block if there are too many // unprocessed in-flight requests. - fullQueueCh chan struct{} + fullQueueCh chan struct{} `state:".(int)"` // fs is the FUSE filesystem that this FD is being used for. fs *filesystem } +func (fd *DeviceFD) saveFullQueueCh() int { + return cap(fd.fullQueueCh) +} + +func (fd *DeviceFD) loadFullQueueCh(capacity int) { + fd.fullQueueCh = make(chan struct{}, capacity) +} + // Release implements vfs.FileDescriptionImpl.Release. -func (fd *DeviceFD) Release(context.Context) { - fd.fs.conn.connected = false +func (fd *DeviceFD) Release(ctx context.Context) { + if fd.fs != nil { + fd.fs.conn.mu.Lock() + fd.fs.conn.connected = false + fd.fs.conn.mu.Unlock() + + fd.fs.VFSFilesystem().DecRef(ctx) + fd.fs = nil + } } // PRead implements vfs.FileDescriptionImpl.PRead. func (fd *DeviceFD) PRead(ctx context.Context, dst usermem.IOSequence, offset int64, opts vfs.ReadOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } @@ -116,10 +131,16 @@ func (fd *DeviceFD) PRead(ctx context.Context, dst usermem.IOSequence, offset in // Read implements vfs.FileDescriptionImpl.Read. func (fd *DeviceFD) Read(ctx context.Context, dst usermem.IOSequence, opts vfs.ReadOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } + // Return ENODEV if the filesystem is umounted. + if fd.fs.umounted { + // TODO(gvisor.dev/issue/3525): return ECONNABORTED if aborted via fuse control fs. + return 0, syserror.ENODEV + } + // We require that any Read done on this filesystem have a sane minimum // read buffer. It must have the capacity for the fixed parts of any request // header (Linux uses the request header and the FUSEWriteIn header for this @@ -143,58 +164,82 @@ func (fd *DeviceFD) Read(ctx context.Context, dst usermem.IOSequence, opts vfs.R } // readLocked implements the reading of the fuse device while locked with DeviceFD.mu. +// +// Preconditions: dst is large enough for any reasonable request. func (fd *DeviceFD) readLocked(ctx context.Context, dst usermem.IOSequence, opts vfs.ReadOptions) (int64, error) { - if fd.queue.Empty() { - return 0, syserror.ErrWouldBlock - } + var req *Request - var readCursor uint32 - var bytesRead int64 - for { - req := fd.queue.Front() - if dst.NumBytes() < int64(req.hdr.Len) { - // The request is too large. Cannot process it. All requests must be smaller than the - // negotiated size as specified by Connection.MaxWrite set as part of the FUSE_INIT - // handshake. - errno := -int32(syscall.EIO) - if req.hdr.Opcode == linux.FUSE_SETXATTR { - errno = -int32(syscall.E2BIG) - } + // Find the first valid request. + // For the normal case this loop only execute once. + for !fd.queue.Empty() { + req = fd.queue.Front() - // Return the error to the calling task. - if err := fd.sendError(ctx, errno, req); err != nil { - return 0, err - } + if int64(req.hdr.Len)+int64(len(req.payload)) <= dst.NumBytes() { + break + } - // We're done with this request. - fd.queue.Remove(req) + // The request is too large. Cannot process it. All requests must be smaller than the + // negotiated size as specified by Connection.MaxWrite set as part of the FUSE_INIT + // handshake. + errno := -int32(syscall.EIO) + if req.hdr.Opcode == linux.FUSE_SETXATTR { + errno = -int32(syscall.E2BIG) + } - // Restart the read as this request was invalid. - log.Warningf("fuse.DeviceFD.Read: request found was too large. Restarting read.") - return fd.readLocked(ctx, dst, opts) + // Return the error to the calling task. + if err := fd.sendError(ctx, errno, req.hdr.Unique); err != nil { + return 0, err } - n, err := dst.CopyOut(ctx, req.data[readCursor:]) + // We're done with this request. + fd.queue.Remove(req) + req = nil + } + + if req == nil { + return 0, syserror.ErrWouldBlock + } + + // We already checked the size: dst must be able to fit the whole request. + // Now we write the marshalled header, the payload, + // and the potential additional payload + // to the user memory IOSequence. + + n, err := dst.CopyOut(ctx, req.data) + if err != nil { + return 0, err + } + if n != len(req.data) { + return 0, syserror.EIO + } + + if req.hdr.Opcode == linux.FUSE_WRITE { + written, err := dst.DropFirst(n).CopyOut(ctx, req.payload) if err != nil { return 0, err } - readCursor += uint32(n) - bytesRead += int64(n) - - if readCursor >= req.hdr.Len { - // Fully done with this req, remove it from the queue. - fd.queue.Remove(req) - break + if written != len(req.payload) { + return 0, syserror.EIO } + n += int(written) } - return bytesRead, nil + // Fully done with this req, remove it from the queue. + fd.queue.Remove(req) + + // Remove noReply ones from map of requests expecting a reply. + if req.noReply { + fd.numActiveRequests -= 1 + delete(fd.completions, req.hdr.Unique) + } + + return int64(n), nil } // PWrite implements vfs.FileDescriptionImpl.PWrite. func (fd *DeviceFD) PWrite(ctx context.Context, src usermem.IOSequence, offset int64, opts vfs.WriteOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } @@ -211,10 +256,15 @@ func (fd *DeviceFD) Write(ctx context.Context, src usermem.IOSequence, opts vfs. // writeLocked implements writing to the fuse device while locked with DeviceFD.mu. func (fd *DeviceFD) writeLocked(ctx context.Context, src usermem.IOSequence, opts vfs.WriteOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } + // Return ENODEV if the filesystem is umounted. + if fd.fs.umounted { + return 0, syserror.ENODEV + } + var cn, n int64 hdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes()) @@ -276,7 +326,8 @@ func (fd *DeviceFD) writeLocked(ctx context.Context, src usermem.IOSequence, opt fut, ok := fd.completions[hdr.Unique] if !ok { - // Server sent us a response for a request we never sent? + // Server sent us a response for a request we never sent, + // or for which we already received a reply (e.g. aborted), an unlikely event. return 0, syserror.EINVAL } @@ -307,8 +358,23 @@ func (fd *DeviceFD) writeLocked(ctx context.Context, src usermem.IOSequence, opt // Readiness implements vfs.FileDescriptionImpl.Readiness. func (fd *DeviceFD) Readiness(mask waiter.EventMask) waiter.EventMask { + fd.mu.Lock() + defer fd.mu.Unlock() + return fd.readinessLocked(mask) +} + +// readinessLocked implements checking the readiness of the fuse device while +// locked with DeviceFD.mu. +func (fd *DeviceFD) readinessLocked(mask waiter.EventMask) waiter.EventMask { var ready waiter.EventMask - ready |= waiter.EventOut // FD is always writable + + if fd.fs.umounted { + ready |= waiter.EventErr + return ready & mask + } + + // FD is always writable. + ready |= waiter.EventOut if !fd.queue.Empty() { // Have reqs available, FD is readable. ready |= waiter.EventIn @@ -330,7 +396,7 @@ func (fd *DeviceFD) EventUnregister(e *waiter.Entry) { // Seek implements vfs.FileDescriptionImpl.Seek. func (fd *DeviceFD) Seek(ctx context.Context, offset int64, whence int32) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } @@ -338,59 +404,59 @@ func (fd *DeviceFD) Seek(ctx context.Context, offset int64, whence int32) (int64 } // sendResponse sends a response to the waiting task (if any). +// +// Preconditions: fd.mu must be held. func (fd *DeviceFD) sendResponse(ctx context.Context, fut *futureResponse) error { - // See if the running task need to perform some action before returning. - // Since we just finished writing the future, we can be sure that - // getResponse generates a populated response. - if err := fd.noReceiverAction(ctx, fut.getResponse()); err != nil { - return err - } + // Signal the task waiting on a response if any. + defer close(fut.ch) // Signal that the queue is no longer full. select { case fd.fullQueueCh <- struct{}{}: default: } - fd.numActiveRequests -= 1 + fd.numActiveRequests-- + + if fut.async { + return fd.asyncCallBack(ctx, fut.getResponse()) + } - // Signal the task waiting on a response. - close(fut.ch) return nil } -// sendError sends an error response to the waiting task (if any). -func (fd *DeviceFD) sendError(ctx context.Context, errno int32, req *Request) error { +// sendError sends an error response to the waiting task (if any) by calling sendResponse(). +// +// Preconditions: fd.mu must be held. +func (fd *DeviceFD) sendError(ctx context.Context, errno int32, unique linux.FUSEOpID) error { // Return the error to the calling task. outHdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes()) respHdr := linux.FUSEHeaderOut{ Len: outHdrLen, Error: errno, - Unique: req.hdr.Unique, + Unique: unique, } fut, ok := fd.completions[respHdr.Unique] if !ok { - // Server sent us a response for a request we never sent? + // A response for a request we never sent, + // or for which we already received a reply (e.g. aborted). return syserror.EINVAL } delete(fd.completions, respHdr.Unique) fut.hdr = &respHdr - if err := fd.sendResponse(ctx, fut); err != nil { - return err - } - - return nil + return fd.sendResponse(ctx, fut) } -// noReceiverAction has the calling kernel.Task do some action if its known that no -// receiver is going to be waiting on the future channel. This is to be used by: -// FUSE_INIT. -func (fd *DeviceFD) noReceiverAction(ctx context.Context, r *Response) error { - if r.opcode == linux.FUSE_INIT { +// asyncCallBack executes pre-defined callback function for async requests. +// Currently used by: FUSE_INIT. +func (fd *DeviceFD) asyncCallBack(ctx context.Context, r *Response) error { + switch r.opcode { + case linux.FUSE_INIT: creds := auth.CredentialsFromContext(ctx) rootUserNs := kernel.KernelFromContext(ctx).RootUserNamespace() return fd.fs.conn.InitRecv(r, creds.HasCapabilityIn(linux.CAP_SYS_ADMIN, rootUserNs)) + // TODO(gvisor.dev/issue/3247): support async read: correctly process the response. } return nil diff --git a/pkg/sentry/fsimpl/fuse/dev_test.go b/pkg/sentry/fsimpl/fuse/dev_test.go index 1ffe7ccd2..5986133e9 100644 --- a/pkg/sentry/fsimpl/fuse/dev_test.go +++ b/pkg/sentry/fsimpl/fuse/dev_test.go @@ -16,7 +16,6 @@ package fuse import ( "fmt" - "io" "math/rand" "testing" @@ -28,17 +27,12 @@ import ( "gvisor.dev/gvisor/pkg/syserror" "gvisor.dev/gvisor/pkg/usermem" "gvisor.dev/gvisor/pkg/waiter" - "gvisor.dev/gvisor/tools/go_marshal/marshal" ) // echoTestOpcode is the Opcode used during testing. The server used in tests // will simply echo the payload back with the appropriate headers. const echoTestOpcode linux.FUSEOpcode = 1000 -type testPayload struct { - data uint32 -} - // TestFUSECommunication tests that the communication layer between the Sentry and the // FUSE server daemon works as expected. func TestFUSECommunication(t *testing.T) { @@ -327,102 +321,3 @@ func fuseServerRun(t *testing.T, s *testutil.System, k *kernel.Kernel, fd *vfs.F } } } - -func setup(t *testing.T) *testutil.System { - k, err := testutil.Boot() - if err != nil { - t.Fatalf("Error creating kernel: %v", err) - } - - ctx := k.SupervisorContext() - creds := auth.CredentialsFromContext(ctx) - - k.VFS().MustRegisterFilesystemType(Name, &FilesystemType{}, &vfs.RegisterFilesystemTypeOptions{ - AllowUserList: true, - AllowUserMount: true, - }) - - mntns, err := k.VFS().NewMountNamespace(ctx, creds, "", "tmpfs", &vfs.GetFilesystemOptions{}) - if err != nil { - t.Fatalf("NewMountNamespace(): %v", err) - } - - return testutil.NewSystem(ctx, t, k.VFS(), mntns) -} - -// newTestConnection creates a fuse connection that the sentry can communicate with -// and the FD for the server to communicate with. -func newTestConnection(system *testutil.System, k *kernel.Kernel, maxActiveRequests uint64) (*connection, *vfs.FileDescription, error) { - vfsObj := &vfs.VirtualFilesystem{} - fuseDev := &DeviceFD{} - - if err := vfsObj.Init(system.Ctx); err != nil { - return nil, nil, err - } - - vd := vfsObj.NewAnonVirtualDentry("genCountFD") - defer vd.DecRef(system.Ctx) - if err := fuseDev.vfsfd.Init(fuseDev, linux.O_RDWR|linux.O_CREAT, vd.Mount(), vd.Dentry(), &vfs.FileDescriptionOptions{}); err != nil { - return nil, nil, err - } - - fsopts := filesystemOptions{ - maxActiveRequests: maxActiveRequests, - } - fs, err := NewFUSEFilesystem(system.Ctx, 0, &fsopts, &fuseDev.vfsfd) - if err != nil { - return nil, nil, err - } - - return fs.conn, &fuseDev.vfsfd, nil -} - -// SizeBytes implements marshal.Marshallable.SizeBytes. -func (t *testPayload) SizeBytes() int { - return 4 -} - -// MarshalBytes implements marshal.Marshallable.MarshalBytes. -func (t *testPayload) MarshalBytes(dst []byte) { - usermem.ByteOrder.PutUint32(dst[:4], t.data) -} - -// UnmarshalBytes implements marshal.Marshallable.UnmarshalBytes. -func (t *testPayload) UnmarshalBytes(src []byte) { - *t = testPayload{data: usermem.ByteOrder.Uint32(src[:4])} -} - -// Packed implements marshal.Marshallable.Packed. -func (t *testPayload) Packed() bool { - return true -} - -// MarshalUnsafe implements marshal.Marshallable.MarshalUnsafe. -func (t *testPayload) MarshalUnsafe(dst []byte) { - t.MarshalBytes(dst) -} - -// UnmarshalUnsafe implements marshal.Marshallable.UnmarshalUnsafe. -func (t *testPayload) UnmarshalUnsafe(src []byte) { - t.UnmarshalBytes(src) -} - -// CopyOutN implements marshal.Marshallable.CopyOutN. -func (t *testPayload) CopyOutN(task marshal.Task, addr usermem.Addr, limit int) (int, error) { - panic("not implemented") -} - -// CopyOut implements marshal.Marshallable.CopyOut. -func (t *testPayload) CopyOut(task marshal.Task, addr usermem.Addr) (int, error) { - panic("not implemented") -} - -// CopyIn implements marshal.Marshallable.CopyIn. -func (t *testPayload) CopyIn(task marshal.Task, addr usermem.Addr) (int, error) { - panic("not implemented") -} - -// WriteTo implements io.WriterTo.WriteTo. -func (t *testPayload) WriteTo(w io.Writer) (int64, error) { - panic("not implemented") -} diff --git a/pkg/sentry/fsimpl/fuse/directory.go b/pkg/sentry/fsimpl/fuse/directory.go new file mode 100644 index 000000000..8f220a04b --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/directory.go @@ -0,0 +1,105 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "sync/atomic" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/context" + "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" + "gvisor.dev/gvisor/pkg/sentry/vfs" + "gvisor.dev/gvisor/pkg/syserror" + "gvisor.dev/gvisor/pkg/usermem" +) + +type directoryFD struct { + fileDescription +} + +// Allocate implements directoryFD.Allocate. +func (*directoryFD) Allocate(ctx context.Context, mode, offset, length uint64) error { + return syserror.EISDIR +} + +// PRead implements vfs.FileDescriptionImpl.PRead. +func (*directoryFD) PRead(ctx context.Context, dst usermem.IOSequence, offset int64, opts vfs.ReadOptions) (int64, error) { + return 0, syserror.EISDIR +} + +// Read implements vfs.FileDescriptionImpl.Read. +func (*directoryFD) Read(ctx context.Context, dst usermem.IOSequence, opts vfs.ReadOptions) (int64, error) { + return 0, syserror.EISDIR +} + +// PWrite implements vfs.FileDescriptionImpl.PWrite. +func (*directoryFD) PWrite(ctx context.Context, src usermem.IOSequence, offset int64, opts vfs.WriteOptions) (int64, error) { + return 0, syserror.EISDIR +} + +// Write implements vfs.FileDescriptionImpl.Write. +func (*directoryFD) Write(ctx context.Context, src usermem.IOSequence, opts vfs.WriteOptions) (int64, error) { + return 0, syserror.EISDIR +} + +// IterDirents implements vfs.FileDescriptionImpl.IterDirents. +func (dir *directoryFD) IterDirents(ctx context.Context, callback vfs.IterDirentsCallback) error { + fusefs := dir.inode().fs + task, creds := kernel.TaskFromContext(ctx), auth.CredentialsFromContext(ctx) + + in := linux.FUSEReadIn{ + Fh: dir.Fh, + Offset: uint64(atomic.LoadInt64(&dir.off)), + Size: linux.FUSE_PAGE_SIZE, + Flags: dir.statusFlags(), + } + + // TODO(gVisor.dev/issue/3404): Support FUSE_READDIRPLUS. + req, err := fusefs.conn.NewRequest(creds, uint32(task.ThreadID()), dir.inode().nodeID, linux.FUSE_READDIR, &in) + if err != nil { + return err + } + + res, err := fusefs.conn.Call(task, req) + if err != nil { + return err + } + if err := res.Error(); err != nil { + return err + } + + var out linux.FUSEDirents + if err := res.UnmarshalPayload(&out); err != nil { + return err + } + + for _, fuseDirent := range out.Dirents { + nextOff := int64(fuseDirent.Meta.Off) + dirent := vfs.Dirent{ + Name: fuseDirent.Name, + Type: uint8(fuseDirent.Meta.Type), + Ino: fuseDirent.Meta.Ino, + NextOff: nextOff, + } + + if err := callback.Handle(dirent); err != nil { + return err + } + atomic.StoreInt64(&dir.off, nextOff) + } + + return nil +} diff --git a/pkg/sentry/fsimpl/fuse/file.go b/pkg/sentry/fsimpl/fuse/file.go new file mode 100644 index 000000000..83f2816b7 --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/file.go @@ -0,0 +1,133 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/context" + "gvisor.dev/gvisor/pkg/sentry/fsimpl/kernfs" + "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" + "gvisor.dev/gvisor/pkg/sentry/vfs" + "gvisor.dev/gvisor/pkg/usermem" +) + +// fileDescription implements vfs.FileDescriptionImpl for fuse. +type fileDescription struct { + vfsfd vfs.FileDescription + vfs.FileDescriptionDefaultImpl + vfs.DentryMetadataFileDescriptionImpl + vfs.NoLockFD + + // the file handle used in userspace. + Fh uint64 + + // Nonseekable is indicate cannot perform seek on a file. + Nonseekable bool + + // DirectIO suggest fuse to use direct io operation. + DirectIO bool + + // OpenFlag is the flag returned by open. + OpenFlag uint32 + + // off is the file offset. + off int64 +} + +func (fd *fileDescription) dentry() *kernfs.Dentry { + return fd.vfsfd.Dentry().Impl().(*kernfs.Dentry) +} + +func (fd *fileDescription) inode() *inode { + return fd.dentry().Inode().(*inode) +} + +func (fd *fileDescription) filesystem() *vfs.Filesystem { + return fd.vfsfd.VirtualDentry().Mount().Filesystem() +} + +func (fd *fileDescription) statusFlags() uint32 { + return fd.vfsfd.StatusFlags() +} + +// Release implements vfs.FileDescriptionImpl.Release. +func (fd *fileDescription) Release(ctx context.Context) { + // no need to release if FUSE server doesn't implement Open. + conn := fd.inode().fs.conn + if conn.noOpen { + return + } + + in := linux.FUSEReleaseIn{ + Fh: fd.Fh, + Flags: fd.statusFlags(), + } + // TODO(gvisor.dev/issue/3245): add logic when we support file lock owner. + var opcode linux.FUSEOpcode + if fd.inode().Mode().IsDir() { + opcode = linux.FUSE_RELEASEDIR + } else { + opcode = linux.FUSE_RELEASE + } + kernelTask := kernel.TaskFromContext(ctx) + // ignoring errors and FUSE server reply is analogous to Linux's behavior. + req, err := conn.NewRequest(auth.CredentialsFromContext(ctx), uint32(kernelTask.ThreadID()), fd.inode().nodeID, opcode, &in) + if err != nil { + // No way to invoke Call() with an errored request. + return + } + // The reply will be ignored since no callback is defined in asyncCallBack(). + conn.CallAsync(kernelTask, req) +} + +// PRead implements vfs.FileDescriptionImpl.PRead. +func (fd *fileDescription) PRead(ctx context.Context, dst usermem.IOSequence, offset int64, opts vfs.ReadOptions) (int64, error) { + return 0, nil +} + +// Read implements vfs.FileDescriptionImpl.Read. +func (fd *fileDescription) Read(ctx context.Context, dst usermem.IOSequence, opts vfs.ReadOptions) (int64, error) { + return 0, nil +} + +// PWrite implements vfs.FileDescriptionImpl.PWrite. +func (fd *fileDescription) PWrite(ctx context.Context, src usermem.IOSequence, offset int64, opts vfs.WriteOptions) (int64, error) { + return 0, nil +} + +// Write implements vfs.FileDescriptionImpl.Write. +func (fd *fileDescription) Write(ctx context.Context, src usermem.IOSequence, opts vfs.WriteOptions) (int64, error) { + return 0, nil +} + +// Seek implements vfs.FileDescriptionImpl.Seek. +func (fd *fileDescription) Seek(ctx context.Context, offset int64, whence int32) (int64, error) { + return 0, nil +} + +// Stat implements vfs.FileDescriptionImpl.Stat. +func (fd *fileDescription) Stat(ctx context.Context, opts vfs.StatOptions) (linux.Statx, error) { + fs := fd.filesystem() + inode := fd.inode() + return inode.Stat(ctx, fs, opts) +} + +// SetStat implements vfs.FileDescriptionImpl.SetStat. +func (fd *fileDescription) SetStat(ctx context.Context, opts vfs.SetStatOptions) error { + fs := fd.filesystem() + creds := auth.CredentialsFromContext(ctx) + return fd.inode().setAttr(ctx, fs, creds, opts, true, fd.Fh) +} diff --git a/pkg/sentry/fsimpl/fuse/fusefs.go b/pkg/sentry/fsimpl/fuse/fusefs.go index 83c24ec25..e7ef5998e 100644 --- a/pkg/sentry/fsimpl/fuse/fusefs.go +++ b/pkg/sentry/fsimpl/fuse/fusefs.go @@ -16,24 +16,36 @@ package fuse import ( + "math" "strconv" + "sync" + "sync/atomic" "gvisor.dev/gvisor/pkg/abi/linux" "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/log" + "gvisor.dev/gvisor/pkg/marshal" "gvisor.dev/gvisor/pkg/sentry/fsimpl/kernfs" "gvisor.dev/gvisor/pkg/sentry/kernel" "gvisor.dev/gvisor/pkg/sentry/kernel/auth" "gvisor.dev/gvisor/pkg/sentry/vfs" "gvisor.dev/gvisor/pkg/syserror" + "gvisor.dev/gvisor/pkg/waiter" ) // Name is the default filesystem name. const Name = "fuse" +// maxActiveRequestsDefault is the default setting controlling the upper bound +// on the number of active requests at any given time. +const maxActiveRequestsDefault = 10000 + // FilesystemType implements vfs.FilesystemType. +// +// +stateify savable type FilesystemType struct{} +// +stateify savable type filesystemOptions struct { // userID specifies the numeric uid of the mount owner. // This option should not be specified by the filesystem owner. @@ -56,9 +68,16 @@ type filesystemOptions struct { // exist at any time. Any further requests will block when trying to // Call the server. maxActiveRequests uint64 + + // maxRead is the max number of bytes to read, + // specified as "max_read" in fs parameters. + // If not specified by user, use math.MaxUint32 as default value. + maxRead uint32 } // filesystem implements vfs.FilesystemImpl. +// +// +stateify savable type filesystem struct { kernfs.Filesystem devMinor uint32 @@ -69,6 +88,9 @@ type filesystem struct { // opts is the options the fusefs is initialized with. opts *filesystemOptions + + // umounted is true if filesystem.Release() has been called. + umounted bool } // Name implements vfs.FilesystemType.Name. @@ -76,6 +98,9 @@ func (FilesystemType) Name() string { return Name } +// Release implements vfs.FilesystemType.Release. +func (FilesystemType) Release(ctx context.Context) {} + // GetFilesystem implements vfs.FilesystemType.GetFilesystem. func (fsType FilesystemType) GetFilesystem(ctx context.Context, vfsObj *vfs.VirtualFilesystem, creds *auth.Credentials, source string, opts vfs.GetFilesystemOptions) (*vfs.Filesystem, *vfs.Dentry, error) { devMinor, err := vfsObj.GetAnonBlockDevMinor() @@ -142,14 +167,29 @@ func (fsType FilesystemType) GetFilesystem(ctx context.Context, vfsObj *vfs.Virt // Set the maxInFlightRequests option. fsopts.maxActiveRequests = maxActiveRequestsDefault + if maxReadStr, ok := mopts["max_read"]; ok { + delete(mopts, "max_read") + maxRead, err := strconv.ParseUint(maxReadStr, 10, 32) + if err != nil { + log.Warningf("%s.GetFilesystem: invalid max_read: max_read=%s", fsType.Name(), maxReadStr) + return nil, nil, syserror.EINVAL + } + if maxRead < fuseMinMaxRead { + maxRead = fuseMinMaxRead + } + fsopts.maxRead = uint32(maxRead) + } else { + fsopts.maxRead = math.MaxUint32 + } + // Check for unparsed options. if len(mopts) != 0 { - log.Warningf("%s.GetFilesystem: unknown options: %v", fsType.Name(), mopts) + log.Warningf("%s.GetFilesystem: unsupported or unknown options: %v", fsType.Name(), mopts) return nil, nil, syserror.EINVAL } // Create a new FUSE filesystem. - fs, err := NewFUSEFilesystem(ctx, devMinor, &fsopts, fuseFd) + fs, err := newFUSEFilesystem(ctx, devMinor, &fsopts, fuseFd) if err != nil { log.Warningf("%s.NewFUSEFilesystem: failed with error: %v", fsType.Name(), err) return nil, nil, err @@ -165,26 +205,28 @@ func (fsType FilesystemType) GetFilesystem(ctx context.Context, vfsObj *vfs.Virt } // root is the fusefs root directory. - root := fs.newInode(creds, fsopts.rootMode) + root := fs.newRootInode(ctx, creds, fsopts.rootMode) return fs.VFSFilesystem(), root.VFSDentry(), nil } -// NewFUSEFilesystem creates a new FUSE filesystem. -func NewFUSEFilesystem(ctx context.Context, devMinor uint32, opts *filesystemOptions, device *vfs.FileDescription) (*filesystem, error) { - fs := &filesystem{ - devMinor: devMinor, - opts: opts, - } - - conn, err := newFUSEConnection(ctx, device, opts.maxActiveRequests) +// newFUSEFilesystem creates a new FUSE filesystem. +func newFUSEFilesystem(ctx context.Context, devMinor uint32, opts *filesystemOptions, device *vfs.FileDescription) (*filesystem, error) { + conn, err := newFUSEConnection(ctx, device, opts) if err != nil { log.Warningf("fuse.NewFUSEFilesystem: NewFUSEConnection failed with error: %v", err) return nil, syserror.EINVAL } - fs.conn = conn fuseFD := device.Impl().(*DeviceFD) + + fs := &filesystem{ + devMinor: devMinor, + opts: opts, + conn: conn, + } + + fs.VFSFilesystem().IncRef() fuseFD.fs = fs return fs, nil @@ -192,39 +234,361 @@ func NewFUSEFilesystem(ctx context.Context, devMinor uint32, opts *filesystemOpt // Release implements vfs.FilesystemImpl.Release. func (fs *filesystem) Release(ctx context.Context) { + fs.conn.fd.mu.Lock() + + fs.umounted = true + fs.conn.Abort(ctx) + // Notify all the waiters on this fd. + fs.conn.fd.waitQueue.Notify(waiter.EventIn) + + fs.conn.fd.mu.Unlock() + fs.Filesystem.VFSFilesystem().VirtualFilesystem().PutAnonBlockDevMinor(fs.devMinor) fs.Filesystem.Release(ctx) } // inode implements kernfs.Inode. +// +// +stateify savable type inode struct { + inodeRefs + kernfs.InodeAlwaysValid kernfs.InodeAttrs - kernfs.InodeNoDynamicLookup - kernfs.InodeNotSymlink kernfs.InodeDirectoryNoNewChildren + kernfs.InodeNotSymlink kernfs.OrderedChildren + // the owning filesystem. fs is immutable. + fs *filesystem + + // metaDataMu protects the metadata of this inode. + metadataMu sync.Mutex + + nodeID uint64 + locks vfs.FileLocks - dentry kernfs.Dentry + // size of the file. + size uint64 + + // attributeVersion is the version of inode's attributes. + attributeVersion uint64 + + // attributeTime is the remaining vaild time of attributes. + attributeTime uint64 + + // version of the inode. + version uint64 + + // link is result of following a symbolic link. + link string } -func (fs *filesystem) newInode(creds *auth.Credentials, mode linux.FileMode) *kernfs.Dentry { - i := &inode{} - i.InodeAttrs.Init(creds, linux.UNNAMED_MAJOR, fs.devMinor, fs.NextIno(), linux.ModeDirectory|0755) +func (fs *filesystem) newRootInode(ctx context.Context, creds *auth.Credentials, mode linux.FileMode) *kernfs.Dentry { + i := &inode{fs: fs, nodeID: 1} + i.InodeAttrs.Init(ctx, creds, linux.UNNAMED_MAJOR, fs.devMinor, 1, linux.ModeDirectory|0755) i.OrderedChildren.Init(kernfs.OrderedChildrenOptions{}) - i.dentry.Init(i) + i.EnableLeakCheck() - return &i.dentry + var d kernfs.Dentry + d.Init(&fs.Filesystem, i) + return &d +} + +func (fs *filesystem) newInode(ctx context.Context, nodeID uint64, attr linux.FUSEAttr) kernfs.Inode { + i := &inode{fs: fs, nodeID: nodeID} + creds := auth.Credentials{EffectiveKGID: auth.KGID(attr.UID), EffectiveKUID: auth.KUID(attr.UID)} + i.InodeAttrs.Init(ctx, &creds, linux.UNNAMED_MAJOR, fs.devMinor, fs.NextIno(), linux.FileMode(attr.Mode)) + atomic.StoreUint64(&i.size, attr.Size) + i.OrderedChildren.Init(kernfs.OrderedChildrenOptions{}) + i.EnableLeakCheck() + return i } // Open implements kernfs.Inode.Open. -func (i *inode) Open(ctx context.Context, rp *vfs.ResolvingPath, vfsd *vfs.Dentry, opts vfs.OpenOptions) (*vfs.FileDescription, error) { - fd, err := kernfs.NewGenericDirectoryFD(rp.Mount(), vfsd, &i.OrderedChildren, &i.locks, &opts) +func (i *inode) Open(ctx context.Context, rp *vfs.ResolvingPath, d *kernfs.Dentry, opts vfs.OpenOptions) (*vfs.FileDescription, error) { + isDir := i.InodeAttrs.Mode().IsDir() + // return error if specified to open directory but inode is not a directory. + if !isDir && opts.Mode.IsDir() { + return nil, syserror.ENOTDIR + } + if opts.Flags&linux.O_LARGEFILE == 0 && atomic.LoadUint64(&i.size) > linux.MAX_NON_LFS { + return nil, syserror.EOVERFLOW + } + + var fd *fileDescription + var fdImpl vfs.FileDescriptionImpl + if isDir { + directoryFD := &directoryFD{} + fd = &(directoryFD.fileDescription) + fdImpl = directoryFD + } else { + regularFD := ®ularFileFD{} + fd = &(regularFD.fileDescription) + fdImpl = regularFD + } + // FOPEN_KEEP_CACHE is the defualt flag for noOpen. + fd.OpenFlag = linux.FOPEN_KEEP_CACHE + + // Only send open request when FUSE server support open or is opening a directory. + if !i.fs.conn.noOpen || isDir { + kernelTask := kernel.TaskFromContext(ctx) + if kernelTask == nil { + log.Warningf("fusefs.Inode.Open: couldn't get kernel task from context") + return nil, syserror.EINVAL + } + + // Build the request. + var opcode linux.FUSEOpcode + if isDir { + opcode = linux.FUSE_OPENDIR + } else { + opcode = linux.FUSE_OPEN + } + + in := linux.FUSEOpenIn{Flags: opts.Flags & ^uint32(linux.O_CREAT|linux.O_EXCL|linux.O_NOCTTY)} + if !i.fs.conn.atomicOTrunc { + in.Flags &= ^uint32(linux.O_TRUNC) + } + + req, err := i.fs.conn.NewRequest(auth.CredentialsFromContext(ctx), uint32(kernelTask.ThreadID()), i.nodeID, opcode, &in) + if err != nil { + return nil, err + } + + // Send the request and receive the reply. + res, err := i.fs.conn.Call(kernelTask, req) + if err != nil { + return nil, err + } + if err := res.Error(); err == syserror.ENOSYS && !isDir { + i.fs.conn.noOpen = true + } else if err != nil { + return nil, err + } else { + out := linux.FUSEOpenOut{} + if err := res.UnmarshalPayload(&out); err != nil { + return nil, err + } + + // Process the reply. + fd.OpenFlag = out.OpenFlag + if isDir { + fd.OpenFlag &= ^uint32(linux.FOPEN_DIRECT_IO) + } + + fd.Fh = out.Fh + } + } + + // TODO(gvisor.dev/issue/3234): invalidate mmap after implemented it for FUSE Inode + fd.DirectIO = fd.OpenFlag&linux.FOPEN_DIRECT_IO != 0 + fdOptions := &vfs.FileDescriptionOptions{} + if fd.OpenFlag&linux.FOPEN_NONSEEKABLE != 0 { + fdOptions.DenyPRead = true + fdOptions.DenyPWrite = true + fd.Nonseekable = true + } + + // If we don't send SETATTR before open (which is indicated by atomicOTrunc) + // and O_TRUNC is set, update the inode's version number and clean existing data + // by setting the file size to 0. + if i.fs.conn.atomicOTrunc && opts.Flags&linux.O_TRUNC != 0 { + i.fs.conn.mu.Lock() + i.fs.conn.attributeVersion++ + i.attributeVersion = i.fs.conn.attributeVersion + atomic.StoreUint64(&i.size, 0) + i.fs.conn.mu.Unlock() + i.attributeTime = 0 + } + + if err := fd.vfsfd.Init(fdImpl, opts.Flags, rp.Mount(), d.VFSDentry(), fdOptions); err != nil { + return nil, err + } + return &fd.vfsfd, nil +} + +// Lookup implements kernfs.Inode.Lookup. +func (i *inode) Lookup(ctx context.Context, name string) (kernfs.Inode, error) { + in := linux.FUSELookupIn{Name: name} + return i.newEntry(ctx, name, 0, linux.FUSE_LOOKUP, &in) +} + +// Keep implements kernfs.Inode.Keep. +func (i *inode) Keep() bool { + // Return true so that kernfs keeps the new dentry pointing to this + // inode in the dentry tree. This is needed because inodes created via + // Lookup are not temporary. They might refer to existing files on server + // that can be Unlink'd/Rmdir'd. + return true +} + +// IterDirents implements kernfs.Inode.IterDirents. +func (*inode) IterDirents(ctx context.Context, mnt *vfs.Mount, callback vfs.IterDirentsCallback, offset, relOffset int64) (int64, error) { + return offset, nil +} + +// NewFile implements kernfs.Inode.NewFile. +func (i *inode) NewFile(ctx context.Context, name string, opts vfs.OpenOptions) (kernfs.Inode, error) { + kernelTask := kernel.TaskFromContext(ctx) + if kernelTask == nil { + log.Warningf("fusefs.Inode.NewFile: couldn't get kernel task from context", i.nodeID) + return nil, syserror.EINVAL + } + in := linux.FUSECreateIn{ + CreateMeta: linux.FUSECreateMeta{ + Flags: opts.Flags, + Mode: uint32(opts.Mode) | linux.S_IFREG, + Umask: uint32(kernelTask.FSContext().Umask()), + }, + Name: name, + } + return i.newEntry(ctx, name, linux.S_IFREG, linux.FUSE_CREATE, &in) +} + +// NewNode implements kernfs.Inode.NewNode. +func (i *inode) NewNode(ctx context.Context, name string, opts vfs.MknodOptions) (kernfs.Inode, error) { + in := linux.FUSEMknodIn{ + MknodMeta: linux.FUSEMknodMeta{ + Mode: uint32(opts.Mode), + Rdev: linux.MakeDeviceID(uint16(opts.DevMajor), opts.DevMinor), + Umask: uint32(kernel.TaskFromContext(ctx).FSContext().Umask()), + }, + Name: name, + } + return i.newEntry(ctx, name, opts.Mode.FileType(), linux.FUSE_MKNOD, &in) +} + +// NewSymlink implements kernfs.Inode.NewSymlink. +func (i *inode) NewSymlink(ctx context.Context, name, target string) (kernfs.Inode, error) { + in := linux.FUSESymLinkIn{ + Name: name, + Target: target, + } + return i.newEntry(ctx, name, linux.S_IFLNK, linux.FUSE_SYMLINK, &in) +} + +// Unlink implements kernfs.Inode.Unlink. +func (i *inode) Unlink(ctx context.Context, name string, child kernfs.Inode) error { + kernelTask := kernel.TaskFromContext(ctx) + if kernelTask == nil { + log.Warningf("fusefs.Inode.newEntry: couldn't get kernel task from context", i.nodeID) + return syserror.EINVAL + } + in := linux.FUSEUnlinkIn{Name: name} + req, err := i.fs.conn.NewRequest(auth.CredentialsFromContext(ctx), uint32(kernelTask.ThreadID()), i.nodeID, linux.FUSE_UNLINK, &in) if err != nil { + return err + } + res, err := i.fs.conn.Call(kernelTask, req) + if err != nil { + return err + } + // only return error, discard res. + return res.Error() +} + +// NewDir implements kernfs.Inode.NewDir. +func (i *inode) NewDir(ctx context.Context, name string, opts vfs.MkdirOptions) (kernfs.Inode, error) { + in := linux.FUSEMkdirIn{ + MkdirMeta: linux.FUSEMkdirMeta{ + Mode: uint32(opts.Mode), + Umask: uint32(kernel.TaskFromContext(ctx).FSContext().Umask()), + }, + Name: name, + } + return i.newEntry(ctx, name, linux.S_IFDIR, linux.FUSE_MKDIR, &in) +} + +// RmDir implements kernfs.Inode.RmDir. +func (i *inode) RmDir(ctx context.Context, name string, child kernfs.Inode) error { + fusefs := i.fs + task, creds := kernel.TaskFromContext(ctx), auth.CredentialsFromContext(ctx) + + in := linux.FUSERmDirIn{Name: name} + req, err := fusefs.conn.NewRequest(creds, uint32(task.ThreadID()), i.nodeID, linux.FUSE_RMDIR, &in) + if err != nil { + return err + } + + res, err := i.fs.conn.Call(task, req) + if err != nil { + return err + } + return res.Error() +} + +// newEntry calls FUSE server for entry creation and allocates corresponding entry according to response. +// Shared by FUSE_MKNOD, FUSE_MKDIR, FUSE_SYMLINK, FUSE_LINK and FUSE_LOOKUP. +func (i *inode) newEntry(ctx context.Context, name string, fileType linux.FileMode, opcode linux.FUSEOpcode, payload marshal.Marshallable) (kernfs.Inode, error) { + kernelTask := kernel.TaskFromContext(ctx) + if kernelTask == nil { + log.Warningf("fusefs.Inode.newEntry: couldn't get kernel task from context", i.nodeID) + return nil, syserror.EINVAL + } + req, err := i.fs.conn.NewRequest(auth.CredentialsFromContext(ctx), uint32(kernelTask.ThreadID()), i.nodeID, opcode, payload) + if err != nil { + return nil, err + } + res, err := i.fs.conn.Call(kernelTask, req) + if err != nil { + return nil, err + } + if err := res.Error(); err != nil { + return nil, err + } + out := linux.FUSEEntryOut{} + if err := res.UnmarshalPayload(&out); err != nil { return nil, err } - return fd.VFSFileDescription(), nil + if opcode != linux.FUSE_LOOKUP && ((out.Attr.Mode&linux.S_IFMT)^uint32(fileType) != 0 || out.NodeID == 0 || out.NodeID == linux.FUSE_ROOT_ID) { + return nil, syserror.EIO + } + child := i.fs.newInode(ctx, out.NodeID, out.Attr) + return child, nil +} + +// Getlink implements kernfs.Inode.Getlink. +func (i *inode) Getlink(ctx context.Context, mnt *vfs.Mount) (vfs.VirtualDentry, string, error) { + path, err := i.Readlink(ctx, mnt) + return vfs.VirtualDentry{}, path, err +} + +// Readlink implements kernfs.Inode.Readlink. +func (i *inode) Readlink(ctx context.Context, mnt *vfs.Mount) (string, error) { + if i.Mode().FileType()&linux.S_IFLNK == 0 { + return "", syserror.EINVAL + } + if len(i.link) == 0 { + kernelTask := kernel.TaskFromContext(ctx) + if kernelTask == nil { + log.Warningf("fusefs.Inode.Readlink: couldn't get kernel task from context") + return "", syserror.EINVAL + } + req, err := i.fs.conn.NewRequest(auth.CredentialsFromContext(ctx), uint32(kernelTask.ThreadID()), i.nodeID, linux.FUSE_READLINK, &linux.FUSEEmptyIn{}) + if err != nil { + return "", err + } + res, err := i.fs.conn.Call(kernelTask, req) + if err != nil { + return "", err + } + i.link = string(res.data[res.hdr.SizeBytes():]) + if !mnt.Options().ReadOnly { + i.attributeTime = 0 + } + } + return i.link, nil +} + +// getFUSEAttr returns a linux.FUSEAttr of this inode stored in local cache. +// TODO(gvisor.dev/issue/3679): Add support for other fields. +func (i *inode) getFUSEAttr() linux.FUSEAttr { + return linux.FUSEAttr{ + Ino: i.Ino(), + Size: atomic.LoadUint64(&i.size), + Mode: uint32(i.Mode()), + } } // statFromFUSEAttr makes attributes from linux.FUSEAttr to linux.Statx. The @@ -280,45 +644,179 @@ func statFromFUSEAttr(attr linux.FUSEAttr, mask, devMinor uint32) linux.Statx { return stat } -// Stat implements kernfs.Inode.Stat. -func (i *inode) Stat(ctx context.Context, fs *vfs.Filesystem, opts vfs.StatOptions) (linux.Statx, error) { - fusefs := fs.Impl().(*filesystem) - conn := fusefs.conn - task, creds := kernel.TaskFromContext(ctx), auth.CredentialsFromContext(ctx) +// getAttr gets the attribute of this inode by issuing a FUSE_GETATTR request +// or read from local cache. It updates the corresponding attributes if +// necessary. +func (i *inode) getAttr(ctx context.Context, fs *vfs.Filesystem, opts vfs.StatOptions, flags uint32, fh uint64) (linux.FUSEAttr, error) { + attributeVersion := atomic.LoadUint64(&i.fs.conn.attributeVersion) + + // TODO(gvisor.dev/issue/3679): send the request only if + // - invalid local cache for fields specified in the opts.Mask + // - forced update + // - i.attributeTime expired + // If local cache is still valid, return local cache. + // Currently we always send a request, + // and we always set the metadata with the new result, + // unless attributeVersion has changed. + + task := kernel.TaskFromContext(ctx) if task == nil { log.Warningf("couldn't get kernel task from context") - return linux.Statx{}, syserror.EINVAL + return linux.FUSEAttr{}, syserror.EINVAL } - var in linux.FUSEGetAttrIn - // We don't set any attribute in the request, because in VFS2 fstat(2) will - // finally be translated into vfs.FilesystemImpl.StatAt() (see - // pkg/sentry/syscalls/linux/vfs2/stat.go), resulting in the same flow - // as stat(2). Thus GetAttrFlags and Fh variable will never be used in VFS2. - req, err := conn.NewRequest(creds, uint32(task.ThreadID()), i.Ino(), linux.FUSE_GETATTR, &in) + creds := auth.CredentialsFromContext(ctx) + + in := linux.FUSEGetAttrIn{ + GetAttrFlags: flags, + Fh: fh, + } + req, err := i.fs.conn.NewRequest(creds, uint32(task.ThreadID()), i.nodeID, linux.FUSE_GETATTR, &in) if err != nil { - return linux.Statx{}, err + return linux.FUSEAttr{}, err } - res, err := conn.Call(task, req) + res, err := i.fs.conn.Call(task, req) if err != nil { - return linux.Statx{}, err + return linux.FUSEAttr{}, err } if err := res.Error(); err != nil { - return linux.Statx{}, err + return linux.FUSEAttr{}, err } var out linux.FUSEGetAttrOut if err := res.UnmarshalPayload(&out); err != nil { - return linux.Statx{}, err + return linux.FUSEAttr{}, err + } + + // Local version is newer, return the local one. + // Skip the update. + if attributeVersion != 0 && atomic.LoadUint64(&i.attributeVersion) > attributeVersion { + return i.getFUSEAttr(), nil } - // Set all metadata into kernfs.InodeAttrs. - if err := i.SetStat(ctx, fs, creds, vfs.SetStatOptions{ - Stat: statFromFUSEAttr(out.Attr, linux.STATX_ALL, fusefs.devMinor), + // Set the metadata of kernfs.InodeAttrs. + if err := i.InodeAttrs.SetStat(ctx, fs, creds, vfs.SetStatOptions{ + Stat: statFromFUSEAttr(out.Attr, linux.STATX_ALL, i.fs.devMinor), }); err != nil { + return linux.FUSEAttr{}, err + } + + // Set the size if no error (after SetStat() check). + atomic.StoreUint64(&i.size, out.Attr.Size) + + return out.Attr, nil +} + +// reviseAttr attempts to update the attributes for internal purposes +// by calling getAttr with a pre-specified mask. +// Used by read, write, lseek. +func (i *inode) reviseAttr(ctx context.Context, flags uint32, fh uint64) error { + // Never need atime for internal purposes. + _, err := i.getAttr(ctx, i.fs.VFSFilesystem(), vfs.StatOptions{ + Mask: linux.STATX_BASIC_STATS &^ linux.STATX_ATIME, + }, flags, fh) + return err +} + +// Stat implements kernfs.Inode.Stat. +func (i *inode) Stat(ctx context.Context, fs *vfs.Filesystem, opts vfs.StatOptions) (linux.Statx, error) { + attr, err := i.getAttr(ctx, fs, opts, 0, 0) + if err != nil { return linux.Statx{}, err } - return statFromFUSEAttr(out.Attr, opts.Mask, fusefs.devMinor), nil + return statFromFUSEAttr(attr, opts.Mask, i.fs.devMinor), nil +} + +// DecRef implements kernfs.Inode.DecRef. +func (i *inode) DecRef(ctx context.Context) { + i.inodeRefs.DecRef(func() { i.Destroy(ctx) }) +} + +// StatFS implements kernfs.Inode.StatFS. +func (i *inode) StatFS(ctx context.Context, fs *vfs.Filesystem) (linux.Statfs, error) { + // TODO(gvisor.dev/issues/3413): Complete the implementation of statfs. + return vfs.GenericStatFS(linux.FUSE_SUPER_MAGIC), nil +} + +// fattrMaskFromStats converts vfs.SetStatOptions.Stat.Mask to linux stats mask +// aligned with the attribute mask defined in include/linux/fs.h. +func fattrMaskFromStats(mask uint32) uint32 { + var fuseAttrMask uint32 + maskMap := map[uint32]uint32{ + linux.STATX_MODE: linux.FATTR_MODE, + linux.STATX_UID: linux.FATTR_UID, + linux.STATX_GID: linux.FATTR_GID, + linux.STATX_SIZE: linux.FATTR_SIZE, + linux.STATX_ATIME: linux.FATTR_ATIME, + linux.STATX_MTIME: linux.FATTR_MTIME, + linux.STATX_CTIME: linux.FATTR_CTIME, + } + for statxMask, fattrMask := range maskMap { + if mask&statxMask != 0 { + fuseAttrMask |= fattrMask + } + } + return fuseAttrMask +} + +// SetStat implements kernfs.Inode.SetStat. +func (i *inode) SetStat(ctx context.Context, fs *vfs.Filesystem, creds *auth.Credentials, opts vfs.SetStatOptions) error { + return i.setAttr(ctx, fs, creds, opts, false, 0) +} + +func (i *inode) setAttr(ctx context.Context, fs *vfs.Filesystem, creds *auth.Credentials, opts vfs.SetStatOptions, useFh bool, fh uint64) error { + conn := i.fs.conn + task := kernel.TaskFromContext(ctx) + if task == nil { + log.Warningf("couldn't get kernel task from context") + return syserror.EINVAL + } + + // We should retain the original file type when assigning new mode. + fileType := uint16(i.Mode()) & linux.S_IFMT + fattrMask := fattrMaskFromStats(opts.Stat.Mask) + if useFh { + fattrMask |= linux.FATTR_FH + } + in := linux.FUSESetAttrIn{ + Valid: fattrMask, + Fh: fh, + Size: opts.Stat.Size, + Atime: uint64(opts.Stat.Atime.Sec), + Mtime: uint64(opts.Stat.Mtime.Sec), + Ctime: uint64(opts.Stat.Ctime.Sec), + AtimeNsec: opts.Stat.Atime.Nsec, + MtimeNsec: opts.Stat.Mtime.Nsec, + CtimeNsec: opts.Stat.Ctime.Nsec, + Mode: uint32(fileType | opts.Stat.Mode), + UID: opts.Stat.UID, + GID: opts.Stat.GID, + } + req, err := conn.NewRequest(creds, uint32(task.ThreadID()), i.nodeID, linux.FUSE_SETATTR, &in) + if err != nil { + return err + } + + res, err := conn.Call(task, req) + if err != nil { + return err + } + if err := res.Error(); err != nil { + return err + } + out := linux.FUSEGetAttrOut{} + if err := res.UnmarshalPayload(&out); err != nil { + return err + } + + // Set the metadata of kernfs.InodeAttrs. + if err := i.InodeAttrs.SetStat(ctx, fs, creds, vfs.SetStatOptions{ + Stat: statFromFUSEAttr(out.Attr, linux.STATX_ALL, i.fs.devMinor), + }); err != nil { + return err + } + + return nil } diff --git a/pkg/sentry/fsimpl/fuse/read_write.go b/pkg/sentry/fsimpl/fuse/read_write.go new file mode 100644 index 000000000..2d396e84c --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/read_write.go @@ -0,0 +1,244 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "io" + "sync/atomic" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/context" + "gvisor.dev/gvisor/pkg/log" + "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" + "gvisor.dev/gvisor/pkg/syserror" + "gvisor.dev/gvisor/pkg/usermem" +) + +// ReadInPages sends FUSE_READ requests for the size after round it up to +// a multiple of page size, blocks on it for reply, processes the reply +// and returns the payload (or joined payloads) as a byte slice. +// This is used for the general purpose reading. +// We do not support direct IO (which read the exact number of bytes) +// at this moment. +func (fs *filesystem) ReadInPages(ctx context.Context, fd *regularFileFD, off uint64, size uint32) ([][]byte, uint32, error) { + attributeVersion := atomic.LoadUint64(&fs.conn.attributeVersion) + + t := kernel.TaskFromContext(ctx) + if t == nil { + log.Warningf("fusefs.Read: couldn't get kernel task from context") + return nil, 0, syserror.EINVAL + } + + // Round up to a multiple of page size. + readSize, _ := usermem.PageRoundUp(uint64(size)) + + // One request cannnot exceed either maxRead or maxPages. + maxPages := fs.conn.maxRead >> usermem.PageShift + if maxPages > uint32(fs.conn.maxPages) { + maxPages = uint32(fs.conn.maxPages) + } + + var outs [][]byte + var sizeRead uint32 + + // readSize is a multiple of usermem.PageSize. + // Always request bytes as a multiple of pages. + pagesRead, pagesToRead := uint32(0), uint32(readSize>>usermem.PageShift) + + // Reuse the same struct for unmarshalling to avoid unnecessary memory allocation. + in := linux.FUSEReadIn{ + Fh: fd.Fh, + LockOwner: 0, // TODO(gvisor.dev/issue/3245): file lock + ReadFlags: 0, // TODO(gvisor.dev/issue/3245): |= linux.FUSE_READ_LOCKOWNER + Flags: fd.statusFlags(), + } + + // This loop is intended for fragmented read where the bytes to read is + // larger than either the maxPages or maxRead. + // For the majority of reads with normal size, this loop should only + // execute once. + for pagesRead < pagesToRead { + pagesCanRead := pagesToRead - pagesRead + if pagesCanRead > maxPages { + pagesCanRead = maxPages + } + + in.Offset = off + (uint64(pagesRead) << usermem.PageShift) + in.Size = pagesCanRead << usermem.PageShift + + req, err := fs.conn.NewRequest(auth.CredentialsFromContext(ctx), uint32(t.ThreadID()), fd.inode().nodeID, linux.FUSE_READ, &in) + if err != nil { + return nil, 0, err + } + + // TODO(gvisor.dev/issue/3247): support async read. + + res, err := fs.conn.Call(t, req) + if err != nil { + return nil, 0, err + } + if err := res.Error(); err != nil { + return nil, 0, err + } + + // Not enough bytes in response, + // either we reached EOF, + // or the FUSE server sends back a response + // that cannot even fit the hdr. + if len(res.data) <= res.hdr.SizeBytes() { + // We treat both case as EOF here for now + // since there is no reliable way to detect + // the over-short hdr case. + break + } + + // Directly using the slice to avoid extra copy. + out := res.data[res.hdr.SizeBytes():] + + outs = append(outs, out) + sizeRead += uint32(len(out)) + + pagesRead += pagesCanRead + } + + defer fs.ReadCallback(ctx, fd, off, size, sizeRead, attributeVersion) + + // No bytes returned: offset >= EOF. + if len(outs) == 0 { + return nil, 0, io.EOF + } + + return outs, sizeRead, nil +} + +// ReadCallback updates several information after receiving a read response. +// Due to readahead, sizeRead can be larger than size. +func (fs *filesystem) ReadCallback(ctx context.Context, fd *regularFileFD, off uint64, size uint32, sizeRead uint32, attributeVersion uint64) { + // TODO(gvisor.dev/issue/3247): support async read. + // If this is called by an async read, correctly process it. + // May need to update the signature. + + i := fd.inode() + i.InodeAttrs.TouchAtime(ctx, fd.vfsfd.Mount()) + + // Reached EOF. + if sizeRead < size { + // TODO(gvisor.dev/issue/3630): If we have writeback cache, then we need to fill this hole. + // Might need to update the buf to be returned from the Read(). + + // Update existing size. + newSize := off + uint64(sizeRead) + fs.conn.mu.Lock() + if attributeVersion == i.attributeVersion && newSize < atomic.LoadUint64(&i.size) { + fs.conn.attributeVersion++ + i.attributeVersion = i.fs.conn.attributeVersion + atomic.StoreUint64(&i.size, newSize) + } + fs.conn.mu.Unlock() + } +} + +// Write sends FUSE_WRITE requests and return the bytes +// written according to the response. +// +// Preconditions: len(data) == size. +func (fs *filesystem) Write(ctx context.Context, fd *regularFileFD, off uint64, size uint32, data []byte) (uint32, error) { + t := kernel.TaskFromContext(ctx) + if t == nil { + log.Warningf("fusefs.Read: couldn't get kernel task from context") + return 0, syserror.EINVAL + } + + // One request cannnot exceed either maxWrite or maxPages. + maxWrite := uint32(fs.conn.maxPages) << usermem.PageShift + if maxWrite > fs.conn.maxWrite { + maxWrite = fs.conn.maxWrite + } + + // Reuse the same struct for unmarshalling to avoid unnecessary memory allocation. + in := linux.FUSEWriteIn{ + Fh: fd.Fh, + // TODO(gvisor.dev/issue/3245): file lock + LockOwner: 0, + // TODO(gvisor.dev/issue/3245): |= linux.FUSE_READ_LOCKOWNER + // TODO(gvisor.dev/issue/3237): |= linux.FUSE_WRITE_CACHE (not added yet) + WriteFlags: 0, + Flags: fd.statusFlags(), + } + + inode := fd.inode() + var written uint32 + + // This loop is intended for fragmented write where the bytes to write is + // larger than either the maxWrite or maxPages or when bigWrites is false. + // Unless a small value for max_write is explicitly used, this loop + // is expected to execute only once for the majority of the writes. + for written < size { + toWrite := size - written + + // Limit the write size to one page. + // Note that the bigWrites flag is obsolete, + // latest libfuse always sets it on. + if !fs.conn.bigWrites && toWrite > usermem.PageSize { + toWrite = usermem.PageSize + } + + // Limit the write size to maxWrite. + if toWrite > maxWrite { + toWrite = maxWrite + } + + in.Offset = off + uint64(written) + in.Size = toWrite + + req, err := fs.conn.NewRequest(auth.CredentialsFromContext(ctx), uint32(t.ThreadID()), inode.nodeID, linux.FUSE_WRITE, &in) + if err != nil { + return 0, err + } + + req.payload = data[written : written+toWrite] + + // TODO(gvisor.dev/issue/3247): support async write. + + res, err := fs.conn.Call(t, req) + if err != nil { + return 0, err + } + if err := res.Error(); err != nil { + return 0, err + } + + out := linux.FUSEWriteOut{} + if err := res.UnmarshalPayload(&out); err != nil { + return 0, err + } + + // Write more than requested? EIO. + if out.Size > toWrite { + return 0, syserror.EIO + } + + written += out.Size + + // Break if short write. Not necessarily an error. + if out.Size != toWrite { + break + } + } + inode.InodeAttrs.TouchCMtime(ctx) + + return written, nil +} diff --git a/pkg/sentry/fsimpl/fuse/regular_file.go b/pkg/sentry/fsimpl/fuse/regular_file.go new file mode 100644 index 000000000..5bdd096c3 --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/regular_file.go @@ -0,0 +1,230 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "io" + "math" + "sync" + "sync/atomic" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/context" + "gvisor.dev/gvisor/pkg/sentry/vfs" + "gvisor.dev/gvisor/pkg/syserror" + "gvisor.dev/gvisor/pkg/usermem" +) + +type regularFileFD struct { + fileDescription + + // off is the file offset. + off int64 + // offMu protects off. + offMu sync.Mutex +} + +// PRead implements vfs.FileDescriptionImpl.PRead. +func (fd *regularFileFD) PRead(ctx context.Context, dst usermem.IOSequence, offset int64, opts vfs.ReadOptions) (int64, error) { + if offset < 0 { + return 0, syserror.EINVAL + } + + // Check that flags are supported. + // + // TODO(gvisor.dev/issue/2601): Support select preadv2 flags. + if opts.Flags&^linux.RWF_HIPRI != 0 { + return 0, syserror.EOPNOTSUPP + } + + size := dst.NumBytes() + if size == 0 { + // Early return if count is 0. + return 0, nil + } else if size > math.MaxUint32 { + // FUSE only supports uint32 for size. + // Overflow. + return 0, syserror.EINVAL + } + + // TODO(gvisor.dev/issue/3678): Add direct IO support. + + inode := fd.inode() + + // Reading beyond EOF, update file size if outdated. + if uint64(offset+size) > atomic.LoadUint64(&inode.size) { + if err := inode.reviseAttr(ctx, linux.FUSE_GETATTR_FH, fd.Fh); err != nil { + return 0, err + } + // If the offset after update is still too large, return error. + if uint64(offset) >= atomic.LoadUint64(&inode.size) { + return 0, io.EOF + } + } + + // Truncate the read with updated file size. + fileSize := atomic.LoadUint64(&inode.size) + if uint64(offset+size) > fileSize { + size = int64(fileSize) - offset + } + + buffers, n, err := inode.fs.ReadInPages(ctx, fd, uint64(offset), uint32(size)) + if err != nil { + return 0, err + } + + // TODO(gvisor.dev/issue/3237): support indirect IO (e.g. caching), + // store the bytes that were read ahead. + + // Update the number of bytes to copy for short read. + if n < uint32(size) { + size = int64(n) + } + + // Copy the bytes read to the dst. + // This loop is intended for fragmented reads. + // For the majority of reads, this loop only execute once. + var copied int64 + for _, buffer := range buffers { + toCopy := int64(len(buffer)) + if copied+toCopy > size { + toCopy = size - copied + } + cp, err := dst.DropFirst64(copied).CopyOut(ctx, buffer[:toCopy]) + if err != nil { + return 0, err + } + if int64(cp) != toCopy { + return 0, syserror.EIO + } + copied += toCopy + } + + return copied, nil +} + +// Read implements vfs.FileDescriptionImpl.Read. +func (fd *regularFileFD) Read(ctx context.Context, dst usermem.IOSequence, opts vfs.ReadOptions) (int64, error) { + fd.offMu.Lock() + n, err := fd.PRead(ctx, dst, fd.off, opts) + fd.off += n + fd.offMu.Unlock() + return n, err +} + +// PWrite implements vfs.FileDescriptionImpl.PWrite. +func (fd *regularFileFD) PWrite(ctx context.Context, src usermem.IOSequence, offset int64, opts vfs.WriteOptions) (int64, error) { + n, _, err := fd.pwrite(ctx, src, offset, opts) + return n, err +} + +// Write implements vfs.FileDescriptionImpl.Write. +func (fd *regularFileFD) Write(ctx context.Context, src usermem.IOSequence, opts vfs.WriteOptions) (int64, error) { + fd.offMu.Lock() + n, off, err := fd.pwrite(ctx, src, fd.off, opts) + fd.off = off + fd.offMu.Unlock() + return n, err +} + +// pwrite returns the number of bytes written, final offset and error. The +// final offset should be ignored by PWrite. +func (fd *regularFileFD) pwrite(ctx context.Context, src usermem.IOSequence, offset int64, opts vfs.WriteOptions) (written, finalOff int64, err error) { + if offset < 0 { + return 0, offset, syserror.EINVAL + } + + // Check that flags are supported. + // + // TODO(gvisor.dev/issue/2601): Support select preadv2 flags. + if opts.Flags&^linux.RWF_HIPRI != 0 { + return 0, offset, syserror.EOPNOTSUPP + } + + inode := fd.inode() + inode.metadataMu.Lock() + defer inode.metadataMu.Unlock() + + // If the file is opened with O_APPEND, update offset to file size. + // Note: since our Open() implements the interface of kernfs, + // and kernfs currently does not support O_APPEND, this will never + // be true before we switch out from kernfs. + if fd.vfsfd.StatusFlags()&linux.O_APPEND != 0 { + // Locking inode.metadataMu is sufficient for reading size + offset = int64(inode.size) + } + + srclen := src.NumBytes() + + if srclen > math.MaxUint32 { + // FUSE only supports uint32 for size. + // Overflow. + return 0, offset, syserror.EINVAL + } + if end := offset + srclen; end < offset { + // Overflow. + return 0, offset, syserror.EINVAL + } + + srclen, err = vfs.CheckLimit(ctx, offset, srclen) + if err != nil { + return 0, offset, err + } + + if srclen == 0 { + // Return before causing any side effects. + return 0, offset, nil + } + + src = src.TakeFirst64(srclen) + + // TODO(gvisor.dev/issue/3237): Add cache support: + // buffer cache. Ideally we write from src to our buffer cache first. + // The slice passed to fs.Write() should be a slice from buffer cache. + data := make([]byte, srclen) + // Reason for making a copy here: connection.Call() blocks on kerneltask, + // which in turn acquires mm.activeMu lock. Functions like CopyInTo() will + // attemp to acquire the mm.activeMu lock as well -> deadlock. + // We must finish reading from the userspace memory before + // t.Block() deactivates it. + cp, err := src.CopyIn(ctx, data) + if err != nil { + return 0, offset, err + } + if int64(cp) != srclen { + return 0, offset, syserror.EIO + } + + n, err := fd.inode().fs.Write(ctx, fd, uint64(offset), uint32(srclen), data) + if err != nil { + return 0, offset, err + } + + if n == 0 { + // We have checked srclen != 0 previously. + // If err == nil, then it's a short write and we return EIO. + return 0, offset, syserror.EIO + } + + written = int64(n) + finalOff = offset + written + + if finalOff > int64(inode.size) { + atomic.StoreUint64(&inode.size, uint64(finalOff)) + atomic.AddUint64(&inode.fs.conn.attributeVersion, 1) + } + + return +} diff --git a/pkg/sentry/fsimpl/fuse/request_response.go b/pkg/sentry/fsimpl/fuse/request_response.go new file mode 100644 index 000000000..7fa00569b --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/request_response.go @@ -0,0 +1,229 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "fmt" + "syscall" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/marshal" + "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" + "gvisor.dev/gvisor/pkg/usermem" +) + +// fuseInitRes is a variable-length wrapper of linux.FUSEInitOut. The FUSE +// server may implement an older version of FUSE protocol, which contains a +// linux.FUSEInitOut with less attributes. +// +// Dynamically-sized objects cannot be marshalled. +type fuseInitRes struct { + marshal.StubMarshallable + + // initOut contains the response from the FUSE server. + initOut linux.FUSEInitOut + + // initLen is the total length of bytes of the response. + initLen uint32 +} + +// UnmarshalBytes deserializes src to the initOut attribute in a fuseInitRes. +func (r *fuseInitRes) UnmarshalBytes(src []byte) { + out := &r.initOut + + // Introduced before FUSE kernel version 7.13. + out.Major = uint32(usermem.ByteOrder.Uint32(src[:4])) + src = src[4:] + out.Minor = uint32(usermem.ByteOrder.Uint32(src[:4])) + src = src[4:] + out.MaxReadahead = uint32(usermem.ByteOrder.Uint32(src[:4])) + src = src[4:] + out.Flags = uint32(usermem.ByteOrder.Uint32(src[:4])) + src = src[4:] + out.MaxBackground = uint16(usermem.ByteOrder.Uint16(src[:2])) + src = src[2:] + out.CongestionThreshold = uint16(usermem.ByteOrder.Uint16(src[:2])) + src = src[2:] + out.MaxWrite = uint32(usermem.ByteOrder.Uint32(src[:4])) + src = src[4:] + + // Introduced in FUSE kernel version 7.23. + if len(src) >= 4 { + out.TimeGran = uint32(usermem.ByteOrder.Uint32(src[:4])) + src = src[4:] + } + // Introduced in FUSE kernel version 7.28. + if len(src) >= 2 { + out.MaxPages = uint16(usermem.ByteOrder.Uint16(src[:2])) + src = src[2:] + } +} + +// SizeBytes is the size of the payload of the FUSE_INIT response. +func (r *fuseInitRes) SizeBytes() int { + return int(r.initLen) +} + +// Ordinary requests have even IDs, while interrupts IDs are odd. +// Used to increment the unique ID for each FUSE request. +var reqIDStep uint64 = 2 + +// Request represents a FUSE operation request that hasn't been sent to the +// server yet. +// +// +stateify savable +type Request struct { + requestEntry + + id linux.FUSEOpID + hdr *linux.FUSEHeaderIn + data []byte + + // payload for this request: extra bytes to write after + // the data slice. Used by FUSE_WRITE. + payload []byte + + // If this request is async. + async bool + // If we don't care its response. + // Manually set by the caller. + noReply bool +} + +// NewRequest creates a new request that can be sent to the FUSE server. +func (conn *connection) NewRequest(creds *auth.Credentials, pid uint32, ino uint64, opcode linux.FUSEOpcode, payload marshal.Marshallable) (*Request, error) { + conn.fd.mu.Lock() + defer conn.fd.mu.Unlock() + conn.fd.nextOpID += linux.FUSEOpID(reqIDStep) + + hdrLen := (*linux.FUSEHeaderIn)(nil).SizeBytes() + hdr := linux.FUSEHeaderIn{ + Len: uint32(hdrLen + payload.SizeBytes()), + Opcode: opcode, + Unique: conn.fd.nextOpID, + NodeID: ino, + UID: uint32(creds.EffectiveKUID), + GID: uint32(creds.EffectiveKGID), + PID: pid, + } + + buf := make([]byte, hdr.Len) + + // TODO(gVisor.dev/issue/3698): Use the unsafe version once go_marshal is safe to use again. + hdr.MarshalBytes(buf[:hdrLen]) + payload.MarshalBytes(buf[hdrLen:]) + + return &Request{ + id: hdr.Unique, + hdr: &hdr, + data: buf, + }, nil +} + +// futureResponse represents an in-flight request, that may or may not have +// completed yet. Convert it to a resolved Response by calling Resolve, but note +// that this may block. +// +// +stateify savable +type futureResponse struct { + opcode linux.FUSEOpcode + ch chan struct{} + hdr *linux.FUSEHeaderOut + data []byte + + // If this request is async. + async bool +} + +// newFutureResponse creates a future response to a FUSE request. +func newFutureResponse(req *Request) *futureResponse { + return &futureResponse{ + opcode: req.hdr.Opcode, + ch: make(chan struct{}), + async: req.async, + } +} + +// resolve blocks the task until the server responds to its corresponding request, +// then returns a resolved response. +func (f *futureResponse) resolve(t *kernel.Task) (*Response, error) { + // Return directly for async requests. + if f.async { + return nil, nil + } + + if err := t.Block(f.ch); err != nil { + return nil, err + } + + return f.getResponse(), nil +} + +// getResponse creates a Response from the data the futureResponse has. +func (f *futureResponse) getResponse() *Response { + return &Response{ + opcode: f.opcode, + hdr: *f.hdr, + data: f.data, + } +} + +// Response represents an actual response from the server, including the +// response payload. +// +// +stateify savable +type Response struct { + opcode linux.FUSEOpcode + hdr linux.FUSEHeaderOut + data []byte +} + +// Error returns the error of the FUSE call. +func (r *Response) Error() error { + errno := r.hdr.Error + if errno >= 0 { + return nil + } + + sysErrNo := syscall.Errno(-errno) + return error(sysErrNo) +} + +// DataLen returns the size of the response without the header. +func (r *Response) DataLen() uint32 { + return r.hdr.Len - uint32(r.hdr.SizeBytes()) +} + +// UnmarshalPayload unmarshals the response data into m. +func (r *Response) UnmarshalPayload(m marshal.Marshallable) error { + hdrLen := r.hdr.SizeBytes() + haveDataLen := r.hdr.Len - uint32(hdrLen) + wantDataLen := uint32(m.SizeBytes()) + + if haveDataLen < wantDataLen { + return fmt.Errorf("payload too small. Minimum data lenth required: %d, but got data length %d", wantDataLen, haveDataLen) + } + + // The response data is empty unless there is some payload. And so, doesn't + // need to be unmarshalled. + if r.data == nil { + return nil + } + + // TODO(gVisor.dev/issue/3698): Use the unsafe version once go_marshal is safe to use again. + m.UnmarshalBytes(r.data[hdrLen:]) + return nil +} diff --git a/pkg/sentry/fsimpl/fuse/utils_test.go b/pkg/sentry/fsimpl/fuse/utils_test.go new file mode 100644 index 000000000..e1d9e3365 --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/utils_test.go @@ -0,0 +1,132 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "io" + "testing" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/marshal" + "gvisor.dev/gvisor/pkg/sentry/fsimpl/testutil" + "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" + "gvisor.dev/gvisor/pkg/sentry/vfs" + "gvisor.dev/gvisor/pkg/usermem" +) + +func setup(t *testing.T) *testutil.System { + k, err := testutil.Boot() + if err != nil { + t.Fatalf("Error creating kernel: %v", err) + } + + ctx := k.SupervisorContext() + creds := auth.CredentialsFromContext(ctx) + + k.VFS().MustRegisterFilesystemType(Name, &FilesystemType{}, &vfs.RegisterFilesystemTypeOptions{ + AllowUserList: true, + AllowUserMount: true, + }) + + mntns, err := k.VFS().NewMountNamespace(ctx, creds, "", "tmpfs", &vfs.MountOptions{}) + if err != nil { + t.Fatalf("NewMountNamespace(): %v", err) + } + + return testutil.NewSystem(ctx, t, k.VFS(), mntns) +} + +// newTestConnection creates a fuse connection that the sentry can communicate with +// and the FD for the server to communicate with. +func newTestConnection(system *testutil.System, k *kernel.Kernel, maxActiveRequests uint64) (*connection, *vfs.FileDescription, error) { + vfsObj := &vfs.VirtualFilesystem{} + fuseDev := &DeviceFD{} + + if err := vfsObj.Init(system.Ctx); err != nil { + return nil, nil, err + } + + vd := vfsObj.NewAnonVirtualDentry("genCountFD") + defer vd.DecRef(system.Ctx) + if err := fuseDev.vfsfd.Init(fuseDev, linux.O_RDWR|linux.O_CREAT, vd.Mount(), vd.Dentry(), &vfs.FileDescriptionOptions{}); err != nil { + return nil, nil, err + } + + fsopts := filesystemOptions{ + maxActiveRequests: maxActiveRequests, + } + fs, err := newFUSEFilesystem(system.Ctx, 0, &fsopts, &fuseDev.vfsfd) + if err != nil { + return nil, nil, err + } + + return fs.conn, &fuseDev.vfsfd, nil +} + +type testPayload struct { + marshal.StubMarshallable + data uint32 +} + +// SizeBytes implements marshal.Marshallable.SizeBytes. +func (t *testPayload) SizeBytes() int { + return 4 +} + +// MarshalBytes implements marshal.Marshallable.MarshalBytes. +func (t *testPayload) MarshalBytes(dst []byte) { + usermem.ByteOrder.PutUint32(dst[:4], t.data) +} + +// UnmarshalBytes implements marshal.Marshallable.UnmarshalBytes. +func (t *testPayload) UnmarshalBytes(src []byte) { + *t = testPayload{data: usermem.ByteOrder.Uint32(src[:4])} +} + +// Packed implements marshal.Marshallable.Packed. +func (t *testPayload) Packed() bool { + return true +} + +// MarshalUnsafe implements marshal.Marshallable.MarshalUnsafe. +func (t *testPayload) MarshalUnsafe(dst []byte) { + t.MarshalBytes(dst) +} + +// UnmarshalUnsafe implements marshal.Marshallable.UnmarshalUnsafe. +func (t *testPayload) UnmarshalUnsafe(src []byte) { + t.UnmarshalBytes(src) +} + +// CopyOutN implements marshal.Marshallable.CopyOutN. +func (t *testPayload) CopyOutN(task marshal.CopyContext, addr usermem.Addr, limit int) (int, error) { + panic("not implemented") +} + +// CopyOut implements marshal.Marshallable.CopyOut. +func (t *testPayload) CopyOut(task marshal.CopyContext, addr usermem.Addr) (int, error) { + panic("not implemented") +} + +// CopyIn implements marshal.Marshallable.CopyIn. +func (t *testPayload) CopyIn(task marshal.CopyContext, addr usermem.Addr) (int, error) { + panic("not implemented") +} + +// WriteTo implements io.WriterTo.WriteTo. +func (t *testPayload) WriteTo(w io.Writer) (int64, error) { + panic("not implemented") +} |