summaryrefslogtreecommitdiffhomepage
path: root/pkg/sentry/kernel/pipe
diff options
context:
space:
mode:
authorGoogler <noreply@google.com>2018-04-27 10:37:02 -0700
committerAdin Scannell <ascannell@google.com>2018-04-28 01:44:26 -0400
commitd02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch)
tree54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/sentry/kernel/pipe
parentf70210e742919f40aa2f0934a22f1c9ba6dada62 (diff)
Check in gVisor.
PiperOrigin-RevId: 194583126 Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/sentry/kernel/pipe')
-rw-r--r--pkg/sentry/kernel/pipe/BUILD68
-rw-r--r--pkg/sentry/kernel/pipe/buffers.go50
-rw-r--r--pkg/sentry/kernel/pipe/device.go20
-rw-r--r--pkg/sentry/kernel/pipe/node.go175
-rw-r--r--pkg/sentry/kernel/pipe/node_test.go308
-rw-r--r--pkg/sentry/kernel/pipe/pipe.go335
-rw-r--r--pkg/sentry/kernel/pipe/pipe_test.go138
-rw-r--r--pkg/sentry/kernel/pipe/reader.go37
-rw-r--r--pkg/sentry/kernel/pipe/reader_writer.go91
-rw-r--r--pkg/sentry/kernel/pipe/writer.go37
10 files changed, 1259 insertions, 0 deletions
diff --git a/pkg/sentry/kernel/pipe/BUILD b/pkg/sentry/kernel/pipe/BUILD
new file mode 100644
index 000000000..ca9825f9d
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/BUILD
@@ -0,0 +1,68 @@
+package(licenses = ["notice"]) # Apache 2.0
+
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("//tools/go_stateify:defs.bzl", "go_stateify")
+
+go_stateify(
+ name = "pipe_state",
+ srcs = [
+ "buffers.go",
+ "node.go",
+ "pipe.go",
+ "reader.go",
+ "reader_writer.go",
+ "writer.go",
+ ],
+ out = "pipe_state.go",
+ package = "pipe",
+)
+
+go_library(
+ name = "pipe",
+ srcs = [
+ "buffers.go",
+ "device.go",
+ "node.go",
+ "pipe.go",
+ "pipe_state.go",
+ "reader.go",
+ "reader_writer.go",
+ "writer.go",
+ ],
+ importpath = "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/pipe",
+ visibility = ["//pkg/sentry:internal"],
+ deps = [
+ "//pkg/abi/linux",
+ "//pkg/amutex",
+ "//pkg/ilist",
+ "//pkg/log",
+ "//pkg/refs",
+ "//pkg/sentry/arch",
+ "//pkg/sentry/context",
+ "//pkg/sentry/device",
+ "//pkg/sentry/fs",
+ "//pkg/sentry/fs/fsutil",
+ "//pkg/sentry/usermem",
+ "//pkg/state",
+ "//pkg/syserror",
+ "//pkg/waiter",
+ ],
+)
+
+go_test(
+ name = "pipe_test",
+ size = "small",
+ srcs = [
+ "node_test.go",
+ "pipe_test.go",
+ ],
+ embed = [":pipe"],
+ deps = [
+ "//pkg/sentry/context",
+ "//pkg/sentry/context/contexttest",
+ "//pkg/sentry/fs",
+ "//pkg/sentry/usermem",
+ "//pkg/syserror",
+ "//pkg/waiter",
+ ],
+)
diff --git a/pkg/sentry/kernel/pipe/buffers.go b/pkg/sentry/kernel/pipe/buffers.go
new file mode 100644
index 000000000..f300537c5
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/buffers.go
@@ -0,0 +1,50 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "gvisor.googlesource.com/gvisor/pkg/ilist"
+)
+
+// Buffer encapsulates a queueable byte buffer that can
+// easily be truncated. It is designed only for use with pipes.
+type Buffer struct {
+ ilist.Entry
+ data []byte
+}
+
+// newBuffer initializes a Buffer.
+func newBuffer(buf []byte) *Buffer {
+ return &Buffer{data: buf}
+}
+
+// bytes returns the bytes contained in the buffer.
+func (b *Buffer) bytes() []byte {
+ return b.data
+}
+
+// size returns the number of bytes contained in the buffer.
+func (b *Buffer) size() int {
+ return len(b.data)
+}
+
+// truncate removes the first n bytes from the buffer.
+func (b *Buffer) truncate(n int) int {
+ if n > len(b.data) {
+ panic("Trying to truncate past end of array.")
+ }
+ b.data = b.data[n:]
+ return len(b.data)
+}
diff --git a/pkg/sentry/kernel/pipe/device.go b/pkg/sentry/kernel/pipe/device.go
new file mode 100644
index 000000000..8d383577a
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/device.go
@@ -0,0 +1,20 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import "gvisor.googlesource.com/gvisor/pkg/sentry/device"
+
+// pipeDevice is used for all pipe files.
+var pipeDevice = device.NewAnonDevice()
diff --git a/pkg/sentry/kernel/pipe/node.go b/pkg/sentry/kernel/pipe/node.go
new file mode 100644
index 000000000..5b47427ef
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/node.go
@@ -0,0 +1,175 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "sync"
+
+ "gvisor.googlesource.com/gvisor/pkg/amutex"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+)
+
+// inodeOperations wraps fs.InodeOperations operations with common pipe opening semantics.
+type inodeOperations struct {
+ fs.InodeOperations
+
+ // mu protects the fields below.
+ mu sync.Mutex `state:"nosave"`
+
+ // p is the underlying Pipe object representing this fifo.
+ p *Pipe
+
+ // Channels for synchronizing the creation of new readers and writers of
+ // this fifo. See waitFor and newHandleLocked.
+ //
+ // These are not saved/restored because all waiters are unblocked on save,
+ // and either automatically restart (via ERESTARTSYS) or return EINTR on
+ // resume. On restarts via ERESTARTSYS, the appropriate channel will be
+ // recreated.
+ rWakeup chan struct{} `state:"nosave"`
+ wWakeup chan struct{} `state:"nosave"`
+}
+
+// NewInodeOperations creates a new pipe fs.InodeOperations.
+func NewInodeOperations(base fs.InodeOperations, p *Pipe) fs.InodeOperations {
+ return &inodeOperations{
+ InodeOperations: base,
+ p: p,
+ }
+}
+
+// GetFile implements fs.InodeOperations.GetFile. Named pipes have special blocking
+// semantics during open:
+//
+// "Normally, opening the FIFO blocks until the other end is opened also. A
+// process can open a FIFO in nonblocking mode. In this case, opening for
+// read-only will succeed even if no-one has opened on the write side yet,
+// opening for write-only will fail with ENXIO (no such device or address)
+// unless the other end has already been opened. Under Linux, opening a FIFO
+// for read and write will succeed both in blocking and nonblocking mode. POSIX
+// leaves this behavior undefined. This can be used to open a FIFO for writing
+// while there are no readers available." - fifo(7)
+func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) (*fs.File, error) {
+ i.mu.Lock()
+ defer i.mu.Unlock()
+
+ switch {
+ case flags.Read && !flags.Write: // O_RDONLY.
+ r := i.p.ROpen(ctx)
+ i.newHandleLocked(&i.rWakeup)
+
+ if i.p.isNamed && !flags.NonBlocking && !i.p.HasWriters() {
+ if !i.waitFor(&i.wWakeup, ctx) {
+ r.DecRef()
+ return nil, syserror.ErrInterrupted
+ }
+ }
+
+ // By now, either we're doing a nonblocking open or we have a writer. On
+ // a nonblocking read-only open, the open succeeds even if no-one has
+ // opened the write side yet.
+ return r, nil
+
+ case flags.Write && !flags.Read: // O_WRONLY.
+ w := i.p.WOpen(ctx)
+ i.newHandleLocked(&i.wWakeup)
+
+ if i.p.isNamed && !i.p.HasReaders() {
+ // On a nonblocking, write-only open, the open fails with ENXIO if the
+ // read side isn't open yet.
+ if flags.NonBlocking {
+ w.DecRef()
+ return nil, syserror.ENXIO
+ }
+
+ if !i.waitFor(&i.rWakeup, ctx) {
+ w.DecRef()
+ return nil, syserror.ErrInterrupted
+ }
+ }
+ return w, nil
+
+ case flags.Read && flags.Write: // O_RDWR.
+ // Pipes opened for read-write always succeeds without blocking.
+ rw := i.p.RWOpen(ctx)
+ i.newHandleLocked(&i.rWakeup)
+ i.newHandleLocked(&i.wWakeup)
+ return rw, nil
+
+ default:
+ return nil, syserror.EINVAL
+ }
+}
+
+// waitFor blocks until the underlying pipe has at least one reader/writer is
+// announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this
+// function will block for either readers or writers, depending on where
+// 'wakeupChan' points.
+//
+// f.mu must be held by the caller. waitFor returns with f.mu held, but it will
+// drop f.mu before blocking for any reader/writers.
+func (i *inodeOperations) waitFor(wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool {
+ // Ideally this function would simply use a condition variable. However, the
+ // wait needs to be interruptible via 'sleeper', so we must sychronize via a
+ // channel. The synchronization below relies on the fact that closing a
+ // channel unblocks all receives on the channel.
+
+ // Does an appropriate wakeup channel already exist? If not, create a new
+ // one. This is all done under f.mu to avoid races.
+ if *wakeupChan == nil {
+ *wakeupChan = make(chan struct{})
+ }
+
+ // Grab a local reference to the wakeup channel since it may disappear as
+ // soon as we drop f.mu.
+ wakeup := *wakeupChan
+
+ // Drop the lock and prepare to sleep.
+ i.mu.Unlock()
+ cancel := sleeper.SleepStart()
+
+ // Wait for either a new reader/write to be signalled via 'wakeup', or
+ // for the sleep to be cancelled.
+ select {
+ case <-wakeup:
+ sleeper.SleepFinish(true)
+ case <-cancel:
+ sleeper.SleepFinish(false)
+ }
+
+ // Take the lock and check if we were woken. If we were woken and
+ // interrupted, the former takes priority.
+ i.mu.Lock()
+ select {
+ case <-wakeup:
+ return true
+ default:
+ return false
+ }
+}
+
+// newHandleLocked signals a new pipe reader or writer depending on where
+// 'wakeupChan' points. This unblocks any corresponding reader or writer
+// waiting for the other end of the channel to be opened, see Fifo.waitFor.
+//
+// i.mu must be held.
+func (*inodeOperations) newHandleLocked(wakeupChan *chan struct{}) {
+ if *wakeupChan != nil {
+ close(*wakeupChan)
+ *wakeupChan = nil
+ }
+}
diff --git a/pkg/sentry/kernel/pipe/node_test.go b/pkg/sentry/kernel/pipe/node_test.go
new file mode 100644
index 000000000..cc1ebf4f6
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/node_test.go
@@ -0,0 +1,308 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "testing"
+ "time"
+
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context/contexttest"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+)
+
+type sleeper struct {
+ context.Context
+ ch chan struct{}
+}
+
+func newSleeperContext(t *testing.T) context.Context {
+ return &sleeper{
+ Context: contexttest.Context(t),
+ ch: make(chan struct{}),
+ }
+}
+
+func (s *sleeper) SleepStart() <-chan struct{} {
+ return s.ch
+}
+
+func (s *sleeper) SleepFinish(bool) {
+}
+
+func (s *sleeper) Cancel() {
+ s.ch <- struct{}{}
+}
+
+type openResult struct {
+ *fs.File
+ error
+}
+
+func testOpenOrDie(ctx context.Context, t *testing.T, n fs.InodeOperations, flags fs.FileFlags, doneChan chan<- struct{}) (*fs.File, error) {
+ file, err := n.GetFile(ctx, nil, flags)
+ if err != nil {
+ t.Fatalf("open with flags %+v failed: %v", flags, err)
+ }
+ if doneChan != nil {
+ doneChan <- struct{}{}
+ }
+ return file, err
+}
+
+func testOpen(ctx context.Context, t *testing.T, n fs.InodeOperations, flags fs.FileFlags, resChan chan<- openResult) (*fs.File, error) {
+ file, err := n.GetFile(ctx, nil, flags)
+ if resChan != nil {
+ resChan <- openResult{file, err}
+ }
+ return file, err
+}
+
+func newNamedPipe(t *testing.T) *Pipe {
+ return NewPipe(contexttest.Context(t), true, DefaultPipeSize, usermem.PageSize)
+}
+
+func newAnonPipe(t *testing.T) *Pipe {
+ return NewPipe(contexttest.Context(t), false, DefaultPipeSize, usermem.PageSize)
+}
+
+// assertRecvBlocks ensures that a recv attempt on c blocks for at least
+// blockDuration. This is useful for checking that a goroutine that is supposed
+// to be executing a blocking operation is actually blocking.
+func assertRecvBlocks(t *testing.T, c <-chan struct{}, blockDuration time.Duration, failMsg string) {
+ select {
+ case <-c:
+ t.Fatalf(failMsg)
+ case <-time.After(blockDuration):
+ // Ok, blocked for the required duration.
+ }
+}
+
+func TestReadOpenBlocksForWriteOpen(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ rDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone)
+
+ // Verify that the open for read is blocking.
+ assertRecvBlocks(t, rDone, time.Millisecond*100,
+ "open for read not blocking with no writers")
+
+ wDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone)
+
+ <-wDone
+ <-rDone
+}
+
+func TestWriteOpenBlocksForReadOpen(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ wDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone)
+
+ // Verify that the open for write is blocking
+ assertRecvBlocks(t, wDone, time.Millisecond*100,
+ "open for write not blocking with no readers")
+
+ rDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone)
+
+ <-rDone
+ <-wDone
+}
+
+func TestMultipleWriteOpenDoesntCountAsReadOpen(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ rDone1 := make(chan struct{})
+ rDone2 := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone1)
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone2)
+
+ assertRecvBlocks(t, rDone1, time.Millisecond*100,
+ "open for read didn't block with no writers")
+ assertRecvBlocks(t, rDone2, time.Millisecond*100,
+ "open for read didn't block with no writers")
+
+ wDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone)
+
+ <-wDone
+ <-rDone2
+ <-rDone1
+}
+
+func TestClosedReaderBlocksWriteOpen(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ rFile, _ := testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true, NonBlocking: true}, nil)
+ rFile.DecRef()
+
+ wDone := make(chan struct{})
+ // This open for write should block because the reader is now gone.
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone)
+ assertRecvBlocks(t, wDone, time.Millisecond*100,
+ "open for write didn't block with no concurrent readers")
+
+ // Open for read again. This should unblock the open for write.
+ rDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone)
+
+ <-rDone
+ <-wDone
+}
+
+func TestReadWriteOpenNeverBlocks(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ rwDone := make(chan struct{})
+ // Open for read-write never wait for a reader or writer, even if the
+ // nonblocking flag is not set.
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true, Write: true, NonBlocking: false}, rwDone)
+ <-rwDone
+}
+
+func TestReadWriteOpenUnblocksReadOpen(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ rDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone)
+
+ rwDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true, Write: true}, rwDone)
+
+ <-rwDone
+ <-rDone
+}
+
+func TestReadWriteOpenUnblocksWriteOpen(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ wDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone)
+
+ rwDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true, Write: true}, rwDone)
+
+ <-rwDone
+ <-wDone
+}
+
+func TestBlockedOpenIsCancellable(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ done := make(chan openResult)
+ go testOpen(ctx, t, f, fs.FileFlags{Read: true}, done)
+ select {
+ case <-done:
+ t.Fatalf("open for read didn't block with no writers")
+ case <-time.After(time.Millisecond * 100):
+ // Ok.
+ }
+
+ ctx.(*sleeper).Cancel()
+ // If the cancel on the sleeper didn't work, the open for read would never
+ // return.
+ res := <-done
+ if res.error != syserror.ErrInterrupted {
+ t.Fatalf("Cancellation didn't cause GetFile to return fs.ErrInterrupted, got %v.",
+ res.error)
+ }
+}
+
+func TestNonblockingReadOpenNoWriters(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ if _, err := testOpen(ctx, t, f, fs.FileFlags{Read: true, NonBlocking: true}, nil); err != nil {
+ t.Fatalf("Nonblocking open for read failed with error %v.", err)
+ }
+}
+
+func TestNonblockingWriteOpenNoReaders(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ if _, err := testOpen(ctx, t, f, fs.FileFlags{Write: true, NonBlocking: true}, nil); err != syserror.ENXIO {
+ t.Fatalf("Nonblocking open for write failed unexpected error %v.", err)
+ }
+}
+
+func TestNonBlockingReadOpenWithWriter(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ wDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone)
+
+ // Open for write blocks since there are no readers yet.
+ assertRecvBlocks(t, wDone, time.Millisecond*100,
+ "Open for write didn't block with no reader.")
+
+ if _, err := testOpen(ctx, t, f, fs.FileFlags{Read: true, NonBlocking: true}, nil); err != nil {
+ t.Fatalf("Nonblocking open for read failed with error %v.", err)
+ }
+
+ // Open for write should now be unblocked.
+ <-wDone
+}
+
+func TestNonBlockingWriteOpenWithReader(t *testing.T) {
+ f := NewInodeOperations(nil, newNamedPipe(t))
+ ctx := newSleeperContext(t)
+
+ rDone := make(chan struct{})
+ go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone)
+
+ // Open for write blocked, since no reader yet.
+ assertRecvBlocks(t, rDone, time.Millisecond*100,
+ "Open for reader didn't block with no writer.")
+
+ if _, err := testOpen(ctx, t, f, fs.FileFlags{Write: true, NonBlocking: true}, nil); err != nil {
+ t.Fatalf("Nonblocking open for write failed with error %v.", err)
+ }
+
+ // Open for write should now be unblocked.
+ <-rDone
+}
+
+func TestAnonReadOpen(t *testing.T) {
+ f := NewInodeOperations(nil, newAnonPipe(t))
+ ctx := newSleeperContext(t)
+
+ if _, err := testOpen(ctx, t, f, fs.FileFlags{Read: true}, nil); err != nil {
+ t.Fatalf("open anon pipe for read failed: %v", err)
+ }
+}
+
+func TestAnonWriteOpen(t *testing.T) {
+ f := NewInodeOperations(nil, newAnonPipe(t))
+ ctx := newSleeperContext(t)
+
+ if _, err := testOpen(ctx, t, f, fs.FileFlags{Write: true}, nil); err != nil {
+ t.Fatalf("open anon pipe for write failed: %v", err)
+ }
+}
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
new file mode 100644
index 000000000..1656c6ff3
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -0,0 +1,335 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package pipe provides an in-memory implementation of a unidirectional
+// pipe.
+//
+// The goal of this pipe is to emulate the pipe syscall in all of its
+// edge cases and guarantees of atomic IO.
+package pipe
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "syscall"
+
+ "gvisor.googlesource.com/gvisor/pkg/abi/linux"
+ "gvisor.googlesource.com/gvisor/pkg/ilist"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+)
+
+// DefaultPipeSize is the system-wide default size of a pipe in bytes.
+const DefaultPipeSize = 65536
+
+// Pipe is an encapsulation of a platform-independent pipe.
+// It manages a buffered byte queue shared between a reader/writer
+// pair.
+type Pipe struct {
+ waiter.Queue `state:"nosave"`
+
+ // Whether this is a named or anonymous pipe.
+ isNamed bool
+
+ // The dirent backing this pipe. Shared by all readers and writers.
+ dirent *fs.Dirent
+
+ // The buffered byte queue.
+ data ilist.List
+
+ // Max size of the pipe in bytes. When this max has been reached,
+ // writers will get EWOULDBLOCK.
+ max int
+
+ // Current size of the pipe in bytes.
+ size int
+
+ // Max number of bytes the pipe can guarantee to read or write
+ // atomically.
+ atomicIOBytes int
+
+ // The number of active readers for this pipe. Load/store atomically.
+ readers int32
+
+ // The number of active writes for this pipe. Load/store atomically.
+ writers int32
+
+ // This flag indicates if this pipe ever had a writer. Note that this does
+ // not necessarily indicate there is *currently* a writer, just that there
+ // has been a writer at some point since the pipe was created.
+ //
+ // Protected by mu.
+ hadWriter bool
+
+ // Lock protecting all pipe internal state.
+ mu sync.Mutex `state:"nosave"`
+}
+
+// NewPipe initializes and returns a pipe. A pipe created by this function is
+// persistent, and will remain valid even without any open fds to it. Named
+// pipes for mknod(2) are created via this function. Note that the
+// implementation of blocking semantics for opening the read and write ends of a
+// named pipe are left to filesystems.
+func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe {
+ p := &Pipe{
+ isNamed: isNamed,
+ max: sizeBytes,
+ atomicIOBytes: atomicIOBytes,
+ }
+
+ // Build the fs.Dirent of this pipe, shared by all fs.Files associated
+ // with this pipe.
+ ino := pipeDevice.NextIno()
+ base := fsutil.NewSimpleInodeOperations(fsutil.InodeSimpleAttributes{
+ FSType: linux.PIPEFS_MAGIC,
+ UAttr: fs.WithCurrentTime(ctx, fs.UnstableAttr{
+ Owner: fs.FileOwnerFromContext(ctx),
+ Perms: fs.FilePermissions{
+ User: fs.PermMask{Read: true, Write: true},
+ },
+ Links: 1,
+ }),
+ })
+ sattr := fs.StableAttr{
+ Type: fs.Pipe,
+ DeviceID: pipeDevice.DeviceID(),
+ InodeID: ino,
+ BlockSize: int64(atomicIOBytes),
+ }
+ // There is no real filesystem backing this pipe, so we pass in a nil
+ // Filesystem.
+ sb := fs.NewNonCachingMountSource(nil, fs.MountSourceFlags{})
+ p.dirent = fs.NewDirent(fs.NewInode(NewInodeOperations(base, p), sb, sattr), fmt.Sprintf("pipe:[%d]", ino))
+
+ return p
+}
+
+// NewConnectedPipe initializes a pipe and returns a pair of objects (which
+// implement kio.File) representing the read and write ends of the pipe. A pipe
+// created by this function becomes invalid as soon as either the read or write
+// end is closed, and errors on subsequent operations on either end. Pipes
+// for pipe(2) and pipe2(2) are generally created this way.
+func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) {
+ p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes)
+ return p.ROpen(ctx), p.WOpen(ctx)
+}
+
+// ROpen opens the pipe for reading.
+func (p *Pipe) ROpen(ctx context.Context) *fs.File {
+ p.rOpen()
+ return fs.NewFile(ctx, p.dirent, fs.FileFlags{Read: true}, &Reader{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+}
+
+// WOpen opens the pipe for writing.
+func (p *Pipe) WOpen(ctx context.Context) *fs.File {
+ p.wOpen()
+ return fs.NewFile(ctx, p.dirent, fs.FileFlags{Write: true}, &Writer{
+ ReaderWriter: ReaderWriter{Pipe: p},
+ })
+}
+
+// RWOpen opens the pipe for both reading and writing.
+func (p *Pipe) RWOpen(ctx context.Context) *fs.File {
+ p.rOpen()
+ p.wOpen()
+ return fs.NewFile(ctx, p.dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{
+ Pipe: p,
+ })
+}
+
+// read reads data from the pipe into dst and returns the number of bytes
+// read, or returns ErrWouldBlock if the pipe is empty.
+func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
+ if !p.HasReaders() {
+ return 0, syscall.EBADF
+ }
+
+ // Don't block for a zero-length read even if the pipe is empty.
+ if dst.NumBytes() == 0 {
+ return 0, nil
+ }
+
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ // If there is nothing to read at the moment but there is a writer, tell the
+ // caller to block.
+ if p.size == 0 {
+ if !p.HasWriters() {
+ // There are no writers, return EOF.
+ return 0, nil
+ }
+ return 0, syserror.ErrWouldBlock
+ }
+ var n int64
+ for b := p.data.Front(); b != nil; b = p.data.Front() {
+ buffer := b.(*Buffer)
+ n0, err := dst.CopyOut(ctx, buffer.bytes())
+ n += int64(n0)
+ p.size -= n0
+ if buffer.truncate(n0) == 0 {
+ p.data.Remove(b)
+ }
+ dst = dst.DropFirst(n0)
+ if dst.NumBytes() == 0 || err != nil {
+ return n, err
+ }
+ }
+ return n, nil
+}
+
+// write writes data from sv into the pipe and returns the number of bytes
+// written. If no bytes are written because the pipe is full (or has less than
+// atomicIOBytes free capacity), write returns ErrWouldBlock.
+func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if !p.HasWriters() {
+ return 0, syscall.EBADF
+ }
+ if !p.HasReaders() {
+ return 0, syscall.EPIPE
+ }
+
+ // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
+ // atomic, but requires no atomicity for writes larger than this. However,
+ // Linux appears to provide stronger semantics than this in practice:
+ // unmerged writes are done one PAGE_SIZE buffer at a time, so for larger
+ // writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement
+ // this by writing at most atomicIOBytes at a time if we can't service the
+ // write in its entirety.
+ canWrite := src.NumBytes()
+ if canWrite > int64(p.max-p.size) {
+ if p.max-p.size >= p.atomicIOBytes {
+ canWrite = int64(p.atomicIOBytes)
+ } else {
+ return 0, syserror.ErrWouldBlock
+ }
+ }
+
+ // Copy data from user memory into a pipe-owned buffer.
+ buf := make([]byte, canWrite)
+ n, err := src.CopyIn(ctx, buf)
+ if n > 0 {
+ p.data.PushBack(newBuffer(buf[:n]))
+ p.size += n
+ }
+ if int64(n) < src.NumBytes() && err == nil {
+ // Partial write due to full pipe.
+ err = syserror.ErrWouldBlock
+ }
+ return int64(n), err
+}
+
+// rOpen signals a new reader of the pipe.
+func (p *Pipe) rOpen() {
+ atomic.AddInt32(&p.readers, 1)
+}
+
+// wOpen signals a new writer of the pipe.
+func (p *Pipe) wOpen() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ p.hadWriter = true
+ atomic.AddInt32(&p.writers, 1)
+}
+
+// rClose signals that a reader has closed their end of the pipe.
+func (p *Pipe) rClose() {
+ newReaders := atomic.AddInt32(&p.readers, -1)
+ if newReaders < 0 {
+ panic(fmt.Sprintf("Refcounting bug, pipe has negative readers: %v", newReaders))
+ }
+}
+
+// wClose signals that a writer has closed their end of the pipe.
+func (p *Pipe) wClose() {
+ newWriters := atomic.AddInt32(&p.writers, -1)
+ if newWriters < 0 {
+ panic(fmt.Sprintf("Refcounting bug, pipe has negative writers: %v.", newWriters))
+ }
+}
+
+// HasReaders returns whether the pipe has any active readers.
+func (p *Pipe) HasReaders() bool {
+ return atomic.LoadInt32(&p.readers) > 0
+}
+
+// HasWriters returns whether the pipe has any active writers.
+func (p *Pipe) HasWriters() bool {
+ return atomic.LoadInt32(&p.writers) > 0
+}
+
+func (p *Pipe) rReadinessLocked() waiter.EventMask {
+ ready := waiter.EventMask(0)
+ if p.HasReaders() && p.data.Front() != nil {
+ ready |= waiter.EventIn
+ }
+ if !p.HasWriters() && p.hadWriter {
+ // POLLHUP must be supressed until the pipe has had at least one writer
+ // at some point. Otherwise a reader thread may poll and immediately get
+ // a POLLHUP before the writer ever opens the pipe, which the reader may
+ // interpret as the writer opening then closing the pipe.
+ ready |= waiter.EventHUp
+ }
+ return ready
+}
+
+// rReadiness returns a mask that states whether the read end of the pipe is
+// ready for reading.
+func (p *Pipe) rReadiness() waiter.EventMask {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.rReadinessLocked()
+}
+
+func (p *Pipe) wReadinessLocked() waiter.EventMask {
+ ready := waiter.EventMask(0)
+ if p.HasWriters() && p.size < p.max {
+ ready |= waiter.EventOut
+ }
+ if !p.HasReaders() {
+ ready |= waiter.EventErr
+ }
+ return ready
+}
+
+// wReadiness returns a mask that states whether the write end of the pipe
+// is ready for writing.
+func (p *Pipe) wReadiness() waiter.EventMask {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.wReadinessLocked()
+}
+
+// rwReadiness returns a mask that states whether a read-write handle to the
+// pipe is ready for IO.
+func (p *Pipe) rwReadiness() waiter.EventMask {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.rReadinessLocked() | p.wReadinessLocked()
+}
+
+func (p *Pipe) queuedSize() int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.size
+}
diff --git a/pkg/sentry/kernel/pipe/pipe_test.go b/pkg/sentry/kernel/pipe/pipe_test.go
new file mode 100644
index 000000000..49ef8c8ac
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/pipe_test.go
@@ -0,0 +1,138 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "bytes"
+ "testing"
+
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context/contexttest"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+ "gvisor.googlesource.com/gvisor/pkg/syserror"
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+)
+
+func TestPipeRW(t *testing.T) {
+ ctx := contexttest.Context(t)
+ r, w := NewConnectedPipe(ctx, 65536, 4096)
+ defer r.DecRef()
+ defer w.DecRef()
+
+ msg := []byte("here's some bytes")
+ wantN := int64(len(msg))
+ n, err := w.Writev(ctx, usermem.BytesIOSequence(msg))
+ if n != wantN || err != nil {
+ t.Fatalf("Writev: got (%d, %v), wanted (%d, nil)", n, err, wantN)
+ }
+
+ buf := make([]byte, len(msg))
+ n, err = r.Readv(ctx, usermem.BytesIOSequence(buf))
+ if n != wantN || err != nil || !bytes.Equal(buf, msg) {
+ t.Fatalf("Readv: got (%d, %v) %q, wanted (%d, nil) %q", n, err, buf, wantN, msg)
+ }
+}
+
+func TestPipeReadBlock(t *testing.T) {
+ ctx := contexttest.Context(t)
+ r, w := NewConnectedPipe(ctx, 65536, 4096)
+ defer r.DecRef()
+ defer w.DecRef()
+
+ n, err := r.Readv(ctx, usermem.BytesIOSequence(make([]byte, 1)))
+ if n != 0 || err != syserror.ErrWouldBlock {
+ t.Fatalf("Readv: got (%d, %v), wanted (0, %v)", n, err, syserror.ErrWouldBlock)
+ }
+}
+
+func TestPipeWriteBlock(t *testing.T) {
+ const atomicIOBytes = 2
+
+ ctx := contexttest.Context(t)
+ r, w := NewConnectedPipe(ctx, 10, atomicIOBytes)
+ defer r.DecRef()
+ defer w.DecRef()
+
+ msg := []byte("here's some bytes")
+ n, err := w.Writev(ctx, usermem.BytesIOSequence(msg))
+ if wantN, wantErr := int64(atomicIOBytes), syserror.ErrWouldBlock; n != wantN || err != wantErr {
+ t.Fatalf("Writev: got (%d, %v), wanted (%d, %v)", n, err, wantN, wantErr)
+ }
+}
+
+func TestPipeWriteUntilEnd(t *testing.T) {
+ const atomicIOBytes = 2
+
+ ctx := contexttest.Context(t)
+ r, w := NewConnectedPipe(ctx, atomicIOBytes, atomicIOBytes)
+ defer r.DecRef()
+ defer w.DecRef()
+
+ msg := []byte("here's some bytes")
+
+ wDone := make(chan struct{}, 0)
+ rDone := make(chan struct{}, 0)
+ defer func() {
+ // Signal the reader to stop and wait until it does so.
+ close(wDone)
+ <-rDone
+ }()
+
+ go func() {
+ defer close(rDone)
+ // Read from r until done is closed.
+ ctx := contexttest.Context(t)
+ buf := make([]byte, len(msg)+1)
+ dst := usermem.BytesIOSequence(buf)
+ e, ch := waiter.NewChannelEntry(nil)
+ r.EventRegister(&e, waiter.EventIn)
+ defer r.EventUnregister(&e)
+ for {
+ n, err := r.Readv(ctx, dst)
+ dst = dst.DropFirst64(n)
+ if err == syserror.ErrWouldBlock {
+ select {
+ case <-ch:
+ continue
+ case <-wDone:
+ // We expect to have 1 byte left in dst since len(buf) ==
+ // len(msg)+1.
+ if dst.NumBytes() != 1 || !bytes.Equal(buf[:len(msg)], msg) {
+ t.Errorf("Reader: got %q (%d bytes remaining), wanted %q", buf, dst.NumBytes(), msg)
+ }
+ return
+ }
+ }
+ if err != nil {
+ t.Fatalf("Readv: got unexpected error %v", err)
+ }
+ }
+ }()
+
+ src := usermem.BytesIOSequence(msg)
+ e, ch := waiter.NewChannelEntry(nil)
+ w.EventRegister(&e, waiter.EventOut)
+ defer w.EventUnregister(&e)
+ for src.NumBytes() != 0 {
+ n, err := w.Writev(ctx, src)
+ src = src.DropFirst64(n)
+ if err == syserror.ErrWouldBlock {
+ <-ch
+ continue
+ }
+ if err != nil {
+ t.Fatalf("Writev: got (%d, %v)", n, err)
+ }
+ }
+}
diff --git a/pkg/sentry/kernel/pipe/reader.go b/pkg/sentry/kernel/pipe/reader.go
new file mode 100644
index 000000000..40d5e4943
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/reader.go
@@ -0,0 +1,37 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+)
+
+// Reader satisfies the fs.FileOperations interface for read-only pipes.
+// Reader should be used with !fs.FileFlags.Write to reject writes.
+type Reader struct {
+ ReaderWriter
+}
+
+// Release implements fs.FileOperations.Release.
+func (r *Reader) Release() {
+ r.Pipe.rClose()
+ // Wake up writers.
+ r.Pipe.Notify(waiter.EventOut)
+}
+
+// Readiness returns the ready events in the underlying pipe.
+func (r *Reader) Readiness(mask waiter.EventMask) waiter.EventMask {
+ return r.Pipe.rReadiness() & mask
+}
diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go
new file mode 100644
index 000000000..dc642a3a6
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/reader_writer.go
@@ -0,0 +1,91 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "fmt"
+ "math"
+ "syscall"
+
+ "gvisor.googlesource.com/gvisor/pkg/sentry/arch"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/context"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil"
+ "gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+)
+
+// ReaderWriter satisfies the FileOperations interface and services both
+// read and write requests. This should only be used directly for named pipes.
+// pipe(2) and pipe2(2) only support unidirectional pipes and should use
+// either pipe.Reader or pipe.Writer.
+type ReaderWriter struct {
+ fsutil.PipeSeek `state:"nosave"`
+ fsutil.NotDirReaddir `state:"nosave"`
+ fsutil.NoFsync `state:"nosave"`
+ fsutil.NoopFlush `state:"nosave"`
+ fsutil.NoMMap `state:"nosave"`
+ *Pipe
+}
+
+// Release implements fs.FileOperations.Release.
+func (rw *ReaderWriter) Release() {
+ rw.Pipe.rClose()
+ rw.Pipe.wClose()
+ // Wake up readers and writers.
+ rw.Pipe.Notify(waiter.EventIn | waiter.EventOut)
+}
+
+// Read implements fs.FileOperations.Read.
+func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequence, _ int64) (int64, error) {
+ n, err := rw.Pipe.read(ctx, dst)
+ if n > 0 {
+ rw.Pipe.Notify(waiter.EventOut)
+ }
+ return n, err
+}
+
+// Write implements fs.FileOperations.Write.
+func (rw *ReaderWriter) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) {
+ n, err := rw.Pipe.write(ctx, src)
+ if n > 0 {
+ rw.Pipe.Notify(waiter.EventIn)
+ }
+ return n, err
+}
+
+// Readiness returns the ready events in the underlying pipe.
+func (rw *ReaderWriter) Readiness(mask waiter.EventMask) waiter.EventMask {
+ return rw.Pipe.rwReadiness() & mask
+}
+
+// Ioctl implements fs.FileOperations.Ioctl.
+func (rw *ReaderWriter) Ioctl(ctx context.Context, io usermem.IO, args arch.SyscallArguments) (uintptr, error) {
+ // Switch on ioctl request.
+ switch int(args[1].Int()) {
+ case syscall.TIOCINQ:
+ v := rw.queuedSize()
+ if v > math.MaxInt32 {
+ panic(fmt.Sprintf("Impossibly large pipe queued size: %d", v))
+ }
+ // Copy result to user-space.
+ _, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{
+ AddressSpaceActive: true,
+ })
+ return 0, err
+ default:
+ return 0, syscall.ENOTTY
+ }
+}
diff --git a/pkg/sentry/kernel/pipe/writer.go b/pkg/sentry/kernel/pipe/writer.go
new file mode 100644
index 000000000..fd13008ac
--- /dev/null
+++ b/pkg/sentry/kernel/pipe/writer.go
@@ -0,0 +1,37 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+)
+
+// Writer satisfies the fs.FileOperations interface for write-only pipes.
+// Writer should be used with !fs.FileFlags.Read to reject reads.
+type Writer struct {
+ ReaderWriter
+}
+
+// Release implements fs.FileOperations.Release.
+func (w *Writer) Release() {
+ w.Pipe.wClose()
+ // Wake up readers.
+ w.Pipe.Notify(waiter.EventHUp)
+}
+
+// Readiness returns the ready events in the underlying pipe.
+func (w *Writer) Readiness(mask waiter.EventMask) waiter.EventMask {
+ return w.Pipe.wReadiness() & mask
+}