summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--pkg/abi/linux/fcntl.go4
-rw-r--r--pkg/sentry/kernel/pipe/BUILD8
-rw-r--r--pkg/sentry/kernel/pipe/buffer.go90
-rw-r--r--pkg/sentry/kernel/pipe/buffer_test.go32
-rw-r--r--pkg/sentry/kernel/pipe/buffers.go48
-rw-r--r--pkg/sentry/kernel/pipe/node.go7
-rw-r--r--pkg/sentry/kernel/pipe/pipe.go315
-rw-r--r--pkg/sentry/kernel/pipe/pipe_test.go7
-rw-r--r--pkg/sentry/kernel/pipe/reader.go3
-rw-r--r--pkg/sentry/kernel/pipe/reader_writer.go6
-rw-r--r--pkg/sentry/kernel/pipe/writer.go3
-rw-r--r--pkg/sentry/syscalls/linux/sys_file.go14
-rw-r--r--test/syscalls/BUILD6
-rw-r--r--test/syscalls/linux/BUILD2
-rw-r--r--test/syscalls/linux/pipe.cc761
15 files changed, 828 insertions, 478 deletions
diff --git a/pkg/abi/linux/fcntl.go b/pkg/abi/linux/fcntl.go
index cc8f2702d..b30350193 100644
--- a/pkg/abi/linux/fcntl.go
+++ b/pkg/abi/linux/fcntl.go
@@ -17,7 +17,6 @@ package linux
// Comands from linux/fcntl.h.
const (
F_DUPFD = 0
- F_DUPFD_CLOEXEC = 1030
F_GETFD = 1
F_GETFL = 3
F_GETOWN = 9
@@ -26,6 +25,9 @@ const (
F_SETLK = 6
F_SETLKW = 7
F_SETOWN = 8
+ F_DUPFD_CLOEXEC = 1024 + 6
+ F_SETPIPE_SZ = 1024 + 7
+ F_GETPIPE_SZ = 1024 + 8
)
// Flags for fcntl.
diff --git a/pkg/sentry/kernel/pipe/BUILD b/pkg/sentry/kernel/pipe/BUILD
index 6b23117d9..b07d15a2a 100644
--- a/pkg/sentry/kernel/pipe/BUILD
+++ b/pkg/sentry/kernel/pipe/BUILD
@@ -10,16 +10,16 @@ go_template_instance(
prefix = "buffer",
template = "//pkg/ilist:generic_list",
types = {
- "Element": "*Buffer",
- "Linker": "*Buffer",
+ "Element": "*buffer",
+ "Linker": "*buffer",
},
)
go_library(
name = "pipe",
srcs = [
+ "buffer.go",
"buffer_list.go",
- "buffers.go",
"device.go",
"node.go",
"pipe.go",
@@ -37,6 +37,7 @@ go_library(
"//pkg/sentry/device",
"//pkg/sentry/fs",
"//pkg/sentry/fs/fsutil",
+ "//pkg/sentry/safemem",
"//pkg/sentry/usermem",
"//pkg/syserror",
"//pkg/waiter",
@@ -47,6 +48,7 @@ go_test(
name = "pipe_test",
size = "small",
srcs = [
+ "buffer_test.go",
"node_test.go",
"pipe_test.go",
],
diff --git a/pkg/sentry/kernel/pipe/buffer.go b/pkg/sentry/kernel/pipe/buffer.go
new file mode 100644
index 000000000..4360dc44f
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/buffer.go
@@ -0,0 +1,90 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "sync"
+
+ "gvisor.googlesource.com/gvisor/pkg/sentry/safemem"
+)
+
+// buffer encapsulates a queueable byte buffer.
+//
+// Note that the total size is slightly less than two pages. This
+// is done intentionally to ensure that the buffer object aligns
+// with runtime internals. We have no hard size or alignment
+// requirements. This two page size will effectively minimize
+// internal fragmentation, but still have a large enough chunk
+// to limit excessive segmentation.
+//
+// +stateify savable
+type buffer struct {
+ data [8144]byte
+ read int
+ write int
+ bufferEntry
+}
+
+// Reset resets internal data.
+//
+// This must be called before use.
+func (b *buffer) Reset() {
+ b.read = 0
+ b.write = 0
+}
+
+// Empty indicates the buffer is empty.
+//
+// This indicates there is no data left to read.
+func (b *buffer) Empty() bool {
+ return b.read == b.write
+}
+
+// Full indicates the buffer is full.
+//
+// This indicates there is no capacity left to write.
+func (b *buffer) Full() bool {
+ return b.write == len(b.data)
+}
+
+// WriteFromBlocks implements safemem.Writer.WriteFromBlocks.
+func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) {
+ dst := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.write:]))
+ n, err := safemem.CopySeq(dst, srcs)
+ b.write += int(n)
+ return n, err
+}
+
+// ReadToBlocks implements safemem.Reader.ReadToBlocks.
+func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
+ src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write]))
+ n, err := safemem.CopySeq(dsts, src)
+ b.read += int(n)
+ return n, err
+}
+
+// bufferPool is a pool for buffers.
+var bufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(buffer)
+ },
+}
+
+// newBuffer grabs a new buffer from the pool.
+func newBuffer() *buffer {
+ b := bufferPool.Get().(*buffer)
+ b.Reset()
+ return b
+}
diff --git a/pkg/sentry/kernel/pipe/buffer_test.go b/pkg/sentry/kernel/pipe/buffer_test.go
new file mode 100644
index 000000000..4b7dbc43f
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/buffer_test.go
@@ -0,0 +1,32 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "testing"
+ "unsafe"
+
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+)
+
+func TestBufferSize(t *testing.T) {
+ bufferSize := unsafe.Sizeof(buffer{})
+ if bufferSize < usermem.PageSize {
+ t.Errorf("buffer is less than a page")
+ }
+ if bufferSize > (2 * usermem.PageSize) {
+ t.Errorf("buffer is greater than two pages")
+ }
+}
diff --git a/pkg/sentry/kernel/pipe/buffers.go b/pkg/sentry/kernel/pipe/buffers.go
deleted file mode 100644
index ba53fd482..000000000
--- a/pkg/sentry/kernel/pipe/buffers.go
+++ /dev/null
@@ -1,48 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-// Buffer encapsulates a queueable byte buffer that can
-// easily be truncated. It is designed only for use with pipes.
-//
-// +stateify savable
-type Buffer struct {
- bufferEntry
- data []byte
-}
-
-// newBuffer initializes a Buffer.
-func newBuffer(buf []byte) *Buffer {
- return &Buffer{data: buf}
-}
-
-// bytes returns the bytes contained in the buffer.
-func (b *Buffer) bytes() []byte {
- return b.data
-}
-
-// size returns the number of bytes contained in the buffer.
-func (b *Buffer) size() int {
- return len(b.data)
-}
-
-// truncate removes the first n bytes from the buffer.
-func (b *Buffer) truncate(n int) int {
- if n > len(b.data) {
- panic("Trying to truncate past end of array.")
- }
- b.data = b.data[n:]
- return len(b.data)
-}
diff --git a/pkg/sentry/kernel/pipe/node.go b/pkg/sentry/kernel/pipe/node.go
index 7c3739360..926c4c623 100644
--- a/pkg/sentry/kernel/pipe/node.go
+++ b/pkg/sentry/kernel/pipe/node.go
@@ -67,7 +67,6 @@ func NewInodeOperations(ctx context.Context, perms fs.FilePermissions, p *Pipe)
InodeSimpleAttributes: fsutil.NewInodeSimpleAttributes(ctx, fs.FileOwnerFromContext(ctx), perms, linux.PIPEFS_MAGIC),
p: p,
}
-
}
// GetFile implements fs.InodeOperations.GetFile. Named pipes have special blocking
@@ -87,7 +86,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
switch {
case flags.Read && !flags.Write: // O_RDONLY.
- r := i.p.ROpen(ctx)
+ r := i.p.Open(ctx, flags)
i.newHandleLocked(&i.rWakeup)
if i.p.isNamed && !flags.NonBlocking && !i.p.HasWriters() {
@@ -103,7 +102,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
return r, nil
case flags.Write && !flags.Read: // O_WRONLY.
- w := i.p.WOpen(ctx)
+ w := i.p.Open(ctx, flags)
i.newHandleLocked(&i.wWakeup)
if i.p.isNamed && !i.p.HasReaders() {
@@ -123,7 +122,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
case flags.Read && flags.Write: // O_RDWR.
// Pipes opened for read-write always succeeds without blocking.
- rw := i.p.RWOpen(ctx)
+ rw := i.p.Open(ctx, flags)
i.newHandleLocked(&i.rWakeup)
i.newHandleLocked(&i.wWakeup)
return rw, nil
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
index bd7649d2f..b65204492 100644
--- a/pkg/sentry/kernel/pipe/pipe.go
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -12,11 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Package pipe provides an in-memory implementation of a unidirectional
-// pipe.
-//
-// The goal of this pipe is to emulate the pipe syscall in all of its
-// edge cases and guarantees of atomic IO.
+// Package pipe provides a pipe implementation.
package pipe
import (
@@ -32,8 +28,29 @@ import (
"gvisor.googlesource.com/gvisor/pkg/waiter"
)
-// DefaultPipeSize is the system-wide default size of a pipe in bytes.
-const DefaultPipeSize = 65536
+const (
+ // MinimumPipeSize is a hard limit of the minimum size of a pipe.
+ MinimumPipeSize = 64 << 10
+
+ // DefaultPipeSize is the system-wide default size of a pipe in bytes.
+ DefaultPipeSize = MinimumPipeSize
+
+ // MaximumPipeSize is a hard limit on the maximum size of a pipe.
+ MaximumPipeSize = 8 << 20
+)
+
+// Sizer is an interface for setting and getting the size of a pipe.
+//
+// It is implemented by Pipe and, through embedding, all other types.
+type Sizer interface {
+ // PipeSize returns the pipe capacity in bytes.
+ PipeSize() int64
+
+ // SetPipeSize sets the new pipe capacity in bytes.
+ //
+ // The new size is returned (which may be capped).
+ SetPipeSize(int64) (int64, error)
+}
// Pipe is an encapsulation of a platform-independent pipe.
// It manages a buffered byte queue shared between a reader/writer
@@ -43,49 +60,76 @@ const DefaultPipeSize = 65536
type Pipe struct {
waiter.Queue `state:"nosave"`
- // Whether this is a named or anonymous pipe.
+ // isNamed indicates whether this is a named pipe.
+ //
+ // This value is immutable.
isNamed bool
+ // atomicIOBytes is the maximum number of bytes that the pipe will
+ // guarantee atomic reads or writes atomically.
+ //
+ // This value is immutable.
+ atomicIOBytes int64
+
// The dirent backing this pipe. Shared by all readers and writers.
+ //
+ // This value is immutable.
Dirent *fs.Dirent
- // The buffered byte queue.
- data bufferList
+ // The number of active readers for this pipe.
+ //
+ // Access atomically.
+ readers int32
- // Max size of the pipe in bytes. When this max has been reached,
- // writers will get EWOULDBLOCK.
- max int
+ // The number of active writes for this pipe.
+ //
+ // Access atomically.
+ writers int32
- // Current size of the pipe in bytes.
- size int
+ // mu protects all pipe internal state below.
+ mu sync.Mutex `state:"nosave"`
- // Max number of bytes the pipe can guarantee to read or write
- // atomically.
- atomicIOBytes int
+ // data is the buffer queue of pipe contents.
+ //
+ // This is protected by mu.
+ data bufferList
- // The number of active readers for this pipe. Load/store atomically.
- readers int32
+ // max is the maximum size of the pipe in bytes. When this max has been
+ // reached, writers will get EWOULDBLOCK.
+ //
+ // This is protected by mu.
+ max int64
- // The number of active writes for this pipe. Load/store atomically.
- writers int32
+ // size is the current size of the pipe in bytes.
+ //
+ // This is protected by mu.
+ size int64
- // This flag indicates if this pipe ever had a writer. Note that this does
- // not necessarily indicate there is *currently* a writer, just that there
- // has been a writer at some point since the pipe was created.
+ // hadWriter indicates if this pipe ever had a writer. Note that this
+ // does not necessarily indicate there is *currently* a writer, just
+ // that there has been a writer at some point since the pipe was
+ // created.
//
- // Protected by mu.
+ // This is protected by mu.
hadWriter bool
-
- // Lock protecting all pipe internal state.
- mu sync.Mutex `state:"nosave"`
}
-// NewPipe initializes and returns a pipe. A pipe created by this function is
-// persistent, and will remain valid even without any open fds to it. Named
-// pipes for mknod(2) are created via this function. Note that the
-// implementation of blocking semantics for opening the read and write ends of a
-// named pipe are left to filesystems.
-func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe {
+// NewPipe initializes and returns a pipe.
+//
+// N.B. The size and atomicIOBytes will be bounded.
+func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe {
+ if sizeBytes < MinimumPipeSize {
+ sizeBytes = MinimumPipeSize
+ }
+ if sizeBytes > MaximumPipeSize {
+ sizeBytes = MaximumPipeSize
+ }
+ if atomicIOBytes <= 0 {
+ atomicIOBytes = 1
+ }
+ if atomicIOBytes > sizeBytes {
+ atomicIOBytes = sizeBytes
+ }
p := &Pipe{
isNamed: isNamed,
max: sizeBytes,
@@ -110,48 +154,45 @@ func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *P
return p
}
-// NewConnectedPipe initializes a pipe and returns a pair of objects (which
-// implement kio.File) representing the read and write ends of the pipe. A pipe
-// created by this function becomes invalid as soon as either the read or write
-// end is closed, and errors on subsequent operations on either end. Pipes
-// for pipe(2) and pipe2(2) are generally created this way.
-func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) {
+// NewConnectedPipe initializes a pipe and returns a pair of objects
+// representing the read and write ends of the pipe.
+func NewConnectedPipe(ctx context.Context, sizeBytes, atomicIOBytes int64) (*fs.File, *fs.File) {
p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes)
- return p.ROpen(ctx), p.WOpen(ctx)
-}
-
-// ROpen opens the pipe for reading.
-func (p *Pipe) ROpen(ctx context.Context) *fs.File {
- p.rOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true}, &Reader{
- ReaderWriter: ReaderWriter{Pipe: p},
- })
-}
-
-// WOpen opens the pipe for writing.
-func (p *Pipe) WOpen(ctx context.Context) *fs.File {
- p.wOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Write: true}, &Writer{
- ReaderWriter: ReaderWriter{Pipe: p},
- })
+ return p.Open(ctx, fs.FileFlags{Read: true}), p.Open(ctx, fs.FileFlags{Write: true})
}
-// RWOpen opens the pipe for both reading and writing.
-func (p *Pipe) RWOpen(ctx context.Context) *fs.File {
- p.rOpen()
- p.wOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{
- Pipe: p,
- })
+// Open opens the pipe and returns a new file.
+//
+// Precondition: at least one of flags.Read or flags.Write must be set.
+func (p *Pipe) Open(ctx context.Context, flags fs.FileFlags) *fs.File {
+ switch {
+ case flags.Read && flags.Write:
+ p.rOpen()
+ p.wOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &ReaderWriter{
+ Pipe: p,
+ })
+ case flags.Read:
+ p.rOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &Reader{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+ case flags.Write:
+ p.wOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &Writer{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+ default:
+ // Precondition violated.
+ panic("invalid pipe flags")
+ }
}
// read reads data from the pipe into dst and returns the number of bytes
// read, or returns ErrWouldBlock if the pipe is empty.
+//
+// Precondition: this pipe must have readers.
func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
- if !p.HasReaders() {
- return 0, syscall.EBADF
- }
-
// Don't block for a zero-length read even if the pipe is empty.
if dst.NumBytes() == 0 {
return 0, nil
@@ -159,8 +200,8 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
p.mu.Lock()
defer p.mu.Unlock()
- // If there is nothing to read at the moment but there is a writer, tell the
- // caller to block.
+
+ // Is the pipe empty?
if p.size == 0 {
if !p.HasWriters() {
// There are no writers, return EOF.
@@ -168,64 +209,94 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
}
return 0, syserror.ErrWouldBlock
}
- var n int64
- for buffer := p.data.Front(); buffer != nil; buffer = p.data.Front() {
- n0, err := dst.CopyOut(ctx, buffer.bytes())
- n += int64(n0)
- p.size -= n0
- if buffer.truncate(n0) == 0 {
- p.data.Remove(buffer)
+
+ // Limit how much we consume.
+ if dst.NumBytes() > p.size {
+ dst = dst.TakeFirst64(p.size)
+ }
+
+ done := int64(0)
+ for dst.NumBytes() > 0 {
+ // Pop the first buffer.
+ first := p.data.Front()
+ if first == nil {
+ break
}
- dst = dst.DropFirst(n0)
- if dst.NumBytes() == 0 || err != nil {
- return n, err
+
+ // Copy user data.
+ n, err := dst.CopyOutFrom(ctx, first)
+ done += int64(n)
+ p.size -= n
+ dst = dst.DropFirst64(n)
+
+ // Empty buffer?
+ if first.Empty() {
+ // Push to the free list.
+ p.data.Remove(first)
+ bufferPool.Put(first)
+ }
+
+ // Handle errors.
+ if err != nil {
+ return done, err
}
}
- return n, nil
+
+ return done, nil
}
// write writes data from sv into the pipe and returns the number of bytes
// written. If no bytes are written because the pipe is full (or has less than
// atomicIOBytes free capacity), write returns ErrWouldBlock.
+//
+// Precondition: this pipe must have writers.
func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()
- if !p.HasWriters() {
- return 0, syscall.EBADF
- }
+ // Can't write to a pipe with no readers.
if !p.HasReaders() {
return 0, syscall.EPIPE
}
// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
- // atomic, but requires no atomicity for writes larger than this. However,
- // Linux appears to provide stronger semantics than this in practice:
- // unmerged writes are done one PAGE_SIZE buffer at a time, so for larger
- // writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement
- // this by writing at most atomicIOBytes at a time if we can't service the
- // write in its entirety.
- canWrite := src.NumBytes()
- if canWrite > int64(p.max-p.size) {
- if p.max-p.size >= p.atomicIOBytes {
- canWrite = int64(p.atomicIOBytes)
- } else {
+ // atomic, but requires no atomicity for writes larger than this.
+ wanted := src.NumBytes()
+ if avail := p.max - p.size; wanted > avail {
+ if wanted <= p.atomicIOBytes {
return 0, syserror.ErrWouldBlock
}
+ // Limit to the available capacity.
+ src = src.TakeFirst64(avail)
}
- // Copy data from user memory into a pipe-owned buffer.
- buf := make([]byte, canWrite)
- n, err := src.CopyIn(ctx, buf)
- if n > 0 {
- p.data.PushBack(newBuffer(buf[:n]))
+ done := int64(0)
+ for src.NumBytes() > 0 {
+ // Need a new buffer?
+ last := p.data.Back()
+ if last == nil || last.Full() {
+ // Add a new buffer to the data list.
+ last = newBuffer()
+ p.data.PushBack(last)
+ }
+
+ // Copy user data.
+ n, err := src.CopyInTo(ctx, last)
+ done += int64(n)
p.size += n
+ src = src.DropFirst64(n)
+
+ // Handle errors.
+ if err != nil {
+ return done, err
+ }
}
- if int64(n) < src.NumBytes() && err == nil {
+ if wanted > done {
// Partial write due to full pipe.
- err = syserror.ErrWouldBlock
+ return done, syserror.ErrWouldBlock
}
- return int64(n), err
+
+ return done, nil
}
// rOpen signals a new reader of the pipe.
@@ -267,6 +338,9 @@ func (p *Pipe) HasWriters() bool {
return atomic.LoadInt32(&p.writers) > 0
}
+// rReadinessLocked calculates the read readiness.
+//
+// Precondition: mu must be held.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
if p.HasReaders() && p.data.Front() != nil {
@@ -290,6 +364,9 @@ func (p *Pipe) rReadiness() waiter.EventMask {
return p.rReadinessLocked()
}
+// wReadinessLocked calculates the write readiness.
+//
+// Precondition: mu must be held.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
if p.HasWriters() && p.size < p.max {
@@ -317,8 +394,36 @@ func (p *Pipe) rwReadiness() waiter.EventMask {
return p.rReadinessLocked() | p.wReadinessLocked()
}
-func (p *Pipe) queuedSize() int {
+// queued returns the amount of queued data.
+func (p *Pipe) queued() int64 {
p.mu.Lock()
defer p.mu.Unlock()
return p.size
}
+
+// PipeSize implements PipeSizer.PipeSize.
+func (p *Pipe) PipeSize() int64 {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.max
+}
+
+// SetPipeSize implements PipeSize.SetPipeSize.
+func (p *Pipe) SetPipeSize(size int64) (int64, error) {
+ if size < 0 {
+ return 0, syserror.EINVAL
+ }
+ if size < MinimumPipeSize {
+ size = MinimumPipeSize // Per spec.
+ }
+ if size > MaximumPipeSize {
+ return 0, syserror.EPERM
+ }
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if size < p.size {
+ return 0, syserror.EBUSY
+ }
+ p.max = size
+ return size, nil
+}
diff --git a/pkg/sentry/kernel/pipe/pipe_test.go b/pkg/sentry/kernel/pipe/pipe_test.go
index de340c40c..298c6587b 100644
--- a/pkg/sentry/kernel/pipe/pipe_test.go
+++ b/pkg/sentry/kernel/pipe/pipe_test.go
@@ -58,15 +58,16 @@ func TestPipeReadBlock(t *testing.T) {
func TestPipeWriteBlock(t *testing.T) {
const atomicIOBytes = 2
+ const capacity = MinimumPipeSize
ctx := contexttest.Context(t)
- r, w := NewConnectedPipe(ctx, 10, atomicIOBytes)
+ r, w := NewConnectedPipe(ctx, capacity, atomicIOBytes)
defer r.DecRef()
defer w.DecRef()
- msg := []byte("here's some bytes")
+ msg := make([]byte, capacity+1)
n, err := w.Writev(ctx, usermem.BytesIOSequence(msg))
- if wantN, wantErr := int64(atomicIOBytes), syserror.ErrWouldBlock; n != wantN || err != wantErr {
+ if wantN, wantErr := int64(capacity), syserror.ErrWouldBlock; n != wantN || err != wantErr {
t.Fatalf("Writev: got (%d, %v), wanted (%d, %v)", n, err, wantN, wantErr)
}
}
diff --git a/pkg/sentry/kernel/pipe/reader.go b/pkg/sentry/kernel/pipe/reader.go
index 48fab45d1..656be824d 100644
--- a/pkg/sentry/kernel/pipe/reader.go
+++ b/pkg/sentry/kernel/pipe/reader.go
@@ -27,8 +27,11 @@ type Reader struct {
}
// Release implements fs.FileOperations.Release.
+//
+// This overrides ReaderWriter.Release.
func (r *Reader) Release() {
r.Pipe.rClose()
+
// Wake up writers.
r.Pipe.Notify(waiter.EventOut)
}
diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go
index 59899be49..e560b9be9 100644
--- a/pkg/sentry/kernel/pipe/reader_writer.go
+++ b/pkg/sentry/kernel/pipe/reader_writer.go
@@ -15,7 +15,6 @@
package pipe
import (
- "fmt"
"math"
"syscall"
@@ -49,6 +48,7 @@ type ReaderWriter struct {
func (rw *ReaderWriter) Release() {
rw.Pipe.rClose()
rw.Pipe.wClose()
+
// Wake up readers and writers.
rw.Pipe.Notify(waiter.EventIn | waiter.EventOut)
}
@@ -81,9 +81,9 @@ func (rw *ReaderWriter) Ioctl(ctx context.Context, io usermem.IO, args arch.Sysc
// Switch on ioctl request.
switch int(args[1].Int()) {
case linux.FIONREAD:
- v := rw.queuedSize()
+ v := rw.queued()
if v > math.MaxInt32 {
- panic(fmt.Sprintf("Impossibly large pipe queued size: %d", v))
+ v = math.MaxInt32 // Silently truncate.
}
// Copy result to user-space.
_, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{
diff --git a/pkg/sentry/kernel/pipe/writer.go b/pkg/sentry/kernel/pipe/writer.go
index 0f29fbc43..8d5b68541 100644
--- a/pkg/sentry/kernel/pipe/writer.go
+++ b/pkg/sentry/kernel/pipe/writer.go
@@ -27,8 +27,11 @@ type Writer struct {
}
// Release implements fs.FileOperations.Release.
+//
+// This overrides ReaderWriter.Release.
func (w *Writer) Release() {
w.Pipe.wClose()
+
// Wake up readers.
w.Pipe.Notify(waiter.EventHUp)
}
diff --git a/pkg/sentry/syscalls/linux/sys_file.go b/pkg/sentry/syscalls/linux/sys_file.go
index 8a80cd430..19f579930 100644
--- a/pkg/sentry/syscalls/linux/sys_file.go
+++ b/pkg/sentry/syscalls/linux/sys_file.go
@@ -27,6 +27,7 @@ import (
"gvisor.googlesource.com/gvisor/pkg/sentry/kernel/auth"
"gvisor.googlesource.com/gvisor/pkg/sentry/kernel/fasync"
"gvisor.googlesource.com/gvisor/pkg/sentry/kernel/kdefs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/pipe"
ktime "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/time"
"gvisor.googlesource.com/gvisor/pkg/sentry/limits"
"gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
@@ -943,6 +944,19 @@ func Fcntl(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Syscall
}
err := tmpfs.AddSeals(file.Dirent.Inode, args[2].Uint())
return 0, nil, err
+ case linux.F_GETPIPE_SZ:
+ sz, ok := file.FileOperations.(pipe.Sizer)
+ if !ok {
+ return 0, nil, syserror.EINVAL
+ }
+ return uintptr(sz.PipeSize()), nil, nil
+ case linux.F_SETPIPE_SZ:
+ sz, ok := file.FileOperations.(pipe.Sizer)
+ if !ok {
+ return 0, nil, syserror.EINVAL
+ }
+ n, err := sz.SetPipeSize(int64(args[2].Int()))
+ return uintptr(n), nil, err
default:
// Everything else is not yet supported.
return 0, nil, syserror.EINVAL
diff --git a/test/syscalls/BUILD b/test/syscalls/BUILD
index b531d7629..0d6b6ccc7 100644
--- a/test/syscalls/BUILD
+++ b/test/syscalls/BUILD
@@ -191,7 +191,11 @@ syscall_test(test = "//test/syscalls/linux:partial_bad_buffer_test")
syscall_test(test = "//test/syscalls/linux:pause_test")
-syscall_test(test = "//test/syscalls/linux:pipe_test")
+syscall_test(
+ size = "large",
+ shard_count = 5,
+ test = "//test/syscalls/linux:pipe_test",
+)
syscall_test(test = "//test/syscalls/linux:poll_test")
diff --git a/test/syscalls/linux/BUILD b/test/syscalls/linux/BUILD
index d4e49bb3a..4e239617b 100644
--- a/test/syscalls/linux/BUILD
+++ b/test/syscalls/linux/BUILD
@@ -1237,6 +1237,8 @@ cc_binary(
linkstatic = 1,
deps = [
"//test/util:file_descriptor",
+ "//test/util:posix_error",
+ "//test/util:temp_path",
"//test/util:test_main",
"//test/util:test_util",
"//test/util:thread_util",
diff --git a/test/syscalls/linux/pipe.cc b/test/syscalls/linux/pipe.cc
index 8698295b3..bce351e08 100644
--- a/test/syscalls/linux/pipe.cc
+++ b/test/syscalls/linux/pipe.cc
@@ -26,6 +26,8 @@
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "test/util/file_descriptor.h"
+#include "test/util/posix_error.h"
+#include "test/util/temp_path.h"
#include "test/util/test_util.h"
#include "test/util/thread_util.h"
@@ -34,449 +36,588 @@ namespace testing {
namespace {
-// Buffer size of a pipe.
-//
-// TODO(b/35762278): Get this from F_GETPIPE_SZ.
-constexpr int kPipeSize = 65536;
+// Used as a non-zero sentinel value, below.
+constexpr int kTestValue = 0x12345678;
+
+// Used for synchronization in race tests.
+const absl::Duration syncDelay = absl::Seconds(2);
+
+struct PipeCreator {
+ std::string name_;
+
+ // void (fds, is_blocking, is_namedpipe).
+ std::function<void(int[2], bool*, bool*)> create_;
+};
+
+class PipeTest : public ::testing::TestWithParam<PipeCreator> {
+ protected:
+ FileDescriptor rfd;
+ FileDescriptor wfd;
-class PipeTest : public ::testing::Test {
public:
static void SetUpTestCase() {
// Tests intentionally generate SIGPIPE.
TEST_PCHECK(signal(SIGPIPE, SIG_IGN) != SIG_ERR);
}
+ // Initializes rfd and wfd as a blocking pipe.
+ //
+ // The return value indicates success: the test should be skipped otherwise.
+ bool CreateBlocking() { return create(true); }
+
+ // Initializes rfd and wfd as a non-blocking pipe.
+ //
+ // The return value is per CreateBlocking.
+ bool CreateNonBlocking() { return create(false); }
+
+ // Returns true iff the pipe represents a named pipe.
+ bool IsNamedPipe() { return namedpipe_; }
+
+ int Size() {
+ int s1 = fcntl(rfd.get(), F_GETPIPE_SZ);
+ int s2 = fcntl(wfd.get(), F_GETPIPE_SZ);
+ EXPECT_GT(s1, 0);
+ EXPECT_GT(s2, 0);
+ EXPECT_EQ(s1, s2);
+ return s1;
+ }
+
static void TearDownTestCase() {
TEST_PCHECK(signal(SIGPIPE, SIG_DFL) != SIG_ERR);
}
+
+ private:
+ bool namedpipe_ = false;
+
+ bool create(bool wants_blocking) {
+ // Generate the pipe.
+ int fds[2] = {-1, -1};
+ bool is_blocking = false;
+ GetParam().create_(fds, &is_blocking, &namedpipe_);
+ if (fds[0] < 0 || fds[1] < 0) {
+ return false;
+ }
+
+ // Save descriptors.
+ rfd.reset(fds[0]);
+ wfd.reset(fds[1]);
+
+ // Adjust blocking, if needed.
+ if (!is_blocking && wants_blocking) {
+ // Clear the blocking flag.
+ EXPECT_THAT(fcntl(fds[0], F_SETFL, 0), SyscallSucceeds());
+ EXPECT_THAT(fcntl(fds[1], F_SETFL, 0), SyscallSucceeds());
+ } else if (is_blocking && !wants_blocking) {
+ // Set the descriptors to blocking.
+ EXPECT_THAT(fcntl(fds[0], F_SETFL, O_NONBLOCK), SyscallSucceeds());
+ EXPECT_THAT(fcntl(fds[1], F_SETFL, O_NONBLOCK), SyscallSucceeds());
+ }
+
+ return true;
+ }
};
-TEST_F(PipeTest, Basic) {
- // fds[0] is read end, fds[1] is write end.
- int fds[2];
- int i = 0x12345678;
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
+TEST_P(PipeTest, Inode) {
+ SKIP_IF(!CreateBlocking());
// Ensure that the inode number is the same for each end.
struct stat rst;
- ASSERT_THAT(fstat(fds[0], &rst), SyscallSucceeds());
+ ASSERT_THAT(fstat(rfd.get(), &rst), SyscallSucceeds());
struct stat wst;
- ASSERT_THAT(fstat(fds[1], &wst), SyscallSucceeds());
+ ASSERT_THAT(fstat(wfd.get(), &wst), SyscallSucceeds());
EXPECT_EQ(rst.st_ino, wst.st_ino);
-
- ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
-
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- int j;
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- EXPECT_EQ(i, j);
-
- ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceeds());
- ASSERT_THAT(fcntl(fds[1], F_GETFL), SyscallSucceedsWithValue(O_WRONLY));
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
}
-TEST_F(PipeTest, BasicCloExec) {
- // fds[0] is read end, fds[1] is write end.
- int fds[2];
- int i = 0x12345678;
- ASSERT_THAT(pipe2(fds, O_CLOEXEC), SyscallSucceeds());
+TEST_P(PipeTest, Permissions) {
+ SKIP_IF(!CreateBlocking());
- ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
+ // Attempt bad operations.
+ int buf = kTestValue;
+ ASSERT_THAT(write(rfd.get(), &buf, sizeof(buf)),
+ SyscallFailsWithErrno(EBADF));
+ EXPECT_THAT(read(wfd.get(), &buf, sizeof(buf)), SyscallFailsWithErrno(EBADF));
+}
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- int j;
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- EXPECT_EQ(i, j);
+TEST_P(PipeTest, Flags) {
+ SKIP_IF(!CreateBlocking());
+
+ if (IsNamedPipe()) {
+ // May be stubbed to zero; define locally.
+ constexpr int kLargefile = 0100000;
+ EXPECT_THAT(fcntl(rfd.get(), F_GETFL),
+ SyscallSucceedsWithValue(kLargefile | O_RDONLY));
+ EXPECT_THAT(fcntl(wfd.get(), F_GETFL),
+ SyscallSucceedsWithValue(kLargefile | O_WRONLY));
+ } else {
+ EXPECT_THAT(fcntl(rfd.get(), F_GETFL), SyscallSucceedsWithValue(O_RDONLY));
+ EXPECT_THAT(fcntl(wfd.get(), F_GETFL), SyscallSucceedsWithValue(O_WRONLY));
+ }
+}
- ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceeds());
- ASSERT_THAT(fcntl(fds[1], F_GETFL), SyscallSucceeds());
+TEST_P(PipeTest, Write) {
+ SKIP_IF(!CreateBlocking());
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
+ int wbuf = kTestValue;
+ int rbuf = ~kTestValue;
+ ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)),
+ SyscallSucceedsWithValue(sizeof(wbuf)));
+ ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(sizeof(rbuf)));
+ EXPECT_EQ(wbuf, rbuf);
}
-TEST_F(PipeTest, BasicNoBlock) {
- // fds[0] is read end, fds[1] is write end.
- int fds[2];
- int i = 0x12345678;
- ASSERT_THAT(pipe2(fds, O_NONBLOCK), SyscallSucceeds());
-
- ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
-
- ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK));
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- int j;
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- EXPECT_EQ(i, j);
- ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK));
-
- ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceedsWithValue(O_NONBLOCK));
- ASSERT_THAT(fcntl(fds[1], F_GETFL),
- SyscallSucceedsWithValue(O_NONBLOCK | O_WRONLY));
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
+TEST_P(PipeTest, NonBlocking) {
+ SKIP_IF(!CreateNonBlocking());
+
+ int wbuf = kTestValue;
+ int rbuf = ~kTestValue;
+ EXPECT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallFailsWithErrno(EWOULDBLOCK));
+ ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)),
+ SyscallSucceedsWithValue(sizeof(wbuf)));
+
+ ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(sizeof(rbuf)));
+ EXPECT_EQ(wbuf, rbuf);
+ EXPECT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallFailsWithErrno(EWOULDBLOCK));
}
-TEST_F(PipeTest, BasicBothOptions) {
- // fds[0] is read end, fds[1] is write end.
+TEST(Pipe2Test, CloExec) {
int fds[2];
- int i = 0x12345678;
- ASSERT_THAT(pipe2(fds, O_NONBLOCK | O_CLOEXEC), SyscallSucceeds());
-
- ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
-
- ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK));
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- int j;
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- EXPECT_EQ(i, j);
- ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK));
-
- ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceedsWithValue(O_NONBLOCK));
- ASSERT_THAT(fcntl(fds[1], F_GETFL),
- SyscallSucceedsWithValue(O_NONBLOCK | O_WRONLY));
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
+ ASSERT_THAT(pipe2(fds, O_CLOEXEC), SyscallSucceeds());
+ EXPECT_THAT(fcntl(fds[0], F_GETFD), SyscallSucceedsWithValue(FD_CLOEXEC));
+ EXPECT_THAT(fcntl(fds[1], F_GETFD), SyscallSucceedsWithValue(FD_CLOEXEC));
+ EXPECT_THAT(close(fds[0]), SyscallSucceeds());
+ EXPECT_THAT(close(fds[1]), SyscallSucceeds());
}
-TEST_F(PipeTest, BasicBadOptions) {
+TEST(Pipe2Test, BadOptions) {
int fds[2];
- ASSERT_THAT(pipe2(fds, 0xDEAD), SyscallFailsWithErrno(EINVAL));
+ EXPECT_THAT(pipe2(fds, 0xDEAD), SyscallFailsWithErrno(EINVAL));
}
-TEST_F(PipeTest, Seek) {
- // fds[0] is read end, fds[1] is write end.
- int fds[2];
- int i = 0x12345678;
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
-
- ASSERT_THAT(lseek(fds[0], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[0], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[0], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
-
- ASSERT_THAT(lseek(fds[0], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[0], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
-
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- int j;
-
- ASSERT_THAT(lseek(fds[0], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[0], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
-
- ASSERT_THAT(lseek(fds[0], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[0], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
-
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- EXPECT_EQ(i, j);
-
- ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceeds());
- ASSERT_THAT(fcntl(fds[1], F_GETFL), SyscallSucceedsWithValue(O_WRONLY));
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
+TEST_P(PipeTest, Seek) {
+ SKIP_IF(!CreateBlocking());
+
+ for (int i = 0; i < 4; i++) {
+ // Attempt absolute seeks.
+ EXPECT_THAT(lseek(rfd.get(), 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(rfd.get(), 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
+
+ // Attempt relative seeks.
+ EXPECT_THAT(lseek(rfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(rfd.get(), 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+
+ // Attempt end-of-file seeks.
+ EXPECT_THAT(lseek(rfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(rfd.get(), -4, SEEK_END), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), -4, SEEK_END), SyscallFailsWithErrno(ESPIPE));
+
+ // Add some more data to the pipe.
+ int buf = kTestValue;
+ ASSERT_THAT(write(wfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(sizeof(buf)));
+ }
}
-TEST_F(PipeTest, AbsoluteOffsetSyscallsFail) {
- // Syscalls for IO at absolute offsets fail because pipes are not seekable.
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
-
- std::vector<char> buf(4096);
- struct iovec iov;
+TEST_P(PipeTest, OffsetCalls) {
+ SKIP_IF(!CreateBlocking());
- EXPECT_THAT(pread(fds[1], buf.data(), buf.size(), 0),
+ int buf;
+ EXPECT_THAT(pread(wfd.get(), &buf, sizeof(buf), 0),
SyscallFailsWithErrno(ESPIPE));
- EXPECT_THAT(pwrite(fds[0], buf.data(), buf.size(), 0),
+ EXPECT_THAT(pwrite(rfd.get(), &buf, sizeof(buf), 0),
SyscallFailsWithErrno(ESPIPE));
- EXPECT_THAT(preadv(fds[1], &iov, 1, 0), SyscallFailsWithErrno(ESPIPE));
- EXPECT_THAT(pwritev(fds[0], &iov, 1, 0), SyscallFailsWithErrno(ESPIPE));
- EXPECT_THAT(close(fds[0]), SyscallSucceeds());
- EXPECT_THAT(close(fds[1]), SyscallSucceeds());
+ struct iovec iov;
+ EXPECT_THAT(preadv(wfd.get(), &iov, 1, 0), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(pwritev(rfd.get(), &iov, 1, 0), SyscallFailsWithErrno(ESPIPE));
}
-TEST_F(PipeTest, WriterSideCloses) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- int rfd = fds[0];
- int i = 123;
- ScopedThread t([rfd]() {
- int j;
- ASSERT_THAT(read(rfd, &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
+TEST_P(PipeTest, WriterSideCloses) {
+ SKIP_IF(!CreateBlocking());
+
+ ScopedThread t([this]() {
+ int buf = ~kTestValue;
+ ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(sizeof(buf)));
+ EXPECT_EQ(buf, kTestValue);
// This will return when the close() completes.
- ASSERT_THAT(read(rfd, &j, sizeof(j)), SyscallSucceeds());
+ ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)), SyscallSucceeds());
// This will return straight away.
- ASSERT_THAT(read(rfd, &j, sizeof(j)), SyscallSucceeds());
+ ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(0));
});
+
// Sleep a bit so the thread can block.
- absl::SleepFor(absl::Seconds(1.0));
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
+ absl::SleepFor(syncDelay);
+
+ // Write to unblock.
+ int buf = kTestValue;
+ ASSERT_THAT(write(wfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(sizeof(buf)));
+
// Sleep a bit so the thread can block again.
- absl::SleepFor(absl::Seconds(3.0));
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
- t.Join();
+ absl::SleepFor(syncDelay);
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
+ // Allow the thread to complete.
+ ASSERT_THAT(close(wfd.release()), SyscallSucceeds());
+ t.Join();
}
-TEST_F(PipeTest, WriterSideClosesReadDataFirst) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- int i = 123;
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
- int j;
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- ASSERT_EQ(j, i);
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceeds());
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
+TEST_P(PipeTest, WriterSideClosesReadDataFirst) {
+ SKIP_IF(!CreateBlocking());
+
+ int wbuf = kTestValue;
+ ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)),
+ SyscallSucceedsWithValue(sizeof(wbuf)));
+ ASSERT_THAT(close(wfd.release()), SyscallSucceeds());
+
+ int rbuf;
+ ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(sizeof(rbuf)));
+ EXPECT_EQ(wbuf, rbuf);
+ EXPECT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(0));
}
-TEST_F(PipeTest, ReaderSideCloses) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- int i = 123;
- ASSERT_THAT(write(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EPIPE));
+TEST_P(PipeTest, ReaderSideCloses) {
+ SKIP_IF(!CreateBlocking());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
+ ASSERT_THAT(close(rfd.release()), SyscallSucceeds());
+ int buf = kTestValue;
+ EXPECT_THAT(write(wfd.get(), &buf, sizeof(buf)),
+ SyscallFailsWithErrno(EPIPE));
}
-TEST_F(PipeTest, CloseTwice) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
- ASSERT_THAT(close(fds[0]), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(close(fds[1]), SyscallFailsWithErrno(EBADF));
-
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[0]), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(close(fds[1]), SyscallFailsWithErrno(EBADF));
+TEST_P(PipeTest, CloseTwice) {
+ SKIP_IF(!CreateBlocking());
+
+ int _rfd = rfd.release();
+ int _wfd = wfd.release();
+ ASSERT_THAT(close(_rfd), SyscallSucceeds());
+ ASSERT_THAT(close(_wfd), SyscallSucceeds());
+ EXPECT_THAT(close(_rfd), SyscallFailsWithErrno(EBADF));
+ EXPECT_THAT(close(_wfd), SyscallFailsWithErrno(EBADF));
}
// Blocking write returns EPIPE when read end is closed if nothing has been
// written.
-TEST_F(PipeTest, BlockWriteClosed) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- int wfd = fds[1];
+TEST_P(PipeTest, BlockWriteClosed) {
+ SKIP_IF(!CreateBlocking());
absl::Notification notify;
- ScopedThread t([wfd, &notify]() {
- std::vector<char> buf(kPipeSize);
+ ScopedThread t([this, &notify]() {
+ std::vector<char> buf(Size());
// Exactly fill the pipe buffer.
- ASSERT_THAT(WriteFd(wfd, buf.data(), buf.size()),
+ ASSERT_THAT(WriteFd(wfd.get(), buf.data(), buf.size()),
SyscallSucceedsWithValue(buf.size()));
notify.Notify();
// Attempt to write one more byte. Blocks.
// N.B. Don't use WriteFd, we don't want a retry.
- ASSERT_THAT(write(wfd, buf.data(), 1), SyscallFailsWithErrno(EPIPE));
+ EXPECT_THAT(write(wfd.get(), buf.data(), 1), SyscallFailsWithErrno(EPIPE));
});
notify.WaitForNotification();
- absl::SleepFor(absl::Seconds(1.0));
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
-
+ ASSERT_THAT(close(rfd.release()), SyscallSucceeds());
t.Join();
-
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
}
// Blocking write returns EPIPE when read end is closed even if something has
// been written.
-//
-// FIXME(b/35924046): Pipe writes blocking early allows S/R to interrupt the
-// write(2) call before the buffer is full. Then the next call will will return
-// non-zero instead of EPIPE.
-TEST_F(PipeTest, BlockPartialWriteClosed_NoRandomSave) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- int wfd = fds[1];
+TEST_P(PipeTest, BlockPartialWriteClosed) {
+ SKIP_IF(!CreateBlocking());
- ScopedThread t([wfd]() {
- std::vector<char> buf(2 * kPipeSize);
+ ScopedThread t([this]() {
+ std::vector<char> buf(2 * Size());
// Write more than fits in the buffer. Blocks then returns partial write
// when the other end is closed. The next call returns EPIPE.
- if (IsRunningOnGvisor()) {
- // FIXME(b/35924046): Pipe writes block early on gVisor, resulting in a
- // shorter than expected partial write.
- ASSERT_THAT(write(wfd, buf.data(), buf.size()),
- SyscallSucceedsWithValue(::testing::Gt(0)));
- } else {
- ASSERT_THAT(write(wfd, buf.data(), buf.size()),
- SyscallSucceedsWithValue(kPipeSize));
- }
- ASSERT_THAT(write(wfd, buf.data(), buf.size()),
+ ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
+ SyscallSucceedsWithValue(Size()));
+ EXPECT_THAT(write(wfd.get(), buf.data(), buf.size()),
SyscallFailsWithErrno(EPIPE));
});
// Leave time for write to become blocked.
- absl::SleepFor(absl::Seconds(1.0));
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
+ absl::SleepFor(syncDelay);
+ // Unblock the above.
+ ASSERT_THAT(close(rfd.release()), SyscallSucceeds());
t.Join();
-
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
}
-TEST_F(PipeTest, ReadFromClosedFd_NoRandomSave) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- int rfd = fds[0];
+TEST_P(PipeTest, ReadFromClosedFd_NoRandomSave) {
+ SKIP_IF(!CreateBlocking());
+
absl::Notification notify;
- ScopedThread t([rfd, &notify]() {
- int f;
+ ScopedThread t([this, &notify]() {
notify.Notify();
- ASSERT_THAT(read(rfd, &f, sizeof(f)), SyscallSucceedsWithValue(sizeof(f)));
- ASSERT_EQ(123, f);
+ int buf;
+ ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(sizeof(buf)));
+ ASSERT_EQ(kTestValue, buf);
});
notify.WaitForNotification();
+
// Make sure that the thread gets to read().
- absl::SleepFor(absl::Seconds(5.0));
+ absl::SleepFor(syncDelay);
+
{
// We cannot save/restore here as the read end of pipe is closed but there
// is ongoing read() above. We will not be able to restart the read()
// successfully in restore run since the read fd is closed.
const DisableSave ds;
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- int i = 123;
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
+ ASSERT_THAT(close(rfd.release()), SyscallSucceeds());
+ int buf = kTestValue;
+ ASSERT_THAT(write(wfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(sizeof(buf)));
t.Join();
}
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
}
-TEST_F(PipeTest, FionRead) {
- // fds[0] is read end, fds[1] is write end.
- int fds[2];
- int data[2] = {0x12345678, 0x9101112};
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
+TEST_P(PipeTest, FionRead) {
+ SKIP_IF(!CreateBlocking());
- int n = -1;
- EXPECT_THAT(ioctl(fds[0], FIONREAD, &n), SyscallSucceedsWithValue(0));
+ int n;
+ ASSERT_THAT(ioctl(rfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0));
EXPECT_EQ(n, 0);
- n = -1;
- EXPECT_THAT(ioctl(fds[1], FIONREAD, &n), SyscallSucceedsWithValue(0));
+ ASSERT_THAT(ioctl(wfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0));
EXPECT_EQ(n, 0);
- EXPECT_THAT(write(fds[1], data, sizeof(data)),
- SyscallSucceedsWithValue(sizeof(data)));
+ std::vector<char> buf(Size());
+ ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
+ SyscallSucceedsWithValue(buf.size()));
- n = -1;
- EXPECT_THAT(ioctl(fds[0], FIONREAD, &n), SyscallSucceedsWithValue(0));
- EXPECT_EQ(n, sizeof(data));
- n = -1;
- EXPECT_THAT(ioctl(fds[1], FIONREAD, &n), SyscallSucceedsWithValue(0));
- EXPECT_EQ(n, sizeof(data));
+ EXPECT_THAT(ioctl(rfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0));
+ EXPECT_EQ(n, buf.size());
+ EXPECT_THAT(ioctl(wfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0));
+ EXPECT_EQ(n, buf.size());
}
// Test that opening an empty anonymous pipe RDONLY via /proc/self/fd/N does not
// block waiting for a writer.
-TEST_F(PipeTest, OpenViaProcSelfFD) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- FileDescriptor rfd(fds[0]);
- FileDescriptor wfd(fds[1]);
+TEST_P(PipeTest, OpenViaProcSelfFD) {
+ SKIP_IF(!CreateBlocking());
+ SKIP_IF(IsNamedPipe());
// Close the write end of the pipe.
- wfd.release();
+ ASSERT_THAT(close(wfd.release()), SyscallSucceeds());
// Open other side via /proc/self/fd. It should not block.
FileDescriptor proc_self_fd = ASSERT_NO_ERRNO_AND_VALUE(
- Open(absl::StrCat("/proc/self/fd/", fds[0]), O_RDONLY));
+ Open(absl::StrCat("/proc/self/fd/", rfd.get()), O_RDONLY));
}
// Test that opening and reading from an anonymous pipe (with existing writes)
// RDONLY via /proc/self/fd/N returns the existing data.
-TEST_F(PipeTest, OpenViaProcSelfFDWithWrites) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- FileDescriptor rfd(fds[0]);
- FileDescriptor wfd(fds[1]);
+TEST_P(PipeTest, OpenViaProcSelfFDWithWrites) {
+ SKIP_IF(!CreateBlocking());
+ SKIP_IF(IsNamedPipe());
// Write to the pipe and then close the write fd.
- char data = 'x';
- ASSERT_THAT(write(fds[1], &data, 1), SyscallSucceedsWithValue(1));
- wfd.release();
+ int wbuf = kTestValue;
+ ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)),
+ SyscallSucceedsWithValue(sizeof(wbuf)));
+ ASSERT_THAT(close(wfd.release()), SyscallSucceeds());
// Open read side via /proc/self/fd, and read from it.
FileDescriptor proc_self_fd = ASSERT_NO_ERRNO_AND_VALUE(
- Open(absl::StrCat("/proc/self/fd/", fds[0]), O_RDONLY));
- char got;
- ASSERT_THAT(read(proc_self_fd.get(), &got, 1), SyscallSucceedsWithValue(1));
+ Open(absl::StrCat("/proc/self/fd/", rfd.get()), O_RDONLY));
+ int rbuf;
+ ASSERT_THAT(read(proc_self_fd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(sizeof(rbuf)));
+ EXPECT_EQ(wbuf, rbuf);
+}
+
+// Test that accesses of /proc/<PID>/fd correctly decrement the refcount.
+TEST_P(PipeTest, ProcFDReleasesFile) {
+ SKIP_IF(!CreateBlocking());
- // We should get what we sent.
- EXPECT_EQ(got, data);
+ // Stat the pipe FD, which shouldn't alter the refcount.
+ struct stat wst;
+ ASSERT_THAT(lstat(absl::StrCat("/proc/self/fd/", wfd.get()).c_str(), &wst),
+ SyscallSucceeds());
+
+ // Close the write end and ensure that read indicates EOF.
+ wfd.reset();
+ char buf;
+ ASSERT_THAT(read(rfd.get(), &buf, 1), SyscallSucceedsWithValue(0));
}
-TEST_F(PipeTest, LargeFile) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- FileDescriptor rfd(fds[0]);
- FileDescriptor wfd(fds[1]);
+// Same for /proc/<PID>/fdinfo.
+TEST_P(PipeTest, ProcFDInfoReleasesFile) {
+ SKIP_IF(!CreateBlocking());
+
+ // Stat the pipe FD, which shouldn't alter the refcount.
+ struct stat wst;
+ ASSERT_THAT(
+ lstat(absl::StrCat("/proc/self/fdinfo/", wfd.get()).c_str(), &wst),
+ SyscallSucceeds());
+
+ // Close the write end and ensure that read indicates EOF.
+ wfd.reset();
+ char buf;
+ ASSERT_THAT(read(rfd.get(), &buf, 1), SyscallSucceedsWithValue(0));
+}
+
+TEST_P(PipeTest, SizeChange) {
+ SKIP_IF(!CreateBlocking());
+
+ // Set the minimum possible size.
+ ASSERT_THAT(fcntl(rfd.get(), F_SETPIPE_SZ, 0), SyscallSucceeds());
+ int min = Size();
+ EXPECT_GT(min, 0); // Should be rounded up.
+
+ // Set from the read end.
+ ASSERT_THAT(fcntl(rfd.get(), F_SETPIPE_SZ, min + 1), SyscallSucceeds());
+ int med = Size();
+ EXPECT_GT(med, min); // Should have grown, may be rounded.
+
+ // Set from the write end.
+ ASSERT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, med + 1), SyscallSucceeds());
+ int max = Size();
+ EXPECT_GT(max, med); // Ditto.
+}
- int rflags;
- EXPECT_THAT(rflags = fcntl(rfd.get(), F_GETFL), SyscallSucceeds());
+TEST_P(PipeTest, SizeChangeMax) {
+ SKIP_IF(!CreateBlocking());
- // The kernel did *not* set O_LARGEFILE.
- EXPECT_EQ(rflags, 0);
+ // Assert there's some maximum.
+ EXPECT_THAT(fcntl(rfd.get(), F_SETPIPE_SZ, 0x7fffffffffffffff),
+ SyscallFailsWithErrno(EINVAL));
+ EXPECT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, 0x7fffffffffffffff),
+ SyscallFailsWithErrno(EINVAL));
}
-// Test that accesses of /proc/<PID>/fd/<FD> and /proc/<PID>/fdinfo/<FD>
-// correctly decrement the refcount of that file descriptor.
-TEST_F(PipeTest, ProcFDReleasesFile) {
- std::vector<std::string> paths = {"/proc/self/fd/", "/proc/self/fdinfo/"};
- for (const std::string& path : paths) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- FileDescriptor rfd(fds[0]);
- FileDescriptor wfd(fds[1]);
-
- // Stat the pipe FD, which shouldn't alter the refcount of the write end of
- // the pipe.
- struct stat wst;
- ASSERT_THAT(lstat(absl::StrCat(path.c_str(), wfd.get()).c_str(), &wst),
- SyscallSucceeds());
- // Close the write end of the pipe and ensure that read indicates EOF.
- wfd.reset();
- char buf;
- ASSERT_THAT(read(rfd.get(), &buf, 1), SyscallSucceedsWithValue(0));
+TEST_P(PipeTest, SizeChangeFull) {
+ SKIP_IF(!CreateBlocking());
+
+ // Ensure that we adjust to a large enough size to avoid rounding when we
+ // perform the size decrease. If rounding occurs, we may not actually
+ // adjust the size and the call below will return success. It was found via
+ // experimentation that this granularity avoids the rounding for Linux.
+ constexpr int kDelta = 64 * 1024;
+ ASSERT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, Size() + kDelta),
+ SyscallSucceeds());
+
+ // Fill the buffer and try to change down.
+ std::vector<char> buf(Size());
+ ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
+ SyscallSucceedsWithValue(buf.size()));
+ EXPECT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, Size() - kDelta),
+ SyscallFailsWithErrno(EBUSY));
+}
+
+TEST_P(PipeTest, Streaming) {
+ SKIP_IF(!CreateBlocking());
+
+ // We make too many calls to go through full save cycles.
+ DisableSave ds;
+
+ absl::Notification notify;
+ ScopedThread t([this, &notify]() {
+ // Don't start until it's full.
+ notify.WaitForNotification();
+ for (int i = 0; i < 2 * Size(); i++) {
+ int rbuf;
+ ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(sizeof(rbuf)));
+ EXPECT_EQ(rbuf, i);
+ }
+ });
+ for (int i = 0; i < 2 * Size(); i++) {
+ int wbuf = i;
+ ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)),
+ SyscallSucceedsWithValue(sizeof(wbuf)));
+ // Did that write just fill up the buffer? Wake up the reader. Once only.
+ if ((i * sizeof(wbuf)) < Size() && ((i + 1) * sizeof(wbuf)) >= Size()) {
+ notify.Notify();
+ }
}
}
+std::string PipeCreatorName(::testing::TestParamInfo<PipeCreator> info) {
+ return info.param.name_; // Use the name specified.
+}
+
+INSTANTIATE_TEST_SUITE_P(
+ Pipes, PipeTest,
+ ::testing::Values(
+ PipeCreator{
+ "pipe",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ ASSERT_THAT(pipe(fds), SyscallSucceeds());
+ *is_blocking = true;
+ *is_namedpipe = false;
+ },
+ },
+ PipeCreator{
+ "pipe2blocking",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ ASSERT_THAT(pipe2(fds, 0), SyscallSucceeds());
+ *is_blocking = true;
+ *is_namedpipe = false;
+ },
+ },
+ PipeCreator{
+ "pipe2nonblocking",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ ASSERT_THAT(pipe2(fds, O_NONBLOCK), SyscallSucceeds());
+ *is_blocking = false;
+ *is_namedpipe = false;
+ },
+ },
+ PipeCreator{
+ "smallbuffer",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ // Set to the minimum available size (will round up).
+ ASSERT_THAT(pipe(fds), SyscallSucceeds());
+ ASSERT_THAT(fcntl(fds[0], F_SETPIPE_SZ, 0), SyscallSucceeds());
+ *is_blocking = true;
+ *is_namedpipe = false;
+ },
+ },
+ PipeCreator{
+ "namednonblocking",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ // Create a new file-based pipe (non-blocking).
+ auto file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
+ ASSERT_THAT(unlink(file.path().c_str()), SyscallSucceeds());
+ SKIP_IF(mkfifo(file.path().c_str(), 0644) != 0);
+ fds[0] = open(file.path().c_str(), O_NONBLOCK | O_RDONLY);
+ fds[1] = open(file.path().c_str(), O_NONBLOCK | O_WRONLY);
+ MaybeSave();
+ *is_blocking = false;
+ *is_namedpipe = true;
+ },
+ },
+ PipeCreator{
+ "namedblocking",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ // Create a new file-based pipe (blocking).
+ auto file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
+ ASSERT_THAT(unlink(file.path().c_str()), SyscallSucceeds());
+ SKIP_IF(mkfifo(file.path().c_str(), 0644) != 0);
+ ScopedThread t([&file, &fds]() {
+ fds[1] = open(file.path().c_str(), O_WRONLY);
+ });
+ fds[0] = open(file.path().c_str(), O_RDONLY);
+ t.Join();
+ MaybeSave();
+ *is_blocking = true;
+ *is_namedpipe = true;
+ },
+ }),
+ PipeCreatorName);
+
} // namespace
} // namespace testing
} // namespace gvisor