diff options
Diffstat (limited to 'pkg/sentry/kernel/pipe')
-rw-r--r-- | pkg/sentry/kernel/pipe/BUILD | 68 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/buffers.go | 50 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/device.go | 20 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/node.go | 175 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/node_test.go | 308 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe.go | 335 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe_test.go | 138 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/reader.go | 37 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/reader_writer.go | 91 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/writer.go | 37 |
10 files changed, 1259 insertions, 0 deletions
diff --git a/pkg/sentry/kernel/pipe/BUILD b/pkg/sentry/kernel/pipe/BUILD new file mode 100644 index 000000000..ca9825f9d --- /dev/null +++ b/pkg/sentry/kernel/pipe/BUILD @@ -0,0 +1,68 @@ +package(licenses = ["notice"]) # Apache 2.0 + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("//tools/go_stateify:defs.bzl", "go_stateify") + +go_stateify( + name = "pipe_state", + srcs = [ + "buffers.go", + "node.go", + "pipe.go", + "reader.go", + "reader_writer.go", + "writer.go", + ], + out = "pipe_state.go", + package = "pipe", +) + +go_library( + name = "pipe", + srcs = [ + "buffers.go", + "device.go", + "node.go", + "pipe.go", + "pipe_state.go", + "reader.go", + "reader_writer.go", + "writer.go", + ], + importpath = "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/pipe", + visibility = ["//pkg/sentry:internal"], + deps = [ + "//pkg/abi/linux", + "//pkg/amutex", + "//pkg/ilist", + "//pkg/log", + "//pkg/refs", + "//pkg/sentry/arch", + "//pkg/sentry/context", + "//pkg/sentry/device", + "//pkg/sentry/fs", + "//pkg/sentry/fs/fsutil", + "//pkg/sentry/usermem", + "//pkg/state", + "//pkg/syserror", + "//pkg/waiter", + ], +) + +go_test( + name = "pipe_test", + size = "small", + srcs = [ + "node_test.go", + "pipe_test.go", + ], + embed = [":pipe"], + deps = [ + "//pkg/sentry/context", + "//pkg/sentry/context/contexttest", + "//pkg/sentry/fs", + "//pkg/sentry/usermem", + "//pkg/syserror", + "//pkg/waiter", + ], +) diff --git a/pkg/sentry/kernel/pipe/buffers.go b/pkg/sentry/kernel/pipe/buffers.go new file mode 100644 index 000000000..f300537c5 --- /dev/null +++ b/pkg/sentry/kernel/pipe/buffers.go @@ -0,0 +1,50 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "gvisor.googlesource.com/gvisor/pkg/ilist" +) + +// Buffer encapsulates a queueable byte buffer that can +// easily be truncated. It is designed only for use with pipes. +type Buffer struct { + ilist.Entry + data []byte +} + +// newBuffer initializes a Buffer. +func newBuffer(buf []byte) *Buffer { + return &Buffer{data: buf} +} + +// bytes returns the bytes contained in the buffer. +func (b *Buffer) bytes() []byte { + return b.data +} + +// size returns the number of bytes contained in the buffer. +func (b *Buffer) size() int { + return len(b.data) +} + +// truncate removes the first n bytes from the buffer. +func (b *Buffer) truncate(n int) int { + if n > len(b.data) { + panic("Trying to truncate past end of array.") + } + b.data = b.data[n:] + return len(b.data) +} diff --git a/pkg/sentry/kernel/pipe/device.go b/pkg/sentry/kernel/pipe/device.go new file mode 100644 index 000000000..8d383577a --- /dev/null +++ b/pkg/sentry/kernel/pipe/device.go @@ -0,0 +1,20 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import "gvisor.googlesource.com/gvisor/pkg/sentry/device" + +// pipeDevice is used for all pipe files. +var pipeDevice = device.NewAnonDevice() diff --git a/pkg/sentry/kernel/pipe/node.go b/pkg/sentry/kernel/pipe/node.go new file mode 100644 index 000000000..5b47427ef --- /dev/null +++ b/pkg/sentry/kernel/pipe/node.go @@ -0,0 +1,175 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "sync" + + "gvisor.googlesource.com/gvisor/pkg/amutex" + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/syserror" +) + +// inodeOperations wraps fs.InodeOperations operations with common pipe opening semantics. +type inodeOperations struct { + fs.InodeOperations + + // mu protects the fields below. + mu sync.Mutex `state:"nosave"` + + // p is the underlying Pipe object representing this fifo. + p *Pipe + + // Channels for synchronizing the creation of new readers and writers of + // this fifo. See waitFor and newHandleLocked. + // + // These are not saved/restored because all waiters are unblocked on save, + // and either automatically restart (via ERESTARTSYS) or return EINTR on + // resume. On restarts via ERESTARTSYS, the appropriate channel will be + // recreated. + rWakeup chan struct{} `state:"nosave"` + wWakeup chan struct{} `state:"nosave"` +} + +// NewInodeOperations creates a new pipe fs.InodeOperations. +func NewInodeOperations(base fs.InodeOperations, p *Pipe) fs.InodeOperations { + return &inodeOperations{ + InodeOperations: base, + p: p, + } +} + +// GetFile implements fs.InodeOperations.GetFile. Named pipes have special blocking +// semantics during open: +// +// "Normally, opening the FIFO blocks until the other end is opened also. A +// process can open a FIFO in nonblocking mode. In this case, opening for +// read-only will succeed even if no-one has opened on the write side yet, +// opening for write-only will fail with ENXIO (no such device or address) +// unless the other end has already been opened. Under Linux, opening a FIFO +// for read and write will succeed both in blocking and nonblocking mode. POSIX +// leaves this behavior undefined. This can be used to open a FIFO for writing +// while there are no readers available." - fifo(7) +func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) (*fs.File, error) { + i.mu.Lock() + defer i.mu.Unlock() + + switch { + case flags.Read && !flags.Write: // O_RDONLY. + r := i.p.ROpen(ctx) + i.newHandleLocked(&i.rWakeup) + + if i.p.isNamed && !flags.NonBlocking && !i.p.HasWriters() { + if !i.waitFor(&i.wWakeup, ctx) { + r.DecRef() + return nil, syserror.ErrInterrupted + } + } + + // By now, either we're doing a nonblocking open or we have a writer. On + // a nonblocking read-only open, the open succeeds even if no-one has + // opened the write side yet. + return r, nil + + case flags.Write && !flags.Read: // O_WRONLY. + w := i.p.WOpen(ctx) + i.newHandleLocked(&i.wWakeup) + + if i.p.isNamed && !i.p.HasReaders() { + // On a nonblocking, write-only open, the open fails with ENXIO if the + // read side isn't open yet. + if flags.NonBlocking { + w.DecRef() + return nil, syserror.ENXIO + } + + if !i.waitFor(&i.rWakeup, ctx) { + w.DecRef() + return nil, syserror.ErrInterrupted + } + } + return w, nil + + case flags.Read && flags.Write: // O_RDWR. + // Pipes opened for read-write always succeeds without blocking. + rw := i.p.RWOpen(ctx) + i.newHandleLocked(&i.rWakeup) + i.newHandleLocked(&i.wWakeup) + return rw, nil + + default: + return nil, syserror.EINVAL + } +} + +// waitFor blocks until the underlying pipe has at least one reader/writer is +// announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this +// function will block for either readers or writers, depending on where +// 'wakeupChan' points. +// +// f.mu must be held by the caller. waitFor returns with f.mu held, but it will +// drop f.mu before blocking for any reader/writers. +func (i *inodeOperations) waitFor(wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool { + // Ideally this function would simply use a condition variable. However, the + // wait needs to be interruptible via 'sleeper', so we must sychronize via a + // channel. The synchronization below relies on the fact that closing a + // channel unblocks all receives on the channel. + + // Does an appropriate wakeup channel already exist? If not, create a new + // one. This is all done under f.mu to avoid races. + if *wakeupChan == nil { + *wakeupChan = make(chan struct{}) + } + + // Grab a local reference to the wakeup channel since it may disappear as + // soon as we drop f.mu. + wakeup := *wakeupChan + + // Drop the lock and prepare to sleep. + i.mu.Unlock() + cancel := sleeper.SleepStart() + + // Wait for either a new reader/write to be signalled via 'wakeup', or + // for the sleep to be cancelled. + select { + case <-wakeup: + sleeper.SleepFinish(true) + case <-cancel: + sleeper.SleepFinish(false) + } + + // Take the lock and check if we were woken. If we were woken and + // interrupted, the former takes priority. + i.mu.Lock() + select { + case <-wakeup: + return true + default: + return false + } +} + +// newHandleLocked signals a new pipe reader or writer depending on where +// 'wakeupChan' points. This unblocks any corresponding reader or writer +// waiting for the other end of the channel to be opened, see Fifo.waitFor. +// +// i.mu must be held. +func (*inodeOperations) newHandleLocked(wakeupChan *chan struct{}) { + if *wakeupChan != nil { + close(*wakeupChan) + *wakeupChan = nil + } +} diff --git a/pkg/sentry/kernel/pipe/node_test.go b/pkg/sentry/kernel/pipe/node_test.go new file mode 100644 index 000000000..cc1ebf4f6 --- /dev/null +++ b/pkg/sentry/kernel/pipe/node_test.go @@ -0,0 +1,308 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "testing" + "time" + + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/context/contexttest" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" + "gvisor.googlesource.com/gvisor/pkg/syserror" +) + +type sleeper struct { + context.Context + ch chan struct{} +} + +func newSleeperContext(t *testing.T) context.Context { + return &sleeper{ + Context: contexttest.Context(t), + ch: make(chan struct{}), + } +} + +func (s *sleeper) SleepStart() <-chan struct{} { + return s.ch +} + +func (s *sleeper) SleepFinish(bool) { +} + +func (s *sleeper) Cancel() { + s.ch <- struct{}{} +} + +type openResult struct { + *fs.File + error +} + +func testOpenOrDie(ctx context.Context, t *testing.T, n fs.InodeOperations, flags fs.FileFlags, doneChan chan<- struct{}) (*fs.File, error) { + file, err := n.GetFile(ctx, nil, flags) + if err != nil { + t.Fatalf("open with flags %+v failed: %v", flags, err) + } + if doneChan != nil { + doneChan <- struct{}{} + } + return file, err +} + +func testOpen(ctx context.Context, t *testing.T, n fs.InodeOperations, flags fs.FileFlags, resChan chan<- openResult) (*fs.File, error) { + file, err := n.GetFile(ctx, nil, flags) + if resChan != nil { + resChan <- openResult{file, err} + } + return file, err +} + +func newNamedPipe(t *testing.T) *Pipe { + return NewPipe(contexttest.Context(t), true, DefaultPipeSize, usermem.PageSize) +} + +func newAnonPipe(t *testing.T) *Pipe { + return NewPipe(contexttest.Context(t), false, DefaultPipeSize, usermem.PageSize) +} + +// assertRecvBlocks ensures that a recv attempt on c blocks for at least +// blockDuration. This is useful for checking that a goroutine that is supposed +// to be executing a blocking operation is actually blocking. +func assertRecvBlocks(t *testing.T, c <-chan struct{}, blockDuration time.Duration, failMsg string) { + select { + case <-c: + t.Fatalf(failMsg) + case <-time.After(blockDuration): + // Ok, blocked for the required duration. + } +} + +func TestReadOpenBlocksForWriteOpen(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + rDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone) + + // Verify that the open for read is blocking. + assertRecvBlocks(t, rDone, time.Millisecond*100, + "open for read not blocking with no writers") + + wDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone) + + <-wDone + <-rDone +} + +func TestWriteOpenBlocksForReadOpen(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + wDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone) + + // Verify that the open for write is blocking + assertRecvBlocks(t, wDone, time.Millisecond*100, + "open for write not blocking with no readers") + + rDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone) + + <-rDone + <-wDone +} + +func TestMultipleWriteOpenDoesntCountAsReadOpen(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + rDone1 := make(chan struct{}) + rDone2 := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone1) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone2) + + assertRecvBlocks(t, rDone1, time.Millisecond*100, + "open for read didn't block with no writers") + assertRecvBlocks(t, rDone2, time.Millisecond*100, + "open for read didn't block with no writers") + + wDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone) + + <-wDone + <-rDone2 + <-rDone1 +} + +func TestClosedReaderBlocksWriteOpen(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + rFile, _ := testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true, NonBlocking: true}, nil) + rFile.DecRef() + + wDone := make(chan struct{}) + // This open for write should block because the reader is now gone. + go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone) + assertRecvBlocks(t, wDone, time.Millisecond*100, + "open for write didn't block with no concurrent readers") + + // Open for read again. This should unblock the open for write. + rDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone) + + <-rDone + <-wDone +} + +func TestReadWriteOpenNeverBlocks(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + rwDone := make(chan struct{}) + // Open for read-write never wait for a reader or writer, even if the + // nonblocking flag is not set. + go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true, Write: true, NonBlocking: false}, rwDone) + <-rwDone +} + +func TestReadWriteOpenUnblocksReadOpen(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + rDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone) + + rwDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true, Write: true}, rwDone) + + <-rwDone + <-rDone +} + +func TestReadWriteOpenUnblocksWriteOpen(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + wDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone) + + rwDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true, Write: true}, rwDone) + + <-rwDone + <-wDone +} + +func TestBlockedOpenIsCancellable(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + done := make(chan openResult) + go testOpen(ctx, t, f, fs.FileFlags{Read: true}, done) + select { + case <-done: + t.Fatalf("open for read didn't block with no writers") + case <-time.After(time.Millisecond * 100): + // Ok. + } + + ctx.(*sleeper).Cancel() + // If the cancel on the sleeper didn't work, the open for read would never + // return. + res := <-done + if res.error != syserror.ErrInterrupted { + t.Fatalf("Cancellation didn't cause GetFile to return fs.ErrInterrupted, got %v.", + res.error) + } +} + +func TestNonblockingReadOpenNoWriters(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + if _, err := testOpen(ctx, t, f, fs.FileFlags{Read: true, NonBlocking: true}, nil); err != nil { + t.Fatalf("Nonblocking open for read failed with error %v.", err) + } +} + +func TestNonblockingWriteOpenNoReaders(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + if _, err := testOpen(ctx, t, f, fs.FileFlags{Write: true, NonBlocking: true}, nil); err != syserror.ENXIO { + t.Fatalf("Nonblocking open for write failed unexpected error %v.", err) + } +} + +func TestNonBlockingReadOpenWithWriter(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + wDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Write: true}, wDone) + + // Open for write blocks since there are no readers yet. + assertRecvBlocks(t, wDone, time.Millisecond*100, + "Open for write didn't block with no reader.") + + if _, err := testOpen(ctx, t, f, fs.FileFlags{Read: true, NonBlocking: true}, nil); err != nil { + t.Fatalf("Nonblocking open for read failed with error %v.", err) + } + + // Open for write should now be unblocked. + <-wDone +} + +func TestNonBlockingWriteOpenWithReader(t *testing.T) { + f := NewInodeOperations(nil, newNamedPipe(t)) + ctx := newSleeperContext(t) + + rDone := make(chan struct{}) + go testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true}, rDone) + + // Open for write blocked, since no reader yet. + assertRecvBlocks(t, rDone, time.Millisecond*100, + "Open for reader didn't block with no writer.") + + if _, err := testOpen(ctx, t, f, fs.FileFlags{Write: true, NonBlocking: true}, nil); err != nil { + t.Fatalf("Nonblocking open for write failed with error %v.", err) + } + + // Open for write should now be unblocked. + <-rDone +} + +func TestAnonReadOpen(t *testing.T) { + f := NewInodeOperations(nil, newAnonPipe(t)) + ctx := newSleeperContext(t) + + if _, err := testOpen(ctx, t, f, fs.FileFlags{Read: true}, nil); err != nil { + t.Fatalf("open anon pipe for read failed: %v", err) + } +} + +func TestAnonWriteOpen(t *testing.T) { + f := NewInodeOperations(nil, newAnonPipe(t)) + ctx := newSleeperContext(t) + + if _, err := testOpen(ctx, t, f, fs.FileFlags{Write: true}, nil); err != nil { + t.Fatalf("open anon pipe for write failed: %v", err) + } +} diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go new file mode 100644 index 000000000..1656c6ff3 --- /dev/null +++ b/pkg/sentry/kernel/pipe/pipe.go @@ -0,0 +1,335 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package pipe provides an in-memory implementation of a unidirectional +// pipe. +// +// The goal of this pipe is to emulate the pipe syscall in all of its +// edge cases and guarantees of atomic IO. +package pipe + +import ( + "fmt" + "sync" + "sync/atomic" + "syscall" + + "gvisor.googlesource.com/gvisor/pkg/abi/linux" + "gvisor.googlesource.com/gvisor/pkg/ilist" + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil" + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" + "gvisor.googlesource.com/gvisor/pkg/syserror" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// DefaultPipeSize is the system-wide default size of a pipe in bytes. +const DefaultPipeSize = 65536 + +// Pipe is an encapsulation of a platform-independent pipe. +// It manages a buffered byte queue shared between a reader/writer +// pair. +type Pipe struct { + waiter.Queue `state:"nosave"` + + // Whether this is a named or anonymous pipe. + isNamed bool + + // The dirent backing this pipe. Shared by all readers and writers. + dirent *fs.Dirent + + // The buffered byte queue. + data ilist.List + + // Max size of the pipe in bytes. When this max has been reached, + // writers will get EWOULDBLOCK. + max int + + // Current size of the pipe in bytes. + size int + + // Max number of bytes the pipe can guarantee to read or write + // atomically. + atomicIOBytes int + + // The number of active readers for this pipe. Load/store atomically. + readers int32 + + // The number of active writes for this pipe. Load/store atomically. + writers int32 + + // This flag indicates if this pipe ever had a writer. Note that this does + // not necessarily indicate there is *currently* a writer, just that there + // has been a writer at some point since the pipe was created. + // + // Protected by mu. + hadWriter bool + + // Lock protecting all pipe internal state. + mu sync.Mutex `state:"nosave"` +} + +// NewPipe initializes and returns a pipe. A pipe created by this function is +// persistent, and will remain valid even without any open fds to it. Named +// pipes for mknod(2) are created via this function. Note that the +// implementation of blocking semantics for opening the read and write ends of a +// named pipe are left to filesystems. +func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe { + p := &Pipe{ + isNamed: isNamed, + max: sizeBytes, + atomicIOBytes: atomicIOBytes, + } + + // Build the fs.Dirent of this pipe, shared by all fs.Files associated + // with this pipe. + ino := pipeDevice.NextIno() + base := fsutil.NewSimpleInodeOperations(fsutil.InodeSimpleAttributes{ + FSType: linux.PIPEFS_MAGIC, + UAttr: fs.WithCurrentTime(ctx, fs.UnstableAttr{ + Owner: fs.FileOwnerFromContext(ctx), + Perms: fs.FilePermissions{ + User: fs.PermMask{Read: true, Write: true}, + }, + Links: 1, + }), + }) + sattr := fs.StableAttr{ + Type: fs.Pipe, + DeviceID: pipeDevice.DeviceID(), + InodeID: ino, + BlockSize: int64(atomicIOBytes), + } + // There is no real filesystem backing this pipe, so we pass in a nil + // Filesystem. + sb := fs.NewNonCachingMountSource(nil, fs.MountSourceFlags{}) + p.dirent = fs.NewDirent(fs.NewInode(NewInodeOperations(base, p), sb, sattr), fmt.Sprintf("pipe:[%d]", ino)) + + return p +} + +// NewConnectedPipe initializes a pipe and returns a pair of objects (which +// implement kio.File) representing the read and write ends of the pipe. A pipe +// created by this function becomes invalid as soon as either the read or write +// end is closed, and errors on subsequent operations on either end. Pipes +// for pipe(2) and pipe2(2) are generally created this way. +func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) { + p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes) + return p.ROpen(ctx), p.WOpen(ctx) +} + +// ROpen opens the pipe for reading. +func (p *Pipe) ROpen(ctx context.Context) *fs.File { + p.rOpen() + return fs.NewFile(ctx, p.dirent, fs.FileFlags{Read: true}, &Reader{ + ReaderWriter: ReaderWriter{Pipe: p}, + }) +} + +// WOpen opens the pipe for writing. +func (p *Pipe) WOpen(ctx context.Context) *fs.File { + p.wOpen() + return fs.NewFile(ctx, p.dirent, fs.FileFlags{Write: true}, &Writer{ + ReaderWriter: ReaderWriter{Pipe: p}, + }) +} + +// RWOpen opens the pipe for both reading and writing. +func (p *Pipe) RWOpen(ctx context.Context) *fs.File { + p.rOpen() + p.wOpen() + return fs.NewFile(ctx, p.dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{ + Pipe: p, + }) +} + +// read reads data from the pipe into dst and returns the number of bytes +// read, or returns ErrWouldBlock if the pipe is empty. +func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) { + if !p.HasReaders() { + return 0, syscall.EBADF + } + + // Don't block for a zero-length read even if the pipe is empty. + if dst.NumBytes() == 0 { + return 0, nil + } + + p.mu.Lock() + defer p.mu.Unlock() + // If there is nothing to read at the moment but there is a writer, tell the + // caller to block. + if p.size == 0 { + if !p.HasWriters() { + // There are no writers, return EOF. + return 0, nil + } + return 0, syserror.ErrWouldBlock + } + var n int64 + for b := p.data.Front(); b != nil; b = p.data.Front() { + buffer := b.(*Buffer) + n0, err := dst.CopyOut(ctx, buffer.bytes()) + n += int64(n0) + p.size -= n0 + if buffer.truncate(n0) == 0 { + p.data.Remove(b) + } + dst = dst.DropFirst(n0) + if dst.NumBytes() == 0 || err != nil { + return n, err + } + } + return n, nil +} + +// write writes data from sv into the pipe and returns the number of bytes +// written. If no bytes are written because the pipe is full (or has less than +// atomicIOBytes free capacity), write returns ErrWouldBlock. +func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if !p.HasWriters() { + return 0, syscall.EBADF + } + if !p.HasReaders() { + return 0, syscall.EPIPE + } + + // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be + // atomic, but requires no atomicity for writes larger than this. However, + // Linux appears to provide stronger semantics than this in practice: + // unmerged writes are done one PAGE_SIZE buffer at a time, so for larger + // writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement + // this by writing at most atomicIOBytes at a time if we can't service the + // write in its entirety. + canWrite := src.NumBytes() + if canWrite > int64(p.max-p.size) { + if p.max-p.size >= p.atomicIOBytes { + canWrite = int64(p.atomicIOBytes) + } else { + return 0, syserror.ErrWouldBlock + } + } + + // Copy data from user memory into a pipe-owned buffer. + buf := make([]byte, canWrite) + n, err := src.CopyIn(ctx, buf) + if n > 0 { + p.data.PushBack(newBuffer(buf[:n])) + p.size += n + } + if int64(n) < src.NumBytes() && err == nil { + // Partial write due to full pipe. + err = syserror.ErrWouldBlock + } + return int64(n), err +} + +// rOpen signals a new reader of the pipe. +func (p *Pipe) rOpen() { + atomic.AddInt32(&p.readers, 1) +} + +// wOpen signals a new writer of the pipe. +func (p *Pipe) wOpen() { + p.mu.Lock() + defer p.mu.Unlock() + p.hadWriter = true + atomic.AddInt32(&p.writers, 1) +} + +// rClose signals that a reader has closed their end of the pipe. +func (p *Pipe) rClose() { + newReaders := atomic.AddInt32(&p.readers, -1) + if newReaders < 0 { + panic(fmt.Sprintf("Refcounting bug, pipe has negative readers: %v", newReaders)) + } +} + +// wClose signals that a writer has closed their end of the pipe. +func (p *Pipe) wClose() { + newWriters := atomic.AddInt32(&p.writers, -1) + if newWriters < 0 { + panic(fmt.Sprintf("Refcounting bug, pipe has negative writers: %v.", newWriters)) + } +} + +// HasReaders returns whether the pipe has any active readers. +func (p *Pipe) HasReaders() bool { + return atomic.LoadInt32(&p.readers) > 0 +} + +// HasWriters returns whether the pipe has any active writers. +func (p *Pipe) HasWriters() bool { + return atomic.LoadInt32(&p.writers) > 0 +} + +func (p *Pipe) rReadinessLocked() waiter.EventMask { + ready := waiter.EventMask(0) + if p.HasReaders() && p.data.Front() != nil { + ready |= waiter.EventIn + } + if !p.HasWriters() && p.hadWriter { + // POLLHUP must be supressed until the pipe has had at least one writer + // at some point. Otherwise a reader thread may poll and immediately get + // a POLLHUP before the writer ever opens the pipe, which the reader may + // interpret as the writer opening then closing the pipe. + ready |= waiter.EventHUp + } + return ready +} + +// rReadiness returns a mask that states whether the read end of the pipe is +// ready for reading. +func (p *Pipe) rReadiness() waiter.EventMask { + p.mu.Lock() + defer p.mu.Unlock() + return p.rReadinessLocked() +} + +func (p *Pipe) wReadinessLocked() waiter.EventMask { + ready := waiter.EventMask(0) + if p.HasWriters() && p.size < p.max { + ready |= waiter.EventOut + } + if !p.HasReaders() { + ready |= waiter.EventErr + } + return ready +} + +// wReadiness returns a mask that states whether the write end of the pipe +// is ready for writing. +func (p *Pipe) wReadiness() waiter.EventMask { + p.mu.Lock() + defer p.mu.Unlock() + return p.wReadinessLocked() +} + +// rwReadiness returns a mask that states whether a read-write handle to the +// pipe is ready for IO. +func (p *Pipe) rwReadiness() waiter.EventMask { + p.mu.Lock() + defer p.mu.Unlock() + return p.rReadinessLocked() | p.wReadinessLocked() +} + +func (p *Pipe) queuedSize() int { + p.mu.Lock() + defer p.mu.Unlock() + return p.size +} diff --git a/pkg/sentry/kernel/pipe/pipe_test.go b/pkg/sentry/kernel/pipe/pipe_test.go new file mode 100644 index 000000000..49ef8c8ac --- /dev/null +++ b/pkg/sentry/kernel/pipe/pipe_test.go @@ -0,0 +1,138 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "bytes" + "testing" + + "gvisor.googlesource.com/gvisor/pkg/sentry/context/contexttest" + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" + "gvisor.googlesource.com/gvisor/pkg/syserror" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +func TestPipeRW(t *testing.T) { + ctx := contexttest.Context(t) + r, w := NewConnectedPipe(ctx, 65536, 4096) + defer r.DecRef() + defer w.DecRef() + + msg := []byte("here's some bytes") + wantN := int64(len(msg)) + n, err := w.Writev(ctx, usermem.BytesIOSequence(msg)) + if n != wantN || err != nil { + t.Fatalf("Writev: got (%d, %v), wanted (%d, nil)", n, err, wantN) + } + + buf := make([]byte, len(msg)) + n, err = r.Readv(ctx, usermem.BytesIOSequence(buf)) + if n != wantN || err != nil || !bytes.Equal(buf, msg) { + t.Fatalf("Readv: got (%d, %v) %q, wanted (%d, nil) %q", n, err, buf, wantN, msg) + } +} + +func TestPipeReadBlock(t *testing.T) { + ctx := contexttest.Context(t) + r, w := NewConnectedPipe(ctx, 65536, 4096) + defer r.DecRef() + defer w.DecRef() + + n, err := r.Readv(ctx, usermem.BytesIOSequence(make([]byte, 1))) + if n != 0 || err != syserror.ErrWouldBlock { + t.Fatalf("Readv: got (%d, %v), wanted (0, %v)", n, err, syserror.ErrWouldBlock) + } +} + +func TestPipeWriteBlock(t *testing.T) { + const atomicIOBytes = 2 + + ctx := contexttest.Context(t) + r, w := NewConnectedPipe(ctx, 10, atomicIOBytes) + defer r.DecRef() + defer w.DecRef() + + msg := []byte("here's some bytes") + n, err := w.Writev(ctx, usermem.BytesIOSequence(msg)) + if wantN, wantErr := int64(atomicIOBytes), syserror.ErrWouldBlock; n != wantN || err != wantErr { + t.Fatalf("Writev: got (%d, %v), wanted (%d, %v)", n, err, wantN, wantErr) + } +} + +func TestPipeWriteUntilEnd(t *testing.T) { + const atomicIOBytes = 2 + + ctx := contexttest.Context(t) + r, w := NewConnectedPipe(ctx, atomicIOBytes, atomicIOBytes) + defer r.DecRef() + defer w.DecRef() + + msg := []byte("here's some bytes") + + wDone := make(chan struct{}, 0) + rDone := make(chan struct{}, 0) + defer func() { + // Signal the reader to stop and wait until it does so. + close(wDone) + <-rDone + }() + + go func() { + defer close(rDone) + // Read from r until done is closed. + ctx := contexttest.Context(t) + buf := make([]byte, len(msg)+1) + dst := usermem.BytesIOSequence(buf) + e, ch := waiter.NewChannelEntry(nil) + r.EventRegister(&e, waiter.EventIn) + defer r.EventUnregister(&e) + for { + n, err := r.Readv(ctx, dst) + dst = dst.DropFirst64(n) + if err == syserror.ErrWouldBlock { + select { + case <-ch: + continue + case <-wDone: + // We expect to have 1 byte left in dst since len(buf) == + // len(msg)+1. + if dst.NumBytes() != 1 || !bytes.Equal(buf[:len(msg)], msg) { + t.Errorf("Reader: got %q (%d bytes remaining), wanted %q", buf, dst.NumBytes(), msg) + } + return + } + } + if err != nil { + t.Fatalf("Readv: got unexpected error %v", err) + } + } + }() + + src := usermem.BytesIOSequence(msg) + e, ch := waiter.NewChannelEntry(nil) + w.EventRegister(&e, waiter.EventOut) + defer w.EventUnregister(&e) + for src.NumBytes() != 0 { + n, err := w.Writev(ctx, src) + src = src.DropFirst64(n) + if err == syserror.ErrWouldBlock { + <-ch + continue + } + if err != nil { + t.Fatalf("Writev: got (%d, %v)", n, err) + } + } +} diff --git a/pkg/sentry/kernel/pipe/reader.go b/pkg/sentry/kernel/pipe/reader.go new file mode 100644 index 000000000..40d5e4943 --- /dev/null +++ b/pkg/sentry/kernel/pipe/reader.go @@ -0,0 +1,37 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// Reader satisfies the fs.FileOperations interface for read-only pipes. +// Reader should be used with !fs.FileFlags.Write to reject writes. +type Reader struct { + ReaderWriter +} + +// Release implements fs.FileOperations.Release. +func (r *Reader) Release() { + r.Pipe.rClose() + // Wake up writers. + r.Pipe.Notify(waiter.EventOut) +} + +// Readiness returns the ready events in the underlying pipe. +func (r *Reader) Readiness(mask waiter.EventMask) waiter.EventMask { + return r.Pipe.rReadiness() & mask +} diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go new file mode 100644 index 000000000..dc642a3a6 --- /dev/null +++ b/pkg/sentry/kernel/pipe/reader_writer.go @@ -0,0 +1,91 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "fmt" + "math" + "syscall" + + "gvisor.googlesource.com/gvisor/pkg/sentry/arch" + "gvisor.googlesource.com/gvisor/pkg/sentry/context" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs" + "gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil" + "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// ReaderWriter satisfies the FileOperations interface and services both +// read and write requests. This should only be used directly for named pipes. +// pipe(2) and pipe2(2) only support unidirectional pipes and should use +// either pipe.Reader or pipe.Writer. +type ReaderWriter struct { + fsutil.PipeSeek `state:"nosave"` + fsutil.NotDirReaddir `state:"nosave"` + fsutil.NoFsync `state:"nosave"` + fsutil.NoopFlush `state:"nosave"` + fsutil.NoMMap `state:"nosave"` + *Pipe +} + +// Release implements fs.FileOperations.Release. +func (rw *ReaderWriter) Release() { + rw.Pipe.rClose() + rw.Pipe.wClose() + // Wake up readers and writers. + rw.Pipe.Notify(waiter.EventIn | waiter.EventOut) +} + +// Read implements fs.FileOperations.Read. +func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequence, _ int64) (int64, error) { + n, err := rw.Pipe.read(ctx, dst) + if n > 0 { + rw.Pipe.Notify(waiter.EventOut) + } + return n, err +} + +// Write implements fs.FileOperations.Write. +func (rw *ReaderWriter) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) { + n, err := rw.Pipe.write(ctx, src) + if n > 0 { + rw.Pipe.Notify(waiter.EventIn) + } + return n, err +} + +// Readiness returns the ready events in the underlying pipe. +func (rw *ReaderWriter) Readiness(mask waiter.EventMask) waiter.EventMask { + return rw.Pipe.rwReadiness() & mask +} + +// Ioctl implements fs.FileOperations.Ioctl. +func (rw *ReaderWriter) Ioctl(ctx context.Context, io usermem.IO, args arch.SyscallArguments) (uintptr, error) { + // Switch on ioctl request. + switch int(args[1].Int()) { + case syscall.TIOCINQ: + v := rw.queuedSize() + if v > math.MaxInt32 { + panic(fmt.Sprintf("Impossibly large pipe queued size: %d", v)) + } + // Copy result to user-space. + _, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{ + AddressSpaceActive: true, + }) + return 0, err + default: + return 0, syscall.ENOTTY + } +} diff --git a/pkg/sentry/kernel/pipe/writer.go b/pkg/sentry/kernel/pipe/writer.go new file mode 100644 index 000000000..fd13008ac --- /dev/null +++ b/pkg/sentry/kernel/pipe/writer.go @@ -0,0 +1,37 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// Writer satisfies the fs.FileOperations interface for write-only pipes. +// Writer should be used with !fs.FileFlags.Read to reject reads. +type Writer struct { + ReaderWriter +} + +// Release implements fs.FileOperations.Release. +func (w *Writer) Release() { + w.Pipe.wClose() + // Wake up readers. + w.Pipe.Notify(waiter.EventHUp) +} + +// Readiness returns the ready events in the underlying pipe. +func (w *Writer) Readiness(mask waiter.EventMask) waiter.EventMask { + return w.Pipe.wReadiness() & mask +} |