diff options
Diffstat (limited to 'pkg/sentry/kernel/pipe/pipe.go')
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe.go | 315 |
1 files changed, 210 insertions, 105 deletions
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go index bd7649d2f..b65204492 100644 --- a/pkg/sentry/kernel/pipe/pipe.go +++ b/pkg/sentry/kernel/pipe/pipe.go @@ -12,11 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package pipe provides an in-memory implementation of a unidirectional -// pipe. -// -// The goal of this pipe is to emulate the pipe syscall in all of its -// edge cases and guarantees of atomic IO. +// Package pipe provides a pipe implementation. package pipe import ( @@ -32,8 +28,29 @@ import ( "gvisor.googlesource.com/gvisor/pkg/waiter" ) -// DefaultPipeSize is the system-wide default size of a pipe in bytes. -const DefaultPipeSize = 65536 +const ( + // MinimumPipeSize is a hard limit on the minimum size of a pipe. + MinimumPipeSize = 64 << 10 + + // DefaultPipeSize is the system-wide default size of a pipe in bytes. + DefaultPipeSize = MinimumPipeSize + + // MaximumPipeSize is a hard limit on the maximum size of a pipe. + MaximumPipeSize = 8 << 20 +) + +// Sizer is an interface for setting and getting the size of a pipe. +// +// It is implemented by Pipe and, through embedding, all other types. +type Sizer interface { + // PipeSize returns the pipe capacity in bytes. + PipeSize() int64 + + // SetPipeSize sets the new pipe capacity in bytes. + // + // The new size is returned (which may be capped). + SetPipeSize(int64) (int64, error) +} // Pipe is an encapsulation of a platform-independent pipe. // It manages a buffered byte queue shared between a reader/writer @@ -43,49 +60,76 @@ const DefaultPipeSize = 65536 type Pipe struct { waiter.Queue `state:"nosave"` - // Whether this is a named or anonymous pipe. + // isNamed indicates whether this is a named pipe. + // + // This value is immutable. isNamed bool + // atomicIOBytes is the maximum number of bytes that the pipe will + // guarantee to read or write atomically. + // + // This value is immutable. 
+ atomicIOBytes int64 + // The dirent backing this pipe. Shared by all readers and writers. + // + // This value is immutable. Dirent *fs.Dirent - // The buffered byte queue. - data bufferList + // The number of active readers for this pipe. + // + // Access atomically. + readers int32 - // Max size of the pipe in bytes. When this max has been reached, - // writers will get EWOULDBLOCK. - max int + // The number of active writers for this pipe. + // + // Access atomically. + writers int32 - // Current size of the pipe in bytes. - size int + // mu protects all pipe internal state below. + mu sync.Mutex `state:"nosave"` - // Max number of bytes the pipe can guarantee to read or write - // atomically. - atomicIOBytes int + // data is the buffer queue of pipe contents. + // + // This is protected by mu. + data bufferList - // The number of active readers for this pipe. Load/store atomically. - readers int32 + // max is the maximum size of the pipe in bytes. When this max has been + // reached, writers will get EWOULDBLOCK. + // + // This is protected by mu. + max int64 - // The number of active writes for this pipe. Load/store atomically. - writers int32 + // size is the current size of the pipe in bytes. + // + // This is protected by mu. + size int64 - // This flag indicates if this pipe ever had a writer. Note that this does - // not necessarily indicate there is *currently* a writer, just that there - // has been a writer at some point since the pipe was created. + // hadWriter indicates if this pipe ever had a writer. Note that this + // does not necessarily indicate there is *currently* a writer, just + // that there has been a writer at some point since the pipe was + // created. // - // Protected by mu. + // This is protected by mu. hadWriter bool - - // Lock protecting all pipe internal state. - mu sync.Mutex `state:"nosave"` } -// NewPipe initializes and returns a pipe. 
A pipe created by this function is -// persistent, and will remain valid even without any open fds to it. Named -// pipes for mknod(2) are created via this function. Note that the -// implementation of blocking semantics for opening the read and write ends of a -// named pipe are left to filesystems. -func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe { +// NewPipe initializes and returns a pipe. +// +// N.B. The size and atomicIOBytes will be bounded. +func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe { + if sizeBytes < MinimumPipeSize { + sizeBytes = MinimumPipeSize + } + if sizeBytes > MaximumPipeSize { + sizeBytes = MaximumPipeSize + } + if atomicIOBytes <= 0 { + atomicIOBytes = 1 + } + if atomicIOBytes > sizeBytes { + atomicIOBytes = sizeBytes + } p := &Pipe{ isNamed: isNamed, max: sizeBytes, @@ -110,48 +154,45 @@ func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *P return p } -// NewConnectedPipe initializes a pipe and returns a pair of objects (which -// implement kio.File) representing the read and write ends of the pipe. A pipe -// created by this function becomes invalid as soon as either the read or write -// end is closed, and errors on subsequent operations on either end. Pipes -// for pipe(2) and pipe2(2) are generally created this way. -func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) { +// NewConnectedPipe initializes a pipe and returns a pair of objects +// representing the read and write ends of the pipe. +func NewConnectedPipe(ctx context.Context, sizeBytes, atomicIOBytes int64) (*fs.File, *fs.File) { p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes) - return p.ROpen(ctx), p.WOpen(ctx) -} - -// ROpen opens the pipe for reading. 
-func (p *Pipe) ROpen(ctx context.Context) *fs.File { - p.rOpen() - return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true}, &Reader{ - ReaderWriter: ReaderWriter{Pipe: p}, - }) -} - -// WOpen opens the pipe for writing. -func (p *Pipe) WOpen(ctx context.Context) *fs.File { - p.wOpen() - return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Write: true}, &Writer{ - ReaderWriter: ReaderWriter{Pipe: p}, - }) + return p.Open(ctx, fs.FileFlags{Read: true}), p.Open(ctx, fs.FileFlags{Write: true}) } -// RWOpen opens the pipe for both reading and writing. -func (p *Pipe) RWOpen(ctx context.Context) *fs.File { - p.rOpen() - p.wOpen() - return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{ - Pipe: p, - }) +// Open opens the pipe and returns a new file. +// +// Precondition: at least one of flags.Read or flags.Write must be set. +func (p *Pipe) Open(ctx context.Context, flags fs.FileFlags) *fs.File { + switch { + case flags.Read && flags.Write: + p.rOpen() + p.wOpen() + return fs.NewFile(ctx, p.Dirent, flags, &ReaderWriter{ + Pipe: p, + }) + case flags.Read: + p.rOpen() + return fs.NewFile(ctx, p.Dirent, flags, &Reader{ + ReaderWriter: ReaderWriter{Pipe: p}, + }) + case flags.Write: + p.wOpen() + return fs.NewFile(ctx, p.Dirent, flags, &Writer{ + ReaderWriter: ReaderWriter{Pipe: p}, + }) + default: + // Precondition violated. + panic("invalid pipe flags") + } } // read reads data from the pipe into dst and returns the number of bytes // read, or returns ErrWouldBlock if the pipe is empty. +// +// Precondition: this pipe must have readers. func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) { - if !p.HasReaders() { - return 0, syscall.EBADF - } - // Don't block for a zero-length read even if the pipe is empty. 
if dst.NumBytes() == 0 { return 0, nil @@ -159,8 +200,8 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) p.mu.Lock() defer p.mu.Unlock() - // If there is nothing to read at the moment but there is a writer, tell the - // caller to block. + + // Is the pipe empty? if p.size == 0 { if !p.HasWriters() { // There are no writers, return EOF. @@ -168,64 +209,94 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) } return 0, syserror.ErrWouldBlock } - var n int64 - for buffer := p.data.Front(); buffer != nil; buffer = p.data.Front() { - n0, err := dst.CopyOut(ctx, buffer.bytes()) - n += int64(n0) - p.size -= n0 - if buffer.truncate(n0) == 0 { - p.data.Remove(buffer) + + // Limit how much we consume. + if dst.NumBytes() > p.size { + dst = dst.TakeFirst64(p.size) + } + + done := int64(0) + for dst.NumBytes() > 0 { + // Pop the first buffer. + first := p.data.Front() + if first == nil { + break } - dst = dst.DropFirst(n0) - if dst.NumBytes() == 0 || err != nil { - return n, err + + // Copy user data. + n, err := dst.CopyOutFrom(ctx, first) + done += int64(n) + p.size -= n + dst = dst.DropFirst64(n) + + // Empty buffer? + if first.Empty() { + // Push to the free list. + p.data.Remove(first) + bufferPool.Put(first) + } + + // Handle errors. + if err != nil { + return done, err } } - return n, nil + + return done, nil } // write writes data from sv into the pipe and returns the number of bytes // written. If no bytes are written because the pipe is full (or has less than // atomicIOBytes free capacity), write returns ErrWouldBlock. +// +// Precondition: this pipe must have writers. func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) { p.mu.Lock() defer p.mu.Unlock() - if !p.HasWriters() { - return 0, syscall.EBADF - } + // Can't write to a pipe with no readers. 
if !p.HasReaders() { return 0, syscall.EPIPE } // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be - // atomic, but requires no atomicity for writes larger than this. However, - // Linux appears to provide stronger semantics than this in practice: - // unmerged writes are done one PAGE_SIZE buffer at a time, so for larger - // writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement - // this by writing at most atomicIOBytes at a time if we can't service the - // write in its entirety. - canWrite := src.NumBytes() - if canWrite > int64(p.max-p.size) { - if p.max-p.size >= p.atomicIOBytes { - canWrite = int64(p.atomicIOBytes) - } else { + // atomic, but requires no atomicity for writes larger than this. + wanted := src.NumBytes() + if avail := p.max - p.size; wanted > avail { + if wanted <= p.atomicIOBytes { return 0, syserror.ErrWouldBlock } + // Limit to the available capacity. + src = src.TakeFirst64(avail) } - // Copy data from user memory into a pipe-owned buffer. - buf := make([]byte, canWrite) - n, err := src.CopyIn(ctx, buf) - if n > 0 { - p.data.PushBack(newBuffer(buf[:n])) + done := int64(0) + for src.NumBytes() > 0 { + // Need a new buffer? + last := p.data.Back() + if last == nil || last.Full() { + // Add a new buffer to the data list. + last = newBuffer() + p.data.PushBack(last) + } + + // Copy user data. + n, err := src.CopyInTo(ctx, last) + done += int64(n) p.size += n + src = src.DropFirst64(n) + + // Handle errors. + if err != nil { + return done, err + } } - if int64(n) < src.NumBytes() && err == nil { + if wanted > done { // Partial write due to full pipe. - err = syserror.ErrWouldBlock + return done, syserror.ErrWouldBlock } - return int64(n), err + + return done, nil } // rOpen signals a new reader of the pipe. @@ -267,6 +338,9 @@ func (p *Pipe) HasWriters() bool { return atomic.LoadInt32(&p.writers) > 0 } +// rReadinessLocked calculates the read readiness. +// +// Precondition: mu must be held. 
func (p *Pipe) rReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) if p.HasReaders() && p.data.Front() != nil { @@ -290,6 +364,9 @@ func (p *Pipe) rReadiness() waiter.EventMask { return p.rReadinessLocked() } +// wReadinessLocked calculates the write readiness. +// +// Precondition: mu must be held. func (p *Pipe) wReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) if p.HasWriters() && p.size < p.max { @@ -317,8 +394,36 @@ func (p *Pipe) rwReadiness() waiter.EventMask { return p.rReadinessLocked() | p.wReadinessLocked() } -func (p *Pipe) queuedSize() int { +// queued returns the amount of queued data. +func (p *Pipe) queued() int64 { p.mu.Lock() defer p.mu.Unlock() return p.size } + +// PipeSize implements Sizer.PipeSize. +func (p *Pipe) PipeSize() int64 { + p.mu.Lock() + defer p.mu.Unlock() + return p.max +} + +// SetPipeSize implements Sizer.SetPipeSize. +func (p *Pipe) SetPipeSize(size int64) (int64, error) { + if size < 0 { + return 0, syserror.EINVAL + } + if size < MinimumPipeSize { + size = MinimumPipeSize // Per spec. + } + if size > MaximumPipeSize { + return 0, syserror.EPERM + } + p.mu.Lock() + defer p.mu.Unlock() + if size < p.size { + return 0, syserror.EBUSY + } + p.max = size + return size, nil +} |