From dfdbdf14fa101e850bb3361f91da6362b98d11d0 Mon Sep 17 00:00:00 2001 From: Kevin Krakauer Date: Thu, 17 Oct 2019 13:08:27 -0700 Subject: Refactor pipe to support VFS2. * Pulls common functionality (IO and locking on open) into pipe_util.go. * Adds pipe/vfs.go, which implements a subset of vfs.FileDescriptionImpl. A subsequent change will add support for pipes in memfs. PiperOrigin-RevId: 275322385 --- pkg/sentry/kernel/pipe/BUILD | 3 + pkg/sentry/kernel/pipe/node.go | 72 +---------- pkg/sentry/kernel/pipe/pipe.go | 24 +++- pkg/sentry/kernel/pipe/pipe_util.go | 213 +++++++++++++++++++++++++++++++ pkg/sentry/kernel/pipe/reader_writer.go | 111 +--------------- pkg/sentry/kernel/pipe/vfs.go | 220 ++++++++++++++++++++++++++++++++ 6 files changed, 467 insertions(+), 176 deletions(-) create mode 100644 pkg/sentry/kernel/pipe/pipe_util.go create mode 100644 pkg/sentry/kernel/pipe/vfs.go (limited to 'pkg/sentry/kernel/pipe') diff --git a/pkg/sentry/kernel/pipe/BUILD b/pkg/sentry/kernel/pipe/BUILD index cde647139..9d34f6d4d 100644 --- a/pkg/sentry/kernel/pipe/BUILD +++ b/pkg/sentry/kernel/pipe/BUILD @@ -24,8 +24,10 @@ go_library( "device.go", "node.go", "pipe.go", + "pipe_util.go", "reader.go", "reader_writer.go", + "vfs.go", "writer.go", ], importpath = "gvisor.dev/gvisor/pkg/sentry/kernel/pipe", @@ -40,6 +42,7 @@ go_library( "//pkg/sentry/fs/fsutil", "//pkg/sentry/safemem", "//pkg/sentry/usermem", + "//pkg/sentry/vfs", "//pkg/syserror", "//pkg/waiter", ], diff --git a/pkg/sentry/kernel/pipe/node.go b/pkg/sentry/kernel/pipe/node.go index a2dc72204..4a19ab7ce 100644 --- a/pkg/sentry/kernel/pipe/node.go +++ b/pkg/sentry/kernel/pipe/node.go @@ -18,7 +18,6 @@ import ( "sync" "gvisor.dev/gvisor/pkg/abi/linux" - "gvisor.dev/gvisor/pkg/amutex" "gvisor.dev/gvisor/pkg/sentry/context" "gvisor.dev/gvisor/pkg/sentry/fs" "gvisor.dev/gvisor/pkg/sentry/fs/fsutil" @@ -91,10 +90,10 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi switch { case flags.Read && !flags.Write: // O_RDONLY. r := i.p.Open(ctx, d, flags) - i.newHandleLocked(&i.rWakeup) + newHandleLocked(&i.rWakeup) if i.p.isNamed && !flags.NonBlocking && !i.p.HasWriters() { - if !i.waitFor(&i.wWakeup, ctx) { + if !waitFor(&i.mu, &i.wWakeup, ctx) { r.DecRef() return nil, syserror.ErrInterrupted } @@ -107,7 +106,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi case flags.Write && !flags.Read: // O_WRONLY. w := i.p.Open(ctx, d, flags) - i.newHandleLocked(&i.wWakeup) + newHandleLocked(&i.wWakeup) if i.p.isNamed && !i.p.HasReaders() { // On a nonblocking, write-only open, the open fails with ENXIO if the @@ -117,7 +116,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi return nil, syserror.ENXIO } - if !i.waitFor(&i.rWakeup, ctx) { + if !waitFor(&i.mu, &i.rWakeup, ctx) { w.DecRef() return nil, syserror.ErrInterrupted } @@ -127,8 +126,8 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi case flags.Read && flags.Write: // O_RDWR. // Pipes opened for read-write always succeeds without blocking. rw := i.p.Open(ctx, d, flags) - i.newHandleLocked(&i.rWakeup) - i.newHandleLocked(&i.wWakeup) + newHandleLocked(&i.rWakeup) + newHandleLocked(&i.wWakeup) return rw, nil default: @@ -136,65 +135,6 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi } } -// waitFor blocks until the underlying pipe has at least one reader/writer is -// announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this -// function will block for either readers or writers, depending on where -// 'wakeupChan' points. -// -// f.mu must be held by the caller. waitFor returns with f.mu held, but it will -// drop f.mu before blocking for any reader/writers. -func (i *inodeOperations) waitFor(wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool { - // Ideally this function would simply use a condition variable. However, the - // wait needs to be interruptible via 'sleeper', so we must sychronize via a - // channel. The synchronization below relies on the fact that closing a - // channel unblocks all receives on the channel. - - // Does an appropriate wakeup channel already exist? If not, create a new - // one. This is all done under f.mu to avoid races. - if *wakeupChan == nil { - *wakeupChan = make(chan struct{}) - } - - // Grab a local reference to the wakeup channel since it may disappear as - // soon as we drop f.mu. - wakeup := *wakeupChan - - // Drop the lock and prepare to sleep. - i.mu.Unlock() - cancel := sleeper.SleepStart() - - // Wait for either a new reader/write to be signalled via 'wakeup', or - // for the sleep to be cancelled. - select { - case <-wakeup: - sleeper.SleepFinish(true) - case <-cancel: - sleeper.SleepFinish(false) - } - - // Take the lock and check if we were woken. If we were woken and - // interrupted, the former takes priority. - i.mu.Lock() - select { - case <-wakeup: - return true - default: - return false - } -} - -// newHandleLocked signals a new pipe reader or writer depending on where -// 'wakeupChan' points. This unblocks any corresponding reader or writer -// waiting for the other end of the channel to be opened, see Fifo.waitFor. -// -// i.mu must be held. -func (*inodeOperations) newHandleLocked(wakeupChan *chan struct{}) { - if *wakeupChan != nil { - close(*wakeupChan) - *wakeupChan = nil - } -} - func (*inodeOperations) Allocate(_ context.Context, _ *fs.Inode, _, _ int64) error { return syserror.EPIPE } diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go index 8e4e8e82e..1a1b38f83 100644 --- a/pkg/sentry/kernel/pipe/pipe.go +++ b/pkg/sentry/kernel/pipe/pipe.go @@ -111,11 +111,27 @@ func NewPipe(isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe { if atomicIOBytes > sizeBytes { atomicIOBytes = sizeBytes } - return &Pipe{ - isNamed: isNamed, - max: sizeBytes, - atomicIOBytes: atomicIOBytes, + var p Pipe + initPipe(&p, isNamed, sizeBytes, atomicIOBytes) + return &p +} + +func initPipe(pipe *Pipe, isNamed bool, sizeBytes, atomicIOBytes int64) { + if sizeBytes < MinimumPipeSize { + sizeBytes = MinimumPipeSize + } + if sizeBytes > MaximumPipeSize { + sizeBytes = MaximumPipeSize + } + if atomicIOBytes <= 0 { + atomicIOBytes = 1 + } + if atomicIOBytes > sizeBytes { + atomicIOBytes = sizeBytes } + pipe.isNamed = isNamed + pipe.max = sizeBytes + pipe.atomicIOBytes = atomicIOBytes } // NewConnectedPipe initializes a pipe and returns a pair of objects diff --git a/pkg/sentry/kernel/pipe/pipe_util.go b/pkg/sentry/kernel/pipe/pipe_util.go new file mode 100644 index 000000000..ef9641e6a --- /dev/null +++ b/pkg/sentry/kernel/pipe/pipe_util.go @@ -0,0 +1,213 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "io" + "math" + "sync" + "syscall" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/amutex" + "gvisor.dev/gvisor/pkg/sentry/arch" + "gvisor.dev/gvisor/pkg/sentry/context" + "gvisor.dev/gvisor/pkg/sentry/usermem" + "gvisor.dev/gvisor/pkg/waiter" +) + +// This file contains Pipe file functionality that is tied to neither VFS nor +// the old fs architecture. + +// Release cleans up the pipe's state. +func (p *Pipe) Release() { + p.rClose() + p.wClose() + + // Wake up readers and writers. + p.Notify(waiter.EventIn | waiter.EventOut) +} + +// Read reads from the Pipe into dst. +func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error) { + n, err := p.read(ctx, readOps{ + left: func() int64 { + return dst.NumBytes() + }, + limit: func(l int64) { + dst = dst.TakeFirst64(l) + }, + read: func(buf *buffer) (int64, error) { + n, err := dst.CopyOutFrom(ctx, buf) + dst = dst.DropFirst64(n) + return n, err + }, + }) + if n > 0 { + p.Notify(waiter.EventOut) + } + return n, err +} + +// WriteTo writes to w from the Pipe. +func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) (int64, error) { + ops := readOps{ + left: func() int64 { + return count + }, + limit: func(l int64) { + count = l + }, + read: func(buf *buffer) (int64, error) { + n, err := buf.ReadToWriter(w, count, dup) + count -= n + return n, err + }, + } + if dup { + // There is no notification for dup operations. + return p.dup(ctx, ops) + } + n, err := p.read(ctx, ops) + if n > 0 { + p.Notify(waiter.EventOut) + } + return n, err +} + +// Write writes to the Pipe from src. +func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error) { + n, err := p.write(ctx, writeOps{ + left: func() int64 { + return src.NumBytes() + }, + limit: func(l int64) { + src = src.TakeFirst64(l) + }, + write: func(buf *buffer) (int64, error) { + n, err := src.CopyInTo(ctx, buf) + src = src.DropFirst64(n) + return n, err + }, + }) + if n > 0 { + p.Notify(waiter.EventIn) + } + return n, err +} + +// ReadFrom reads from r to the Pipe. +func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, error) { + n, err := p.write(ctx, writeOps{ + left: func() int64 { + return count + }, + limit: func(l int64) { + count = l + }, + write: func(buf *buffer) (int64, error) { + n, err := buf.WriteFromReader(r, count) + count -= n + return n, err + }, + }) + if n > 0 { + p.Notify(waiter.EventIn) + } + return n, err +} + +// Readiness returns the ready events in the underlying pipe. +func (p *Pipe) Readiness(mask waiter.EventMask) waiter.EventMask { + return p.rwReadiness() & mask +} + +// Ioctl implements ioctls on the Pipe. +func (p *Pipe) Ioctl(ctx context.Context, io usermem.IO, args arch.SyscallArguments) (uintptr, error) { + // Switch on ioctl request. + switch int(args[1].Int()) { + case linux.FIONREAD: + v := p.queued() + if v > math.MaxInt32 { + v = math.MaxInt32 // Silently truncate. + } + // Copy result to user-space. + _, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{ + AddressSpaceActive: true, + }) + return 0, err + default: + return 0, syscall.ENOTTY + } +} + +// waitFor blocks until the underlying pipe has at least one reader/writer is +// announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this +// function will block for either readers or writers, depending on where +// 'wakeupChan' points. +// +// mu must be held by the caller. waitFor returns with mu held, but it will +// drop mu before blocking for any reader/writers. +func waitFor(mu *sync.Mutex, wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool { + // Ideally this function would simply use a condition variable. However, the + // wait needs to be interruptible via 'sleeper', so we must sychronize via a + // channel. The synchronization below relies on the fact that closing a + // channel unblocks all receives on the channel. + + // Does an appropriate wakeup channel already exist? If not, create a new + // one. This is all done under f.mu to avoid races. + if *wakeupChan == nil { + *wakeupChan = make(chan struct{}) + } + + // Grab a local reference to the wakeup channel since it may disappear as + // soon as we drop f.mu. + wakeup := *wakeupChan + + // Drop the lock and prepare to sleep. + mu.Unlock() + cancel := sleeper.SleepStart() + + // Wait for either a new reader/write to be signalled via 'wakeup', or + // for the sleep to be cancelled. + select { + case <-wakeup: + sleeper.SleepFinish(true) + case <-cancel: + sleeper.SleepFinish(false) + } + + // Take the lock and check if we were woken. If we were woken and + // interrupted, the former takes priority. + mu.Lock() + select { + case <-wakeup: + return true + default: + return false + } +} + +// newHandleLocked signals a new pipe reader or writer depending on where +// 'wakeupChan' points. This unblocks any corresponding reader or writer +// waiting for the other end of the channel to be opened, see Fifo.waitFor. +// +// Precondition: the mutex protecting wakeupChan must be held. +func newHandleLocked(wakeupChan *chan struct{}) { + if *wakeupChan != nil { + close(*wakeupChan) + *wakeupChan = nil + } +} diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go index 7c307f013..b4d29fc77 100644 --- a/pkg/sentry/kernel/pipe/reader_writer.go +++ b/pkg/sentry/kernel/pipe/reader_writer.go @@ -16,16 +16,12 @@ package pipe import ( "io" - "math" - "syscall" - "gvisor.dev/gvisor/pkg/abi/linux" "gvisor.dev/gvisor/pkg/sentry/arch" "gvisor.dev/gvisor/pkg/sentry/context" "gvisor.dev/gvisor/pkg/sentry/fs" "gvisor.dev/gvisor/pkg/sentry/fs/fsutil" "gvisor.dev/gvisor/pkg/sentry/usermem" - "gvisor.dev/gvisor/pkg/waiter" ) // ReaderWriter satisfies the FileOperations interface and services both @@ -45,124 +41,27 @@ type ReaderWriter struct { *Pipe } -// Release implements fs.FileOperations.Release. -func (rw *ReaderWriter) Release() { - rw.Pipe.rClose() - rw.Pipe.wClose() - - // Wake up readers and writers. - rw.Pipe.Notify(waiter.EventIn | waiter.EventOut) -} - // Read implements fs.FileOperations.Read. func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequence, _ int64) (int64, error) { - n, err := rw.Pipe.read(ctx, readOps{ - left: func() int64 { - return dst.NumBytes() - }, - limit: func(l int64) { - dst = dst.TakeFirst64(l) - }, - read: func(buf *buffer) (int64, error) { - n, err := dst.CopyOutFrom(ctx, buf) - dst = dst.DropFirst64(n) - return n, err - }, - }) - if n > 0 { - rw.Pipe.Notify(waiter.EventOut) - } - return n, err + return rw.Pipe.Read(ctx, dst) } // WriteTo implements fs.FileOperations.WriteTo. func (rw *ReaderWriter) WriteTo(ctx context.Context, _ *fs.File, w io.Writer, count int64, dup bool) (int64, error) { - ops := readOps{ - left: func() int64 { - return count - }, - limit: func(l int64) { - count = l - }, - read: func(buf *buffer) (int64, error) { - n, err := buf.ReadToWriter(w, count, dup) - count -= n - return n, err - }, - } - if dup { - // There is no notification for dup operations. - return rw.Pipe.dup(ctx, ops) - } - n, err := rw.Pipe.read(ctx, ops) - if n > 0 { - rw.Pipe.Notify(waiter.EventOut) - } - return n, err + return rw.Pipe.WriteTo(ctx, w, count, dup) } // Write implements fs.FileOperations.Write. func (rw *ReaderWriter) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) { - n, err := rw.Pipe.write(ctx, writeOps{ - left: func() int64 { - return src.NumBytes() - }, - limit: func(l int64) { - src = src.TakeFirst64(l) - }, - write: func(buf *buffer) (int64, error) { - n, err := src.CopyInTo(ctx, buf) - src = src.DropFirst64(n) - return n, err - }, - }) - if n > 0 { - rw.Pipe.Notify(waiter.EventIn) - } - return n, err + return rw.Pipe.Write(ctx, src) } // ReadFrom implements fs.FileOperations.WriteTo. func (rw *ReaderWriter) ReadFrom(ctx context.Context, _ *fs.File, r io.Reader, count int64) (int64, error) { - n, err := rw.Pipe.write(ctx, writeOps{ - left: func() int64 { - return count - }, - limit: func(l int64) { - count = l - }, - write: func(buf *buffer) (int64, error) { - n, err := buf.WriteFromReader(r, count) - count -= n - return n, err - }, - }) - if n > 0 { - rw.Pipe.Notify(waiter.EventIn) - } - return n, err -} - -// Readiness returns the ready events in the underlying pipe. -func (rw *ReaderWriter) Readiness(mask waiter.EventMask) waiter.EventMask { - return rw.Pipe.rwReadiness() & mask + return rw.Pipe.ReadFrom(ctx, r, count) } // Ioctl implements fs.FileOperations.Ioctl. func (rw *ReaderWriter) Ioctl(ctx context.Context, _ *fs.File, io usermem.IO, args arch.SyscallArguments) (uintptr, error) { - // Switch on ioctl request. - switch int(args[1].Int()) { - case linux.FIONREAD: - v := rw.queued() - if v > math.MaxInt32 { - v = math.MaxInt32 // Silently truncate. - } - // Copy result to user-space. - _, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{ - AddressSpaceActive: true, - }) - return 0, err - default: - return 0, syscall.ENOTTY - } + return rw.Pipe.Ioctl(ctx, io, args) } diff --git a/pkg/sentry/kernel/pipe/vfs.go b/pkg/sentry/kernel/pipe/vfs.go new file mode 100644 index 000000000..02320b830 --- /dev/null +++ b/pkg/sentry/kernel/pipe/vfs.go @@ -0,0 +1,220 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "sync" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/sentry/arch" + "gvisor.dev/gvisor/pkg/sentry/context" + "gvisor.dev/gvisor/pkg/sentry/usermem" + "gvisor.dev/gvisor/pkg/sentry/vfs" + "gvisor.dev/gvisor/pkg/syserror" + "gvisor.dev/gvisor/pkg/waiter" +) + +// This file contains types enabling the pipe package to be used with the vfs +// package. + +// VFSPipe represents the actual pipe, analagous to an inode. VFSPipes should +// not be copied. +type VFSPipe struct { + // mu protects the fields below. + mu sync.Mutex `state:"nosave"` + + // pipe is the underlying pipe. + pipe Pipe + + // Channels for synchronizing the creation of new readers and writers + // of this fifo. See waitFor and newHandleLocked. + // + // These are not saved/restored because all waiters are unblocked on + // save, and either automatically restart (via ERESTARTSYS) or return + // EINTR on resume. On restarts via ERESTARTSYS, the appropriate + // channel will be recreated. + rWakeup chan struct{} `state:"nosave"` + wWakeup chan struct{} `state:"nosave"` +} + +// NewVFSPipe returns an initialized VFSPipe. +func NewVFSPipe(sizeBytes, atomicIOBytes int64) *VFSPipe { + var vp VFSPipe + initPipe(&vp.pipe, true /* isNamed */, sizeBytes, atomicIOBytes) + return &vp +} + +// NewVFSPipeFD opens a named pipe. Named pipes have special blocking semantics +// during open: +// +// "Normally, opening the FIFO blocks until the other end is opened also. A +// process can open a FIFO in nonblocking mode. In this case, opening for +// read-only will succeed even if no-one has opened on the write side yet, +// opening for write-only will fail with ENXIO (no such device or address) +// unless the other end has already been opened. Under Linux, opening a FIFO +// for read and write will succeed both in blocking and nonblocking mode. POSIX +// leaves this behavior undefined. This can be used to open a FIFO for writing +// while there are no readers available." - fifo(7) +func (vp *VFSPipe) NewVFSPipeFD(ctx context.Context, rp *vfs.ResolvingPath, vfsd *vfs.Dentry, vfsfd *vfs.FileDescription, flags uint32) (*VFSPipeFD, error) { + vp.mu.Lock() + defer vp.mu.Unlock() + + readable := vfs.MayReadFileWithOpenFlags(flags) + writable := vfs.MayWriteFileWithOpenFlags(flags) + if !readable && !writable { + return nil, syserror.EINVAL + } + + vfd, err := vp.open(rp, vfsd, vfsfd, flags) + if err != nil { + return nil, err + } + + switch { + case readable && writable: + // Pipes opened for read-write always succeed without blocking. + newHandleLocked(&vp.rWakeup) + newHandleLocked(&vp.wWakeup) + + case readable: + newHandleLocked(&vp.rWakeup) + // If this pipe is being opened as nonblocking and there's no + // writer, we have to wait for a writer to open the other end. + if flags&linux.O_NONBLOCK == 0 && !vp.pipe.HasWriters() && !waitFor(&vp.mu, &vp.wWakeup, ctx) { + return nil, syserror.EINTR + } + + case writable: + newHandleLocked(&vp.wWakeup) + + if !vp.pipe.HasReaders() { + // Nonblocking, write-only opens fail with ENXIO when + // the read side isn't open yet. + if flags&linux.O_NONBLOCK != 0 { + return nil, syserror.ENXIO + } + // Wait for a reader to open the other end. + if !waitFor(&vp.mu, &vp.rWakeup, ctx) { + return nil, syserror.EINTR + } + } + + default: + panic("invalid pipe flags: must be readable, writable, or both") + } + + return vfd, nil +} + +// Preconditions: vp.mu must be held. +func (vp *VFSPipe) open(rp *vfs.ResolvingPath, vfsd *vfs.Dentry, vfsfd *vfs.FileDescription, flags uint32) (*VFSPipeFD, error) { + var fd VFSPipeFD + fd.flags = flags + fd.readable = vfs.MayReadFileWithOpenFlags(flags) + fd.writable = vfs.MayWriteFileWithOpenFlags(flags) + fd.vfsfd = vfsfd + fd.pipe = &vp.pipe + if fd.writable { + // The corresponding Mount.EndWrite() is in VFSPipe.Release(). + if err := rp.Mount().CheckBeginWrite(); err != nil { + return nil, err + } + } + + switch { + case fd.readable && fd.writable: + vp.pipe.rOpen() + vp.pipe.wOpen() + case fd.readable: + vp.pipe.rOpen() + case fd.writable: + vp.pipe.wOpen() + default: + panic("invalid pipe flags: must be readable, writable, or both") + } + + return &fd, nil +} + +// VFSPipeFD implements a subset of vfs.FileDescriptionImpl for pipes. It is +// expected that filesystesm will use this in a struct implementing +// vfs.FileDescriptionImpl. +type VFSPipeFD struct { + pipe *Pipe + flags uint32 + readable bool + writable bool + vfsfd *vfs.FileDescription +} + +// Release implements vfs.FileDescriptionImpl.Release. +func (fd *VFSPipeFD) Release() { + var event waiter.EventMask + if fd.readable { + fd.pipe.rClose() + event |= waiter.EventIn + } + if fd.writable { + fd.pipe.wClose() + event |= waiter.EventOut + } + if event == 0 { + panic("invalid pipe flags: must be readable, writable, or both") + } + + if fd.writable { + fd.vfsfd.VirtualDentry().Mount().EndWrite() + } + + fd.pipe.Notify(event) +} + +// OnClose implements vfs.FileDescriptionImpl.OnClose. +func (fd *VFSPipeFD) OnClose() error { + return nil +} + +// PRead implements vfs.FileDescriptionImpl.PRead. +func (fd *VFSPipeFD) PRead(_ context.Context, _ usermem.IOSequence, _ int64, _ vfs.ReadOptions) (int64, error) { + return 0, syserror.ESPIPE +} + +// Read implements vfs.FileDescriptionImpl.Read. +func (fd *VFSPipeFD) Read(ctx context.Context, dst usermem.IOSequence, _ vfs.ReadOptions) (int64, error) { + if !fd.readable { + return 0, syserror.EINVAL + } + + return fd.pipe.Read(ctx, dst) +} + +// PWrite implements vfs.FileDescriptionImpl.PWrite. +func (fd *VFSPipeFD) PWrite(_ context.Context, _ usermem.IOSequence, _ int64, _ vfs.WriteOptions) (int64, error) { + return 0, syserror.ESPIPE +} + +// Write implements vfs.FileDescriptionImpl.Write. +func (fd *VFSPipeFD) Write(ctx context.Context, src usermem.IOSequence, _ vfs.WriteOptions) (int64, error) { + if !fd.writable { + return 0, syserror.EINVAL + } + + return fd.pipe.Write(ctx, src) +} + +// Ioctl implements vfs.FileDescriptionImpl.Ioctl. +func (fd *VFSPipeFD) Ioctl(ctx context.Context, uio usermem.IO, args arch.SyscallArguments) (uintptr, error) { + return fd.pipe.Ioctl(ctx, uio, args) +} -- cgit v1.2.3