From dfdbdf14fa101e850bb3361f91da6362b98d11d0 Mon Sep 17 00:00:00 2001
From: Kevin Krakauer <krakauer@google.com>
Date: Thu, 17 Oct 2019 13:08:27 -0700
Subject: Refactor pipe to support VFS2.

* Pulls common functionality (IO and locking on open) into pipe_util.go.
* Adds pipe/vfs.go, which implements a subset of vfs.FileDescriptionImpl.

A subsequent change will add support for pipes in memfs.

PiperOrigin-RevId: 275322385
---
 pkg/sentry/kernel/pipe/BUILD            |   3 +
 pkg/sentry/kernel/pipe/node.go          |  72 +----------
 pkg/sentry/kernel/pipe/pipe.go          |  24 +++-
 pkg/sentry/kernel/pipe/pipe_util.go     | 213 +++++++++++++++++++++++++++++++
 pkg/sentry/kernel/pipe/reader_writer.go | 111 +---------------
 pkg/sentry/kernel/pipe/vfs.go           | 220 ++++++++++++++++++++++++++++++++
 6 files changed, 467 insertions(+), 176 deletions(-)
 create mode 100644 pkg/sentry/kernel/pipe/pipe_util.go
 create mode 100644 pkg/sentry/kernel/pipe/vfs.go

diff --git a/pkg/sentry/kernel/pipe/BUILD b/pkg/sentry/kernel/pipe/BUILD
index cde647139..9d34f6d4d 100644
--- a/pkg/sentry/kernel/pipe/BUILD
+++ b/pkg/sentry/kernel/pipe/BUILD
@@ -24,8 +24,10 @@ go_library(
         "device.go",
         "node.go",
         "pipe.go",
+        "pipe_util.go",
         "reader.go",
         "reader_writer.go",
+        "vfs.go",
         "writer.go",
     ],
     importpath = "gvisor.dev/gvisor/pkg/sentry/kernel/pipe",
@@ -40,6 +42,7 @@ go_library(
         "//pkg/sentry/fs/fsutil",
         "//pkg/sentry/safemem",
         "//pkg/sentry/usermem",
+        "//pkg/sentry/vfs",
         "//pkg/syserror",
         "//pkg/waiter",
     ],
