diff options
Diffstat (limited to 'pkg/sentry/fs/fdpipe')
-rw-r--r-- | pkg/sentry/fs/fdpipe/BUILD | 76 | ||||
-rw-r--r-- | pkg/sentry/fs/fdpipe/pipe.go | 167 | ||||
-rw-r--r-- | pkg/sentry/fs/fdpipe/pipe_opener.go | 193 | ||||
-rw-r--r-- | pkg/sentry/fs/fdpipe/pipe_opener_test.go | 522 | ||||
-rw-r--r-- | pkg/sentry/fs/fdpipe/pipe_state.go | 88 | ||||
-rw-r--r-- | pkg/sentry/fs/fdpipe/pipe_test.go | 489 |
6 files changed, 1535 insertions, 0 deletions
diff --git a/pkg/sentry/fs/fdpipe/BUILD b/pkg/sentry/fs/fdpipe/BUILD new file mode 100644 index 000000000..9e1f65d3e --- /dev/null +++ b/pkg/sentry/fs/fdpipe/BUILD @@ -0,0 +1,76 @@ +package(licenses = ["notice"]) # Apache 2.0 + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("//tools/go_stateify:defs.bzl", "go_stateify") + +go_stateify( + name = "pipe_state", + srcs = [ + "pipe.go", + "pipe_state.go", + ], + out = "pipe_autogen_state.go", + imports = ["gvisor.googlesource.com/gvisor/pkg/sentry/fs"], + package = "fdpipe", +) + +go_library( + name = "fdpipe", + srcs = [ + "pipe.go", + "pipe_autogen_state.go", + "pipe_opener.go", + "pipe_state.go", + ], + importpath = "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fdpipe", + visibility = ["//pkg/sentry:internal"], + deps = [ + "//pkg/abi/linux", + "//pkg/amutex", + "//pkg/fd", + "//pkg/log", + "//pkg/metric", + "//pkg/p9", + "//pkg/refs", + "//pkg/secio", + "//pkg/sentry/context", + "//pkg/sentry/device", + "//pkg/sentry/fs", + "//pkg/sentry/fs/fsutil", + "//pkg/sentry/fs/lock", + "//pkg/sentry/kernel/auth", + "//pkg/sentry/kernel/time", + "//pkg/sentry/memmap", + "//pkg/sentry/platform", + "//pkg/sentry/safemem", + "//pkg/sentry/uniqueid", + "//pkg/sentry/usermem", + "//pkg/state", + "//pkg/syserror", + "//pkg/tcpip", + "//pkg/tcpip/transport/unix", + "//pkg/unet", + "//pkg/waiter", + "//pkg/waiter/fdnotifier", + ], +) + +go_test( + name = "fdpipe_test", + size = "small", + srcs = [ + "pipe_opener_test.go", + "pipe_test.go", + ], + embed = [":fdpipe"], + deps = [ + "//pkg/fd", + "//pkg/sentry/context", + "//pkg/sentry/context/contexttest", + "//pkg/sentry/fs", + "//pkg/sentry/usermem", + "//pkg/syserror", + "//pkg/waiter/fdnotifier", + "@com_github_google_uuid//:go_default_library", + ], +) diff --git a/pkg/sentry/fs/fdpipe/pipe.go b/pkg/sentry/fs/fdpipe/pipe.go new file mode 100644 index 000000000..f7bbd4aff --- /dev/null +++ b/pkg/sentry/fs/fdpipe/pipe.go @@ -0,0 +1,167 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package fdpipe implements common namedpipe opening and accessing logic. +package fdpipe + +import ( + "os" + "sync" + "syscall" + + "gvisor.googlesource.com/gvisor/pkg/fd" + "gvisor.googlesource.com/gvisor/pkg/log" + "gvisor.googlesource.com/gvisor/pkg/secio" + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil" + "gvisor.googlesource.com/gvisor/pkg/sentry/safemem" + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" + "gvisor.googlesource.com/gvisor/pkg/syserror" + "gvisor.googlesource.com/gvisor/pkg/waiter" + "gvisor.googlesource.com/gvisor/pkg/waiter/fdnotifier" +) + +// pipeOperations are the fs.FileOperations of a host pipe. +type pipeOperations struct { + fsutil.PipeSeek `state:"nosave"` + fsutil.NotDirReaddir `state:"nosave"` + fsutil.NoFsync `state:"nosave"` + fsutil.NoopFlush `state:"nosave"` + fsutil.NoMMap `state:"nosave"` + fsutil.NoIoctl `state:"nosave"` + waiter.Queue `state:"nosave"` + + // flags are the flags used to open the pipe. + flags fs.FileFlags `state:".(fs.FileFlags)"` + + // opener is how the pipe was opened. + opener NonBlockingOpener `state:"wait"` + + // file represents the host pipe. + file *fd.FD `state:"nosave"` + + // mu protects readAheadBuffer access below. + mu sync.Mutex `state:"nosave"` + + // readAheadBuffer contains read bytes that have not yet been read + // by the application but need to be buffered for save-restore for correct + // opening semantics. The readAheadBuffer will only be non-empty when the + // is first opened and will be drained by subsequent reads on the pipe. + readAheadBuffer []byte +} + +// newPipeOperations returns an implementation of fs.FileOperations for a pipe. +func newPipeOperations(ctx context.Context, opener NonBlockingOpener, flags fs.FileFlags, file *fd.FD, readAheadBuffer []byte) (*pipeOperations, error) { + pipeOps := &pipeOperations{ + flags: flags, + opener: opener, + file: file, + readAheadBuffer: readAheadBuffer, + } + if err := pipeOps.init(); err != nil { + return nil, err + } + return pipeOps, nil +} + +// init initializes p.file. +func (p *pipeOperations) init() error { + var s syscall.Stat_t + if err := syscall.Fstat(p.file.FD(), &s); err != nil { + log.Warningf("pipe: cannot stat fd %d: %v", p.file.FD(), err) + return syscall.EINVAL + } + if s.Mode&syscall.S_IFIFO != syscall.S_IFIFO { + log.Warningf("pipe: cannot load fd %d as pipe, file type: %o", p.file.FD(), s.Mode) + return syscall.EINVAL + } + if err := syscall.SetNonblock(p.file.FD(), true); err != nil { + return err + } + if err := fdnotifier.AddFD(int32(p.file.FD()), &p.Queue); err != nil { + return err + } + return nil +} + +// EventRegister implements waiter.Waitable.EventRegister. +func (p *pipeOperations) EventRegister(e *waiter.Entry, mask waiter.EventMask) { + p.Queue.EventRegister(e, mask) + fdnotifier.UpdateFD(int32(p.file.FD())) +} + +// EventUnregister implements waiter.Waitable.EventUnregister. +func (p *pipeOperations) EventUnregister(e *waiter.Entry) { + p.Queue.EventUnregister(e) + fdnotifier.UpdateFD(int32(p.file.FD())) +} + +// Readiness returns a mask of ready events for stream. +func (p *pipeOperations) Readiness(mask waiter.EventMask) (eventMask waiter.EventMask) { + return fdnotifier.NonBlockingPoll(int32(p.file.FD()), mask) +} + +// Release implements fs.FileOperations.Release. +func (p *pipeOperations) Release() { + fdnotifier.RemoveFD(int32(p.file.FD())) + p.file.Close() + p.file = nil +} + +// Read implements fs.FileOperations.Read. +func (p *pipeOperations) Read(ctx context.Context, file *fs.File, dst usermem.IOSequence, offset int64) (int64, error) { + // Drain the read ahead buffer, if it contains anything first. + var bufN int + var bufErr error + p.mu.Lock() + if len(p.readAheadBuffer) > 0 { + bufN, bufErr = dst.CopyOut(ctx, p.readAheadBuffer) + p.readAheadBuffer = p.readAheadBuffer[bufN:] + dst = dst.DropFirst(bufN) + } + p.mu.Unlock() + if dst.NumBytes() == 0 || bufErr != nil { + return int64(bufN), bufErr + } + + // Pipes expect full reads. + n, err := dst.CopyOutFrom(ctx, safemem.FromIOReader{secio.FullReader{p.file}}) + total := int64(bufN) + n + if err != nil && isBlockError(err) { + return total, syserror.ErrWouldBlock + } + return total, err +} + +// Write implements fs.FileOperations.Write. +func (p *pipeOperations) Write(ctx context.Context, file *fs.File, src usermem.IOSequence, offset int64) (int64, error) { + n, err := src.CopyInTo(ctx, safemem.FromIOWriter{p.file}) + if err != nil && isBlockError(err) { + return n, syserror.ErrWouldBlock + } + return n, err +} + +// isBlockError unwraps os errors and checks if they are caused by EAGAIN or +// EWOULDBLOCK. This is so they can be transformed into syserror.ErrWouldBlock. +func isBlockError(err error) bool { + if err == syserror.EAGAIN || err == syserror.EWOULDBLOCK { + return true + } + if pe, ok := err.(*os.PathError); ok { + return isBlockError(pe.Err) + } + return false +} diff --git a/pkg/sentry/fs/fdpipe/pipe_opener.go b/pkg/sentry/fs/fdpipe/pipe_opener.go new file mode 100644 index 000000000..a0d59575f --- /dev/null +++ b/pkg/sentry/fs/fdpipe/pipe_opener.go @@ -0,0 +1,193 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fdpipe + +import ( + "io" + "os" + "syscall" + "time" + + "gvisor.googlesource.com/gvisor/pkg/fd" + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/syserror" +) + +// NonBlockingOpener is a generic host file opener used to retry opening host +// pipes if necessary. +type NonBlockingOpener interface { + // NonBlockingOpen tries to open a host pipe in a non-blocking way, + // and otherwise returns an error. Implementations should be idempotent. + NonBlockingOpen(context.Context, fs.PermMask) (*fd.FD, error) +} + +// Open blocks until a host pipe can be opened or the action was cancelled. +// On success, returns fs.FileOperations wrapping the opened host pipe. +func Open(ctx context.Context, opener NonBlockingOpener, flags fs.FileFlags) (fs.FileOperations, error) { + p := &pipeOpenState{} + canceled := false + for { + if file, err := p.TryOpen(ctx, opener, flags); err != syserror.ErrWouldBlock { + return file, err + } + + // Honor the cancellation request if open still blocks. + if canceled { + // If we were canceled but we have a handle to a host + // file, we need to close it. + if p.hostFile != nil { + p.hostFile.Close() + } + return nil, syserror.ErrInterrupted + } + + cancel := ctx.SleepStart() + select { + case <-cancel: + // The cancellation request received here really says + // "cancel from now on (or ASAP)". Any environmental + // changes happened before receiving it, that might have + // caused open to not block anymore, should still be + // respected. So we cannot just return here. We have to + // give open another try below first. + canceled = true + ctx.SleepFinish(false) + case <-time.After(100 * time.Millisecond): + // If we would block, then delay retrying for a bit, since there + // is no way to know when the pipe would be ready to be + // re-opened. This is identical to sending an event notification + // to stop blocking in Task.Block, given that this routine will + // stop retrying if a cancelation is received. + ctx.SleepFinish(true) + } + } +} + +// pipeOpenState holds state needed to open a blocking named pipe read only, for instance the +// file that has been opened but doesn't yet have a corresponding writer. +type pipeOpenState struct { + // hostFile is the read only named pipe which lacks a corresponding writer. + hostFile *fd.FD +} + +// unwrapError is needed to match against ENXIO primarily. +func unwrapError(err error) error { + if pe, ok := err.(*os.PathError); ok { + return pe.Err + } + return err +} + +// TryOpen uses a NonBlockingOpener to try to open a host pipe, respecting the fs.FileFlags. +func (p *pipeOpenState) TryOpen(ctx context.Context, opener NonBlockingOpener, flags fs.FileFlags) (*pipeOperations, error) { + switch { + // Reject invalid configurations so they don't accidently succeed below. + case !flags.Read && !flags.Write: + return nil, syscall.EINVAL + + // Handle opening RDWR or with O_NONBLOCK: will never block, so try only once. + case (flags.Read && flags.Write) || flags.NonBlocking: + f, err := opener.NonBlockingOpen(ctx, fs.PermMask{Read: flags.Read, Write: flags.Write}) + if err != nil { + return nil, err + } + return newPipeOperations(ctx, opener, flags, f, nil) + + // Handle opening O_WRONLY blocking: convert ENXIO to syserror.ErrWouldBlock. + // See TryOpenWriteOnly for more details. + case flags.Write: + return p.TryOpenWriteOnly(ctx, opener) + + default: + // Handle opening O_RDONLY blocking: convert EOF from read to syserror.ErrWouldBlock. + // See TryOpenReadOnly for more details. + return p.TryOpenReadOnly(ctx, opener) + } +} + +// TryOpenReadOnly tries to open a host pipe read only but only returns a fs.File when +// there is a coordinating writer. Call TryOpenReadOnly repeatedly on the same pipeOpenState +// until syserror.ErrWouldBlock is no longer returned. +// +// How it works: +// +// Opening a pipe read only will return no error, but each non zero Read will return EOF +// until a writer becomes available, then EWOULDBLOCK. This is the only state change +// available to us. We keep a read ahead buffer in case we read bytes instead of getting +// EWOULDBLOCK, to be read from on the first read request to this fs.File. +func (p *pipeOpenState) TryOpenReadOnly(ctx context.Context, opener NonBlockingOpener) (*pipeOperations, error) { + // Waiting for a blocking read only open involves reading from the host pipe until + // bytes or other writers are available, so instead of retrying opening the pipe, + // it's necessary to retry reading from the pipe. To do this we need to keep around + // the read only pipe we opened, until success or an irrecoverable read error (at + // which point it must be closed). + if p.hostFile == nil { + var err error + p.hostFile, err = opener.NonBlockingOpen(ctx, fs.PermMask{Read: true}) + if err != nil { + return nil, err + } + } + + // Try to read from the pipe to see if writers are around. + tryReadBuffer := make([]byte, 1) + n, rerr := p.hostFile.Read(tryReadBuffer) + + // No bytes were read. + if n == 0 { + // EOF means that we're not ready yet. + if rerr == nil || rerr == io.EOF { + return nil, syserror.ErrWouldBlock + } + // Any error that is not EWOULDBLOCK also means we're not + // ready yet, and probably never will be ready. In this + // case we need to close the host pipe we opened. + if unwrapError(rerr) != syscall.EWOULDBLOCK { + p.hostFile.Close() + return nil, rerr + } + } + + // If any bytes were read, no matter the corresponding error, we need + // to keep them around so they can be read by the application. + var readAheadBuffer []byte + if n > 0 { + readAheadBuffer = tryReadBuffer + } + + // Successfully opened read only blocking pipe with either bytes available + // to read and/or a writer available. + return newPipeOperations(ctx, opener, fs.FileFlags{Read: true}, p.hostFile, readAheadBuffer) +} + +// TryOpenWriteOnly tries to open a host pipe write only but only returns a fs.File when +// there is a coordinating reader. Call TryOpenWriteOnly repeatedly on the same pipeOpenState +// until syserror.ErrWouldBlock is no longer returned. +// +// How it works: +// +// Opening a pipe write only will return ENXIO until readers are available. Converts the ENXIO +// to an syserror.ErrWouldBlock, to tell callers to retry. +func (*pipeOpenState) TryOpenWriteOnly(ctx context.Context, opener NonBlockingOpener) (*pipeOperations, error) { + hostFile, err := opener.NonBlockingOpen(ctx, fs.PermMask{Write: true}) + if unwrapError(err) == syscall.ENXIO { + return nil, syserror.ErrWouldBlock + } + if err != nil { + return nil, err + } + return newPipeOperations(ctx, opener, fs.FileFlags{Write: true}, hostFile, nil) +} diff --git a/pkg/sentry/fs/fdpipe/pipe_opener_test.go b/pkg/sentry/fs/fdpipe/pipe_opener_test.go new file mode 100644 index 000000000..83f6c1986 --- /dev/null +++ b/pkg/sentry/fs/fdpipe/pipe_opener_test.go @@ -0,0 +1,522 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fdpipe + +import ( + "bytes" + "fmt" + "io" + "os" + "path" + "syscall" + "testing" + "time" + + "github.com/google/uuid" + "gvisor.googlesource.com/gvisor/pkg/fd" + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/context/contexttest" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" + "gvisor.googlesource.com/gvisor/pkg/syserror" +) + +type hostOpener struct { + name string +} + +func (h *hostOpener) NonBlockingOpen(_ context.Context, p fs.PermMask) (*fd.FD, error) { + var flags int + switch { + case p.Read && p.Write: + flags = syscall.O_RDWR + case p.Write: + flags = syscall.O_WRONLY + case p.Read: + flags = syscall.O_RDONLY + default: + return nil, syscall.EINVAL + } + f, err := syscall.Open(h.name, flags|syscall.O_NONBLOCK, 0666) + if err != nil { + return nil, err + } + return fd.New(f), nil +} + +func pipename() string { + return fmt.Sprintf(path.Join(os.TempDir(), "test-named-pipe-%s"), uuid.New()) +} + +func mkpipe(name string) error { + return syscall.Mknod(name, syscall.S_IFIFO|0666, 0) +} + +func TestTryOpen(t *testing.T) { + for _, test := range []struct { + // desc is the test's description. + desc string + + // makePipe is true if the test case should create the pipe. + makePipe bool + + // flags are the fs.FileFlags used to open the pipe. + flags fs.FileFlags + + // expectFile is true if a fs.File is expected. + expectFile bool + + // err is the expected error + err error + }{ + { + desc: "FileFlags lacking Read and Write are invalid", + makePipe: false, + flags: fs.FileFlags{}, /* bogus */ + expectFile: false, + err: syscall.EINVAL, + }, + { + desc: "NonBlocking Read only error returns immediately", + makePipe: false, /* causes the error */ + flags: fs.FileFlags{Read: true, NonBlocking: true}, + expectFile: false, + err: syscall.ENOENT, + }, + { + desc: "NonBlocking Read only success returns immediately", + makePipe: true, + flags: fs.FileFlags{Read: true, NonBlocking: true}, + expectFile: true, + err: nil, + }, + { + desc: "NonBlocking Write only error returns immediately", + makePipe: false, /* causes the error */ + flags: fs.FileFlags{Write: true, NonBlocking: true}, + expectFile: false, + err: syscall.ENOENT, + }, + { + desc: "NonBlocking Write only no reader error returns immediately", + makePipe: true, + flags: fs.FileFlags{Write: true, NonBlocking: true}, + expectFile: false, + err: syscall.ENXIO, + }, + { + desc: "ReadWrite error returns immediately", + makePipe: false, /* causes the error */ + flags: fs.FileFlags{Read: true, Write: true}, + expectFile: false, + err: syscall.ENOENT, + }, + { + desc: "ReadWrite returns immediately", + makePipe: true, + flags: fs.FileFlags{Read: true, Write: true}, + expectFile: true, + err: nil, + }, + { + desc: "Blocking Write only returns open error", + makePipe: false, /* causes the error */ + flags: fs.FileFlags{Write: true}, + expectFile: false, + err: syscall.ENOENT, /* from bogus perms */ + }, + { + desc: "Blocking Read only returns open error", + makePipe: false, /* causes the error */ + flags: fs.FileFlags{Read: true}, + expectFile: false, + err: syscall.ENOENT, + }, + { + desc: "Blocking Write only returns with syserror.ErrWouldBlock", + makePipe: true, + flags: fs.FileFlags{Write: true}, + expectFile: false, + err: syserror.ErrWouldBlock, + }, + { + desc: "Blocking Read only returns with syserror.ErrWouldBlock", + makePipe: true, + flags: fs.FileFlags{Read: true}, + expectFile: false, + err: syserror.ErrWouldBlock, + }, + } { + name := pipename() + if test.makePipe { + // Create the pipe. We do this per-test case to keep tests independent. + if err := mkpipe(name); err != nil { + t.Errorf("%s: failed to make host pipe: %v", test.desc, err) + continue + } + defer syscall.Unlink(name) + } + + // Use a host opener to keep things simple. + opener := &hostOpener{name: name} + + pipeOpenState := &pipeOpenState{} + ctx := contexttest.Context(t) + pipeOps, err := pipeOpenState.TryOpen(ctx, opener, test.flags) + if unwrapError(err) != test.err { + t.Errorf("%s: got error %v, want %v", test.desc, err, test.err) + if pipeOps != nil { + // Cleanup the state of the pipe, and remove the fd from the + // fdnotifier. Sadly this needed to maintain the correctness + // of other tests because the fdnotifier is global. + pipeOps.Release() + } + continue + } + if (pipeOps != nil) != test.expectFile { + t.Errorf("%s: got non-nil file %v, want %v", test.desc, pipeOps != nil, test.expectFile) + } + if pipeOps != nil { + // Same as above. + pipeOps.Release() + } + } +} + +func TestPipeOpenUnblocksEventually(t *testing.T) { + for _, test := range []struct { + // desc is the test's description. + desc string + + // partnerIsReader is true if the goroutine opening the same pipe as the test case + // should open the pipe read only. Otherwise write only. This also means that the + // test case will open the pipe in the opposite way. + partnerIsReader bool + + // partnerIsBlocking is true if the goroutine opening the same pipe as the test case + // should do so without the O_NONBLOCK flag, otherwise opens the pipe with O_NONBLOCK + // until ENXIO is not returned. + partnerIsBlocking bool + }{ + { + desc: "Blocking Read with blocking writer partner opens eventually", + partnerIsReader: false, + partnerIsBlocking: true, + }, + { + desc: "Blocking Write with blocking reader partner opens eventually", + partnerIsReader: true, + partnerIsBlocking: true, + }, + { + desc: "Blocking Read with non-blocking writer partner opens eventually", + partnerIsReader: false, + partnerIsBlocking: false, + }, + { + desc: "Blocking Write with non-blocking reader partner opens eventually", + partnerIsReader: true, + partnerIsBlocking: false, + }, + } { + // Create the pipe. We do this per-test case to keep tests independent. + name := pipename() + if err := mkpipe(name); err != nil { + t.Errorf("%s: failed to make host pipe: %v", test.desc, err) + continue + } + defer syscall.Unlink(name) + + // Spawn the partner. + type fderr struct { + fd int + err error + } + errch := make(chan fderr, 1) + go func() { + var flags int + if test.partnerIsReader { + flags = syscall.O_RDONLY + } else { + flags = syscall.O_WRONLY + } + if test.partnerIsBlocking { + fd, err := syscall.Open(name, flags, 0666) + errch <- fderr{fd: fd, err: err} + } else { + var fd int + err := error(syscall.ENXIO) + for err == syscall.ENXIO { + fd, err = syscall.Open(name, flags|syscall.O_NONBLOCK, 0666) + time.Sleep(1 * time.Second) + } + errch <- fderr{fd: fd, err: err} + } + }() + + // Setup file flags for either a read only or write only open. + flags := fs.FileFlags{ + Read: !test.partnerIsReader, + Write: test.partnerIsReader, + } + + // Open the pipe in a blocking way, which should succeed eventually. + opener := &hostOpener{name: name} + ctx := contexttest.Context(t) + pipeOps, err := Open(ctx, opener, flags) + if pipeOps != nil { + // Same as TestTryOpen. + pipeOps.Release() + } + + // Check that the partner opened the file successfully. + e := <-errch + if e.err != nil { + t.Errorf("%s: partner got error %v, wanted nil", test.desc, e.err) + continue + } + // If so, then close the partner fd to avoid leaking an fd. + syscall.Close(e.fd) + + // Check that our blocking open was successful. + if err != nil { + t.Errorf("%s: blocking open got error %v, wanted nil", test.desc, err) + continue + } + if pipeOps == nil { + t.Errorf("%s: blocking open got nil file, wanted non-nil", test.desc) + continue + } + } +} + +func TestCopiedReadAheadBuffer(t *testing.T) { + // Create the pipe. + name := pipename() + if err := mkpipe(name); err != nil { + t.Fatalf("failed to make host pipe: %v", err) + } + defer syscall.Unlink(name) + + // We're taking advantage of the fact that pipes opened read only always return + // success, but internally they are not deemed "opened" until we're sure that + // another writer comes along. This means we can open the same pipe write only + // with no problems + write to it, given that opener.Open already tried to open + // the pipe RDONLY and succeeded, which we know happened if TryOpen returns + // syserror.ErrwouldBlock. + // + // This simulates the open(RDONLY) <-> open(WRONLY)+write race we care about, but + // does not cause our test to be racy (which would be terrible). + opener := &hostOpener{name: name} + pipeOpenState := &pipeOpenState{} + ctx := contexttest.Context(t) + pipeOps, err := pipeOpenState.TryOpen(ctx, opener, fs.FileFlags{Read: true}) + if pipeOps != nil { + pipeOps.Release() + t.Fatalf("open(%s, %o) got file, want nil", name, syscall.O_RDONLY) + } + if err != syserror.ErrWouldBlock { + t.Fatalf("open(%s, %o) got error %v, want %v", name, syscall.O_RDONLY, err, syserror.ErrWouldBlock) + } + + // Then open the same pipe write only and write some bytes to it. The next + // time we try to open the pipe read only again via the pipeOpenState, we should + // succeed and buffer some of the bytes written. + fd, err := syscall.Open(name, syscall.O_WRONLY, 0666) + if err != nil { + t.Fatalf("open(%s, %o) got error %v, want nil", name, syscall.O_WRONLY, err) + } + defer syscall.Close(fd) + + data := []byte("hello") + if n, err := syscall.Write(fd, data); n != len(data) || err != nil { + t.Fatalf("write(%v) got (%d, %v), want (%d, nil)", data, n, err, len(data)) + } + + // Try the read again, knowing that it should succeed this time. + pipeOps, err = pipeOpenState.TryOpen(ctx, opener, fs.FileFlags{Read: true}) + if pipeOps == nil { + t.Fatalf("open(%s, %o) got nil file, want not nil", name, syscall.O_RDONLY) + } + defer pipeOps.Release() + + if err != nil { + t.Fatalf("open(%s, %o) got error %v, want nil", name, syscall.O_RDONLY, err) + } + + inode := fs.NewMockInode(ctx, fs.NewMockMountSource(nil), fs.StableAttr{ + Type: fs.Pipe, + }) + file := fs.NewFile(ctx, fs.NewDirent(inode, "pipe"), fs.FileFlags{Read: true}, pipeOps) + + // Check that the file we opened points to a pipe with a non-empty read ahead buffer. + bufsize := len(pipeOps.readAheadBuffer) + if bufsize != 1 { + t.Fatalf("read ahead buffer got %d bytes, want %d", bufsize, 1) + } + + // Now for the final test, try to read everything in, expecting to get back all of + // the bytes that were written at once. Note that in the wild there is no atomic + // read size so expecting to get all bytes from a single writer when there are + // multiple readers is a bad expectation. + buf := make([]byte, len(data)) + ioseq := usermem.BytesIOSequence(buf) + n, err := pipeOps.Read(ctx, file, ioseq, 0) + if err != nil { + t.Fatalf("read request got error %v, want nil", err) + } + if n != int64(len(data)) { + t.Fatalf("read request got %d bytes, want %d", n, len(data)) + } + if !bytes.Equal(buf, data) { + t.Errorf("read request got bytes [%v], want [%v]", buf, data) + } +} + +func TestPipeHangup(t *testing.T) { + for _, test := range []struct { + // desc is the test's description. + desc string + + // flags control how we open our end of the pipe and must be read + // only or write only. They also dicate how a coordinating partner + // fd is opened, which is their inverse (read only -> write only, etc). + flags fs.FileFlags + + // hangupSelf if true causes the test case to close our end of the pipe + // and causes hangup errors to be asserted on our coordinating partner's + // fd. If hangupSelf is false, then our partner's fd is closed and the + // hangup errors are expected on our end of the pipe. + hangupSelf bool + }{ + { + desc: "Read only gets hangup error", + flags: fs.FileFlags{Read: true}, + }, + { + desc: "Write only gets hangup error", + flags: fs.FileFlags{Write: true}, + }, + { + desc: "Read only generates hangup error", + flags: fs.FileFlags{Read: true}, + hangupSelf: true, + }, + { + desc: "Write only generates hangup error", + flags: fs.FileFlags{Write: true}, + hangupSelf: true, + }, + } { + if test.flags.Read == test.flags.Write { + t.Errorf("%s: test requires a single reader or writer", test.desc) + continue + } + + // Create the pipe. We do this per-test case to keep tests independent. + name := pipename() + if err := mkpipe(name); err != nil { + t.Errorf("%s: failed to make host pipe: %v", test.desc, err) + continue + } + defer syscall.Unlink(name) + + // Fire off a partner routine which tries to open the same pipe blocking, + // which will synchronize with us. The channel allows us to get back the + // fd once we expect this partner routine to succeed, so we can manifest + // hangup events more directly. + fdchan := make(chan int, 1) + go func() { + // Be explicit about the flags to protect the test from + // misconfiguration. + var flags int + if test.flags.Read { + flags = syscall.O_WRONLY + } else { + flags = syscall.O_RDONLY + } + fd, err := syscall.Open(name, flags, 0666) + if err != nil { + t.Logf("Open(%q, %o, 0666) partner failed: %v", name, flags, err) + } + fdchan <- fd + }() + + // Open our end in a blocking way to ensure that we coordinate. + opener := &hostOpener{name: name} + ctx := contexttest.Context(t) + pipeOps, err := Open(ctx, opener, test.flags) + if err != nil { + t.Errorf("%s: Open got error %v, want nil", test.desc, err) + continue + } + // Don't defer file.DecRef here because that causes the hangup we're + // trying to test for. + + // Expect the partner routine to have coordinated with us and get back + // its open fd. + f := <-fdchan + if f < 0 { + t.Errorf("%s: partner routine got fd %d, want > 0", test.desc, f) + pipeOps.Release() + continue + } + + if test.hangupSelf { + // Hangup self and assert that our partner got the expected hangup + // error. + pipeOps.Release() + + if test.flags.Read { + // Partner is writer. + assertWriterHungup(t, test.desc, fd.NewReadWriter(f)) + } else { + // Partner is reader. + assertReaderHungup(t, test.desc, fd.NewReadWriter(f)) + } + } else { + // Hangup our partner and expect us to get the hangup error. + syscall.Close(f) + defer pipeOps.Release() + + if test.flags.Read { + assertReaderHungup(t, test.desc, pipeOps.(*pipeOperations).file) + } else { + assertWriterHungup(t, test.desc, pipeOps.(*pipeOperations).file) + } + } + } +} + +func assertReaderHungup(t *testing.T, desc string, reader io.Reader) bool { + // Drain the pipe completely, it might have crap in it, but expect EOF eventually. + var err error + for err == nil { + _, err = reader.Read(make([]byte, 10)) + } + if err != io.EOF { + t.Errorf("%s: read from self after hangup got error %v, want %v", desc, err, io.EOF) + return false + } + return true +} + +func assertWriterHungup(t *testing.T, desc string, writer io.Writer) bool { + if _, err := writer.Write([]byte("hello")); unwrapError(err) != syscall.EPIPE { + t.Errorf("%s: write to self after hangup got error %v, want %v", desc, err, syscall.EPIPE) + return false + } + return true +} diff --git a/pkg/sentry/fs/fdpipe/pipe_state.go b/pkg/sentry/fs/fdpipe/pipe_state.go new file mode 100644 index 000000000..8996a2178 --- /dev/null +++ b/pkg/sentry/fs/fdpipe/pipe_state.go @@ -0,0 +1,88 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fdpipe + +import ( + "fmt" + "io/ioutil" + "sync" + + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" +) + +// beforeSave is invoked by stateify. +func (p *pipeOperations) beforeSave() { + if p.flags.Read { + data, err := ioutil.ReadAll(p.file) + if err != nil && !isBlockError(err) { + panic(fmt.Sprintf("failed to read from pipe: %v", err)) + } + p.readAheadBuffer = append(p.readAheadBuffer, data...) + } else if p.flags.Write { + file, err := p.opener.NonBlockingOpen(context.Background(), fs.PermMask{Write: true}) + if err != nil { + panic(fs.ErrSaveRejection{fmt.Errorf("write-only pipe end cannot be re-opened as %v: %v", p, err)}) + } + file.Close() + } +} + +// saveFlags is invoked by stateify. +func (p *pipeOperations) saveFlags() fs.FileFlags { + return p.flags +} + +// readPipeOperationsLoading is used to ensure that write-only pipe fds are +// opened after read/write and read-only pipe fds, to avoid ENXIO when +// multiple pipe fds refer to different ends of the same pipe. +var readPipeOperationsLoading sync.WaitGroup + +// loadFlags is invoked by stateify. +func (p *pipeOperations) loadFlags(flags fs.FileFlags) { + // This is a hack to ensure that readPipeOperationsLoading includes all + // readable pipe fds before any asynchronous calls to + // readPipeOperationsLoading.Wait(). + if flags.Read { + readPipeOperationsLoading.Add(1) + } + p.flags = flags +} + +// afterLoad is invoked by stateify. +func (p *pipeOperations) afterLoad() { + load := func() { + if !p.flags.Read { + readPipeOperationsLoading.Wait() + } else { + defer readPipeOperationsLoading.Done() + } + var err error + p.file, err = p.opener.NonBlockingOpen(context.Background(), fs.PermMask{ + Read: p.flags.Read, + Write: p.flags.Write, + }) + if err != nil { + panic(fmt.Sprintf("unable to open pipe %v: %v", p, err)) + } + if err := p.init(); err != nil { + panic(fmt.Sprintf("unable to initialize pipe %v: %v", p, err)) + } + } + + // Do background opening of pipe ends. Note for write-only pipe ends we + // have to do it asynchronously to avoid blocking the restore. + fs.Async(load) +} diff --git a/pkg/sentry/fs/fdpipe/pipe_test.go b/pkg/sentry/fs/fdpipe/pipe_test.go new file mode 100644 index 000000000..6cd314f5b --- /dev/null +++ b/pkg/sentry/fs/fdpipe/pipe_test.go @@ -0,0 +1,489 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fdpipe + +import ( + "bytes" + "io" + "os" + "syscall" + "testing" + + "gvisor.googlesource.com/gvisor/pkg/fd" + "gvisor.googlesource.com/gvisor/pkg/sentry/context/contexttest" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" + "gvisor.googlesource.com/gvisor/pkg/syserror" + "gvisor.googlesource.com/gvisor/pkg/waiter/fdnotifier" +) + +func singlePipeFD() (int, error) { + fds := make([]int, 2) + if err := syscall.Pipe(fds); err != nil { + return -1, err + } + syscall.Close(fds[1]) + return fds[0], nil +} + +func singleDirFD() (int, error) { + return syscall.Open(os.TempDir(), syscall.O_RDONLY, 0666) +} + +func mockPipeDirent(t *testing.T) *fs.Dirent { + ctx := contexttest.Context(t) + node := fs.NewMockInodeOperations(ctx) + node.UAttr = fs.UnstableAttr{ + Perms: fs.FilePermissions{ + User: fs.PermMask{Read: true, Write: true}, + }, + } + inode := fs.NewInode(node, fs.NewMockMountSource(nil), fs.StableAttr{ + Type: fs.Pipe, + BlockSize: usermem.PageSize, + }) + return fs.NewDirent(inode, "") +} + +func TestNewPipe(t *testing.T) { + for _, test := range []struct { + // desc is the test's description. + desc string + + // getfd generates the fd to pass to newPipeOperations. + getfd func() (int, error) + + // flags are the fs.FileFlags passed to newPipeOperations. + flags fs.FileFlags + + // readAheadBuffer is the buffer passed to newPipeOperations. + readAheadBuffer []byte + + // err is the expected error. + err error + }{ + { + desc: "Cannot make new pipe from bad fd", + getfd: func() (int, error) { return -1, nil }, + err: syscall.EINVAL, + }, + { + desc: "Cannot make new pipe from non-pipe fd", + getfd: singleDirFD, + err: syscall.EINVAL, + }, + { + desc: "Can make new pipe from pipe fd", + getfd: singlePipeFD, + flags: fs.FileFlags{Read: true}, + readAheadBuffer: []byte("hello"), + }, + } { + gfd, err := test.getfd() + if err != nil { + t.Errorf("%s: getfd got (%d, %v), want (fd, nil)", test.desc, gfd, err) + continue + } + f := fd.New(gfd) + + p, err := newPipeOperations(contexttest.Context(t), nil, test.flags, f, test.readAheadBuffer) + if p != nil { + // This is necessary to remove the fd from the global fd notifier. + defer p.Release() + } else { + // If there is no p to DecRef on, because newPipeOperations failed, then the + // file still needs to be closed. + defer f.Close() + } + + if err != test.err { + t.Errorf("%s: got error %v, want %v", test.desc, err, test.err) + continue + } + // Check the state of the pipe given that it was successfully opened. + if err == nil { + if p == nil { + t.Errorf("%s: got nil pipe and nil error, want (pipe, nil)", test.desc) + continue + } + if flags := p.flags; test.flags != flags { + t.Errorf("%s: got file flags %s, want %s", test.desc, flags, test.flags) + continue + } + if len(test.readAheadBuffer) != len(p.readAheadBuffer) { + t.Errorf("%s: got read ahead buffer length %d, want %d", test.desc, len(p.readAheadBuffer), len(test.readAheadBuffer)) + continue + } + fileFlags, _, errno := syscall.Syscall(syscall.SYS_FCNTL, uintptr(p.file.FD()), syscall.F_GETFL, 0) + if errno != 0 { + t.Errorf("%s: failed to get file flags for fd %d, got %v, want 0", test.desc, p.file.FD(), errno) + continue + } + if fileFlags&syscall.O_NONBLOCK == 0 { + t.Errorf("%s: pipe is blocking, expected non-blocking", test.desc) + continue + } + if !fdnotifier.HasFD(int32(f.FD())) { + t.Errorf("%s: pipe fd %d is not registered for events", test.desc, f.FD) + } + } + } +} + +func TestPipeDestruction(t *testing.T) { + fds := make([]int, 2) + if err := syscall.Pipe(fds); err != nil { + t.Fatalf("failed to create pipes: got %v, want nil", err) + } + f := fd.New(fds[0]) + + // We don't care about the other end, just use the read end. + syscall.Close(fds[1]) + + // Test the read end, but it doesn't really matter which. + p, err := newPipeOperations(contexttest.Context(t), nil, fs.FileFlags{Read: true}, f, nil) + if err != nil { + f.Close() + t.Fatalf("newPipeOperations got error %v, want nil", err) + } + // Drop our only reference, which should trigger the destructor. + p.Release() + + if fdnotifier.HasFD(int32(fds[0])) { + t.Fatalf("after DecRef fdnotifier has fd %d, want no longer registered", fds[0]) + } + if p.file != nil { + t.Errorf("after DecRef got file, want nil") + } +} + +type Seek struct{} + +type ReadDir struct{} + +type Writev struct { + Src usermem.IOSequence +} + +type Readv struct { + Dst usermem.IOSequence +} + +type Fsync struct{} + +func TestPipeRequest(t *testing.T) { + for _, test := range []struct { + // desc is the test's description. + desc string + + // request to execute. + context interface{} + + // flags determines whether to use the read or write end + // of the pipe, for this test it can only be Read or Write. + flags fs.FileFlags + + // keepOpenPartner if false closes the other end of the pipe, + // otherwise this is delayed until the end of the test. + keepOpenPartner bool + + // expected error + err error + }{ + { + desc: "ReadDir on pipe returns ENOTDIR", + context: &ReadDir{}, + err: syscall.ENOTDIR, + }, + { + desc: "Fsync on pipe returns EINVAL", + context: &Fsync{}, + err: syscall.EINVAL, + }, + { + desc: "Seek on pipe returns ESPIPE", + context: &Seek{}, + err: syscall.ESPIPE, + }, + { + desc: "Readv on pipe from empty buffer returns nil", + context: &Readv{Dst: usermem.BytesIOSequence(nil)}, + flags: fs.FileFlags{Read: true}, + }, + { + desc: "Readv on pipe from non-empty buffer and closed partner returns EOF", + context: &Readv{Dst: usermem.BytesIOSequence(make([]byte, 10))}, + flags: fs.FileFlags{Read: true}, + err: io.EOF, + }, + { + desc: "Readv on pipe from non-empty buffer and open partner returns EWOULDBLOCK", + context: &Readv{Dst: usermem.BytesIOSequence(make([]byte, 10))}, + flags: fs.FileFlags{Read: true}, + keepOpenPartner: true, + err: syserror.ErrWouldBlock, + }, + { + desc: "Writev on pipe from empty buffer returns nil", + context: &Writev{Src: usermem.BytesIOSequence(nil)}, + flags: fs.FileFlags{Write: true}, + }, + { + desc: "Writev on pipe from non-empty buffer and closed partner returns EPIPE", + context: &Writev{Src: usermem.BytesIOSequence([]byte("hello"))}, + flags: fs.FileFlags{Write: true}, + err: syscall.EPIPE, + }, + { + desc: "Writev on pipe from non-empty buffer and open partner succeeds", + context: &Writev{Src: usermem.BytesIOSequence([]byte("hello"))}, + flags: fs.FileFlags{Write: true}, + keepOpenPartner: true, + }, + } { + if test.flags.Read && test.flags.Write { + panic("both read and write not supported for this test") + } + + fds := make([]int, 2) + if err := syscall.Pipe(fds); err != nil { + t.Errorf("%s: failed to create pipes: got %v, want nil", test.desc, err) + continue + } + + // Configure the fd and partner fd based on the file flags. + testFd, partnerFd := fds[0], fds[1] + if test.flags.Write { + testFd, partnerFd = fds[1], fds[0] + } + + // Configure closing the fds. + if test.keepOpenPartner { + defer syscall.Close(partnerFd) + } else { + syscall.Close(partnerFd) + } + + // Create the pipe. + ctx := contexttest.Context(t) + p, err := newPipeOperations(ctx, nil, test.flags, fd.New(testFd), nil) + if err != nil { + t.Fatalf("%s: newPipeOperations got error %v, want nil", test.desc, err) + } + defer p.Release() + + inode := fs.NewMockInode(ctx, fs.NewMockMountSource(nil), fs.StableAttr{Type: fs.Pipe}) + file := fs.NewFile(ctx, fs.NewDirent(inode, "pipe"), fs.FileFlags{Read: true}, p) + + // Issue request via the appropriate function. + switch c := test.context.(type) { + case *Seek: + _, err = p.Seek(ctx, file, 0, 0) + case *ReadDir: + _, err = p.Readdir(ctx, file, nil) + case *Readv: + _, err = p.Read(ctx, file, c.Dst, 0) + case *Writev: + _, err = p.Write(ctx, file, c.Src, 0) + case *Fsync: + err = p.Fsync(ctx, file, 0, fs.FileMaxOffset, fs.SyncAll) + default: + t.Errorf("%s: unknown request type %T", test.desc, test.context) + } + + if unwrapError(err) != test.err { + t.Errorf("%s: got error %v, want %v", test.desc, err, test.err) + } + } +} + +func TestPipeReadAheadBuffer(t *testing.T) { + fds := make([]int, 2) + if err := syscall.Pipe(fds); err != nil { + t.Fatalf("failed to create pipes: got %v, want nil", err) + } + rfile := fd.New(fds[0]) + + // Eventually close the write end, which is not wrapped in a pipe object. + defer syscall.Close(fds[1]) + + // Write some bytes to this end. + data := []byte("world") + if n, err := syscall.Write(fds[1], data); n != len(data) || err != nil { + rfile.Close() + t.Fatalf("write to pipe got (%d, %v), want (%d, nil)", n, err, len(data)) + } + // Close the write end immediately, we don't care about it. + + buffered := []byte("hello ") + ctx := contexttest.Context(t) + p, err := newPipeOperations(ctx, nil, fs.FileFlags{Read: true}, rfile, buffered) + if err != nil { + rfile.Close() + t.Fatalf("newPipeOperations got error %v, want nil", err) + } + defer p.Release() + + inode := fs.NewMockInode(ctx, fs.NewMockMountSource(nil), fs.StableAttr{ + Type: fs.Pipe, + }) + file := fs.NewFile(ctx, fs.NewDirent(inode, "pipe"), fs.FileFlags{Read: true}, p) + + // In total we expect to read data + buffered. + total := append(buffered, data...) + + buf := make([]byte, len(total)) + iov := usermem.BytesIOSequence(buf) + n, err := p.Read(contexttest.Context(t), file, iov, 0) + if err != nil { + t.Fatalf("read request got error %v, want nil", err) + } + if n != int64(len(total)) { + t.Fatalf("read request got %d bytes, want %d", n, len(total)) + } + if !bytes.Equal(buf, total) { + t.Errorf("read request got bytes [%v], want [%v]", buf, total) + } +} + +// This is very important for pipes in general because they can return EWOULDBLOCK and for +// those that block they must continue until they have read all of the data (and report it +// as such. +func TestPipeReadsAccumulate(t *testing.T) { + fds := make([]int, 2) + if err := syscall.Pipe(fds); err != nil { + t.Fatalf("failed to create pipes: got %v, want nil", err) + } + rfile := fd.New(fds[0]) + + // Eventually close the write end, it doesn't depend on a pipe object. + defer syscall.Close(fds[1]) + + // Get a new read only pipe reference. + ctx := contexttest.Context(t) + p, err := newPipeOperations(ctx, nil, fs.FileFlags{Read: true}, rfile, nil) + if err != nil { + rfile.Close() + t.Fatalf("newPipeOperations got error %v, want nil", err) + } + // Don't forget to remove the fd from the fd notifier. Otherwise other tests will + // likely be borked, because it's global :( + defer p.Release() + + inode := fs.NewMockInode(ctx, fs.NewMockMountSource(nil), fs.StableAttr{ + Type: fs.Pipe, + }) + file := fs.NewFile(ctx, fs.NewDirent(inode, "pipe"), fs.FileFlags{Read: true}, p) + + // Write some some bytes to the pipe. + data := []byte("some message") + if n, err := syscall.Write(fds[1], data); n != len(data) || err != nil { + t.Fatalf("write to pipe got (%d, %v), want (%d, nil)", n, err, len(data)) + } + + // Construct a segment vec that is a bit more than we have written so we trigger + // an EWOULDBLOCK. + wantBytes := len(data) + 1 + readBuffer := make([]byte, wantBytes) + iov := usermem.BytesIOSequence(readBuffer) + n, err := p.Read(ctx, file, iov, 0) + total := n + iov = iov.DropFirst64(n) + if err != syserror.ErrWouldBlock { + t.Fatalf("Readv got error %v, want %v", err, syserror.ErrWouldBlock) + } + + // Write a few more bytes to allow us to read more/accumulate. + extra := []byte("extra") + if n, err := syscall.Write(fds[1], extra); n != len(extra) || err != nil { + t.Fatalf("write to pipe got (%d, %v), want (%d, nil)", n, err, len(extra)) + } + + // This time, using the same request, we should not block. + n, err = p.Read(ctx, file, iov, 0) + total += n + if err != nil { + t.Fatalf("Readv got error %v, want nil", err) + } + + // Assert that the result we got back is cumulative. + if total != int64(wantBytes) { + t.Fatalf("Readv sequence got %d bytes, want %d", total, wantBytes) + } + + if want := append(data, extra[0]); !bytes.Equal(readBuffer, want) { + t.Errorf("Readv sequence got %v, want %v", readBuffer, want) + } +} + +// Same as TestReadsAccumulate. +func TestPipeWritesAccumulate(t *testing.T) { + fds := make([]int, 2) + if err := syscall.Pipe(fds); err != nil { + t.Fatalf("failed to create pipes: got %v, want nil", err) + } + wfile := fd.New(fds[1]) + + // Eventually close the read end, it doesn't depend on a pipe object. + defer syscall.Close(fds[0]) + + // Get a new write only pipe reference. + ctx := contexttest.Context(t) + p, err := newPipeOperations(ctx, nil, fs.FileFlags{Write: true}, wfile, nil) + if err != nil { + wfile.Close() + t.Fatalf("newPipeOperations got error %v, want nil", err) + } + // Don't forget to remove the fd from the fd notifier. Otherwise other tests will + // likely be borked, because it's global :( + defer p.Release() + + inode := fs.NewMockInode(ctx, fs.NewMockMountSource(nil), fs.StableAttr{ + Type: fs.Pipe, + }) + file := fs.NewFile(ctx, fs.NewDirent(inode, "pipe"), fs.FileFlags{Read: true}, p) + + // Construct a segment vec that is larger than the pipe size to trigger an EWOULDBLOCK. + wantBytes := 65536 * 2 + writeBuffer := make([]byte, wantBytes) + for i := 0; i < wantBytes; i++ { + writeBuffer[i] = 'a' + } + iov := usermem.BytesIOSequence(writeBuffer) + n, err := p.Write(ctx, file, iov, 0) + total := n + iov = iov.DropFirst64(n) + if err != syserror.ErrWouldBlock { + t.Fatalf("Writev got error %v, want %v", err, syserror.ErrWouldBlock) + } + + // Read the entire pipe buf size to make space for the second half. + throwAway := make([]byte, 65536) + if n, err := syscall.Read(fds[0], throwAway); n != len(throwAway) || err != nil { + t.Fatalf("write to pipe got (%d, %v), want (%d, nil)", n, err, len(throwAway)) + } + + // This time we should not block. + n, err = p.Write(ctx, file, iov, 0) + total += n + if err != nil { + t.Fatalf("Writev got error %v, want nil", err) + } + + // Assert that the result we got back is cumulative. + if total != int64(wantBytes) { + t.Fatalf("Writev sequence got %d bytes, want %d", total, wantBytes) + } +} |