diff options
-rw-r--r-- | pkg/abi/linux/fcntl.go | 4 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/BUILD | 8 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/buffer.go | 90 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/buffer_test.go | 32 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/buffers.go | 48 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/node.go | 7 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe.go | 315 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe_test.go | 7 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/reader.go | 3 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/reader_writer.go | 6 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/writer.go | 3 | ||||
-rw-r--r-- | pkg/sentry/syscalls/linux/sys_file.go | 14 | ||||
-rw-r--r-- | test/syscalls/BUILD | 6 | ||||
-rw-r--r-- | test/syscalls/linux/BUILD | 2 | ||||
-rw-r--r-- | test/syscalls/linux/pipe.cc | 761 |
15 files changed, 828 insertions, 478 deletions
diff --git a/pkg/abi/linux/fcntl.go b/pkg/abi/linux/fcntl.go index cc8f2702d..b30350193 100644 --- a/pkg/abi/linux/fcntl.go +++ b/pkg/abi/linux/fcntl.go @@ -17,7 +17,6 @@ package linux // Comands from linux/fcntl.h. const ( F_DUPFD = 0 - F_DUPFD_CLOEXEC = 1030 F_GETFD = 1 F_GETFL = 3 F_GETOWN = 9 @@ -26,6 +25,9 @@ const ( F_SETLK = 6 F_SETLKW = 7 F_SETOWN = 8 + F_DUPFD_CLOEXEC = 1024 + 6 + F_SETPIPE_SZ = 1024 + 7 + F_GETPIPE_SZ = 1024 + 8 ) // Flags for fcntl. diff --git a/pkg/sentry/kernel/pipe/BUILD b/pkg/sentry/kernel/pipe/BUILD index 6b23117d9..b07d15a2a 100644 --- a/pkg/sentry/kernel/pipe/BUILD +++ b/pkg/sentry/kernel/pipe/BUILD @@ -10,16 +10,16 @@ go_template_instance( prefix = "buffer", template = "//pkg/ilist:generic_list", types = { - "Element": "*Buffer", - "Linker": "*Buffer", + "Element": "*buffer", + "Linker": "*buffer", }, ) go_library( name = "pipe", srcs = [ + "buffer.go", "buffer_list.go", - "buffers.go", "device.go", "node.go", "pipe.go", @@ -37,6 +37,7 @@ go_library( "//pkg/sentry/device", "//pkg/sentry/fs", "//pkg/sentry/fs/fsutil", + "//pkg/sentry/safemem", "//pkg/sentry/usermem", "//pkg/syserror", "//pkg/waiter", @@ -47,6 +48,7 @@ go_test( name = "pipe_test", size = "small", srcs = [ + "buffer_test.go", "node_test.go", "pipe_test.go", ], diff --git a/pkg/sentry/kernel/pipe/buffer.go b/pkg/sentry/kernel/pipe/buffer.go new file mode 100644 index 000000000..4360dc44f --- /dev/null +++ b/pkg/sentry/kernel/pipe/buffer.go @@ -0,0 +1,90 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "sync" + + "gvisor.googlesource.com/gvisor/pkg/sentry/safemem" +) + +// buffer encapsulates a queueable byte buffer. +// +// Note that the total size is slightly less than two pages. This +// is done intentionally to ensure that the buffer object aligns +// with runtime internals. We have no hard size or alignment +// requirements. This two page size will effectively minimize +// internal fragmentation, but still have a large enough chunk +// to limit excessive segmentation. +// +// +stateify savable +type buffer struct { + data [8144]byte + read int + write int + bufferEntry +} + +// Reset resets internal data. +// +// This must be called before use. +func (b *buffer) Reset() { + b.read = 0 + b.write = 0 +} + +// Empty indicates the buffer is empty. +// +// This indicates there is no data left to read. +func (b *buffer) Empty() bool { + return b.read == b.write +} + +// Full indicates the buffer is full. +// +// This indicates there is no capacity left to write. +func (b *buffer) Full() bool { + return b.write == len(b.data) +} + +// WriteFromBlocks implements safemem.Writer.WriteFromBlocks. +func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) { + dst := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.write:])) + n, err := safemem.CopySeq(dst, srcs) + b.write += int(n) + return n, err +} + +// ReadToBlocks implements safemem.Reader.ReadToBlocks. +func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) { + src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write])) + n, err := safemem.CopySeq(dsts, src) + b.read += int(n) + return n, err +} + +// bufferPool is a pool for buffers. +var bufferPool = sync.Pool{ + New: func() interface{} { + return new(buffer) + }, +} + +// newBuffer grabs a new buffer from the pool. +func newBuffer() *buffer { + b := bufferPool.Get().(*buffer) + b.Reset() + return b +} diff --git a/pkg/sentry/kernel/pipe/buffer_test.go b/pkg/sentry/kernel/pipe/buffer_test.go new file mode 100644 index 000000000..4b7dbc43f --- /dev/null +++ b/pkg/sentry/kernel/pipe/buffer_test.go @@ -0,0 +1,32 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "testing" + "unsafe" + + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" +) + +func TestBufferSize(t *testing.T) { + bufferSize := unsafe.Sizeof(buffer{}) + if bufferSize < usermem.PageSize { + t.Errorf("buffer is less than a page") + } + if bufferSize > (2 * usermem.PageSize) { + t.Errorf("buffer is greater than two pages") + } +} diff --git a/pkg/sentry/kernel/pipe/buffers.go b/pkg/sentry/kernel/pipe/buffers.go deleted file mode 100644 index ba53fd482..000000000 --- a/pkg/sentry/kernel/pipe/buffers.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -// Buffer encapsulates a queueable byte buffer that can -// easily be truncated. It is designed only for use with pipes. -// -// +stateify savable -type Buffer struct { - bufferEntry - data []byte -} - -// newBuffer initializes a Buffer. -func newBuffer(buf []byte) *Buffer { - return &Buffer{data: buf} -} - -// bytes returns the bytes contained in the buffer. -func (b *Buffer) bytes() []byte { - return b.data -} - -// size returns the number of bytes contained in the buffer. -func (b *Buffer) size() int { - return len(b.data) -} - -// truncate removes the first n bytes from the buffer. -func (b *Buffer) truncate(n int) int { - if n > len(b.data) { - panic("Trying to truncate past end of array.") - } - b.data = b.data[n:] - return len(b.data) -} diff --git a/pkg/sentry/kernel/pipe/node.go b/pkg/sentry/kernel/pipe/node.go index 7c3739360..926c4c623 100644 --- a/pkg/sentry/kernel/pipe/node.go +++ b/pkg/sentry/kernel/pipe/node.go @@ -67,7 +67,6 @@ func NewInodeOperations(ctx context.Context, perms fs.FilePermissions, p *Pipe) InodeSimpleAttributes: fsutil.NewInodeSimpleAttributes(ctx, fs.FileOwnerFromContext(ctx), perms, linux.PIPEFS_MAGIC), p: p, } - } // GetFile implements fs.InodeOperations.GetFile. Named pipes have special blocking @@ -87,7 +86,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi switch { case flags.Read && !flags.Write: // O_RDONLY. - r := i.p.ROpen(ctx) + r := i.p.Open(ctx, flags) i.newHandleLocked(&i.rWakeup) if i.p.isNamed && !flags.NonBlocking && !i.p.HasWriters() { @@ -103,7 +102,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi return r, nil case flags.Write && !flags.Read: // O_WRONLY. - w := i.p.WOpen(ctx) + w := i.p.Open(ctx, flags) i.newHandleLocked(&i.wWakeup) if i.p.isNamed && !i.p.HasReaders() { @@ -123,7 +122,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi case flags.Read && flags.Write: // O_RDWR. // Pipes opened for read-write always succeeds without blocking. - rw := i.p.RWOpen(ctx) + rw := i.p.Open(ctx, flags) i.newHandleLocked(&i.rWakeup) i.newHandleLocked(&i.wWakeup) return rw, nil diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go index bd7649d2f..b65204492 100644 --- a/pkg/sentry/kernel/pipe/pipe.go +++ b/pkg/sentry/kernel/pipe/pipe.go @@ -12,11 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package pipe provides an in-memory implementation of a unidirectional -// pipe. -// -// The goal of this pipe is to emulate the pipe syscall in all of its -// edge cases and guarantees of atomic IO. +// Package pipe provides a pipe implementation. package pipe import ( @@ -32,8 +28,29 @@ import ( "gvisor.googlesource.com/gvisor/pkg/waiter" ) -// DefaultPipeSize is the system-wide default size of a pipe in bytes. -const DefaultPipeSize = 65536 +const ( + // MinimumPipeSize is a hard limit of the minimum size of a pipe. + MinimumPipeSize = 64 << 10 + + // DefaultPipeSize is the system-wide default size of a pipe in bytes. + DefaultPipeSize = MinimumPipeSize + + // MaximumPipeSize is a hard limit on the maximum size of a pipe. + MaximumPipeSize = 8 << 20 +) + +// Sizer is an interface for setting and getting the size of a pipe. +// +// It is implemented by Pipe and, through embedding, all other types. +type Sizer interface { + // PipeSize returns the pipe capacity in bytes. + PipeSize() int64 + + // SetPipeSize sets the new pipe capacity in bytes. + // + // The new size is returned (which may be capped). + SetPipeSize(int64) (int64, error) +} // Pipe is an encapsulation of a platform-independent pipe. // It manages a buffered byte queue shared between a reader/writer @@ -43,49 +60,76 @@ const DefaultPipeSize = 65536 type Pipe struct { waiter.Queue `state:"nosave"` - // Whether this is a named or anonymous pipe. + // isNamed indicates whether this is a named pipe. + // + // This value is immutable. isNamed bool + // atomicIOBytes is the maximum number of bytes that the pipe will + // guarantee atomic reads or writes atomically. + // + // This value is immutable. + atomicIOBytes int64 + // The dirent backing this pipe. Shared by all readers and writers. + // + // This value is immutable. Dirent *fs.Dirent - // The buffered byte queue. - data bufferList + // The number of active readers for this pipe. + // + // Access atomically. + readers int32 - // Max size of the pipe in bytes. When this max has been reached, - // writers will get EWOULDBLOCK. - max int + // The number of active writes for this pipe. + // + // Access atomically. + writers int32 - // Current size of the pipe in bytes. - size int + // mu protects all pipe internal state below. + mu sync.Mutex `state:"nosave"` - // Max number of bytes the pipe can guarantee to read or write - // atomically. - atomicIOBytes int + // data is the buffer queue of pipe contents. + // + // This is protected by mu. + data bufferList - // The number of active readers for this pipe. Load/store atomically. - readers int32 + // max is the maximum size of the pipe in bytes. When this max has been + // reached, writers will get EWOULDBLOCK. + // + // This is protected by mu. + max int64 - // The number of active writes for this pipe. Load/store atomically. - writers int32 + // size is the current size of the pipe in bytes. + // + // This is protected by mu. + size int64 - // This flag indicates if this pipe ever had a writer. Note that this does - // not necessarily indicate there is *currently* a writer, just that there - // has been a writer at some point since the pipe was created. + // hadWriter indicates if this pipe ever had a writer. Note that this + // does not necessarily indicate there is *currently* a writer, just + // that there has been a writer at some point since the pipe was + // created. // - // Protected by mu. + // This is protected by mu. hadWriter bool - - // Lock protecting all pipe internal state. - mu sync.Mutex `state:"nosave"` } -// NewPipe initializes and returns a pipe. A pipe created by this function is -// persistent, and will remain valid even without any open fds to it. Named -// pipes for mknod(2) are created via this function. Note that the -// implementation of blocking semantics for opening the read and write ends of a -// named pipe are left to filesystems. -func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe { +// NewPipe initializes and returns a pipe. +// +// N.B. The size and atomicIOBytes will be bounded. +func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe { + if sizeBytes < MinimumPipeSize { + sizeBytes = MinimumPipeSize + } + if sizeBytes > MaximumPipeSize { + sizeBytes = MaximumPipeSize + } + if atomicIOBytes <= 0 { + atomicIOBytes = 1 + } + if atomicIOBytes > sizeBytes { + atomicIOBytes = sizeBytes + } p := &Pipe{ isNamed: isNamed, max: sizeBytes, @@ -110,48 +154,45 @@ func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *P return p } -// NewConnectedPipe initializes a pipe and returns a pair of objects (which -// implement kio.File) representing the read and write ends of the pipe. A pipe -// created by this function becomes invalid as soon as either the read or write -// end is closed, and errors on subsequent operations on either end. Pipes -// for pipe(2) and pipe2(2) are generally created this way. -func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) { +// NewConnectedPipe initializes a pipe and returns a pair of objects +// representing the read and write ends of the pipe. +func NewConnectedPipe(ctx context.Context, sizeBytes, atomicIOBytes int64) (*fs.File, *fs.File) { p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes) - return p.ROpen(ctx), p.WOpen(ctx) -} - -// ROpen opens the pipe for reading. -func (p *Pipe) ROpen(ctx context.Context) *fs.File { - p.rOpen() - return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true}, &Reader{ - ReaderWriter: ReaderWriter{Pipe: p}, - }) -} - -// WOpen opens the pipe for writing. -func (p *Pipe) WOpen(ctx context.Context) *fs.File { - p.wOpen() - return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Write: true}, &Writer{ - ReaderWriter: ReaderWriter{Pipe: p}, - }) + return p.Open(ctx, fs.FileFlags{Read: true}), p.Open(ctx, fs.FileFlags{Write: true}) } -// RWOpen opens the pipe for both reading and writing. -func (p *Pipe) RWOpen(ctx context.Context) *fs.File { - p.rOpen() - p.wOpen() - return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{ - Pipe: p, - }) +// Open opens the pipe and returns a new file. +// +// Precondition: at least one of flags.Read or flags.Write must be set. +func (p *Pipe) Open(ctx context.Context, flags fs.FileFlags) *fs.File { + switch { + case flags.Read && flags.Write: + p.rOpen() + p.wOpen() + return fs.NewFile(ctx, p.Dirent, flags, &ReaderWriter{ + Pipe: p, + }) + case flags.Read: + p.rOpen() + return fs.NewFile(ctx, p.Dirent, flags, &Reader{ + ReaderWriter: ReaderWriter{Pipe: p}, + }) + case flags.Write: + p.wOpen() + return fs.NewFile(ctx, p.Dirent, flags, &Writer{ + ReaderWriter: ReaderWriter{Pipe: p}, + }) + default: + // Precondition violated. + panic("invalid pipe flags") + } } // read reads data from the pipe into dst and returns the number of bytes // read, or returns ErrWouldBlock if the pipe is empty. +// +// Precondition: this pipe must have readers. func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) { - if !p.HasReaders() { - return 0, syscall.EBADF - } - // Don't block for a zero-length read even if the pipe is empty. if dst.NumBytes() == 0 { return 0, nil @@ -159,8 +200,8 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) p.mu.Lock() defer p.mu.Unlock() - // If there is nothing to read at the moment but there is a writer, tell the - // caller to block. + + // Is the pipe empty? if p.size == 0 { if !p.HasWriters() { // There are no writers, return EOF. @@ -168,64 +209,94 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) } return 0, syserror.ErrWouldBlock } - var n int64 - for buffer := p.data.Front(); buffer != nil; buffer = p.data.Front() { - n0, err := dst.CopyOut(ctx, buffer.bytes()) - n += int64(n0) - p.size -= n0 - if buffer.truncate(n0) == 0 { - p.data.Remove(buffer) + + // Limit how much we consume. + if dst.NumBytes() > p.size { + dst = dst.TakeFirst64(p.size) + } + + done := int64(0) + for dst.NumBytes() > 0 { + // Pop the first buffer. + first := p.data.Front() + if first == nil { + break } - dst = dst.DropFirst(n0) - if dst.NumBytes() == 0 || err != nil { - return n, err + + // Copy user data. + n, err := dst.CopyOutFrom(ctx, first) + done += int64(n) + p.size -= n + dst = dst.DropFirst64(n) + + // Empty buffer? + if first.Empty() { + // Push to the free list. + p.data.Remove(first) + bufferPool.Put(first) + } + + // Handle errors. + if err != nil { + return done, err } } - return n, nil + + return done, nil } // write writes data from sv into the pipe and returns the number of bytes // written. If no bytes are written because the pipe is full (or has less than // atomicIOBytes free capacity), write returns ErrWouldBlock. +// +// Precondition: this pipe must have writers. func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) { p.mu.Lock() defer p.mu.Unlock() - if !p.HasWriters() { - return 0, syscall.EBADF - } + // Can't write to a pipe with no readers. if !p.HasReaders() { return 0, syscall.EPIPE } // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be - // atomic, but requires no atomicity for writes larger than this. However, - // Linux appears to provide stronger semantics than this in practice: - // unmerged writes are done one PAGE_SIZE buffer at a time, so for larger - // writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement - // this by writing at most atomicIOBytes at a time if we can't service the - // write in its entirety. - canWrite := src.NumBytes() - if canWrite > int64(p.max-p.size) { - if p.max-p.size >= p.atomicIOBytes { - canWrite = int64(p.atomicIOBytes) - } else { + // atomic, but requires no atomicity for writes larger than this. + wanted := src.NumBytes() + if avail := p.max - p.size; wanted > avail { + if wanted <= p.atomicIOBytes { return 0, syserror.ErrWouldBlock } + // Limit to the available capacity. + src = src.TakeFirst64(avail) } - // Copy data from user memory into a pipe-owned buffer. - buf := make([]byte, canWrite) - n, err := src.CopyIn(ctx, buf) - if n > 0 { - p.data.PushBack(newBuffer(buf[:n])) + done := int64(0) + for src.NumBytes() > 0 { + // Need a new buffer? + last := p.data.Back() + if last == nil || last.Full() { + // Add a new buffer to the data list. + last = newBuffer() + p.data.PushBack(last) + } + + // Copy user data. + n, err := src.CopyInTo(ctx, last) + done += int64(n) p.size += n + src = src.DropFirst64(n) + + // Handle errors. + if err != nil { + return done, err + } } - if int64(n) < src.NumBytes() && err == nil { + if wanted > done { // Partial write due to full pipe. - err = syserror.ErrWouldBlock + return done, syserror.ErrWouldBlock } - return int64(n), err + + return done, nil } // rOpen signals a new reader of the pipe. @@ -267,6 +338,9 @@ func (p *Pipe) HasWriters() bool { return atomic.LoadInt32(&p.writers) > 0 } +// rReadinessLocked calculates the read readiness. +// +// Precondition: mu must be held. func (p *Pipe) rReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) if p.HasReaders() && p.data.Front() != nil { @@ -290,6 +364,9 @@ func (p *Pipe) rReadiness() waiter.EventMask { return p.rReadinessLocked() } +// wReadinessLocked calculates the write readiness. +// +// Precondition: mu must be held. func (p *Pipe) wReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) if p.HasWriters() && p.size < p.max { @@ -317,8 +394,36 @@ func (p *Pipe) rwReadiness() waiter.EventMask { return p.rReadinessLocked() | p.wReadinessLocked() } -func (p *Pipe) queuedSize() int { +// queued returns the amount of queued data. +func (p *Pipe) queued() int64 { p.mu.Lock() defer p.mu.Unlock() return p.size } + +// PipeSize implements PipeSizer.PipeSize. +func (p *Pipe) PipeSize() int64 { + p.mu.Lock() + defer p.mu.Unlock() + return p.max +} + +// SetPipeSize implements PipeSize.SetPipeSize. +func (p *Pipe) SetPipeSize(size int64) (int64, error) { + if size < 0 { + return 0, syserror.EINVAL + } + if size < MinimumPipeSize { + size = MinimumPipeSize // Per spec. + } + if size > MaximumPipeSize { + return 0, syserror.EPERM + } + p.mu.Lock() + defer p.mu.Unlock() + if size < p.size { + return 0, syserror.EBUSY + } + p.max = size + return size, nil +} diff --git a/pkg/sentry/kernel/pipe/pipe_test.go b/pkg/sentry/kernel/pipe/pipe_test.go index de340c40c..298c6587b 100644 --- a/pkg/sentry/kernel/pipe/pipe_test.go +++ b/pkg/sentry/kernel/pipe/pipe_test.go @@ -58,15 +58,16 @@ func TestPipeReadBlock(t *testing.T) { func TestPipeWriteBlock(t *testing.T) { const atomicIOBytes = 2 + const capacity = MinimumPipeSize ctx := contexttest.Context(t) - r, w := NewConnectedPipe(ctx, 10, atomicIOBytes) + r, w := NewConnectedPipe(ctx, capacity, atomicIOBytes) defer r.DecRef() defer w.DecRef() - msg := []byte("here's some bytes") + msg := make([]byte, capacity+1) n, err := w.Writev(ctx, usermem.BytesIOSequence(msg)) - if wantN, wantErr := int64(atomicIOBytes), syserror.ErrWouldBlock; n != wantN || err != wantErr { + if wantN, wantErr := int64(capacity), syserror.ErrWouldBlock; n != wantN || err != wantErr { t.Fatalf("Writev: got (%d, %v), wanted (%d, %v)", n, err, wantN, wantErr) } } diff --git a/pkg/sentry/kernel/pipe/reader.go b/pkg/sentry/kernel/pipe/reader.go index 48fab45d1..656be824d 100644 --- a/pkg/sentry/kernel/pipe/reader.go +++ b/pkg/sentry/kernel/pipe/reader.go @@ -27,8 +27,11 @@ type Reader struct { } // Release implements fs.FileOperations.Release. +// +// This overrides ReaderWriter.Release. func (r *Reader) Release() { r.Pipe.rClose() + // Wake up writers. r.Pipe.Notify(waiter.EventOut) } diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go index 59899be49..e560b9be9 100644 --- a/pkg/sentry/kernel/pipe/reader_writer.go +++ b/pkg/sentry/kernel/pipe/reader_writer.go @@ -15,7 +15,6 @@ package pipe import ( - "fmt" "math" "syscall" @@ -49,6 +48,7 @@ type ReaderWriter struct { func (rw *ReaderWriter) Release() { rw.Pipe.rClose() rw.Pipe.wClose() + // Wake up readers and writers. rw.Pipe.Notify(waiter.EventIn | waiter.EventOut) } @@ -81,9 +81,9 @@ func (rw *ReaderWriter) Ioctl(ctx context.Context, io usermem.IO, args arch.Sysc // Switch on ioctl request. switch int(args[1].Int()) { case linux.FIONREAD: - v := rw.queuedSize() + v := rw.queued() if v > math.MaxInt32 { - panic(fmt.Sprintf("Impossibly large pipe queued size: %d", v)) + v = math.MaxInt32 // Silently truncate. } // Copy result to user-space. _, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{ diff --git a/pkg/sentry/kernel/pipe/writer.go b/pkg/sentry/kernel/pipe/writer.go index 0f29fbc43..8d5b68541 100644 --- a/pkg/sentry/kernel/pipe/writer.go +++ b/pkg/sentry/kernel/pipe/writer.go @@ -27,8 +27,11 @@ type Writer struct { } // Release implements fs.FileOperations.Release. +// +// This overrides ReaderWriter.Release. func (w *Writer) Release() { w.Pipe.wClose() + // Wake up readers. w.Pipe.Notify(waiter.EventHUp) } diff --git a/pkg/sentry/syscalls/linux/sys_file.go b/pkg/sentry/syscalls/linux/sys_file.go index 8a80cd430..19f579930 100644 --- a/pkg/sentry/syscalls/linux/sys_file.go +++ b/pkg/sentry/syscalls/linux/sys_file.go @@ -27,6 +27,7 @@ import ( "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/auth" "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/fasync" "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/kdefs" + "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/pipe" ktime "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/time" "gvisor.googlesource.com/gvisor/pkg/sentry/limits" "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" @@ -943,6 +944,19 @@ func Fcntl(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Syscall } err := tmpfs.AddSeals(file.Dirent.Inode, args[2].Uint()) return 0, nil, err + case linux.F_GETPIPE_SZ: + sz, ok := file.FileOperations.(pipe.Sizer) + if !ok { + return 0, nil, syserror.EINVAL + } + return uintptr(sz.PipeSize()), nil, nil + case linux.F_SETPIPE_SZ: + sz, ok := file.FileOperations.(pipe.Sizer) + if !ok { + return 0, nil, syserror.EINVAL + } + n, err := sz.SetPipeSize(int64(args[2].Int())) + return uintptr(n), nil, err default: // Everything else is not yet supported. return 0, nil, syserror.EINVAL diff --git a/test/syscalls/BUILD b/test/syscalls/BUILD index b531d7629..0d6b6ccc7 100644 --- a/test/syscalls/BUILD +++ b/test/syscalls/BUILD @@ -191,7 +191,11 @@ syscall_test(test = "//test/syscalls/linux:partial_bad_buffer_test") syscall_test(test = "//test/syscalls/linux:pause_test") -syscall_test(test = "//test/syscalls/linux:pipe_test") +syscall_test( + size = "large", + shard_count = 5, + test = "//test/syscalls/linux:pipe_test", +) syscall_test(test = "//test/syscalls/linux:poll_test") diff --git a/test/syscalls/linux/BUILD b/test/syscalls/linux/BUILD index d4e49bb3a..4e239617b 100644 --- a/test/syscalls/linux/BUILD +++ b/test/syscalls/linux/BUILD @@ -1237,6 +1237,8 @@ cc_binary( linkstatic = 1, deps = [ "//test/util:file_descriptor", + "//test/util:posix_error", + "//test/util:temp_path", "//test/util:test_main", "//test/util:test_util", "//test/util:thread_util", diff --git a/test/syscalls/linux/pipe.cc b/test/syscalls/linux/pipe.cc index 8698295b3..bce351e08 100644 --- a/test/syscalls/linux/pipe.cc +++ b/test/syscalls/linux/pipe.cc @@ -26,6 +26,8 @@ #include "absl/time/clock.h" #include "absl/time/time.h" #include "test/util/file_descriptor.h" +#include "test/util/posix_error.h" +#include "test/util/temp_path.h" #include "test/util/test_util.h" #include "test/util/thread_util.h" @@ -34,449 +36,588 @@ namespace testing { namespace { -// Buffer size of a pipe. -// -// TODO(b/35762278): Get this from F_GETPIPE_SZ. -constexpr int kPipeSize = 65536; +// Used as a non-zero sentinel value, below. +constexpr int kTestValue = 0x12345678; + +// Used for synchronization in race tests. +const absl::Duration syncDelay = absl::Seconds(2); + +struct PipeCreator { + std::string name_; + + // void (fds, is_blocking, is_namedpipe). + std::function<void(int[2], bool*, bool*)> create_; +}; + +class PipeTest : public ::testing::TestWithParam<PipeCreator> { + protected: + FileDescriptor rfd; + FileDescriptor wfd; -class PipeTest : public ::testing::Test { public: static void SetUpTestCase() { // Tests intentionally generate SIGPIPE. TEST_PCHECK(signal(SIGPIPE, SIG_IGN) != SIG_ERR); } + // Initializes rfd and wfd as a blocking pipe. + // + // The return value indicates success: the test should be skipped otherwise. + bool CreateBlocking() { return create(true); } + + // Initializes rfd and wfd as a non-blocking pipe. + // + // The return value is per CreateBlocking. + bool CreateNonBlocking() { return create(false); } + + // Returns true iff the pipe represents a named pipe. + bool IsNamedPipe() { return namedpipe_; } + + int Size() { + int s1 = fcntl(rfd.get(), F_GETPIPE_SZ); + int s2 = fcntl(wfd.get(), F_GETPIPE_SZ); + EXPECT_GT(s1, 0); + EXPECT_GT(s2, 0); + EXPECT_EQ(s1, s2); + return s1; + } + static void TearDownTestCase() { TEST_PCHECK(signal(SIGPIPE, SIG_DFL) != SIG_ERR); } + + private: + bool namedpipe_ = false; + + bool create(bool wants_blocking) { + // Generate the pipe. + int fds[2] = {-1, -1}; + bool is_blocking = false; + GetParam().create_(fds, &is_blocking, &namedpipe_); + if (fds[0] < 0 || fds[1] < 0) { + return false; + } + + // Save descriptors. + rfd.reset(fds[0]); + wfd.reset(fds[1]); + + // Adjust blocking, if needed. + if (!is_blocking && wants_blocking) { + // Clear the blocking flag. + EXPECT_THAT(fcntl(fds[0], F_SETFL, 0), SyscallSucceeds()); + EXPECT_THAT(fcntl(fds[1], F_SETFL, 0), SyscallSucceeds()); + } else if (is_blocking && !wants_blocking) { + // Set the descriptors to blocking. + EXPECT_THAT(fcntl(fds[0], F_SETFL, O_NONBLOCK), SyscallSucceeds()); + EXPECT_THAT(fcntl(fds[1], F_SETFL, O_NONBLOCK), SyscallSucceeds()); + } + + return true; + } }; -TEST_F(PipeTest, Basic) { - // fds[0] is read end, fds[1] is write end. - int fds[2]; - int i = 0x12345678; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); +TEST_P(PipeTest, Inode) { + SKIP_IF(!CreateBlocking()); // Ensure that the inode number is the same for each end. struct stat rst; - ASSERT_THAT(fstat(fds[0], &rst), SyscallSucceeds()); + ASSERT_THAT(fstat(rfd.get(), &rst), SyscallSucceeds()); struct stat wst; - ASSERT_THAT(fstat(fds[1], &wst), SyscallSucceeds()); + ASSERT_THAT(fstat(wfd.get(), &wst), SyscallSucceeds()); EXPECT_EQ(rst.st_ino, wst.st_ino); - - ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF)); - ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF)); - - ASSERT_THAT(write(fds[1], &i, sizeof(i)), - SyscallSucceedsWithValue(sizeof(i))); - int j; - ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j))); - EXPECT_EQ(i, j); - - ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceeds()); - ASSERT_THAT(fcntl(fds[1], F_GETFL), SyscallSucceedsWithValue(O_WRONLY)); - - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); } -TEST_F(PipeTest, BasicCloExec) { - // fds[0] is read end, fds[1] is write end. - int fds[2]; - int i = 0x12345678; - ASSERT_THAT(pipe2(fds, O_CLOEXEC), SyscallSucceeds()); +TEST_P(PipeTest, Permissions) { + SKIP_IF(!CreateBlocking()); - ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF)); - ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF)); + // Attempt bad operations. + int buf = kTestValue; + ASSERT_THAT(write(rfd.get(), &buf, sizeof(buf)), + SyscallFailsWithErrno(EBADF)); + EXPECT_THAT(read(wfd.get(), &buf, sizeof(buf)), SyscallFailsWithErrno(EBADF)); +} - ASSERT_THAT(write(fds[1], &i, sizeof(i)), - SyscallSucceedsWithValue(sizeof(i))); - int j; - ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j))); - EXPECT_EQ(i, j); +TEST_P(PipeTest, Flags) { + SKIP_IF(!CreateBlocking()); + + if (IsNamedPipe()) { + // May be stubbed to zero; define locally. + constexpr int kLargefile = 0100000; + EXPECT_THAT(fcntl(rfd.get(), F_GETFL), + SyscallSucceedsWithValue(kLargefile | O_RDONLY)); + EXPECT_THAT(fcntl(wfd.get(), F_GETFL), + SyscallSucceedsWithValue(kLargefile | O_WRONLY)); + } else { + EXPECT_THAT(fcntl(rfd.get(), F_GETFL), SyscallSucceedsWithValue(O_RDONLY)); + EXPECT_THAT(fcntl(wfd.get(), F_GETFL), SyscallSucceedsWithValue(O_WRONLY)); + } +} - ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceeds()); - ASSERT_THAT(fcntl(fds[1], F_GETFL), SyscallSucceeds()); +TEST_P(PipeTest, Write) { + SKIP_IF(!CreateBlocking()); - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); + int wbuf = kTestValue; + int rbuf = ~kTestValue; + ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)), + SyscallSucceedsWithValue(sizeof(wbuf))); + ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)), + SyscallSucceedsWithValue(sizeof(rbuf))); + EXPECT_EQ(wbuf, rbuf); } -TEST_F(PipeTest, BasicNoBlock) { - // fds[0] is read end, fds[1] is write end. - int fds[2]; - int i = 0x12345678; - ASSERT_THAT(pipe2(fds, O_NONBLOCK), SyscallSucceeds()); - - ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF)); - ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF)); - - ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK)); - ASSERT_THAT(write(fds[1], &i, sizeof(i)), - SyscallSucceedsWithValue(sizeof(i))); - int j; - ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j))); - EXPECT_EQ(i, j); - ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK)); - - ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceedsWithValue(O_NONBLOCK)); - ASSERT_THAT(fcntl(fds[1], F_GETFL), - SyscallSucceedsWithValue(O_NONBLOCK | O_WRONLY)); - - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); +TEST_P(PipeTest, NonBlocking) { + SKIP_IF(!CreateNonBlocking()); + + int wbuf = kTestValue; + int rbuf = ~kTestValue; + EXPECT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)), + SyscallFailsWithErrno(EWOULDBLOCK)); + ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)), + SyscallSucceedsWithValue(sizeof(wbuf))); + + ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)), + SyscallSucceedsWithValue(sizeof(rbuf))); + EXPECT_EQ(wbuf, rbuf); + EXPECT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)), + SyscallFailsWithErrno(EWOULDBLOCK)); } -TEST_F(PipeTest, BasicBothOptions) { - // fds[0] is read end, fds[1] is write end. +TEST(Pipe2Test, CloExec) { int fds[2]; - int i = 0x12345678; - ASSERT_THAT(pipe2(fds, O_NONBLOCK | O_CLOEXEC), SyscallSucceeds()); - - ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF)); - ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF)); - - ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK)); - ASSERT_THAT(write(fds[1], &i, sizeof(i)), - SyscallSucceedsWithValue(sizeof(i))); - int j; - ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j))); - EXPECT_EQ(i, j); - ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK)); - - ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceedsWithValue(O_NONBLOCK)); - ASSERT_THAT(fcntl(fds[1], F_GETFL), - SyscallSucceedsWithValue(O_NONBLOCK | O_WRONLY)); - - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); + ASSERT_THAT(pipe2(fds, O_CLOEXEC), SyscallSucceeds()); + EXPECT_THAT(fcntl(fds[0], F_GETFD), SyscallSucceedsWithValue(FD_CLOEXEC)); + EXPECT_THAT(fcntl(fds[1], F_GETFD), SyscallSucceedsWithValue(FD_CLOEXEC)); + EXPECT_THAT(close(fds[0]), SyscallSucceeds()); + EXPECT_THAT(close(fds[1]), SyscallSucceeds()); } -TEST_F(PipeTest, BasicBadOptions) { +TEST(Pipe2Test, BadOptions) { int fds[2]; - ASSERT_THAT(pipe2(fds, 0xDEAD), SyscallFailsWithErrno(EINVAL)); + EXPECT_THAT(pipe2(fds, 0xDEAD), SyscallFailsWithErrno(EINVAL)); } -TEST_F(PipeTest, Seek) { - // fds[0] is read end, fds[1] is write end. - int fds[2]; - int i = 0x12345678; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - - ASSERT_THAT(lseek(fds[0], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[1], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[0], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[0], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[1], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[1], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); - - ASSERT_THAT(lseek(fds[0], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[0], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[1], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[1], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); - - ASSERT_THAT(write(fds[1], &i, sizeof(i)), - SyscallSucceedsWithValue(sizeof(i))); - int j; - - ASSERT_THAT(lseek(fds[0], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[0], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[1], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[1], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); - - ASSERT_THAT(lseek(fds[0], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[0], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[1], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); - ASSERT_THAT(lseek(fds[1], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); - - ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j))); - EXPECT_EQ(i, j); - - ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceeds()); - ASSERT_THAT(fcntl(fds[1], F_GETFL), SyscallSucceedsWithValue(O_WRONLY)); - - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); +TEST_P(PipeTest, Seek) { + SKIP_IF(!CreateBlocking()); + + for (int i = 0; i < 4; i++) { + // Attempt absolute seeks. + EXPECT_THAT(lseek(rfd.get(), 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); + EXPECT_THAT(lseek(rfd.get(), 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); + EXPECT_THAT(lseek(wfd.get(), 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); + EXPECT_THAT(lseek(wfd.get(), 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE)); + + // Attempt relative seeks. + EXPECT_THAT(lseek(rfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); + EXPECT_THAT(lseek(rfd.get(), 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); + EXPECT_THAT(lseek(wfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); + EXPECT_THAT(lseek(wfd.get(), 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); + + // Attempt end-of-file seeks. + EXPECT_THAT(lseek(rfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); + EXPECT_THAT(lseek(rfd.get(), -4, SEEK_END), SyscallFailsWithErrno(ESPIPE)); + EXPECT_THAT(lseek(wfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE)); + EXPECT_THAT(lseek(wfd.get(), -4, SEEK_END), SyscallFailsWithErrno(ESPIPE)); + + // Add some more data to the pipe. + int buf = kTestValue; + ASSERT_THAT(write(wfd.get(), &buf, sizeof(buf)), + SyscallSucceedsWithValue(sizeof(buf))); + } } -TEST_F(PipeTest, AbsoluteOffsetSyscallsFail) { - // Syscalls for IO at absolute offsets fail because pipes are not seekable. - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - - std::vector<char> buf(4096); - struct iovec iov; +TEST_P(PipeTest, OffsetCalls) { + SKIP_IF(!CreateBlocking()); - EXPECT_THAT(pread(fds[1], buf.data(), buf.size(), 0), + int buf; + EXPECT_THAT(pread(wfd.get(), &buf, sizeof(buf), 0), SyscallFailsWithErrno(ESPIPE)); - EXPECT_THAT(pwrite(fds[0], buf.data(), buf.size(), 0), + EXPECT_THAT(pwrite(rfd.get(), &buf, sizeof(buf), 0), SyscallFailsWithErrno(ESPIPE)); - EXPECT_THAT(preadv(fds[1], &iov, 1, 0), SyscallFailsWithErrno(ESPIPE)); - EXPECT_THAT(pwritev(fds[0], &iov, 1, 0), SyscallFailsWithErrno(ESPIPE)); - EXPECT_THAT(close(fds[0]), SyscallSucceeds()); - EXPECT_THAT(close(fds[1]), SyscallSucceeds()); + struct iovec iov; + EXPECT_THAT(preadv(wfd.get(), &iov, 1, 0), SyscallFailsWithErrno(ESPIPE)); + EXPECT_THAT(pwritev(rfd.get(), &iov, 1, 0), SyscallFailsWithErrno(ESPIPE)); } -TEST_F(PipeTest, WriterSideCloses) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - int rfd = fds[0]; - int i = 123; - ScopedThread t([rfd]() { - int j; - ASSERT_THAT(read(rfd, &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j))); +TEST_P(PipeTest, WriterSideCloses) { + SKIP_IF(!CreateBlocking()); + + ScopedThread t([this]() { + int buf = ~kTestValue; + ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)), + SyscallSucceedsWithValue(sizeof(buf))); + EXPECT_EQ(buf, kTestValue); // This will return when the close() completes. - ASSERT_THAT(read(rfd, &j, sizeof(j)), SyscallSucceeds()); + ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)), SyscallSucceeds()); // This will return straight away. - ASSERT_THAT(read(rfd, &j, sizeof(j)), SyscallSucceeds()); + ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)), + SyscallSucceedsWithValue(0)); }); + // Sleep a bit so the thread can block. - absl::SleepFor(absl::Seconds(1.0)); - ASSERT_THAT(write(fds[1], &i, sizeof(i)), - SyscallSucceedsWithValue(sizeof(i))); + absl::SleepFor(syncDelay); + + // Write to unblock. + int buf = kTestValue; + ASSERT_THAT(write(wfd.get(), &buf, sizeof(buf)), + SyscallSucceedsWithValue(sizeof(buf))); + // Sleep a bit so the thread can block again. - absl::SleepFor(absl::Seconds(3.0)); - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); - t.Join(); + absl::SleepFor(syncDelay); - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); + // Allow the thread to complete. + ASSERT_THAT(close(wfd.release()), SyscallSucceeds()); + t.Join(); } -TEST_F(PipeTest, WriterSideClosesReadDataFirst) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - int i = 123; - ASSERT_THAT(write(fds[1], &i, sizeof(i)), - SyscallSucceedsWithValue(sizeof(i))); - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); - int j; - ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j))); - ASSERT_EQ(j, i); - ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceeds()); - - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); +TEST_P(PipeTest, WriterSideClosesReadDataFirst) { + SKIP_IF(!CreateBlocking()); + + int wbuf = kTestValue; + ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)), + SyscallSucceedsWithValue(sizeof(wbuf))); + ASSERT_THAT(close(wfd.release()), SyscallSucceeds()); + + int rbuf; + ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)), + SyscallSucceedsWithValue(sizeof(rbuf))); + EXPECT_EQ(wbuf, rbuf); + EXPECT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)), + SyscallSucceedsWithValue(0)); } -TEST_F(PipeTest, ReaderSideCloses) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); - int i = 123; - ASSERT_THAT(write(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EPIPE)); +TEST_P(PipeTest, ReaderSideCloses) { + SKIP_IF(!CreateBlocking()); - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); + ASSERT_THAT(close(rfd.release()), SyscallSucceeds()); + int buf = kTestValue; + EXPECT_THAT(write(wfd.get(), &buf, sizeof(buf)), + SyscallFailsWithErrno(EPIPE)); } -TEST_F(PipeTest, CloseTwice) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); - ASSERT_THAT(close(fds[0]), SyscallFailsWithErrno(EBADF)); - ASSERT_THAT(close(fds[1]), SyscallFailsWithErrno(EBADF)); - - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); - ASSERT_THAT(close(fds[0]), SyscallFailsWithErrno(EBADF)); - ASSERT_THAT(close(fds[1]), SyscallFailsWithErrno(EBADF)); +TEST_P(PipeTest, CloseTwice) { + SKIP_IF(!CreateBlocking()); + + int _rfd = rfd.release(); + int _wfd = wfd.release(); + ASSERT_THAT(close(_rfd), SyscallSucceeds()); + ASSERT_THAT(close(_wfd), SyscallSucceeds()); + EXPECT_THAT(close(_rfd), SyscallFailsWithErrno(EBADF)); + EXPECT_THAT(close(_wfd), SyscallFailsWithErrno(EBADF)); } // Blocking write returns EPIPE when read end is closed if nothing has been // written. -TEST_F(PipeTest, BlockWriteClosed) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - int wfd = fds[1]; +TEST_P(PipeTest, BlockWriteClosed) { + SKIP_IF(!CreateBlocking()); absl::Notification notify; - ScopedThread t([wfd, ¬ify]() { - std::vector<char> buf(kPipeSize); + ScopedThread t([this, ¬ify]() { + std::vector<char> buf(Size()); // Exactly fill the pipe buffer. - ASSERT_THAT(WriteFd(wfd, buf.data(), buf.size()), + ASSERT_THAT(WriteFd(wfd.get(), buf.data(), buf.size()), SyscallSucceedsWithValue(buf.size())); notify.Notify(); // Attempt to write one more byte. Blocks. // N.B. Don't use WriteFd, we don't want a retry. - ASSERT_THAT(write(wfd, buf.data(), 1), SyscallFailsWithErrno(EPIPE)); + EXPECT_THAT(write(wfd.get(), buf.data(), 1), SyscallFailsWithErrno(EPIPE)); }); notify.WaitForNotification(); - absl::SleepFor(absl::Seconds(1.0)); - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); - + ASSERT_THAT(close(rfd.release()), SyscallSucceeds()); t.Join(); - - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); } // Blocking write returns EPIPE when read end is closed even if something has // been written. -// -// FIXME(b/35924046): Pipe writes blocking early allows S/R to interrupt the -// write(2) call before the buffer is full. Then the next call will will return -// non-zero instead of EPIPE. -TEST_F(PipeTest, BlockPartialWriteClosed_NoRandomSave) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - int wfd = fds[1]; +TEST_P(PipeTest, BlockPartialWriteClosed) { + SKIP_IF(!CreateBlocking()); - ScopedThread t([wfd]() { - std::vector<char> buf(2 * kPipeSize); + ScopedThread t([this]() { + std::vector<char> buf(2 * Size()); // Write more than fits in the buffer. Blocks then returns partial write // when the other end is closed. The next call returns EPIPE. - if (IsRunningOnGvisor()) { - // FIXME(b/35924046): Pipe writes block early on gVisor, resulting in a - // shorter than expected partial write. - ASSERT_THAT(write(wfd, buf.data(), buf.size()), - SyscallSucceedsWithValue(::testing::Gt(0))); - } else { - ASSERT_THAT(write(wfd, buf.data(), buf.size()), - SyscallSucceedsWithValue(kPipeSize)); - } - ASSERT_THAT(write(wfd, buf.data(), buf.size()), + ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()), + SyscallSucceedsWithValue(Size())); + EXPECT_THAT(write(wfd.get(), buf.data(), buf.size()), SyscallFailsWithErrno(EPIPE)); }); // Leave time for write to become blocked. - absl::SleepFor(absl::Seconds(1.0)); - - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); + absl::SleepFor(syncDelay); + // Unblock the above. + ASSERT_THAT(close(rfd.release()), SyscallSucceeds()); t.Join(); - - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); } -TEST_F(PipeTest, ReadFromClosedFd_NoRandomSave) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - int rfd = fds[0]; +TEST_P(PipeTest, ReadFromClosedFd_NoRandomSave) { + SKIP_IF(!CreateBlocking()); + absl::Notification notify; - ScopedThread t([rfd, ¬ify]() { - int f; + ScopedThread t([this, ¬ify]() { notify.Notify(); - ASSERT_THAT(read(rfd, &f, sizeof(f)), SyscallSucceedsWithValue(sizeof(f))); - ASSERT_EQ(123, f); + int buf; + ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)), + SyscallSucceedsWithValue(sizeof(buf))); + ASSERT_EQ(kTestValue, buf); }); notify.WaitForNotification(); + // Make sure that the thread gets to read(). - absl::SleepFor(absl::Seconds(5.0)); + absl::SleepFor(syncDelay); + { // We cannot save/restore here as the read end of pipe is closed but there // is ongoing read() above. We will not be able to restart the read() // successfully in restore run since the read fd is closed. const DisableSave ds; - ASSERT_THAT(close(fds[0]), SyscallSucceeds()); - int i = 123; - ASSERT_THAT(write(fds[1], &i, sizeof(i)), - SyscallSucceedsWithValue(sizeof(i))); + ASSERT_THAT(close(rfd.release()), SyscallSucceeds()); + int buf = kTestValue; + ASSERT_THAT(write(wfd.get(), &buf, sizeof(buf)), + SyscallSucceedsWithValue(sizeof(buf))); t.Join(); } - ASSERT_THAT(close(fds[1]), SyscallSucceeds()); } -TEST_F(PipeTest, FionRead) { - // fds[0] is read end, fds[1] is write end. - int fds[2]; - int data[2] = {0x12345678, 0x9101112}; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); +TEST_P(PipeTest, FionRead) { + SKIP_IF(!CreateBlocking()); - int n = -1; - EXPECT_THAT(ioctl(fds[0], FIONREAD, &n), SyscallSucceedsWithValue(0)); + int n; + ASSERT_THAT(ioctl(rfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0)); EXPECT_EQ(n, 0); - n = -1; - EXPECT_THAT(ioctl(fds[1], FIONREAD, &n), SyscallSucceedsWithValue(0)); + ASSERT_THAT(ioctl(wfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0)); EXPECT_EQ(n, 0); - EXPECT_THAT(write(fds[1], data, sizeof(data)), - SyscallSucceedsWithValue(sizeof(data))); + std::vector<char> buf(Size()); + ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()), + SyscallSucceedsWithValue(buf.size())); - n = -1; - EXPECT_THAT(ioctl(fds[0], FIONREAD, &n), SyscallSucceedsWithValue(0)); - EXPECT_EQ(n, sizeof(data)); - n = -1; - EXPECT_THAT(ioctl(fds[1], FIONREAD, &n), SyscallSucceedsWithValue(0)); - EXPECT_EQ(n, sizeof(data)); + EXPECT_THAT(ioctl(rfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0)); + EXPECT_EQ(n, buf.size()); + EXPECT_THAT(ioctl(wfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0)); + EXPECT_EQ(n, buf.size()); } // Test that opening an empty anonymous pipe RDONLY via /proc/self/fd/N does not // block waiting for a writer. -TEST_F(PipeTest, OpenViaProcSelfFD) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - FileDescriptor rfd(fds[0]); - FileDescriptor wfd(fds[1]); +TEST_P(PipeTest, OpenViaProcSelfFD) { + SKIP_IF(!CreateBlocking()); + SKIP_IF(IsNamedPipe()); // Close the write end of the pipe. - wfd.release(); + ASSERT_THAT(close(wfd.release()), SyscallSucceeds()); // Open other side via /proc/self/fd. It should not block. FileDescriptor proc_self_fd = ASSERT_NO_ERRNO_AND_VALUE( - Open(absl::StrCat("/proc/self/fd/", fds[0]), O_RDONLY)); + Open(absl::StrCat("/proc/self/fd/", rfd.get()), O_RDONLY)); } // Test that opening and reading from an anonymous pipe (with existing writes) // RDONLY via /proc/self/fd/N returns the existing data. -TEST_F(PipeTest, OpenViaProcSelfFDWithWrites) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - FileDescriptor rfd(fds[0]); - FileDescriptor wfd(fds[1]); +TEST_P(PipeTest, OpenViaProcSelfFDWithWrites) { + SKIP_IF(!CreateBlocking()); + SKIP_IF(IsNamedPipe()); // Write to the pipe and then close the write fd. - char data = 'x'; - ASSERT_THAT(write(fds[1], &data, 1), SyscallSucceedsWithValue(1)); - wfd.release(); + int wbuf = kTestValue; + ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)), + SyscallSucceedsWithValue(sizeof(wbuf))); + ASSERT_THAT(close(wfd.release()), SyscallSucceeds()); // Open read side via /proc/self/fd, and read from it. FileDescriptor proc_self_fd = ASSERT_NO_ERRNO_AND_VALUE( - Open(absl::StrCat("/proc/self/fd/", fds[0]), O_RDONLY)); - char got; - ASSERT_THAT(read(proc_self_fd.get(), &got, 1), SyscallSucceedsWithValue(1)); + Open(absl::StrCat("/proc/self/fd/", rfd.get()), O_RDONLY)); + int rbuf; + ASSERT_THAT(read(proc_self_fd.get(), &rbuf, sizeof(rbuf)), + SyscallSucceedsWithValue(sizeof(rbuf))); + EXPECT_EQ(wbuf, rbuf); +} + +// Test that accesses of /proc/<PID>/fd correctly decrement the refcount. +TEST_P(PipeTest, ProcFDReleasesFile) { + SKIP_IF(!CreateBlocking()); - // We should get what we sent. - EXPECT_EQ(got, data); + // Stat the pipe FD, which shouldn't alter the refcount. + struct stat wst; + ASSERT_THAT(lstat(absl::StrCat("/proc/self/fd/", wfd.get()).c_str(), &wst), + SyscallSucceeds()); + + // Close the write end and ensure that read indicates EOF. + wfd.reset(); + char buf; + ASSERT_THAT(read(rfd.get(), &buf, 1), SyscallSucceedsWithValue(0)); } -TEST_F(PipeTest, LargeFile) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - FileDescriptor rfd(fds[0]); - FileDescriptor wfd(fds[1]); +// Same for /proc/<PID>/fdinfo. +TEST_P(PipeTest, ProcFDInfoReleasesFile) { + SKIP_IF(!CreateBlocking()); + + // Stat the pipe FD, which shouldn't alter the refcount. + struct stat wst; + ASSERT_THAT( + lstat(absl::StrCat("/proc/self/fdinfo/", wfd.get()).c_str(), &wst), + SyscallSucceeds()); + + // Close the write end and ensure that read indicates EOF. + wfd.reset(); + char buf; + ASSERT_THAT(read(rfd.get(), &buf, 1), SyscallSucceedsWithValue(0)); +} + +TEST_P(PipeTest, SizeChange) { + SKIP_IF(!CreateBlocking()); + + // Set the minimum possible size. + ASSERT_THAT(fcntl(rfd.get(), F_SETPIPE_SZ, 0), SyscallSucceeds()); + int min = Size(); + EXPECT_GT(min, 0); // Should be rounded up. + + // Set from the read end. + ASSERT_THAT(fcntl(rfd.get(), F_SETPIPE_SZ, min + 1), SyscallSucceeds()); + int med = Size(); + EXPECT_GT(med, min); // Should have grown, may be rounded. + + // Set from the write end. + ASSERT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, med + 1), SyscallSucceeds()); + int max = Size(); + EXPECT_GT(max, med); // Ditto. +} - int rflags; - EXPECT_THAT(rflags = fcntl(rfd.get(), F_GETFL), SyscallSucceeds()); +TEST_P(PipeTest, SizeChangeMax) { + SKIP_IF(!CreateBlocking()); - // The kernel did *not* set O_LARGEFILE. - EXPECT_EQ(rflags, 0); + // Assert there's some maximum. + EXPECT_THAT(fcntl(rfd.get(), F_SETPIPE_SZ, 0x7fffffffffffffff), + SyscallFailsWithErrno(EINVAL)); + EXPECT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, 0x7fffffffffffffff), + SyscallFailsWithErrno(EINVAL)); } -// Test that accesses of /proc/<PID>/fd/<FD> and /proc/<PID>/fdinfo/<FD> -// correctly decrement the refcount of that file descriptor. -TEST_F(PipeTest, ProcFDReleasesFile) { - std::vector<std::string> paths = {"/proc/self/fd/", "/proc/self/fdinfo/"}; - for (const std::string& path : paths) { - int fds[2]; - ASSERT_THAT(pipe(fds), SyscallSucceeds()); - FileDescriptor rfd(fds[0]); - FileDescriptor wfd(fds[1]); - - // Stat the pipe FD, which shouldn't alter the refcount of the write end of - // the pipe. - struct stat wst; - ASSERT_THAT(lstat(absl::StrCat(path.c_str(), wfd.get()).c_str(), &wst), - SyscallSucceeds()); - // Close the write end of the pipe and ensure that read indicates EOF. - wfd.reset(); - char buf; - ASSERT_THAT(read(rfd.get(), &buf, 1), SyscallSucceedsWithValue(0)); +TEST_P(PipeTest, SizeChangeFull) { + SKIP_IF(!CreateBlocking()); + + // Ensure that we adjust to a large enough size to avoid rounding when we + // perform the size decrease. If rounding occurs, we may not actually + // adjust the size and the call below will return success. It was found via + // experimentation that this granularity avoids the rounding for Linux. + constexpr int kDelta = 64 * 1024; + ASSERT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, Size() + kDelta), + SyscallSucceeds()); + + // Fill the buffer and try to change down. + std::vector<char> buf(Size()); + ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()), + SyscallSucceedsWithValue(buf.size())); + EXPECT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, Size() - kDelta), + SyscallFailsWithErrno(EBUSY)); +} + +TEST_P(PipeTest, Streaming) { + SKIP_IF(!CreateBlocking()); + + // We make too many calls to go through full save cycles. + DisableSave ds; + + absl::Notification notify; + ScopedThread t([this, ¬ify]() { + // Don't start until it's full. + notify.WaitForNotification(); + for (int i = 0; i < 2 * Size(); i++) { + int rbuf; + ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)), + SyscallSucceedsWithValue(sizeof(rbuf))); + EXPECT_EQ(rbuf, i); + } + }); + for (int i = 0; i < 2 * Size(); i++) { + int wbuf = i; + ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)), + SyscallSucceedsWithValue(sizeof(wbuf))); + // Did that write just fill up the buffer? Wake up the reader. Once only. + if ((i * sizeof(wbuf)) < Size() && ((i + 1) * sizeof(wbuf)) >= Size()) { + notify.Notify(); + } } } +std::string PipeCreatorName(::testing::TestParamInfo<PipeCreator> info) { + return info.param.name_; // Use the name specified. +} + +INSTANTIATE_TEST_SUITE_P( + Pipes, PipeTest, + ::testing::Values( + PipeCreator{ + "pipe", + [](int fds[2], bool* is_blocking, bool* is_namedpipe) { + ASSERT_THAT(pipe(fds), SyscallSucceeds()); + *is_blocking = true; + *is_namedpipe = false; + }, + }, + PipeCreator{ + "pipe2blocking", + [](int fds[2], bool* is_blocking, bool* is_namedpipe) { + ASSERT_THAT(pipe2(fds, 0), SyscallSucceeds()); + *is_blocking = true; + *is_namedpipe = false; + }, + }, + PipeCreator{ + "pipe2nonblocking", + [](int fds[2], bool* is_blocking, bool* is_namedpipe) { + ASSERT_THAT(pipe2(fds, O_NONBLOCK), SyscallSucceeds()); + *is_blocking = false; + *is_namedpipe = false; + }, + }, + PipeCreator{ + "smallbuffer", + [](int fds[2], bool* is_blocking, bool* is_namedpipe) { + // Set to the minimum available size (will round up). + ASSERT_THAT(pipe(fds), SyscallSucceeds()); + ASSERT_THAT(fcntl(fds[0], F_SETPIPE_SZ, 0), SyscallSucceeds()); + *is_blocking = true; + *is_namedpipe = false; + }, + }, + PipeCreator{ + "namednonblocking", + [](int fds[2], bool* is_blocking, bool* is_namedpipe) { + // Create a new file-based pipe (non-blocking). + auto file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile()); + ASSERT_THAT(unlink(file.path().c_str()), SyscallSucceeds()); + SKIP_IF(mkfifo(file.path().c_str(), 0644) != 0); + fds[0] = open(file.path().c_str(), O_NONBLOCK | O_RDONLY); + fds[1] = open(file.path().c_str(), O_NONBLOCK | O_WRONLY); + MaybeSave(); + *is_blocking = false; + *is_namedpipe = true; + }, + }, + PipeCreator{ + "namedblocking", + [](int fds[2], bool* is_blocking, bool* is_namedpipe) { + // Create a new file-based pipe (blocking). + auto file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile()); + ASSERT_THAT(unlink(file.path().c_str()), SyscallSucceeds()); + SKIP_IF(mkfifo(file.path().c_str(), 0644) != 0); + ScopedThread t([&file, &fds]() { + fds[1] = open(file.path().c_str(), O_WRONLY); + }); + fds[0] = open(file.path().c_str(), O_RDONLY); + t.Join(); + MaybeSave(); + *is_blocking = true; + *is_namedpipe = true; + }, + }), + PipeCreatorName); + } // namespace } // namespace testing } // namespace gvisor |