diff --git a/pkg/sentry/kernel/pipe/node.go b/pkg/sentry/kernel/pipe/node.go
index a2dc72204..4a19ab7ce 100644
--- a/pkg/sentry/kernel/pipe/node.go
+++ b/pkg/sentry/kernel/pipe/node.go
@@ -18,7 +18,6 @@ import (
 	"sync"
 
 	"gvisor.dev/gvisor/pkg/abi/linux"
-	"gvisor.dev/gvisor/pkg/amutex"
 	"gvisor.dev/gvisor/pkg/sentry/context"
 	"gvisor.dev/gvisor/pkg/sentry/fs"
 	"gvisor.dev/gvisor/pkg/sentry/fs/fsutil"
@@ -91,10 +90,10 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
 	switch {
 	case flags.Read && !flags.Write: // O_RDONLY.
 		r := i.p.Open(ctx, d, flags)
-		i.newHandleLocked(&i.rWakeup)
+		newHandleLocked(&i.rWakeup)
 
 		if i.p.isNamed && !flags.NonBlocking && !i.p.HasWriters() {
-			if !i.waitFor(&i.wWakeup, ctx) {
+			if !waitFor(&i.mu, &i.wWakeup, ctx) {
 				r.DecRef()
 				return nil, syserror.ErrInterrupted
 			}
@@ -107,7 +106,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
 
 	case flags.Write && !flags.Read: // O_WRONLY.
 		w := i.p.Open(ctx, d, flags)
-		i.newHandleLocked(&i.wWakeup)
+		newHandleLocked(&i.wWakeup)
 
 		if i.p.isNamed && !i.p.HasReaders() {
 			// On a nonblocking, write-only open, the open fails with ENXIO if the
@@ -117,7 +116,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
 				return nil, syserror.ENXIO
 			}
 
-			if !i.waitFor(&i.rWakeup, ctx) {
+			if !waitFor(&i.mu, &i.rWakeup, ctx) {
 				w.DecRef()
 				return nil, syserror.ErrInterrupted
 			}
@@ -127,8 +126,8 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
 	case flags.Read && flags.Write: // O_RDWR.
 		// Pipes opened for read-write always succeeds without blocking.
 		rw := i.p.Open(ctx, d, flags)
-		i.newHandleLocked(&i.rWakeup)
-		i.newHandleLocked(&i.wWakeup)
+		newHandleLocked(&i.rWakeup)
+		newHandleLocked(&i.wWakeup)
 		return rw, nil
 
 	default:
@@ -136,65 +135,6 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
 	}
 }
 
-// waitFor blocks until the underlying pipe has at least one reader/writer is
-// announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this
-// function will block for either readers or writers, depending on where
-// 'wakeupChan' points.
-//
-// f.mu must be held by the caller. waitFor returns with f.mu held, but it will
-// drop f.mu before blocking for any reader/writers.
-func (i *inodeOperations) waitFor(wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool {
-	// Ideally this function would simply use a condition variable. However, the
-	// wait needs to be interruptible via 'sleeper', so we must sychronize via a
-	// channel. The synchronization below relies on the fact that closing a
-	// channel unblocks all receives on the channel.
-
-	// Does an appropriate wakeup channel already exist? If not, create a new
-	// one. This is all done under f.mu to avoid races.
-	if *wakeupChan == nil {
-		*wakeupChan = make(chan struct{})
-	}
-
-	// Grab a local reference to the wakeup channel since it may disappear as
-	// soon as we drop f.mu.
-	wakeup := *wakeupChan
-
-	// Drop the lock and prepare to sleep.
-	i.mu.Unlock()
-	cancel := sleeper.SleepStart()
-
-	// Wait for either a new reader/write to be signalled via 'wakeup', or
-	// for the sleep to be cancelled.
-	select {
-	case <-wakeup:
-		sleeper.SleepFinish(true)
-	case <-cancel:
-		sleeper.SleepFinish(false)
-	}
-
-	// Take the lock and check if we were woken. If we were woken and
-	// interrupted, the former takes priority.
-	i.mu.Lock()
-	select {
-	case <-wakeup:
-		return true
-	default:
-		return false
-	}
-}
-
-// newHandleLocked signals a new pipe reader or writer depending on where
-// 'wakeupChan' points. This unblocks any corresponding reader or writer
-// waiting for the other end of the channel to be opened, see Fifo.waitFor.
-//
-// i.mu must be held.
-func (*inodeOperations) newHandleLocked(wakeupChan *chan struct{}) {
-	if *wakeupChan != nil {
-		close(*wakeupChan)
-		*wakeupChan = nil
-	}
-}
-
 func (*inodeOperations) Allocate(_ context.Context, _ *fs.Inode, _, _ int64) error {
 	return syserror.EPIPE
 }
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
index 8e4e8e82e..1a1b38f83 100644
--- a/pkg/sentry/kernel/pipe/pipe.go
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -111,11 +111,27 @@ func NewPipe(isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe {
 	if atomicIOBytes > sizeBytes {
 		atomicIOBytes = sizeBytes
 	}
-	return &Pipe{
-		isNamed:       isNamed,
-		max:           sizeBytes,
-		atomicIOBytes: atomicIOBytes,
+	var p Pipe
+	initPipe(&p, isNamed, sizeBytes, atomicIOBytes)
+	return &p
+}
+
+func initPipe(pipe *Pipe, isNamed bool, sizeBytes, atomicIOBytes int64) {
+	if sizeBytes < MinimumPipeSize {
+		sizeBytes = MinimumPipeSize
+	}
+	if sizeBytes > MaximumPipeSize {
+		sizeBytes = MaximumPipeSize
+	}
+	if atomicIOBytes <= 0 {
+		atomicIOBytes = 1
+	}
+	if atomicIOBytes > sizeBytes {
+		atomicIOBytes = sizeBytes
 	}
+	pipe.isNamed = isNamed
+	pipe.max = sizeBytes
+	pipe.atomicIOBytes = atomicIOBytes
 }
 
 // NewConnectedPipe initializes a pipe and returns a pair of objects
diff --git a/pkg/sentry/kernel/pipe/pipe_util.go b/pkg/sentry/kernel/pipe/pipe_util.go
new file mode 100644
index 000000000..ef9641e6a
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/pipe_util.go
@@ -0,0 +1,213 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+	"io"
+	"math"
+	"sync"
+	"syscall"
+
+	"gvisor.dev/gvisor/pkg/abi/linux"
+	"gvisor.dev/gvisor/pkg/amutex"
+	"gvisor.dev/gvisor/pkg/sentry/arch"
+	"gvisor.dev/gvisor/pkg/sentry/context"
+	"gvisor.dev/gvisor/pkg/sentry/usermem"
+	"gvisor.dev/gvisor/pkg/waiter"
+)
+
+// This file contains Pipe file functionality that is tied to neither VFS nor
+// the old fs architecture.
+
+// Release cleans up the pipe's state.
+func (p *Pipe) Release() {
+	p.rClose()
+	p.wClose()
+
+	// Wake up readers and writers.
+	p.Notify(waiter.EventIn | waiter.EventOut)
+}
+
+// Read reads from the Pipe into dst.
+func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
+	n, err := p.read(ctx, readOps{
+		left: func() int64 {
+			return dst.NumBytes()
+		},
+		limit: func(l int64) {
+			dst = dst.TakeFirst64(l)
+		},
+		read: func(buf *buffer) (int64, error) {
+			n, err := dst.CopyOutFrom(ctx, buf)
+			dst = dst.DropFirst64(n)
+			return n, err
+		},
+	})
+	if n > 0 {
+		p.Notify(waiter.EventOut)
+	}
+	return n, err
+}
+
+// WriteTo writes to w from the Pipe.
+func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) (int64, error) {
+	ops := readOps{
+		left: func() int64 {
+			return count
+		},
+		limit: func(l int64) {
+			count = l
+		},
+		read: func(buf *buffer) (int64, error) {
+			n, err := buf.ReadToWriter(w, count, dup)
+			count -= n
+			return n, err
+		},
+	}
+	if dup {
+		// There is no notification for dup operations.
+		return p.dup(ctx, ops)
+	}
+	n, err := p.read(ctx, ops)
+	if n > 0 {
+		p.Notify(waiter.EventOut)
+	}
+	return n, err
+}
+
+// Write writes to the Pipe from src.
+func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error) {
+	n, err := p.write(ctx, writeOps{
+		left: func() int64 {
+			return src.NumBytes()
+		},
+		limit: func(l int64) {
+			src = src.TakeFirst64(l)
+		},
+		write: func(buf *buffer) (int64, error) {
+			n, err := src.CopyInTo(ctx, buf)
+			src = src.DropFirst64(n)
+			return n, err
+		},
+	})
+	if n > 0 {
+		p.Notify(waiter.EventIn)
+	}
+	return n, err
+}
+
+// ReadFrom reads from r to the Pipe.
+func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, error) {
+	n, err := p.write(ctx, writeOps{
+		left: func() int64 {
+			return count
+		},
+		limit: func(l int64) {
+			count = l
+		},
+		write: func(buf *buffer) (int64, error) {
+			n, err := buf.WriteFromReader(r, count)
+			count -= n
+			return n, err
+		},
+	})
+	if n > 0 {
+		p.Notify(waiter.EventIn)
+	}
+	return n, err
+}
+
+// Readiness returns the ready events in the underlying pipe.
+func (p *Pipe) Readiness(mask waiter.EventMask) waiter.EventMask {
+	return p.rwReadiness() & mask
+}
+
+// Ioctl implements ioctls on the Pipe.
+func (p *Pipe) Ioctl(ctx context.Context, io usermem.IO, args arch.SyscallArguments) (uintptr, error) {
+	// Switch on ioctl request.
+	switch int(args[1].Int()) {
+	case linux.FIONREAD:
+		v := p.queued()
+		if v > math.MaxInt32 {
+			v = math.MaxInt32 // Silently truncate.
+		}
+		// Copy result to user-space.
+		_, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{
+			AddressSpaceActive: true,
+		})
+		return 0, err
+	default:
+		return 0, syscall.ENOTTY
+	}
+}
+
+// waitFor blocks until the underlying pipe has at least one reader/writer is
+// announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this
+// function will block for either readers or writers, depending on where
+// 'wakeupChan' points.
+//
+// mu must be held by the caller. waitFor returns with mu held, but it will
+// drop mu before blocking for any reader/writers.
+func waitFor(mu *sync.Mutex, wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool {
+	// Ideally this function would simply use a condition variable. However, the
+	// wait needs to be interruptible via 'sleeper', so we must sychronize via a
+	// channel. The synchronization below relies on the fact that closing a
+	// channel unblocks all receives on the channel.
+
+	// Does an appropriate wakeup channel already exist? If not, create a new
+	// one. This is all done under f.mu to avoid races.
+	if *wakeupChan == nil {
+		*wakeupChan = make(chan struct{})
+	}
+
+	// Grab a local reference to the wakeup channel since it may disappear as
+	// soon as we drop f.mu.
+	wakeup := *wakeupChan
+
+	// Drop the lock and prepare to sleep.
+	mu.Unlock()
+	cancel := sleeper.SleepStart()
+
+	// Wait for either a new reader/write to be signalled via 'wakeup', or
+	// for the sleep to be cancelled.
+	select {
+	case <-wakeup:
+		sleeper.SleepFinish(true)
+	case <-cancel:
+		sleeper.SleepFinish(false)
+	}
+
+	// Take the lock and check if we were woken. If we were woken and
+	// interrupted, the former takes priority.
+	mu.Lock()
+	select {
+	case <-wakeup:
+		return true
+	default:
+		return false
+	}
+}
+
+// newHandleLocked signals a new pipe reader or writer depending on where
+// 'wakeupChan' points. This unblocks any corresponding reader or writer
+// waiting for the other end of the channel to be opened, see Fifo.waitFor.
+//
+// Precondition: the mutex protecting wakeupChan must be held.
+func newHandleLocked(wakeupChan *chan struct{}) {
+	if *wakeupChan != nil {
+		close(*wakeupChan)
+		*wakeupChan = nil
+	}
+}
diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go
index 7c307f013..b4d29fc77 100644
--- a/pkg/sentry/kernel/pipe/reader_writer.go
+++ b/pkg/sentry/kernel/pipe/reader_writer.go
@@ -16,16 +16,12 @@ package pipe
 
 import (
 	"io"
-	"math"
-	"syscall"
 
-	"gvisor.dev/gvisor/pkg/abi/linux"
 	"gvisor.dev/gvisor/pkg/sentry/arch"
 	"gvisor.dev/gvisor/pkg/sentry/context"
 	"gvisor.dev/gvisor/pkg/sentry/fs"
 	"gvisor.dev/gvisor/pkg/sentry/fs/fsutil"
 	"gvisor.dev/gvisor/pkg/sentry/usermem"
-	"gvisor.dev/gvisor/pkg/waiter"
 )
 
 // ReaderWriter satisfies the FileOperations interface and services both
@@ -45,124 +41,27 @@ type ReaderWriter struct {
 	*Pipe
 }
 
-// Release implements fs.FileOperations.Release.
-func (rw *ReaderWriter) Release() {
-	rw.Pipe.rClose()
-	rw.Pipe.wClose()
-
-	// Wake up readers and writers.
-	rw.Pipe.Notify(waiter.EventIn | waiter.EventOut)
-}
-
 // Read implements fs.FileOperations.Read.
 func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequence, _ int64) (int64, error) {
-	n, err := rw.Pipe.read(ctx, readOps{
-		left: func() int64 {
-			return dst.NumBytes()
-		},
-		limit: func(l int64) {
-			dst = dst.TakeFirst64(l)
-		},
-		read: func(buf *buffer) (int64, error) {
-			n, err := dst.CopyOutFrom(ctx, buf)
-			dst = dst.DropFirst64(n)
-			return n, err
-		},
-	})
-	if n > 0 {
-		rw.Pipe.Notify(waiter.EventOut)
-	}
-	return n, err
+	return rw.Pipe.Read(ctx, dst)
 }
 
 // WriteTo implements fs.FileOperations.WriteTo.
 func (rw *ReaderWriter) WriteTo(ctx context.Context, _ *fs.File, w io.Writer, count int64, dup bool) (int64, error) {
-	ops := readOps{
-		left: func() int64 {
-			return count
-		},
-		limit: func(l int64) {
-			count = l
-		},
-		read: func(buf *buffer) (int64, error) {
-			n, err := buf.ReadToWriter(w, count, dup)
-			count -= n
-			return n, err
-		},
-	}
-	if dup {
-		// There is no notification for dup operations.
-		return rw.Pipe.dup(ctx, ops)
-	}
-	n, err := rw.Pipe.read(ctx, ops)
-	if n > 0 {
-		rw.Pipe.Notify(waiter.EventOut)
-	}
-	return n, err
+	return rw.Pipe.WriteTo(ctx, w, count, dup)
 }
 
 // Write implements fs.FileOperations.Write.
 func (rw *ReaderWriter) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) {
-	n, err := rw.Pipe.write(ctx, writeOps{
-		left: func() int64 {
-			return src.NumBytes()
-		},
-		limit: func(l int64) {
-			src = src.TakeFirst64(l)
-		},
-		write: func(buf *buffer) (int64, error) {
-			n, err := src.CopyInTo(ctx, buf)
-			src = src.DropFirst64(n)
-			return n, err
-		},
-	})
-	if n > 0 {
-		rw.Pipe.Notify(waiter.EventIn)
-	}
-	return n, err
+	return rw.Pipe.Write(ctx, src)
 }
 
 // ReadFrom implements fs.FileOperations.WriteTo.
 func (rw *ReaderWriter) ReadFrom(ctx context.Context, _ *fs.File, r io.Reader, count int64) (int64, error) {
-	n, err := rw.Pipe.write(ctx, writeOps{
-		left: func() int64 {
-			return count
-		},
-		limit: func(l int64) {
-			count = l
-		},
-		write: func(buf *buffer) (int64, error) {
-			n, err := buf.WriteFromReader(r, count)
-			count -= n
-			return n, err
-		},
-	})
-	if n > 0 {
-		rw.Pipe.Notify(waiter.EventIn)
-	}
-	return n, err
-}
-
-// Readiness returns the ready events in the underlying pipe.
-func (rw *ReaderWriter) Readiness(mask waiter.EventMask) waiter.EventMask {
-	return rw.Pipe.rwReadiness() & mask
+	return rw.Pipe.ReadFrom(ctx, r, count)
 }
 
 // Ioctl implements fs.FileOperations.Ioctl.
 func (rw *ReaderWriter) Ioctl(ctx context.Context, _ *fs.File, io usermem.IO, args arch.SyscallArguments) (uintptr, error) {
-	// Switch on ioctl request.
-	switch int(args[1].Int()) {
-	case linux.FIONREAD:
-		v := rw.queued()
-		if v > math.MaxInt32 {
-			v = math.MaxInt32 // Silently truncate.
-		}
-		// Copy result to user-space.
-		_, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{
-			AddressSpaceActive: true,
-		})
-		return 0, err
-	default:
-		return 0, syscall.ENOTTY
-	}
+	return rw.Pipe.Ioctl(ctx, io, args)
 }
