summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--pkg/sentry/fs/file.go23
-rw-r--r--pkg/sentry/fs/file_operations.go9
-rw-r--r--pkg/sentry/fs/file_overlay.go9
-rw-r--r--pkg/sentry/fs/fsutil/file.go6
-rw-r--r--pkg/sentry/fs/inotify.go5
-rw-r--r--pkg/sentry/fs/splice.go162
-rw-r--r--pkg/sentry/kernel/pipe/buffer.go25
-rw-r--r--pkg/sentry/kernel/pipe/pipe.go82
-rw-r--r--pkg/sentry/kernel/pipe/reader_writer.go76
-rw-r--r--pkg/sentry/socket/epsocket/epsocket.go134
-rw-r--r--pkg/sentry/syscalls/linux/linux64.go4
-rw-r--r--pkg/sentry/syscalls/linux/sys_splice.go86
-rw-r--r--pkg/tcpip/header/udp.go5
-rw-r--r--pkg/tcpip/stack/transport_test.go4
-rw-r--r--pkg/tcpip/tcpip.go48
-rw-r--r--pkg/tcpip/transport/icmp/endpoint.go4
-rw-r--r--pkg/tcpip/transport/raw/endpoint.go7
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go68
-rw-r--r--pkg/tcpip/transport/udp/endpoint.go14
-rw-r--r--test/syscalls/linux/BUILD3
-rw-r--r--test/syscalls/linux/pipe.cc14
-rw-r--r--test/syscalls/linux/sendfile.cc69
-rw-r--r--test/syscalls/linux/splice.cc194
23 files changed, 770 insertions, 281 deletions
diff --git a/pkg/sentry/fs/file.go b/pkg/sentry/fs/file.go
index bb8117f89..c0a6e884b 100644
--- a/pkg/sentry/fs/file.go
+++ b/pkg/sentry/fs/file.go
@@ -515,6 +515,11 @@ type lockedReader struct {
// File is the file to read from.
File *File
+
+ // Offset is the offset to start at.
+ //
+ // This applies only to Read, not ReadAt.
+ Offset int64
}
// Read implements io.Reader.Read.
@@ -522,7 +527,8 @@ func (r *lockedReader) Read(buf []byte) (int, error) {
if r.Ctx.Interrupted() {
return 0, syserror.ErrInterrupted
}
- n, err := r.File.FileOperations.Read(r.Ctx, r.File, usermem.BytesIOSequence(buf), r.File.offset)
+ n, err := r.File.FileOperations.Read(r.Ctx, r.File, usermem.BytesIOSequence(buf), r.Offset)
+ r.Offset += n
return int(n), err
}
@@ -544,11 +550,21 @@ type lockedWriter struct {
// File is the file to write to.
File *File
+
+ // Offset is the offset to start at.
+ //
+ // This applies only to Write, not WriteAt.
+ Offset int64
}
// Write implements io.Writer.Write.
func (w *lockedWriter) Write(buf []byte) (int, error) {
- return w.WriteAt(buf, w.File.offset)
+ if w.Ctx.Interrupted() {
+ return 0, syserror.ErrInterrupted
+ }
+ n, err := w.WriteAt(buf, w.Offset)
+ w.Offset += int64(n)
+ return int(n), err
}
// WriteAt implements io.Writer.WriteAt.
@@ -562,6 +578,9 @@ func (w *lockedWriter) WriteAt(buf []byte, offset int64) (int, error) {
// io.Copy, since our own Write interface does not have this same
// contract. Enforce that here.
for written < len(buf) {
+ if w.Ctx.Interrupted() {
+ return written, syserror.ErrInterrupted
+ }
var n int64
n, err = w.File.FileOperations.Write(w.Ctx, w.File, usermem.BytesIOSequence(buf[written:]), offset+int64(written))
if n > 0 {
diff --git a/pkg/sentry/fs/file_operations.go b/pkg/sentry/fs/file_operations.go
index d86f5bf45..b88303f17 100644
--- a/pkg/sentry/fs/file_operations.go
+++ b/pkg/sentry/fs/file_operations.go
@@ -15,6 +15,8 @@
package fs
import (
+ "io"
+
"gvisor.dev/gvisor/pkg/sentry/arch"
"gvisor.dev/gvisor/pkg/sentry/context"
"gvisor.dev/gvisor/pkg/sentry/memmap"
@@ -105,8 +107,11 @@ type FileOperations interface {
// on the destination, following by a buffered copy with standard Read
// and Write operations.
//
+ // If dup is set, the data should be duplicated into the destination
+ // and retained.
+ //
// The same preconditions as Read apply.
- WriteTo(ctx context.Context, file *File, dst *File, opts SpliceOpts) (int64, error)
+ WriteTo(ctx context.Context, file *File, dst io.Writer, count int64, dup bool) (int64, error)
// Write writes src to file at offset and returns the number of bytes
// written which must be greater than or equal to 0. Like Read, file
@@ -126,7 +131,7 @@ type FileOperations interface {
// source. See WriteTo for details regarding how this is called.
//
// The same preconditions as Write apply; FileFlags.Write must be set.
- ReadFrom(ctx context.Context, file *File, src *File, opts SpliceOpts) (int64, error)
+ ReadFrom(ctx context.Context, file *File, src io.Reader, count int64) (int64, error)
// Fsync writes buffered modifications of file and/or flushes in-flight
// operations to backing storage based on syncType. The range to sync is
diff --git a/pkg/sentry/fs/file_overlay.go b/pkg/sentry/fs/file_overlay.go
index 9820f0b13..225e40186 100644
--- a/pkg/sentry/fs/file_overlay.go
+++ b/pkg/sentry/fs/file_overlay.go
@@ -15,6 +15,7 @@
package fs
import (
+ "io"
"sync"
"gvisor.dev/gvisor/pkg/refs"
@@ -268,9 +269,9 @@ func (f *overlayFileOperations) Read(ctx context.Context, file *File, dst userme
}
// WriteTo implements FileOperations.WriteTo.
-func (f *overlayFileOperations) WriteTo(ctx context.Context, file *File, dst *File, opts SpliceOpts) (n int64, err error) {
+func (f *overlayFileOperations) WriteTo(ctx context.Context, file *File, dst io.Writer, count int64, dup bool) (n int64, err error) {
err = f.onTop(ctx, file, func(file *File, ops FileOperations) error {
- n, err = ops.WriteTo(ctx, file, dst, opts)
+ n, err = ops.WriteTo(ctx, file, dst, count, dup)
return err // Will overwrite itself.
})
return
@@ -285,9 +286,9 @@ func (f *overlayFileOperations) Write(ctx context.Context, file *File, src userm
}
// ReadFrom implements FileOperations.ReadFrom.
-func (f *overlayFileOperations) ReadFrom(ctx context.Context, file *File, src *File, opts SpliceOpts) (n int64, err error) {
+func (f *overlayFileOperations) ReadFrom(ctx context.Context, file *File, src io.Reader, count int64) (n int64, err error) {
// See above; f.upper must be non-nil.
- return f.upper.FileOperations.ReadFrom(ctx, f.upper, src, opts)
+ return f.upper.FileOperations.ReadFrom(ctx, f.upper, src, count)
}
// Fsync implements FileOperations.Fsync.
diff --git a/pkg/sentry/fs/fsutil/file.go b/pkg/sentry/fs/fsutil/file.go
index 626b9126a..fc5b3b1a1 100644
--- a/pkg/sentry/fs/fsutil/file.go
+++ b/pkg/sentry/fs/fsutil/file.go
@@ -15,6 +15,8 @@
package fsutil
import (
+ "io"
+
"gvisor.dev/gvisor/pkg/sentry/arch"
"gvisor.dev/gvisor/pkg/sentry/context"
"gvisor.dev/gvisor/pkg/sentry/fs"
@@ -228,12 +230,12 @@ func (FileNoIoctl) Ioctl(context.Context, *fs.File, usermem.IO, arch.SyscallArgu
type FileNoSplice struct{}
// WriteTo implements fs.FileOperations.WriteTo.
-func (FileNoSplice) WriteTo(context.Context, *fs.File, *fs.File, fs.SpliceOpts) (int64, error) {
+func (FileNoSplice) WriteTo(context.Context, *fs.File, io.Writer, int64, bool) (int64, error) {
return 0, syserror.ENOSYS
}
// ReadFrom implements fs.FileOperations.ReadFrom.
-func (FileNoSplice) ReadFrom(context.Context, *fs.File, *fs.File, fs.SpliceOpts) (int64, error) {
+func (FileNoSplice) ReadFrom(context.Context, *fs.File, io.Reader, int64) (int64, error) {
return 0, syserror.ENOSYS
}
diff --git a/pkg/sentry/fs/inotify.go b/pkg/sentry/fs/inotify.go
index c7f4e2d13..ba3e0233d 100644
--- a/pkg/sentry/fs/inotify.go
+++ b/pkg/sentry/fs/inotify.go
@@ -15,6 +15,7 @@
package fs
import (
+ "io"
"sync"
"sync/atomic"
@@ -172,7 +173,7 @@ func (i *Inotify) Read(ctx context.Context, _ *File, dst usermem.IOSequence, _ i
}
// WriteTo implements FileOperations.WriteTo.
-func (*Inotify) WriteTo(context.Context, *File, *File, SpliceOpts) (int64, error) {
+func (*Inotify) WriteTo(context.Context, *File, io.Writer, int64, bool) (int64, error) {
return 0, syserror.ENOSYS
}
@@ -182,7 +183,7 @@ func (*Inotify) Fsync(context.Context, *File, int64, int64, SyncType) error {
}
// ReadFrom implements FileOperations.ReadFrom.
-func (*Inotify) ReadFrom(context.Context, *File, *File, SpliceOpts) (int64, error) {
+func (*Inotify) ReadFrom(context.Context, *File, io.Reader, int64) (int64, error) {
return 0, syserror.ENOSYS
}
diff --git a/pkg/sentry/fs/splice.go b/pkg/sentry/fs/splice.go
index eed1c2854..b03b7f836 100644
--- a/pkg/sentry/fs/splice.go
+++ b/pkg/sentry/fs/splice.go
@@ -18,7 +18,6 @@ import (
"io"
"sync/atomic"
- "gvisor.dev/gvisor/pkg/secio"
"gvisor.dev/gvisor/pkg/sentry/context"
"gvisor.dev/gvisor/pkg/syserror"
)
@@ -33,146 +32,131 @@ func Splice(ctx context.Context, dst *File, src *File, opts SpliceOpts) (int64,
}
// Check whether or not the objects being sliced are stream-oriented
- // (i.e. pipes or sockets). If yes, we elide checks and offset locks.
- srcPipe := IsPipe(src.Dirent.Inode.StableAttr) || IsSocket(src.Dirent.Inode.StableAttr)
- dstPipe := IsPipe(dst.Dirent.Inode.StableAttr) || IsSocket(dst.Dirent.Inode.StableAttr)
+ // (i.e. pipes or sockets). For all stream-oriented files and files
+ // where a specific offiset is not request, we acquire the file mutex.
+ // This has two important side effects. First, it provides the standard
+ // protection against concurrent writes that would mutate the offset.
+ // Second, it prevents Splice deadlocks. Only internal anonymous files
+ // implement the ReadFrom and WriteTo methods directly, and since such
+ // anonymous files are referred to by a unique fs.File object, we know
+ // that the file mutex takes strict precedence over internal locks.
+ // Since we enforce lock ordering here, we can't deadlock by using
+ // using a file in two different splice operations simultaneously.
+ srcPipe := !IsRegular(src.Dirent.Inode.StableAttr)
+ dstPipe := !IsRegular(dst.Dirent.Inode.StableAttr)
+ dstAppend := !dstPipe && dst.Flags().Append
+ srcLock := srcPipe || !opts.SrcOffset
+ dstLock := dstPipe || !opts.DstOffset || dstAppend
- if !dstPipe && !opts.DstOffset && !srcPipe && !opts.SrcOffset {
+ switch {
+ case srcLock && dstLock:
switch {
case dst.UniqueID < src.UniqueID:
// Acquire dst first.
if !dst.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted
}
- defer dst.mu.Unlock()
if !src.mu.Lock(ctx) {
+ dst.mu.Unlock()
return 0, syserror.ErrInterrupted
}
- defer src.mu.Unlock()
case dst.UniqueID > src.UniqueID:
// Acquire src first.
if !src.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted
}
- defer src.mu.Unlock()
if !dst.mu.Lock(ctx) {
+ src.mu.Unlock()
return 0, syserror.ErrInterrupted
}
- defer dst.mu.Unlock()
case dst.UniqueID == src.UniqueID:
// Acquire only one lock; it's the same file. This is a
// bit of a edge case, but presumably it's possible.
if !dst.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted
}
- defer dst.mu.Unlock()
+ srcLock = false // Only need one unlock.
}
// Use both offsets (locked).
opts.DstStart = dst.offset
opts.SrcStart = src.offset
- } else if !dstPipe && !opts.DstOffset {
+ case dstLock:
// Acquire only dst.
if !dst.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted
}
- defer dst.mu.Unlock()
opts.DstStart = dst.offset // Safe: locked.
- } else if !srcPipe && !opts.SrcOffset {
+ case srcLock:
// Acquire only src.
if !src.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted
}
- defer src.mu.Unlock()
opts.SrcStart = src.offset // Safe: locked.
}
- // Check append-only mode and the limit.
- if !dstPipe {
+ var err error
+ if dstAppend {
unlock := dst.Dirent.Inode.lockAppendMu(dst.Flags().Append)
defer unlock()
- if dst.Flags().Append {
- if opts.DstOffset {
- // We need to acquire the lock.
- if !dst.mu.Lock(ctx) {
- return 0, syserror.ErrInterrupted
- }
- defer dst.mu.Unlock()
- }
- // Figure out the appropriate offset to use.
- if err := dst.offsetForAppend(ctx, &opts.DstStart); err != nil {
- return 0, err
- }
- }
+ // Figure out the appropriate offset to use.
+ err = dst.offsetForAppend(ctx, &opts.DstStart)
+ }
+ if err == nil && !dstPipe {
// Enforce file limits.
limit, ok := dst.checkLimit(ctx, opts.DstStart)
switch {
case ok && limit == 0:
- return 0, syserror.ErrExceedsFileSizeLimit
+ err = syserror.ErrExceedsFileSizeLimit
case ok && limit < opts.Length:
opts.Length = limit // Cap the write.
}
}
+ if err != nil {
+ if dstLock {
+ dst.mu.Unlock()
+ }
+ if srcLock {
+ src.mu.Unlock()
+ }
+ return 0, err
+ }
- // Attempt to do a WriteTo; this is likely the most efficient.
- //
- // The underlying implementation may be able to donate buffers.
- newOpts := SpliceOpts{
- Length: opts.Length,
- SrcStart: opts.SrcStart,
- SrcOffset: !srcPipe,
- Dup: opts.Dup,
- DstStart: opts.DstStart,
- DstOffset: !dstPipe,
+ // Construct readers and writers for the splice. This is used to
+ // provide a safer locking path for the WriteTo/ReadFrom operations
+ // (since they will otherwise go through public interface methods which
+ // conflict with locking done above), and simplifies the fallback path.
+ w := &lockedWriter{
+ Ctx: ctx,
+ File: dst,
+ Offset: opts.DstStart,
}
- n, err := src.FileOperations.WriteTo(ctx, src, dst, newOpts)
- if n == 0 && err != nil {
- // Attempt as a ReadFrom. If a WriteTo, a ReadFrom may also
- // be more efficient than a copy if buffers are cached or readily
- // available. (It's unlikely that they can actually be donate
- n, err = dst.FileOperations.ReadFrom(ctx, dst, src, newOpts)
+ r := &lockedReader{
+ Ctx: ctx,
+ File: src,
+ Offset: opts.SrcStart,
}
- if n == 0 && err != nil {
- // If we've failed up to here, and at least one of the sources
- // is a pipe or socket, then we can't properly support dup.
- // Return an error indicating that this operation is not
- // supported.
- if (srcPipe || dstPipe) && newOpts.Dup {
- return 0, syserror.EINVAL
- }
- // We failed to splice the files. But that's fine; we just fall
- // back to a slow path in this case. This copies without doing
- // any mode changes, so should still be more efficient.
- var (
- r io.Reader
- w io.Writer
- )
- fw := &lockedWriter{
- Ctx: ctx,
- File: dst,
- }
- if newOpts.DstOffset {
- // Use the provided offset.
- w = secio.NewOffsetWriter(fw, newOpts.DstStart)
- } else {
- // Writes will proceed with no offset.
- w = fw
- }
- fr := &lockedReader{
- Ctx: ctx,
- File: src,
- }
- if newOpts.SrcOffset {
- // Limit to the given offset and length.
- r = io.NewSectionReader(fr, opts.SrcStart, opts.Length)
- } else {
- // Limit just to the given length.
- r = &io.LimitedReader{fr, opts.Length}
- }
+ // Attempt to do a WriteTo; this is likely the most efficient.
+ n, err := src.FileOperations.WriteTo(ctx, src, w, opts.Length, opts.Dup)
+ if n == 0 && err != nil && err != syserror.ErrWouldBlock && !opts.Dup {
+ // Attempt as a ReadFrom. If a WriteTo, a ReadFrom may also be
+ // more efficient than a copy if buffers are cached or readily
+ // available. (It's unlikely that they can actually be donated).
+ n, err = dst.FileOperations.ReadFrom(ctx, dst, r, opts.Length)
+ }
- // Copy between the two.
- n, err = io.Copy(w, r)
+ // Support one last fallback option, but only if at least one of
+ // the source and destination are regular files. This is because
+ // if we block at some point, we could lose data. If the source is
+ // not a pipe then reading is not destructive; if the destination
+ // is a regular file, then it is guaranteed not to block writing.
+ if n == 0 && err != nil && err != syserror.ErrWouldBlock && !opts.Dup && (!dstPipe || !srcPipe) {
+ // Fallback to an in-kernel copy.
+ n, err = io.Copy(w, &io.LimitedReader{
+ R: r,
+ N: opts.Length,
+ })
}
// Update offsets, if required.
@@ -185,5 +169,13 @@ func Splice(ctx context.Context, dst *File, src *File, opts SpliceOpts) (int64,
}
}
+ // Drop locks.
+ if dstLock {
+ dst.mu.Unlock()
+ }
+ if srcLock {
+ src.mu.Unlock()
+ }
+
return n, err
}
diff --git a/pkg/sentry/kernel/pipe/buffer.go b/pkg/sentry/kernel/pipe/buffer.go
index 69ef2a720..95bee2d37 100644
--- a/pkg/sentry/kernel/pipe/buffer.go
+++ b/pkg/sentry/kernel/pipe/buffer.go
@@ -15,6 +15,7 @@
package pipe
import (
+ "io"
"sync"
"gvisor.dev/gvisor/pkg/sentry/safemem"
@@ -67,6 +68,17 @@ func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) {
return n, err
}
+// WriteFromReader writes to the buffer from an io.Reader.
+func (b *buffer) WriteFromReader(r io.Reader, count int64) (int64, error) {
+ dst := b.data[b.write:]
+ if count < int64(len(dst)) {
+ dst = b.data[b.write:][:count]
+ }
+ n, err := r.Read(dst)
+ b.write += n
+ return int64(n), err
+}
+
// ReadToBlocks implements safemem.Reader.ReadToBlocks.
func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write]))
@@ -75,6 +87,19 @@ func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
return n, err
}
+// ReadToWriter reads from the buffer into an io.Writer.
+func (b *buffer) ReadToWriter(w io.Writer, count int64, dup bool) (int64, error) {
+ src := b.data[b.read:b.write]
+ if count < int64(len(src)) {
+ src = b.data[b.read:][:count]
+ }
+ n, err := w.Write(src)
+ if !dup {
+ b.read += n
+ }
+ return int64(n), err
+}
+
// bufferPool is a pool for buffers.
var bufferPool = sync.Pool{
New: func() interface{} {
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
index 247e2928e..93b50669f 100644
--- a/pkg/sentry/kernel/pipe/pipe.go
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -23,7 +23,6 @@ import (
"gvisor.dev/gvisor/pkg/sentry/context"
"gvisor.dev/gvisor/pkg/sentry/fs"
- "gvisor.dev/gvisor/pkg/sentry/usermem"
"gvisor.dev/gvisor/pkg/syserror"
"gvisor.dev/gvisor/pkg/waiter"
)
@@ -173,13 +172,24 @@ func (p *Pipe) Open(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) *fs.F
}
}
+type readOps struct {
+ // left returns the bytes remaining.
+ left func() int64
+
+ // limit limits subsequence reads.
+ limit func(int64)
+
+ // read performs the actual read operation.
+ read func(*buffer) (int64, error)
+}
+
// read reads data from the pipe into dst and returns the number of bytes
// read, or returns ErrWouldBlock if the pipe is empty.
//
// Precondition: this pipe must have readers.
-func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
+func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) {
// Don't block for a zero-length read even if the pipe is empty.
- if dst.NumBytes() == 0 {
+ if ops.left() == 0 {
return 0, nil
}
@@ -196,12 +206,12 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
}
// Limit how much we consume.
- if dst.NumBytes() > p.size {
- dst = dst.TakeFirst64(p.size)
+ if ops.left() > p.size {
+ ops.limit(p.size)
}
done := int64(0)
- for dst.NumBytes() > 0 {
+ for ops.left() > 0 {
// Pop the first buffer.
first := p.data.Front()
if first == nil {
@@ -209,10 +219,9 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
}
// Copy user data.
- n, err := dst.CopyOutFrom(ctx, first)
+ n, err := ops.read(first)
done += int64(n)
p.size -= n
- dst = dst.DropFirst64(n)
// Empty buffer?
if first.Empty() {
@@ -230,12 +239,57 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
return done, nil
}
+// dup duplicates all data from this pipe into the given writer.
+//
+// There is no blocking behavior implemented here. The writer may propagate
+// some blocking error. All the writes must be complete writes.
+func (p *Pipe) dup(ctx context.Context, ops readOps) (int64, error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ // Is the pipe empty?
+ if p.size == 0 {
+ if !p.HasWriters() {
+ // See above.
+ return 0, nil
+ }
+ return 0, syserror.ErrWouldBlock
+ }
+
+ // Limit how much we consume.
+ if ops.left() > p.size {
+ ops.limit(p.size)
+ }
+
+ done := int64(0)
+ for buf := p.data.Front(); buf != nil; buf = buf.Next() {
+ n, err := ops.read(buf)
+ done += n
+ if err != nil {
+ return done, err
+ }
+ }
+
+ return done, nil
+}
+
+type writeOps struct {
+ // left returns the bytes remaining.
+ left func() int64
+
+ // limit should limit subsequent writes.
+ limit func(int64)
+
+ // write should write to the provided buffer.
+ write func(*buffer) (int64, error)
+}
+
// write writes data from sv into the pipe and returns the number of bytes
// written. If no bytes are written because the pipe is full (or has less than
// atomicIOBytes free capacity), write returns ErrWouldBlock.
//
// Precondition: this pipe must have writers.
-func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) {
+func (p *Pipe) write(ctx context.Context, ops writeOps) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -246,17 +300,16 @@ func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error)
// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
// atomic, but requires no atomicity for writes larger than this.
- wanted := src.NumBytes()
+ wanted := ops.left()
if avail := p.max - p.size; wanted > avail {
if wanted <= p.atomicIOBytes {
return 0, syserror.ErrWouldBlock
}
- // Limit to the available capacity.
- src = src.TakeFirst64(avail)
+ ops.limit(avail)
}
done := int64(0)
- for src.NumBytes() > 0 {
+ for ops.left() > 0 {
// Need a new buffer?
last := p.data.Back()
if last == nil || last.Full() {
@@ -266,10 +319,9 @@ func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error)
}
// Copy user data.
- n, err := src.CopyInTo(ctx, last)
+ n, err := ops.write(last)
done += int64(n)
p.size += n
- src = src.DropFirst64(n)
// Handle errors.
if err != nil {
diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go
index f69dbf27b..7c307f013 100644
--- a/pkg/sentry/kernel/pipe/reader_writer.go
+++ b/pkg/sentry/kernel/pipe/reader_writer.go
@@ -15,6 +15,7 @@
package pipe
import (
+ "io"
"math"
"syscall"
@@ -55,7 +56,45 @@ func (rw *ReaderWriter) Release() {
// Read implements fs.FileOperations.Read.
func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequence, _ int64) (int64, error) {
- n, err := rw.Pipe.read(ctx, dst)
+ n, err := rw.Pipe.read(ctx, readOps{
+ left: func() int64 {
+ return dst.NumBytes()
+ },
+ limit: func(l int64) {
+ dst = dst.TakeFirst64(l)
+ },
+ read: func(buf *buffer) (int64, error) {
+ n, err := dst.CopyOutFrom(ctx, buf)
+ dst = dst.DropFirst64(n)
+ return n, err
+ },
+ })
+ if n > 0 {
+ rw.Pipe.Notify(waiter.EventOut)
+ }
+ return n, err
+}
+
+// WriteTo implements fs.FileOperations.WriteTo.
+func (rw *ReaderWriter) WriteTo(ctx context.Context, _ *fs.File, w io.Writer, count int64, dup bool) (int64, error) {
+ ops := readOps{
+ left: func() int64 {
+ return count
+ },
+ limit: func(l int64) {
+ count = l
+ },
+ read: func(buf *buffer) (int64, error) {
+ n, err := buf.ReadToWriter(w, count, dup)
+ count -= n
+ return n, err
+ },
+ }
+ if dup {
+ // There is no notification for dup operations.
+ return rw.Pipe.dup(ctx, ops)
+ }
+ n, err := rw.Pipe.read(ctx, ops)
if n > 0 {
rw.Pipe.Notify(waiter.EventOut)
}
@@ -64,7 +103,40 @@ func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequ
// Write implements fs.FileOperations.Write.
func (rw *ReaderWriter) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) {
- n, err := rw.Pipe.write(ctx, src)
+ n, err := rw.Pipe.write(ctx, writeOps{
+ left: func() int64 {
+ return src.NumBytes()
+ },
+ limit: func(l int64) {
+ src = src.TakeFirst64(l)
+ },
+ write: func(buf *buffer) (int64, error) {
+ n, err := src.CopyInTo(ctx, buf)
+ src = src.DropFirst64(n)
+ return n, err
+ },
+ })
+ if n > 0 {
+ rw.Pipe.Notify(waiter.EventIn)
+ }
+ return n, err
+}
+
+// ReadFrom implements fs.FileOperations.WriteTo.
+func (rw *ReaderWriter) ReadFrom(ctx context.Context, _ *fs.File, r io.Reader, count int64) (int64, error) {
+ n, err := rw.Pipe.write(ctx, writeOps{
+ left: func() int64 {
+ return count
+ },
+ limit: func(l int64) {
+ count = l
+ },
+ write: func(buf *buffer) (int64, error) {
+ n, err := buf.WriteFromReader(r, count)
+ count -= n
+ return n, err
+ },
+ })
if n > 0 {
rw.Pipe.Notify(waiter.EventIn)
}
diff --git a/pkg/sentry/socket/epsocket/epsocket.go b/pkg/sentry/socket/epsocket/epsocket.go
index 0e37ce61b..3e05e40fe 100644
--- a/pkg/sentry/socket/epsocket/epsocket.go
+++ b/pkg/sentry/socket/epsocket/epsocket.go
@@ -26,6 +26,7 @@ package epsocket
import (
"bytes"
+ "io"
"math"
"reflect"
"sync"
@@ -227,7 +228,6 @@ type SocketOperations struct {
fsutil.FileNoopFlush `state:"nosave"`
fsutil.FileNoFsync `state:"nosave"`
fsutil.FileNoMMap `state:"nosave"`
- fsutil.FileNoSplice `state:"nosave"`
fsutil.FileUseInodeUnstableAttr `state:"nosave"`
socket.SendReceiveTimeout
*waiter.Queue
@@ -412,17 +412,58 @@ func (s *SocketOperations) Read(ctx context.Context, _ *fs.File, dst usermem.IOS
return int64(n), nil
}
-// ioSequencePayload implements tcpip.Payload. It copies user memory bytes on demand
-// based on the requested size.
+// WriteTo implements fs.FileOperations.WriteTo.
+func (s *SocketOperations) WriteTo(ctx context.Context, _ *fs.File, dst io.Writer, count int64, dup bool) (int64, error) {
+ s.readMu.Lock()
+ defer s.readMu.Unlock()
+
+ // Copy as much data as possible.
+ done := int64(0)
+ for count > 0 {
+ // This may return a blocking error.
+ if err := s.fetchReadView(); err != nil {
+ return done, err.ToError()
+ }
+
+ // Write to the underlying file.
+ n, err := dst.Write(s.readView)
+ done += int64(n)
+ count -= int64(n)
+ if dup {
+ // That's all we support for dup. This is generally
+ // supported by any Linux system calls, but the
+ // expectation is that now a caller will call read to
+ // actually remove these bytes from the socket.
+ return done, nil
+ }
+
+ // Drop that part of the view.
+ s.readView.TrimFront(n)
+ if err != nil {
+ return done, err
+ }
+ }
+
+ return done, nil
+}
+
+// ioSequencePayload implements tcpip.Payload.
+//
+// t copies user memory bytes on demand based on the requested size.
type ioSequencePayload struct {
ctx context.Context
src usermem.IOSequence
}
-// Get implements tcpip.Payload.
-func (i *ioSequencePayload) Get(size int) ([]byte, *tcpip.Error) {
- if size > i.Size() {
- size = i.Size()
+// FullPayload implements tcpip.Payloader.FullPayload
+func (i *ioSequencePayload) FullPayload() ([]byte, *tcpip.Error) {
+ return i.Payload(int(i.src.NumBytes()))
+}
+
+// Payload implements tcpip.Payloader.Payload.
+func (i *ioSequencePayload) Payload(size int) ([]byte, *tcpip.Error) {
+ if max := int(i.src.NumBytes()); size > max {
+ size = max
}
v := buffer.NewView(size)
if _, err := i.src.CopyIn(i.ctx, v); err != nil {
@@ -431,11 +472,6 @@ func (i *ioSequencePayload) Get(size int) ([]byte, *tcpip.Error) {
return v, nil
}
-// Size implements tcpip.Payload.
-func (i *ioSequencePayload) Size() int {
- return int(i.src.NumBytes())
-}
-
// DropFirst drops the first n bytes from underlying src.
func (i *ioSequencePayload) DropFirst(n int) {
i.src = i.src.DropFirst(int(n))
@@ -469,6 +505,76 @@ func (s *SocketOperations) Write(ctx context.Context, _ *fs.File, src usermem.IO
return int64(n), nil
}
+// readerPayload implements tcpip.Payloader.
+//
+// It allocates a view and reads from a reader on-demand, based on available
+// capacity in the endpoint.
+type readerPayload struct {
+ ctx context.Context
+ r io.Reader
+ count int64
+ err error
+}
+
+// FullPayload implements tcpip.Payloader.FullPayload.
+func (r *readerPayload) FullPayload() ([]byte, *tcpip.Error) {
+ return r.Payload(int(r.count))
+}
+
+// Payload implements tcpip.Payloader.Payload.
+func (r *readerPayload) Payload(size int) ([]byte, *tcpip.Error) {
+ if size > int(r.count) {
+ size = int(r.count)
+ }
+ v := buffer.NewView(size)
+ n, err := r.r.Read(v)
+ if n > 0 {
+ // We ignore the error here. It may re-occur on subsequent
+ // reads, but for now we can enqueue some amount of data.
+ r.count -= int64(n)
+ return v[:n], nil
+ }
+ if err == syserror.ErrWouldBlock {
+ return nil, tcpip.ErrWouldBlock
+ } else if err != nil {
+ r.err = err // Save for propation.
+ return nil, tcpip.ErrBadAddress
+ }
+
+ // There is no data and no error. Return an error, which will propagate
+ // r.err, which will be nil. This is the desired result: (0, nil).
+ return nil, tcpip.ErrBadAddress
+}
+
+// ReadFrom implements fs.FileOperations.ReadFrom.
+func (s *SocketOperations) ReadFrom(ctx context.Context, _ *fs.File, r io.Reader, count int64) (int64, error) {
+ f := &readerPayload{ctx: ctx, r: r, count: count}
+ n, resCh, err := s.Endpoint.Write(f, tcpip.WriteOptions{})
+ if err == tcpip.ErrWouldBlock {
+ return 0, syserror.ErrWouldBlock
+ }
+
+ if resCh != nil {
+ t := ctx.(*kernel.Task)
+ if err := t.Block(resCh); err != nil {
+ return 0, syserr.FromError(err).ToError()
+ }
+
+ n, _, err = s.Endpoint.Write(f, tcpip.WriteOptions{
+ // Reads may be destructive but should be very fast,
+ // so we can't release the lock while copying data.
+ Atomic: true,
+ })
+ }
+ if err == tcpip.ErrWouldBlock {
+ return n, syserror.ErrWouldBlock
+ } else if err != nil {
+ return int64(n), f.err // Propagate error.
+ }
+
+ return int64(n), nil
+}
+
// Readiness returns a mask of ready events for socket s.
func (s *SocketOperations) Readiness(mask waiter.EventMask) waiter.EventMask {
r := s.Endpoint.Readiness(mask)
@@ -2060,7 +2166,7 @@ func (s *SocketOperations) SendMsg(t *kernel.Task, src usermem.IOSequence, to []
n, _, err = s.Endpoint.Write(v, opts)
}
dontWait := flags&linux.MSG_DONTWAIT != 0
- if err == nil && (n >= int64(v.Size()) || dontWait) {
+ if err == nil && (n >= v.src.NumBytes() || dontWait) {
// Complete write.
return int(n), nil
}
@@ -2085,7 +2191,7 @@ func (s *SocketOperations) SendMsg(t *kernel.Task, src usermem.IOSequence, to []
return 0, syserr.TranslateNetstackError(err)
}
- if err == nil && v.Size() == 0 || err != nil && err != tcpip.ErrWouldBlock {
+ if err == nil && v.src.NumBytes() == 0 || err != nil && err != tcpip.ErrWouldBlock {
return int(total), nil
}
diff --git a/pkg/sentry/syscalls/linux/linux64.go b/pkg/sentry/syscalls/linux/linux64.go
index ed996ba51..150999fb8 100644
--- a/pkg/sentry/syscalls/linux/linux64.go
+++ b/pkg/sentry/syscalls/linux/linux64.go
@@ -320,8 +320,8 @@ var AMD64 = &kernel.SyscallTable{
272: syscalls.PartiallySupported("unshare", Unshare, "Mount, cgroup namespaces not supported. Network namespaces supported but must be empty.", nil),
273: syscalls.Error("set_robust_list", syserror.ENOSYS, "Obsolete.", nil),
274: syscalls.Error("get_robust_list", syserror.ENOSYS, "Obsolete.", nil),
- 275: syscalls.PartiallySupported("splice", Splice, "Stub implementation.", []string{"gvisor.dev/issue/138"}), // TODO(b/29354098)
- 276: syscalls.ErrorWithEvent("tee", syserror.ENOSYS, "", []string{"gvisor.dev/issue/138"}), // TODO(b/29354098)
+ 275: syscalls.Supported("splice", Splice),
+ 276: syscalls.Supported("tee", Tee),
277: syscalls.PartiallySupported("sync_file_range", SyncFileRange, "Full data flush is not guaranteed at this time.", nil),
278: syscalls.ErrorWithEvent("vmsplice", syserror.ENOSYS, "", []string{"gvisor.dev/issue/138"}), // TODO(b/29354098)
279: syscalls.CapError("move_pages", linux.CAP_SYS_NICE, "", nil), // requires cap_sys_nice (mostly)
diff --git a/pkg/sentry/syscalls/linux/sys_splice.go b/pkg/sentry/syscalls/linux/sys_splice.go
index 8a98fedcb..f0a292f2f 100644
--- a/pkg/sentry/syscalls/linux/sys_splice.go
+++ b/pkg/sentry/syscalls/linux/sys_splice.go
@@ -29,9 +29,8 @@ func doSplice(t *kernel.Task, outFile, inFile *fs.File, opts fs.SpliceOpts, nonB
total int64
n int64
err error
- ch chan struct{}
- inW bool
- outW bool
+ inCh chan struct{}
+ outCh chan struct{}
)
for opts.Length > 0 {
n, err = fs.Splice(t, outFile, inFile, opts)
@@ -43,35 +42,33 @@ func doSplice(t *kernel.Task, outFile, inFile *fs.File, opts fs.SpliceOpts, nonB
break
}
- // Are we a registered waiter?
- if ch == nil {
- ch = make(chan struct{}, 1)
- }
- if !inW && !inFile.Flags().NonBlocking {
- w, _ := waiter.NewChannelEntry(ch)
- inFile.EventRegister(&w, EventMaskRead)
- defer inFile.EventUnregister(&w)
- inW = true // Registered.
- } else if !outW && !outFile.Flags().NonBlocking {
- w, _ := waiter.NewChannelEntry(ch)
- outFile.EventRegister(&w, EventMaskWrite)
- defer outFile.EventUnregister(&w)
- outW = true // Registered.
- }
-
- // Was anything registered? If no, everything is non-blocking.
- if !inW && !outW {
- break
- }
-
- if (!inW || inFile.Readiness(EventMaskRead) != 0) && (!outW || outFile.Readiness(EventMaskWrite) != 0) {
- // Something became ready, try again without blocking.
- continue
+ // Note that the blocking behavior here is a bit different than the
+ // normal pattern. Because we need to have both data to read and data
+ // to write simultaneously, we actually explicitly block on both of
+ // these cases in turn before returning to the splice operation.
+ if inFile.Readiness(EventMaskRead) == 0 {
+ if inCh == nil {
+ inCh = make(chan struct{}, 1)
+ inW, _ := waiter.NewChannelEntry(inCh)
+ inFile.EventRegister(&inW, EventMaskRead)
+ defer inFile.EventUnregister(&inW)
+ continue // Need to refresh readiness.
+ }
+ if err = t.Block(inCh); err != nil {
+ break
+ }
}
-
- // Block until there's data.
- if err = t.Block(ch); err != nil {
- break
+ if outFile.Readiness(EventMaskWrite) == 0 {
+ if outCh == nil {
+ outCh = make(chan struct{}, 1)
+ outW, _ := waiter.NewChannelEntry(outCh)
+ outFile.EventRegister(&outW, EventMaskWrite)
+ defer outFile.EventUnregister(&outW)
+ continue // Need to refresh readiness.
+ }
+ if err = t.Block(outCh); err != nil {
+ break
+ }
}
}
@@ -149,7 +146,7 @@ func Sendfile(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Sysc
Length: count,
SrcOffset: true,
SrcStart: offset,
- }, false)
+ }, outFile.Flags().NonBlocking)
// Copy out the new offset.
if _, err := t.CopyOut(offsetAddr, n+offset); err != nil {
@@ -159,7 +156,7 @@ func Sendfile(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Sysc
// Send data using splice.
n, err = doSplice(t, outFile, inFile, fs.SpliceOpts{
Length: count,
- }, false)
+ }, outFile.Flags().NonBlocking)
}
// We can only pass a single file to handleIOError, so pick inFile
@@ -181,12 +178,6 @@ func Splice(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Syscal
return 0, nil, syserror.EINVAL
}
- // Only non-blocking is meaningful. Note that unlike in Linux, this
- // flag is applied consistently. We will have either fully blocking or
- // non-blocking behavior below, regardless of the underlying files
- // being spliced to. It's unclear if this is a bug or not yet.
- nonBlocking := (flags & linux.SPLICE_F_NONBLOCK) != 0
-
// Get files.
outFile := t.GetFile(outFD)
if outFile == nil {
@@ -200,6 +191,13 @@ func Splice(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Syscal
}
defer inFile.DecRef()
+ // The operation is non-blocking if anything is non-blocking.
+ //
+ // N.B. This is a rather simplistic heuristic that avoids some
+ // poor edge case behavior since the exact semantics here are
+ // underspecified and vary between versions of Linux itself.
+ nonBlock := inFile.Flags().NonBlocking || outFile.Flags().NonBlocking || (flags&linux.SPLICE_F_NONBLOCK != 0)
+
// Construct our options.
//
// Note that exactly one of the underlying buffers must be a pipe. We
@@ -257,7 +255,7 @@ func Splice(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Syscal
}
// Splice data.
- n, err := doSplice(t, outFile, inFile, opts, nonBlocking)
+ n, err := doSplice(t, outFile, inFile, opts, nonBlock)
// See above; inFile is chosen arbitrarily here.
return uintptr(n), nil, handleIOError(t, n != 0, err, kernel.ERESTARTSYS, "splice", inFile)
@@ -275,9 +273,6 @@ func Tee(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.SyscallCo
return 0, nil, syserror.EINVAL
}
- // Only non-blocking is meaningful.
- nonBlocking := (flags & linux.SPLICE_F_NONBLOCK) != 0
-
// Get files.
outFile := t.GetFile(outFD)
if outFile == nil {
@@ -301,11 +296,14 @@ func Tee(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.SyscallCo
return 0, nil, syserror.EINVAL
}
+ // The operation is non-blocking if anything is non-blocking.
+ nonBlock := inFile.Flags().NonBlocking || outFile.Flags().NonBlocking || (flags&linux.SPLICE_F_NONBLOCK != 0)
+
// Splice data.
n, err := doSplice(t, outFile, inFile, fs.SpliceOpts{
Length: count,
Dup: true,
- }, nonBlocking)
+ }, nonBlock)
// See above; inFile is chosen arbitrarily here.
return uintptr(n), nil, handleIOError(t, n != 0, err, kernel.ERESTARTSYS, "tee", inFile)
diff --git a/pkg/tcpip/header/udp.go b/pkg/tcpip/header/udp.go
index c1f454805..74412c894 100644
--- a/pkg/tcpip/header/udp.go
+++ b/pkg/tcpip/header/udp.go
@@ -27,6 +27,11 @@ const (
udpChecksum = 6
)
+const (
+ // UDPMaximumPacketSize is the largest possible UDP packet.
+ UDPMaximumPacketSize = 0xffff
+)
+
// UDPFields contains the fields of a UDP packet. It is used to describe the
// fields of a packet that needs to be encoded.
type UDPFields struct {
diff --git a/pkg/tcpip/stack/transport_test.go b/pkg/tcpip/stack/transport_test.go
index 87d1e0d0d..847d02982 100644
--- a/pkg/tcpip/stack/transport_test.go
+++ b/pkg/tcpip/stack/transport_test.go
@@ -65,13 +65,13 @@ func (*fakeTransportEndpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.Contr
return buffer.View{}, tcpip.ControlMessages{}, nil
}
-func (f *fakeTransportEndpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
+func (f *fakeTransportEndpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
if len(f.route.RemoteAddress) == 0 {
return 0, nil, tcpip.ErrNoRoute
}
hdr := buffer.NewPrependable(int(f.route.MaxHeaderLength()))
- v, err := p.Get(p.Size())
+ v, err := p.FullPayload()
if err != nil {
return 0, nil, err
}
diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go
index ebf8a2d04..2534069ab 100644
--- a/pkg/tcpip/tcpip.go
+++ b/pkg/tcpip/tcpip.go
@@ -261,31 +261,34 @@ type FullAddress struct {
Port uint16
}
-// Payload provides an interface around data that is being sent to an endpoint.
-// This allows the endpoint to request the amount of data it needs based on
-// internal buffers without exposing them. 'p.Get(p.Size())' reads all the data.
-type Payload interface {
- // Get returns a slice containing exactly 'min(size, p.Size())' bytes.
- Get(size int) ([]byte, *Error)
-
- // Size returns the payload size.
- Size() int
+// Payloader is an interface that provides data.
+//
+// This interface allows the endpoint to request the amount of data it needs
+// based on internal buffers without exposing them.
+type Payloader interface {
+ // FullPayload returns all available bytes.
+ FullPayload() ([]byte, *Error)
+
+ // Payload returns a slice containing at most size bytes.
+ Payload(size int) ([]byte, *Error)
}
-// SlicePayload implements Payload on top of slices for convenience.
+// SlicePayload implements Payloader for slices.
+//
+// This is typically used for tests.
type SlicePayload []byte
-// Get implements Payload.
-func (s SlicePayload) Get(size int) ([]byte, *Error) {
- if size > s.Size() {
- size = s.Size()
- }
- return s[:size], nil
+// FullPayload implements Payloader.FullPayload.
+func (s SlicePayload) FullPayload() ([]byte, *Error) {
+ return s, nil
}
-// Size implements Payload.
-func (s SlicePayload) Size() int {
- return len(s)
+// Payload implements Payloader.Payload.
+func (s SlicePayload) Payload(size int) ([]byte, *Error) {
+ if size > len(s) {
+ size = len(s)
+ }
+ return s[:size], nil
}
// A ControlMessages contains socket control messages for IP sockets.
@@ -338,7 +341,7 @@ type Endpoint interface {
// ErrNoLinkAddress and a notification channel is returned for the caller to
// block. Channel is closed once address resolution is complete (success or
// not). The channel is only non-nil in this case.
- Write(Payload, WriteOptions) (int64, <-chan struct{}, *Error)
+ Write(Payloader, WriteOptions) (int64, <-chan struct{}, *Error)
// Peek reads data without consuming it from the endpoint.
//
@@ -432,6 +435,11 @@ type WriteOptions struct {
// EndOfRecord has the same semantics as Linux's MSG_EOR.
EndOfRecord bool
+
+ // Atomic means that all data fetched from Payloader must be written to the
+ // endpoint. If Atomic is false, then data fetched from the Payloader may be
+ // discarded if available endpoint buffer space is unsufficient.
+ Atomic bool
}
// SockOpt represents socket options which values have the int type.
diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go
index e1f622af6..3db060384 100644
--- a/pkg/tcpip/transport/icmp/endpoint.go
+++ b/pkg/tcpip/transport/icmp/endpoint.go
@@ -204,7 +204,7 @@ func (e *endpoint) prepareForWrite(to *tcpip.FullAddress) (retry bool, err *tcpi
// Write writes data to the endpoint's peer. This method does not block
// if the data cannot be written.
-func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
+func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
// MSG_MORE is unimplemented. (This also means that MSG_EOR is a no-op.)
if opts.More {
return 0, nil, tcpip.ErrInvalidOptionValue
@@ -289,7 +289,7 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-cha
}
}
- v, err := p.Get(p.Size())
+ v, err := p.FullPayload()
if err != nil {
return 0, nil, err
}
diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go
index 13e17e2a6..cf1c5c433 100644
--- a/pkg/tcpip/transport/raw/endpoint.go
+++ b/pkg/tcpip/transport/raw/endpoint.go
@@ -207,7 +207,7 @@ func (ep *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMes
}
// Write implements tcpip.Endpoint.Write.
-func (ep *endpoint) Write(payload tcpip.Payload, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
+func (ep *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
// MSG_MORE is unimplemented. This also means that MSG_EOR is a no-op.
if opts.More {
return 0, nil, tcpip.ErrInvalidOptionValue
@@ -220,9 +220,8 @@ func (ep *endpoint) Write(payload tcpip.Payload, opts tcpip.WriteOptions) (int64
return 0, nil, tcpip.ErrInvalidEndpointState
}
- payloadBytes, err := payload.Get(payload.Size())
+ payloadBytes, err := p.FullPayload()
if err != nil {
- ep.mu.RUnlock()
return 0, nil, err
}
@@ -230,7 +229,7 @@ func (ep *endpoint) Write(payload tcpip.Payload, opts tcpip.WriteOptions) (int64
// destination address, route using that address.
if !ep.associated {
ip := header.IPv4(payloadBytes)
- if !ip.IsValid(payload.Size()) {
+ if !ip.IsValid(len(payloadBytes)) {
ep.mu.RUnlock()
return 0, nil, tcpip.ErrInvalidOptionValue
}
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index ac927569a..dd931f88c 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -806,7 +806,7 @@ func (e *endpoint) isEndpointWritableLocked() (int, *tcpip.Error) {
}
// Write writes data to the endpoint's peer.
-func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
+func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
// Linux completely ignores any address passed to sendto(2) for TCP sockets
// (without the MSG_FASTOPEN flag). Corking is unimplemented, so opts.More
// and opts.EndOfRecord are also ignored.
@@ -821,47 +821,52 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-cha
return 0, nil, err
}
- e.sndBufMu.Unlock()
- e.mu.RUnlock()
-
- // Nothing to do if the buffer is empty.
- if p.Size() == 0 {
- return 0, nil, nil
+ // We can release locks while copying data.
+ //
+ // This is not possible if atomic is set, because we can't allow the
+ // available buffer space to be consumed by some other caller while we
+ // are copying data in.
+ if !opts.Atomic {
+ e.sndBufMu.Unlock()
+ e.mu.RUnlock()
}
- // Copy in memory without holding sndBufMu so that worker goroutine can
- // make progress independent of this operation.
- v, perr := p.Get(avail)
- if perr != nil {
+ // Fetch data.
+ v, perr := p.Payload(avail)
+ if perr != nil || len(v) == 0 {
+ if opts.Atomic { // See above.
+ e.sndBufMu.Unlock()
+ e.mu.RUnlock()
+ }
+ // Note that perr may be nil if len(v) == 0.
return 0, nil, perr
}
- e.mu.RLock()
- e.sndBufMu.Lock()
+ if !opts.Atomic { // See above.
+ e.mu.RLock()
+ e.sndBufMu.Lock()
- // Because we released the lock before copying, check state again
- // to make sure the endpoint is still in a valid state for a
- // write.
- avail, err = e.isEndpointWritableLocked()
- if err != nil {
- e.sndBufMu.Unlock()
- e.mu.RUnlock()
- return 0, nil, err
- }
+ // Because we released the lock before copying, check state again
+ // to make sure the endpoint is still in a valid state for a write.
+ avail, err = e.isEndpointWritableLocked()
+ if err != nil {
+ e.sndBufMu.Unlock()
+ e.mu.RUnlock()
+ return 0, nil, err
+ }
- // Discard any excess data copied in due to avail being reduced due to a
- // simultaneous write call to the socket.
- if avail < len(v) {
- v = v[:avail]
+ // Discard any excess data copied in due to avail being reduced due
+ // to a simultaneous write call to the socket.
+ if avail < len(v) {
+ v = v[:avail]
+ }
}
// Add data to the send queue.
- l := len(v)
s := newSegmentFromView(&e.route, e.id, v)
- e.sndBufUsed += l
- e.sndBufInQueue += seqnum.Size(l)
+ e.sndBufUsed += len(v)
+ e.sndBufInQueue += seqnum.Size(len(v))
e.sndQueue.PushBack(s)
-
e.sndBufMu.Unlock()
// Release the endpoint lock to prevent deadlocks due to lock
// order inversion when acquiring workMu.
@@ -875,7 +880,8 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-cha
// Let the protocol goroutine do the work.
e.sndWaker.Assert()
}
- return int64(l), nil, nil
+
+ return int64(len(v)), nil, nil
}
// Peek reads data without consuming it from the endpoint.
diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go
index dccb9a7eb..6ac7c067a 100644
--- a/pkg/tcpip/transport/udp/endpoint.go
+++ b/pkg/tcpip/transport/udp/endpoint.go
@@ -15,7 +15,6 @@
package udp
import (
- "math"
"sync"
"gvisor.dev/gvisor/pkg/tcpip"
@@ -277,17 +276,12 @@ func (e *endpoint) connectRoute(nicid tcpip.NICID, addr tcpip.FullAddress, netPr
// Write writes data to the endpoint's peer. This method does not block
// if the data cannot be written.
-func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
+func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
// MSG_MORE is unimplemented. (This also means that MSG_EOR is a no-op.)
if opts.More {
return 0, nil, tcpip.ErrInvalidOptionValue
}
- if p.Size() > math.MaxUint16 {
- // Payload can't possibly fit in a packet.
- return 0, nil, tcpip.ErrMessageTooLong
- }
-
to := opts.To
e.mu.RLock()
@@ -370,10 +364,14 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-cha
}
}
- v, err := p.Get(p.Size())
+ v, err := p.FullPayload()
if err != nil {
return 0, nil, err
}
+ if len(v) > header.UDPMaximumPacketSize {
+ // Payload can't possibly fit in a packet.
+ return 0, nil, tcpip.ErrMessageTooLong
+ }
ttl := route.DefaultTTL()
if header.IsV4MulticastAddress(route.RemoteAddress) || header.IsV6MulticastAddress(route.RemoteAddress) {
diff --git a/test/syscalls/linux/BUILD b/test/syscalls/linux/BUILD
index 34057e3d0..df00d2c14 100644
--- a/test/syscalls/linux/BUILD
+++ b/test/syscalls/linux/BUILD
@@ -1867,7 +1867,9 @@ cc_binary(
"//test/util:temp_path",
"//test/util:test_main",
"//test/util:test_util",
+ "//test/util:thread_util",
"@com_google_absl//absl/strings",
+ "@com_google_absl//absl/time",
"@com_google_googletest//:gtest",
],
)
@@ -1901,6 +1903,7 @@ cc_binary(
"//test/util:test_util",
"//test/util:thread_util",
"@com_google_absl//absl/strings",
+ "@com_google_absl//absl/time",
"@com_google_googletest//:gtest",
],
)
diff --git a/test/syscalls/linux/pipe.cc b/test/syscalls/linux/pipe.cc
index 65afb90f3..10e2a6dfc 100644
--- a/test/syscalls/linux/pipe.cc
+++ b/test/syscalls/linux/pipe.cc
@@ -168,6 +168,20 @@ TEST_P(PipeTest, Write) {
EXPECT_EQ(wbuf, rbuf);
}
+TEST_P(PipeTest, WritePage) {
+ SKIP_IF(!CreateBlocking());
+
+ std::vector<char> wbuf(kPageSize);
+ RandomizeBuffer(wbuf.data(), wbuf.size());
+ std::vector<char> rbuf(wbuf.size());
+
+ ASSERT_THAT(write(wfd_.get(), wbuf.data(), wbuf.size()),
+ SyscallSucceedsWithValue(wbuf.size()));
+ ASSERT_THAT(read(rfd_.get(), rbuf.data(), rbuf.size()),
+ SyscallSucceedsWithValue(rbuf.size()));
+ EXPECT_EQ(memcmp(rbuf.data(), wbuf.data(), wbuf.size()), 0);
+}
+
TEST_P(PipeTest, NonBlocking) {
SKIP_IF(!CreateNonBlocking());
diff --git a/test/syscalls/linux/sendfile.cc b/test/syscalls/linux/sendfile.cc
index 9167ab066..4502e7fb4 100644
--- a/test/syscalls/linux/sendfile.cc
+++ b/test/syscalls/linux/sendfile.cc
@@ -19,9 +19,12 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "absl/strings/string_view.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
#include "test/util/file_descriptor.h"
#include "test/util/temp_path.h"
#include "test/util/test_util.h"
+#include "test/util/thread_util.h"
namespace gvisor {
namespace testing {
@@ -442,6 +445,72 @@ TEST(SendFileTest, SendToNotARegularFile) {
EXPECT_THAT(sendfile(outf.get(), inf.get(), nullptr, 0),
SyscallFailsWithErrno(EINVAL));
}
+
+TEST(SendFileTest, SendPipeWouldBlock) {
+ // Create temp file.
+ constexpr char kData[] =
+ "The fool doth think he is wise, but the wise man knows himself to be a "
+ "fool.";
+ constexpr int kDataSize = sizeof(kData) - 1;
+ const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFileWith(
+ GetAbsoluteTestTmpdir(), kData, TempPath::kDefaultFileMode));
+
+ // Open the input file as read only.
+ const FileDescriptor inf =
+ ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDONLY));
+
+ // Setup the output named pipe.
+ int fds[2];
+ ASSERT_THAT(pipe2(fds, O_NONBLOCK), SyscallSucceeds());
+ const FileDescriptor rfd(fds[0]);
+ const FileDescriptor wfd(fds[1]);
+
+ // Fill up the pipe's buffer.
+ int pipe_size = -1;
+ ASSERT_THAT(pipe_size = fcntl(wfd.get(), F_GETPIPE_SZ), SyscallSucceeds());
+ std::vector<char> buf(2 * pipe_size);
+ ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
+ SyscallSucceedsWithValue(pipe_size));
+
+ EXPECT_THAT(sendfile(wfd.get(), inf.get(), nullptr, kDataSize),
+ SyscallFailsWithErrno(EWOULDBLOCK));
+}
+
+TEST(SendFileTest, SendPipeBlocks) {
+ // Create temp file.
+ constexpr char kData[] =
+ "The fault, dear Brutus, is not in our stars, but in ourselves.";
+ constexpr int kDataSize = sizeof(kData) - 1;
+ const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFileWith(
+ GetAbsoluteTestTmpdir(), kData, TempPath::kDefaultFileMode));
+
+ // Open the input file as read only.
+ const FileDescriptor inf =
+ ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDONLY));
+
+ // Setup the output named pipe.
+ int fds[2];
+ ASSERT_THAT(pipe(fds), SyscallSucceeds());
+ const FileDescriptor rfd(fds[0]);
+ const FileDescriptor wfd(fds[1]);
+
+ // Fill up the pipe's buffer.
+ int pipe_size = -1;
+ ASSERT_THAT(pipe_size = fcntl(wfd.get(), F_GETPIPE_SZ), SyscallSucceeds());
+ std::vector<char> buf(pipe_size);
+ ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
+ SyscallSucceedsWithValue(pipe_size));
+
+ ScopedThread t([&]() {
+ absl::SleepFor(absl::Milliseconds(100));
+ ASSERT_THAT(read(rfd.get(), buf.data(), buf.size()),
+ SyscallSucceedsWithValue(pipe_size));
+ });
+
+ EXPECT_THAT(sendfile(wfd.get(), inf.get(), nullptr, kDataSize),
+ SyscallSucceedsWithValue(kDataSize));
+}
+
} // namespace
} // namespace testing
diff --git a/test/syscalls/linux/splice.cc b/test/syscalls/linux/splice.cc
index e25f264f6..85232cb1f 100644
--- a/test/syscalls/linux/splice.cc
+++ b/test/syscalls/linux/splice.cc
@@ -14,12 +14,16 @@
#include <fcntl.h>
#include <sys/eventfd.h>
+#include <sys/resource.h>
#include <sys/sendfile.h>
+#include <sys/time.h>
#include <unistd.h>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "absl/strings/string_view.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
#include "test/util/file_descriptor.h"
#include "test/util/temp_path.h"
#include "test/util/test_util.h"
@@ -36,23 +40,23 @@ TEST(SpliceTest, TwoRegularFiles) {
const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
// Open the input file as read only.
- const FileDescriptor inf =
+ const FileDescriptor in_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDONLY));
// Open the output file as write only.
- const FileDescriptor outf =
+ const FileDescriptor out_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_WRONLY));
// Verify that it is rejected as expected; regardless of offsets.
loff_t in_offset = 0;
loff_t out_offset = 0;
- EXPECT_THAT(splice(inf.get(), &in_offset, outf.get(), &out_offset, 1, 0),
+ EXPECT_THAT(splice(in_fd.get(), &in_offset, out_fd.get(), &out_offset, 1, 0),
SyscallFailsWithErrno(EINVAL));
- EXPECT_THAT(splice(inf.get(), nullptr, outf.get(), &out_offset, 1, 0),
+ EXPECT_THAT(splice(in_fd.get(), nullptr, out_fd.get(), &out_offset, 1, 0),
SyscallFailsWithErrno(EINVAL));
- EXPECT_THAT(splice(inf.get(), &in_offset, outf.get(), nullptr, 1, 0),
+ EXPECT_THAT(splice(in_fd.get(), &in_offset, out_fd.get(), nullptr, 1, 0),
SyscallFailsWithErrno(EINVAL));
- EXPECT_THAT(splice(inf.get(), nullptr, outf.get(), nullptr, 1, 0),
+ EXPECT_THAT(splice(in_fd.get(), nullptr, out_fd.get(), nullptr, 1, 0),
SyscallFailsWithErrno(EINVAL));
}
@@ -75,8 +79,6 @@ TEST(SpliceTest, SamePipe) {
}
TEST(TeeTest, SamePipe) {
- SKIP_IF(IsRunningOnGvisor());
-
// Create a new pipe.
int fds[2];
ASSERT_THAT(pipe(fds), SyscallSucceeds());
@@ -95,11 +97,9 @@ TEST(TeeTest, SamePipe) {
}
TEST(TeeTest, RegularFile) {
- SKIP_IF(IsRunningOnGvisor());
-
// Open some file.
const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
- const FileDescriptor inf =
+ const FileDescriptor in_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDWR));
// Create a new pipe.
@@ -109,9 +109,9 @@ TEST(TeeTest, RegularFile) {
const FileDescriptor wfd(fds[1]);
// Attempt to tee from the file.
- EXPECT_THAT(tee(inf.get(), wfd.get(), kPageSize, 0),
+ EXPECT_THAT(tee(in_fd.get(), wfd.get(), kPageSize, 0),
SyscallFailsWithErrno(EINVAL));
- EXPECT_THAT(tee(rfd.get(), inf.get(), kPageSize, 0),
+ EXPECT_THAT(tee(rfd.get(), in_fd.get(), kPageSize, 0),
SyscallFailsWithErrno(EINVAL));
}
@@ -142,7 +142,7 @@ TEST(SpliceTest, FromEventFD) {
constexpr uint64_t kEventFDValue = 1;
int efd;
ASSERT_THAT(efd = eventfd(kEventFDValue, 0), SyscallSucceeds());
- const FileDescriptor inf(efd);
+ const FileDescriptor in_fd(efd);
// Create a new pipe.
int fds[2];
@@ -152,7 +152,7 @@ TEST(SpliceTest, FromEventFD) {
// Splice 8-byte eventfd value to pipe.
constexpr int kEventFDSize = 8;
- EXPECT_THAT(splice(inf.get(), nullptr, wfd.get(), nullptr, kEventFDSize, 0),
+ EXPECT_THAT(splice(in_fd.get(), nullptr, wfd.get(), nullptr, kEventFDSize, 0),
SyscallSucceedsWithValue(kEventFDSize));
// Contents should be equal.
@@ -166,7 +166,7 @@ TEST(SpliceTest, FromEventFD) {
TEST(SpliceTest, FromEventFDOffset) {
int efd;
ASSERT_THAT(efd = eventfd(0, 0), SyscallSucceeds());
- const FileDescriptor inf(efd);
+ const FileDescriptor in_fd(efd);
// Create a new pipe.
int fds[2];
@@ -179,7 +179,7 @@ TEST(SpliceTest, FromEventFDOffset) {
// This is not allowed because eventfd doesn't support pread.
constexpr int kEventFDSize = 8;
loff_t in_off = 0;
- EXPECT_THAT(splice(inf.get(), &in_off, wfd.get(), nullptr, kEventFDSize, 0),
+ EXPECT_THAT(splice(in_fd.get(), &in_off, wfd.get(), nullptr, kEventFDSize, 0),
SyscallFailsWithErrno(EINVAL));
}
@@ -200,28 +200,29 @@ TEST(SpliceTest, ToEventFDOffset) {
int efd;
ASSERT_THAT(efd = eventfd(0, 0), SyscallSucceeds());
- const FileDescriptor outf(efd);
+ const FileDescriptor out_fd(efd);
// Attempt to splice 8-byte eventfd value to pipe with offset.
//
// This is not allowed because eventfd doesn't support pwrite.
loff_t out_off = 0;
- EXPECT_THAT(splice(rfd.get(), nullptr, outf.get(), &out_off, kEventFDSize, 0),
- SyscallFailsWithErrno(EINVAL));
+ EXPECT_THAT(
+ splice(rfd.get(), nullptr, out_fd.get(), &out_off, kEventFDSize, 0),
+ SyscallFailsWithErrno(EINVAL));
}
TEST(SpliceTest, ToPipe) {
// Open the input file.
const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
- const FileDescriptor inf =
+ const FileDescriptor in_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDWR));
// Fill with some random data.
std::vector<char> buf(kPageSize);
RandomizeBuffer(buf.data(), buf.size());
- ASSERT_THAT(write(inf.get(), buf.data(), buf.size()),
+ ASSERT_THAT(write(in_fd.get(), buf.data(), buf.size()),
SyscallSucceedsWithValue(kPageSize));
- ASSERT_THAT(lseek(inf.get(), 0, SEEK_SET), SyscallSucceedsWithValue(0));
+ ASSERT_THAT(lseek(in_fd.get(), 0, SEEK_SET), SyscallSucceedsWithValue(0));
// Create a new pipe.
int fds[2];
@@ -230,7 +231,7 @@ TEST(SpliceTest, ToPipe) {
const FileDescriptor wfd(fds[1]);
// Splice to the pipe.
- EXPECT_THAT(splice(inf.get(), nullptr, wfd.get(), nullptr, kPageSize, 0),
+ EXPECT_THAT(splice(in_fd.get(), nullptr, wfd.get(), nullptr, kPageSize, 0),
SyscallSucceedsWithValue(kPageSize));
// Contents should be equal.
@@ -243,13 +244,13 @@ TEST(SpliceTest, ToPipe) {
TEST(SpliceTest, ToPipeOffset) {
// Open the input file.
const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
- const FileDescriptor inf =
+ const FileDescriptor in_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDWR));
// Fill with some random data.
std::vector<char> buf(kPageSize);
RandomizeBuffer(buf.data(), buf.size());
- ASSERT_THAT(write(inf.get(), buf.data(), buf.size()),
+ ASSERT_THAT(write(in_fd.get(), buf.data(), buf.size()),
SyscallSucceedsWithValue(kPageSize));
// Create a new pipe.
@@ -261,7 +262,7 @@ TEST(SpliceTest, ToPipeOffset) {
// Splice to the pipe.
loff_t in_offset = kPageSize / 2;
EXPECT_THAT(
- splice(inf.get(), &in_offset, wfd.get(), nullptr, kPageSize / 2, 0),
+ splice(in_fd.get(), &in_offset, wfd.get(), nullptr, kPageSize / 2, 0),
SyscallSucceedsWithValue(kPageSize / 2));
// Contents should be equal to only the second part.
@@ -286,22 +287,22 @@ TEST(SpliceTest, FromPipe) {
// Open the input file.
const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
- const FileDescriptor outf =
+ const FileDescriptor out_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_RDWR));
// Splice to the output file.
- EXPECT_THAT(splice(rfd.get(), nullptr, outf.get(), nullptr, kPageSize, 0),
+ EXPECT_THAT(splice(rfd.get(), nullptr, out_fd.get(), nullptr, kPageSize, 0),
SyscallSucceedsWithValue(kPageSize));
// The offset of the output should be equal to kPageSize. We assert that and
// reset to zero so that we can read the contents and ensure they match.
- EXPECT_THAT(lseek(outf.get(), 0, SEEK_CUR),
+ EXPECT_THAT(lseek(out_fd.get(), 0, SEEK_CUR),
SyscallSucceedsWithValue(kPageSize));
- ASSERT_THAT(lseek(outf.get(), 0, SEEK_SET), SyscallSucceedsWithValue(0));
+ ASSERT_THAT(lseek(out_fd.get(), 0, SEEK_SET), SyscallSucceedsWithValue(0));
// Contents should be equal.
std::vector<char> rbuf(kPageSize);
- ASSERT_THAT(read(outf.get(), rbuf.data(), rbuf.size()),
+ ASSERT_THAT(read(out_fd.get(), rbuf.data(), rbuf.size()),
SyscallSucceedsWithValue(kPageSize));
EXPECT_EQ(memcmp(rbuf.data(), buf.data(), buf.size()), 0);
}
@@ -321,18 +322,19 @@ TEST(SpliceTest, FromPipeOffset) {
// Open the input file.
const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
- const FileDescriptor outf =
+ const FileDescriptor out_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_RDWR));
// Splice to the output file.
loff_t out_offset = kPageSize / 2;
- EXPECT_THAT(splice(rfd.get(), nullptr, outf.get(), &out_offset, kPageSize, 0),
- SyscallSucceedsWithValue(kPageSize));
+ EXPECT_THAT(
+ splice(rfd.get(), nullptr, out_fd.get(), &out_offset, kPageSize, 0),
+ SyscallSucceedsWithValue(kPageSize));
// Content should reflect the splice. We write to a specific offset in the
// file, so the internals should now be allocated sparsely.
std::vector<char> rbuf(kPageSize);
- ASSERT_THAT(read(outf.get(), rbuf.data(), rbuf.size()),
+ ASSERT_THAT(read(out_fd.get(), rbuf.data(), rbuf.size()),
SyscallSucceedsWithValue(kPageSize));
std::vector<char> zbuf(kPageSize / 2);
memset(zbuf.data(), 0, zbuf.size());
@@ -404,8 +406,6 @@ TEST(SpliceTest, Blocking) {
}
TEST(TeeTest, Blocking) {
- SKIP_IF(IsRunningOnGvisor());
-
// Create two new pipes.
int first[2], second[2];
ASSERT_THAT(pipe(first), SyscallSucceeds());
@@ -440,6 +440,49 @@ TEST(TeeTest, Blocking) {
EXPECT_EQ(memcmp(rbuf.data(), buf.data(), kPageSize), 0);
}
+TEST(TeeTest, BlockingWrite) {
+ // Create two new pipes.
+ int first[2], second[2];
+ ASSERT_THAT(pipe(first), SyscallSucceeds());
+ const FileDescriptor rfd1(first[0]);
+ const FileDescriptor wfd1(first[1]);
+ ASSERT_THAT(pipe(second), SyscallSucceeds());
+ const FileDescriptor rfd2(second[0]);
+ const FileDescriptor wfd2(second[1]);
+
+ // Make some data available to be read.
+ std::vector<char> buf1(kPageSize);
+ RandomizeBuffer(buf1.data(), buf1.size());
+ ASSERT_THAT(write(wfd1.get(), buf1.data(), buf1.size()),
+ SyscallSucceedsWithValue(kPageSize));
+
+ // Fill up the write pipe's buffer.
+ int pipe_size = -1;
+ ASSERT_THAT(pipe_size = fcntl(wfd2.get(), F_GETPIPE_SZ), SyscallSucceeds());
+ std::vector<char> buf2(pipe_size);
+ ASSERT_THAT(write(wfd2.get(), buf2.data(), buf2.size()),
+ SyscallSucceedsWithValue(pipe_size));
+
+ ScopedThread t([&]() {
+ absl::SleepFor(absl::Milliseconds(100));
+ ASSERT_THAT(read(rfd2.get(), buf2.data(), buf2.size()),
+ SyscallSucceedsWithValue(pipe_size));
+ });
+
+ // Attempt a tee immediately; it should block.
+ EXPECT_THAT(tee(rfd1.get(), wfd2.get(), kPageSize, 0),
+ SyscallSucceedsWithValue(kPageSize));
+
+ // Thread should be joinable.
+ t.Join();
+
+ // Content should reflect the tee.
+ std::vector<char> rbuf(kPageSize);
+ ASSERT_THAT(read(rfd2.get(), rbuf.data(), rbuf.size()),
+ SyscallSucceedsWithValue(kPageSize));
+ EXPECT_EQ(memcmp(rbuf.data(), buf1.data(), kPageSize), 0);
+}
+
TEST(SpliceTest, NonBlocking) {
// Create two new pipes.
int first[2], second[2];
@@ -457,8 +500,6 @@ TEST(SpliceTest, NonBlocking) {
}
TEST(TeeTest, NonBlocking) {
- SKIP_IF(IsRunningOnGvisor());
-
// Create two new pipes.
int first[2], second[2];
ASSERT_THAT(pipe(first), SyscallSucceeds());
@@ -473,6 +514,79 @@ TEST(TeeTest, NonBlocking) {
SyscallFailsWithErrno(EAGAIN));
}
+TEST(TeeTest, MultiPage) {
+ // Create two new pipes.
+ int first[2], second[2];
+ ASSERT_THAT(pipe(first), SyscallSucceeds());
+ const FileDescriptor rfd1(first[0]);
+ const FileDescriptor wfd1(first[1]);
+ ASSERT_THAT(pipe(second), SyscallSucceeds());
+ const FileDescriptor rfd2(second[0]);
+ const FileDescriptor wfd2(second[1]);
+
+ // Make some data available to be read.
+ std::vector<char> wbuf(8 * kPageSize);
+ RandomizeBuffer(wbuf.data(), wbuf.size());
+ ASSERT_THAT(write(wfd1.get(), wbuf.data(), wbuf.size()),
+ SyscallSucceedsWithValue(wbuf.size()));
+
+ // Attempt a tee immediately; it should complete.
+ EXPECT_THAT(tee(rfd1.get(), wfd2.get(), wbuf.size(), 0),
+ SyscallSucceedsWithValue(wbuf.size()));
+
+ // Content should reflect the tee.
+ std::vector<char> rbuf(wbuf.size());
+ ASSERT_THAT(read(rfd2.get(), rbuf.data(), rbuf.size()),
+ SyscallSucceedsWithValue(rbuf.size()));
+ EXPECT_EQ(memcmp(rbuf.data(), wbuf.data(), rbuf.size()), 0);
+ ASSERT_THAT(read(rfd1.get(), rbuf.data(), rbuf.size()),
+ SyscallSucceedsWithValue(rbuf.size()));
+ EXPECT_EQ(memcmp(rbuf.data(), wbuf.data(), rbuf.size()), 0);
+}
+
+TEST(SpliceTest, FromPipeMaxFileSize) {
+ // Create a new pipe.
+ int fds[2];
+ ASSERT_THAT(pipe(fds), SyscallSucceeds());
+ const FileDescriptor rfd(fds[0]);
+ const FileDescriptor wfd(fds[1]);
+
+ // Fill with some random data.
+ std::vector<char> buf(kPageSize);
+ RandomizeBuffer(buf.data(), buf.size());
+ ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
+ SyscallSucceedsWithValue(kPageSize));
+
+ // Open the input file.
+ const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
+ const FileDescriptor out_fd =
+ ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_RDWR));
+
+ EXPECT_THAT(ftruncate(out_fd.get(), 13 << 20), SyscallSucceeds());
+ EXPECT_THAT(lseek(out_fd.get(), 0, SEEK_END),
+ SyscallSucceedsWithValue(13 << 20));
+
+ // Set our file size limit.
+ sigset_t set;
+ sigemptyset(&set);
+ sigaddset(&set, SIGXFSZ);
+ TEST_PCHECK(sigprocmask(SIG_BLOCK, &set, nullptr) == 0);
+ rlimit rlim = {};
+ rlim.rlim_cur = rlim.rlim_max = (13 << 20);
+ EXPECT_THAT(setrlimit(RLIMIT_FSIZE, &rlim), SyscallSucceeds());
+
+ // Splice to the output file.
+ EXPECT_THAT(
+ splice(rfd.get(), nullptr, out_fd.get(), nullptr, 3 * kPageSize, 0),
+ SyscallFailsWithErrno(EFBIG));
+
+ // Contents should be equal.
+ std::vector<char> rbuf(kPageSize);
+ ASSERT_THAT(read(rfd.get(), rbuf.data(), rbuf.size()),
+ SyscallSucceedsWithValue(kPageSize));
+ EXPECT_EQ(memcmp(rbuf.data(), buf.data(), buf.size()), 0);
+}
+
} // namespace
} // namespace testing