summaryrefslogtreecommitdiffhomepage
path: root/pkg/sentry/fs/fdpipe
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/sentry/fs/fdpipe')
-rw-r--r--pkg/sentry/fs/fdpipe/BUILD76
-rw-r--r--pkg/sentry/fs/fdpipe/pipe.go167
-rw-r--r--pkg/sentry/fs/fdpipe/pipe_opener.go193
-rw-r--r--pkg/sentry/fs/fdpipe/pipe_opener_test.go522
-rw-r--r--pkg/sentry/fs/fdpipe/pipe_state.go88
-rw-r--r--pkg/sentry/fs/fdpipe/pipe_test.go489
6 files changed, 1535 insertions, 0 deletions
diff --git a/pkg/sentry/fs/fdpipe/BUILD b/pkg/sentry/fs/fdpipe/BUILD
new file mode 100644
index 000000000..9e1f65d3e
--- /dev/null
+++ b/pkg/sentry/fs/fdpipe/BUILD
@@ -0,0 +1,76 @@
+package(licenses = ["notice"]) # Apache 2.0
+
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("//tools/go_stateify:defs.bzl", "go_stateify")
+
+go_stateify(
+ name = "pipe_state",
+ srcs = [
+ "pipe.go",
+ "pipe_state.go",
+ ],
+ out = "pipe_autogen_state.go",
+ imports = ["gvisor.googlesource.com/gvisor/pkg/sentry/fs"],
+ package = "fdpipe",
+)
+
+go_library(
+ name = "fdpipe",
+ srcs = [
+ "pipe.go",
+ "pipe_autogen_state.go",
+ "pipe_opener.go",
+ "pipe_state.go",
+ ],
+ importpath = "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fdpipe",
+ visibility = ["//pkg/sentry:internal"],
+ deps = [
+ "//pkg/abi/linux",
+ "//pkg/amutex",
+ "//pkg/fd",
+ "//pkg/log",
+ "//pkg/metric",
+ "//pkg/p9",
+ "//pkg/refs",
+ "//pkg/secio",
+ "//pkg/sentry/context",
+ "//pkg/sentry/device",
+ "//pkg/sentry/fs",
+ "//pkg/sentry/fs/fsutil",
+ "//pkg/sentry/fs/lock",
+ "//pkg/sentry/kernel/auth",
+ "//pkg/sentry/kernel/time",
+ "//pkg/sentry/memmap",
+ "//pkg/sentry/platform",
+ "//pkg/sentry/safemem",
+ "//pkg/sentry/uniqueid",
+ "//pkg/sentry/usermem",
+ "//pkg/state",
+ "//pkg/syserror",
+ "//pkg/tcpip",
+ "//pkg/tcpip/transport/unix",
+ "//pkg/unet",
+ "//pkg/waiter",
+ "//pkg/waiter/fdnotifier",
+ ],
+)
+
+go_test(
+ name = "fdpipe_test",
+ size = "small",
+ srcs = [
+ "pipe_opener_test.go",
+ "pipe_test.go",
+ ],
+ embed = [":fdpipe"],
+ deps = [
+ "//pkg/fd",
+ "//pkg/sentry/context",
+ "//pkg/sentry/context/contexttest",
+ "//pkg/sentry/fs",
+ "//pkg/sentry/usermem",
+ "//pkg/syserror",
+ "//pkg/waiter/fdnotifier",
+ "@com_github_google_uuid//:go_default_library",
+ ],
+)
diff --git a/pkg/sentry/fs/fdpipe/pipe.go b/pkg/sentry/fs/fdpipe/pipe.go
new file mode 100644
index 000000000..f7bbd4aff
--- /dev/null
+++ b/pkg/sentry/fs/fdpipe/pipe.go
@@ -0,0 +1,167 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fdpipe implements common namedpipe opening and accessing logic.
+package fdpipe
+
+import (
+ "os"
+ "sync"
+ "syscall"
+
+ "gvisor.googlesource.com/gvisor/pkg/fd"
+ "gvisor.googlesource.com/gvisor/pkg/log"
+ "gvisor.googlesource.com/gvisor/pkg/secio"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/safemem"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+ "gvisor.googlesource.com/gvisor/pkg/waiter/fdnotifier"
+)
+
+// pipeOperations are the fs.FileOperations of a host pipe.
+type pipeOperations struct {
+ fsutil.PipeSeek `state:"nosave"`
+ fsutil.NotDirReaddir `state:"nosave"`
+ fsutil.NoFsync `state:"nosave"`
+ fsutil.NoopFlush `state:"nosave"`
+ fsutil.NoMMap `state:"nosave"`
+ fsutil.NoIoctl `state:"nosave"`
+ waiter.Queue `state:"nosave"`
+
+ // flags are the flags used to open the pipe.
+ flags fs.FileFlags `state:".(fs.FileFlags)"`
+
+ // opener is how the pipe was opened.
+ opener NonBlockingOpener `state:"wait"`
+
+ // file represents the host pipe.
+ file *fd.FD `state:"nosave"`
+
+ // mu protects readAheadBuffer access below.
+ mu sync.Mutex `state:"nosave"`
+
+ // readAheadBuffer contains read bytes that have not yet been read
+ // by the application but need to be buffered for save-restore for correct
+ // opening semantics. The readAheadBuffer will only be non-empty when the
+ // is first opened and will be drained by subsequent reads on the pipe.
+ readAheadBuffer []byte
+}
+
+// newPipeOperations returns an implementation of fs.FileOperations for a pipe.
+func newPipeOperations(ctx context.Context, opener NonBlockingOpener, flags fs.FileFlags, file *fd.FD, readAheadBuffer []byte) (*pipeOperations, error) {
+ pipeOps := &pipeOperations{
+ flags: flags,
+ opener: opener,
+ file: file,
+ readAheadBuffer: readAheadBuffer,
+ }
+ if err := pipeOps.init(); err != nil {
+ return nil, err
+ }
+ return pipeOps, nil
+}
+
+// init initializes p.file.
+func (p *pipeOperations) init() error {
+ var s syscall.Stat_t
+ if err := syscall.Fstat(p.file.FD(), &s); err != nil {
+ log.Warningf("pipe: cannot stat fd %d: %v", p.file.FD(), err)
+ return syscall.EINVAL
+ }
+ if s.Mode&syscall.S_IFIFO != syscall.S_IFIFO {
+ log.Warningf("pipe: cannot load fd %d as pipe, file type: %o", p.file.FD(), s.Mode)
+ return syscall.EINVAL
+ }
+ if err := syscall.SetNonblock(p.file.FD(), true); err != nil {
+ return err
+ }
+ if err := fdnotifier.AddFD(int32(p.file.FD()), &p.Queue); err != nil {
+ return err
+ }
+ return nil
+}
+
+// EventRegister implements waiter.Waitable.EventRegister.
+func (p *pipeOperations) EventRegister(e *waiter.Entry, mask waiter.EventMask) {
+ p.Queue.EventRegister(e, mask)
+ fdnotifier.UpdateFD(int32(p.file.FD()))
+}
+
+// EventUnregister implements waiter.Waitable.EventUnregister.
+func (p *pipeOperations) EventUnregister(e *waiter.Entry) {
+ p.Queue.EventUnregister(e)
+ fdnotifier.UpdateFD(int32(p.file.FD()))
+}
+
+// Readiness returns a mask of ready events for stream.
+func (p *pipeOperations) Readiness(mask waiter.EventMask) (eventMask waiter.EventMask) {
+ return fdnotifier.NonBlockingPoll(int32(p.file.FD()), mask)
+}
+
+// Release implements fs.FileOperations.Release.
+func (p *pipeOperations) Release() {
+ fdnotifier.RemoveFD(int32(p.file.FD()))
+ p.file.Close()
+ p.file = nil
+}
+
+// Read implements fs.FileOperations.Read.
+func (p *pipeOperations) Read(ctx context.Context, file *fs.File, dst usermem.IOSequence, offset int64) (int64, error) {
+ // Drain the read ahead buffer, if it contains anything first.
+ var bufN int
+ var bufErr error
+ p.mu.Lock()
+ if len(p.readAheadBuffer) > 0 {
+ bufN, bufErr = dst.CopyOut(ctx, p.readAheadBuffer)
+ p.readAheadBuffer = p.readAheadBuffer[bufN:]
+ dst = dst.DropFirst(bufN)
+ }
+ p.mu.Unlock()
+ if dst.NumBytes() == 0 || bufErr != nil {
+ return int64(bufN), bufErr
+ }
+
+ // Pipes expect full reads.
+ n, err := dst.CopyOutFrom(ctx, safemem.FromIOReader{secio.FullReader{p.file}})
+ total := int64(bufN) + n
+ if err != nil && isBlockError(err) {
+ return total, syserror.ErrWouldBlock
+ }
+ return total, err
+}
+
+// Write implements fs.FileOperations.Write.
+func (p *pipeOperations) Write(ctx context.Context, file *fs.File, src usermem.IOSequence, offset int64) (int64, error) {
+ n, err := src.CopyInTo(ctx, safemem.FromIOWriter{p.file})
+ if err != nil && isBlockError(err) {
+ return n, syserror.ErrWouldBlock
+ }
+ return n, err
+}
+
+// isBlockError unwraps os errors and checks if they are caused by EAGAIN or
+// EWOULDBLOCK. This is so they can be transformed into syserror.ErrWouldBlock.
+func isBlockError(err error) bool {
+ if err == syserror.EAGAIN || err == syserror.EWOULDBLOCK {
+ return true
+ }
+ if pe, ok := err.(*os.PathError); ok {
+ return isBlockError(pe.Err)
+ }
+ return false
+}
diff --git a/pkg/sentry/fs/fdpipe/pipe_opener.go b/pkg/sentry/fs/fdpipe/pipe_opener.go
new file mode 100644
index 000000000..a0d59575f
--- /dev/null
+++ b/pkg/sentry/fs/fdpipe/pipe_opener.go
@@ -0,0 +1,193 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fdpipe
+
+import (
+ "io"
+ "os"
+ "syscall"
+ "time"
+
+ "gvisor.googlesource.com/gvisor/pkg/fd"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+)
+
+// NonBlockingOpener is a generic host file opener used to retry opening host
+// pipes if necessary.
+type NonBlockingOpener interface {
+ // NonBlockingOpen tries to open a host pipe in a non-blocking way,
+ // and otherwise returns an error. Implementations should be idempotent.
+ NonBlockingOpen(context.Context, fs.PermMask) (*fd.FD, error)
+}
+
+// Open blocks until a host pipe can be opened or the action was cancelled.
+// On success, returns fs.FileOperations wrapping the opened host pipe.
+func Open(ctx context.Context, opener NonBlockingOpener, flags fs.FileFlags) (fs.FileOperations, error) {
+ p := &pipeOpenState{}
+ canceled := false
+ for {
+ if file, err := p.TryOpen(ctx, opener, flags); err != syserror.ErrWouldBlock {
+ return file, err
+ }
+
+ // Honor the cancellation request if open still blocks.
+ if canceled {
+ // If we were canceled but we have a handle to a host
+ // file, we need to close it.
+ if p.hostFile != nil {
+ p.hostFile.Close()
+ }
+ return nil, syserror.ErrInterrupted
+ }
+
+ cancel := ctx.SleepStart()
+ select {
+ case <-cancel:
+ // The cancellation request received here really says
+ // "cancel from now on (or ASAP)". Any environmental
+ // changes happened before receiving it, that might have
+ // caused open to not block anymore, should still be
+ // respected. So we cannot just return here. We have to
+ // give open another try below first.
+ canceled = true
+ ctx.SleepFinish(false)
+ case <-time.After(100 * time.Millisecond):
+ // If we would block, then delay retrying for a bit, since there
+ // is no way to know when the pipe would be ready to be
+ // re-opened. This is identical to sending an event notification
+ // to stop blocking in Task.Block, given that this routine will
+ // stop retrying if a cancelation is received.
+ ctx.SleepFinish(true)
+ }
+ }
+}
+
+// pipeOpenState holds state needed to open a blocking named pipe read only, for instance the
+// file that has been opened but doesn't yet have a corresponding writer.
+type pipeOpenState struct {
+ // hostFile is the read only named pipe which lacks a corresponding writer.
+ hostFile *fd.FD
+}
+
+// unwrapError is needed to match against ENXIO primarily.
+func unwrapError(err error) error {
+ if pe, ok := err.(*os.PathError); ok {
+ return pe.Err
+ }
+ return err
+}
+
+// TryOpen uses a NonBlockingOpener to try to open a host pipe, respecting the fs.FileFlags.
+func (p *pipeOpenState) TryOpen(ctx context.Context, opener NonBlockingOpener, flags fs.FileFlags) (*pipeOperations, error) {
+ switch {
+ // Reject invalid configurations so they don't accidently succeed below.
+ case !flags.Read && !flags.Write:
+ return nil, syscall.EINVAL
+
+ // Handle opening RDWR or with O_NONBLOCK: will never block, so try only once.
+ case (flags.Read && flags.Write) || flags.NonBlocking:
+ f, err := opener.NonBlockingOpen(ctx, fs.PermMask{Read: flags.Read, Write: flags.Write})
+ if err != nil {
+ return nil, err
+ }
+ return newPipeOperations(ctx, opener, flags, f, nil)
+
+ // Handle opening O_WRONLY blocking: convert ENXIO to syserror.ErrWouldBlock.
+ // See TryOpenWriteOnly for more details.
+ case flags.Write:
+ return p.TryOpenWriteOnly(ctx, opener)
+
+ default:
+ // Handle opening O_RDONLY blocking: convert EOF from read to syserror.ErrWouldBlock.
+ // See TryOpenReadOnly for more details.
+ return p.TryOpenReadOnly(ctx, opener)
+ }
+}
+
+// TryOpenReadOnly tries to open a host pipe read only but only returns a fs.File when
+// there is a coordinating writer. Call TryOpenReadOnly repeatedly on the same pipeOpenState
+// until syserror.ErrWouldBlock is no longer returned.
+//
+// How it works:
+//
+// Opening a pipe read only will return no error, but each non zero Read will return EOF
+// until a writer becomes available, then EWOULDBLOCK. This is the only state change
+// available to us. We keep a read ahead buffer in case we read bytes instead of getting
+// EWOULDBLOCK, to be read from on the first read request to this fs.File.
+func (p *pipeOpenState) TryOpenReadOnly(ctx context.Context, opener NonBlockingOpener) (*pipeOperations, error) {
+ // Waiting for a blocking read only open involves reading from the host pipe until
+ // bytes or other writers are available, so instead of retrying opening the pipe,
+ // it's necessary to retry reading from the pipe. To do this we need to keep around
+ // the read only pipe we opened, until success or an irrecoverable read error (at
+ // which point it must be closed).
+ if p.hostFile == nil {
+ var err error
+ p.hostFile, err = opener.NonBlockingOpen(ctx, fs.PermMask{Read: true})
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // Try to read from the pipe to see if writers are around.
+ tryReadBuffer := make([]byte, 1)
+ n, rerr := p.hostFile.Read(tryReadBuffer)
+
+ // No bytes were read.
+ if n == 0 {
+ // EOF means that we're not ready yet.
+ if rerr == nil || rerr == io.EOF {
+ return nil, syserror.ErrWouldBlock
+ }
+ // Any error that is not EWOULDBLOCK also means we're not
+ // ready yet, and probably never will be ready. In this
+ // case we need to close the host pipe we opened.
+ if unwrapError(rerr) != syscall.EWOULDBLOCK {
+ p.hostFile.Close()
+ return nil, rerr
+ }
+ }
+
+ // If any bytes were read, no matter the corresponding error, we need
+ // to keep them around so they can be read by the application.
+ var readAheadBuffer []byte
+ if n > 0 {
+ readAheadBuffer = tryReadBuffer
+ }
+
+ // Successfully opened read only blocking pipe with either bytes available
+ // to read and/or a writer available.
+ return newPipeOperations(ctx, opener, fs.FileFlags{Read: true}, p.hostFile, readAheadBuffer)
+}
+
+// TryOpenWriteOnly tries to open a host pipe write only but only returns a fs.File when
+// there is a coordinating reader. Call TryOpenWriteOnly repeatedly on the same pipeOpenState
+// until syserror.ErrWouldBlock is no longer returned.
+//
+// How it works:
+//
+// Opening a pipe write only will return ENXIO until readers are available. Converts the ENXIO
+// to an syserror.ErrWouldBlock, to tell callers to retry.
+func (*pipeOpenState) TryOpenWriteOnly(ctx context.Context, opener NonBlockingOpener) (*pipeOperations, error) {
+ hostFile, err := opener.NonBlockingOpen(ctx, fs.PermMask{Write: true})
+ if unwrapError(err) == syscall.ENXIO {
+ return nil, syserror.ErrWouldBlock
+ }
+ if err != nil {
+ return nil, err
+ }
+ return newPipeOperations(ctx, opener, fs.FileFlags{Write: true}, hostFile, nil)
+}
diff --git a/pkg/sentry/fs/fdpipe/pipe_opener_test.go b/pkg/sentry/fs/fdpipe/pipe_opener_test.go
new file mode 100644
index 000000000..83f6c1986
--- /dev/null
+++ b/pkg/sentry/fs/fdpipe/pipe_opener_test.go
@@ -0,0 +1,522 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fdpipe
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "os"
+ "path"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+ "gvisor.googlesource.com/gvisor/pkg/fd"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context/contexttest"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+)
+
+type hostOpener struct {
+ name string
+}
+
+func (h *hostOpener) NonBlockingOpen(_ context.Context, p fs.PermMask) (*fd.FD, error) {
+ var flags int
+ switch {
+ case p.Read && p.Write:
+ flags = syscall.O_RDWR
+ case p.Write:
+ flags = syscall.O_WRONLY
+ case p.Read:
+ flags = syscall.O_RDONLY
+ default:
+ return nil, syscall.EINVAL
+ }
+ f, err := syscall.Open(h.name, flags|syscall.O_NONBLOCK, 0666)
+ if err != nil {
+ return nil, err
+ }
+ return fd.New(f), nil
+}
+
+func pipename() string {
+ return fmt.Sprintf(path.Join(os.TempDir(), "test-named-pipe-%s"), uuid.New())
+}
+
+func mkpipe(name string) error {
+ return syscall.Mknod(name, syscall.S_IFIFO|0666, 0)
+}
+
+func TestTryOpen(t *testing.T) {
+ for _, test := range []struct {
+ // desc is the test's description.
+ desc string
+
+ // makePipe is true if the test case should create the pipe.
+ makePipe bool
+
+ // flags are the fs.FileFlags used to open the pipe.
+ flags fs.FileFlags
+
+ // expectFile is true if a fs.File is expected.
+ expectFile bool
+
+ // err is the expected error
+ err error
+ }{
+ {
+ desc: "FileFlags lacking Read and Write are invalid",
+ makePipe: false,
+ flags: fs.FileFlags{}, /* bogus */
+ expectFile: false,
+ err: syscall.EINVAL,
+ },
+ {
+ desc: "NonBlocking Read only error returns immediately",
+ makePipe: false, /* causes the error */
+ flags: fs.FileFlags{Read: true, NonBlocking: true},
+ expectFile: false,
+ err: syscall.ENOENT,
+ },
+ {
+ desc: "NonBlocking Read only success returns immediately",
+ makePipe: true,
+ flags: fs.FileFlags{Read: true, NonBlocking: true},
+ expectFile: true,
+ err: nil,
+ },
+ {
+ desc: "NonBlocking Write only error returns immediately",
+ makePipe: false, /* causes the error */
+ flags: fs.FileFlags{Write: true, NonBlocking: true},
+ expectFile: false,
+ err: syscall.ENOENT,
+ },
+ {
+ desc: "NonBlocking Write only no reader error returns immediately",
+ makePipe: true,
+ flags: fs.FileFlags{Write: true, NonBlocking: true},
+ expectFile: false,
+ err: syscall.ENXIO,
+ },
+ {
+ desc: "ReadWrite error returns immediately",
+ makePipe: false, /* causes the error */
+ flags: fs.FileFlags{Read: true, Write: true},
+ expectFile: false,
+ err: syscall.ENOENT,
+ },
+ {
+ desc: "ReadWrite returns immediately",
+ makePipe: true,
+ flags: fs.FileFlags{Read: true, Write: true},
+ expectFile: true,
+ err: nil,
+ },
+ {
+ desc: "Blocking Write only returns open error",
+ makePipe: false, /* causes the error */
+ flags: fs.FileFlags{Write: true},
+ expectFile: false,
+ err: syscall.ENOENT, /* from bogus perms */
+ },
+ {
+ desc: "Blocking Read only returns open error",
+ makePipe: false, /* causes the error */
+ flags: fs.FileFlags{Read: true},
+ expectFile: false,
+ err: syscall.ENOENT,
+ },
+ {
+ desc: "Blocking Write only returns with syserror.ErrWouldBlock",
+ makePipe: true,
+ flags: fs.FileFlags{Write: true},
+ expectFile: false,
+ err: syserror.ErrWouldBlock,
+ },
+ {
+ desc: "Blocking Read only returns with syserror.ErrWouldBlock",
+ makePipe: true,
+ flags: fs.FileFlags{Read: true},
+ expectFile: false,
+ err: syserror.ErrWouldBlock,
+ },
+ } {
+ name := pipename()
+ if test.makePipe {
+ // Create the pipe. We do this per-test case to keep tests independent.
+ if err := mkpipe(name); err != nil {
+ t.Errorf("%s: failed to make host pipe: %v", test.desc, err)
+ continue
+ }
+ defer syscall.Unlink(name)
+ }
+
+ // Use a host opener to keep things simple.
+ opener := &hostOpener{name: name}
+
+ pipeOpenState := &pipeOpenState{}
+ ctx := contexttest.Context(t)
+ pipeOps, err := pipeOpenState.TryOpen(ctx, opener, test.flags)
+ if unwrapError(err) != test.err {
+ t.Errorf("%s: got error %v, want %v", test.desc, err, test.err)
+ if pipeOps != nil {
+ // Cleanup the state of the pipe, and remove the fd from the
+ // fdnotifier. Sadly this needed to maintain the correctness
+ // of other tests because the fdnotifier is global.
+ pipeOps.Release()
+ }
+ continue
+ }
+ if (pipeOps != nil) != test.expectFile {
+ t.Errorf("%s: got non-nil file %v, want %v", test.desc, pipeOps != nil, test.expectFile)
+ }
+ if pipeOps != nil {
+ // Same as above.
+ pipeOps.Release()
+ }
+ }
+}
+
+func TestPipeOpenUnblocksEventually(t *testing.T) {
+ for _, test := range []struct {
+ // desc is the test's description.
+ desc string
+
+ // partnerIsReader is true if the goroutine opening the same pipe as the test case
+ // should open the pipe read only. Otherwise write only. This also means that the
+ // test case will open the pipe in the opposite way.
+ partnerIsReader bool
+
+ // partnerIsBlocking is true if the goroutine opening the same pipe as the test case
+ // should do so without the O_NONBLOCK flag, otherwise opens the pipe with O_NONBLOCK
+ // until ENXIO is not returned.
+ partnerIsBlocking bool
+ }{
+ {
+ desc: "Blocking Read with blocking writer partner opens eventually",
+ partnerIsReader: false,
+ partnerIsBlocking: true,
+ },
+ {
+ desc: "Blocking Write with blocking reader partner opens eventually",
+ partnerIsReader: true,
+ partnerIsBlocking: true,
+ },
+ {
+ desc: "Blocking Read with non-blocking writer partner opens eventually",
+ partnerIsReader: false,
+ partnerIsBlocking: false,
+ },
+ {
+ desc: "Blocking Write with non-blocking reader partner opens eventually",
+ partnerIsReader: true,
+ partnerIsBlocking: false,
+ },
+ } {
+ // Create the pipe. We do this per-test case to keep tests independent.
+ name := pipename()
+ if err := mkpipe(name); err != nil {
+ t.Errorf("%s: failed to make host pipe: %v", test.desc, err)
+ continue
+ }
+ defer syscall.Unlink(name)
+
+ // Spawn the partner.
+ type fderr struct {
+ fd int
+ err error
+ }
+ errch := make(chan fderr, 1)
+ go func() {
+ var flags int
+ if test.partnerIsReader {
+ flags = syscall.O_RDONLY
+ } else {
+ flags = syscall.O_WRONLY
+ }
+ if test.partnerIsBlocking {
+ fd, err := syscall.Open(name, flags, 0666)
+ errch <- fderr{fd: fd, err: err}
+ } else {
+ var fd int
+ err := error(syscall.ENXIO)
+ for err == syscall.ENXIO {
+ fd, err = syscall.Open(name, flags|syscall.O_NONBLOCK, 0666)
+ time.Sleep(1 * time.Second)
+ }
+ errch <- fderr{fd: fd, err: err}
+ }
+ }()
+
+ // Setup file flags for either a read only or write only open.
+ flags := fs.FileFlags{
+ Read: !test.partnerIsReader,
+ Write: test.partnerIsReader,
+ }
+
+ // Open the pipe in a blocking way, which should succeed eventually.
+ opener := &hostOpener{name: name}
+ ctx := contexttest.Context(t)
+ pipeOps, err := Open(ctx, opener, flags)
+ if pipeOps != nil {
+ // Same as TestTryOpen.
+ pipeOps.Release()
+ }
+
+ // Check that the partner opened the file successfully.
+ e := <-errch
+ if e.err != nil {
+ t.Errorf("%s: partner got error %v, wanted nil", test.desc, e.err)
+ continue
+ }
+ // If so, then close the partner fd to avoid leaking an fd.
+ syscall.Close(e.fd)
+
+ // Check that our blocking open was successful.
+ if err != nil {
+ t.Errorf("%s: blocking open got error %v, wanted nil", test.desc, err)
+ continue
+ }
+ if pipeOps == nil {
+ t.Errorf("%s: blocking open got nil file, wanted non-nil", test.desc)
+ continue
+ }
+ }
+}
+
+func TestCopiedReadAheadBuffer(t *testing.T) {
+ // Create the pipe.
+ name := pipename()
+ if err := mkpipe(name); err != nil {
+ t.Fatalf("failed to make host pipe: %v", err)
+ }
+ defer syscall.Unlink(name)
+
+ // We're taking advantage of the fact that pipes opened read only always return
+ // success, but internally they are not deemed "opened" until we're sure that
+ // another writer comes along. This means we can open the same pipe write only
+ // with no problems + write to it, given that opener.Open already tried to open
+ // the pipe RDONLY and succeeded, which we know happened if TryOpen returns
+ // syserror.ErrwouldBlock.
+ //
+ // This simulates the open(RDONLY) <-> open(WRONLY)+write race we care about, but
+ // does not cause our test to be racy (which would be terrible).
+ opener := &hostOpener{name: name}
+ pipeOpenState := &pipeOpenState{}
+ ctx := contexttest.Context(t)
+ pipeOps, err := pipeOpenState.TryOpen(ctx, opener, fs.FileFlags{Read: true})
+ if pipeOps != nil {
+ pipeOps.Release()
+ t.Fatalf("open(%s, %o) got file, want nil", name, syscall.O_RDONLY)
+ }
+ if err != syserror.ErrWouldBlock {
+ t.Fatalf("open(%s, %o) got error %v, want %v", name, syscall.O_RDONLY, err, syserror.ErrWouldBlock)
+ }
+
+ // Then open the same pipe write only and write some bytes to it. The next
+ // time we try to open the pipe read only again via the pipeOpenState, we should
+ // succeed and buffer some of the bytes written.
+ fd, err := syscall.Open(name, syscall.O_WRONLY, 0666)
+ if err != nil {
+ t.Fatalf("open(%s, %o) got error %v, want nil", name, syscall.O_WRONLY, err)
+ }
+ defer syscall.Close(fd)
+
+ data := []byte("hello")
+ if n, err := syscall.Write(fd, data); n != len(data) || err != nil {
+ t.Fatalf("write(%v) got (%d, %v), want (%d, nil)", data, n, err, len(data))
+ }
+
+ // Try the read again, knowing that it should succeed this time.
+ pipeOps, err = pipeOpenState.TryOpen(ctx, opener, fs.FileFlags{Read: true})
+ if pipeOps == nil {
+ t.Fatalf("open(%s, %o) got nil file, want not nil", name, syscall.O_RDONLY)
+ }
+ defer pipeOps.Release()
+
+ if err != nil {
+ t.Fatalf("open(%s, %o) got error %v, want nil", name, syscall.O_RDONLY, err)
+ }
+
+ inode := fs.NewMockInode(ctx, fs.NewMockMountSource(nil), fs.StableAttr{
+ Type: fs.Pipe,
+ })
+ file := fs.NewFile(ctx, fs.NewDirent(inode, "pipe"), fs.FileFlags{Read: true}, pipeOps)
+
+ // Check that the file we opened points to a pipe with a non-empty read ahead buffer.
+ bufsize := len(pipeOps.readAheadBuffer)
+ if bufsize != 1 {
+ t.Fatalf("read ahead buffer got %d bytes, want %d", bufsize, 1)
+ }
+
+ // Now for the final test, try to read everything in, expecting to get back all of
+ // the bytes that were written at once. Note that in the wild there is no atomic
+ // read size so expecting to get all bytes from a single writer when there are
+ // multiple readers is a bad expectation.
+ buf := make([]byte, len(data))
+ ioseq := usermem.BytesIOSequence(buf)
+ n, err := pipeOps.Read(ctx, file, ioseq, 0)
+ if err != nil {
+ t.Fatalf("read request got error %v, want nil", err)
+ }
+ if n != int64(len(data)) {
+ t.Fatalf("read request got %d bytes, want %d", n, len(data))
+ }
+ if !bytes.Equal(buf, data) {
+ t.Errorf("read request got bytes [%v], want [%v]", buf, data)
+ }
+}
+
+func TestPipeHangup(t *testing.T) {
+ for _, test := range []struct {
+ // desc is the test's description.
+ desc string
+
+ // flags control how we open our end of the pipe and must be read
+ // only or write only. They also dicate how a coordinating partner
+ // fd is opened, which is their inverse (read only -> write only, etc).
+ flags fs.FileFlags
+
+ // hangupSelf if true causes the test case to close our end of the pipe
+ // and causes hangup errors to be asserted on our coordinating partner's
+ // fd. If hangupSelf is false, then our partner's fd is closed and the
+ // hangup errors are expected on our end of the pipe.
+ hangupSelf bool
+ }{
+ {
+ desc: "Read only gets hangup error",
+ flags: fs.FileFlags{Read: true},
+ },
+ {
+ desc: "Write only gets hangup error",
+ flags: fs.FileFlags{Write: true},
+ },
+ {
+ desc: "Read only generates hangup error",
+ flags: fs.FileFlags{Read: true},
+ hangupSelf: true,
+ },
+ {
+ desc: "Write only generates hangup error",
+ flags: fs.FileFlags{Write: true},
+ hangupSelf: true,
+ },
+ } {
+ if test.flags.Read == test.flags.Write {
+ t.Errorf("%s: test requires a single reader or writer", test.desc)
+ continue
+ }
+
+ // Create the pipe. We do this per-test case to keep tests independent.
+ name := pipename()
+ if err := mkpipe(name); err != nil {
+ t.Errorf("%s: failed to make host pipe: %v", test.desc, err)
+ continue
+ }
+ defer syscall.Unlink(name)
+
+ // Fire off a partner routine which tries to open the same pipe blocking,
+ // which will synchronize with us. The channel allows us to get back the
+ // fd once we expect this partner routine to succeed, so we can manifest
+ // hangup events more directly.
+ fdchan := make(chan int, 1)
+ go func() {
+ // Be explicit about the flags to protect the test from
+ // misconfiguration.
+ var flags int
+ if test.flags.Read {
+ flags = syscall.O_WRONLY
+ } else {
+ flags = syscall.O_RDONLY
+ }
+ fd, err := syscall.Open(name, flags, 0666)
+ if err != nil {
+ t.Logf("Open(%q, %o, 0666) partner failed: %v", name, flags, err)
+ }
+ fdchan <- fd
+ }()
+
+ // Open our end in a blocking way to ensure that we coordinate.
+ opener := &hostOpener{name: name}
+ ctx := contexttest.Context(t)
+ pipeOps, err := Open(ctx, opener, test.flags)
+ if err != nil {
+ t.Errorf("%s: Open got error %v, want nil", test.desc, err)
+ continue
+ }
+ // Don't defer file.DecRef here because that causes the hangup we're
+ // trying to test for.
+
+ // Expect the partner routine to have coordinated with us and get back
+ // its open fd.
+ f := <-fdchan
+ if f < 0 {
+ t.Errorf("%s: partner routine got fd %d, want > 0", test.desc, f)
+ pipeOps.Release()
+ continue
+ }
+
+ if test.hangupSelf {
+ // Hangup self and assert that our partner got the expected hangup
+ // error.
+ pipeOps.Release()
+
+ if test.flags.Read {
+ // Partner is writer.
+ assertWriterHungup(t, test.desc, fd.NewReadWriter(f))
+ } else {
+ // Partner is reader.
+ assertReaderHungup(t, test.desc, fd.NewReadWriter(f))
+ }
+ } else {
+ // Hangup our partner and expect us to get the hangup error.
+ syscall.Close(f)
+ defer pipeOps.Release()
+
+ if test.flags.Read {
+ assertReaderHungup(t, test.desc, pipeOps.(*pipeOperations).file)
+ } else {
+ assertWriterHungup(t, test.desc, pipeOps.(*pipeOperations).file)
+ }
+ }
+ }
+}
+
+func assertReaderHungup(t *testing.T, desc string, reader io.Reader) bool {
+ // Drain the pipe completely, it might have crap in it, but expect EOF eventually.
+ var err error
+ for err == nil {
+ _, err = reader.Read(make([]byte, 10))
+ }
+ if err != io.EOF {
+ t.Errorf("%s: read from self after hangup got error %v, want %v", desc, err, io.EOF)
+ return false
+ }
+ return true
+}
+
+func assertWriterHungup(t *testing.T, desc string, writer io.Writer) bool {
+ if _, err := writer.Write([]byte("hello")); unwrapError(err) != syscall.EPIPE {
+ t.Errorf("%s: write to self after hangup got error %v, want %v", desc, err, syscall.EPIPE)
+ return false
+ }
+ return true
+}
diff --git a/pkg/sentry/fs/fdpipe/pipe_state.go b/pkg/sentry/fs/fdpipe/pipe_state.go
new file mode 100644
index 000000000..8996a2178
--- /dev/null
+++ b/pkg/sentry/fs/fdpipe/pipe_state.go
@@ -0,0 +1,88 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fdpipe
+
+import (
+ "fmt"
+ "io/ioutil"
+ "sync"
+
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+)
+
+// beforeSave is invoked by stateify.
+func (p *pipeOperations) beforeSave() {
+ if p.flags.Read {
+ data, err := ioutil.ReadAll(p.file)
+ if err != nil && !isBlockError(err) {
+ panic(fmt.Sprintf("failed to read from pipe: %v", err))
+ }
+ p.readAheadBuffer = append(p.readAheadBuffer, data...)
+ } else if p.flags.Write {
+ file, err := p.opener.NonBlockingOpen(context.Background(), fs.PermMask{Write: true})
+ if err != nil {
+ panic(fs.ErrSaveRejection{fmt.Errorf("write-only pipe end cannot be re-opened as %v: %v", p, err)})
+ }
+ file.Close()
+ }
+}
+
+// saveFlags is invoked by stateify.
+func (p *pipeOperations) saveFlags() fs.FileFlags {
+ return p.flags
+}
+
+// readPipeOperationsLoading is used to ensure that write-only pipe fds are
+// opened after read/write and read-only pipe fds, to avoid ENXIO when
+// multiple pipe fds refer to different ends of the same pipe.
+var readPipeOperationsLoading sync.WaitGroup
+
+// loadFlags is invoked by stateify.
+func (p *pipeOperations) loadFlags(flags fs.FileFlags) {
+ // This is a hack to ensure that readPipeOperationsLoading includes all
+ // readable pipe fds before any asynchronous calls to
+ // readPipeOperationsLoading.Wait().
+ if flags.Read {
+ readPipeOperationsLoading.Add(1)
+ }
+ p.flags = flags
+}
+
+// afterLoad is invoked by stateify.
+func (p *pipeOperations) afterLoad() {
+ load := func() {
+ if !p.flags.Read {
+ readPipeOperationsLoading.Wait()
+ } else {
+ defer readPipeOperationsLoading.Done()
+ }
+ var err error
+ p.file, err = p.opener.NonBlockingOpen(context.Background(), fs.PermMask{
+ Read: p.flags.Read,
+ Write: p.flags.Write,
+ })
+ if err != nil {
+ panic(fmt.Sprintf("unable to open pipe %v: %v", p, err))
+ }
+ if err := p.init(); err != nil {
+ panic(fmt.Sprintf("unable to initialize pipe %v: %v", p, err))
+ }
+ }
+
+ // Do background opening of pipe ends. Note for write-only pipe ends we
+ // have to do it asynchronously to avoid blocking the restore.
+ fs.Async(load)
+}
diff --git a/pkg/sentry/fs/fdpipe/pipe_test.go b/pkg/sentry/fs/fdpipe/pipe_test.go
new file mode 100644
index 000000000..6cd314f5b
--- /dev/null
+++ b/pkg/sentry/fs/fdpipe/pipe_test.go
@@ -0,0 +1,489 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fdpipe
+
+import (
+ "bytes"
+ "io"
+ "os"
+ "syscall"
+ "testing"
+
+ "gvisor.googlesource.com/gvisor/pkg/fd"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context/contexttest"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+ "gvisor.googlesource.com/gvisor/pkg/waiter/fdnotifier"
+)
+
+func singlePipeFD() (int, error) {
+ fds := make([]int, 2)
+ if err := syscall.Pipe(fds); err != nil {
+ return -1, err
+ }
+ syscall.Close(fds[1])
+ return fds[0], nil
+}
+
+func singleDirFD() (int, error) {
+ return syscall.Open(os.TempDir(), syscall.O_RDONLY, 0666)
+}
+
+func mockPipeDirent(t *testing.T) *fs.Dirent {
+ ctx := contexttest.Context(t)
+ node := fs.NewMockInodeOperations(ctx)
+ node.UAttr = fs.UnstableAttr{
+ Perms: fs.FilePermissions{
+ User: fs.PermMask{Read: true, Write: true},
+ },
+ }
+ inode := fs.NewInode(node, fs.NewMockMountSource(nil), fs.StableAttr{
+ Type: fs.Pipe,
+ BlockSize: usermem.PageSize,
+ })
+ return fs.NewDirent(inode, "")
+}
+
+func TestNewPipe(t *testing.T) {
+ for _, test := range []struct {
+ // desc is the test's description.
+ desc string
+
+ // getfd generates the fd to pass to newPipeOperations.
+ getfd func() (int, error)
+
+ // flags are the fs.FileFlags passed to newPipeOperations.
+ flags fs.FileFlags
+
+ // readAheadBuffer is the buffer passed to newPipeOperations.
+ readAheadBuffer []byte
+
+ // err is the expected error.
+ err error
+ }{
+ {
+ desc: "Cannot make new pipe from bad fd",
+ getfd: func() (int, error) { return -1, nil },
+ err: syscall.EINVAL,
+ },
+ {
+ desc: "Cannot make new pipe from non-pipe fd",
+ getfd: singleDirFD,
+ err: syscall.EINVAL,
+ },
+ {
+ desc: "Can make new pipe from pipe fd",
+ getfd: singlePipeFD,
+ flags: fs.FileFlags{Read: true},
+ readAheadBuffer: []byte("hello"),
+ },
+ } {
+ gfd, err := test.getfd()
+ if err != nil {
+ t.Errorf("%s: getfd got (%d, %v), want (fd, nil)", test.desc, gfd, err)
+ continue
+ }
+ f := fd.New(gfd)
+
+ p, err := newPipeOperations(contexttest.Context(t), nil, test.flags, f, test.readAheadBuffer)
+ if p != nil {
+ // This is necessary to remove the fd from the global fd notifier.
+ defer p.Release()
+ } else {
+ // If there is no p to DecRef on, because newPipeOperations failed, then the
+ // file still needs to be closed.
+ defer f.Close()
+ }
+
+ if err != test.err {
+ t.Errorf("%s: got error %v, want %v", test.desc, err, test.err)
+ continue
+ }
+ // Check the state of the pipe given that it was successfully opened.
+ if err == nil {
+ if p == nil {
+ t.Errorf("%s: got nil pipe and nil error, want (pipe, nil)", test.desc)
+ continue
+ }
+ if flags := p.flags; test.flags != flags {
+ t.Errorf("%s: got file flags %s, want %s", test.desc, flags, test.flags)
+ continue
+ }
+ if len(test.readAheadBuffer) != len(p.readAheadBuffer) {
+ t.Errorf("%s: got read ahead buffer length %d, want %d", test.desc, len(p.readAheadBuffer), len(test.readAheadBuffer))
+ continue
+ }
+ fileFlags, _, errno := syscall.Syscall(syscall.SYS_FCNTL, uintptr(p.file.FD()), syscall.F_GETFL, 0)
+ if errno != 0 {
+ t.Errorf("%s: failed to get file flags for fd %d, got %v, want 0", test.desc, p.file.FD(), errno)
+ continue
+ }
+ if fileFlags&syscall.O_NONBLOCK == 0 {
+ t.Errorf("%s: pipe is blocking, expected non-blocking", test.desc)
+ continue
+ }
+ if !fdnotifier.HasFD(int32(f.FD())) {
+ t.Errorf("%s: pipe fd %d is not registered for events", test.desc, f.FD)
+ }
+ }
+ }
+}
+
+func TestPipeDestruction(t *testing.T) {
+ fds := make([]int, 2)
+ if err := syscall.Pipe(fds); err != nil {
+ t.Fatalf("failed to create pipes: got %v, want nil", err)
+ }
+ f := fd.New(fds[0])
+
+ // We don't care about the other end, just use the read end.
+ syscall.Close(fds[1])
+
+ // Test the read end, but it doesn't really matter which.
+ p, err := newPipeOperations(contexttest.Context(t), nil, fs.FileFlags{Read: true}, f, nil)
+ if err != nil {
+ f.Close()
+ t.Fatalf("newPipeOperations got error %v, want nil", err)
+ }
+ // Drop our only reference, which should trigger the destructor.
+ p.Release()
+
+ if fdnotifier.HasFD(int32(fds[0])) {
+ t.Fatalf("after DecRef fdnotifier has fd %d, want no longer registered", fds[0])
+ }
+ if p.file != nil {
+ t.Errorf("after DecRef got file, want nil")
+ }
+}
+
+type Seek struct{}
+
+type ReadDir struct{}
+
+type Writev struct {
+ Src usermem.IOSequence
+}
+
+type Readv struct {
+ Dst usermem.IOSequence
+}
+
+type Fsync struct{}
+
+func TestPipeRequest(t *testing.T) {
+ for _, test := range []struct {
+ // desc is the test's description.
+ desc string
+
+ // request to execute.
+ context interface{}
+
+ // flags determines whether to use the read or write end
+ // of the pipe, for this test it can only be Read or Write.
+ flags fs.FileFlags
+
+ // keepOpenPartner if false closes the other end of the pipe,
+ // otherwise this is delayed until the end of the test.
+ keepOpenPartner bool
+
+ // expected error
+ err error
+ }{
+ {
+ desc: "ReadDir on pipe returns ENOTDIR",
+ context: &ReadDir{},
+ err: syscall.ENOTDIR,
+ },
+ {
+ desc: "Fsync on pipe returns EINVAL",
+ context: &Fsync{},
+ err: syscall.EINVAL,
+ },
+ {
+ desc: "Seek on pipe returns ESPIPE",
+ context: &Seek{},
+ err: syscall.ESPIPE,
+ },
+ {
+ desc: "Readv on pipe from empty buffer returns nil",
+ context: &Readv{Dst: usermem.BytesIOSequence(nil)},
+ flags: fs.FileFlags{Read: true},
+ },
+ {
+ desc: "Readv on pipe from non-empty buffer and closed partner returns EOF",
+ context: &Readv{Dst: usermem.BytesIOSequence(make([]byte, 10))},
+ flags: fs.FileFlags{Read: true},
+ err: io.EOF,
+ },
+ {
+ desc: "Readv on pipe from non-empty buffer and open partner returns EWOULDBLOCK",
+ context: &Readv{Dst: usermem.BytesIOSequence(make([]byte, 10))},
+ flags: fs.FileFlags{Read: true},
+ keepOpenPartner: true,
+ err: syserror.ErrWouldBlock,
+ },
+ {
+ desc: "Writev on pipe from empty buffer returns nil",
+ context: &Writev{Src: usermem.BytesIOSequence(nil)},
+ flags: fs.FileFlags{Write: true},
+ },
+ {
+ desc: "Writev on pipe from non-empty buffer and closed partner returns EPIPE",
+ context: &Writev{Src: usermem.BytesIOSequence([]byte("hello"))},
+ flags: fs.FileFlags{Write: true},
+ err: syscall.EPIPE,
+ },
+ {
+ desc: "Writev on pipe from non-empty buffer and open partner succeeds",
+ context: &Writev{Src: usermem.BytesIOSequence([]byte("hello"))},
+ flags: fs.FileFlags{Write: true},
+ keepOpenPartner: true,
+ },
+ } {
+ if test.flags.Read && test.flags.Write {
+ panic("both read and write not supported for this test")
+ }
+
+ fds := make([]int, 2)
+ if err := syscall.Pipe(fds); err != nil {
+ t.Errorf("%s: failed to create pipes: got %v, want nil", test.desc, err)
+ continue
+ }
+
+ // Configure the fd and partner fd based on the file flags.
+ testFd, partnerFd := fds[0], fds[1]
+ if test.flags.Write {
+ testFd, partnerFd = fds[1], fds[0]
+ }
+
+ // Configure closing the fds.
+ if test.keepOpenPartner {
+ defer syscall.Close(partnerFd)
+ } else {
+ syscall.Close(partnerFd)
+ }
+
+ // Create the pipe.
+ ctx := contexttest.Context(t)
+ p, err := newPipeOperations(ctx, nil, test.flags, fd.New(testFd), nil)
+ if err != nil {
+ t.Fatalf("%s: newPipeOperations got error %v, want nil", test.desc, err)
+ }
+ defer p.Release()
+
+ inode := fs.NewMockInode(ctx, fs.NewMockMountSource(nil), fs.StableAttr{Type: fs.Pipe})
+ file := fs.NewFile(ctx, fs.NewDirent(inode, "pipe"), fs.FileFlags{Read: true}, p)
+
+ // Issue request via the appropriate function.
+ switch c := test.context.(type) {
+ case *Seek:
+ _, err = p.Seek(ctx, file, 0, 0)
+ case *ReadDir:
+ _, err = p.Readdir(ctx, file, nil)
+ case *Readv:
+ _, err = p.Read(ctx, file, c.Dst, 0)
+ case *Writev:
+ _, err = p.Write(ctx, file, c.Src, 0)
+ case *Fsync:
+ err = p.Fsync(ctx, file, 0, fs.FileMaxOffset, fs.SyncAll)
+ default:
+ t.Errorf("%s: unknown request type %T", test.desc, test.context)
+ }
+
+ if unwrapError(err) != test.err {
+ t.Errorf("%s: got error %v, want %v", test.desc, err, test.err)
+ }
+ }
+}
+
+func TestPipeReadAheadBuffer(t *testing.T) {
+ fds := make([]int, 2)
+ if err := syscall.Pipe(fds); err != nil {
+ t.Fatalf("failed to create pipes: got %v, want nil", err)
+ }
+ rfile := fd.New(fds[0])
+
+ // Eventually close the write end, which is not wrapped in a pipe object.
+ defer syscall.Close(fds[1])
+
+ // Write some bytes to this end.
+ data := []byte("world")
+ if n, err := syscall.Write(fds[1], data); n != len(data) || err != nil {
+ rfile.Close()
+ t.Fatalf("write to pipe got (%d, %v), want (%d, nil)", n, err, len(data))
+ }
+ // Close the write end immediately, we don't care about it.
+
+ buffered := []byte("hello ")
+ ctx := contexttest.Context(t)
+ p, err := newPipeOperations(ctx, nil, fs.FileFlags{Read: true}, rfile, buffered)
+ if err != nil {
+ rfile.Close()
+ t.Fatalf("newPipeOperations got error %v, want nil", err)
+ }
+ defer p.Release()
+
+ inode := fs.NewMockInode(ctx, fs.NewMockMountSource(nil), fs.StableAttr{
+ Type: fs.Pipe,
+ })
+ file := fs.NewFile(ctx, fs.NewDirent(inode, "pipe"), fs.FileFlags{Read: true}, p)
+
+ // In total we expect to read data + buffered.
+ total := append(buffered, data...)
+
+ buf := make([]byte, len(total))
+ iov := usermem.BytesIOSequence(buf)
+ n, err := p.Read(contexttest.Context(t), file, iov, 0)
+ if err != nil {
+ t.Fatalf("read request got error %v, want nil", err)
+ }
+ if n != int64(len(total)) {
+ t.Fatalf("read request got %d bytes, want %d", n, len(total))
+ }
+ if !bytes.Equal(buf, total) {
+ t.Errorf("read request got bytes [%v], want [%v]", buf, total)
+ }
+}
+
+// This is very important for pipes in general because they can return EWOULDBLOCK and for
+// those that block they must continue until they have read all of the data (and report it
+// as such.
+func TestPipeReadsAccumulate(t *testing.T) {
+ fds := make([]int, 2)
+ if err := syscall.Pipe(fds); err != nil {
+ t.Fatalf("failed to create pipes: got %v, want nil", err)
+ }
+ rfile := fd.New(fds[0])
+
+ // Eventually close the write end, it doesn't depend on a pipe object.
+ defer syscall.Close(fds[1])
+
+ // Get a new read only pipe reference.
+ ctx := contexttest.Context(t)
+ p, err := newPipeOperations(ctx, nil, fs.FileFlags{Read: true}, rfile, nil)
+ if err != nil {
+ rfile.Close()
+ t.Fatalf("newPipeOperations got error %v, want nil", err)
+ }
+ // Don't forget to remove the fd from the fd notifier. Otherwise other tests will
+ // likely be borked, because it's global :(
+ defer p.Release()
+
+ inode := fs.NewMockInode(ctx, fs.NewMockMountSource(nil), fs.StableAttr{
+ Type: fs.Pipe,
+ })
+ file := fs.NewFile(ctx, fs.NewDirent(inode, "pipe"), fs.FileFlags{Read: true}, p)
+
+ // Write some some bytes to the pipe.
+ data := []byte("some message")
+ if n, err := syscall.Write(fds[1], data); n != len(data) || err != nil {
+ t.Fatalf("write to pipe got (%d, %v), want (%d, nil)", n, err, len(data))
+ }
+
+ // Construct a segment vec that is a bit more than we have written so we trigger
+ // an EWOULDBLOCK.
+ wantBytes := len(data) + 1
+ readBuffer := make([]byte, wantBytes)
+ iov := usermem.BytesIOSequence(readBuffer)
+ n, err := p.Read(ctx, file, iov, 0)
+ total := n
+ iov = iov.DropFirst64(n)
+ if err != syserror.ErrWouldBlock {
+ t.Fatalf("Readv got error %v, want %v", err, syserror.ErrWouldBlock)
+ }
+
+ // Write a few more bytes to allow us to read more/accumulate.
+ extra := []byte("extra")
+ if n, err := syscall.Write(fds[1], extra); n != len(extra) || err != nil {
+ t.Fatalf("write to pipe got (%d, %v), want (%d, nil)", n, err, len(extra))
+ }
+
+ // This time, using the same request, we should not block.
+ n, err = p.Read(ctx, file, iov, 0)
+ total += n
+ if err != nil {
+ t.Fatalf("Readv got error %v, want nil", err)
+ }
+
+ // Assert that the result we got back is cumulative.
+ if total != int64(wantBytes) {
+ t.Fatalf("Readv sequence got %d bytes, want %d", total, wantBytes)
+ }
+
+ if want := append(data, extra[0]); !bytes.Equal(readBuffer, want) {
+ t.Errorf("Readv sequence got %v, want %v", readBuffer, want)
+ }
+}
+
+// Same as TestReadsAccumulate.
+func TestPipeWritesAccumulate(t *testing.T) {
+ fds := make([]int, 2)
+ if err := syscall.Pipe(fds); err != nil {
+ t.Fatalf("failed to create pipes: got %v, want nil", err)
+ }
+ wfile := fd.New(fds[1])
+
+ // Eventually close the read end, it doesn't depend on a pipe object.
+ defer syscall.Close(fds[0])
+
+ // Get a new write only pipe reference.
+ ctx := contexttest.Context(t)
+ p, err := newPipeOperations(ctx, nil, fs.FileFlags{Write: true}, wfile, nil)
+ if err != nil {
+ wfile.Close()
+ t.Fatalf("newPipeOperations got error %v, want nil", err)
+ }
+ // Don't forget to remove the fd from the fd notifier. Otherwise other tests will
+ // likely be borked, because it's global :(
+ defer p.Release()
+
+ inode := fs.NewMockInode(ctx, fs.NewMockMountSource(nil), fs.StableAttr{
+ Type: fs.Pipe,
+ })
+ file := fs.NewFile(ctx, fs.NewDirent(inode, "pipe"), fs.FileFlags{Read: true}, p)
+
+ // Construct a segment vec that is larger than the pipe size to trigger an EWOULDBLOCK.
+ wantBytes := 65536 * 2
+ writeBuffer := make([]byte, wantBytes)
+ for i := 0; i < wantBytes; i++ {
+ writeBuffer[i] = 'a'
+ }
+ iov := usermem.BytesIOSequence(writeBuffer)
+ n, err := p.Write(ctx, file, iov, 0)
+ total := n
+ iov = iov.DropFirst64(n)
+ if err != syserror.ErrWouldBlock {
+ t.Fatalf("Writev got error %v, want %v", err, syserror.ErrWouldBlock)
+ }
+
+ // Read the entire pipe buf size to make space for the second half.
+ throwAway := make([]byte, 65536)
+ if n, err := syscall.Read(fds[0], throwAway); n != len(throwAway) || err != nil {
+ t.Fatalf("write to pipe got (%d, %v), want (%d, nil)", n, err, len(throwAway))
+ }
+
+ // This time we should not block.
+ n, err = p.Write(ctx, file, iov, 0)
+ total += n
+ if err != nil {
+ t.Fatalf("Writev got error %v, want nil", err)
+ }
+
+ // Assert that the result we got back is cumulative.
+ if total != int64(wantBytes) {
+ t.Fatalf("Writev sequence got %d bytes, want %d", total, wantBytes)
+ }
+}