summaryrefslogtreecommitdiffhomepage
path: root/pkg/sentry/kernel/pipe/pipe.go
diff options
context:
space:
mode:
authorJamie Liu <jamieliu@google.com>2021-01-14 17:32:38 -0800
committergVisor bot <gvisor-bot@google.com>2021-01-14 17:35:07 -0800
commite57ebcd37a7b9f98d80e594f2c0baf2220d7b830 (patch)
treec48d591cf3a7f24db2455f5d56b64674ea838baf /pkg/sentry/kernel/pipe/pipe.go
parent95371cff350ef5c22c0e0b76ef9474c16e29a6f6 (diff)
Simplify the pipe implementation.
- Remove the pipe package's dependence on the buffer package, which becomes unused as a result. The buffer package is currently intended to serve two use cases, pipes and temporary buffers, and does neither optimally as a result; this change facilitates retooling the buffer package to better serve the latter. - Pass callbacks taking safemem.BlockSeq to the internal pipe I/O methods, which makes most callbacks trivial. - Fix VFS1's splice() and tee() to immediately return if a pipe returns a partial write. PiperOrigin-RevId: 351911375
Diffstat (limited to 'pkg/sentry/kernel/pipe/pipe.go')
-rw-r--r--pkg/sentry/kernel/pipe/pipe.go198
1 files changed, 113 insertions, 85 deletions
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
index b989e14c7..c551acd99 100644
--- a/pkg/sentry/kernel/pipe/pipe.go
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -21,8 +21,8 @@ import (
"sync/atomic"
"syscall"
- "gvisor.dev/gvisor/pkg/buffer"
"gvisor.dev/gvisor/pkg/context"
+ "gvisor.dev/gvisor/pkg/safemem"
"gvisor.dev/gvisor/pkg/sentry/fs"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/syserror"
@@ -75,10 +75,18 @@ type Pipe struct {
// mu protects all pipe internal state below.
mu sync.Mutex `state:"nosave"`
- // view is the underlying set of buffers.
+ // buf holds the pipe's data. buf is a circular buffer; the first valid
+ // byte in buf is at offset off, and the pipe contains size valid bytes.
+ // bufBlocks contains two identical safemem.Blocks representing buf; this
+ // avoids needing to heap-allocate a new safemem.Block slice when buf is
+ // resized. bufBlockSeq is a safemem.BlockSeq representing bufBlocks.
//
- // This is protected by mu.
- view buffer.View
+ // These fields are protected by mu.
+ buf []byte
+ bufBlocks [2]safemem.Block `state:"nosave"`
+ bufBlockSeq safemem.BlockSeq `state:"nosave"`
+ off int64
+ size int64
// max is the maximum size of the pipe in bytes. When this max has been
// reached, writers will get EWOULDBLOCK.
@@ -99,12 +107,6 @@ type Pipe struct {
//
// N.B. The size will be bounded.
func NewPipe(isNamed bool, sizeBytes int64) *Pipe {
- if sizeBytes < MinimumPipeSize {
- sizeBytes = MinimumPipeSize
- }
- if sizeBytes > MaximumPipeSize {
- sizeBytes = MaximumPipeSize
- }
var p Pipe
initPipe(&p, isNamed, sizeBytes)
return &p
@@ -175,75 +177,71 @@ func (p *Pipe) Open(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) *fs.F
}
}
-type readOps struct {
- // left returns the bytes remaining.
- left func() int64
-
- // limit limits subsequence reads.
- limit func(int64)
-
- // read performs the actual read operation.
- read func(*buffer.View) (int64, error)
-}
-
-// read reads data from the pipe into dst and returns the number of bytes
-// read, or returns ErrWouldBlock if the pipe is empty.
+// peekLocked passes the first count bytes in the pipe to f and returns its
+// result. If fewer than count bytes are available, the safemem.BlockSeq passed
+// to f will be less than count bytes in length.
//
-// Precondition: this pipe must have readers.
-func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) {
- p.mu.Lock()
- defer p.mu.Unlock()
- return p.readLocked(ctx, ops)
-}
-
-func (p *Pipe) readLocked(ctx context.Context, ops readOps) (int64, error) {
+// peekLocked does not mutate the pipe; if the read consumes bytes from the
+// pipe, then the caller is responsible for calling p.consumeLocked() and
+// p.Notify(waiter.EventOut). (The latter must be called with p.mu unlocked.)
+//
+// Preconditions:
+// * p.mu must be locked.
+// * This pipe must have readers.
+func (p *Pipe) peekLocked(count int64, f func(safemem.BlockSeq) (uint64, error)) (int64, error) {
// Don't block for a zero-length read even if the pipe is empty.
- if ops.left() == 0 {
+ if count == 0 {
return 0, nil
}
- // Is the pipe empty?
- if p.view.Size() == 0 {
- if !p.HasWriters() {
- // There are no writers, return EOF.
- return 0, io.EOF
+ // Limit the amount of data read to the amount of data in the pipe.
+ if count > p.size {
+ if p.size == 0 {
+ if !p.HasWriters() {
+ return 0, io.EOF
+ }
+ return 0, syserror.ErrWouldBlock
}
- return 0, syserror.ErrWouldBlock
+ count = p.size
}
- // Limit how much we consume.
- if ops.left() > p.view.Size() {
- ops.limit(p.view.Size())
- }
+ // Prepare the view of the data to be read.
+ bs := p.bufBlockSeq.DropFirst64(uint64(p.off)).TakeFirst64(uint64(count))
- // Copy user data; the read op is responsible for trimming.
- done, err := ops.read(&p.view)
- return done, err
+ // Perform the read.
+ done, err := f(bs)
+ return int64(done), err
}
-type writeOps struct {
- // left returns the bytes remaining.
- left func() int64
-
- // limit should limit subsequent writes.
- limit func(int64)
-
- // write should write to the provided buffer.
- write func(*buffer.View) (int64, error)
-}
-
-// write writes data from sv into the pipe and returns the number of bytes
-// written. If no bytes are written because the pipe is full (or has less than
-// atomicIOBytes free capacity), write returns ErrWouldBlock.
+// consumeLocked consumes the first n bytes in the pipe, such that they will no
+// longer be visible to future reads.
//
-// Precondition: this pipe must have writers.
-func (p *Pipe) write(ctx context.Context, ops writeOps) (int64, error) {
- p.mu.Lock()
- defer p.mu.Unlock()
- return p.writeLocked(ctx, ops)
+// Preconditions:
+// * p.mu must be locked.
+// * The pipe must contain at least n bytes.
+func (p *Pipe) consumeLocked(n int64) {
+ p.off += n
+ if max := int64(len(p.buf)); p.off >= max {
+ p.off -= max
+ }
+ p.size -= n
}
-func (p *Pipe) writeLocked(ctx context.Context, ops writeOps) (int64, error) {
+// writeLocked passes a safemem.BlockSeq representing the first count bytes of
+// unused space in the pipe to f and returns the result. If fewer than count
+// bytes are free, the safemem.BlockSeq passed to f will be less than count
+// bytes in length. If the pipe is full or otherwise cannot accomodate a write
+// of any number of bytes up to count, writeLocked returns ErrWouldBlock
+// without calling f.
+//
+// Unlike peekLocked, writeLocked assumes that f returns the number of bytes
+// written to the pipe, and increases the number of bytes stored in the pipe
+// accordingly. Callers are still responsible for calling
+// p.Notify(waiter.EventIn) with p.mu unlocked.
+//
+// Preconditions:
+// * p.mu must be locked.
+func (p *Pipe) writeLocked(count int64, f func(safemem.BlockSeq) (uint64, error)) (int64, error) {
// Can't write to a pipe with no readers.
if !p.HasReaders() {
return 0, syscall.EPIPE
@@ -251,29 +249,59 @@ func (p *Pipe) writeLocked(ctx context.Context, ops writeOps) (int64, error) {
// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
// atomic, but requires no atomicity for writes larger than this.
- wanted := ops.left()
- avail := p.max - p.view.Size()
- if wanted > avail {
- if wanted <= atomicIOBytes {
+ avail := p.max - p.size
+ short := false
+ if count > avail {
+ if count <= atomicIOBytes {
return 0, syserror.ErrWouldBlock
}
- ops.limit(avail)
+ count = avail
+ short = true
}
- // Copy user data.
- done, err := ops.write(&p.view)
- if err != nil {
- return done, err
+ // Ensure that the buffer is big enough.
+ if newLen, oldCap := p.size+count, int64(len(p.buf)); newLen > oldCap {
+ // Allocate a new buffer.
+ newCap := oldCap * 2
+ if oldCap == 0 {
+ newCap = 8 // arbitrary; sending individual integers across pipes is relatively common
+ }
+ for newLen > newCap {
+ newCap *= 2
+ }
+ if newCap > p.max {
+ newCap = p.max
+ }
+ newBuf := make([]byte, newCap)
+ // Copy the old buffer's contents to the beginning of the new one.
+ safemem.CopySeq(
+ safemem.BlockSeqOf(safemem.BlockFromSafeSlice(newBuf)),
+ p.bufBlockSeq.DropFirst64(uint64(p.off)).TakeFirst64(uint64(p.size)))
+ // Switch to the new buffer.
+ p.buf = newBuf
+ p.bufBlocks[0] = safemem.BlockFromSafeSlice(newBuf)
+ p.bufBlocks[1] = p.bufBlocks[0]
+ p.bufBlockSeq = safemem.BlockSeqFromSlice(p.bufBlocks[:])
+ p.off = 0
}
- if done < avail {
- // Non-failure, but short write.
- return done, nil
+ // Prepare the view of the space to be written.
+ woff := p.off + p.size
+ if woff >= int64(len(p.buf)) {
+ woff -= int64(len(p.buf))
}
- if done < wanted {
- // Partial write due to full pipe. Note that this could also be
- // the short write case above, we would expect a second call
- // and the write to return zero bytes in this case.
+ bs := p.bufBlockSeq.DropFirst64(uint64(woff)).TakeFirst64(uint64(count))
+
+ // Perform the write.
+ doneU64, err := f(bs)
+ done := int64(doneU64)
+ p.size += done
+ if done < count || err != nil {
+ return done, err
+ }
+
+ // If we shortened the write, adjust the returned error appropriately.
+ if short {
return done, syserror.ErrWouldBlock
}
@@ -324,7 +352,7 @@ func (p *Pipe) HasWriters() bool {
// Precondition: mu must be held.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
- if p.HasReaders() && p.view.Size() != 0 {
+ if p.HasReaders() && p.size != 0 {
ready |= waiter.EventIn
}
if !p.HasWriters() && p.hadWriter {
@@ -350,7 +378,7 @@ func (p *Pipe) rReadiness() waiter.EventMask {
// Precondition: mu must be held.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
- if p.HasWriters() && p.view.Size() < p.max {
+ if p.HasWriters() && p.size < p.max {
ready |= waiter.EventOut
}
if !p.HasReaders() {
@@ -383,7 +411,7 @@ func (p *Pipe) queued() int64 {
}
func (p *Pipe) queuedLocked() int64 {
- return p.view.Size()
+ return p.size
}
// FifoSize implements fs.FifoSizer.FifoSize.
@@ -406,7 +434,7 @@ func (p *Pipe) SetFifoSize(size int64) (int64, error) {
}
p.mu.Lock()
defer p.mu.Unlock()
- if size < p.view.Size() {
+ if size < p.size {
return 0, syserror.EBUSY
}
p.max = size