From 2e19a8b951e9402b28b4e601e65c51e69c815db1 Mon Sep 17 00:00:00 2001 From: Jinmou Li Date: Sat, 27 Jun 2020 02:14:56 +0000 Subject: Add FUSE_INIT This change allows the sentry to send FUSE_INIT request and process the reply. It adds the corresponding structs, employs the fuse device to send and read the message, and stores the results of negotiation in corresponding places (inside connection struct). It adds a CallAsync() function to the FUSE connection interface: - like Call(), but it's for requests that do not expect immediate response (init, release, interrupt etc.) - will block if the connection hasn't initialized, which is the same for Call() --- pkg/sentry/fsimpl/fuse/connection.go | 234 +++++++++++++++++++++++++++++++---- 1 file changed, 208 insertions(+), 26 deletions(-) (limited to 'pkg/sentry/fsimpl/fuse/connection.go') diff --git a/pkg/sentry/fsimpl/fuse/connection.go b/pkg/sentry/fsimpl/fuse/connection.go index f330da0bd..6df2728ab 100644 --- a/pkg/sentry/fsimpl/fuse/connection.go +++ b/pkg/sentry/fsimpl/fuse/connection.go @@ -17,6 +17,8 @@ package fuse import ( "errors" "fmt" + "sync" + "sync/atomic" "syscall" "gvisor.dev/gvisor/pkg/abi/linux" @@ -25,18 +27,29 @@ import ( "gvisor.dev/gvisor/pkg/sentry/kernel" "gvisor.dev/gvisor/pkg/sentry/kernel/auth" "gvisor.dev/gvisor/pkg/sentry/vfs" + "gvisor.dev/gvisor/pkg/syserror" "gvisor.dev/gvisor/pkg/waiter" "gvisor.dev/gvisor/tools/go_marshal/marshal" ) -// MaxActiveRequestsDefault is the default setting controlling the upper bound +// maxActiveRequestsDefault is the default setting controlling the upper bound // on the number of active requests at any given time. -const MaxActiveRequestsDefault = 10000 +const maxActiveRequestsDefault = 10000 -var ( - // Ordinary requests have even IDs, while interrupts IDs are odd. - InitReqBit uint64 = 1 - ReqIDStep uint64 = 2 +// Ordinary requests have even IDs, while interrupts IDs are odd. +// Used to increment the unique ID for each FUSE request. +var reqIDStep uint64 = 2 + +const ( + // fuseDefaultMaxBackground is the default value for MaxBackground. + fuseDefaultMaxBackground = 12 + + // fuseDefaultCongestionThreshold is the default value for CongestionThreshold, + // and is 75% of the default maximum of MaxGround. + fuseDefaultCongestionThreshold = (fuseDefaultMaxBackground * 3 / 4) + + // fuseDefaultMaxPagesPerReq is the default value for MaxPagesPerReq. + fuseDefaultMaxPagesPerReq = 32 ) // Request represents a FUSE operation request that hasn't been sent to the @@ -61,17 +74,125 @@ type Response struct { data []byte } -// Connection is the struct by which the sentry communicates with the FUSE server daemon. -type Connection struct { +// connection is the struct by which the sentry communicates with the FUSE server daemon. +type connection struct { fd *DeviceFD - // MaxWrite is the daemon's maximum size of a write buffer. - // This is negotiated during FUSE_INIT. - MaxWrite uint32 + // The following FUSE_INIT flags are currently unsupported by this implementation: + // - FUSE_ATOMIC_O_TRUNC: requires open(..., O_TRUNC) + // - FUSE_EXPORT_SUPPORT + // - FUSE_HANDLE_KILLPRIV + // - FUSE_POSIX_LOCKS: requires POSIX locks + // - FUSE_FLOCK_LOCKS: requires POSIX locks + // - FUSE_AUTO_INVAL_DATA: requires page caching eviction + // - FUSE_EXPLICIT_INVAL_DATA: requires page caching eviction + // - FUSE_DO_READDIRPLUS/FUSE_READDIRPLUS_AUTO: requires FUSE_READDIRPLUS implementation + // - FUSE_ASYNC_DIO + // - FUSE_POSIX_ACL: affects defaultPermissions, posixACL, xattr handler + + // initialized after receiving FUSE_INIT reply. + // Until it's set, suspend sending FUSE requests. + // Use SetInitialized() and IsInitialized() for atomic access. + initialized int32 + + // initializedChan is used to block requests before initialization. + initializedChan chan struct{} + + // blocked when there are too many outstading backgrounds requests (NumBackground == MaxBackground). + // TODO(gvisor.dev/issue/3185): update the numBackground accordingly; use a channel to block. + blocked bool + + // connected (connection established) when a new FUSE file system is created. + // Set to false when: + // umount, + // connection abort, + // device release. + connected bool + + // aborted via sysfs. + // TODO(gvisor.dev/issue/3185): abort all queued requests. + aborted bool + + // connInitError if FUSE_INIT encountered error (major version mismatch). + // Only set in INIT. + connInitError bool + + // connInitSuccess if FUSE_INIT is successful. + // Only set in INIT. + // Used for destory. + connInitSuccess bool + + // TODO(gvisor.dev/issue/3185): All the queue logic are working in progress. + + // NumberBackground is the number of requests in the background. + numBackground uint16 + + // congestionThreshold for NumBackground. + // Negotiated in FUSE_INIT. + congestionThreshold uint16 + + // maxBackground is the maximum number of NumBackground. + // Block connection when it is reached. + // Negotiated in FUSE_INIT. + maxBackground uint16 + + // numActiveBackground is the number of requests in background and has being marked as active. + numActiveBackground uint16 + + // numWating is the number of requests waiting for completion. + numWaiting uint32 + + // TODO(gvisor.dev/issue/3185): BgQueue + // some queue for background queued requests. + + // bgLock protects: + // MaxBackground, CongestionThreshold, NumBackground, + // NumActiveBackground, BgQueue, Blocked. + bgLock sync.Mutex + + // maxRead is the maximum size of a read buffer in in bytes. + maxRead uint32 + + // maxWrite is the maximum size of a write buffer in bytes. + // Negotiated in FUSE_INIT. + maxWrite uint32 + + // maxPages is the maximum number of pages for a single request to use. + // Negotiated in FUSE_INIT. + maxPages uint16 + + // minor version of the FUSE protocol. + // Negotiated and only set in INIT. + minor uint32 + + // asyncRead if read pages asynchronously. + // Negotiated and only set in INIT. + asyncRead bool + + // abortErr is true if kernel need to return an unique read error after abort. + // Negotiated and only set in INIT. + abortErr bool + + // writebackCache is true for write-back cache policy, + // false for write-through policy. + // Negotiated and only set in INIT. + writebackCache bool + + // cacheSymlinks if filesystem needs to cache READLINK responses in page cache. + // Negotiated and only set in INIT. + cacheSymlinks bool + + // bigWrites if doing multi-page cached writes. + // Negotiated and only set in INIT. + bigWrites bool + + // dontMask if filestestem does not apply umask to creation modes. + // Negotiated in INIT. + dontMask bool } -// NewFUSEConnection creates a FUSE connection to fd -func NewFUSEConnection(_ context.Context, fd *vfs.FileDescription, maxInFlightRequests uint64) (*Connection, error) { +// newFUSEConnection creates a FUSE connection to fd. +func newFUSEConnection(_ context.Context, fd *vfs.FileDescription, maxInFlightRequests uint64) (*connection, error) { // Mark the device as ready so it can be used. /dev/fuse can only be used if the FD was used to // mount a FUSE filesystem. fuseFD := fd.Impl().(*DeviceFD) @@ -84,16 +205,41 @@ func NewFUSEConnection(_ context.Context, fd *vfs.FileDescription, maxInFlightRe fuseFD.fullQueueCh = make(chan struct{}, maxInFlightRequests) fuseFD.writeCursor = 0 - return &Connection{ - fd: fuseFD, + return &connection{ + fd: fuseFD, + maxBackground: fuseDefaultMaxBackground, + congestionThreshold: fuseDefaultCongestionThreshold, + maxPages: fuseDefaultMaxPagesPerReq, + initializedChan: make(chan struct{}), + connected: true, }, nil } +// SetInitialized atomically sets the connection as initialized. +func (conn *connection) SetInitialized() { + // Unblock the requests sent before INIT. + close(conn.initializedChan) + + // Close the channel first to avoid the non-atomic situation + // where conn.initialized is true but there are + // tasks being blocked on the channel. + // And it prevents the newer tasks from gaining + // unnecessary higher chance to be issued before the blocked one. + + atomic.StoreInt32(&(conn.initialized), int32(1)) +} + +// IsInitialized atomically check if the connection is initialized. +// pairs with SetInitialized(). +func (conn *connection) Initialized() bool { + return atomic.LoadInt32(&(conn.initialized)) != 0 +} + // NewRequest creates a new request that can be sent to the FUSE server. -func (conn *Connection) NewRequest(creds *auth.Credentials, pid uint32, ino uint64, opcode linux.FUSEOpcode, payload marshal.Marshallable) (*Request, error) { +func (conn *connection) NewRequest(creds *auth.Credentials, pid uint32, ino uint64, opcode linux.FUSEOpcode, payload marshal.Marshallable) (*Request, error) { conn.fd.mu.Lock() defer conn.fd.mu.Unlock() - conn.fd.nextOpID += linux.FUSEOpID(ReqIDStep) + conn.fd.nextOpID += linux.FUSEOpID(reqIDStep) hdrLen := (*linux.FUSEHeaderIn)(nil).SizeBytes() hdr := linux.FUSEHeaderIn{ @@ -118,13 +264,49 @@ func (conn *Connection) NewRequest(creds *auth.Credentials, pid uint32, ino uint } // Call makes a request to the server and blocks the invoking task until a -// server responds with a response. -// NOTE: If no task is provided then the Call will simply enqueue the request -// and return a nil response. No blocking will happen in this case. Instead, -// this is used to signify that the processing of this request will happen by -// the kernel.Task that writes the response. See FUSE_INIT for such an -// invocation. -func (conn *Connection) Call(t *kernel.Task, r *Request) (*Response, error) { +// server responds with a response. Task should never be nil. +// Requests will not be sent before the connection is initialized. +// For async tasks, use CallAsync(). +func (conn *connection) Call(t *kernel.Task, r *Request) (*Response, error) { + // Block requests sent before connection is initalized. + if !conn.Initialized() { + if err := t.Block(conn.initializedChan); err != nil { + return nil, err + } + } + + return conn.call(t, r) +} + +// CallAsync makes an async (aka background) request. +// Those requests either do not expect a response (e.g. release) or +// the response should be handled by others (e.g. init). +// Return immediately unless the connection is blocked (before initialization). +// Async call example: init, release, forget, aio, interrupt. +// When the Request is FUSE_INIT, it will not be blocked before initialization. +func (conn *connection) CallAsync(t *kernel.Task, r *Request) error { + // Block requests sent before connection is initalized. + if !conn.Initialized() && r.hdr.Opcode != linux.FUSE_INIT { + if err := t.Block(conn.initializedChan); err != nil { + return err + } + } + + // This should be the only place that invokes call() with a nil task. + _, err := conn.call(nil, r) + return err +} + +// call makes a call without blocking checks. +func (conn *connection) call(t *kernel.Task, r *Request) (*Response, error) { + if !conn.connected { + return nil, syserror.ENOTCONN + } + + if conn.connInitError { + return nil, syserror.ECONNREFUSED + } + fut, err := conn.callFuture(t, r) if err != nil { return nil, err @@ -160,7 +342,7 @@ func (r *Response) UnmarshalPayload(m marshal.Marshallable) error { // callFuture makes a request to the server and returns a future response. // Call resolve() when the response needs to be fulfilled. -func (conn *Connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, error) { +func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, error) { conn.fd.mu.Lock() defer conn.fd.mu.Unlock() @@ -195,7 +377,7 @@ func (conn *Connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, } // callFutureLocked makes a request to the server and returns a future response. -func (conn *Connection) callFutureLocked(t *kernel.Task, r *Request) (*futureResponse, error) { +func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureResponse, error) { conn.fd.queue.PushBack(r) conn.fd.numActiveRequests += 1 fut := newFutureResponse(r.hdr.Opcode) -- cgit v1.2.3