diff options
Diffstat (limited to 'pkg/sentry/kernel/pipe')
-rw-r--r-- | pkg/sentry/kernel/pipe/buffer.go | 90 | ||||
-rwxr-xr-x | pkg/sentry/kernel/pipe/buffer_list.go | 173 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/device.go | 20 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/node.go | 196 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe.go | 429 | ||||
-rwxr-xr-x | pkg/sentry/kernel/pipe/pipe_state_autogen.go | 134 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/reader.go | 42 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/reader_writer.go | 96 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/writer.go | 42 |
9 files changed, 1222 insertions, 0 deletions
diff --git a/pkg/sentry/kernel/pipe/buffer.go b/pkg/sentry/kernel/pipe/buffer.go new file mode 100644 index 000000000..4360dc44f --- /dev/null +++ b/pkg/sentry/kernel/pipe/buffer.go @@ -0,0 +1,90 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "sync" + + "gvisor.googlesource.com/gvisor/pkg/sentry/safemem" +) + +// buffer encapsulates a queueable byte buffer. +// +// Note that the total size is slightly less than two pages. This +// is done intentionally to ensure that the buffer object aligns +// with runtime internals. We have no hard size or alignment +// requirements. This two page size will effectively minimize +// internal fragmentation, but still have a large enough chunk +// to limit excessive segmentation. +// +// +stateify savable +type buffer struct { + data [8144]byte + read int + write int + bufferEntry +} + +// Reset resets internal data. +// +// This must be called before use. +func (b *buffer) Reset() { + b.read = 0 + b.write = 0 +} + +// Empty indicates the buffer is empty. +// +// This indicates there is no data left to read. +func (b *buffer) Empty() bool { + return b.read == b.write +} + +// Full indicates the buffer is full. +// +// This indicates there is no capacity left to write. +func (b *buffer) Full() bool { + return b.write == len(b.data) +} + +// WriteFromBlocks implements safemem.Writer.WriteFromBlocks. +func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) { + dst := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.write:])) + n, err := safemem.CopySeq(dst, srcs) + b.write += int(n) + return n, err +} + +// ReadToBlocks implements safemem.Reader.ReadToBlocks. +func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) { + src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write])) + n, err := safemem.CopySeq(dsts, src) + b.read += int(n) + return n, err +} + +// bufferPool is a pool for buffers. +var bufferPool = sync.Pool{ + New: func() interface{} { + return new(buffer) + }, +} + +// newBuffer grabs a new buffer from the pool. +func newBuffer() *buffer { + b := bufferPool.Get().(*buffer) + b.Reset() + return b +} diff --git a/pkg/sentry/kernel/pipe/buffer_list.go b/pkg/sentry/kernel/pipe/buffer_list.go new file mode 100755 index 000000000..42ec78788 --- /dev/null +++ b/pkg/sentry/kernel/pipe/buffer_list.go @@ -0,0 +1,173 @@ +package pipe + +// ElementMapper provides an identity mapping by default. +// +// This can be replaced to provide a struct that maps elements to linker +// objects, if they are not the same. An ElementMapper is not typically +// required if: Linker is left as is, Element is left as is, or Linker and +// Element are the same type. +type bufferElementMapper struct{} + +// linkerFor maps an Element to a Linker. +// +// This default implementation should be inlined. +// +//go:nosplit +func (bufferElementMapper) linkerFor(elem *buffer) *buffer { return elem } + +// List is an intrusive list. Entries can be added to or removed from the list +// in O(1) time and with no additional memory allocations. +// +// The zero value for List is an empty list ready to use. +// +// To iterate over a list (where l is a List): +// for e := l.Front(); e != nil; e = e.Next() { +// // do something with e. +// } +// +// +stateify savable +type bufferList struct { + head *buffer + tail *buffer +} + +// Reset resets list l to the empty state. +func (l *bufferList) Reset() { + l.head = nil + l.tail = nil +} + +// Empty returns true iff the list is empty. +func (l *bufferList) Empty() bool { + return l.head == nil +} + +// Front returns the first element of list l or nil. +func (l *bufferList) Front() *buffer { + return l.head +} + +// Back returns the last element of list l or nil. +func (l *bufferList) Back() *buffer { + return l.tail +} + +// PushFront inserts the element e at the front of list l. +func (l *bufferList) PushFront(e *buffer) { + bufferElementMapper{}.linkerFor(e).SetNext(l.head) + bufferElementMapper{}.linkerFor(e).SetPrev(nil) + + if l.head != nil { + bufferElementMapper{}.linkerFor(l.head).SetPrev(e) + } else { + l.tail = e + } + + l.head = e +} + +// PushBack inserts the element e at the back of list l. +func (l *bufferList) PushBack(e *buffer) { + bufferElementMapper{}.linkerFor(e).SetNext(nil) + bufferElementMapper{}.linkerFor(e).SetPrev(l.tail) + + if l.tail != nil { + bufferElementMapper{}.linkerFor(l.tail).SetNext(e) + } else { + l.head = e + } + + l.tail = e +} + +// PushBackList inserts list m at the end of list l, emptying m. +func (l *bufferList) PushBackList(m *bufferList) { + if l.head == nil { + l.head = m.head + l.tail = m.tail + } else if m.head != nil { + bufferElementMapper{}.linkerFor(l.tail).SetNext(m.head) + bufferElementMapper{}.linkerFor(m.head).SetPrev(l.tail) + + l.tail = m.tail + } + + m.head = nil + m.tail = nil +} + +// InsertAfter inserts e after b. +func (l *bufferList) InsertAfter(b, e *buffer) { + a := bufferElementMapper{}.linkerFor(b).Next() + bufferElementMapper{}.linkerFor(e).SetNext(a) + bufferElementMapper{}.linkerFor(e).SetPrev(b) + bufferElementMapper{}.linkerFor(b).SetNext(e) + + if a != nil { + bufferElementMapper{}.linkerFor(a).SetPrev(e) + } else { + l.tail = e + } +} + +// InsertBefore inserts e before a. +func (l *bufferList) InsertBefore(a, e *buffer) { + b := bufferElementMapper{}.linkerFor(a).Prev() + bufferElementMapper{}.linkerFor(e).SetNext(a) + bufferElementMapper{}.linkerFor(e).SetPrev(b) + bufferElementMapper{}.linkerFor(a).SetPrev(e) + + if b != nil { + bufferElementMapper{}.linkerFor(b).SetNext(e) + } else { + l.head = e + } +} + +// Remove removes e from l. +func (l *bufferList) Remove(e *buffer) { + prev := bufferElementMapper{}.linkerFor(e).Prev() + next := bufferElementMapper{}.linkerFor(e).Next() + + if prev != nil { + bufferElementMapper{}.linkerFor(prev).SetNext(next) + } else { + l.head = next + } + + if next != nil { + bufferElementMapper{}.linkerFor(next).SetPrev(prev) + } else { + l.tail = prev + } +} + +// Entry is a default implementation of Linker. Users can add anonymous fields +// of this type to their structs to make them automatically implement the +// methods needed by List. +// +// +stateify savable +type bufferEntry struct { + next *buffer + prev *buffer +} + +// Next returns the entry that follows e in the list. +func (e *bufferEntry) Next() *buffer { + return e.next +} + +// Prev returns the entry that precedes e in the list. +func (e *bufferEntry) Prev() *buffer { + return e.prev +} + +// SetNext assigns 'entry' as the entry that follows e in the list. +func (e *bufferEntry) SetNext(elem *buffer) { + e.next = elem +} + +// SetPrev assigns 'entry' as the entry that precedes e in the list. +func (e *bufferEntry) SetPrev(elem *buffer) { + e.prev = elem +} diff --git a/pkg/sentry/kernel/pipe/device.go b/pkg/sentry/kernel/pipe/device.go new file mode 100644 index 000000000..eb59e15a1 --- /dev/null +++ b/pkg/sentry/kernel/pipe/device.go @@ -0,0 +1,20 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import "gvisor.googlesource.com/gvisor/pkg/sentry/device" + +// pipeDevice is used for all pipe files. +var pipeDevice = device.NewAnonDevice() diff --git a/pkg/sentry/kernel/pipe/node.go b/pkg/sentry/kernel/pipe/node.go new file mode 100644 index 000000000..926c4c623 --- /dev/null +++ b/pkg/sentry/kernel/pipe/node.go @@ -0,0 +1,196 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "sync" + + "gvisor.googlesource.com/gvisor/pkg/abi/linux" + "gvisor.googlesource.com/gvisor/pkg/amutex" + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil" + "gvisor.googlesource.com/gvisor/pkg/syserror" +) + +// inodeOperations implements fs.InodeOperations for pipes. +// +// +stateify savable +type inodeOperations struct { + fsutil.InodeGenericChecker `state:"nosave"` + fsutil.InodeNoExtendedAttributes `state:"nosave"` + fsutil.InodeNoopRelease `state:"nosave"` + fsutil.InodeNoopTruncate `state:"nosave"` + fsutil.InodeNoopWriteOut `state:"nosave"` + fsutil.InodeNotDirectory `state:"nosave"` + fsutil.InodeNotMappable `state:"nosave"` + fsutil.InodeNotSocket `state:"nosave"` + fsutil.InodeNotSymlink `state:"nosave"` + fsutil.InodeNotVirtual `state:"nosave"` + + fsutil.InodeSimpleAttributes + + // mu protects the fields below. + mu sync.Mutex `state:"nosave"` + + // p is the underlying Pipe object representing this fifo. + p *Pipe + + // Channels for synchronizing the creation of new readers and writers of + // this fifo. See waitFor and newHandleLocked. + // + // These are not saved/restored because all waiters are unblocked on save, + // and either automatically restart (via ERESTARTSYS) or return EINTR on + // resume. On restarts via ERESTARTSYS, the appropriate channel will be + // recreated. + rWakeup chan struct{} `state:"nosave"` + wWakeup chan struct{} `state:"nosave"` +} + +var _ fs.InodeOperations = (*inodeOperations)(nil) + +// NewInodeOperations returns a new fs.InodeOperations for a given pipe. +func NewInodeOperations(ctx context.Context, perms fs.FilePermissions, p *Pipe) *inodeOperations { + return &inodeOperations{ + InodeSimpleAttributes: fsutil.NewInodeSimpleAttributes(ctx, fs.FileOwnerFromContext(ctx), perms, linux.PIPEFS_MAGIC), + p: p, + } +} + +// GetFile implements fs.InodeOperations.GetFile. Named pipes have special blocking +// semantics during open: +// +// "Normally, opening the FIFO blocks until the other end is opened also. A +// process can open a FIFO in nonblocking mode. In this case, opening for +// read-only will succeed even if no-one has opened on the write side yet, +// opening for write-only will fail with ENXIO (no such device or address) +// unless the other end has already been opened. Under Linux, opening a FIFO +// for read and write will succeed both in blocking and nonblocking mode. POSIX +// leaves this behavior undefined. This can be used to open a FIFO for writing +// while there are no readers available." - fifo(7) +func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) (*fs.File, error) { + i.mu.Lock() + defer i.mu.Unlock() + + switch { + case flags.Read && !flags.Write: // O_RDONLY. + r := i.p.Open(ctx, flags) + i.newHandleLocked(&i.rWakeup) + + if i.p.isNamed && !flags.NonBlocking && !i.p.HasWriters() { + if !i.waitFor(&i.wWakeup, ctx) { + r.DecRef() + return nil, syserror.ErrInterrupted + } + } + + // By now, either we're doing a nonblocking open or we have a writer. On + // a nonblocking read-only open, the open succeeds even if no-one has + // opened the write side yet. + return r, nil + + case flags.Write && !flags.Read: // O_WRONLY. + w := i.p.Open(ctx, flags) + i.newHandleLocked(&i.wWakeup) + + if i.p.isNamed && !i.p.HasReaders() { + // On a nonblocking, write-only open, the open fails with ENXIO if the + // read side isn't open yet. + if flags.NonBlocking { + w.DecRef() + return nil, syserror.ENXIO + } + + if !i.waitFor(&i.rWakeup, ctx) { + w.DecRef() + return nil, syserror.ErrInterrupted + } + } + return w, nil + + case flags.Read && flags.Write: // O_RDWR. + // Pipes opened for read-write always succeeds without blocking. + rw := i.p.Open(ctx, flags) + i.newHandleLocked(&i.rWakeup) + i.newHandleLocked(&i.wWakeup) + return rw, nil + + default: + return nil, syserror.EINVAL + } +} + +// waitFor blocks until the underlying pipe has at least one reader/writer is +// announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this +// function will block for either readers or writers, depending on where +// 'wakeupChan' points. +// +// f.mu must be held by the caller. waitFor returns with f.mu held, but it will +// drop f.mu before blocking for any reader/writers. +func (i *inodeOperations) waitFor(wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool { + // Ideally this function would simply use a condition variable. However, the + // wait needs to be interruptible via 'sleeper', so we must sychronize via a + // channel. The synchronization below relies on the fact that closing a + // channel unblocks all receives on the channel. + + // Does an appropriate wakeup channel already exist? If not, create a new + // one. This is all done under f.mu to avoid races. + if *wakeupChan == nil { + *wakeupChan = make(chan struct{}) + } + + // Grab a local reference to the wakeup channel since it may disappear as + // soon as we drop f.mu. + wakeup := *wakeupChan + + // Drop the lock and prepare to sleep. + i.mu.Unlock() + cancel := sleeper.SleepStart() + + // Wait for either a new reader/write to be signalled via 'wakeup', or + // for the sleep to be cancelled. + select { + case <-wakeup: + sleeper.SleepFinish(true) + case <-cancel: + sleeper.SleepFinish(false) + } + + // Take the lock and check if we were woken. If we were woken and + // interrupted, the former takes priority. + i.mu.Lock() + select { + case <-wakeup: + return true + default: + return false + } +} + +// newHandleLocked signals a new pipe reader or writer depending on where +// 'wakeupChan' points. This unblocks any corresponding reader or writer +// waiting for the other end of the channel to be opened, see Fifo.waitFor. +// +// i.mu must be held. +func (*inodeOperations) newHandleLocked(wakeupChan *chan struct{}) { + if *wakeupChan != nil { + close(*wakeupChan) + *wakeupChan = nil + } +} + +func (*inodeOperations) Allocate(_ context.Context, _ *fs.Inode, _, _ int64) error { + return syserror.EPIPE +} diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go new file mode 100644 index 000000000..b65204492 --- /dev/null +++ b/pkg/sentry/kernel/pipe/pipe.go @@ -0,0 +1,429 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package pipe provides a pipe implementation. +package pipe + +import ( + "fmt" + "sync" + "sync/atomic" + "syscall" + + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" + "gvisor.googlesource.com/gvisor/pkg/syserror" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +const ( + // MinimumPipeSize is a hard limit of the minimum size of a pipe. + MinimumPipeSize = 64 << 10 + + // DefaultPipeSize is the system-wide default size of a pipe in bytes. + DefaultPipeSize = MinimumPipeSize + + // MaximumPipeSize is a hard limit on the maximum size of a pipe. + MaximumPipeSize = 8 << 20 +) + +// Sizer is an interface for setting and getting the size of a pipe. +// +// It is implemented by Pipe and, through embedding, all other types. +type Sizer interface { + // PipeSize returns the pipe capacity in bytes. + PipeSize() int64 + + // SetPipeSize sets the new pipe capacity in bytes. + // + // The new size is returned (which may be capped). + SetPipeSize(int64) (int64, error) +} + +// Pipe is an encapsulation of a platform-independent pipe. +// It manages a buffered byte queue shared between a reader/writer +// pair. +// +// +stateify savable +type Pipe struct { + waiter.Queue `state:"nosave"` + + // isNamed indicates whether this is a named pipe. + // + // This value is immutable. + isNamed bool + + // atomicIOBytes is the maximum number of bytes that the pipe will + // guarantee atomic reads or writes atomically. + // + // This value is immutable. + atomicIOBytes int64 + + // The dirent backing this pipe. Shared by all readers and writers. + // + // This value is immutable. + Dirent *fs.Dirent + + // The number of active readers for this pipe. + // + // Access atomically. + readers int32 + + // The number of active writes for this pipe. + // + // Access atomically. + writers int32 + + // mu protects all pipe internal state below. + mu sync.Mutex `state:"nosave"` + + // data is the buffer queue of pipe contents. + // + // This is protected by mu. + data bufferList + + // max is the maximum size of the pipe in bytes. When this max has been + // reached, writers will get EWOULDBLOCK. + // + // This is protected by mu. + max int64 + + // size is the current size of the pipe in bytes. + // + // This is protected by mu. + size int64 + + // hadWriter indicates if this pipe ever had a writer. Note that this + // does not necessarily indicate there is *currently* a writer, just + // that there has been a writer at some point since the pipe was + // created. + // + // This is protected by mu. + hadWriter bool +} + +// NewPipe initializes and returns a pipe. +// +// N.B. The size and atomicIOBytes will be bounded. +func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe { + if sizeBytes < MinimumPipeSize { + sizeBytes = MinimumPipeSize + } + if sizeBytes > MaximumPipeSize { + sizeBytes = MaximumPipeSize + } + if atomicIOBytes <= 0 { + atomicIOBytes = 1 + } + if atomicIOBytes > sizeBytes { + atomicIOBytes = sizeBytes + } + p := &Pipe{ + isNamed: isNamed, + max: sizeBytes, + atomicIOBytes: atomicIOBytes, + } + + // Build the fs.Dirent of this pipe, shared by all fs.Files associated + // with this pipe. + perms := fs.FilePermissions{ + User: fs.PermMask{Read: true, Write: true}, + } + iops := NewInodeOperations(ctx, perms, p) + ino := pipeDevice.NextIno() + sattr := fs.StableAttr{ + Type: fs.Pipe, + DeviceID: pipeDevice.DeviceID(), + InodeID: ino, + BlockSize: int64(atomicIOBytes), + } + ms := fs.NewPseudoMountSource() + p.Dirent = fs.NewDirent(fs.NewInode(iops, ms, sattr), fmt.Sprintf("pipe:[%d]", ino)) + return p +} + +// NewConnectedPipe initializes a pipe and returns a pair of objects +// representing the read and write ends of the pipe. +func NewConnectedPipe(ctx context.Context, sizeBytes, atomicIOBytes int64) (*fs.File, *fs.File) { + p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes) + return p.Open(ctx, fs.FileFlags{Read: true}), p.Open(ctx, fs.FileFlags{Write: true}) +} + +// Open opens the pipe and returns a new file. +// +// Precondition: at least one of flags.Read or flags.Write must be set. +func (p *Pipe) Open(ctx context.Context, flags fs.FileFlags) *fs.File { + switch { + case flags.Read && flags.Write: + p.rOpen() + p.wOpen() + return fs.NewFile(ctx, p.Dirent, flags, &ReaderWriter{ + Pipe: p, + }) + case flags.Read: + p.rOpen() + return fs.NewFile(ctx, p.Dirent, flags, &Reader{ + ReaderWriter: ReaderWriter{Pipe: p}, + }) + case flags.Write: + p.wOpen() + return fs.NewFile(ctx, p.Dirent, flags, &Writer{ + ReaderWriter: ReaderWriter{Pipe: p}, + }) + default: + // Precondition violated. + panic("invalid pipe flags") + } +} + +// read reads data from the pipe into dst and returns the number of bytes +// read, or returns ErrWouldBlock if the pipe is empty. +// +// Precondition: this pipe must have readers. +func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) { + // Don't block for a zero-length read even if the pipe is empty. + if dst.NumBytes() == 0 { + return 0, nil + } + + p.mu.Lock() + defer p.mu.Unlock() + + // Is the pipe empty? + if p.size == 0 { + if !p.HasWriters() { + // There are no writers, return EOF. + return 0, nil + } + return 0, syserror.ErrWouldBlock + } + + // Limit how much we consume. + if dst.NumBytes() > p.size { + dst = dst.TakeFirst64(p.size) + } + + done := int64(0) + for dst.NumBytes() > 0 { + // Pop the first buffer. + first := p.data.Front() + if first == nil { + break + } + + // Copy user data. + n, err := dst.CopyOutFrom(ctx, first) + done += int64(n) + p.size -= n + dst = dst.DropFirst64(n) + + // Empty buffer? + if first.Empty() { + // Push to the free list. + p.data.Remove(first) + bufferPool.Put(first) + } + + // Handle errors. + if err != nil { + return done, err + } + } + + return done, nil +} + +// write writes data from sv into the pipe and returns the number of bytes +// written. If no bytes are written because the pipe is full (or has less than +// atomicIOBytes free capacity), write returns ErrWouldBlock. +// +// Precondition: this pipe must have writers. +func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) { + p.mu.Lock() + defer p.mu.Unlock() + + // Can't write to a pipe with no readers. + if !p.HasReaders() { + return 0, syscall.EPIPE + } + + // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be + // atomic, but requires no atomicity for writes larger than this. + wanted := src.NumBytes() + if avail := p.max - p.size; wanted > avail { + if wanted <= p.atomicIOBytes { + return 0, syserror.ErrWouldBlock + } + // Limit to the available capacity. + src = src.TakeFirst64(avail) + } + + done := int64(0) + for src.NumBytes() > 0 { + // Need a new buffer? + last := p.data.Back() + if last == nil || last.Full() { + // Add a new buffer to the data list. + last = newBuffer() + p.data.PushBack(last) + } + + // Copy user data. + n, err := src.CopyInTo(ctx, last) + done += int64(n) + p.size += n + src = src.DropFirst64(n) + + // Handle errors. + if err != nil { + return done, err + } + } + if wanted > done { + // Partial write due to full pipe. + return done, syserror.ErrWouldBlock + } + + return done, nil +} + +// rOpen signals a new reader of the pipe. +func (p *Pipe) rOpen() { + atomic.AddInt32(&p.readers, 1) +} + +// wOpen signals a new writer of the pipe. +func (p *Pipe) wOpen() { + p.mu.Lock() + defer p.mu.Unlock() + p.hadWriter = true + atomic.AddInt32(&p.writers, 1) +} + +// rClose signals that a reader has closed their end of the pipe. +func (p *Pipe) rClose() { + newReaders := atomic.AddInt32(&p.readers, -1) + if newReaders < 0 { + panic(fmt.Sprintf("Refcounting bug, pipe has negative readers: %v", newReaders)) + } +} + +// wClose signals that a writer has closed their end of the pipe. +func (p *Pipe) wClose() { + newWriters := atomic.AddInt32(&p.writers, -1) + if newWriters < 0 { + panic(fmt.Sprintf("Refcounting bug, pipe has negative writers: %v.", newWriters)) + } +} + +// HasReaders returns whether the pipe has any active readers. +func (p *Pipe) HasReaders() bool { + return atomic.LoadInt32(&p.readers) > 0 +} + +// HasWriters returns whether the pipe has any active writers. +func (p *Pipe) HasWriters() bool { + return atomic.LoadInt32(&p.writers) > 0 +} + +// rReadinessLocked calculates the read readiness. +// +// Precondition: mu must be held. +func (p *Pipe) rReadinessLocked() waiter.EventMask { + ready := waiter.EventMask(0) + if p.HasReaders() && p.data.Front() != nil { + ready |= waiter.EventIn + } + if !p.HasWriters() && p.hadWriter { + // POLLHUP must be suppressed until the pipe has had at least one writer + // at some point. Otherwise a reader thread may poll and immediately get + // a POLLHUP before the writer ever opens the pipe, which the reader may + // interpret as the writer opening then closing the pipe. + ready |= waiter.EventHUp + } + return ready +} + +// rReadiness returns a mask that states whether the read end of the pipe is +// ready for reading. +func (p *Pipe) rReadiness() waiter.EventMask { + p.mu.Lock() + defer p.mu.Unlock() + return p.rReadinessLocked() +} + +// wReadinessLocked calculates the write readiness. +// +// Precondition: mu must be held. +func (p *Pipe) wReadinessLocked() waiter.EventMask { + ready := waiter.EventMask(0) + if p.HasWriters() && p.size < p.max { + ready |= waiter.EventOut + } + if !p.HasReaders() { + ready |= waiter.EventErr + } + return ready +} + +// wReadiness returns a mask that states whether the write end of the pipe +// is ready for writing. +func (p *Pipe) wReadiness() waiter.EventMask { + p.mu.Lock() + defer p.mu.Unlock() + return p.wReadinessLocked() +} + +// rwReadiness returns a mask that states whether a read-write handle to the +// pipe is ready for IO. +func (p *Pipe) rwReadiness() waiter.EventMask { + p.mu.Lock() + defer p.mu.Unlock() + return p.rReadinessLocked() | p.wReadinessLocked() +} + +// queued returns the amount of queued data. +func (p *Pipe) queued() int64 { + p.mu.Lock() + defer p.mu.Unlock() + return p.size +} + +// PipeSize implements PipeSizer.PipeSize. +func (p *Pipe) PipeSize() int64 { + p.mu.Lock() + defer p.mu.Unlock() + return p.max +} + +// SetPipeSize implements PipeSize.SetPipeSize. +func (p *Pipe) SetPipeSize(size int64) (int64, error) { + if size < 0 { + return 0, syserror.EINVAL + } + if size < MinimumPipeSize { + size = MinimumPipeSize // Per spec. + } + if size > MaximumPipeSize { + return 0, syserror.EPERM + } + p.mu.Lock() + defer p.mu.Unlock() + if size < p.size { + return 0, syserror.EBUSY + } + p.max = size + return size, nil +} diff --git a/pkg/sentry/kernel/pipe/pipe_state_autogen.go b/pkg/sentry/kernel/pipe/pipe_state_autogen.go new file mode 100755 index 000000000..5d3686109 --- /dev/null +++ b/pkg/sentry/kernel/pipe/pipe_state_autogen.go @@ -0,0 +1,134 @@ +// automatically generated by stateify. + +package pipe + +import ( + "gvisor.googlesource.com/gvisor/pkg/state" +) + +func (x *buffer) beforeSave() {} +func (x *buffer) save(m state.Map) { + x.beforeSave() + m.Save("data", &x.data) + m.Save("read", &x.read) + m.Save("write", &x.write) + m.Save("bufferEntry", &x.bufferEntry) +} + +func (x *buffer) afterLoad() {} +func (x *buffer) load(m state.Map) { + m.Load("data", &x.data) + m.Load("read", &x.read) + m.Load("write", &x.write) + m.Load("bufferEntry", &x.bufferEntry) +} + +func (x *bufferList) beforeSave() {} +func (x *bufferList) save(m state.Map) { + x.beforeSave() + m.Save("head", &x.head) + m.Save("tail", &x.tail) +} + +func (x *bufferList) afterLoad() {} +func (x *bufferList) load(m state.Map) { + m.Load("head", &x.head) + m.Load("tail", &x.tail) +} + +func (x *bufferEntry) beforeSave() {} +func (x *bufferEntry) save(m state.Map) { + x.beforeSave() + m.Save("next", &x.next) + m.Save("prev", &x.prev) +} + +func (x *bufferEntry) afterLoad() {} +func (x *bufferEntry) load(m state.Map) { + m.Load("next", &x.next) + m.Load("prev", &x.prev) +} + +func (x *inodeOperations) beforeSave() {} +func (x *inodeOperations) save(m state.Map) { + x.beforeSave() + m.Save("InodeSimpleAttributes", &x.InodeSimpleAttributes) + m.Save("p", &x.p) +} + +func (x *inodeOperations) afterLoad() {} +func (x *inodeOperations) load(m state.Map) { + m.Load("InodeSimpleAttributes", &x.InodeSimpleAttributes) + m.Load("p", &x.p) +} + +func (x *Pipe) beforeSave() {} +func (x *Pipe) save(m state.Map) { + x.beforeSave() + m.Save("isNamed", &x.isNamed) + m.Save("atomicIOBytes", &x.atomicIOBytes) + m.Save("Dirent", &x.Dirent) + m.Save("readers", &x.readers) + m.Save("writers", &x.writers) + m.Save("data", &x.data) + m.Save("max", &x.max) + m.Save("size", &x.size) + m.Save("hadWriter", &x.hadWriter) +} + +func (x *Pipe) afterLoad() {} +func (x *Pipe) load(m state.Map) { + m.Load("isNamed", &x.isNamed) + m.Load("atomicIOBytes", &x.atomicIOBytes) + m.Load("Dirent", &x.Dirent) + m.Load("readers", &x.readers) + m.Load("writers", &x.writers) + m.Load("data", &x.data) + m.Load("max", &x.max) + m.Load("size", &x.size) + m.Load("hadWriter", &x.hadWriter) +} + +func (x *Reader) beforeSave() {} +func (x *Reader) save(m state.Map) { + x.beforeSave() + m.Save("ReaderWriter", &x.ReaderWriter) +} + +func (x *Reader) afterLoad() {} +func (x *Reader) load(m state.Map) { + m.Load("ReaderWriter", &x.ReaderWriter) +} + +func (x *ReaderWriter) beforeSave() {} +func (x *ReaderWriter) save(m state.Map) { + x.beforeSave() + m.Save("Pipe", &x.Pipe) +} + +func (x *ReaderWriter) afterLoad() {} +func (x *ReaderWriter) load(m state.Map) { + m.Load("Pipe", &x.Pipe) +} + +func (x *Writer) beforeSave() {} +func (x *Writer) save(m state.Map) { + x.beforeSave() + m.Save("ReaderWriter", &x.ReaderWriter) +} + +func (x *Writer) afterLoad() {} +func (x *Writer) load(m state.Map) { + m.Load("ReaderWriter", &x.ReaderWriter) +} + +func init() { + state.Register("pipe.buffer", (*buffer)(nil), state.Fns{Save: (*buffer).save, Load: (*buffer).load}) + state.Register("pipe.bufferList", (*bufferList)(nil), state.Fns{Save: (*bufferList).save, Load: (*bufferList).load}) + state.Register("pipe.bufferEntry", (*bufferEntry)(nil), state.Fns{Save: (*bufferEntry).save, Load: (*bufferEntry).load}) + state.Register("pipe.inodeOperations", (*inodeOperations)(nil), state.Fns{Save: (*inodeOperations).save, Load: (*inodeOperations).load}) + state.Register("pipe.Pipe", (*Pipe)(nil), state.Fns{Save: (*Pipe).save, Load: (*Pipe).load}) + state.Register("pipe.Reader", (*Reader)(nil), state.Fns{Save: (*Reader).save, Load: (*Reader).load}) + state.Register("pipe.ReaderWriter", (*ReaderWriter)(nil), state.Fns{Save: (*ReaderWriter).save, Load: (*ReaderWriter).load}) + state.Register("pipe.Writer", (*Writer)(nil), state.Fns{Save: (*Writer).save, Load: (*Writer).load}) +} diff --git a/pkg/sentry/kernel/pipe/reader.go b/pkg/sentry/kernel/pipe/reader.go new file mode 100644 index 000000000..656be824d --- /dev/null +++ b/pkg/sentry/kernel/pipe/reader.go @@ -0,0 +1,42 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// Reader satisfies the fs.FileOperations interface for read-only pipes. +// Reader should be used with !fs.FileFlags.Write to reject writes. +// +// +stateify savable +type Reader struct { + ReaderWriter +} + +// Release implements fs.FileOperations.Release. +// +// This overrides ReaderWriter.Release. +func (r *Reader) Release() { + r.Pipe.rClose() + + // Wake up writers. + r.Pipe.Notify(waiter.EventOut) +} + +// Readiness returns the ready events in the underlying pipe. +func (r *Reader) Readiness(mask waiter.EventMask) waiter.EventMask { + return r.Pipe.rReadiness() & mask +} diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go new file mode 100644 index 000000000..e560b9be9 --- /dev/null +++ b/pkg/sentry/kernel/pipe/reader_writer.go @@ -0,0 +1,96 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "math" + "syscall" + + "gvisor.googlesource.com/gvisor/pkg/abi/linux" + "gvisor.googlesource.com/gvisor/pkg/sentry/arch" + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil" + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// ReaderWriter satisfies the FileOperations interface and services both +// read and write requests. This should only be used directly for named pipes. +// pipe(2) and pipe2(2) only support unidirectional pipes and should use +// either pipe.Reader or pipe.Writer. +// +// +stateify savable +type ReaderWriter struct { + fsutil.FilePipeSeek `state:"nosave"` + fsutil.FileNotDirReaddir `state:"nosave"` + fsutil.FileNoFsync `state:"nosave"` + fsutil.FileNoMMap `state:"nosave"` + fsutil.FileNoSplice `state:"nosave"` + fsutil.FileNoopFlush `state:"nosave"` + fsutil.FileUseInodeUnstableAttr `state:"nosave"` + *Pipe +} + +// Release implements fs.FileOperations.Release. +func (rw *ReaderWriter) Release() { + rw.Pipe.rClose() + rw.Pipe.wClose() + + // Wake up readers and writers. + rw.Pipe.Notify(waiter.EventIn | waiter.EventOut) +} + +// Read implements fs.FileOperations.Read. +func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequence, _ int64) (int64, error) { + n, err := rw.Pipe.read(ctx, dst) + if n > 0 { + rw.Pipe.Notify(waiter.EventOut) + } + return n, err +} + +// Write implements fs.FileOperations.Write. +func (rw *ReaderWriter) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) { + n, err := rw.Pipe.write(ctx, src) + if n > 0 { + rw.Pipe.Notify(waiter.EventIn) + } + return n, err +} + +// Readiness returns the ready events in the underlying pipe. +func (rw *ReaderWriter) Readiness(mask waiter.EventMask) waiter.EventMask { + return rw.Pipe.rwReadiness() & mask +} + +// Ioctl implements fs.FileOperations.Ioctl. +func (rw *ReaderWriter) Ioctl(ctx context.Context, io usermem.IO, args arch.SyscallArguments) (uintptr, error) { + // Switch on ioctl request. + switch int(args[1].Int()) { + case linux.FIONREAD: + v := rw.queued() + if v > math.MaxInt32 { + v = math.MaxInt32 // Silently truncate. + } + // Copy result to user-space. + _, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{ + AddressSpaceActive: true, + }) + return 0, err + default: + return 0, syscall.ENOTTY + } +} diff --git a/pkg/sentry/kernel/pipe/writer.go b/pkg/sentry/kernel/pipe/writer.go new file mode 100644 index 000000000..8d5b68541 --- /dev/null +++ b/pkg/sentry/kernel/pipe/writer.go @@ -0,0 +1,42 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// Writer satisfies the fs.FileOperations interface for write-only pipes. +// Writer should be used with !fs.FileFlags.Read to reject reads. +// +// +stateify savable +type Writer struct { + ReaderWriter +} + +// Release implements fs.FileOperations.Release. +// +// This overrides ReaderWriter.Release. +func (w *Writer) Release() { + w.Pipe.wClose() + + // Wake up readers. + w.Pipe.Notify(waiter.EventHUp) +} + +// Readiness returns the ready events in the underlying pipe. +func (w *Writer) Readiness(mask waiter.EventMask) waiter.EventMask { + return w.Pipe.wReadiness() & mask +} |