author     Adin Scannell <ascannell@google.com>   2019-09-12 17:42:14 -0700
committer  gVisor bot <gvisor-bot@google.com>     2019-09-12 17:43:27 -0700
commit     7c6ab6a219f37a1d4c18ced4a602458fcf363f85 (patch)
tree       77ab0d0aaad32a81b09ee7ad55b97b80aaec5402 /pkg/sentry/kernel/pipe
parent     df5d377521e625aeb8f4fe18bd1d9974dbf9998c (diff)
Implement splice methods for pipes and sockets.
This also allows the tee(2) implementation to be enabled, since dup can now be properly supported via WriteTo.

Note that this change necessitated some minor restructuring of the fs.FileOperations splice methods. If the *fs.File is passed through directly, then only public API methods are accessible, which will deadlock immediately, since the locking is already done by fs.Splice. Instead, we pass through an abstract io.Reader or io.Writer, which elide locks and use the underlying fs.FileOperations directly.

PiperOrigin-RevId: 268805207
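To make the locking point concrete, here is a minimal, self-contained Go sketch; lockedFile, splice, and the mutex layout are invented for illustration and are not the gVisor types. Once the splice helper holds the file's lock, calling back into a public method that takes the same lock would deadlock, so the helper hands the copy loop a plain io.Reader instead:

package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// lockedFile stands in for *fs.File: its public Read method takes the
// file's mutex, so it must never be called while that mutex is held.
type lockedFile struct {
	mu sync.Mutex
	r  io.Reader
}

func (f *lockedFile) Read(p []byte) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.r.Read(p)
}

// splice mirrors the pattern described in the message: it locks the file
// once, then passes only an io.Reader (f.r) to the copy loop, so the inner
// code never re-enters the public, lock-taking API.
func splice(dst io.Writer, f *lockedFile, n int64) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	return io.CopyN(dst, f.r, n) // uses the unlocked reader, not f.Read
}

func main() {
	f := &lockedFile{r: strings.NewReader("hello, pipe")}
	var out strings.Builder
	if _, err := splice(&out, f, 5); err != nil && err != io.EOF {
		panic(err)
	}
	fmt.Println(out.String()) // "hello"
}

The same reasoning is why the WriteTo and ReadFrom methods added below accept an io.Writer or io.Reader rather than a *fs.File.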
Diffstat (limited to 'pkg/sentry/kernel/pipe')
-rw-r--r--  pkg/sentry/kernel/pipe/buffer.go         |  25
-rw-r--r--  pkg/sentry/kernel/pipe/pipe.go           |  82
-rw-r--r--  pkg/sentry/kernel/pipe/reader_writer.go  |  76
3 files changed, 166 insertions(+), 17 deletions(-)
diff --git a/pkg/sentry/kernel/pipe/buffer.go b/pkg/sentry/kernel/pipe/buffer.go
index 69ef2a720..95bee2d37 100644
--- a/pkg/sentry/kernel/pipe/buffer.go
+++ b/pkg/sentry/kernel/pipe/buffer.go
@@ -15,6 +15,7 @@
package pipe
import (
+ "io"
"sync"
"gvisor.dev/gvisor/pkg/sentry/safemem"
@@ -67,6 +68,17 @@ func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) {
return n, err
}
+// WriteFromReader writes to the buffer from an io.Reader.
+func (b *buffer) WriteFromReader(r io.Reader, count int64) (int64, error) {
+ dst := b.data[b.write:]
+ if count < int64(len(dst)) {
+ dst = b.data[b.write:][:count]
+ }
+ n, err := r.Read(dst)
+ b.write += n
+ return int64(n), err
+}
+
// ReadToBlocks implements safemem.Reader.ReadToBlocks.
func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write]))
@@ -75,6 +87,19 @@ func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
return n, err
}
+// ReadToWriter reads from the buffer into an io.Writer.
+func (b *buffer) ReadToWriter(w io.Writer, count int64, dup bool) (int64, error) {
+ src := b.data[b.read:b.write]
+ if count < int64(len(src)) {
+ src = b.data[b.read:][:count]
+ }
+ n, err := w.Write(src)
+ if !dup {
+ b.read += n
+ }
+ return int64(n), err
+}
+
// bufferPool is a pool for buffers.
var bufferPool = sync.Pool{
New: func() interface{} {
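The dup flag on ReadToWriter is what makes tee(2) possible: a dup read copies bytes out without advancing the read cursor, so the same data remains available to a later, consuming read. A simplified stand-in shows the behavior (miniBuffer and readToWriter are illustrative names, not the pooled buffer above):

package main

import (
	"bytes"
	"fmt"
	"io"
)

// miniBuffer is a simplified stand-in for the pipe buffer: a fixed slice
// with read/write cursors.
type miniBuffer struct {
	data        []byte
	read, write int
}

// readToWriter mirrors ReadToWriter: it copies up to count bytes into w and
// advances the read cursor only when dup is false.
func (b *miniBuffer) readToWriter(w io.Writer, count int64, dup bool) (int64, error) {
	src := b.data[b.read:b.write]
	if count < int64(len(src)) {
		src = src[:count]
	}
	n, err := w.Write(src)
	if !dup {
		b.read += n
	}
	return int64(n), err
}

func main() {
	b := &miniBuffer{data: []byte("spliced"), write: 7}

	var tee bytes.Buffer
	b.readToWriter(&tee, 7, true) // dup: data stays in the buffer
	var sink bytes.Buffer
	b.readToWriter(&sink, 7, false) // normal read: cursor advances

	fmt.Println(tee.String(), sink.String(), b.read) // spliced spliced 7
}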
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
index 247e2928e..93b50669f 100644
--- a/pkg/sentry/kernel/pipe/pipe.go
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -23,7 +23,6 @@ import (
"gvisor.dev/gvisor/pkg/sentry/context"
"gvisor.dev/gvisor/pkg/sentry/fs"
- "gvisor.dev/gvisor/pkg/sentry/usermem"
"gvisor.dev/gvisor/pkg/syserror"
"gvisor.dev/gvisor/pkg/waiter"
)
@@ -173,13 +172,24 @@ func (p *Pipe) Open(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) *fs.F
}
}
+type readOps struct {
+ // left returns the bytes remaining.
+ left func() int64
+
+ // limit limits subsequent reads.
+ limit func(int64)
+
+ // read performs the actual read operation.
+ read func(*buffer) (int64, error)
+}
+
// read reads data from the pipe into dst and returns the number of bytes
// read, or returns ErrWouldBlock if the pipe is empty.
//
// Precondition: this pipe must have readers.
-func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
+func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) {
// Don't block for a zero-length read even if the pipe is empty.
- if dst.NumBytes() == 0 {
+ if ops.left() == 0 {
return 0, nil
}
@@ -196,12 +206,12 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
}
// Limit how much we consume.
- if dst.NumBytes() > p.size {
- dst = dst.TakeFirst64(p.size)
+ if ops.left() > p.size {
+ ops.limit(p.size)
}
done := int64(0)
- for dst.NumBytes() > 0 {
+ for ops.left() > 0 {
// Pop the first buffer.
first := p.data.Front()
if first == nil {
@@ -209,10 +219,9 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
}
// Copy user data.
- n, err := dst.CopyOutFrom(ctx, first)
+ n, err := ops.read(first)
done += int64(n)
p.size -= n
- dst = dst.DropFirst64(n)
// Empty buffer?
if first.Empty() {
@@ -230,12 +239,57 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
return done, nil
}
+// dup duplicates all data from this pipe into the given writer.
+//
+// There is no blocking behavior implemented here. The writer may propagate
+// some blocking error. All the writes must be complete writes.
+func (p *Pipe) dup(ctx context.Context, ops readOps) (int64, error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ // Is the pipe empty?
+ if p.size == 0 {
+ if !p.HasWriters() {
+ // See above.
+ return 0, nil
+ }
+ return 0, syserror.ErrWouldBlock
+ }
+
+ // Limit how much we consume.
+ if ops.left() > p.size {
+ ops.limit(p.size)
+ }
+
+ done := int64(0)
+ for buf := p.data.Front(); buf != nil; buf = buf.Next() {
+ n, err := ops.read(buf)
+ done += n
+ if err != nil {
+ return done, err
+ }
+ }
+
+ return done, nil
+}
+
+type writeOps struct {
+ // left returns the bytes remaining.
+ left func() int64
+
+ // limit should limit subsequent writes.
+ limit func(int64)
+
+ // write should write to the provided buffer.
+ write func(*buffer) (int64, error)
+}
+
// write writes data from sv into the pipe and returns the number of bytes
// written. If no bytes are written because the pipe is full (or has less than
// atomicIOBytes free capacity), write returns ErrWouldBlock.
//
// Precondition: this pipe must have writers.
-func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) {
+func (p *Pipe) write(ctx context.Context, ops writeOps) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -246,17 +300,16 @@ func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error)
// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
// atomic, but requires no atomicity for writes larger than this.
- wanted := src.NumBytes()
+ wanted := ops.left()
if avail := p.max - p.size; wanted > avail {
if wanted <= p.atomicIOBytes {
return 0, syserror.ErrWouldBlock
}
- // Limit to the available capacity.
- src = src.TakeFirst64(avail)
+ ops.limit(avail)
}
done := int64(0)
- for src.NumBytes() > 0 {
+ for ops.left() > 0 {
// Need a new buffer?
last := p.data.Back()
if last == nil || last.Full() {
@@ -266,10 +319,9 @@ func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error)
}
// Copy user data.
- n, err := src.CopyInTo(ctx, last)
+ n, err := ops.write(last)
done += int64(n)
p.size += n
- src = src.DropFirst64(n)
// Handle errors.
if err != nil {
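The readOps/writeOps indirection above keeps Pipe.read and Pipe.write ignorant of where bytes actually come from or go: the caller supplies left/limit/read (or write) closures, and those closures carry the cursor state. A minimal sketch of the same pattern, where fill, the 4-byte buffers, and the sample string are invented for illustration and are not the gVisor code:

package main

import "fmt"

// writeOps mirrors the struct added to pipe.go: the write loop only needs to
// know how many bytes remain, how to cap them, and how to fill one buffer.
type writeOps struct {
	left  func() int64
	limit func(int64)
	write func([]byte) (int64, error)
}

// fill stands in for Pipe.write's inner loop: cap the request at the pipe's
// free capacity, then hand out fixed-size buffers until nothing is left.
func fill(ops writeOps, capacity int64) [][]byte {
	if ops.left() > capacity {
		ops.limit(capacity)
	}
	var bufs [][]byte
	for ops.left() > 0 {
		buf := make([]byte, 4) // tiny stand-in for a pooled pipe buffer
		n, err := ops.write(buf)
		bufs = append(bufs, buf[:n])
		if err != nil || n == 0 {
			break
		}
	}
	return bufs
}

func main() {
	// The closures own all cursor state, the same way reader_writer.go keeps
	// src (or count) captured in the closures it hands to Pipe.write.
	remaining := "spliced data!"
	count := int64(len(remaining))
	ops := writeOps{
		left:  func() int64 { return count },
		limit: func(l int64) { count = l },
		write: func(buf []byte) (int64, error) {
			n := copy(buf, remaining[:count])
			remaining = remaining[n:]
			count -= int64(n)
			return int64(n), nil
		},
	}
	for _, b := range fill(ops, 8) {
		fmt.Printf("%q ", b)
	}
	fmt.Println() // "spli" "ced " — limit(8) cut the request to fit the pipe
}

Swapping in a write closure backed by an io.Reader instead of a string reuses the identical loop, which is how ReadFrom below shares Pipe.write with the ordinary Write path.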
diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go
index f69dbf27b..7c307f013 100644
--- a/pkg/sentry/kernel/pipe/reader_writer.go
+++ b/pkg/sentry/kernel/pipe/reader_writer.go
@@ -15,6 +15,7 @@
package pipe
import (
+ "io"
"math"
"syscall"
@@ -55,7 +56,45 @@ func (rw *ReaderWriter) Release() {
// Read implements fs.FileOperations.Read.
func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequence, _ int64) (int64, error) {
- n, err := rw.Pipe.read(ctx, dst)
+ n, err := rw.Pipe.read(ctx, readOps{
+ left: func() int64 {
+ return dst.NumBytes()
+ },
+ limit: func(l int64) {
+ dst = dst.TakeFirst64(l)
+ },
+ read: func(buf *buffer) (int64, error) {
+ n, err := dst.CopyOutFrom(ctx, buf)
+ dst = dst.DropFirst64(n)
+ return n, err
+ },
+ })
+ if n > 0 {
+ rw.Pipe.Notify(waiter.EventOut)
+ }
+ return n, err
+}
+
+// WriteTo implements fs.FileOperations.WriteTo.
+func (rw *ReaderWriter) WriteTo(ctx context.Context, _ *fs.File, w io.Writer, count int64, dup bool) (int64, error) {
+ ops := readOps{
+ left: func() int64 {
+ return count
+ },
+ limit: func(l int64) {
+ count = l
+ },
+ read: func(buf *buffer) (int64, error) {
+ n, err := buf.ReadToWriter(w, count, dup)
+ count -= n
+ return n, err
+ },
+ }
+ if dup {
+ // There is no notification for dup operations.
+ return rw.Pipe.dup(ctx, ops)
+ }
+ n, err := rw.Pipe.read(ctx, ops)
if n > 0 {
rw.Pipe.Notify(waiter.EventOut)
}
@@ -64,7 +103,40 @@ func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequ
// Write implements fs.FileOperations.Write.
func (rw *ReaderWriter) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) {
- n, err := rw.Pipe.write(ctx, src)
+ n, err := rw.Pipe.write(ctx, writeOps{
+ left: func() int64 {
+ return src.NumBytes()
+ },
+ limit: func(l int64) {
+ src = src.TakeFirst64(l)
+ },
+ write: func(buf *buffer) (int64, error) {
+ n, err := src.CopyInTo(ctx, buf)
+ src = src.DropFirst64(n)
+ return n, err
+ },
+ })
+ if n > 0 {
+ rw.Pipe.Notify(waiter.EventIn)
+ }
+ return n, err
+}
+
+// ReadFrom implements fs.FileOperations.ReadFrom.
+func (rw *ReaderWriter) ReadFrom(ctx context.Context, _ *fs.File, r io.Reader, count int64) (int64, error) {
+ n, err := rw.Pipe.write(ctx, writeOps{
+ left: func() int64 {
+ return count
+ },
+ limit: func(l int64) {
+ count = l
+ },
+ write: func(buf *buffer) (int64, error) {
+ n, err := buf.WriteFromReader(r, count)
+ count -= n
+ return n, err
+ },
+ })
if n > 0 {
rw.Pipe.Notify(waiter.EventIn)
}
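One subtlety in the ReadFrom path: io.Reader permits short reads, so a single WriteFromReader call may fill only part of the requested count, and Pipe.write's "for ops.left() > 0" loop keeps calling back until the count is exhausted or the reader is drained. A standalone sketch of that interaction, where chunkyReader and writeFromReader are hypothetical helpers rather than the gVisor API:

package main

import (
	"fmt"
	"io"
	"strings"
)

// chunkyReader returns at most 3 bytes per Read call, the way a socket or
// slow file might, to show why the write loop must call back repeatedly.
type chunkyReader struct{ r io.Reader }

func (c chunkyReader) Read(p []byte) (int, error) {
	if len(p) > 3 {
		p = p[:3]
	}
	return c.r.Read(p)
}

// writeFromReader mirrors buffer.WriteFromReader: one Read into the free
// space of a fixed buffer, advancing the write cursor by what we got.
func writeFromReader(buf []byte, write *int, r io.Reader, count int64) (int64, error) {
	dst := buf[*write:]
	if count < int64(len(dst)) {
		dst = dst[:count]
	}
	n, err := r.Read(dst)
	*write += n
	return int64(n), err
}

func main() {
	r := chunkyReader{strings.NewReader("spliced from a reader")}
	buf := make([]byte, 16)
	write := 0
	count := int64(len(buf))

	// Loop until count is used up or the reader is drained; a single Read
	// is not enough because io.Reader is allowed to return short reads.
	for count > 0 {
		n, err := writeFromReader(buf, &write, r, count)
		count -= n
		if err == io.EOF || n == 0 {
			break
		}
	}
	fmt.Printf("%q\n", buf[:write]) // "spliced from a r" (16 bytes)
}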