diff options
author | Jinmou Li <jinmli@google.com> | 2020-09-09 16:22:33 -0700 |
---|---|---|
committer | Andrei Vagin <avagin@gmail.com> | 2020-09-16 12:19:30 -0700 |
commit | d459bb3372384a4d16fe0d9791847285449dc184 (patch) | |
tree | 28d820caae82a9f6dbd924c370a5390f24ce1c7d | |
parent | 4edc56d3e99b161f2f3311bc22a51012bf0a90ee (diff) |
Add FUSE umount support
This change implements Release for the FUSE filesystem
and expected behaviors of the FUSE devices.
It includes several checks for aborted connection
in the path for making a request and a function
to abort all the ongoing FUSE requests in order.
-rw-r--r-- | pkg/sentry/fsimpl/fuse/BUILD | 6 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/connection.go | 11 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/connection_control.go | 62 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/connection_test.go | 127 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/dev.go | 51 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/dev_test.go | 4 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/fusefs.go | 12 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/utils_test.go | 111 | ||||
-rw-r--r-- | pkg/syserror/syserror.go | 1 |
9 files changed, 363 insertions, 22 deletions
diff --git a/pkg/sentry/fsimpl/fuse/BUILD b/pkg/sentry/fsimpl/fuse/BUILD index 5b60b1277..045d7ab08 100644 --- a/pkg/sentry/fsimpl/fuse/BUILD +++ b/pkg/sentry/fsimpl/fuse/BUILD @@ -66,7 +66,11 @@ go_library( go_test( name = "fuse_test", size = "small", - srcs = ["dev_test.go"], + srcs = [ + "connection_test.go", + "dev_test.go", + "utils_test.go", + ], library = ":fuse", deps = [ "//pkg/abi/linux", diff --git a/pkg/sentry/fsimpl/fuse/connection.go b/pkg/sentry/fsimpl/fuse/connection.go index 72eac4179..5960d9368 100644 --- a/pkg/sentry/fsimpl/fuse/connection.go +++ b/pkg/sentry/fsimpl/fuse/connection.go @@ -180,7 +180,6 @@ func newFUSEConnection(_ context.Context, fd *vfs.FileDescription, opts *filesys // Mark the device as ready so it can be used. /dev/fuse can only be used if the FD was used to // mount a FUSE filesystem. fuseFD := fd.Impl().(*DeviceFD) - fuseFD.mounted = true // Create the writeBuf for the header to be stored in. hdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes()) @@ -283,6 +282,16 @@ func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, // callFutureLocked makes a request to the server and returns a future response. func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureResponse, error) { + // Check connected again holding conn.mu. + conn.mu.Lock() + if !conn.connected { + conn.mu.Unlock() + // we checked connected before, + // this must be due to aborted connection. + return nil, syserror.ECONNABORTED + } + conn.mu.Unlock() + conn.fd.queue.PushBack(r) conn.fd.numActiveRequests++ fut := newFutureResponse(r) diff --git a/pkg/sentry/fsimpl/fuse/connection_control.go b/pkg/sentry/fsimpl/fuse/connection_control.go index d84d9caf2..a63c66e7c 100644 --- a/pkg/sentry/fsimpl/fuse/connection_control.go +++ b/pkg/sentry/fsimpl/fuse/connection_control.go @@ -16,8 +16,10 @@ package fuse import ( "sync/atomic" + "syscall" "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/kernel/auth" ) @@ -181,3 +183,63 @@ func (conn *connection) initProcessReply(out *linux.FUSEInitOut, hasSysAdminCap return nil } + +// Abort this FUSE connection. +// It tries to acquire conn.fd.mu, conn.lock, conn.bgLock in order. +// All possible requests waiting or blocking will be aborted. +func (conn *connection) Abort(ctx context.Context) { + conn.fd.mu.Lock() + conn.mu.Lock() + conn.asyncMu.Lock() + + if !conn.connected { + conn.asyncMu.Unlock() + conn.mu.Unlock() + conn.fd.mu.Unlock() + return + } + + conn.connected = false + + // Empty the `fd.queue` that holds the requests + // not yet read by the FUSE daemon yet. + // These are a subset of the requests in `fuse.completion` map. + for !conn.fd.queue.Empty() { + req := conn.fd.queue.Front() + conn.fd.queue.Remove(req) + } + + var terminate []linux.FUSEOpID + + // 2. Collect the requests have not been sent to FUSE daemon, + // or have not received a reply. + for unique := range conn.fd.completions { + terminate = append(terminate, unique) + } + + // Release all locks to avoid deadlock. + conn.asyncMu.Unlock() + conn.mu.Unlock() + conn.fd.mu.Unlock() + + // 1. The requets blocked before initialization. + // Will reach call() `connected` check and return. + if !conn.Initialized() { + conn.SetInitialized() + } + + // 2. Terminate the requests collected above. + // Set ECONNABORTED error. + // sendError() will remove them from `fd.completion` map. + // Will enter the path of a normally received error. + for _, toTerminate := range terminate { + conn.fd.sendError(ctx, -int32(syscall.ECONNABORTED), toTerminate) + } + + // 3. The requests not yet written to FUSE device. + // Early terminate. + // Will reach callFutureLocked() `connected` check and return. + close(conn.fd.fullQueueCh) + + // TODO(gvisor.dev/issue/3528): Forget all pending forget reqs. +} diff --git a/pkg/sentry/fsimpl/fuse/connection_test.go b/pkg/sentry/fsimpl/fuse/connection_test.go new file mode 100644 index 000000000..6aa77b36d --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/connection_test.go @@ -0,0 +1,127 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "math/rand" + "syscall" + "testing" + + "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" + "gvisor.dev/gvisor/pkg/syserror" +) + +// TestConnectionInitBlock tests if initialization +// correctly blocks and unblocks the connection. +// Since it's unfeasible to test kernelTask.Block() in unit test, +// the code in Call() are not tested here. +func TestConnectionInitBlock(t *testing.T) { + s := setup(t) + defer s.Destroy() + + k := kernel.KernelFromContext(s.Ctx) + + conn, _, err := newTestConnection(s, k, maxActiveRequestsDefault) + if err != nil { + t.Fatalf("newTestConnection: %v", err) + } + + select { + case <-conn.initializedChan: + t.Fatalf("initializedChan should be blocking before SetInitialized") + default: + } + + conn.SetInitialized() + + select { + case <-conn.initializedChan: + default: + t.Fatalf("initializedChan should not be blocking after SetInitialized") + } +} + +func TestConnectionAbort(t *testing.T) { + s := setup(t) + defer s.Destroy() + + k := kernel.KernelFromContext(s.Ctx) + creds := auth.CredentialsFromContext(s.Ctx) + task := kernel.TaskFromContext(s.Ctx) + + const maxActiveRequest uint64 = 10 + + conn, _, err := newTestConnection(s, k, maxActiveRequest) + if err != nil { + t.Fatalf("newTestConnection: %v", err) + } + + testObj := &testPayload{ + data: rand.Uint32(), + } + + var futNormal []*futureResponse + + for i := 0; i < 2*int(maxActiveRequest); i++ { + req, err := conn.NewRequest(creds, uint32(i), uint64(i), 0, testObj) + if err != nil { + t.Fatalf("NewRequest creation failed: %v", err) + } + if i < int(maxActiveRequest) { + // Issue the requests that will not be blocked due to maxActiveRequest. + fut, err := conn.callFutureLocked(task, req) + if err != nil { + t.Fatalf("callFutureLocked failed: %v", err) + } + futNormal = append(futNormal, fut) + } else { + go func(t *testing.T) { + // The requests beyond maxActiveRequest will be blocked and receive expected errors. + _, err := conn.callFutureLocked(task, req) + if err != syserror.ECONNABORTED && err != syserror.ENOTCONN { + t.Fatalf("Incorrect error code received for blocked callFutureLocked() on aborted connection") + } + }(t) + } + } + + conn.Abort(s.Ctx) + + // Abort should unblock the initialization channel. + select { + case <-conn.initializedChan: + default: + t.Fatalf("initializedChan should not be blocking after SetInitialized") + } + + // Abort will return ECONNABORTED error to unblocked requests. + for _, fut := range futNormal { + if fut.getResponse().hdr.Error != -int32(syscall.ECONNABORTED) { + t.Fatalf("Incorrect error code received for aborted connection") + } + } + + // After abort, Call() should return directly with ENOTCONN. + req, err := conn.NewRequest(creds, 0, 0, 0, testObj) + if err != nil { + t.Fatalf("NewRequest creation failed: %v", err) + } + _, err = conn.Call(task, req) + if err != syserror.ENOTCONN { + t.Fatalf("Incorrect error code received for Call() after connection aborted") + } + +} diff --git a/pkg/sentry/fsimpl/fuse/dev.go b/pkg/sentry/fsimpl/fuse/dev.go index 6f0b896cb..64c3e32e2 100644 --- a/pkg/sentry/fsimpl/fuse/dev.go +++ b/pkg/sentry/fsimpl/fuse/dev.go @@ -55,9 +55,6 @@ type DeviceFD struct { vfs.DentryMetadataFileDescriptionImpl vfs.NoLockFD - // mounted specifies whether a FUSE filesystem was mounted using the DeviceFD. - mounted bool - // nextOpID is used to create new requests. nextOpID linux.FUSEOpID @@ -99,13 +96,15 @@ type DeviceFD struct { // Release implements vfs.FileDescriptionImpl.Release. func (fd *DeviceFD) Release(context.Context) { - fd.fs.conn.connected = false + if fd.fs != nil { + fd.fs.conn.connected = false + } } // PRead implements vfs.FileDescriptionImpl.PRead. func (fd *DeviceFD) PRead(ctx context.Context, dst usermem.IOSequence, offset int64, opts vfs.ReadOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } @@ -115,10 +114,16 @@ func (fd *DeviceFD) PRead(ctx context.Context, dst usermem.IOSequence, offset in // Read implements vfs.FileDescriptionImpl.Read. func (fd *DeviceFD) Read(ctx context.Context, dst usermem.IOSequence, opts vfs.ReadOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } + // Return ENODEV if the filesystem is umounted. + if fd.fs.umounted { + // TODO(gvisor.dev/issue/3525): return ECONNABORTED if aborted via fuse control fs. + return 0, syserror.ENODEV + } + // We require that any Read done on this filesystem have a sane minimum // read buffer. It must have the capacity for the fixed parts of any request // header (Linux uses the request header and the FUSEWriteIn header for this @@ -165,7 +170,7 @@ func (fd *DeviceFD) readLocked(ctx context.Context, dst usermem.IOSequence, opts } // Return the error to the calling task. - if err := fd.sendError(ctx, errno, req); err != nil { + if err := fd.sendError(ctx, errno, req.hdr.Unique); err != nil { return 0, err } @@ -217,7 +222,7 @@ func (fd *DeviceFD) readLocked(ctx context.Context, dst usermem.IOSequence, opts // PWrite implements vfs.FileDescriptionImpl.PWrite. func (fd *DeviceFD) PWrite(ctx context.Context, src usermem.IOSequence, offset int64, opts vfs.WriteOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } @@ -234,10 +239,15 @@ func (fd *DeviceFD) Write(ctx context.Context, src usermem.IOSequence, opts vfs. // writeLocked implements writing to the fuse device while locked with DeviceFD.mu. func (fd *DeviceFD) writeLocked(ctx context.Context, src usermem.IOSequence, opts vfs.WriteOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } + // Return ENODEV if the filesystem is umounted. + if fd.fs.umounted { + return 0, syserror.ENODEV + } + var cn, n int64 hdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes()) @@ -303,7 +313,8 @@ func (fd *DeviceFD) writeLocked(ctx context.Context, src usermem.IOSequence, opt // Currently we simply discard the reply for FUSE_RELEASE. return n + src.NumBytes(), nil } - // Server sent us a response for a request we never sent? + // Server sent us a response for a request we never sent, + // or for which we already received a reply (e.g. aborted), an unlikely event. return 0, syserror.EINVAL } @@ -343,7 +354,14 @@ func (fd *DeviceFD) Readiness(mask waiter.EventMask) waiter.EventMask { // locked with DeviceFD.mu. func (fd *DeviceFD) readinessLocked(mask waiter.EventMask) waiter.EventMask { var ready waiter.EventMask - ready |= waiter.EventOut // FD is always writable + + if fd.fs.umounted { + ready |= waiter.EventErr + return ready & mask + } + + // FD is always writable. + ready |= waiter.EventOut if !fd.queue.Empty() { // Have reqs available, FD is readable. ready |= waiter.EventIn @@ -365,7 +383,7 @@ func (fd *DeviceFD) EventUnregister(e *waiter.Entry) { // Seek implements vfs.FileDescriptionImpl.Seek. func (fd *DeviceFD) Seek(ctx context.Context, offset int64, whence int32) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } @@ -391,19 +409,20 @@ func (fd *DeviceFD) sendResponse(ctx context.Context, fut *futureResponse) error return nil } -// sendError sends an error response to the waiting task (if any). -func (fd *DeviceFD) sendError(ctx context.Context, errno int32, req *Request) error { +// sendError sends an error response to the waiting task (if any) by calling sendResponse(). +func (fd *DeviceFD) sendError(ctx context.Context, errno int32, unique linux.FUSEOpID) error { // Return the error to the calling task. outHdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes()) respHdr := linux.FUSEHeaderOut{ Len: outHdrLen, Error: errno, - Unique: req.hdr.Unique, + Unique: unique, } fut, ok := fd.completions[respHdr.Unique] if !ok { - // Server sent us a response for a request we never sent? + // A response for a request we never sent, + // or for which we already received a reply (e.g. aborted). return syserror.EINVAL } delete(fd.completions, respHdr.Unique) diff --git a/pkg/sentry/fsimpl/fuse/dev_test.go b/pkg/sentry/fsimpl/fuse/dev_test.go index 59928d75e..5abdf82d3 100644 --- a/pkg/sentry/fsimpl/fuse/dev_test.go +++ b/pkg/sentry/fsimpl/fuse/dev_test.go @@ -35,10 +35,6 @@ import ( // will simply echo the payload back with the appropriate headers. const echoTestOpcode linux.FUSEOpcode = 1000 -type testPayload struct { - data uint32 -} - // TestFUSECommunication tests that the communication layer between the Sentry and the // FUSE server daemon works as expected. func TestFUSECommunication(t *testing.T) { diff --git a/pkg/sentry/fsimpl/fuse/fusefs.go b/pkg/sentry/fsimpl/fuse/fusefs.go index 30725182a..f4ed73c12 100644 --- a/pkg/sentry/fsimpl/fuse/fusefs.go +++ b/pkg/sentry/fsimpl/fuse/fusefs.go @@ -29,6 +29,7 @@ import ( "gvisor.dev/gvisor/pkg/sentry/kernel/auth" "gvisor.dev/gvisor/pkg/sentry/vfs" "gvisor.dev/gvisor/pkg/syserror" + "gvisor.dev/gvisor/pkg/waiter" "gvisor.dev/gvisor/tools/go_marshal/marshal" ) @@ -82,6 +83,9 @@ type filesystem struct { // opts is the options the fusefs is initialized with. opts *filesystemOptions + + // umounted is true if filesystem.Release() has been called. + umounted bool } // Name implements vfs.FilesystemType.Name. @@ -221,6 +225,14 @@ func newFUSEFilesystem(ctx context.Context, devMinor uint32, opts *filesystemOpt // Release implements vfs.FilesystemImpl.Release. func (fs *filesystem) Release(ctx context.Context) { + fs.umounted = true + fs.conn.Abort(ctx) + + fs.conn.fd.mu.Lock() + // Notify all the waiters on this fd. + fs.conn.fd.waitQueue.Notify(waiter.EventIn) + fs.conn.fd.mu.Unlock() + fs.Filesystem.VFSFilesystem().VirtualFilesystem().PutAnonBlockDevMinor(fs.devMinor) fs.Filesystem.Release(ctx) } diff --git a/pkg/sentry/fsimpl/fuse/utils_test.go b/pkg/sentry/fsimpl/fuse/utils_test.go new file mode 100644 index 000000000..436322830 --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/utils_test.go @@ -0,0 +1,111 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "testing" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/sentry/fsimpl/testutil" + "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" + "gvisor.dev/gvisor/pkg/sentry/vfs" + "gvisor.dev/gvisor/pkg/usermem" + "gvisor.dev/gvisor/tools/go_marshal/marshal" +) + +func setup(t *testing.T) *testutil.System { + k, err := testutil.Boot() + if err != nil { + t.Fatalf("Error creating kernel: %v", err) + } + + ctx := k.SupervisorContext() + creds := auth.CredentialsFromContext(ctx) + + k.VFS().MustRegisterFilesystemType(Name, &FilesystemType{}, &vfs.RegisterFilesystemTypeOptions{ + AllowUserList: true, + AllowUserMount: true, + }) + + mntns, err := k.VFS().NewMountNamespace(ctx, creds, "", "tmpfs", &vfs.GetFilesystemOptions{}) + if err != nil { + t.Fatalf("NewMountNamespace(): %v", err) + } + + return testutil.NewSystem(ctx, t, k.VFS(), mntns) +} + +// newTestConnection creates a fuse connection that the sentry can communicate with +// and the FD for the server to communicate with. +func newTestConnection(system *testutil.System, k *kernel.Kernel, maxActiveRequests uint64) (*connection, *vfs.FileDescription, error) { + vfsObj := &vfs.VirtualFilesystem{} + fuseDev := &DeviceFD{} + + if err := vfsObj.Init(system.Ctx); err != nil { + return nil, nil, err + } + + vd := vfsObj.NewAnonVirtualDentry("genCountFD") + defer vd.DecRef(system.Ctx) + if err := fuseDev.vfsfd.Init(fuseDev, linux.O_RDWR|linux.O_CREAT, vd.Mount(), vd.Dentry(), &vfs.FileDescriptionOptions{}); err != nil { + return nil, nil, err + } + + fsopts := filesystemOptions{ + maxActiveRequests: maxActiveRequests, + } + fs, err := newFUSEFilesystem(system.Ctx, 0, &fsopts, &fuseDev.vfsfd) + if err != nil { + return nil, nil, err + } + + return fs.conn, &fuseDev.vfsfd, nil +} + +type testPayload struct { + marshal.StubMarshallable + data uint32 +} + +// SizeBytes implements marshal.Marshallable.SizeBytes. +func (t *testPayload) SizeBytes() int { + return 4 +} + +// MarshalBytes implements marshal.Marshallable.MarshalBytes. +func (t *testPayload) MarshalBytes(dst []byte) { + usermem.ByteOrder.PutUint32(dst[:4], t.data) +} + +// UnmarshalBytes implements marshal.Marshallable.UnmarshalBytes. +func (t *testPayload) UnmarshalBytes(src []byte) { + *t = testPayload{data: usermem.ByteOrder.Uint32(src[:4])} +} + +// Packed implements marshal.Marshallable.Packed. +func (t *testPayload) Packed() bool { + return true +} + +// MarshalUnsafe implements marshal.Marshallable.MarshalUnsafe. +func (t *testPayload) MarshalUnsafe(dst []byte) { + t.MarshalBytes(dst) +} + +// UnmarshalUnsafe implements marshal.Marshallable.UnmarshalUnsafe. +func (t *testPayload) UnmarshalUnsafe(src []byte) { + t.UnmarshalBytes(src) +} diff --git a/pkg/syserror/syserror.go b/pkg/syserror/syserror.go index fe9f50169..f516c8e46 100644 --- a/pkg/syserror/syserror.go +++ b/pkg/syserror/syserror.go @@ -33,6 +33,7 @@ var ( EBADFD = error(syscall.EBADFD) EBUSY = error(syscall.EBUSY) ECHILD = error(syscall.ECHILD) + ECONNABORTED = error(syscall.ECONNABORTED) ECONNREFUSED = error(syscall.ECONNREFUSED) ECONNRESET = error(syscall.ECONNRESET) EDEADLK = error(syscall.EDEADLK) |