author    Adin Scannell <ascannell@google.com>    2019-05-21 20:11:26 -0700
committer Shentubot <shentubot@google.com>    2019-05-21 20:12:27 -0700
commit    ae1bb08871758844fa23fc7255ffeb0392f9dee6 (patch)
tree      ff854ebe0082be7d0df9f47aec4eaf13fe4327ac /pkg/sentry/kernel/pipe/pipe.go
parent    c8857f72696c1097a427b75f4340969e20cc0e95 (diff)
Clean up pipe internals and add fcntl support
Pipe internals are made more efficient by avoiding garbage collection. A pool is now used that can be shared by all pipes, and buffers are chained via an intrusive list. The documentation for pipe structures and methods is also simplified and clarified.

The pipe tests are now parameterized, so that they are run on all different variants (named pipes, small buffers, default buffers).

The pipe buffer sizes are exposed by fcntl, which is now supported by this change. A size change test has been added to the suite.

These new tests uncovered a bug regarding the semantics of open named pipes with O_NONBLOCK, which is also fixed by this CL. This fix also addresses the lack of the O_LARGEFILE flag for named pipes.

PiperOrigin-RevId: 249375888
Change-Id: I48e61e9c868aedb0cadda2dff33f09a560dee773
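The fcntl support referred to above is Linux's F_GETPIPE_SZ/F_SETPIPE_SZ interface for reading and resizing a pipe's capacity. As a rough illustration (not part of this change), a userspace Go program using golang.org/x/sys/unix could exercise it like this:

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

func main() {
	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}
	defer r.Close()
	defer w.Close()

	// F_GETPIPE_SZ: query the current pipe capacity in bytes.
	size, err := unix.FcntlInt(r.Fd(), unix.F_GETPIPE_SZ, 0)
	if err != nil {
		panic(err)
	}
	fmt.Println("pipe capacity:", size)

	// F_SETPIPE_SZ: request a 1 MiB capacity. The kernel may round the
	// request up and returns the capacity actually set.
	newSize, err := unix.FcntlInt(w.Fd(), unix.F_SETPIPE_SZ, 1<<20)
	if err != nil {
		panic(err)
	}
	fmt.Println("new capacity:", newSize)
}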
Diffstat (limited to 'pkg/sentry/kernel/pipe/pipe.go')
-rw-r--r-- pkg/sentry/kernel/pipe/pipe.go | 315
1 file changed, 210 insertions, 105 deletions
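The diff below calls newBuffer, bufferPool.Put, and methods on a bufferList, all of which live in the sibling buffer.go file and are not shown here. A rough, hypothetical sketch of that pooled, intrusively linked buffer (the field names and the 4 KiB chunk size are illustrative assumptions, not the actual definitions) looks something like:

package pipe

import "sync"

// buffer is a sketch of a fixed-size chunk of pipe data. The real type uses
// a generated intrusive list (bufferEntry/bufferList); hand-written next/prev
// pointers stand in for it here.
type buffer struct {
	data  [4096]byte // storage chunk (size is an assumption)
	read  int64      // read offset into data
	write int64      // write offset into data

	// Intrusive linkage: the buffer carries its own list pointers, so
	// queuing it on a pipe allocates nothing extra.
	next *buffer
	prev *buffer
}

// bufferPool is shared by all pipes, so chunks are recycled rather than
// repeatedly allocated and garbage collected.
var bufferPool = sync.Pool{
	New: func() interface{} { return new(buffer) },
}

// newBuffer fetches a reset buffer from the shared pool.
func newBuffer() *buffer {
	b := bufferPool.Get().(*buffer)
	b.read, b.write, b.next, b.prev = 0, 0, nil, nil
	return b
}

// Empty reports whether all written bytes have been consumed.
func (b *buffer) Empty() bool { return b.read == b.write }

// Full reports whether the chunk has no remaining capacity.
func (b *buffer) Full() bool { return b.write == int64(len(b.data)) }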
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
index bd7649d2f..b65204492 100644
--- a/pkg/sentry/kernel/pipe/pipe.go
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -12,11 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Package pipe provides an in-memory implementation of a unidirectional
-// pipe.
-//
-// The goal of this pipe is to emulate the pipe syscall in all of its
-// edge cases and guarantees of atomic IO.
+// Package pipe provides a pipe implementation.
package pipe
import (
@@ -32,8 +28,29 @@ import (
"gvisor.googlesource.com/gvisor/pkg/waiter"
)
-// DefaultPipeSize is the system-wide default size of a pipe in bytes.
-const DefaultPipeSize = 65536
+const (
+ // MinimumPipeSize is a hard limit on the minimum size of a pipe.
+ MinimumPipeSize = 64 << 10
+
+ // DefaultPipeSize is the system-wide default size of a pipe in bytes.
+ DefaultPipeSize = MinimumPipeSize
+
+ // MaximumPipeSize is a hard limit on the maximum size of a pipe.
+ MaximumPipeSize = 8 << 20
+)
+
+// Sizer is an interface for setting and getting the size of a pipe.
+//
+// It is implemented by Pipe and, through embedding, all other types.
+type Sizer interface {
+ // PipeSize returns the pipe capacity in bytes.
+ PipeSize() int64
+
+ // SetPipeSize sets the new pipe capacity in bytes.
+ //
+ // The new size is returned (which may be capped).
+ SetPipeSize(int64) (int64, error)
+}
// Pipe is an encapsulation of a platform-independent pipe.
// It manages a buffered byte queue shared between a reader/writer
@@ -43,49 +60,76 @@ const DefaultPipeSize = 65536
type Pipe struct {
waiter.Queue `state:"nosave"`
- // Whether this is a named or anonymous pipe.
+ // isNamed indicates whether this is a named pipe.
+ //
+ // This value is immutable.
isNamed bool
+ // atomicIOBytes is the maximum number of bytes for which the pipe will
+ // guarantee atomic reads and writes.
+ //
+ // This value is immutable.
+ atomicIOBytes int64
+
// The dirent backing this pipe. Shared by all readers and writers.
+ //
+ // This value is immutable.
Dirent *fs.Dirent
- // The buffered byte queue.
- data bufferList
+ // The number of active readers for this pipe.
+ //
+ // Access atomically.
+ readers int32
- // Max size of the pipe in bytes. When this max has been reached,
- // writers will get EWOULDBLOCK.
- max int
+ // The number of active writers for this pipe.
+ //
+ // Access atomically.
+ writers int32
- // Current size of the pipe in bytes.
- size int
+ // mu protects all pipe internal state below.
+ mu sync.Mutex `state:"nosave"`
- // Max number of bytes the pipe can guarantee to read or write
- // atomically.
- atomicIOBytes int
+ // data is the buffer queue of pipe contents.
+ //
+ // This is protected by mu.
+ data bufferList
- // The number of active readers for this pipe. Load/store atomically.
- readers int32
+ // max is the maximum size of the pipe in bytes. When this max has been
+ // reached, writers will get EWOULDBLOCK.
+ //
+ // This is protected by mu.
+ max int64
- // The number of active writes for this pipe. Load/store atomically.
- writers int32
+ // size is the current size of the pipe in bytes.
+ //
+ // This is protected by mu.
+ size int64
- // This flag indicates if this pipe ever had a writer. Note that this does
- // not necessarily indicate there is *currently* a writer, just that there
- // has been a writer at some point since the pipe was created.
+ // hadWriter indicates if this pipe ever had a writer. Note that this
+ // does not necessarily indicate there is *currently* a writer, just
+ // that there has been a writer at some point since the pipe was
+ // created.
//
- // Protected by mu.
+ // This is protected by mu.
hadWriter bool
-
- // Lock protecting all pipe internal state.
- mu sync.Mutex `state:"nosave"`
}
-// NewPipe initializes and returns a pipe. A pipe created by this function is
-// persistent, and will remain valid even without any open fds to it. Named
-// pipes for mknod(2) are created via this function. Note that the
-// implementation of blocking semantics for opening the read and write ends of a
-// named pipe are left to filesystems.
-func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe {
+// NewPipe initializes and returns a pipe.
+//
+// N.B. The size and atomicIOBytes will be bounded.
+func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe {
+ if sizeBytes < MinimumPipeSize {
+ sizeBytes = MinimumPipeSize
+ }
+ if sizeBytes > MaximumPipeSize {
+ sizeBytes = MaximumPipeSize
+ }
+ if atomicIOBytes <= 0 {
+ atomicIOBytes = 1
+ }
+ if atomicIOBytes > sizeBytes {
+ atomicIOBytes = sizeBytes
+ }
p := &Pipe{
isNamed: isNamed,
max: sizeBytes,
@@ -110,48 +154,45 @@ func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *P
return p
}
-// NewConnectedPipe initializes a pipe and returns a pair of objects (which
-// implement kio.File) representing the read and write ends of the pipe. A pipe
-// created by this function becomes invalid as soon as either the read or write
-// end is closed, and errors on subsequent operations on either end. Pipes
-// for pipe(2) and pipe2(2) are generally created this way.
-func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) {
+// NewConnectedPipe initializes a pipe and returns a pair of objects
+// representing the read and write ends of the pipe.
+func NewConnectedPipe(ctx context.Context, sizeBytes, atomicIOBytes int64) (*fs.File, *fs.File) {
p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes)
- return p.ROpen(ctx), p.WOpen(ctx)
-}
-
-// ROpen opens the pipe for reading.
-func (p *Pipe) ROpen(ctx context.Context) *fs.File {
- p.rOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true}, &Reader{
- ReaderWriter: ReaderWriter{Pipe: p},
- })
-}
-
-// WOpen opens the pipe for writing.
-func (p *Pipe) WOpen(ctx context.Context) *fs.File {
- p.wOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Write: true}, &Writer{
- ReaderWriter: ReaderWriter{Pipe: p},
- })
+ return p.Open(ctx, fs.FileFlags{Read: true}), p.Open(ctx, fs.FileFlags{Write: true})
}
-// RWOpen opens the pipe for both reading and writing.
-func (p *Pipe) RWOpen(ctx context.Context) *fs.File {
- p.rOpen()
- p.wOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{
- Pipe: p,
- })
+// Open opens the pipe and returns a new file.
+//
+// Precondition: at least one of flags.Read or flags.Write must be set.
+func (p *Pipe) Open(ctx context.Context, flags fs.FileFlags) *fs.File {
+ switch {
+ case flags.Read && flags.Write:
+ p.rOpen()
+ p.wOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &ReaderWriter{
+ Pipe: p,
+ })
+ case flags.Read:
+ p.rOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &Reader{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+ case flags.Write:
+ p.wOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &Writer{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+ default:
+ // Precondition violated.
+ panic("invalid pipe flags")
+ }
}
// read reads data from the pipe into dst and returns the number of bytes
// read, or returns ErrWouldBlock if the pipe is empty.
+//
+// Precondition: this pipe must have readers.
func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
- if !p.HasReaders() {
- return 0, syscall.EBADF
- }
-
// Don't block for a zero-length read even if the pipe is empty.
if dst.NumBytes() == 0 {
return 0, nil
@@ -159,8 +200,8 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
p.mu.Lock()
defer p.mu.Unlock()
- // If there is nothing to read at the moment but there is a writer, tell the
- // caller to block.
+
+ // Is the pipe empty?
if p.size == 0 {
if !p.HasWriters() {
// There are no writers, return EOF.
@@ -168,64 +209,94 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
}
return 0, syserror.ErrWouldBlock
}
- var n int64
- for buffer := p.data.Front(); buffer != nil; buffer = p.data.Front() {
- n0, err := dst.CopyOut(ctx, buffer.bytes())
- n += int64(n0)
- p.size -= n0
- if buffer.truncate(n0) == 0 {
- p.data.Remove(buffer)
+
+ // Limit how much we consume.
+ if dst.NumBytes() > p.size {
+ dst = dst.TakeFirst64(p.size)
+ }
+
+ done := int64(0)
+ for dst.NumBytes() > 0 {
+ // Pop the first buffer.
+ first := p.data.Front()
+ if first == nil {
+ break
}
- dst = dst.DropFirst(n0)
- if dst.NumBytes() == 0 || err != nil {
- return n, err
+
+ // Copy user data.
+ n, err := dst.CopyOutFrom(ctx, first)
+ done += int64(n)
+ p.size -= n
+ dst = dst.DropFirst64(n)
+
+ // Empty buffer?
+ if first.Empty() {
+ // Push to the free list.
+ p.data.Remove(first)
+ bufferPool.Put(first)
+ }
+
+ // Handle errors.
+ if err != nil {
+ return done, err
}
}
- return n, nil
+
+ return done, nil
}
// write writes data from src into the pipe and returns the number of bytes
// written. If no bytes are written because the pipe is full (or has less than
// atomicIOBytes free capacity), write returns ErrWouldBlock.
+//
+// Precondition: this pipe must have writers.
func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()
- if !p.HasWriters() {
- return 0, syscall.EBADF
- }
+ // Can't write to a pipe with no readers.
if !p.HasReaders() {
return 0, syscall.EPIPE
}
// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
- // atomic, but requires no atomicity for writes larger than this. However,
- // Linux appears to provide stronger semantics than this in practice:
- // unmerged writes are done one PAGE_SIZE buffer at a time, so for larger
- // writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement
- // this by writing at most atomicIOBytes at a time if we can't service the
- // write in its entirety.
- canWrite := src.NumBytes()
- if canWrite > int64(p.max-p.size) {
- if p.max-p.size >= p.atomicIOBytes {
- canWrite = int64(p.atomicIOBytes)
- } else {
+ // atomic, but requires no atomicity for writes larger than this.
+ wanted := src.NumBytes()
+ if avail := p.max - p.size; wanted > avail {
+ if wanted <= p.atomicIOBytes {
return 0, syserror.ErrWouldBlock
}
+ // Limit to the available capacity.
+ src = src.TakeFirst64(avail)
}
- // Copy data from user memory into a pipe-owned buffer.
- buf := make([]byte, canWrite)
- n, err := src.CopyIn(ctx, buf)
- if n > 0 {
- p.data.PushBack(newBuffer(buf[:n]))
+ done := int64(0)
+ for src.NumBytes() > 0 {
+ // Need a new buffer?
+ last := p.data.Back()
+ if last == nil || last.Full() {
+ // Add a new buffer to the data list.
+ last = newBuffer()
+ p.data.PushBack(last)
+ }
+
+ // Copy user data.
+ n, err := src.CopyInTo(ctx, last)
+ done += int64(n)
p.size += n
+ src = src.DropFirst64(n)
+
+ // Handle errors.
+ if err != nil {
+ return done, err
+ }
}
- if int64(n) < src.NumBytes() && err == nil {
+ if wanted > done {
// Partial write due to full pipe.
- err = syserror.ErrWouldBlock
+ return done, syserror.ErrWouldBlock
}
- return int64(n), err
+
+ return done, nil
}
// rOpen signals a new reader of the pipe.
@@ -267,6 +338,9 @@ func (p *Pipe) HasWriters() bool {
return atomic.LoadInt32(&p.writers) > 0
}
+// rReadinessLocked calculates the read readiness.
+//
+// Precondition: mu must be held.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
if p.HasReaders() && p.data.Front() != nil {
@@ -290,6 +364,9 @@ func (p *Pipe) rReadiness() waiter.EventMask {
return p.rReadinessLocked()
}
+// wReadinessLocked calculates the write readiness.
+//
+// Precondition: mu must be held.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
if p.HasWriters() && p.size < p.max {
@@ -317,8 +394,36 @@ func (p *Pipe) rwReadiness() waiter.EventMask {
return p.rReadinessLocked() | p.wReadinessLocked()
}
-func (p *Pipe) queuedSize() int {
+// queued returns the amount of queued data.
+func (p *Pipe) queued() int64 {
p.mu.Lock()
defer p.mu.Unlock()
return p.size
}
+
+// PipeSize implements Sizer.PipeSize.
+func (p *Pipe) PipeSize() int64 {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.max
+}
+
+// SetPipeSize implements Sizer.SetPipeSize.
+func (p *Pipe) SetPipeSize(size int64) (int64, error) {
+ if size < 0 {
+ return 0, syserror.EINVAL
+ }
+ if size < MinimumPipeSize {
+ size = MinimumPipeSize // Per spec.
+ }
+ if size > MaximumPipeSize {
+ return 0, syserror.EPERM
+ }
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if size < p.size {
+ return 0, syserror.EBUSY
+ }
+ p.max = size
+ return size, nil
+}
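SetPipeSize above mirrors the Linux fcntl semantics: a negative size is EINVAL, sizes below the minimum are rounded up per the spec, sizes above the maximum fail with EPERM, and shrinking below the amount of data currently queued fails with EBUSY. A hypothetical table-driven test of the clamping behaviour, in the style of the existing pipe tests (the test itself, including its use of the contexttest helper, is a sketch and not part of this diff), could look like:

package pipe

import (
	"testing"

	"gvisor.googlesource.com/gvisor/pkg/sentry/context/contexttest"
)

func TestSetPipeSizeClamping(t *testing.T) {
	ctx := contexttest.Context(t)
	// An empty pipe, so the EBUSY case does not apply here.
	p := NewPipe(ctx, true /* isNamed */, DefaultPipeSize, 4096 /* atomicIOBytes */)

	for _, tc := range []struct {
		request int64
		want    int64 // ignored when wantErr is set
		wantErr bool
	}{
		{request: 1, want: MinimumPipeSize}, // rounded up per spec
		{request: MaximumPipeSize, want: MaximumPipeSize},
		{request: MaximumPipeSize + 1, wantErr: true}, // EPERM
		{request: -1, wantErr: true},                  // EINVAL
	} {
		got, err := p.SetPipeSize(tc.request)
		if (err != nil) != tc.wantErr {
			t.Errorf("SetPipeSize(%d): unexpected error: %v", tc.request, err)
			continue
		}
		if !tc.wantErr && got != tc.want {
			t.Errorf("SetPipeSize(%d) = %d, want %d", tc.request, got, tc.want)
		}
	}
}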