summaryrefslogtreecommitdiffhomepage
path: root/pkg/sentry/kernel/pipe/pipe.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/sentry/kernel/pipe/pipe.go')
-rw-r--r--pkg/sentry/kernel/pipe/pipe.go315
1 files changed, 210 insertions, 105 deletions
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
index bd7649d2f..b65204492 100644
--- a/pkg/sentry/kernel/pipe/pipe.go
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -12,11 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Package pipe provides an in-memory implementation of a unidirectional
-// pipe.
-//
-// The goal of this pipe is to emulate the pipe syscall in all of its
-// edge cases and guarantees of atomic IO.
+// Package pipe provides a pipe implementation.
package pipe
import (
@@ -32,8 +28,29 @@ import (
"gvisor.googlesource.com/gvisor/pkg/waiter"
)
-// DefaultPipeSize is the system-wide default size of a pipe in bytes.
-const DefaultPipeSize = 65536
+const (
+ // MinimumPipeSize is a hard limit on the minimum size of a pipe.
+ MinimumPipeSize = 64 << 10
+
+ // DefaultPipeSize is the system-wide default size of a pipe in bytes.
+ DefaultPipeSize = MinimumPipeSize
+
+ // MaximumPipeSize is a hard limit on the maximum size of a pipe.
+ MaximumPipeSize = 8 << 20
+)
+
+// Sizer is an interface for setting and getting the size of a pipe.
+//
+// It is implemented by Pipe and, through embedding, by any type that embeds Pipe.
+type Sizer interface {
+ // PipeSize returns the pipe capacity in bytes.
+ PipeSize() int64
+
+ // SetPipeSize sets the new pipe capacity in bytes.
+ //
+ // The new size is returned (which may be capped).
+ SetPipeSize(int64) (int64, error)
+}
// Pipe is an encapsulation of a platform-independent pipe.
// It manages a buffered byte queue shared between a reader/writer
@@ -43,49 +60,76 @@ const DefaultPipeSize = 65536
type Pipe struct {
waiter.Queue `state:"nosave"`
- // Whether this is a named or anonymous pipe.
+ // isNamed indicates whether this is a named pipe.
+ //
+ // This value is immutable.
isNamed bool
+ // atomicIOBytes is the maximum number of bytes that the pipe will
+ // guarantee to read or write atomically.
+ //
+ // This value is immutable.
+ atomicIOBytes int64
+
// The dirent backing this pipe. Shared by all readers and writers.
+ //
+ // This value is immutable.
Dirent *fs.Dirent
- // The buffered byte queue.
- data bufferList
+ // The number of active readers for this pipe.
+ //
+ // Access atomically.
+ readers int32
- // Max size of the pipe in bytes. When this max has been reached,
- // writers will get EWOULDBLOCK.
- max int
+ // The number of active writers for this pipe.
+ //
+ // Access atomically.
+ writers int32
- // Current size of the pipe in bytes.
- size int
+ // mu protects all pipe internal state below.
+ mu sync.Mutex `state:"nosave"`
- // Max number of bytes the pipe can guarantee to read or write
- // atomically.
- atomicIOBytes int
+ // data is the buffer queue of pipe contents.
+ //
+ // This is protected by mu.
+ data bufferList
- // The number of active readers for this pipe. Load/store atomically.
- readers int32
+ // max is the maximum size of the pipe in bytes. When this max has been
+ // reached, writers will get EWOULDBLOCK.
+ //
+ // This is protected by mu.
+ max int64
- // The number of active writes for this pipe. Load/store atomically.
- writers int32
+ // size is the current size of the pipe in bytes.
+ //
+ // This is protected by mu.
+ size int64
- // This flag indicates if this pipe ever had a writer. Note that this does
- // not necessarily indicate there is *currently* a writer, just that there
- // has been a writer at some point since the pipe was created.
+ // hadWriter indicates if this pipe ever had a writer. Note that this
+ // does not necessarily indicate there is *currently* a writer, just
+ // that there has been a writer at some point since the pipe was
+ // created.
//
- // Protected by mu.
+ // This is protected by mu.
hadWriter bool
-
- // Lock protecting all pipe internal state.
- mu sync.Mutex `state:"nosave"`
}
-// NewPipe initializes and returns a pipe. A pipe created by this function is
-// persistent, and will remain valid even without any open fds to it. Named
-// pipes for mknod(2) are created via this function. Note that the
-// implementation of blocking semantics for opening the read and write ends of a
-// named pipe are left to filesystems.
-func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe {
+// NewPipe initializes and returns a pipe.
+//
+// N.B. sizeBytes is clamped to [MinimumPipeSize, MaximumPipeSize] and atomicIOBytes to [1, sizeBytes].
+func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe {
+ if sizeBytes < MinimumPipeSize {
+ sizeBytes = MinimumPipeSize
+ }
+ if sizeBytes > MaximumPipeSize {
+ sizeBytes = MaximumPipeSize
+ }
+ if atomicIOBytes <= 0 {
+ atomicIOBytes = 1
+ }
+ if atomicIOBytes > sizeBytes {
+ atomicIOBytes = sizeBytes
+ }
p := &Pipe{
isNamed: isNamed,
max: sizeBytes,
@@ -110,48 +154,45 @@ func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *P
return p
}
-// NewConnectedPipe initializes a pipe and returns a pair of objects (which
-// implement kio.File) representing the read and write ends of the pipe. A pipe
-// created by this function becomes invalid as soon as either the read or write
-// end is closed, and errors on subsequent operations on either end. Pipes
-// for pipe(2) and pipe2(2) are generally created this way.
-func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) {
+// NewConnectedPipe initializes a pipe and returns a pair of objects
+// representing the read and write ends of the pipe.
+func NewConnectedPipe(ctx context.Context, sizeBytes, atomicIOBytes int64) (*fs.File, *fs.File) {
p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes)
- return p.ROpen(ctx), p.WOpen(ctx)
-}
-
-// ROpen opens the pipe for reading.
-func (p *Pipe) ROpen(ctx context.Context) *fs.File {
- p.rOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true}, &Reader{
- ReaderWriter: ReaderWriter{Pipe: p},
- })
-}
-
-// WOpen opens the pipe for writing.
-func (p *Pipe) WOpen(ctx context.Context) *fs.File {
- p.wOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Write: true}, &Writer{
- ReaderWriter: ReaderWriter{Pipe: p},
- })
+ return p.Open(ctx, fs.FileFlags{Read: true}), p.Open(ctx, fs.FileFlags{Write: true})
}
-// RWOpen opens the pipe for both reading and writing.
-func (p *Pipe) RWOpen(ctx context.Context) *fs.File {
- p.rOpen()
- p.wOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{
- Pipe: p,
- })
+// Open opens the pipe and returns a new file.
+//
+// Precondition: at least one of flags.Read or flags.Write must be set.
+func (p *Pipe) Open(ctx context.Context, flags fs.FileFlags) *fs.File {
+ switch {
+ case flags.Read && flags.Write:
+ p.rOpen()
+ p.wOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &ReaderWriter{
+ Pipe: p,
+ })
+ case flags.Read:
+ p.rOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &Reader{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+ case flags.Write:
+ p.wOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &Writer{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+ default:
+ // Precondition violated.
+ panic("invalid pipe flags")
+ }
}
// read reads data from the pipe into dst and returns the number of bytes
// read, or returns ErrWouldBlock if the pipe is empty.
+//
+// Precondition: this pipe must have readers.
func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
- if !p.HasReaders() {
- return 0, syscall.EBADF
- }
-
// Don't block for a zero-length read even if the pipe is empty.
if dst.NumBytes() == 0 {
return 0, nil
@@ -159,8 +200,8 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
p.mu.Lock()
defer p.mu.Unlock()
- // If there is nothing to read at the moment but there is a writer, tell the
- // caller to block.
+
+ // Is the pipe empty?
if p.size == 0 {
if !p.HasWriters() {
// There are no writers, return EOF.
@@ -168,64 +209,94 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
}
return 0, syserror.ErrWouldBlock
}
- var n int64
- for buffer := p.data.Front(); buffer != nil; buffer = p.data.Front() {
- n0, err := dst.CopyOut(ctx, buffer.bytes())
- n += int64(n0)
- p.size -= n0
- if buffer.truncate(n0) == 0 {
- p.data.Remove(buffer)
+
+ // Limit how much we consume.
+ if dst.NumBytes() > p.size {
+ dst = dst.TakeFirst64(p.size)
+ }
+
+ done := int64(0)
+ for dst.NumBytes() > 0 {
+ // Pop the first buffer.
+ first := p.data.Front()
+ if first == nil {
+ break
}
- dst = dst.DropFirst(n0)
- if dst.NumBytes() == 0 || err != nil {
- return n, err
+
+ // Copy user data.
+ n, err := dst.CopyOutFrom(ctx, first)
+ done += int64(n)
+ p.size -= n
+ dst = dst.DropFirst64(n)
+
+ // Empty buffer?
+ if first.Empty() {
+ // Push to the free list.
+ p.data.Remove(first)
+ bufferPool.Put(first)
+ }
+
+ // Handle errors.
+ if err != nil {
+ return done, err
}
}
- return n, nil
+
+ return done, nil
}
// write writes data from sv into the pipe and returns the number of bytes
// written. If no bytes are written because the pipe is full (or has less than
// atomicIOBytes free capacity), write returns ErrWouldBlock.
+//
+// Precondition: this pipe must have writers.
func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()
- if !p.HasWriters() {
- return 0, syscall.EBADF
- }
+ // Can't write to a pipe with no readers.
if !p.HasReaders() {
return 0, syscall.EPIPE
}
// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
- // atomic, but requires no atomicity for writes larger than this. However,
- // Linux appears to provide stronger semantics than this in practice:
- // unmerged writes are done one PAGE_SIZE buffer at a time, so for larger
- // writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement
- // this by writing at most atomicIOBytes at a time if we can't service the
- // write in its entirety.
- canWrite := src.NumBytes()
- if canWrite > int64(p.max-p.size) {
- if p.max-p.size >= p.atomicIOBytes {
- canWrite = int64(p.atomicIOBytes)
- } else {
+ // atomic, but requires no atomicity for writes larger than this.
+ wanted := src.NumBytes()
+ if avail := p.max - p.size; wanted > avail {
+ if wanted <= p.atomicIOBytes {
return 0, syserror.ErrWouldBlock
}
+ // Limit to the available capacity.
+ src = src.TakeFirst64(avail)
}
- // Copy data from user memory into a pipe-owned buffer.
- buf := make([]byte, canWrite)
- n, err := src.CopyIn(ctx, buf)
- if n > 0 {
- p.data.PushBack(newBuffer(buf[:n]))
+ done := int64(0)
+ for src.NumBytes() > 0 {
+ // Need a new buffer?
+ last := p.data.Back()
+ if last == nil || last.Full() {
+ // Add a new buffer to the data list.
+ last = newBuffer()
+ p.data.PushBack(last)
+ }
+
+ // Copy user data.
+ n, err := src.CopyInTo(ctx, last)
+ done += int64(n)
p.size += n
+ src = src.DropFirst64(n)
+
+ // Handle errors.
+ if err != nil {
+ return done, err
+ }
}
- if int64(n) < src.NumBytes() && err == nil {
+ if wanted > done {
// Partial write due to full pipe.
- err = syserror.ErrWouldBlock
+ return done, syserror.ErrWouldBlock
}
- return int64(n), err
+
+ return done, nil
}
// rOpen signals a new reader of the pipe.
@@ -267,6 +338,9 @@ func (p *Pipe) HasWriters() bool {
return atomic.LoadInt32(&p.writers) > 0
}
+// rReadinessLocked calculates the read readiness.
+//
+// Precondition: mu must be held.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
if p.HasReaders() && p.data.Front() != nil {
@@ -290,6 +364,9 @@ func (p *Pipe) rReadiness() waiter.EventMask {
return p.rReadinessLocked()
}
+// wReadinessLocked calculates the write readiness.
+//
+// Precondition: mu must be held.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
if p.HasWriters() && p.size < p.max {
@@ -317,8 +394,36 @@ func (p *Pipe) rwReadiness() waiter.EventMask {
return p.rReadinessLocked() | p.wReadinessLocked()
}
-func (p *Pipe) queuedSize() int {
+// queued returns the amount of queued data.
+func (p *Pipe) queued() int64 {
p.mu.Lock()
defer p.mu.Unlock()
return p.size
}
+
+// PipeSize implements Sizer.PipeSize.
+func (p *Pipe) PipeSize() int64 {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.max
+}
+
+// SetPipeSize implements Sizer.SetPipeSize.
+func (p *Pipe) SetPipeSize(size int64) (int64, error) {
+ if size < 0 {
+ return 0, syserror.EINVAL
+ }
+ if size < MinimumPipeSize {
+ size = MinimumPipeSize // Per spec.
+ }
+ if size > MaximumPipeSize {
+ return 0, syserror.EPERM
+ }
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if size < p.size {
+ return 0, syserror.EBUSY
+ }
+ p.max = size
+ return size, nil
+}