summaryrefslogtreecommitdiffhomepage
path: root/pkg/sentry/kernel/pipe
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/sentry/kernel/pipe')
-rw-r--r--pkg/sentry/kernel/pipe/buffer.go90
-rwxr-xr-xpkg/sentry/kernel/pipe/buffer_list.go173
-rw-r--r--pkg/sentry/kernel/pipe/device.go20
-rw-r--r--pkg/sentry/kernel/pipe/node.go196
-rw-r--r--pkg/sentry/kernel/pipe/pipe.go429
-rwxr-xr-xpkg/sentry/kernel/pipe/pipe_state_autogen.go134
-rw-r--r--pkg/sentry/kernel/pipe/reader.go42
-rw-r--r--pkg/sentry/kernel/pipe/reader_writer.go96
-rw-r--r--pkg/sentry/kernel/pipe/writer.go42
9 files changed, 1222 insertions, 0 deletions
diff --git a/pkg/sentry/kernel/pipe/buffer.go b/pkg/sentry/kernel/pipe/buffer.go
new file mode 100644
index 000000000..4360dc44f
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/buffer.go
@@ -0,0 +1,90 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "sync"
+
+ "gvisor.googlesource.com/gvisor/pkg/sentry/safemem"
+)
+
+// buffer encapsulates a queueable byte buffer.
+//
+// Note that the total size is slightly less than two pages. This
+// is done intentionally to ensure that the buffer object aligns
+// with runtime internals. We have no hard size or alignment
+// requirements. This two page size will effectively minimize
+// internal fragmentation, but still have a large enough chunk
+// to limit excessive segmentation.
+//
+// +stateify savable
+type buffer struct {
+ data [8144]byte
+ read int
+ write int
+ bufferEntry
+}
+
+// Reset resets internal data.
+//
+// This must be called before use.
+func (b *buffer) Reset() {
+ b.read = 0
+ b.write = 0
+}
+
+// Empty indicates the buffer is empty.
+//
+// This indicates there is no data left to read.
+func (b *buffer) Empty() bool {
+ return b.read == b.write
+}
+
+// Full indicates the buffer is full.
+//
+// This indicates there is no capacity left to write.
+func (b *buffer) Full() bool {
+ return b.write == len(b.data)
+}
+
+// WriteFromBlocks implements safemem.Writer.WriteFromBlocks.
+func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) {
+ dst := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.write:]))
+ n, err := safemem.CopySeq(dst, srcs)
+ b.write += int(n)
+ return n, err
+}
+
+// ReadToBlocks implements safemem.Reader.ReadToBlocks.
+func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
+ src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write]))
+ n, err := safemem.CopySeq(dsts, src)
+ b.read += int(n)
+ return n, err
+}
+
+// bufferPool is a pool for buffers.
+var bufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(buffer)
+ },
+}
+
+// newBuffer grabs a new buffer from the pool.
+func newBuffer() *buffer {
+ b := bufferPool.Get().(*buffer)
+ b.Reset()
+ return b
+}
diff --git a/pkg/sentry/kernel/pipe/buffer_list.go b/pkg/sentry/kernel/pipe/buffer_list.go
new file mode 100755
index 000000000..42ec78788
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/buffer_list.go
@@ -0,0 +1,173 @@
+package pipe
+
+// ElementMapper provides an identity mapping by default.
+//
+// This can be replaced to provide a struct that maps elements to linker
+// objects, if they are not the same. An ElementMapper is not typically
+// required if: Linker is left as is, Element is left as is, or Linker and
+// Element are the same type.
+type bufferElementMapper struct{}
+
+// linkerFor maps an Element to a Linker.
+//
+// This default implementation should be inlined.
+//
+//go:nosplit
+func (bufferElementMapper) linkerFor(elem *buffer) *buffer { return elem }
+
+// List is an intrusive list. Entries can be added to or removed from the list
+// in O(1) time and with no additional memory allocations.
+//
+// The zero value for List is an empty list ready to use.
+//
+// To iterate over a list (where l is a List):
+// for e := l.Front(); e != nil; e = e.Next() {
+// // do something with e.
+// }
+//
+// +stateify savable
+type bufferList struct {
+ head *buffer
+ tail *buffer
+}
+
+// Reset resets list l to the empty state.
+func (l *bufferList) Reset() {
+ l.head = nil
+ l.tail = nil
+}
+
+// Empty returns true iff the list is empty.
+func (l *bufferList) Empty() bool {
+ return l.head == nil
+}
+
+// Front returns the first element of list l or nil.
+func (l *bufferList) Front() *buffer {
+ return l.head
+}
+
+// Back returns the last element of list l or nil.
+func (l *bufferList) Back() *buffer {
+ return l.tail
+}
+
+// PushFront inserts the element e at the front of list l.
+func (l *bufferList) PushFront(e *buffer) {
+ bufferElementMapper{}.linkerFor(e).SetNext(l.head)
+ bufferElementMapper{}.linkerFor(e).SetPrev(nil)
+
+ if l.head != nil {
+ bufferElementMapper{}.linkerFor(l.head).SetPrev(e)
+ } else {
+ l.tail = e
+ }
+
+ l.head = e
+}
+
+// PushBack inserts the element e at the back of list l.
+func (l *bufferList) PushBack(e *buffer) {
+ bufferElementMapper{}.linkerFor(e).SetNext(nil)
+ bufferElementMapper{}.linkerFor(e).SetPrev(l.tail)
+
+ if l.tail != nil {
+ bufferElementMapper{}.linkerFor(l.tail).SetNext(e)
+ } else {
+ l.head = e
+ }
+
+ l.tail = e
+}
+
+// PushBackList inserts list m at the end of list l, emptying m.
+func (l *bufferList) PushBackList(m *bufferList) {
+ if l.head == nil {
+ l.head = m.head
+ l.tail = m.tail
+ } else if m.head != nil {
+ bufferElementMapper{}.linkerFor(l.tail).SetNext(m.head)
+ bufferElementMapper{}.linkerFor(m.head).SetPrev(l.tail)
+
+ l.tail = m.tail
+ }
+
+ m.head = nil
+ m.tail = nil
+}
+
+// InsertAfter inserts e after b.
+func (l *bufferList) InsertAfter(b, e *buffer) {
+ a := bufferElementMapper{}.linkerFor(b).Next()
+ bufferElementMapper{}.linkerFor(e).SetNext(a)
+ bufferElementMapper{}.linkerFor(e).SetPrev(b)
+ bufferElementMapper{}.linkerFor(b).SetNext(e)
+
+ if a != nil {
+ bufferElementMapper{}.linkerFor(a).SetPrev(e)
+ } else {
+ l.tail = e
+ }
+}
+
+// InsertBefore inserts e before a.
+func (l *bufferList) InsertBefore(a, e *buffer) {
+ b := bufferElementMapper{}.linkerFor(a).Prev()
+ bufferElementMapper{}.linkerFor(e).SetNext(a)
+ bufferElementMapper{}.linkerFor(e).SetPrev(b)
+ bufferElementMapper{}.linkerFor(a).SetPrev(e)
+
+ if b != nil {
+ bufferElementMapper{}.linkerFor(b).SetNext(e)
+ } else {
+ l.head = e
+ }
+}
+
+// Remove removes e from l.
+func (l *bufferList) Remove(e *buffer) {
+ prev := bufferElementMapper{}.linkerFor(e).Prev()
+ next := bufferElementMapper{}.linkerFor(e).Next()
+
+ if prev != nil {
+ bufferElementMapper{}.linkerFor(prev).SetNext(next)
+ } else {
+ l.head = next
+ }
+
+ if next != nil {
+ bufferElementMapper{}.linkerFor(next).SetPrev(prev)
+ } else {
+ l.tail = prev
+ }
+}
+
+// Entry is a default implementation of Linker. Users can add anonymous fields
+// of this type to their structs to make them automatically implement the
+// methods needed by List.
+//
+// +stateify savable
+type bufferEntry struct {
+ next *buffer
+ prev *buffer
+}
+
+// Next returns the entry that follows e in the list.
+func (e *bufferEntry) Next() *buffer {
+ return e.next
+}
+
+// Prev returns the entry that precedes e in the list.
+func (e *bufferEntry) Prev() *buffer {
+ return e.prev
+}
+
+// SetNext assigns 'entry' as the entry that follows e in the list.
+func (e *bufferEntry) SetNext(elem *buffer) {
+ e.next = elem
+}
+
+// SetPrev assigns 'entry' as the entry that precedes e in the list.
+func (e *bufferEntry) SetPrev(elem *buffer) {
+ e.prev = elem
+}
diff --git a/pkg/sentry/kernel/pipe/device.go b/pkg/sentry/kernel/pipe/device.go
new file mode 100644
index 000000000..eb59e15a1
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/device.go
@@ -0,0 +1,20 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import "gvisor.googlesource.com/gvisor/pkg/sentry/device"
+
+// pipeDevice is used for all pipe files.
+var pipeDevice = device.NewAnonDevice()
diff --git a/pkg/sentry/kernel/pipe/node.go b/pkg/sentry/kernel/pipe/node.go
new file mode 100644
index 000000000..926c4c623
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/node.go
@@ -0,0 +1,196 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "sync"
+
+ "gvisor.googlesource.com/gvisor/pkg/abi/linux"
+ "gvisor.googlesource.com/gvisor/pkg/amutex"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil"
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+)
+
+// inodeOperations implements fs.InodeOperations for pipes.
+//
+// +stateify savable
+type inodeOperations struct {
+ fsutil.InodeGenericChecker `state:"nosave"`
+ fsutil.InodeNoExtendedAttributes `state:"nosave"`
+ fsutil.InodeNoopRelease `state:"nosave"`
+ fsutil.InodeNoopTruncate `state:"nosave"`
+ fsutil.InodeNoopWriteOut `state:"nosave"`
+ fsutil.InodeNotDirectory `state:"nosave"`
+ fsutil.InodeNotMappable `state:"nosave"`
+ fsutil.InodeNotSocket `state:"nosave"`
+ fsutil.InodeNotSymlink `state:"nosave"`
+ fsutil.InodeNotVirtual `state:"nosave"`
+
+ fsutil.InodeSimpleAttributes
+
+ // mu protects the fields below.
+ mu sync.Mutex `state:"nosave"`
+
+ // p is the underlying Pipe object representing this fifo.
+ p *Pipe
+
+ // Channels for synchronizing the creation of new readers and writers of
+ // this fifo. See waitFor and newHandleLocked.
+ //
+ // These are not saved/restored because all waiters are unblocked on save,
+ // and either automatically restart (via ERESTARTSYS) or return EINTR on
+ // resume. On restarts via ERESTARTSYS, the appropriate channel will be
+ // recreated.
+ rWakeup chan struct{} `state:"nosave"`
+ wWakeup chan struct{} `state:"nosave"`
+}
+
+var _ fs.InodeOperations = (*inodeOperations)(nil)
+
+// NewInodeOperations returns a new fs.InodeOperations for a given pipe.
+func NewInodeOperations(ctx context.Context, perms fs.FilePermissions, p *Pipe) *inodeOperations {
+ return &inodeOperations{
+ InodeSimpleAttributes: fsutil.NewInodeSimpleAttributes(ctx, fs.FileOwnerFromContext(ctx), perms, linux.PIPEFS_MAGIC),
+ p: p,
+ }
+}
+
+// GetFile implements fs.InodeOperations.GetFile. Named pipes have special blocking
+// semantics during open:
+//
+// "Normally, opening the FIFO blocks until the other end is opened also. A
+// process can open a FIFO in nonblocking mode. In this case, opening for
+// read-only will succeed even if no-one has opened on the write side yet,
+// opening for write-only will fail with ENXIO (no such device or address)
+// unless the other end has already been opened. Under Linux, opening a FIFO
+// for read and write will succeed both in blocking and nonblocking mode. POSIX
+// leaves this behavior undefined. This can be used to open a FIFO for writing
+// while there are no readers available." - fifo(7)
+func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) (*fs.File, error) {
+ i.mu.Lock()
+ defer i.mu.Unlock()
+
+ switch {
+ case flags.Read && !flags.Write: // O_RDONLY.
+ r := i.p.Open(ctx, flags)
+ i.newHandleLocked(&i.rWakeup)
+
+ if i.p.isNamed && !flags.NonBlocking && !i.p.HasWriters() {
+ if !i.waitFor(&i.wWakeup, ctx) {
+ r.DecRef()
+ return nil, syserror.ErrInterrupted
+ }
+ }
+
+ // By now, either we're doing a nonblocking open or we have a writer. On
+ // a nonblocking read-only open, the open succeeds even if no-one has
+ // opened the write side yet.
+ return r, nil
+
+ case flags.Write && !flags.Read: // O_WRONLY.
+ w := i.p.Open(ctx, flags)
+ i.newHandleLocked(&i.wWakeup)
+
+ if i.p.isNamed && !i.p.HasReaders() {
+ // On a nonblocking, write-only open, the open fails with ENXIO if the
+ // read side isn't open yet.
+ if flags.NonBlocking {
+ w.DecRef()
+ return nil, syserror.ENXIO
+ }
+
+ if !i.waitFor(&i.rWakeup, ctx) {
+ w.DecRef()
+ return nil, syserror.ErrInterrupted
+ }
+ }
+ return w, nil
+
+ case flags.Read && flags.Write: // O_RDWR.
+ // Pipes opened for read-write always succeeds without blocking.
+ rw := i.p.Open(ctx, flags)
+ i.newHandleLocked(&i.rWakeup)
+ i.newHandleLocked(&i.wWakeup)
+ return rw, nil
+
+ default:
+ return nil, syserror.EINVAL
+ }
+}
+
+// waitFor blocks until the underlying pipe has at least one reader/writer is
+// announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this
+// function will block for either readers or writers, depending on where
+// 'wakeupChan' points.
+//
+// f.mu must be held by the caller. waitFor returns with f.mu held, but it will
+// drop f.mu before blocking for any reader/writers.
+func (i *inodeOperations) waitFor(wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool {
+ // Ideally this function would simply use a condition variable. However, the
+ // wait needs to be interruptible via 'sleeper', so we must sychronize via a
+ // channel. The synchronization below relies on the fact that closing a
+ // channel unblocks all receives on the channel.
+
+ // Does an appropriate wakeup channel already exist? If not, create a new
+ // one. This is all done under f.mu to avoid races.
+ if *wakeupChan == nil {
+ *wakeupChan = make(chan struct{})
+ }
+
+ // Grab a local reference to the wakeup channel since it may disappear as
+ // soon as we drop f.mu.
+ wakeup := *wakeupChan
+
+ // Drop the lock and prepare to sleep.
+ i.mu.Unlock()
+ cancel := sleeper.SleepStart()
+
+ // Wait for either a new reader/write to be signalled via 'wakeup', or
+ // for the sleep to be cancelled.
+ select {
+ case <-wakeup:
+ sleeper.SleepFinish(true)
+ case <-cancel:
+ sleeper.SleepFinish(false)
+ }
+
+ // Take the lock and check if we were woken. If we were woken and
+ // interrupted, the former takes priority.
+ i.mu.Lock()
+ select {
+ case <-wakeup:
+ return true
+ default:
+ return false
+ }
+}
+
+// newHandleLocked signals a new pipe reader or writer depending on where
+// 'wakeupChan' points. This unblocks any corresponding reader or writer
+// waiting for the other end of the channel to be opened, see Fifo.waitFor.
+//
+// i.mu must be held.
+func (*inodeOperations) newHandleLocked(wakeupChan *chan struct{}) {
+ if *wakeupChan != nil {
+ close(*wakeupChan)
+ *wakeupChan = nil
+ }
+}
+
+func (*inodeOperations) Allocate(_ context.Context, _ *fs.Inode, _, _ int64) error {
+ return syserror.EPIPE
+}
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
new file mode 100644
index 000000000..b65204492
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -0,0 +1,429 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package pipe provides a pipe implementation.
+package pipe
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "syscall"
+
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+)
+
+const (
+ // MinimumPipeSize is a hard limit of the minimum size of a pipe.
+ MinimumPipeSize = 64 << 10
+
+ // DefaultPipeSize is the system-wide default size of a pipe in bytes.
+ DefaultPipeSize = MinimumPipeSize
+
+ // MaximumPipeSize is a hard limit on the maximum size of a pipe.
+ MaximumPipeSize = 8 << 20
+)
+
+// Sizer is an interface for setting and getting the size of a pipe.
+//
+// It is implemented by Pipe and, through embedding, all other types.
+type Sizer interface {
+ // PipeSize returns the pipe capacity in bytes.
+ PipeSize() int64
+
+ // SetPipeSize sets the new pipe capacity in bytes.
+ //
+ // The new size is returned (which may be capped).
+ SetPipeSize(int64) (int64, error)
+}
+
+// Pipe is an encapsulation of a platform-independent pipe.
+// It manages a buffered byte queue shared between a reader/writer
+// pair.
+//
+// +stateify savable
+type Pipe struct {
+ waiter.Queue `state:"nosave"`
+
+ // isNamed indicates whether this is a named pipe.
+ //
+ // This value is immutable.
+ isNamed bool
+
+ // atomicIOBytes is the maximum number of bytes that the pipe will
+ // guarantee atomic reads or writes atomically.
+ //
+ // This value is immutable.
+ atomicIOBytes int64
+
+ // The dirent backing this pipe. Shared by all readers and writers.
+ //
+ // This value is immutable.
+ Dirent *fs.Dirent
+
+ // The number of active readers for this pipe.
+ //
+ // Access atomically.
+ readers int32
+
+ // The number of active writes for this pipe.
+ //
+ // Access atomically.
+ writers int32
+
+ // mu protects all pipe internal state below.
+ mu sync.Mutex `state:"nosave"`
+
+ // data is the buffer queue of pipe contents.
+ //
+ // This is protected by mu.
+ data bufferList
+
+ // max is the maximum size of the pipe in bytes. When this max has been
+ // reached, writers will get EWOULDBLOCK.
+ //
+ // This is protected by mu.
+ max int64
+
+ // size is the current size of the pipe in bytes.
+ //
+ // This is protected by mu.
+ size int64
+
+ // hadWriter indicates if this pipe ever had a writer. Note that this
+ // does not necessarily indicate there is *currently* a writer, just
+ // that there has been a writer at some point since the pipe was
+ // created.
+ //
+ // This is protected by mu.
+ hadWriter bool
+}
+
+// NewPipe initializes and returns a pipe.
+//
+// N.B. The size and atomicIOBytes will be bounded.
+func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe {
+ if sizeBytes < MinimumPipeSize {
+ sizeBytes = MinimumPipeSize
+ }
+ if sizeBytes > MaximumPipeSize {
+ sizeBytes = MaximumPipeSize
+ }
+ if atomicIOBytes <= 0 {
+ atomicIOBytes = 1
+ }
+ if atomicIOBytes > sizeBytes {
+ atomicIOBytes = sizeBytes
+ }
+ p := &Pipe{
+ isNamed: isNamed,
+ max: sizeBytes,
+ atomicIOBytes: atomicIOBytes,
+ }
+
+ // Build the fs.Dirent of this pipe, shared by all fs.Files associated
+ // with this pipe.
+ perms := fs.FilePermissions{
+ User: fs.PermMask{Read: true, Write: true},
+ }
+ iops := NewInodeOperations(ctx, perms, p)
+ ino := pipeDevice.NextIno()
+ sattr := fs.StableAttr{
+ Type: fs.Pipe,
+ DeviceID: pipeDevice.DeviceID(),
+ InodeID: ino,
+ BlockSize: int64(atomicIOBytes),
+ }
+ ms := fs.NewPseudoMountSource()
+ p.Dirent = fs.NewDirent(fs.NewInode(iops, ms, sattr), fmt.Sprintf("pipe:[%d]", ino))
+ return p
+}
+
+// NewConnectedPipe initializes a pipe and returns a pair of objects
+// representing the read and write ends of the pipe.
+func NewConnectedPipe(ctx context.Context, sizeBytes, atomicIOBytes int64) (*fs.File, *fs.File) {
+ p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes)
+ return p.Open(ctx, fs.FileFlags{Read: true}), p.Open(ctx, fs.FileFlags{Write: true})
+}
+
+// Open opens the pipe and returns a new file.
+//
+// Precondition: at least one of flags.Read or flags.Write must be set.
+func (p *Pipe) Open(ctx context.Context, flags fs.FileFlags) *fs.File {
+ switch {
+ case flags.Read && flags.Write:
+ p.rOpen()
+ p.wOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &ReaderWriter{
+ Pipe: p,
+ })
+ case flags.Read:
+ p.rOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &Reader{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+ case flags.Write:
+ p.wOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &Writer{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+ default:
+ // Precondition violated.
+ panic("invalid pipe flags")
+ }
+}
+
+// read reads data from the pipe into dst and returns the number of bytes
+// read, or returns ErrWouldBlock if the pipe is empty.
+//
+// Precondition: this pipe must have readers.
+func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
+ // Don't block for a zero-length read even if the pipe is empty.
+ if dst.NumBytes() == 0 {
+ return 0, nil
+ }
+
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ // Is the pipe empty?
+ if p.size == 0 {
+ if !p.HasWriters() {
+ // There are no writers, return EOF.
+ return 0, nil
+ }
+ return 0, syserror.ErrWouldBlock
+ }
+
+ // Limit how much we consume.
+ if dst.NumBytes() > p.size {
+ dst = dst.TakeFirst64(p.size)
+ }
+
+ done := int64(0)
+ for dst.NumBytes() > 0 {
+ // Pop the first buffer.
+ first := p.data.Front()
+ if first == nil {
+ break
+ }
+
+ // Copy user data.
+ n, err := dst.CopyOutFrom(ctx, first)
+ done += int64(n)
+ p.size -= n
+ dst = dst.DropFirst64(n)
+
+ // Empty buffer?
+ if first.Empty() {
+ // Push to the free list.
+ p.data.Remove(first)
+ bufferPool.Put(first)
+ }
+
+ // Handle errors.
+ if err != nil {
+ return done, err
+ }
+ }
+
+ return done, nil
+}
+
+// write writes data from sv into the pipe and returns the number of bytes
+// written. If no bytes are written because the pipe is full (or has less than
+// atomicIOBytes free capacity), write returns ErrWouldBlock.
+//
+// Precondition: this pipe must have writers.
+func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ // Can't write to a pipe with no readers.
+ if !p.HasReaders() {
+ return 0, syscall.EPIPE
+ }
+
+ // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
+ // atomic, but requires no atomicity for writes larger than this.
+ wanted := src.NumBytes()
+ if avail := p.max - p.size; wanted > avail {
+ if wanted <= p.atomicIOBytes {
+ return 0, syserror.ErrWouldBlock
+ }
+ // Limit to the available capacity.
+ src = src.TakeFirst64(avail)
+ }
+
+ done := int64(0)
+ for src.NumBytes() > 0 {
+ // Need a new buffer?
+ last := p.data.Back()
+ if last == nil || last.Full() {
+ // Add a new buffer to the data list.
+ last = newBuffer()
+ p.data.PushBack(last)
+ }
+
+ // Copy user data.
+ n, err := src.CopyInTo(ctx, last)
+ done += int64(n)
+ p.size += n
+ src = src.DropFirst64(n)
+
+ // Handle errors.
+ if err != nil {
+ return done, err
+ }
+ }
+ if wanted > done {
+ // Partial write due to full pipe.
+ return done, syserror.ErrWouldBlock
+ }
+
+ return done, nil
+}
+
+// rOpen signals a new reader of the pipe.
+func (p *Pipe) rOpen() {
+ atomic.AddInt32(&p.readers, 1)
+}
+
+// wOpen signals a new writer of the pipe.
+func (p *Pipe) wOpen() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ p.hadWriter = true
+ atomic.AddInt32(&p.writers, 1)
+}
+
+// rClose signals that a reader has closed their end of the pipe.
+func (p *Pipe) rClose() {
+ newReaders := atomic.AddInt32(&p.readers, -1)
+ if newReaders < 0 {
+ panic(fmt.Sprintf("Refcounting bug, pipe has negative readers: %v", newReaders))
+ }
+}
+
+// wClose signals that a writer has closed their end of the pipe.
+func (p *Pipe) wClose() {
+ newWriters := atomic.AddInt32(&p.writers, -1)
+ if newWriters < 0 {
+ panic(fmt.Sprintf("Refcounting bug, pipe has negative writers: %v.", newWriters))
+ }
+}
+
+// HasReaders returns whether the pipe has any active readers.
+func (p *Pipe) HasReaders() bool {
+ return atomic.LoadInt32(&p.readers) > 0
+}
+
+// HasWriters returns whether the pipe has any active writers.
+func (p *Pipe) HasWriters() bool {
+ return atomic.LoadInt32(&p.writers) > 0
+}
+
+// rReadinessLocked calculates the read readiness.
+//
+// Precondition: mu must be held.
+func (p *Pipe) rReadinessLocked() waiter.EventMask {
+ ready := waiter.EventMask(0)
+ if p.HasReaders() && p.data.Front() != nil {
+ ready |= waiter.EventIn
+ }
+ if !p.HasWriters() && p.hadWriter {
+ // POLLHUP must be suppressed until the pipe has had at least one writer
+ // at some point. Otherwise a reader thread may poll and immediately get
+ // a POLLHUP before the writer ever opens the pipe, which the reader may
+ // interpret as the writer opening then closing the pipe.
+ ready |= waiter.EventHUp
+ }
+ return ready
+}
+
+// rReadiness returns a mask that states whether the read end of the pipe is
+// ready for reading.
+func (p *Pipe) rReadiness() waiter.EventMask {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.rReadinessLocked()
+}
+
+// wReadinessLocked calculates the write readiness.
+//
+// Precondition: mu must be held.
+func (p *Pipe) wReadinessLocked() waiter.EventMask {
+ ready := waiter.EventMask(0)
+ if p.HasWriters() && p.size < p.max {
+ ready |= waiter.EventOut
+ }
+ if !p.HasReaders() {
+ ready |= waiter.EventErr
+ }
+ return ready
+}
+
+// wReadiness returns a mask that states whether the write end of the pipe
+// is ready for writing.
+func (p *Pipe) wReadiness() waiter.EventMask {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.wReadinessLocked()
+}
+
+// rwReadiness returns a mask that states whether a read-write handle to the
+// pipe is ready for IO.
+func (p *Pipe) rwReadiness() waiter.EventMask {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.rReadinessLocked() | p.wReadinessLocked()
+}
+
+// queued returns the amount of queued data.
+func (p *Pipe) queued() int64 {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.size
+}
+
+// PipeSize implements PipeSizer.PipeSize.
+func (p *Pipe) PipeSize() int64 {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.max
+}
+
+// SetPipeSize implements PipeSize.SetPipeSize.
+func (p *Pipe) SetPipeSize(size int64) (int64, error) {
+ if size < 0 {
+ return 0, syserror.EINVAL
+ }
+ if size < MinimumPipeSize {
+ size = MinimumPipeSize // Per spec.
+ }
+ if size > MaximumPipeSize {
+ return 0, syserror.EPERM
+ }
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if size < p.size {
+ return 0, syserror.EBUSY
+ }
+ p.max = size
+ return size, nil
+}
diff --git a/pkg/sentry/kernel/pipe/pipe_state_autogen.go b/pkg/sentry/kernel/pipe/pipe_state_autogen.go
new file mode 100755
index 000000000..5d3686109
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/pipe_state_autogen.go
@@ -0,0 +1,134 @@
+// automatically generated by stateify.
+
+package pipe
+
+import (
+ "gvisor.googlesource.com/gvisor/pkg/state"
+)
+
+func (x *buffer) beforeSave() {}
+func (x *buffer) save(m state.Map) {
+ x.beforeSave()
+ m.Save("data", &x.data)
+ m.Save("read", &x.read)
+ m.Save("write", &x.write)
+ m.Save("bufferEntry", &x.bufferEntry)
+}
+
+func (x *buffer) afterLoad() {}
+func (x *buffer) load(m state.Map) {
+ m.Load("data", &x.data)
+ m.Load("read", &x.read)
+ m.Load("write", &x.write)
+ m.Load("bufferEntry", &x.bufferEntry)
+}
+
+func (x *bufferList) beforeSave() {}
+func (x *bufferList) save(m state.Map) {
+ x.beforeSave()
+ m.Save("head", &x.head)
+ m.Save("tail", &x.tail)
+}
+
+func (x *bufferList) afterLoad() {}
+func (x *bufferList) load(m state.Map) {
+ m.Load("head", &x.head)
+ m.Load("tail", &x.tail)
+}
+
+func (x *bufferEntry) beforeSave() {}
+func (x *bufferEntry) save(m state.Map) {
+ x.beforeSave()
+ m.Save("next", &x.next)
+ m.Save("prev", &x.prev)
+}
+
+func (x *bufferEntry) afterLoad() {}
+func (x *bufferEntry) load(m state.Map) {
+ m.Load("next", &x.next)
+ m.Load("prev", &x.prev)
+}
+
+func (x *inodeOperations) beforeSave() {}
+func (x *inodeOperations) save(m state.Map) {
+ x.beforeSave()
+ m.Save("InodeSimpleAttributes", &x.InodeSimpleAttributes)
+ m.Save("p", &x.p)
+}
+
+func (x *inodeOperations) afterLoad() {}
+func (x *inodeOperations) load(m state.Map) {
+ m.Load("InodeSimpleAttributes", &x.InodeSimpleAttributes)
+ m.Load("p", &x.p)
+}
+
+func (x *Pipe) beforeSave() {}
+func (x *Pipe) save(m state.Map) {
+ x.beforeSave()
+ m.Save("isNamed", &x.isNamed)
+ m.Save("atomicIOBytes", &x.atomicIOBytes)
+ m.Save("Dirent", &x.Dirent)
+ m.Save("readers", &x.readers)
+ m.Save("writers", &x.writers)
+ m.Save("data", &x.data)
+ m.Save("max", &x.max)
+ m.Save("size", &x.size)
+ m.Save("hadWriter", &x.hadWriter)
+}
+
+func (x *Pipe) afterLoad() {}
+func (x *Pipe) load(m state.Map) {
+ m.Load("isNamed", &x.isNamed)
+ m.Load("atomicIOBytes", &x.atomicIOBytes)
+ m.Load("Dirent", &x.Dirent)
+ m.Load("readers", &x.readers)
+ m.Load("writers", &x.writers)
+ m.Load("data", &x.data)
+ m.Load("max", &x.max)
+ m.Load("size", &x.size)
+ m.Load("hadWriter", &x.hadWriter)
+}
+
+func (x *Reader) beforeSave() {}
+func (x *Reader) save(m state.Map) {
+ x.beforeSave()
+ m.Save("ReaderWriter", &x.ReaderWriter)
+}
+
+func (x *Reader) afterLoad() {}
+func (x *Reader) load(m state.Map) {
+ m.Load("ReaderWriter", &x.ReaderWriter)
+}
+
+func (x *ReaderWriter) beforeSave() {}
+func (x *ReaderWriter) save(m state.Map) {
+ x.beforeSave()
+ m.Save("Pipe", &x.Pipe)
+}
+
+func (x *ReaderWriter) afterLoad() {}
+func (x *ReaderWriter) load(m state.Map) {
+ m.Load("Pipe", &x.Pipe)
+}
+
+func (x *Writer) beforeSave() {}
+func (x *Writer) save(m state.Map) {
+ x.beforeSave()
+ m.Save("ReaderWriter", &x.ReaderWriter)
+}
+
+func (x *Writer) afterLoad() {}
+func (x *Writer) load(m state.Map) {
+ m.Load("ReaderWriter", &x.ReaderWriter)
+}
+
+func init() {
+ state.Register("pipe.buffer", (*buffer)(nil), state.Fns{Save: (*buffer).save, Load: (*buffer).load})
+ state.Register("pipe.bufferList", (*bufferList)(nil), state.Fns{Save: (*bufferList).save, Load: (*bufferList).load})
+ state.Register("pipe.bufferEntry", (*bufferEntry)(nil), state.Fns{Save: (*bufferEntry).save, Load: (*bufferEntry).load})
+ state.Register("pipe.inodeOperations", (*inodeOperations)(nil), state.Fns{Save: (*inodeOperations).save, Load: (*inodeOperations).load})
+ state.Register("pipe.Pipe", (*Pipe)(nil), state.Fns{Save: (*Pipe).save, Load: (*Pipe).load})
+ state.Register("pipe.Reader", (*Reader)(nil), state.Fns{Save: (*Reader).save, Load: (*Reader).load})
+ state.Register("pipe.ReaderWriter", (*ReaderWriter)(nil), state.Fns{Save: (*ReaderWriter).save, Load: (*ReaderWriter).load})
+ state.Register("pipe.Writer", (*Writer)(nil), state.Fns{Save: (*Writer).save, Load: (*Writer).load})
+}
diff --git a/pkg/sentry/kernel/pipe/reader.go b/pkg/sentry/kernel/pipe/reader.go
new file mode 100644
index 000000000..656be824d
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/reader.go
@@ -0,0 +1,42 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+)
+
+// Reader satisfies the fs.FileOperations interface for read-only pipes.
+// Reader should be used with !fs.FileFlags.Write to reject writes.
+//
+// +stateify savable
+type Reader struct {
+ ReaderWriter
+}
+
+// Release implements fs.FileOperations.Release.
+//
+// This overrides ReaderWriter.Release.
+func (r *Reader) Release() {
+ r.Pipe.rClose()
+
+ // Wake up writers.
+ r.Pipe.Notify(waiter.EventOut)
+}
+
+// Readiness returns the ready events in the underlying pipe.
+func (r *Reader) Readiness(mask waiter.EventMask) waiter.EventMask {
+ return r.Pipe.rReadiness() & mask
+}
diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go
new file mode 100644
index 000000000..e560b9be9
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/reader_writer.go
@@ -0,0 +1,96 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "math"
+ "syscall"
+
+ "gvisor.googlesource.com/gvisor/pkg/abi/linux"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/arch"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+)
+
+// ReaderWriter satisfies the FileOperations interface and services both
+// read and write requests. This should only be used directly for named pipes.
+// pipe(2) and pipe2(2) only support unidirectional pipes and should use
+// either pipe.Reader or pipe.Writer.
+//
+// +stateify savable
+type ReaderWriter struct {
+ fsutil.FilePipeSeek `state:"nosave"`
+ fsutil.FileNotDirReaddir `state:"nosave"`
+ fsutil.FileNoFsync `state:"nosave"`
+ fsutil.FileNoMMap `state:"nosave"`
+ fsutil.FileNoSplice `state:"nosave"`
+ fsutil.FileNoopFlush `state:"nosave"`
+ fsutil.FileUseInodeUnstableAttr `state:"nosave"`
+ *Pipe
+}
+
+// Release implements fs.FileOperations.Release.
+func (rw *ReaderWriter) Release() {
+ rw.Pipe.rClose()
+ rw.Pipe.wClose()
+
+ // Wake up readers and writers.
+ rw.Pipe.Notify(waiter.EventIn | waiter.EventOut)
+}
+
+// Read implements fs.FileOperations.Read.
+func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequence, _ int64) (int64, error) {
+ n, err := rw.Pipe.read(ctx, dst)
+ if n > 0 {
+ rw.Pipe.Notify(waiter.EventOut)
+ }
+ return n, err
+}
+
+// Write implements fs.FileOperations.Write.
+func (rw *ReaderWriter) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) {
+ n, err := rw.Pipe.write(ctx, src)
+ if n > 0 {
+ rw.Pipe.Notify(waiter.EventIn)
+ }
+ return n, err
+}
+
+// Readiness returns the ready events in the underlying pipe.
+func (rw *ReaderWriter) Readiness(mask waiter.EventMask) waiter.EventMask {
+ return rw.Pipe.rwReadiness() & mask
+}
+
+// Ioctl implements fs.FileOperations.Ioctl.
+func (rw *ReaderWriter) Ioctl(ctx context.Context, io usermem.IO, args arch.SyscallArguments) (uintptr, error) {
+ // Switch on ioctl request.
+ switch int(args[1].Int()) {
+ case linux.FIONREAD:
+ v := rw.queued()
+ if v > math.MaxInt32 {
+ v = math.MaxInt32 // Silently truncate.
+ }
+ // Copy result to user-space.
+ _, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{
+ AddressSpaceActive: true,
+ })
+ return 0, err
+ default:
+ return 0, syscall.ENOTTY
+ }
+}
diff --git a/pkg/sentry/kernel/pipe/writer.go b/pkg/sentry/kernel/pipe/writer.go
new file mode 100644
index 000000000..8d5b68541
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/writer.go
@@ -0,0 +1,42 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+)
+
+// Writer satisfies the fs.FileOperations interface for write-only pipes.
+// Writer should be used with !fs.FileFlags.Read to reject reads.
+//
+// +stateify savable
+type Writer struct {
+ ReaderWriter
+}
+
+// Release implements fs.FileOperations.Release.
+//
+// This overrides ReaderWriter.Release.
+func (w *Writer) Release() {
+ w.Pipe.wClose()
+
+ // Wake up readers.
+ w.Pipe.Notify(waiter.EventHUp)
+}
+
+// Readiness returns the ready events in the underlying pipe.
+func (w *Writer) Readiness(mask waiter.EventMask) waiter.EventMask {
+ return w.Pipe.wReadiness() & mask
+}