summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdin Scannell <ascannell@google.com>2019-05-21 20:11:26 -0700
committerShentubot <shentubot@google.com>2019-05-21 20:12:27 -0700
commitae1bb08871758844fa23fc7255ffeb0392f9dee6 (patch)
treeff854ebe0082be7d0df9f47aec4eaf13fe4327ac
parentc8857f72696c1097a427b75f4340969e20cc0e95 (diff)
Clean up pipe internals and add fcntl support
Pipe internals are made more efficient by avoiding garbage collection. A pool is now used that can be shared by all pipes, and buffers are chained via an intrusive list. The documentation for pipe structures and methods is also simplified and clarified. The pipe tests are now parameterized, so that they are run on all different variants (named pipes, small buffers, default buffers). The pipe buffer sizes are exposed by fcntl, which is now supported by this change. A size change test has been added to the suite. These new tests uncovered a bug regarding the semantics of open named pipes with O_NONBLOCK, which is also fixed by this CL. This fix also addresses the lack of the O_LARGEFILE flag for named pipes. PiperOrigin-RevId: 249375888 Change-Id: I48e61e9c868aedb0cadda2dff33f09a560dee773
-rw-r--r--pkg/abi/linux/fcntl.go4
-rw-r--r--pkg/sentry/kernel/pipe/BUILD8
-rw-r--r--pkg/sentry/kernel/pipe/buffer.go90
-rw-r--r--pkg/sentry/kernel/pipe/buffer_test.go32
-rw-r--r--pkg/sentry/kernel/pipe/buffers.go48
-rw-r--r--pkg/sentry/kernel/pipe/node.go7
-rw-r--r--pkg/sentry/kernel/pipe/pipe.go315
-rw-r--r--pkg/sentry/kernel/pipe/pipe_test.go7
-rw-r--r--pkg/sentry/kernel/pipe/reader.go3
-rw-r--r--pkg/sentry/kernel/pipe/reader_writer.go6
-rw-r--r--pkg/sentry/kernel/pipe/writer.go3
-rw-r--r--pkg/sentry/syscalls/linux/sys_file.go14
-rw-r--r--test/syscalls/BUILD6
-rw-r--r--test/syscalls/linux/BUILD2
-rw-r--r--test/syscalls/linux/pipe.cc761
15 files changed, 828 insertions, 478 deletions
diff --git a/pkg/abi/linux/fcntl.go b/pkg/abi/linux/fcntl.go
index cc8f2702d..b30350193 100644
--- a/pkg/abi/linux/fcntl.go
+++ b/pkg/abi/linux/fcntl.go
@@ -17,7 +17,6 @@ package linux
// Comands from linux/fcntl.h.
const (
F_DUPFD = 0
- F_DUPFD_CLOEXEC = 1030
F_GETFD = 1
F_GETFL = 3
F_GETOWN = 9
@@ -26,6 +25,9 @@ const (
F_SETLK = 6
F_SETLKW = 7
F_SETOWN = 8
+ F_DUPFD_CLOEXEC = 1024 + 6
+ F_SETPIPE_SZ = 1024 + 7
+ F_GETPIPE_SZ = 1024 + 8
)
// Flags for fcntl.
diff --git a/pkg/sentry/kernel/pipe/BUILD b/pkg/sentry/kernel/pipe/BUILD
index 6b23117d9..b07d15a2a 100644
--- a/pkg/sentry/kernel/pipe/BUILD
+++ b/pkg/sentry/kernel/pipe/BUILD
@@ -10,16 +10,16 @@ go_template_instance(
prefix = "buffer",
template = "//pkg/ilist:generic_list",
types = {
- "Element": "*Buffer",
- "Linker": "*Buffer",
+ "Element": "*buffer",
+ "Linker": "*buffer",
},
)
go_library(
name = "pipe",
srcs = [
+ "buffer.go",
"buffer_list.go",
- "buffers.go",
"device.go",
"node.go",
"pipe.go",
@@ -37,6 +37,7 @@ go_library(
"//pkg/sentry/device",
"//pkg/sentry/fs",
"//pkg/sentry/fs/fsutil",
+ "//pkg/sentry/safemem",
"//pkg/sentry/usermem",
"//pkg/syserror",
"//pkg/waiter",
@@ -47,6 +48,7 @@ go_test(
name = "pipe_test",
size = "small",
srcs = [
+ "buffer_test.go",
"node_test.go",
"pipe_test.go",
],
diff --git a/pkg/sentry/kernel/pipe/buffer.go b/pkg/sentry/kernel/pipe/buffer.go
new file mode 100644
index 000000000..4360dc44f
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/buffer.go
@@ -0,0 +1,90 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "sync"
+
+ "gvisor.googlesource.com/gvisor/pkg/sentry/safemem"
+)
+
+// buffer encapsulates a queueable byte buffer.
+//
+// Note that the total size is slightly less than two pages. This
+// is done intentionally to ensure that the buffer object aligns
+// with runtime internals. We have no hard size or alignment
+// requirements. This two page size will effectively minimize
+// internal fragmentation, but still have a large enough chunk
+// to limit excessive segmentation.
+//
+// +stateify savable
+type buffer struct {
+ data [8144]byte
+ read int
+ write int
+ bufferEntry
+}
+
+// Reset resets internal data.
+//
+// This must be called before use.
+func (b *buffer) Reset() {
+ b.read = 0
+ b.write = 0
+}
+
+// Empty indicates the buffer is empty.
+//
+// This indicates there is no data left to read.
+func (b *buffer) Empty() bool {
+ return b.read == b.write
+}
+
+// Full indicates the buffer is full.
+//
+// This indicates there is no capacity left to write.
+func (b *buffer) Full() bool {
+ return b.write == len(b.data)
+}
+
+// WriteFromBlocks implements safemem.Writer.WriteFromBlocks.
+func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) {
+ dst := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.write:]))
+ n, err := safemem.CopySeq(dst, srcs)
+ b.write += int(n)
+ return n, err
+}
+
+// ReadToBlocks implements safemem.Reader.ReadToBlocks.
+func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
+ src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write]))
+ n, err := safemem.CopySeq(dsts, src)
+ b.read += int(n)
+ return n, err
+}
+
+// bufferPool is a pool for buffers.
+var bufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(buffer)
+ },
+}
+
+// newBuffer grabs a new buffer from the pool.
+func newBuffer() *buffer {
+ b := bufferPool.Get().(*buffer)
+ b.Reset()
+ return b
+}
diff --git a/pkg/sentry/kernel/pipe/buffer_test.go b/pkg/sentry/kernel/pipe/buffer_test.go
new file mode 100644
index 000000000..4b7dbc43f
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/buffer_test.go
@@ -0,0 +1,32 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "testing"
+ "unsafe"
+
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+)
+
+func TestBufferSize(t *testing.T) {
+ bufferSize := unsafe.Sizeof(buffer{})
+ if bufferSize < usermem.PageSize {
+ t.Errorf("buffer is less than a page")
+ }
+ if bufferSize > (2 * usermem.PageSize) {
+ t.Errorf("buffer is greater than two pages")
+ }
+}
diff --git a/pkg/sentry/kernel/pipe/buffers.go b/pkg/sentry/kernel/pipe/buffers.go
deleted file mode 100644
index ba53fd482..000000000
--- a/pkg/sentry/kernel/pipe/buffers.go
+++ /dev/null
@@ -1,48 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-// Buffer encapsulates a queueable byte buffer that can
-// easily be truncated. It is designed only for use with pipes.
-//
-// +stateify savable
-type Buffer struct {
- bufferEntry
- data []byte
-}
-
-// newBuffer initializes a Buffer.
-func newBuffer(buf []byte) *Buffer {
- return &Buffer{data: buf}
-}
-
-// bytes returns the bytes contained in the buffer.
-func (b *Buffer) bytes() []byte {
- return b.data
-}
-
-// size returns the number of bytes contained in the buffer.
-func (b *Buffer) size() int {
- return len(b.data)
-}
-
-// truncate removes the first n bytes from the buffer.
-func (b *Buffer) truncate(n int) int {
- if n > len(b.data) {
- panic("Trying to truncate past end of array.")
- }
- b.data = b.data[n:]
- return len(b.data)
-}
diff --git a/pkg/sentry/kernel/pipe/node.go b/pkg/sentry/kernel/pipe/node.go
index 7c3739360..926c4c623 100644
--- a/pkg/sentry/kernel/pipe/node.go
+++ b/pkg/sentry/kernel/pipe/node.go
@@ -67,7 +67,6 @@ func NewInodeOperations(ctx context.Context, perms fs.FilePermissions, p *Pipe)
InodeSimpleAttributes: fsutil.NewInodeSimpleAttributes(ctx, fs.FileOwnerFromContext(ctx), perms, linux.PIPEFS_MAGIC),
p: p,
}
-
}
// GetFile implements fs.InodeOperations.GetFile. Named pipes have special blocking
@@ -87,7 +86,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
switch {
case flags.Read && !flags.Write: // O_RDONLY.
- r := i.p.ROpen(ctx)
+ r := i.p.Open(ctx, flags)
i.newHandleLocked(&i.rWakeup)
if i.p.isNamed && !flags.NonBlocking && !i.p.HasWriters() {
@@ -103,7 +102,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
return r, nil
case flags.Write && !flags.Read: // O_WRONLY.
- w := i.p.WOpen(ctx)
+ w := i.p.Open(ctx, flags)
i.newHandleLocked(&i.wWakeup)
if i.p.isNamed && !i.p.HasReaders() {
@@ -123,7 +122,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi
case flags.Read && flags.Write: // O_RDWR.
// Pipes opened for read-write always succeeds without blocking.
- rw := i.p.RWOpen(ctx)
+ rw := i.p.Open(ctx, flags)
i.newHandleLocked(&i.rWakeup)
i.newHandleLocked(&i.wWakeup)
return rw, nil
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
index bd7649d2f..b65204492 100644
--- a/pkg/sentry/kernel/pipe/pipe.go
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -12,11 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Package pipe provides an in-memory implementation of a unidirectional
-// pipe.
-//
-// The goal of this pipe is to emulate the pipe syscall in all of its
-// edge cases and guarantees of atomic IO.
+// Package pipe provides a pipe implementation.
package pipe
import (
@@ -32,8 +28,29 @@ import (
"gvisor.googlesource.com/gvisor/pkg/waiter"
)
-// DefaultPipeSize is the system-wide default size of a pipe in bytes.
-const DefaultPipeSize = 65536
+const (
+ // MinimumPipeSize is a hard limit of the minimum size of a pipe.
+ MinimumPipeSize = 64 << 10
+
+ // DefaultPipeSize is the system-wide default size of a pipe in bytes.
+ DefaultPipeSize = MinimumPipeSize
+
+ // MaximumPipeSize is a hard limit on the maximum size of a pipe.
+ MaximumPipeSize = 8 << 20
+)
+
+// Sizer is an interface for setting and getting the size of a pipe.
+//
+// It is implemented by Pipe and, through embedding, all other types.
+type Sizer interface {
+ // PipeSize returns the pipe capacity in bytes.
+ PipeSize() int64
+
+ // SetPipeSize sets the new pipe capacity in bytes.
+ //
+ // The new size is returned (which may be capped).
+ SetPipeSize(int64) (int64, error)
+}
// Pipe is an encapsulation of a platform-independent pipe.
// It manages a buffered byte queue shared between a reader/writer
@@ -43,49 +60,76 @@ const DefaultPipeSize = 65536
type Pipe struct {
waiter.Queue `state:"nosave"`
- // Whether this is a named or anonymous pipe.
+ // isNamed indicates whether this is a named pipe.
+ //
+ // This value is immutable.
isNamed bool
+ // atomicIOBytes is the maximum number of bytes that the pipe will
+ // guarantee atomic reads or writes atomically.
+ //
+ // This value is immutable.
+ atomicIOBytes int64
+
// The dirent backing this pipe. Shared by all readers and writers.
+ //
+ // This value is immutable.
Dirent *fs.Dirent
- // The buffered byte queue.
- data bufferList
+ // The number of active readers for this pipe.
+ //
+ // Access atomically.
+ readers int32
- // Max size of the pipe in bytes. When this max has been reached,
- // writers will get EWOULDBLOCK.
- max int
+ // The number of active writes for this pipe.
+ //
+ // Access atomically.
+ writers int32
- // Current size of the pipe in bytes.
- size int
+ // mu protects all pipe internal state below.
+ mu sync.Mutex `state:"nosave"`
- // Max number of bytes the pipe can guarantee to read or write
- // atomically.
- atomicIOBytes int
+ // data is the buffer queue of pipe contents.
+ //
+ // This is protected by mu.
+ data bufferList
- // The number of active readers for this pipe. Load/store atomically.
- readers int32
+ // max is the maximum size of the pipe in bytes. When this max has been
+ // reached, writers will get EWOULDBLOCK.
+ //
+ // This is protected by mu.
+ max int64
- // The number of active writes for this pipe. Load/store atomically.
- writers int32
+ // size is the current size of the pipe in bytes.
+ //
+ // This is protected by mu.
+ size int64
- // This flag indicates if this pipe ever had a writer. Note that this does
- // not necessarily indicate there is *currently* a writer, just that there
- // has been a writer at some point since the pipe was created.
+ // hadWriter indicates if this pipe ever had a writer. Note that this
+ // does not necessarily indicate there is *currently* a writer, just
+ // that there has been a writer at some point since the pipe was
+ // created.
//
- // Protected by mu.
+ // This is protected by mu.
hadWriter bool
-
- // Lock protecting all pipe internal state.
- mu sync.Mutex `state:"nosave"`
}
-// NewPipe initializes and returns a pipe. A pipe created by this function is
-// persistent, and will remain valid even without any open fds to it. Named
-// pipes for mknod(2) are created via this function. Note that the
-// implementation of blocking semantics for opening the read and write ends of a
-// named pipe are left to filesystems.
-func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe {
+// NewPipe initializes and returns a pipe.
+//
+// N.B. The size and atomicIOBytes will be bounded.
+func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe {
+ if sizeBytes < MinimumPipeSize {
+ sizeBytes = MinimumPipeSize
+ }
+ if sizeBytes > MaximumPipeSize {
+ sizeBytes = MaximumPipeSize
+ }
+ if atomicIOBytes <= 0 {
+ atomicIOBytes = 1
+ }
+ if atomicIOBytes > sizeBytes {
+ atomicIOBytes = sizeBytes
+ }
p := &Pipe{
isNamed: isNamed,
max: sizeBytes,
@@ -110,48 +154,45 @@ func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *P
return p
}
-// NewConnectedPipe initializes a pipe and returns a pair of objects (which
-// implement kio.File) representing the read and write ends of the pipe. A pipe
-// created by this function becomes invalid as soon as either the read or write
-// end is closed, and errors on subsequent operations on either end. Pipes
-// for pipe(2) and pipe2(2) are generally created this way.
-func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) {
+// NewConnectedPipe initializes a pipe and returns a pair of objects
+// representing the read and write ends of the pipe.
+func NewConnectedPipe(ctx context.Context, sizeBytes, atomicIOBytes int64) (*fs.File, *fs.File) {
p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes)
- return p.ROpen(ctx), p.WOpen(ctx)
-}
-
-// ROpen opens the pipe for reading.
-func (p *Pipe) ROpen(ctx context.Context) *fs.File {
- p.rOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true}, &Reader{
- ReaderWriter: ReaderWriter{Pipe: p},
- })
-}
-
-// WOpen opens the pipe for writing.
-func (p *Pipe) WOpen(ctx context.Context) *fs.File {
- p.wOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Write: true}, &Writer{
- ReaderWriter: ReaderWriter{Pipe: p},
- })
+ return p.Open(ctx, fs.FileFlags{Read: true}), p.Open(ctx, fs.FileFlags{Write: true})
}
-// RWOpen opens the pipe for both reading and writing.
-func (p *Pipe) RWOpen(ctx context.Context) *fs.File {
- p.rOpen()
- p.wOpen()
- return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{
- Pipe: p,
- })
+// Open opens the pipe and returns a new file.
+//
+// Precondition: at least one of flags.Read or flags.Write must be set.
+func (p *Pipe) Open(ctx context.Context, flags fs.FileFlags) *fs.File {
+ switch {
+ case flags.Read && flags.Write:
+ p.rOpen()
+ p.wOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &ReaderWriter{
+ Pipe: p,
+ })
+ case flags.Read:
+ p.rOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &Reader{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+ case flags.Write:
+ p.wOpen()
+ return fs.NewFile(ctx, p.Dirent, flags, &Writer{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+ default:
+ // Precondition violated.
+ panic("invalid pipe flags")
+ }
}
// read reads data from the pipe into dst and returns the number of bytes
// read, or returns ErrWouldBlock if the pipe is empty.
+//
+// Precondition: this pipe must have readers.
func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
- if !p.HasReaders() {
- return 0, syscall.EBADF
- }
-
// Don't block for a zero-length read even if the pipe is empty.
if dst.NumBytes() == 0 {
return 0, nil
@@ -159,8 +200,8 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
p.mu.Lock()
defer p.mu.Unlock()
- // If there is nothing to read at the moment but there is a writer, tell the
- // caller to block.
+
+ // Is the pipe empty?
if p.size == 0 {
if !p.HasWriters() {
// There are no writers, return EOF.
@@ -168,64 +209,94 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
}
return 0, syserror.ErrWouldBlock
}
- var n int64
- for buffer := p.data.Front(); buffer != nil; buffer = p.data.Front() {
- n0, err := dst.CopyOut(ctx, buffer.bytes())
- n += int64(n0)
- p.size -= n0
- if buffer.truncate(n0) == 0 {
- p.data.Remove(buffer)
+
+ // Limit how much we consume.
+ if dst.NumBytes() > p.size {
+ dst = dst.TakeFirst64(p.size)
+ }
+
+ done := int64(0)
+ for dst.NumBytes() > 0 {
+ // Pop the first buffer.
+ first := p.data.Front()
+ if first == nil {
+ break
}
- dst = dst.DropFirst(n0)
- if dst.NumBytes() == 0 || err != nil {
- return n, err
+
+ // Copy user data.
+ n, err := dst.CopyOutFrom(ctx, first)
+ done += int64(n)
+ p.size -= n
+ dst = dst.DropFirst64(n)
+
+ // Empty buffer?
+ if first.Empty() {
+ // Push to the free list.
+ p.data.Remove(first)
+ bufferPool.Put(first)
+ }
+
+ // Handle errors.
+ if err != nil {
+ return done, err
}
}
- return n, nil
+
+ return done, nil
}
// write writes data from sv into the pipe and returns the number of bytes
// written. If no bytes are written because the pipe is full (or has less than
// atomicIOBytes free capacity), write returns ErrWouldBlock.
+//
+// Precondition: this pipe must have writers.
func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()
- if !p.HasWriters() {
- return 0, syscall.EBADF
- }
+ // Can't write to a pipe with no readers.
if !p.HasReaders() {
return 0, syscall.EPIPE
}
// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
- // atomic, but requires no atomicity for writes larger than this. However,
- // Linux appears to provide stronger semantics than this in practice:
- // unmerged writes are done one PAGE_SIZE buffer at a time, so for larger
- // writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement
- // this by writing at most atomicIOBytes at a time if we can't service the
- // write in its entirety.
- canWrite := src.NumBytes()
- if canWrite > int64(p.max-p.size) {
- if p.max-p.size >= p.atomicIOBytes {
- canWrite = int64(p.atomicIOBytes)
- } else {
+ // atomic, but requires no atomicity for writes larger than this.
+ wanted := src.NumBytes()
+ if avail := p.max - p.size; wanted > avail {
+ if wanted <= p.atomicIOBytes {
return 0, syserror.ErrWouldBlock
}
+ // Limit to the available capacity.
+ src = src.TakeFirst64(avail)
}
- // Copy data from user memory into a pipe-owned buffer.
- buf := make([]byte, canWrite)
- n, err := src.CopyIn(ctx, buf)
- if n > 0 {
- p.data.PushBack(newBuffer(buf[:n]))
+ done := int64(0)
+ for src.NumBytes() > 0 {
+ // Need a new buffer?
+ last := p.data.Back()
+ if last == nil || last.Full() {
+ // Add a new buffer to the data list.
+ last = newBuffer()
+ p.data.PushBack(last)
+ }
+
+ // Copy user data.
+ n, err := src.CopyInTo(ctx, last)
+ done += int64(n)
p.size += n
+ src = src.DropFirst64(n)
+
+ // Handle errors.
+ if err != nil {
+ return done, err
+ }
}
- if int64(n) < src.NumBytes() && err == nil {
+ if wanted > done {
// Partial write due to full pipe.
- err = syserror.ErrWouldBlock
+ return done, syserror.ErrWouldBlock
}
- return int64(n), err
+
+ return done, nil
}
// rOpen signals a new reader of the pipe.
@@ -267,6 +338,9 @@ func (p *Pipe) HasWriters() bool {
return atomic.LoadInt32(&p.writers) > 0
}
+// rReadinessLocked calculates the read readiness.
+//
+// Precondition: mu must be held.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
if p.HasReaders() && p.data.Front() != nil {
@@ -290,6 +364,9 @@ func (p *Pipe) rReadiness() waiter.EventMask {
return p.rReadinessLocked()
}
+// wReadinessLocked calculates the write readiness.
+//
+// Precondition: mu must be held.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
if p.HasWriters() && p.size < p.max {
@@ -317,8 +394,36 @@ func (p *Pipe) rwReadiness() waiter.EventMask {
return p.rReadinessLocked() | p.wReadinessLocked()
}
-func (p *Pipe) queuedSize() int {
+// queued returns the amount of queued data.
+func (p *Pipe) queued() int64 {
p.mu.Lock()
defer p.mu.Unlock()
return p.size
}
+
+// PipeSize implements PipeSizer.PipeSize.
+func (p *Pipe) PipeSize() int64 {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.max
+}
+
+// SetPipeSize implements PipeSize.SetPipeSize.
+func (p *Pipe) SetPipeSize(size int64) (int64, error) {
+ if size < 0 {
+ return 0, syserror.EINVAL
+ }
+ if size < MinimumPipeSize {
+ size = MinimumPipeSize // Per spec.
+ }
+ if size > MaximumPipeSize {
+ return 0, syserror.EPERM
+ }
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if size < p.size {
+ return 0, syserror.EBUSY
+ }
+ p.max = size
+ return size, nil
+}
diff --git a/pkg/sentry/kernel/pipe/pipe_test.go b/pkg/sentry/kernel/pipe/pipe_test.go
index de340c40c..298c6587b 100644
--- a/pkg/sentry/kernel/pipe/pipe_test.go
+++ b/pkg/sentry/kernel/pipe/pipe_test.go
@@ -58,15 +58,16 @@ func TestPipeReadBlock(t *testing.T) {
func TestPipeWriteBlock(t *testing.T) {
const atomicIOBytes = 2
+ const capacity = MinimumPipeSize
ctx := contexttest.Context(t)
- r, w := NewConnectedPipe(ctx, 10, atomicIOBytes)
+ r, w := NewConnectedPipe(ctx, capacity, atomicIOBytes)
defer r.DecRef()
defer w.DecRef()
- msg := []byte("here's some bytes")
+ msg := make([]byte, capacity+1)
n, err := w.Writev(ctx, usermem.BytesIOSequence(msg))
- if wantN, wantErr := int64(atomicIOBytes), syserror.ErrWouldBlock; n != wantN || err != wantErr {
+ if wantN, wantErr := int64(capacity), syserror.ErrWouldBlock; n != wantN || err != wantErr {
t.Fatalf("Writev: got (%d, %v), wanted (%d, %v)", n, err, wantN, wantErr)
}
}
diff --git a/pkg/sentry/kernel/pipe/reader.go b/pkg/sentry/kernel/pipe/reader.go
index 48fab45d1..656be824d 100644
--- a/pkg/sentry/kernel/pipe/reader.go
+++ b/pkg/sentry/kernel/pipe/reader.go
@@ -27,8 +27,11 @@ type Reader struct {
}
// Release implements fs.FileOperations.Release.
+//
+// This overrides ReaderWriter.Release.
func (r *Reader) Release() {
r.Pipe.rClose()
+
// Wake up writers.
r.Pipe.Notify(waiter.EventOut)
}
diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go
index 59899be49..e560b9be9 100644
--- a/pkg/sentry/kernel/pipe/reader_writer.go
+++ b/pkg/sentry/kernel/pipe/reader_writer.go
@@ -15,7 +15,6 @@
package pipe
import (
- "fmt"
"math"
"syscall"
@@ -49,6 +48,7 @@ type ReaderWriter struct {
func (rw *ReaderWriter) Release() {
rw.Pipe.rClose()
rw.Pipe.wClose()
+
// Wake up readers and writers.
rw.Pipe.Notify(waiter.EventIn | waiter.EventOut)
}
@@ -81,9 +81,9 @@ func (rw *ReaderWriter) Ioctl(ctx context.Context, io usermem.IO, args arch.Sysc
// Switch on ioctl request.
switch int(args[1].Int()) {
case linux.FIONREAD:
- v := rw.queuedSize()
+ v := rw.queued()
if v > math.MaxInt32 {
- panic(fmt.Sprintf("Impossibly large pipe queued size: %d", v))
+ v = math.MaxInt32 // Silently truncate.
}
// Copy result to user-space.
_, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{
diff --git a/pkg/sentry/kernel/pipe/writer.go b/pkg/sentry/kernel/pipe/writer.go
index 0f29fbc43..8d5b68541 100644
--- a/pkg/sentry/kernel/pipe/writer.go
+++ b/pkg/sentry/kernel/pipe/writer.go
@@ -27,8 +27,11 @@ type Writer struct {
}
// Release implements fs.FileOperations.Release.
+//
+// This overrides ReaderWriter.Release.
func (w *Writer) Release() {
w.Pipe.wClose()
+
// Wake up readers.
w.Pipe.Notify(waiter.EventHUp)
}
diff --git a/pkg/sentry/syscalls/linux/sys_file.go b/pkg/sentry/syscalls/linux/sys_file.go
index 8a80cd430..19f579930 100644
--- a/pkg/sentry/syscalls/linux/sys_file.go
+++ b/pkg/sentry/syscalls/linux/sys_file.go
@@ -27,6 +27,7 @@ import (
"gvisor.googlesource.com/gvisor/pkg/sentry/kernel/auth"
"gvisor.googlesource.com/gvisor/pkg/sentry/kernel/fasync"
"gvisor.googlesource.com/gvisor/pkg/sentry/kernel/kdefs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/pipe"
ktime "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/time"
"gvisor.googlesource.com/gvisor/pkg/sentry/limits"
"gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
@@ -943,6 +944,19 @@ func Fcntl(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Syscall
}
err := tmpfs.AddSeals(file.Dirent.Inode, args[2].Uint())
return 0, nil, err
+ case linux.F_GETPIPE_SZ:
+ sz, ok := file.FileOperations.(pipe.Sizer)
+ if !ok {
+ return 0, nil, syserror.EINVAL
+ }
+ return uintptr(sz.PipeSize()), nil, nil
+ case linux.F_SETPIPE_SZ:
+ sz, ok := file.FileOperations.(pipe.Sizer)
+ if !ok {
+ return 0, nil, syserror.EINVAL
+ }
+ n, err := sz.SetPipeSize(int64(args[2].Int()))
+ return uintptr(n), nil, err
default:
// Everything else is not yet supported.
return 0, nil, syserror.EINVAL
diff --git a/test/syscalls/BUILD b/test/syscalls/BUILD
index b531d7629..0d6b6ccc7 100644
--- a/test/syscalls/BUILD
+++ b/test/syscalls/BUILD
@@ -191,7 +191,11 @@ syscall_test(test = "//test/syscalls/linux:partial_bad_buffer_test")
syscall_test(test = "//test/syscalls/linux:pause_test")
-syscall_test(test = "//test/syscalls/linux:pipe_test")
+syscall_test(
+ size = "large",
+ shard_count = 5,
+ test = "//test/syscalls/linux:pipe_test",
+)
syscall_test(test = "//test/syscalls/linux:poll_test")
diff --git a/test/syscalls/linux/BUILD b/test/syscalls/linux/BUILD
index d4e49bb3a..4e239617b 100644
--- a/test/syscalls/linux/BUILD
+++ b/test/syscalls/linux/BUILD
@@ -1237,6 +1237,8 @@ cc_binary(
linkstatic = 1,
deps = [
"//test/util:file_descriptor",
+ "//test/util:posix_error",
+ "//test/util:temp_path",
"//test/util:test_main",
"//test/util:test_util",
"//test/util:thread_util",
diff --git a/test/syscalls/linux/pipe.cc b/test/syscalls/linux/pipe.cc
index 8698295b3..bce351e08 100644
--- a/test/syscalls/linux/pipe.cc
+++ b/test/syscalls/linux/pipe.cc
@@ -26,6 +26,8 @@
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "test/util/file_descriptor.h"
+#include "test/util/posix_error.h"
+#include "test/util/temp_path.h"
#include "test/util/test_util.h"
#include "test/util/thread_util.h"
@@ -34,449 +36,588 @@ namespace testing {
namespace {
-// Buffer size of a pipe.
-//
-// TODO(b/35762278): Get this from F_GETPIPE_SZ.
-constexpr int kPipeSize = 65536;
+// Used as a non-zero sentinel value, below.
+constexpr int kTestValue = 0x12345678;
+
+// Used for synchronization in race tests.
+const absl::Duration syncDelay = absl::Seconds(2);
+
+struct PipeCreator {
+ std::string name_;
+
+ // void (fds, is_blocking, is_namedpipe).
+ std::function<void(int[2], bool*, bool*)> create_;
+};
+
+class PipeTest : public ::testing::TestWithParam<PipeCreator> {
+ protected:
+ FileDescriptor rfd;
+ FileDescriptor wfd;
-class PipeTest : public ::testing::Test {
public:
static void SetUpTestCase() {
// Tests intentionally generate SIGPIPE.
TEST_PCHECK(signal(SIGPIPE, SIG_IGN) != SIG_ERR);
}
+ // Initializes rfd and wfd as a blocking pipe.
+ //
+ // The return value indicates success: the test should be skipped otherwise.
+ bool CreateBlocking() { return create(true); }
+
+ // Initializes rfd and wfd as a non-blocking pipe.
+ //
+ // The return value is per CreateBlocking.
+ bool CreateNonBlocking() { return create(false); }
+
+ // Returns true iff the pipe represents a named pipe.
+ bool IsNamedPipe() { return namedpipe_; }
+
+ int Size() {
+ int s1 = fcntl(rfd.get(), F_GETPIPE_SZ);
+ int s2 = fcntl(wfd.get(), F_GETPIPE_SZ);
+ EXPECT_GT(s1, 0);
+ EXPECT_GT(s2, 0);
+ EXPECT_EQ(s1, s2);
+ return s1;
+ }
+
static void TearDownTestCase() {
TEST_PCHECK(signal(SIGPIPE, SIG_DFL) != SIG_ERR);
}
+
+ private:
+ bool namedpipe_ = false;
+
+ bool create(bool wants_blocking) {
+ // Generate the pipe.
+ int fds[2] = {-1, -1};
+ bool is_blocking = false;
+ GetParam().create_(fds, &is_blocking, &namedpipe_);
+ if (fds[0] < 0 || fds[1] < 0) {
+ return false;
+ }
+
+ // Save descriptors.
+ rfd.reset(fds[0]);
+ wfd.reset(fds[1]);
+
+ // Adjust blocking, if needed.
+ if (!is_blocking && wants_blocking) {
+ // Clear the blocking flag.
+ EXPECT_THAT(fcntl(fds[0], F_SETFL, 0), SyscallSucceeds());
+ EXPECT_THAT(fcntl(fds[1], F_SETFL, 0), SyscallSucceeds());
+ } else if (is_blocking && !wants_blocking) {
+ // Set the descriptors to blocking.
+ EXPECT_THAT(fcntl(fds[0], F_SETFL, O_NONBLOCK), SyscallSucceeds());
+ EXPECT_THAT(fcntl(fds[1], F_SETFL, O_NONBLOCK), SyscallSucceeds());
+ }
+
+ return true;
+ }
};
-TEST_F(PipeTest, Basic) {
- // fds[0] is read end, fds[1] is write end.
- int fds[2];
- int i = 0x12345678;
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
+TEST_P(PipeTest, Inode) {
+ SKIP_IF(!CreateBlocking());
// Ensure that the inode number is the same for each end.
struct stat rst;
- ASSERT_THAT(fstat(fds[0], &rst), SyscallSucceeds());
+ ASSERT_THAT(fstat(rfd.get(), &rst), SyscallSucceeds());
struct stat wst;
- ASSERT_THAT(fstat(fds[1], &wst), SyscallSucceeds());
+ ASSERT_THAT(fstat(wfd.get(), &wst), SyscallSucceeds());
EXPECT_EQ(rst.st_ino, wst.st_ino);
-
- ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
-
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- int j;
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- EXPECT_EQ(i, j);
-
- ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceeds());
- ASSERT_THAT(fcntl(fds[1], F_GETFL), SyscallSucceedsWithValue(O_WRONLY));
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
}
-TEST_F(PipeTest, BasicCloExec) {
- // fds[0] is read end, fds[1] is write end.
- int fds[2];
- int i = 0x12345678;
- ASSERT_THAT(pipe2(fds, O_CLOEXEC), SyscallSucceeds());
+TEST_P(PipeTest, Permissions) {
+ SKIP_IF(!CreateBlocking());
- ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
+ // Attempt bad operations.
+ int buf = kTestValue;
+ ASSERT_THAT(write(rfd.get(), &buf, sizeof(buf)),
+ SyscallFailsWithErrno(EBADF));
+ EXPECT_THAT(read(wfd.get(), &buf, sizeof(buf)), SyscallFailsWithErrno(EBADF));
+}
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- int j;
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- EXPECT_EQ(i, j);
+TEST_P(PipeTest, Flags) {
+ SKIP_IF(!CreateBlocking());
+
+ if (IsNamedPipe()) {
+ // May be stubbed to zero; define locally.
+ constexpr int kLargefile = 0100000;
+ EXPECT_THAT(fcntl(rfd.get(), F_GETFL),
+ SyscallSucceedsWithValue(kLargefile | O_RDONLY));
+ EXPECT_THAT(fcntl(wfd.get(), F_GETFL),
+ SyscallSucceedsWithValue(kLargefile | O_WRONLY));
+ } else {
+ EXPECT_THAT(fcntl(rfd.get(), F_GETFL), SyscallSucceedsWithValue(O_RDONLY));
+ EXPECT_THAT(fcntl(wfd.get(), F_GETFL), SyscallSucceedsWithValue(O_WRONLY));
+ }
+}
- ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceeds());
- ASSERT_THAT(fcntl(fds[1], F_GETFL), SyscallSucceeds());
+TEST_P(PipeTest, Write) {
+ SKIP_IF(!CreateBlocking());
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
+ int wbuf = kTestValue;
+ int rbuf = ~kTestValue;
+ ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)),
+ SyscallSucceedsWithValue(sizeof(wbuf)));
+ ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(sizeof(rbuf)));
+ EXPECT_EQ(wbuf, rbuf);
}
-TEST_F(PipeTest, BasicNoBlock) {
- // fds[0] is read end, fds[1] is write end.
- int fds[2];
- int i = 0x12345678;
- ASSERT_THAT(pipe2(fds, O_NONBLOCK), SyscallSucceeds());
-
- ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
-
- ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK));
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- int j;
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- EXPECT_EQ(i, j);
- ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK));
-
- ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceedsWithValue(O_NONBLOCK));
- ASSERT_THAT(fcntl(fds[1], F_GETFL),
- SyscallSucceedsWithValue(O_NONBLOCK | O_WRONLY));
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
+TEST_P(PipeTest, NonBlocking) {
+ SKIP_IF(!CreateNonBlocking());
+
+ int wbuf = kTestValue;
+ int rbuf = ~kTestValue;
+ EXPECT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallFailsWithErrno(EWOULDBLOCK));
+ ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)),
+ SyscallSucceedsWithValue(sizeof(wbuf)));
+
+ ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(sizeof(rbuf)));
+ EXPECT_EQ(wbuf, rbuf);
+ EXPECT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallFailsWithErrno(EWOULDBLOCK));
}
-TEST_F(PipeTest, BasicBothOptions) {
- // fds[0] is read end, fds[1] is write end.
+TEST(Pipe2Test, CloExec) {
int fds[2];
- int i = 0x12345678;
- ASSERT_THAT(pipe2(fds, O_NONBLOCK | O_CLOEXEC), SyscallSucceeds());
-
- ASSERT_THAT(write(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(read(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EBADF));
-
- ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK));
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- int j;
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- EXPECT_EQ(i, j);
- ASSERT_THAT(read(fds[0], &i, sizeof(i)), SyscallFailsWithErrno(EWOULDBLOCK));
-
- ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceedsWithValue(O_NONBLOCK));
- ASSERT_THAT(fcntl(fds[1], F_GETFL),
- SyscallSucceedsWithValue(O_NONBLOCK | O_WRONLY));
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
+ ASSERT_THAT(pipe2(fds, O_CLOEXEC), SyscallSucceeds());
+ EXPECT_THAT(fcntl(fds[0], F_GETFD), SyscallSucceedsWithValue(FD_CLOEXEC));
+ EXPECT_THAT(fcntl(fds[1], F_GETFD), SyscallSucceedsWithValue(FD_CLOEXEC));
+ EXPECT_THAT(close(fds[0]), SyscallSucceeds());
+ EXPECT_THAT(close(fds[1]), SyscallSucceeds());
}
-TEST_F(PipeTest, BasicBadOptions) {
+TEST(Pipe2Test, BadOptions) {
int fds[2];
- ASSERT_THAT(pipe2(fds, 0xDEAD), SyscallFailsWithErrno(EINVAL));
+ EXPECT_THAT(pipe2(fds, 0xDEAD), SyscallFailsWithErrno(EINVAL));
}
-TEST_F(PipeTest, Seek) {
- // fds[0] is read end, fds[1] is write end.
- int fds[2];
- int i = 0x12345678;
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
-
- ASSERT_THAT(lseek(fds[0], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[0], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[0], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
-
- ASSERT_THAT(lseek(fds[0], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[0], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
-
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- int j;
-
- ASSERT_THAT(lseek(fds[0], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[0], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
-
- ASSERT_THAT(lseek(fds[0], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[0], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
- ASSERT_THAT(lseek(fds[1], 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
-
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- EXPECT_EQ(i, j);
-
- ASSERT_THAT(fcntl(fds[0], F_GETFL), SyscallSucceeds());
- ASSERT_THAT(fcntl(fds[1], F_GETFL), SyscallSucceedsWithValue(O_WRONLY));
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
+TEST_P(PipeTest, Seek) {
+ SKIP_IF(!CreateBlocking());
+
+ for (int i = 0; i < 4; i++) {
+ // Attempt absolute seeks.
+ EXPECT_THAT(lseek(rfd.get(), 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(rfd.get(), 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), 0, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), 4, SEEK_SET), SyscallFailsWithErrno(ESPIPE));
+
+ // Attempt relative seeks.
+ EXPECT_THAT(lseek(rfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(rfd.get(), 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), 4, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+
+ // Attempt end-of-file seeks.
+ EXPECT_THAT(lseek(rfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(rfd.get(), -4, SEEK_END), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), 0, SEEK_CUR), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(lseek(wfd.get(), -4, SEEK_END), SyscallFailsWithErrno(ESPIPE));
+
+ // Add some more data to the pipe.
+ int buf = kTestValue;
+ ASSERT_THAT(write(wfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(sizeof(buf)));
+ }
}
-TEST_F(PipeTest, AbsoluteOffsetSyscallsFail) {
- // Syscalls for IO at absolute offsets fail because pipes are not seekable.
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
-
- std::vector<char> buf(4096);
- struct iovec iov;
+TEST_P(PipeTest, OffsetCalls) {
+ SKIP_IF(!CreateBlocking());
- EXPECT_THAT(pread(fds[1], buf.data(), buf.size(), 0),
+ int buf;
+ EXPECT_THAT(pread(wfd.get(), &buf, sizeof(buf), 0),
SyscallFailsWithErrno(ESPIPE));
- EXPECT_THAT(pwrite(fds[0], buf.data(), buf.size(), 0),
+ EXPECT_THAT(pwrite(rfd.get(), &buf, sizeof(buf), 0),
SyscallFailsWithErrno(ESPIPE));
- EXPECT_THAT(preadv(fds[1], &iov, 1, 0), SyscallFailsWithErrno(ESPIPE));
- EXPECT_THAT(pwritev(fds[0], &iov, 1, 0), SyscallFailsWithErrno(ESPIPE));
- EXPECT_THAT(close(fds[0]), SyscallSucceeds());
- EXPECT_THAT(close(fds[1]), SyscallSucceeds());
+ struct iovec iov;
+ EXPECT_THAT(preadv(wfd.get(), &iov, 1, 0), SyscallFailsWithErrno(ESPIPE));
+ EXPECT_THAT(pwritev(rfd.get(), &iov, 1, 0), SyscallFailsWithErrno(ESPIPE));
}
-TEST_F(PipeTest, WriterSideCloses) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- int rfd = fds[0];
- int i = 123;
- ScopedThread t([rfd]() {
- int j;
- ASSERT_THAT(read(rfd, &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
+TEST_P(PipeTest, WriterSideCloses) {
+ SKIP_IF(!CreateBlocking());
+
+ ScopedThread t([this]() {
+ int buf = ~kTestValue;
+ ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(sizeof(buf)));
+ EXPECT_EQ(buf, kTestValue);
// This will return when the close() completes.
- ASSERT_THAT(read(rfd, &j, sizeof(j)), SyscallSucceeds());
+ ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)), SyscallSucceeds());
// This will return straight away.
- ASSERT_THAT(read(rfd, &j, sizeof(j)), SyscallSucceeds());
+ ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(0));
});
+
// Sleep a bit so the thread can block.
- absl::SleepFor(absl::Seconds(1.0));
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
+ absl::SleepFor(syncDelay);
+
+ // Write to unblock.
+ int buf = kTestValue;
+ ASSERT_THAT(write(wfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(sizeof(buf)));
+
// Sleep a bit so the thread can block again.
- absl::SleepFor(absl::Seconds(3.0));
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
- t.Join();
+ absl::SleepFor(syncDelay);
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
+ // Allow the thread to complete.
+ ASSERT_THAT(close(wfd.release()), SyscallSucceeds());
+ t.Join();
}
-TEST_F(PipeTest, WriterSideClosesReadDataFirst) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- int i = 123;
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
- int j;
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceedsWithValue(sizeof(j)));
- ASSERT_EQ(j, i);
- ASSERT_THAT(read(fds[0], &j, sizeof(j)), SyscallSucceeds());
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
+TEST_P(PipeTest, WriterSideClosesReadDataFirst) {
+ SKIP_IF(!CreateBlocking());
+
+ int wbuf = kTestValue;
+ ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)),
+ SyscallSucceedsWithValue(sizeof(wbuf)));
+ ASSERT_THAT(close(wfd.release()), SyscallSucceeds());
+
+ int rbuf;
+ ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(sizeof(rbuf)));
+ EXPECT_EQ(wbuf, rbuf);
+ EXPECT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(0));
}
-TEST_F(PipeTest, ReaderSideCloses) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- int i = 123;
- ASSERT_THAT(write(fds[1], &i, sizeof(i)), SyscallFailsWithErrno(EPIPE));
+TEST_P(PipeTest, ReaderSideCloses) {
+ SKIP_IF(!CreateBlocking());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
+ ASSERT_THAT(close(rfd.release()), SyscallSucceeds());
+ int buf = kTestValue;
+ EXPECT_THAT(write(wfd.get(), &buf, sizeof(buf)),
+ SyscallFailsWithErrno(EPIPE));
}
-TEST_F(PipeTest, CloseTwice) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
- ASSERT_THAT(close(fds[0]), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(close(fds[1]), SyscallFailsWithErrno(EBADF));
-
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- ASSERT_THAT(close(fds[0]), SyscallFailsWithErrno(EBADF));
- ASSERT_THAT(close(fds[1]), SyscallFailsWithErrno(EBADF));
+TEST_P(PipeTest, CloseTwice) {
+ SKIP_IF(!CreateBlocking());
+
+ int _rfd = rfd.release();
+ int _wfd = wfd.release();
+ ASSERT_THAT(close(_rfd), SyscallSucceeds());
+ ASSERT_THAT(close(_wfd), SyscallSucceeds());
+ EXPECT_THAT(close(_rfd), SyscallFailsWithErrno(EBADF));
+ EXPECT_THAT(close(_wfd), SyscallFailsWithErrno(EBADF));
}
// Blocking write returns EPIPE when read end is closed if nothing has been
// written.
-TEST_F(PipeTest, BlockWriteClosed) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- int wfd = fds[1];
+TEST_P(PipeTest, BlockWriteClosed) {
+ SKIP_IF(!CreateBlocking());
absl::Notification notify;
- ScopedThread t([wfd, &notify]() {
- std::vector<char> buf(kPipeSize);
+ ScopedThread t([this, &notify]() {
+ std::vector<char> buf(Size());
// Exactly fill the pipe buffer.
- ASSERT_THAT(WriteFd(wfd, buf.data(), buf.size()),
+ ASSERT_THAT(WriteFd(wfd.get(), buf.data(), buf.size()),
SyscallSucceedsWithValue(buf.size()));
notify.Notify();
// Attempt to write one more byte. Blocks.
// N.B. Don't use WriteFd, we don't want a retry.
- ASSERT_THAT(write(wfd, buf.data(), 1), SyscallFailsWithErrno(EPIPE));
+ EXPECT_THAT(write(wfd.get(), buf.data(), 1), SyscallFailsWithErrno(EPIPE));
});
notify.WaitForNotification();
- absl::SleepFor(absl::Seconds(1.0));
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
-
+ ASSERT_THAT(close(rfd.release()), SyscallSucceeds());
t.Join();
-
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
}
// Blocking write returns EPIPE when read end is closed even if something has
// been written.
-//
-// FIXME(b/35924046): Pipe writes blocking early allows S/R to interrupt the
-// write(2) call before the buffer is full. Then the next call will will return
-// non-zero instead of EPIPE.
-TEST_F(PipeTest, BlockPartialWriteClosed_NoRandomSave) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- int wfd = fds[1];
+TEST_P(PipeTest, BlockPartialWriteClosed) {
+ SKIP_IF(!CreateBlocking());
- ScopedThread t([wfd]() {
- std::vector<char> buf(2 * kPipeSize);
+ ScopedThread t([this]() {
+ std::vector<char> buf(2 * Size());
// Write more than fits in the buffer. Blocks then returns partial write
// when the other end is closed. The next call returns EPIPE.
- if (IsRunningOnGvisor()) {
- // FIXME(b/35924046): Pipe writes block early on gVisor, resulting in a
- // shorter than expected partial write.
- ASSERT_THAT(write(wfd, buf.data(), buf.size()),
- SyscallSucceedsWithValue(::testing::Gt(0)));
- } else {
- ASSERT_THAT(write(wfd, buf.data(), buf.size()),
- SyscallSucceedsWithValue(kPipeSize));
- }
- ASSERT_THAT(write(wfd, buf.data(), buf.size()),
+ ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
+ SyscallSucceedsWithValue(Size()));
+ EXPECT_THAT(write(wfd.get(), buf.data(), buf.size()),
SyscallFailsWithErrno(EPIPE));
});
// Leave time for write to become blocked.
- absl::SleepFor(absl::Seconds(1.0));
-
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
+ absl::SleepFor(syncDelay);
+ // Unblock the above.
+ ASSERT_THAT(close(rfd.release()), SyscallSucceeds());
t.Join();
-
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
}
-TEST_F(PipeTest, ReadFromClosedFd_NoRandomSave) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- int rfd = fds[0];
+TEST_P(PipeTest, ReadFromClosedFd_NoRandomSave) {
+ SKIP_IF(!CreateBlocking());
+
absl::Notification notify;
- ScopedThread t([rfd, &notify]() {
- int f;
+ ScopedThread t([this, &notify]() {
notify.Notify();
- ASSERT_THAT(read(rfd, &f, sizeof(f)), SyscallSucceedsWithValue(sizeof(f)));
- ASSERT_EQ(123, f);
+ int buf;
+ ASSERT_THAT(read(rfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(sizeof(buf)));
+ ASSERT_EQ(kTestValue, buf);
});
notify.WaitForNotification();
+
// Make sure that the thread gets to read().
- absl::SleepFor(absl::Seconds(5.0));
+ absl::SleepFor(syncDelay);
+
{
// We cannot save/restore here as the read end of pipe is closed but there
// is ongoing read() above. We will not be able to restart the read()
// successfully in restore run since the read fd is closed.
const DisableSave ds;
- ASSERT_THAT(close(fds[0]), SyscallSucceeds());
- int i = 123;
- ASSERT_THAT(write(fds[1], &i, sizeof(i)),
- SyscallSucceedsWithValue(sizeof(i)));
+ ASSERT_THAT(close(rfd.release()), SyscallSucceeds());
+ int buf = kTestValue;
+ ASSERT_THAT(write(wfd.get(), &buf, sizeof(buf)),
+ SyscallSucceedsWithValue(sizeof(buf)));
t.Join();
}
- ASSERT_THAT(close(fds[1]), SyscallSucceeds());
}
-TEST_F(PipeTest, FionRead) {
- // fds[0] is read end, fds[1] is write end.
- int fds[2];
- int data[2] = {0x12345678, 0x9101112};
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
+TEST_P(PipeTest, FionRead) {
+ SKIP_IF(!CreateBlocking());
- int n = -1;
- EXPECT_THAT(ioctl(fds[0], FIONREAD, &n), SyscallSucceedsWithValue(0));
+ int n;
+ ASSERT_THAT(ioctl(rfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0));
EXPECT_EQ(n, 0);
- n = -1;
- EXPECT_THAT(ioctl(fds[1], FIONREAD, &n), SyscallSucceedsWithValue(0));
+ ASSERT_THAT(ioctl(wfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0));
EXPECT_EQ(n, 0);
- EXPECT_THAT(write(fds[1], data, sizeof(data)),
- SyscallSucceedsWithValue(sizeof(data)));
+ std::vector<char> buf(Size());
+ ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
+ SyscallSucceedsWithValue(buf.size()));
- n = -1;
- EXPECT_THAT(ioctl(fds[0], FIONREAD, &n), SyscallSucceedsWithValue(0));
- EXPECT_EQ(n, sizeof(data));
- n = -1;
- EXPECT_THAT(ioctl(fds[1], FIONREAD, &n), SyscallSucceedsWithValue(0));
- EXPECT_EQ(n, sizeof(data));
+ EXPECT_THAT(ioctl(rfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0));
+ EXPECT_EQ(n, buf.size());
+ EXPECT_THAT(ioctl(wfd.get(), FIONREAD, &n), SyscallSucceedsWithValue(0));
+ EXPECT_EQ(n, buf.size());
}
// Test that opening an empty anonymous pipe RDONLY via /proc/self/fd/N does not
// block waiting for a writer.
-TEST_F(PipeTest, OpenViaProcSelfFD) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- FileDescriptor rfd(fds[0]);
- FileDescriptor wfd(fds[1]);
+TEST_P(PipeTest, OpenViaProcSelfFD) {
+ SKIP_IF(!CreateBlocking());
+ SKIP_IF(IsNamedPipe());
// Close the write end of the pipe.
- wfd.release();
+ ASSERT_THAT(close(wfd.release()), SyscallSucceeds());
// Open other side via /proc/self/fd. It should not block.
FileDescriptor proc_self_fd = ASSERT_NO_ERRNO_AND_VALUE(
- Open(absl::StrCat("/proc/self/fd/", fds[0]), O_RDONLY));
+ Open(absl::StrCat("/proc/self/fd/", rfd.get()), O_RDONLY));
}
// Test that opening and reading from an anonymous pipe (with existing writes)
// RDONLY via /proc/self/fd/N returns the existing data.
-TEST_F(PipeTest, OpenViaProcSelfFDWithWrites) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- FileDescriptor rfd(fds[0]);
- FileDescriptor wfd(fds[1]);
+TEST_P(PipeTest, OpenViaProcSelfFDWithWrites) {
+ SKIP_IF(!CreateBlocking());
+ SKIP_IF(IsNamedPipe());
// Write to the pipe and then close the write fd.
- char data = 'x';
- ASSERT_THAT(write(fds[1], &data, 1), SyscallSucceedsWithValue(1));
- wfd.release();
+ int wbuf = kTestValue;
+ ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)),
+ SyscallSucceedsWithValue(sizeof(wbuf)));
+ ASSERT_THAT(close(wfd.release()), SyscallSucceeds());
// Open read side via /proc/self/fd, and read from it.
FileDescriptor proc_self_fd = ASSERT_NO_ERRNO_AND_VALUE(
- Open(absl::StrCat("/proc/self/fd/", fds[0]), O_RDONLY));
- char got;
- ASSERT_THAT(read(proc_self_fd.get(), &got, 1), SyscallSucceedsWithValue(1));
+ Open(absl::StrCat("/proc/self/fd/", rfd.get()), O_RDONLY));
+ int rbuf;
+ ASSERT_THAT(read(proc_self_fd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(sizeof(rbuf)));
+ EXPECT_EQ(wbuf, rbuf);
+}
+
+// Test that accesses of /proc/<PID>/fd correctly decrement the refcount.
+TEST_P(PipeTest, ProcFDReleasesFile) {
+ SKIP_IF(!CreateBlocking());
- // We should get what we sent.
- EXPECT_EQ(got, data);
+ // Stat the pipe FD, which shouldn't alter the refcount.
+ struct stat wst;
+ ASSERT_THAT(lstat(absl::StrCat("/proc/self/fd/", wfd.get()).c_str(), &wst),
+ SyscallSucceeds());
+
+ // Close the write end and ensure that read indicates EOF.
+ wfd.reset();
+ char buf;
+ ASSERT_THAT(read(rfd.get(), &buf, 1), SyscallSucceedsWithValue(0));
}
-TEST_F(PipeTest, LargeFile) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- FileDescriptor rfd(fds[0]);
- FileDescriptor wfd(fds[1]);
+// Same for /proc/<PID>/fdinfo.
+TEST_P(PipeTest, ProcFDInfoReleasesFile) {
+ SKIP_IF(!CreateBlocking());
+
+ // Stat the pipe FD, which shouldn't alter the refcount.
+ struct stat wst;
+ ASSERT_THAT(
+ lstat(absl::StrCat("/proc/self/fdinfo/", wfd.get()).c_str(), &wst),
+ SyscallSucceeds());
+
+ // Close the write end and ensure that read indicates EOF.
+ wfd.reset();
+ char buf;
+ ASSERT_THAT(read(rfd.get(), &buf, 1), SyscallSucceedsWithValue(0));
+}
+
+TEST_P(PipeTest, SizeChange) {
+ SKIP_IF(!CreateBlocking());
+
+ // Set the minimum possible size.
+ ASSERT_THAT(fcntl(rfd.get(), F_SETPIPE_SZ, 0), SyscallSucceeds());
+ int min = Size();
+ EXPECT_GT(min, 0); // Should be rounded up.
+
+ // Set from the read end.
+ ASSERT_THAT(fcntl(rfd.get(), F_SETPIPE_SZ, min + 1), SyscallSucceeds());
+ int med = Size();
+ EXPECT_GT(med, min); // Should have grown, may be rounded.
+
+ // Set from the write end.
+ ASSERT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, med + 1), SyscallSucceeds());
+ int max = Size();
+ EXPECT_GT(max, med); // Ditto.
+}
- int rflags;
- EXPECT_THAT(rflags = fcntl(rfd.get(), F_GETFL), SyscallSucceeds());
+TEST_P(PipeTest, SizeChangeMax) {
+ SKIP_IF(!CreateBlocking());
- // The kernel did *not* set O_LARGEFILE.
- EXPECT_EQ(rflags, 0);
+ // Assert there's some maximum.
+ EXPECT_THAT(fcntl(rfd.get(), F_SETPIPE_SZ, 0x7fffffffffffffff),
+ SyscallFailsWithErrno(EINVAL));
+ EXPECT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, 0x7fffffffffffffff),
+ SyscallFailsWithErrno(EINVAL));
}
-// Test that accesses of /proc/<PID>/fd/<FD> and /proc/<PID>/fdinfo/<FD>
-// correctly decrement the refcount of that file descriptor.
-TEST_F(PipeTest, ProcFDReleasesFile) {
- std::vector<std::string> paths = {"/proc/self/fd/", "/proc/self/fdinfo/"};
- for (const std::string& path : paths) {
- int fds[2];
- ASSERT_THAT(pipe(fds), SyscallSucceeds());
- FileDescriptor rfd(fds[0]);
- FileDescriptor wfd(fds[1]);
-
- // Stat the pipe FD, which shouldn't alter the refcount of the write end of
- // the pipe.
- struct stat wst;
- ASSERT_THAT(lstat(absl::StrCat(path.c_str(), wfd.get()).c_str(), &wst),
- SyscallSucceeds());
- // Close the write end of the pipe and ensure that read indicates EOF.
- wfd.reset();
- char buf;
- ASSERT_THAT(read(rfd.get(), &buf, 1), SyscallSucceedsWithValue(0));
+TEST_P(PipeTest, SizeChangeFull) {
+ SKIP_IF(!CreateBlocking());
+
+ // Ensure that we adjust to a large enough size to avoid rounding when we
+ // perform the size decrease. If rounding occurs, we may not actually
+ // adjust the size and the call below will return success. It was found via
+ // experimentation that this granularity avoids the rounding for Linux.
+ constexpr int kDelta = 64 * 1024;
+ ASSERT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, Size() + kDelta),
+ SyscallSucceeds());
+
+ // Fill the buffer and try to change down.
+ std::vector<char> buf(Size());
+ ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
+ SyscallSucceedsWithValue(buf.size()));
+ EXPECT_THAT(fcntl(wfd.get(), F_SETPIPE_SZ, Size() - kDelta),
+ SyscallFailsWithErrno(EBUSY));
+}
+
+TEST_P(PipeTest, Streaming) {
+ SKIP_IF(!CreateBlocking());
+
+ // We make too many calls to go through full save cycles.
+ DisableSave ds;
+
+ absl::Notification notify;
+ ScopedThread t([this, &notify]() {
+ // Don't start until it's full.
+ notify.WaitForNotification();
+ for (int i = 0; i < 2 * Size(); i++) {
+ int rbuf;
+ ASSERT_THAT(read(rfd.get(), &rbuf, sizeof(rbuf)),
+ SyscallSucceedsWithValue(sizeof(rbuf)));
+ EXPECT_EQ(rbuf, i);
+ }
+ });
+ for (int i = 0; i < 2 * Size(); i++) {
+ int wbuf = i;
+ ASSERT_THAT(write(wfd.get(), &wbuf, sizeof(wbuf)),
+ SyscallSucceedsWithValue(sizeof(wbuf)));
+ // Did that write just fill up the buffer? Wake up the reader. Once only.
+ if ((i * sizeof(wbuf)) < Size() && ((i + 1) * sizeof(wbuf)) >= Size()) {
+ notify.Notify();
+ }
}
}
+std::string PipeCreatorName(::testing::TestParamInfo<PipeCreator> info) {
+ return info.param.name_; // Use the name specified.
+}
+
+INSTANTIATE_TEST_SUITE_P(
+ Pipes, PipeTest,
+ ::testing::Values(
+ PipeCreator{
+ "pipe",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ ASSERT_THAT(pipe(fds), SyscallSucceeds());
+ *is_blocking = true;
+ *is_namedpipe = false;
+ },
+ },
+ PipeCreator{
+ "pipe2blocking",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ ASSERT_THAT(pipe2(fds, 0), SyscallSucceeds());
+ *is_blocking = true;
+ *is_namedpipe = false;
+ },
+ },
+ PipeCreator{
+ "pipe2nonblocking",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ ASSERT_THAT(pipe2(fds, O_NONBLOCK), SyscallSucceeds());
+ *is_blocking = false;
+ *is_namedpipe = false;
+ },
+ },
+ PipeCreator{
+ "smallbuffer",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ // Set to the minimum available size (will round up).
+ ASSERT_THAT(pipe(fds), SyscallSucceeds());
+ ASSERT_THAT(fcntl(fds[0], F_SETPIPE_SZ, 0), SyscallSucceeds());
+ *is_blocking = true;
+ *is_namedpipe = false;
+ },
+ },
+ PipeCreator{
+ "namednonblocking",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ // Create a new file-based pipe (non-blocking).
+ auto file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
+ ASSERT_THAT(unlink(file.path().c_str()), SyscallSucceeds());
+ SKIP_IF(mkfifo(file.path().c_str(), 0644) != 0);
+ fds[0] = open(file.path().c_str(), O_NONBLOCK | O_RDONLY);
+ fds[1] = open(file.path().c_str(), O_NONBLOCK | O_WRONLY);
+ MaybeSave();
+ *is_blocking = false;
+ *is_namedpipe = true;
+ },
+ },
+ PipeCreator{
+ "namedblocking",
+ [](int fds[2], bool* is_blocking, bool* is_namedpipe) {
+ // Create a new file-based pipe (blocking).
+ auto file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
+ ASSERT_THAT(unlink(file.path().c_str()), SyscallSucceeds());
+ SKIP_IF(mkfifo(file.path().c_str(), 0644) != 0);
+ ScopedThread t([&file, &fds]() {
+ fds[1] = open(file.path().c_str(), O_WRONLY);
+ });
+ fds[0] = open(file.path().c_str(), O_RDONLY);
+ t.Join();
+ MaybeSave();
+ *is_blocking = true;
+ *is_namedpipe = true;
+ },
+ }),
+ PipeCreatorName);
+
} // namespace
} // namespace testing
} // namespace gvisor