diff options
-rw-r--r-- | pkg/sentry/fsimpl/fuse/BUILD | 6 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/connection.go | 11 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/connection_control.go | 62 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/connection_test.go | 127 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/dev.go | 51 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/dev_test.go | 4 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/fusefs.go | 12 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/utils_test.go | 111 | ||||
-rw-r--r-- | pkg/syserror/syserror.go | 1 |
9 files changed, 363 insertions, 22 deletions
diff --git a/pkg/sentry/fsimpl/fuse/BUILD b/pkg/sentry/fsimpl/fuse/BUILD index 23660a708..77025772c 100644 --- a/pkg/sentry/fsimpl/fuse/BUILD +++ b/pkg/sentry/fsimpl/fuse/BUILD @@ -66,7 +66,11 @@ go_library( go_test( name = "fuse_test", size = "small", - srcs = ["dev_test.go"], + srcs = [ + "connection_test.go", + "dev_test.go", + "utils_test.go", + ], library = ":fuse", deps = [ "//pkg/abi/linux", diff --git a/pkg/sentry/fsimpl/fuse/connection.go b/pkg/sentry/fsimpl/fuse/connection.go index 122b7d92f..6009cdf97 100644 --- a/pkg/sentry/fsimpl/fuse/connection.go +++ b/pkg/sentry/fsimpl/fuse/connection.go @@ -179,7 +179,6 @@ func newFUSEConnection(_ context.Context, fd *vfs.FileDescription, opts *filesys // Mark the device as ready so it can be used. /dev/fuse can only be used if the FD was used to // mount a FUSE filesystem. fuseFD := fd.Impl().(*DeviceFD) - fuseFD.mounted = true // Create the writeBuf for the header to be stored in. hdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes()) @@ -282,6 +281,16 @@ func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, // callFutureLocked makes a request to the server and returns a future response. func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureResponse, error) { + // Check connected again holding conn.mu. + conn.mu.Lock() + if !conn.connected { + conn.mu.Unlock() + // we checked connected before, + // this must be due to aborted connection. + return nil, syserror.ECONNABORTED + } + conn.mu.Unlock() + conn.fd.queue.PushBack(r) conn.fd.numActiveRequests++ fut := newFutureResponse(r) diff --git a/pkg/sentry/fsimpl/fuse/connection_control.go b/pkg/sentry/fsimpl/fuse/connection_control.go index d84d9caf2..a63c66e7c 100644 --- a/pkg/sentry/fsimpl/fuse/connection_control.go +++ b/pkg/sentry/fsimpl/fuse/connection_control.go @@ -16,8 +16,10 @@ package fuse import ( "sync/atomic" + "syscall" "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/kernel/auth" ) @@ -181,3 +183,63 @@ func (conn *connection) initProcessReply(out *linux.FUSEInitOut, hasSysAdminCap return nil } + +// Abort this FUSE connection. +// It tries to acquire conn.fd.mu, conn.lock, conn.bgLock in order. +// All possible requests waiting or blocking will be aborted. +func (conn *connection) Abort(ctx context.Context) { + conn.fd.mu.Lock() + conn.mu.Lock() + conn.asyncMu.Lock() + + if !conn.connected { + conn.asyncMu.Unlock() + conn.mu.Unlock() + conn.fd.mu.Unlock() + return + } + + conn.connected = false + + // Empty the `fd.queue` that holds the requests + // not yet read by the FUSE daemon yet. + // These are a subset of the requests in `fuse.completion` map. + for !conn.fd.queue.Empty() { + req := conn.fd.queue.Front() + conn.fd.queue.Remove(req) + } + + var terminate []linux.FUSEOpID + + // 2. Collect the requests have not been sent to FUSE daemon, + // or have not received a reply. + for unique := range conn.fd.completions { + terminate = append(terminate, unique) + } + + // Release all locks to avoid deadlock. + conn.asyncMu.Unlock() + conn.mu.Unlock() + conn.fd.mu.Unlock() + + // 1. The requets blocked before initialization. + // Will reach call() `connected` check and return. + if !conn.Initialized() { + conn.SetInitialized() + } + + // 2. Terminate the requests collected above. + // Set ECONNABORTED error. + // sendError() will remove them from `fd.completion` map. + // Will enter the path of a normally received error. + for _, toTerminate := range terminate { + conn.fd.sendError(ctx, -int32(syscall.ECONNABORTED), toTerminate) + } + + // 3. The requests not yet written to FUSE device. + // Early terminate. + // Will reach callFutureLocked() `connected` check and return. + close(conn.fd.fullQueueCh) + + // TODO(gvisor.dev/issue/3528): Forget all pending forget reqs. +} diff --git a/pkg/sentry/fsimpl/fuse/connection_test.go b/pkg/sentry/fsimpl/fuse/connection_test.go new file mode 100644 index 000000000..6aa77b36d --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/connection_test.go @@ -0,0 +1,127 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "math/rand" + "syscall" + "testing" + + "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" + "gvisor.dev/gvisor/pkg/syserror" +) + +// TestConnectionInitBlock tests if initialization +// correctly blocks and unblocks the connection. +// Since it's unfeasible to test kernelTask.Block() in unit test, +// the code in Call() are not tested here. +func TestConnectionInitBlock(t *testing.T) { + s := setup(t) + defer s.Destroy() + + k := kernel.KernelFromContext(s.Ctx) + + conn, _, err := newTestConnection(s, k, maxActiveRequestsDefault) + if err != nil { + t.Fatalf("newTestConnection: %v", err) + } + + select { + case <-conn.initializedChan: + t.Fatalf("initializedChan should be blocking before SetInitialized") + default: + } + + conn.SetInitialized() + + select { + case <-conn.initializedChan: + default: + t.Fatalf("initializedChan should not be blocking after SetInitialized") + } +} + +func TestConnectionAbort(t *testing.T) { + s := setup(t) + defer s.Destroy() + + k := kernel.KernelFromContext(s.Ctx) + creds := auth.CredentialsFromContext(s.Ctx) + task := kernel.TaskFromContext(s.Ctx) + + const maxActiveRequest uint64 = 10 + + conn, _, err := newTestConnection(s, k, maxActiveRequest) + if err != nil { + t.Fatalf("newTestConnection: %v", err) + } + + testObj := &testPayload{ + data: rand.Uint32(), + } + + var futNormal []*futureResponse + + for i := 0; i < 2*int(maxActiveRequest); i++ { + req, err := conn.NewRequest(creds, uint32(i), uint64(i), 0, testObj) + if err != nil { + t.Fatalf("NewRequest creation failed: %v", err) + } + if i < int(maxActiveRequest) { + // Issue the requests that will not be blocked due to maxActiveRequest. + fut, err := conn.callFutureLocked(task, req) + if err != nil { + t.Fatalf("callFutureLocked failed: %v", err) + } + futNormal = append(futNormal, fut) + } else { + go func(t *testing.T) { + // The requests beyond maxActiveRequest will be blocked and receive expected errors. + _, err := conn.callFutureLocked(task, req) + if err != syserror.ECONNABORTED && err != syserror.ENOTCONN { + t.Fatalf("Incorrect error code received for blocked callFutureLocked() on aborted connection") + } + }(t) + } + } + + conn.Abort(s.Ctx) + + // Abort should unblock the initialization channel. + select { + case <-conn.initializedChan: + default: + t.Fatalf("initializedChan should not be blocking after SetInitialized") + } + + // Abort will return ECONNABORTED error to unblocked requests. + for _, fut := range futNormal { + if fut.getResponse().hdr.Error != -int32(syscall.ECONNABORTED) { + t.Fatalf("Incorrect error code received for aborted connection") + } + } + + // After abort, Call() should return directly with ENOTCONN. + req, err := conn.NewRequest(creds, 0, 0, 0, testObj) + if err != nil { + t.Fatalf("NewRequest creation failed: %v", err) + } + _, err = conn.Call(task, req) + if err != syserror.ENOTCONN { + t.Fatalf("Incorrect error code received for Call() after connection aborted") + } + +} diff --git a/pkg/sentry/fsimpl/fuse/dev.go b/pkg/sentry/fsimpl/fuse/dev.go index 6f0b896cb..64c3e32e2 100644 --- a/pkg/sentry/fsimpl/fuse/dev.go +++ b/pkg/sentry/fsimpl/fuse/dev.go @@ -55,9 +55,6 @@ type DeviceFD struct { vfs.DentryMetadataFileDescriptionImpl vfs.NoLockFD - // mounted specifies whether a FUSE filesystem was mounted using the DeviceFD. - mounted bool - // nextOpID is used to create new requests. nextOpID linux.FUSEOpID @@ -99,13 +96,15 @@ type DeviceFD struct { // Release implements vfs.FileDescriptionImpl.Release. func (fd *DeviceFD) Release(context.Context) { - fd.fs.conn.connected = false + if fd.fs != nil { + fd.fs.conn.connected = false + } } // PRead implements vfs.FileDescriptionImpl.PRead. func (fd *DeviceFD) PRead(ctx context.Context, dst usermem.IOSequence, offset int64, opts vfs.ReadOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } @@ -115,10 +114,16 @@ func (fd *DeviceFD) PRead(ctx context.Context, dst usermem.IOSequence, offset in // Read implements vfs.FileDescriptionImpl.Read. func (fd *DeviceFD) Read(ctx context.Context, dst usermem.IOSequence, opts vfs.ReadOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } + // Return ENODEV if the filesystem is umounted. + if fd.fs.umounted { + // TODO(gvisor.dev/issue/3525): return ECONNABORTED if aborted via fuse control fs. + return 0, syserror.ENODEV + } + // We require that any Read done on this filesystem have a sane minimum // read buffer. It must have the capacity for the fixed parts of any request // header (Linux uses the request header and the FUSEWriteIn header for this @@ -165,7 +170,7 @@ func (fd *DeviceFD) readLocked(ctx context.Context, dst usermem.IOSequence, opts } // Return the error to the calling task. - if err := fd.sendError(ctx, errno, req); err != nil { + if err := fd.sendError(ctx, errno, req.hdr.Unique); err != nil { return 0, err } @@ -217,7 +222,7 @@ func (fd *DeviceFD) readLocked(ctx context.Context, dst usermem.IOSequence, opts // PWrite implements vfs.FileDescriptionImpl.PWrite. func (fd *DeviceFD) PWrite(ctx context.Context, src usermem.IOSequence, offset int64, opts vfs.WriteOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } @@ -234,10 +239,15 @@ func (fd *DeviceFD) Write(ctx context.Context, src usermem.IOSequence, opts vfs. // writeLocked implements writing to the fuse device while locked with DeviceFD.mu. func (fd *DeviceFD) writeLocked(ctx context.Context, src usermem.IOSequence, opts vfs.WriteOptions) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } + // Return ENODEV if the filesystem is umounted. + if fd.fs.umounted { + return 0, syserror.ENODEV + } + var cn, n int64 hdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes()) @@ -303,7 +313,8 @@ func (fd *DeviceFD) writeLocked(ctx context.Context, src usermem.IOSequence, opt // Currently we simply discard the reply for FUSE_RELEASE. return n + src.NumBytes(), nil } - // Server sent us a response for a request we never sent? + // Server sent us a response for a request we never sent, + // or for which we already received a reply (e.g. aborted), an unlikely event. return 0, syserror.EINVAL } @@ -343,7 +354,14 @@ func (fd *DeviceFD) Readiness(mask waiter.EventMask) waiter.EventMask { // locked with DeviceFD.mu. func (fd *DeviceFD) readinessLocked(mask waiter.EventMask) waiter.EventMask { var ready waiter.EventMask - ready |= waiter.EventOut // FD is always writable + + if fd.fs.umounted { + ready |= waiter.EventErr + return ready & mask + } + + // FD is always writable. + ready |= waiter.EventOut if !fd.queue.Empty() { // Have reqs available, FD is readable. ready |= waiter.EventIn @@ -365,7 +383,7 @@ func (fd *DeviceFD) EventUnregister(e *waiter.Entry) { // Seek implements vfs.FileDescriptionImpl.Seek. func (fd *DeviceFD) Seek(ctx context.Context, offset int64, whence int32) (int64, error) { // Operations on /dev/fuse don't make sense until a FUSE filesystem is mounted. - if !fd.mounted { + if fd.fs == nil { return 0, syserror.EPERM } @@ -391,19 +409,20 @@ func (fd *DeviceFD) sendResponse(ctx context.Context, fut *futureResponse) error return nil } -// sendError sends an error response to the waiting task (if any). -func (fd *DeviceFD) sendError(ctx context.Context, errno int32, req *Request) error { +// sendError sends an error response to the waiting task (if any) by calling sendResponse(). +func (fd *DeviceFD) sendError(ctx context.Context, errno int32, unique linux.FUSEOpID) error { // Return the error to the calling task. outHdrLen := uint32((*linux.FUSEHeaderOut)(nil).SizeBytes()) respHdr := linux.FUSEHeaderOut{ Len: outHdrLen, Error: errno, - Unique: req.hdr.Unique, + Unique: unique, } fut, ok := fd.completions[respHdr.Unique] if !ok { - // Server sent us a response for a request we never sent? + // A response for a request we never sent, + // or for which we already received a reply (e.g. aborted). return syserror.EINVAL } delete(fd.completions, respHdr.Unique) diff --git a/pkg/sentry/fsimpl/fuse/dev_test.go b/pkg/sentry/fsimpl/fuse/dev_test.go index 936683454..3da8b469b 100644 --- a/pkg/sentry/fsimpl/fuse/dev_test.go +++ b/pkg/sentry/fsimpl/fuse/dev_test.go @@ -35,10 +35,6 @@ import ( // will simply echo the payload back with the appropriate headers. const echoTestOpcode linux.FUSEOpcode = 1000 -type testPayload struct { - data uint32 -} - // TestFUSECommunication tests that the communication layer between the Sentry and the // FUSE server daemon works as expected. func TestFUSECommunication(t *testing.T) { diff --git a/pkg/sentry/fsimpl/fuse/fusefs.go b/pkg/sentry/fsimpl/fuse/fusefs.go index 30725182a..f4ed73c12 100644 --- a/pkg/sentry/fsimpl/fuse/fusefs.go +++ b/pkg/sentry/fsimpl/fuse/fusefs.go @@ -29,6 +29,7 @@ import ( "gvisor.dev/gvisor/pkg/sentry/kernel/auth" "gvisor.dev/gvisor/pkg/sentry/vfs" "gvisor.dev/gvisor/pkg/syserror" + "gvisor.dev/gvisor/pkg/waiter" "gvisor.dev/gvisor/tools/go_marshal/marshal" ) @@ -82,6 +83,9 @@ type filesystem struct { // opts is the options the fusefs is initialized with. opts *filesystemOptions + + // umounted is true if filesystem.Release() has been called. + umounted bool } // Name implements vfs.FilesystemType.Name. @@ -221,6 +225,14 @@ func newFUSEFilesystem(ctx context.Context, devMinor uint32, opts *filesystemOpt // Release implements vfs.FilesystemImpl.Release. func (fs *filesystem) Release(ctx context.Context) { + fs.umounted = true + fs.conn.Abort(ctx) + + fs.conn.fd.mu.Lock() + // Notify all the waiters on this fd. + fs.conn.fd.waitQueue.Notify(waiter.EventIn) + fs.conn.fd.mu.Unlock() + fs.Filesystem.VFSFilesystem().VirtualFilesystem().PutAnonBlockDevMinor(fs.devMinor) fs.Filesystem.Release(ctx) } diff --git a/pkg/sentry/fsimpl/fuse/utils_test.go b/pkg/sentry/fsimpl/fuse/utils_test.go new file mode 100644 index 000000000..436322830 --- /dev/null +++ b/pkg/sentry/fsimpl/fuse/utils_test.go @@ -0,0 +1,111 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fuse + +import ( + "testing" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/sentry/fsimpl/testutil" + "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" + "gvisor.dev/gvisor/pkg/sentry/vfs" + "gvisor.dev/gvisor/pkg/usermem" + "gvisor.dev/gvisor/tools/go_marshal/marshal" +) + +func setup(t *testing.T) *testutil.System { + k, err := testutil.Boot() + if err != nil { + t.Fatalf("Error creating kernel: %v", err) + } + + ctx := k.SupervisorContext() + creds := auth.CredentialsFromContext(ctx) + + k.VFS().MustRegisterFilesystemType(Name, &FilesystemType{}, &vfs.RegisterFilesystemTypeOptions{ + AllowUserList: true, + AllowUserMount: true, + }) + + mntns, err := k.VFS().NewMountNamespace(ctx, creds, "", "tmpfs", &vfs.GetFilesystemOptions{}) + if err != nil { + t.Fatalf("NewMountNamespace(): %v", err) + } + + return testutil.NewSystem(ctx, t, k.VFS(), mntns) +} + +// newTestConnection creates a fuse connection that the sentry can communicate with +// and the FD for the server to communicate with. +func newTestConnection(system *testutil.System, k *kernel.Kernel, maxActiveRequests uint64) (*connection, *vfs.FileDescription, error) { + vfsObj := &vfs.VirtualFilesystem{} + fuseDev := &DeviceFD{} + + if err := vfsObj.Init(system.Ctx); err != nil { + return nil, nil, err + } + + vd := vfsObj.NewAnonVirtualDentry("genCountFD") + defer vd.DecRef(system.Ctx) + if err := fuseDev.vfsfd.Init(fuseDev, linux.O_RDWR|linux.O_CREAT, vd.Mount(), vd.Dentry(), &vfs.FileDescriptionOptions{}); err != nil { + return nil, nil, err + } + + fsopts := filesystemOptions{ + maxActiveRequests: maxActiveRequests, + } + fs, err := newFUSEFilesystem(system.Ctx, 0, &fsopts, &fuseDev.vfsfd) + if err != nil { + return nil, nil, err + } + + return fs.conn, &fuseDev.vfsfd, nil +} + +type testPayload struct { + marshal.StubMarshallable + data uint32 +} + +// SizeBytes implements marshal.Marshallable.SizeBytes. +func (t *testPayload) SizeBytes() int { + return 4 +} + +// MarshalBytes implements marshal.Marshallable.MarshalBytes. +func (t *testPayload) MarshalBytes(dst []byte) { + usermem.ByteOrder.PutUint32(dst[:4], t.data) +} + +// UnmarshalBytes implements marshal.Marshallable.UnmarshalBytes. +func (t *testPayload) UnmarshalBytes(src []byte) { + *t = testPayload{data: usermem.ByteOrder.Uint32(src[:4])} +} + +// Packed implements marshal.Marshallable.Packed. +func (t *testPayload) Packed() bool { + return true +} + +// MarshalUnsafe implements marshal.Marshallable.MarshalUnsafe. +func (t *testPayload) MarshalUnsafe(dst []byte) { + t.MarshalBytes(dst) +} + +// UnmarshalUnsafe implements marshal.Marshallable.UnmarshalUnsafe. +func (t *testPayload) UnmarshalUnsafe(src []byte) { + t.UnmarshalBytes(src) +} diff --git a/pkg/syserror/syserror.go b/pkg/syserror/syserror.go index fe9f50169..f516c8e46 100644 --- a/pkg/syserror/syserror.go +++ b/pkg/syserror/syserror.go @@ -33,6 +33,7 @@ var ( EBADFD = error(syscall.EBADFD) EBUSY = error(syscall.EBUSY) ECHILD = error(syscall.ECHILD) + ECONNABORTED = error(syscall.ECONNABORTED) ECONNREFUSED = error(syscall.ECONNREFUSED) ECONNRESET = error(syscall.ECONNRESET) EDEADLK = error(syscall.EDEADLK) |