diff --git a/pkg/sentry/kernel/pipe/vfs.go b/pkg/sentry/kernel/pipe/vfs.go
new file mode 100644
index 000000000..02320b830
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/vfs.go
@@ -0,0 +1,220 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+	"sync"
+
+	"gvisor.dev/gvisor/pkg/abi/linux"
+	"gvisor.dev/gvisor/pkg/sentry/arch"
+	"gvisor.dev/gvisor/pkg/sentry/context"
+	"gvisor.dev/gvisor/pkg/sentry/usermem"
+	"gvisor.dev/gvisor/pkg/sentry/vfs"
+	"gvisor.dev/gvisor/pkg/syserror"
+	"gvisor.dev/gvisor/pkg/waiter"
+)
+
+// This file contains types enabling the pipe package to be used with the vfs
+// package.
+
+// VFSPipe represents the actual pipe, analagous to an inode. VFSPipes should
+// not be copied.
+type VFSPipe struct {
+	// mu protects the fields below.
+	mu sync.Mutex `state:"nosave"`
+
+	// pipe is the underlying pipe.
+	pipe Pipe
+
+	// Channels for synchronizing the creation of new readers and writers
+	// of this fifo. See waitFor and newHandleLocked.
+	//
+	// These are not saved/restored because all waiters are unblocked on
+	// save, and either automatically restart (via ERESTARTSYS) or return
+	// EINTR on resume. On restarts via ERESTARTSYS, the appropriate
+	// channel will be recreated.
+	rWakeup chan struct{} `state:"nosave"`
+	wWakeup chan struct{} `state:"nosave"`
+}
+
+// NewVFSPipe returns an initialized VFSPipe.
+func NewVFSPipe(sizeBytes, atomicIOBytes int64) *VFSPipe {
+	var vp VFSPipe
+	initPipe(&vp.pipe, true /* isNamed */, sizeBytes, atomicIOBytes)
+	return &vp
+}
+
+// NewVFSPipeFD opens a named pipe. Named pipes have special blocking semantics
+// during open:
+//
+// "Normally, opening the FIFO blocks until the other end is opened also. A
+// process can open a FIFO in nonblocking mode. In this case, opening for
+// read-only will succeed even if no-one has opened on the write side yet,
+// opening for write-only will fail with ENXIO (no such device or address)
+// unless the other end has already been opened. Under Linux, opening a FIFO
+// for read and write will succeed both in blocking and nonblocking mode. POSIX
+// leaves this behavior undefined. This can be used to open a FIFO for writing
+// while there are no readers available." - fifo(7)
+func (vp *VFSPipe) NewVFSPipeFD(ctx context.Context, rp *vfs.ResolvingPath, vfsd *vfs.Dentry, vfsfd *vfs.FileDescription, flags uint32) (*VFSPipeFD, error) {
+	vp.mu.Lock()
+	defer vp.mu.Unlock()
+
+	readable := vfs.MayReadFileWithOpenFlags(flags)
+	writable := vfs.MayWriteFileWithOpenFlags(flags)
+	if !readable && !writable {
+		return nil, syserror.EINVAL
+	}
+
+	vfd, err := vp.open(rp, vfsd, vfsfd, flags)
+	if err != nil {
+		return nil, err
+	}
+
+	switch {
+	case readable && writable:
+		// Pipes opened for read-write always succeed without blocking.
+		newHandleLocked(&vp.rWakeup)
+		newHandleLocked(&vp.wWakeup)
+
+	case readable:
+		newHandleLocked(&vp.rWakeup)
+		// If this pipe is being opened as nonblocking and there's no
+		// writer, we have to wait for a writer to open the other end.
+		if flags&linux.O_NONBLOCK == 0 && !vp.pipe.HasWriters() && !waitFor(&vp.mu, &vp.wWakeup, ctx) {
+			return nil, syserror.EINTR
+		}
+
+	case writable:
+		newHandleLocked(&vp.wWakeup)
+
+		if !vp.pipe.HasReaders() {
+			// Nonblocking, write-only opens fail with ENXIO when
+			// the read side isn't open yet.
+			if flags&linux.O_NONBLOCK != 0 {
+				return nil, syserror.ENXIO
+			}
+			// Wait for a reader to open the other end.
+			if !waitFor(&vp.mu, &vp.rWakeup, ctx) {
+				return nil, syserror.EINTR
+			}
+		}
+
+	default:
+		panic("invalid pipe flags: must be readable, writable, or both")
+	}
+
+	return vfd, nil
+}
+
+// Preconditions: vp.mu must be held.
+func (vp *VFSPipe) open(rp *vfs.ResolvingPath, vfsd *vfs.Dentry, vfsfd *vfs.FileDescription, flags uint32) (*VFSPipeFD, error) {
+	var fd VFSPipeFD
+	fd.flags = flags
+	fd.readable = vfs.MayReadFileWithOpenFlags(flags)
+	fd.writable = vfs.MayWriteFileWithOpenFlags(flags)
+	fd.vfsfd = vfsfd
+	fd.pipe = &vp.pipe
+	if fd.writable {
+		// The corresponding Mount.EndWrite() is in VFSPipe.Release().
+		if err := rp.Mount().CheckBeginWrite(); err != nil {
+			return nil, err
+		}
+	}
+
+	switch {
+	case fd.readable && fd.writable:
+		vp.pipe.rOpen()
+		vp.pipe.wOpen()
+	case fd.readable:
+		vp.pipe.rOpen()
+	case fd.writable:
+		vp.pipe.wOpen()
+	default:
+		panic("invalid pipe flags: must be readable, writable, or both")
+	}
+
+	return &fd, nil
+}
+
+// VFSPipeFD implements a subset of vfs.FileDescriptionImpl for pipes. It is
+// expected that filesystesm will use this in a struct implementing
+// vfs.FileDescriptionImpl.
+type VFSPipeFD struct {
+	pipe     *Pipe
+	flags    uint32
+	readable bool
+	writable bool
+	vfsfd    *vfs.FileDescription
+}
+
+// Release implements vfs.FileDescriptionImpl.Release.
+func (fd *VFSPipeFD) Release() {
+	var event waiter.EventMask
+	if fd.readable {
+		fd.pipe.rClose()
+		event |= waiter.EventIn
+	}
+	if fd.writable {
+		fd.pipe.wClose()
+		event |= waiter.EventOut
+	}
+	if event == 0 {
+		panic("invalid pipe flags: must be readable, writable, or both")
+	}
+
+	if fd.writable {
+		fd.vfsfd.VirtualDentry().Mount().EndWrite()
+	}
+
+	fd.pipe.Notify(event)
+}
+
+// OnClose implements vfs.FileDescriptionImpl.OnClose.
+func (fd *VFSPipeFD) OnClose() error {
+	return nil
+}
+
+// PRead implements vfs.FileDescriptionImpl.PRead.
+func (fd *VFSPipeFD) PRead(_ context.Context, _ usermem.IOSequence, _ int64, _ vfs.ReadOptions) (int64, error) {
+	return 0, syserror.ESPIPE
+}
+
+// Read implements vfs.FileDescriptionImpl.Read.
+func (fd *VFSPipeFD) Read(ctx context.Context, dst usermem.IOSequence, _ vfs.ReadOptions) (int64, error) {
+	if !fd.readable {
+		return 0, syserror.EINVAL
+	}
+
+	return fd.pipe.Read(ctx, dst)
+}
+
+// PWrite implements vfs.FileDescriptionImpl.PWrite.
+func (fd *VFSPipeFD) PWrite(_ context.Context, _ usermem.IOSequence, _ int64, _ vfs.WriteOptions) (int64, error) {
+	return 0, syserror.ESPIPE
+}
+
+// Write implements vfs.FileDescriptionImpl.Write.
+func (fd *VFSPipeFD) Write(ctx context.Context, src usermem.IOSequence, _ vfs.WriteOptions) (int64, error) {
+	if !fd.writable {
+		return 0, syserror.EINVAL
+	}
+
+	return fd.pipe.Write(ctx, src)
+}
+
+// Ioctl implements vfs.FileDescriptionImpl.Ioctl.
+func (fd *VFSPipeFD) Ioctl(ctx context.Context, uio usermem.IO, args arch.SyscallArguments) (uintptr, error) {
+	return fd.pipe.Ioctl(ctx, uio, args)
+}
-- 
cgit v1.2.3