Diffstat (limited to 'pkg/sentry/kernel/pipe')
-rw-r--r-- | pkg/sentry/kernel/pipe/BUILD | 38
-rw-r--r-- | pkg/sentry/kernel/pipe/buffer.go | 115
-rw-r--r-- | pkg/sentry/kernel/pipe/node.go | 11
-rw-r--r-- | pkg/sentry/kernel/pipe/node_test.go | 8
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe.go | 139
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe_test.go | 20
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe_unsafe.go (renamed from pkg/sentry/kernel/pipe/buffer_test.go) | 23
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe_util.go | 35
-rw-r--r-- | pkg/sentry/kernel/pipe/reader.go | 3
-rw-r--r-- | pkg/sentry/kernel/pipe/reader_writer.go | 4
-rw-r--r-- | pkg/sentry/kernel/pipe/vfs.go | 410
-rw-r--r-- | pkg/sentry/kernel/pipe/writer.go | 3
12 files changed, 435 insertions, 374 deletions
diff --git a/pkg/sentry/kernel/pipe/BUILD b/pkg/sentry/kernel/pipe/BUILD index 9d34f6d4d..449643118 100644 --- a/pkg/sentry/kernel/pipe/BUILD +++ b/pkg/sentry/kernel/pipe/BUILD @@ -1,49 +1,36 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_test") -load("//tools/go_generics:defs.bzl", "go_template_instance") -load("//tools/go_stateify:defs.bzl", "go_library") +load("//tools:defs.bzl", "go_library", "go_test") package(licenses = ["notice"]) -go_template_instance( - name = "buffer_list", - out = "buffer_list.go", - package = "pipe", - prefix = "buffer", - template = "//pkg/ilist:generic_list", - types = { - "Element": "*buffer", - "Linker": "*buffer", - }, -) - go_library( name = "pipe", srcs = [ - "buffer.go", - "buffer_list.go", "device.go", "node.go", "pipe.go", + "pipe_unsafe.go", "pipe_util.go", "reader.go", "reader_writer.go", "vfs.go", "writer.go", ], - importpath = "gvisor.dev/gvisor/pkg/sentry/kernel/pipe", visibility = ["//pkg/sentry:internal"], deps = [ "//pkg/abi/linux", "//pkg/amutex", + "//pkg/buffer", + "//pkg/context", + "//pkg/safemem", "//pkg/sentry/arch", - "//pkg/sentry/context", "//pkg/sentry/device", "//pkg/sentry/fs", "//pkg/sentry/fs/fsutil", - "//pkg/sentry/safemem", - "//pkg/sentry/usermem", + "//pkg/sentry/fs/lock", "//pkg/sentry/vfs", + "//pkg/sync", "//pkg/syserror", + "//pkg/usermem", "//pkg/waiter", ], ) @@ -52,17 +39,16 @@ go_test( name = "pipe_test", size = "small", srcs = [ - "buffer_test.go", "node_test.go", "pipe_test.go", ], - embed = [":pipe"], + library = ":pipe", deps = [ - "//pkg/sentry/context", - "//pkg/sentry/context/contexttest", + "//pkg/context", + "//pkg/sentry/contexttest", "//pkg/sentry/fs", - "//pkg/sentry/usermem", "//pkg/syserror", + "//pkg/usermem", "//pkg/waiter", ], ) diff --git a/pkg/sentry/kernel/pipe/buffer.go b/pkg/sentry/kernel/pipe/buffer.go deleted file mode 100644 index 95bee2d37..000000000 --- a/pkg/sentry/kernel/pipe/buffer.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -import ( - "io" - "sync" - - "gvisor.dev/gvisor/pkg/sentry/safemem" -) - -// buffer encapsulates a queueable byte buffer. -// -// Note that the total size is slightly less than two pages. This -// is done intentionally to ensure that the buffer object aligns -// with runtime internals. We have no hard size or alignment -// requirements. This two page size will effectively minimize -// internal fragmentation, but still have a large enough chunk -// to limit excessive segmentation. -// -// +stateify savable -type buffer struct { - data [8144]byte - read int - write int - bufferEntry -} - -// Reset resets internal data. -// -// This must be called before use. -func (b *buffer) Reset() { - b.read = 0 - b.write = 0 -} - -// Empty indicates the buffer is empty. -// -// This indicates there is no data left to read. -func (b *buffer) Empty() bool { - return b.read == b.write -} - -// Full indicates the buffer is full. 
-// -// This indicates there is no capacity left to write. -func (b *buffer) Full() bool { - return b.write == len(b.data) -} - -// WriteFromBlocks implements safemem.Writer.WriteFromBlocks. -func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) { - dst := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.write:])) - n, err := safemem.CopySeq(dst, srcs) - b.write += int(n) - return n, err -} - -// WriteFromReader writes to the buffer from an io.Reader. -func (b *buffer) WriteFromReader(r io.Reader, count int64) (int64, error) { - dst := b.data[b.write:] - if count < int64(len(dst)) { - dst = b.data[b.write:][:count] - } - n, err := r.Read(dst) - b.write += n - return int64(n), err -} - -// ReadToBlocks implements safemem.Reader.ReadToBlocks. -func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) { - src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write])) - n, err := safemem.CopySeq(dsts, src) - b.read += int(n) - return n, err -} - -// ReadToWriter reads from the buffer into an io.Writer. -func (b *buffer) ReadToWriter(w io.Writer, count int64, dup bool) (int64, error) { - src := b.data[b.read:b.write] - if count < int64(len(src)) { - src = b.data[b.read:][:count] - } - n, err := w.Write(src) - if !dup { - b.read += n - } - return int64(n), err -} - -// bufferPool is a pool for buffers. -var bufferPool = sync.Pool{ - New: func() interface{} { - return new(buffer) - }, -} - -// newBuffer grabs a new buffer from the pool. -func newBuffer() *buffer { - b := bufferPool.Get().(*buffer) - b.Reset() - return b -} diff --git a/pkg/sentry/kernel/pipe/node.go b/pkg/sentry/kernel/pipe/node.go index 4a19ab7ce..6497dc4ba 100644 --- a/pkg/sentry/kernel/pipe/node.go +++ b/pkg/sentry/kernel/pipe/node.go @@ -15,12 +15,11 @@ package pipe import ( - "sync" - "gvisor.dev/gvisor/pkg/abi/linux" - "gvisor.dev/gvisor/pkg/sentry/context" + "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/fs" "gvisor.dev/gvisor/pkg/sentry/fs/fsutil" + "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/syserror" ) @@ -94,7 +93,7 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi if i.p.isNamed && !flags.NonBlocking && !i.p.HasWriters() { if !waitFor(&i.mu, &i.wWakeup, ctx) { - r.DecRef() + r.DecRef(ctx) return nil, syserror.ErrInterrupted } } @@ -112,12 +111,12 @@ func (i *inodeOperations) GetFile(ctx context.Context, d *fs.Dirent, flags fs.Fi // On a nonblocking, write-only open, the open fails with ENXIO if the // read side isn't open yet. 
if flags.NonBlocking { - w.DecRef() + w.DecRef(ctx) return nil, syserror.ENXIO } if !waitFor(&i.mu, &i.rWakeup, ctx) { - w.DecRef() + w.DecRef(ctx) return nil, syserror.ErrInterrupted } } diff --git a/pkg/sentry/kernel/pipe/node_test.go b/pkg/sentry/kernel/pipe/node_test.go index 16fa80abe..ce0db5583 100644 --- a/pkg/sentry/kernel/pipe/node_test.go +++ b/pkg/sentry/kernel/pipe/node_test.go @@ -18,11 +18,11 @@ import ( "testing" "time" - "gvisor.dev/gvisor/pkg/sentry/context" - "gvisor.dev/gvisor/pkg/sentry/context/contexttest" + "gvisor.dev/gvisor/pkg/context" + "gvisor.dev/gvisor/pkg/sentry/contexttest" "gvisor.dev/gvisor/pkg/sentry/fs" - "gvisor.dev/gvisor/pkg/sentry/usermem" "gvisor.dev/gvisor/pkg/syserror" + "gvisor.dev/gvisor/pkg/usermem" ) type sleeper struct { @@ -167,7 +167,7 @@ func TestClosedReaderBlocksWriteOpen(t *testing.T) { f := NewInodeOperations(ctx, perms, newNamedPipe(t)) rFile, _ := testOpenOrDie(ctx, t, f, fs.FileFlags{Read: true, NonBlocking: true}, nil) - rFile.DecRef() + rFile.DecRef(ctx) wDone := make(chan struct{}) // This open for write should block because the reader is now gone. diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go index 1a1b38f83..297e8f28f 100644 --- a/pkg/sentry/kernel/pipe/pipe.go +++ b/pkg/sentry/kernel/pipe/pipe.go @@ -17,12 +17,13 @@ package pipe import ( "fmt" - "sync" "sync/atomic" "syscall" - "gvisor.dev/gvisor/pkg/sentry/context" + "gvisor.dev/gvisor/pkg/buffer" + "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/fs" + "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/syserror" "gvisor.dev/gvisor/pkg/waiter" ) @@ -70,10 +71,10 @@ type Pipe struct { // mu protects all pipe internal state below. mu sync.Mutex `state:"nosave"` - // data is the buffer queue of pipe contents. + // view is the underlying set of buffers. // // This is protected by mu. - data bufferList + view buffer.View // max is the maximum size of the pipe in bytes. When this max has been // reached, writers will get EWOULDBLOCK. @@ -81,11 +82,6 @@ type Pipe struct { // This is protected by mu. max int64 - // size is the current size of the pipe in bytes. - // - // This is protected by mu. - size int64 - // hadWriter indicates if this pipe ever had a writer. Note that this // does not necessarily indicate there is *currently* a writer, just // that there has been a writer at some point since the pipe was @@ -156,7 +152,7 @@ func NewConnectedPipe(ctx context.Context, sizeBytes, atomicIOBytes int64) (*fs. d := fs.NewDirent(ctx, fs.NewInode(ctx, iops, ms, sattr), fmt.Sprintf("pipe:[%d]", ino)) // The p.Open calls below will each take a reference on the Dirent. We // must drop the one we already have. - defer d.DecRef() + defer d.DecRef(ctx) return p.Open(ctx, d, fs.FileFlags{Read: true}), p.Open(ctx, d, fs.FileFlags{Write: true}) } @@ -196,7 +192,7 @@ type readOps struct { limit func(int64) // read performs the actual read operation. - read func(*buffer) (int64, error) + read func(*buffer.View) (int64, error) } // read reads data from the pipe into dst and returns the number of bytes @@ -211,82 +207,27 @@ func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) { p.mu.Lock() defer p.mu.Unlock() - - // Is the pipe empty? - if p.size == 0 { - if !p.HasWriters() { - // There are no writers, return EOF. - return 0, nil - } - return 0, syserror.ErrWouldBlock - } - - // Limit how much we consume. - if ops.left() > p.size { - ops.limit(p.size) - } - - done := int64(0) - for ops.left() > 0 { - // Pop the first buffer. 
- first := p.data.Front() - if first == nil { - break - } - - // Copy user data. - n, err := ops.read(first) - done += int64(n) - p.size -= n - - // Empty buffer? - if first.Empty() { - // Push to the free list. - p.data.Remove(first) - bufferPool.Put(first) - } - - // Handle errors. - if err != nil { - return done, err - } - } - - return done, nil + return p.readLocked(ctx, ops) } -// dup duplicates all data from this pipe into the given writer. -// -// There is no blocking behavior implemented here. The writer may propagate -// some blocking error. All the writes must be complete writes. -func (p *Pipe) dup(ctx context.Context, ops readOps) (int64, error) { - p.mu.Lock() - defer p.mu.Unlock() - +func (p *Pipe) readLocked(ctx context.Context, ops readOps) (int64, error) { // Is the pipe empty? - if p.size == 0 { + if p.view.Size() == 0 { if !p.HasWriters() { - // See above. + // There are no writers, return EOF. return 0, nil } return 0, syserror.ErrWouldBlock } // Limit how much we consume. - if ops.left() > p.size { - ops.limit(p.size) + if ops.left() > p.view.Size() { + ops.limit(p.view.Size()) } - done := int64(0) - for buf := p.data.Front(); buf != nil; buf = buf.Next() { - n, err := ops.read(buf) - done += n - if err != nil { - return done, err - } - } - - return done, nil + // Copy user data; the read op is responsible for trimming. + done, err := ops.read(&p.view) + return done, err } type writeOps struct { @@ -297,7 +238,7 @@ type writeOps struct { limit func(int64) // write should write to the provided buffer. - write func(*buffer) (int64, error) + write func(*buffer.View) (int64, error) } // write writes data from sv into the pipe and returns the number of bytes @@ -308,7 +249,10 @@ type writeOps struct { func (p *Pipe) write(ctx context.Context, ops writeOps) (int64, error) { p.mu.Lock() defer p.mu.Unlock() + return p.writeLocked(ctx, ops) +} +func (p *Pipe) writeLocked(ctx context.Context, ops writeOps) (int64, error) { // Can't write to a pipe with no readers. if !p.HasReaders() { return 0, syscall.EPIPE @@ -317,35 +261,28 @@ func (p *Pipe) write(ctx context.Context, ops writeOps) (int64, error) { // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be // atomic, but requires no atomicity for writes larger than this. wanted := ops.left() - if avail := p.max - p.size; wanted > avail { + avail := p.max - p.view.Size() + if wanted > avail { if wanted <= p.atomicIOBytes { return 0, syserror.ErrWouldBlock } ops.limit(avail) } - done := int64(0) - for ops.left() > 0 { - // Need a new buffer? - last := p.data.Back() - if last == nil || last.Full() { - // Add a new buffer to the data list. - last = newBuffer() - p.data.PushBack(last) - } - - // Copy user data. - n, err := ops.write(last) - done += int64(n) - p.size += n + // Copy user data. + done, err := ops.write(&p.view) + if err != nil { + return done, err + } - // Handle errors. - if err != nil { - return done, err - } + if done < avail { + // Non-failure, but short write. + return done, nil } - if wanted > done { - // Partial write due to full pipe. + if done < wanted { + // Partial write due to full pipe. Note that this could also be + // the short write case above, we would expect a second call + // and the write to return zero bytes in this case. return done, syserror.ErrWouldBlock } @@ -396,7 +333,7 @@ func (p *Pipe) HasWriters() bool { // Precondition: mu must be held. 
func (p *Pipe) rReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) - if p.HasReaders() && p.data.Front() != nil { + if p.HasReaders() && p.view.Size() != 0 { ready |= waiter.EventIn } if !p.HasWriters() && p.hadWriter { @@ -422,7 +359,7 @@ func (p *Pipe) rReadiness() waiter.EventMask { // Precondition: mu must be held. func (p *Pipe) wReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) - if p.HasWriters() && p.size < p.max { + if p.HasWriters() && p.view.Size() < p.max { ready |= waiter.EventOut } if !p.HasReaders() { @@ -451,7 +388,7 @@ func (p *Pipe) rwReadiness() waiter.EventMask { func (p *Pipe) queued() int64 { p.mu.Lock() defer p.mu.Unlock() - return p.size + return p.view.Size() } // FifoSize implements fs.FifoSizer.FifoSize. @@ -474,7 +411,7 @@ func (p *Pipe) SetFifoSize(size int64) (int64, error) { } p.mu.Lock() defer p.mu.Unlock() - if size < p.size { + if size < p.view.Size() { return 0, syserror.EBUSY } p.max = size diff --git a/pkg/sentry/kernel/pipe/pipe_test.go b/pkg/sentry/kernel/pipe/pipe_test.go index e3a14b665..fe97e9800 100644 --- a/pkg/sentry/kernel/pipe/pipe_test.go +++ b/pkg/sentry/kernel/pipe/pipe_test.go @@ -18,17 +18,17 @@ import ( "bytes" "testing" - "gvisor.dev/gvisor/pkg/sentry/context/contexttest" - "gvisor.dev/gvisor/pkg/sentry/usermem" + "gvisor.dev/gvisor/pkg/sentry/contexttest" "gvisor.dev/gvisor/pkg/syserror" + "gvisor.dev/gvisor/pkg/usermem" "gvisor.dev/gvisor/pkg/waiter" ) func TestPipeRW(t *testing.T) { ctx := contexttest.Context(t) r, w := NewConnectedPipe(ctx, 65536, 4096) - defer r.DecRef() - defer w.DecRef() + defer r.DecRef(ctx) + defer w.DecRef(ctx) msg := []byte("here's some bytes") wantN := int64(len(msg)) @@ -47,8 +47,8 @@ func TestPipeRW(t *testing.T) { func TestPipeReadBlock(t *testing.T) { ctx := contexttest.Context(t) r, w := NewConnectedPipe(ctx, 65536, 4096) - defer r.DecRef() - defer w.DecRef() + defer r.DecRef(ctx) + defer w.DecRef(ctx) n, err := r.Readv(ctx, usermem.BytesIOSequence(make([]byte, 1))) if n != 0 || err != syserror.ErrWouldBlock { @@ -62,8 +62,8 @@ func TestPipeWriteBlock(t *testing.T) { ctx := contexttest.Context(t) r, w := NewConnectedPipe(ctx, capacity, atomicIOBytes) - defer r.DecRef() - defer w.DecRef() + defer r.DecRef(ctx) + defer w.DecRef(ctx) msg := make([]byte, capacity+1) n, err := w.Writev(ctx, usermem.BytesIOSequence(msg)) @@ -77,8 +77,8 @@ func TestPipeWriteUntilEnd(t *testing.T) { ctx := contexttest.Context(t) r, w := NewConnectedPipe(ctx, atomicIOBytes, atomicIOBytes) - defer r.DecRef() - defer w.DecRef() + defer r.DecRef(ctx) + defer w.DecRef(ctx) msg := []byte("here's some bytes") diff --git a/pkg/sentry/kernel/pipe/buffer_test.go b/pkg/sentry/kernel/pipe/pipe_unsafe.go index ee1b90115..dd60cba24 100644 --- a/pkg/sentry/kernel/pipe/buffer_test.go +++ b/pkg/sentry/kernel/pipe/pipe_unsafe.go @@ -15,18 +15,21 @@ package pipe import ( - "testing" "unsafe" - - "gvisor.dev/gvisor/pkg/sentry/usermem" ) -func TestBufferSize(t *testing.T) { - bufferSize := unsafe.Sizeof(buffer{}) - if bufferSize < usermem.PageSize { - t.Errorf("buffer is less than a page") - } - if bufferSize > (2 * usermem.PageSize) { - t.Errorf("buffer is greater than two pages") +// lockTwoPipes locks both x.mu and y.mu in an order that is guaranteed to be +// consistent for both lockTwoPipes(x, y) and lockTwoPipes(y, x), such that +// concurrent calls cannot deadlock. +// +// Preconditions: x != y. +func lockTwoPipes(x, y *Pipe) { + // Lock the two pipes in order of increasing address. 
+ if uintptr(unsafe.Pointer(x)) < uintptr(unsafe.Pointer(y)) { + x.mu.Lock() + y.mu.Lock() + } else { + y.mu.Lock() + x.mu.Lock() } } diff --git a/pkg/sentry/kernel/pipe/pipe_util.go b/pkg/sentry/kernel/pipe/pipe_util.go index ef9641e6a..6d58b682f 100644 --- a/pkg/sentry/kernel/pipe/pipe_util.go +++ b/pkg/sentry/kernel/pipe/pipe_util.go @@ -17,14 +17,15 @@ package pipe import ( "io" "math" - "sync" "syscall" "gvisor.dev/gvisor/pkg/abi/linux" "gvisor.dev/gvisor/pkg/amutex" + "gvisor.dev/gvisor/pkg/buffer" + "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/arch" - "gvisor.dev/gvisor/pkg/sentry/context" - "gvisor.dev/gvisor/pkg/sentry/usermem" + "gvisor.dev/gvisor/pkg/sync" + "gvisor.dev/gvisor/pkg/usermem" "gvisor.dev/gvisor/pkg/waiter" ) @@ -32,7 +33,7 @@ import ( // the old fs architecture. // Release cleans up the pipe's state. -func (p *Pipe) Release() { +func (p *Pipe) Release(context.Context) { p.rClose() p.wClose() @@ -49,9 +50,10 @@ func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error) limit: func(l int64) { dst = dst.TakeFirst64(l) }, - read: func(buf *buffer) (int64, error) { - n, err := dst.CopyOutFrom(ctx, buf) + read: func(view *buffer.View) (int64, error) { + n, err := dst.CopyOutFrom(ctx, view) dst = dst.DropFirst64(n) + view.TrimFront(n) return n, err }, }) @@ -70,16 +72,15 @@ func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) limit: func(l int64) { count = l }, - read: func(buf *buffer) (int64, error) { - n, err := buf.ReadToWriter(w, count, dup) + read: func(view *buffer.View) (int64, error) { + n, err := view.ReadToWriter(w, count) + if !dup { + view.TrimFront(n) + } count -= n return n, err }, } - if dup { - // There is no notification for dup operations. - return p.dup(ctx, ops) - } n, err := p.read(ctx, ops) if n > 0 { p.Notify(waiter.EventOut) @@ -96,8 +97,8 @@ func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error) limit: func(l int64) { src = src.TakeFirst64(l) }, - write: func(buf *buffer) (int64, error) { - n, err := src.CopyInTo(ctx, buf) + write: func(view *buffer.View) (int64, error) { + n, err := src.CopyInTo(ctx, view) src = src.DropFirst64(n) return n, err }, @@ -117,8 +118,8 @@ func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, e limit: func(l int64) { count = l }, - write: func(buf *buffer) (int64, error) { - n, err := buf.WriteFromReader(r, count) + write: func(view *buffer.View) (int64, error) { + n, err := view.WriteFromReader(r, count) count -= n return n, err }, @@ -143,7 +144,7 @@ func (p *Pipe) Ioctl(ctx context.Context, io usermem.IO, args arch.SyscallArgume if v > math.MaxInt32 { v = math.MaxInt32 // Silently truncate. } - // Copy result to user-space. + // Copy result to userspace. _, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{ AddressSpaceActive: true, }) diff --git a/pkg/sentry/kernel/pipe/reader.go b/pkg/sentry/kernel/pipe/reader.go index 7724b4452..ac18785c0 100644 --- a/pkg/sentry/kernel/pipe/reader.go +++ b/pkg/sentry/kernel/pipe/reader.go @@ -15,6 +15,7 @@ package pipe import ( + "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/waiter" ) @@ -29,7 +30,7 @@ type Reader struct { // Release implements fs.FileOperations.Release. // // This overrides ReaderWriter.Release. -func (r *Reader) Release() { +func (r *Reader) Release(context.Context) { r.Pipe.rClose() // Wake up writers. 
diff --git a/pkg/sentry/kernel/pipe/reader_writer.go b/pkg/sentry/kernel/pipe/reader_writer.go index b4d29fc77..b2b5691ee 100644 --- a/pkg/sentry/kernel/pipe/reader_writer.go +++ b/pkg/sentry/kernel/pipe/reader_writer.go @@ -17,11 +17,11 @@ package pipe import ( "io" + "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/arch" - "gvisor.dev/gvisor/pkg/sentry/context" "gvisor.dev/gvisor/pkg/sentry/fs" "gvisor.dev/gvisor/pkg/sentry/fs/fsutil" - "gvisor.dev/gvisor/pkg/sentry/usermem" + "gvisor.dev/gvisor/pkg/usermem" ) // ReaderWriter satisfies the FileOperations interface and services both diff --git a/pkg/sentry/kernel/pipe/vfs.go b/pkg/sentry/kernel/pipe/vfs.go index 6416e0dd8..28f998e45 100644 --- a/pkg/sentry/kernel/pipe/vfs.go +++ b/pkg/sentry/kernel/pipe/vfs.go @@ -15,14 +15,16 @@ package pipe import ( - "sync" - "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/buffer" + "gvisor.dev/gvisor/pkg/context" + "gvisor.dev/gvisor/pkg/safemem" "gvisor.dev/gvisor/pkg/sentry/arch" - "gvisor.dev/gvisor/pkg/sentry/context" - "gvisor.dev/gvisor/pkg/sentry/usermem" + fslock "gvisor.dev/gvisor/pkg/sentry/fs/lock" "gvisor.dev/gvisor/pkg/sentry/vfs" + "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/syserror" + "gvisor.dev/gvisor/pkg/usermem" "gvisor.dev/gvisor/pkg/waiter" ) @@ -50,38 +52,44 @@ type VFSPipe struct { } // NewVFSPipe returns an initialized VFSPipe. -func NewVFSPipe(sizeBytes, atomicIOBytes int64) *VFSPipe { +func NewVFSPipe(isNamed bool, sizeBytes, atomicIOBytes int64) *VFSPipe { var vp VFSPipe - initPipe(&vp.pipe, true /* isNamed */, sizeBytes, atomicIOBytes) + initPipe(&vp.pipe, isNamed, sizeBytes, atomicIOBytes) return &vp } -// NewVFSPipeFD opens a named pipe. Named pipes have special blocking semantics -// during open: +// ReaderWriterPair returns read-only and write-only FDs for vp. // -// "Normally, opening the FIFO blocks until the other end is opened also. A -// process can open a FIFO in nonblocking mode. In this case, opening for -// read-only will succeed even if no-one has opened on the write side yet, -// opening for write-only will fail with ENXIO (no such device or address) -// unless the other end has already been opened. Under Linux, opening a FIFO -// for read and write will succeed both in blocking and nonblocking mode. POSIX -// leaves this behavior undefined. This can be used to open a FIFO for writing -// while there are no readers available." - fifo(7) -func (vp *VFSPipe) NewVFSPipeFD(ctx context.Context, rp *vfs.ResolvingPath, vfsd *vfs.Dentry, vfsfd *vfs.FileDescription, flags uint32) (*VFSPipeFD, error) { +// Preconditions: statusFlags should not contain an open access mode. +func (vp *VFSPipe) ReaderWriterPair(mnt *vfs.Mount, vfsd *vfs.Dentry, statusFlags uint32) (*vfs.FileDescription, *vfs.FileDescription) { + // Connected pipes share the same locks. + locks := &vfs.FileLocks{} + return vp.newFD(mnt, vfsd, linux.O_RDONLY|statusFlags, locks), vp.newFD(mnt, vfsd, linux.O_WRONLY|statusFlags, locks) +} + +// Open opens the pipe represented by vp. 
+func (vp *VFSPipe) Open(ctx context.Context, mnt *vfs.Mount, vfsd *vfs.Dentry, statusFlags uint32, locks *vfs.FileLocks) (*vfs.FileDescription, error) { vp.mu.Lock() defer vp.mu.Unlock() - readable := vfs.MayReadFileWithOpenFlags(flags) - writable := vfs.MayWriteFileWithOpenFlags(flags) + readable := vfs.MayReadFileWithOpenFlags(statusFlags) + writable := vfs.MayWriteFileWithOpenFlags(statusFlags) if !readable && !writable { return nil, syserror.EINVAL } - vfd, err := vp.open(rp, vfsd, vfsfd, flags) - if err != nil { - return nil, err - } + fd := vp.newFD(mnt, vfsd, statusFlags, locks) + // Named pipes have special blocking semantics during open: + // + // "Normally, opening the FIFO blocks until the other end is opened also. A + // process can open a FIFO in nonblocking mode. In this case, opening for + // read-only will succeed even if no-one has opened on the write side yet, + // opening for write-only will fail with ENXIO (no such device or address) + // unless the other end has already been opened. Under Linux, opening a + // FIFO for read and write will succeed both in blocking and nonblocking + // mode. POSIX leaves this behavior undefined. This can be used to open a + // FIFO for writing while there are no readers available." - fifo(7) switch { case readable && writable: // Pipes opened for read-write always succeed without blocking. @@ -90,23 +98,26 @@ func (vp *VFSPipe) NewVFSPipeFD(ctx context.Context, rp *vfs.ResolvingPath, vfsd case readable: newHandleLocked(&vp.rWakeup) - // If this pipe is being opened as nonblocking and there's no + // If this pipe is being opened as blocking and there's no // writer, we have to wait for a writer to open the other end. - if flags&linux.O_NONBLOCK == 0 && !vp.pipe.HasWriters() && !waitFor(&vp.mu, &vp.wWakeup, ctx) { + if vp.pipe.isNamed && statusFlags&linux.O_NONBLOCK == 0 && !vp.pipe.HasWriters() && !waitFor(&vp.mu, &vp.wWakeup, ctx) { + fd.DecRef(ctx) return nil, syserror.EINTR } case writable: newHandleLocked(&vp.wWakeup) - if !vp.pipe.HasReaders() { - // Nonblocking, write-only opens fail with ENXIO when - // the read side isn't open yet. - if flags&linux.O_NONBLOCK != 0 { + if vp.pipe.isNamed && !vp.pipe.HasReaders() { + // Non-blocking, write-only opens fail with ENXIO when the read + // side isn't open yet. + if statusFlags&linux.O_NONBLOCK != 0 { + fd.DecRef(ctx) return nil, syserror.ENXIO } // Wait for a reader to open the other end. if !waitFor(&vp.mu, &vp.rWakeup, ctx) { + fd.DecRef(ctx) return nil, syserror.EINTR } } @@ -115,102 +126,102 @@ func (vp *VFSPipe) NewVFSPipeFD(ctx context.Context, rp *vfs.ResolvingPath, vfsd panic("invalid pipe flags: must be readable, writable, or both") } - return vfd, nil + return fd, nil } // Preconditions: vp.mu must be held. -func (vp *VFSPipe) open(rp *vfs.ResolvingPath, vfsd *vfs.Dentry, vfsfd *vfs.FileDescription, flags uint32) (*VFSPipeFD, error) { - var fd VFSPipeFD - fd.flags = flags - fd.readable = vfs.MayReadFileWithOpenFlags(flags) - fd.writable = vfs.MayWriteFileWithOpenFlags(flags) - fd.vfsfd = vfsfd - fd.pipe = &vp.pipe - if fd.writable { - // The corresponding Mount.EndWrite() is in VFSPipe.Release(). 
- if err := rp.Mount().CheckBeginWrite(); err != nil { - return nil, err - } +func (vp *VFSPipe) newFD(mnt *vfs.Mount, vfsd *vfs.Dentry, statusFlags uint32, locks *vfs.FileLocks) *vfs.FileDescription { + fd := &VFSPipeFD{ + pipe: &vp.pipe, } + fd.LockFD.Init(locks) + fd.vfsfd.Init(fd, statusFlags, mnt, vfsd, &vfs.FileDescriptionOptions{ + DenyPRead: true, + DenyPWrite: true, + UseDentryMetadata: true, + }) switch { - case fd.readable && fd.writable: + case fd.vfsfd.IsReadable() && fd.vfsfd.IsWritable(): vp.pipe.rOpen() vp.pipe.wOpen() - case fd.readable: + case fd.vfsfd.IsReadable(): vp.pipe.rOpen() - case fd.writable: + case fd.vfsfd.IsWritable(): vp.pipe.wOpen() default: panic("invalid pipe flags: must be readable, writable, or both") } - return &fd, nil + return &fd.vfsfd } -// VFSPipeFD implements a subset of vfs.FileDescriptionImpl for pipes. It is -// expected that filesystesm will use this in a struct implementing -// vfs.FileDescriptionImpl. +// VFSPipeFD implements vfs.FileDescriptionImpl for pipes. It also implements +// non-atomic usermem.IO methods, allowing it to be passed as usermem.IO to +// other FileDescriptions for splice(2) and tee(2). type VFSPipeFD struct { - pipe *Pipe - flags uint32 - readable bool - writable bool - vfsfd *vfs.FileDescription + vfsfd vfs.FileDescription + vfs.FileDescriptionDefaultImpl + vfs.DentryMetadataFileDescriptionImpl + vfs.LockFD + + pipe *Pipe } // Release implements vfs.FileDescriptionImpl.Release. -func (fd *VFSPipeFD) Release() { +func (fd *VFSPipeFD) Release(context.Context) { var event waiter.EventMask - if fd.readable { + if fd.vfsfd.IsReadable() { fd.pipe.rClose() - event |= waiter.EventIn + event |= waiter.EventOut } - if fd.writable { + if fd.vfsfd.IsWritable() { fd.pipe.wClose() - event |= waiter.EventOut + event |= waiter.EventIn | waiter.EventHUp } if event == 0 { panic("invalid pipe flags: must be readable, writable, or both") } - if fd.writable { - fd.vfsfd.VirtualDentry().Mount().EndWrite() + fd.pipe.Notify(event) +} + +// Readiness implements waiter.Waitable.Readiness. +func (fd *VFSPipeFD) Readiness(mask waiter.EventMask) waiter.EventMask { + switch { + case fd.vfsfd.IsReadable() && fd.vfsfd.IsWritable(): + return fd.pipe.rwReadiness() + case fd.vfsfd.IsReadable(): + return fd.pipe.rReadiness() + case fd.vfsfd.IsWritable(): + return fd.pipe.wReadiness() + default: + panic("pipe FD is neither readable nor writable") } +} - fd.pipe.Notify(event) +// Allocate implements vfs.FileDescriptionImpl.Allocate. +func (fd *VFSPipeFD) Allocate(ctx context.Context, mode, offset, length uint64) error { + return syserror.ESPIPE } -// OnClose implements vfs.FileDescriptionImpl.OnClose. -func (fd *VFSPipeFD) OnClose(_ context.Context) error { - return nil +// EventRegister implements waiter.Waitable.EventRegister. +func (fd *VFSPipeFD) EventRegister(e *waiter.Entry, mask waiter.EventMask) { + fd.pipe.EventRegister(e, mask) } -// PRead implements vfs.FileDescriptionImpl.PRead. -func (fd *VFSPipeFD) PRead(_ context.Context, _ usermem.IOSequence, _ int64, _ vfs.ReadOptions) (int64, error) { - return 0, syserror.ESPIPE +// EventUnregister implements waiter.Waitable.EventUnregister. +func (fd *VFSPipeFD) EventUnregister(e *waiter.Entry) { + fd.pipe.EventUnregister(e) } // Read implements vfs.FileDescriptionImpl.Read. 
func (fd *VFSPipeFD) Read(ctx context.Context, dst usermem.IOSequence, _ vfs.ReadOptions) (int64, error) { - if !fd.readable { - return 0, syserror.EINVAL - } - return fd.pipe.Read(ctx, dst) } -// PWrite implements vfs.FileDescriptionImpl.PWrite. -func (fd *VFSPipeFD) PWrite(_ context.Context, _ usermem.IOSequence, _ int64, _ vfs.WriteOptions) (int64, error) { - return 0, syserror.ESPIPE -} - // Write implements vfs.FileDescriptionImpl.Write. func (fd *VFSPipeFD) Write(ctx context.Context, src usermem.IOSequence, _ vfs.WriteOptions) (int64, error) { - if !fd.writable { - return 0, syserror.EINVAL - } - return fd.pipe.Write(ctx, src) } @@ -218,3 +229,240 @@ func (fd *VFSPipeFD) Write(ctx context.Context, src usermem.IOSequence, _ vfs.Wr func (fd *VFSPipeFD) Ioctl(ctx context.Context, uio usermem.IO, args arch.SyscallArguments) (uintptr, error) { return fd.pipe.Ioctl(ctx, uio, args) } + +// PipeSize implements fcntl(F_GETPIPE_SZ). +func (fd *VFSPipeFD) PipeSize() int64 { + // Inline Pipe.FifoSize() rather than calling it with nil Context and + // fs.File and ignoring the returned error (which is always nil). + fd.pipe.mu.Lock() + defer fd.pipe.mu.Unlock() + return fd.pipe.max +} + +// SetPipeSize implements fcntl(F_SETPIPE_SZ). +func (fd *VFSPipeFD) SetPipeSize(size int64) (int64, error) { + return fd.pipe.SetFifoSize(size) +} + +// IOSequence returns a useremm.IOSequence that reads up to count bytes from, +// or writes up to count bytes to, fd. +func (fd *VFSPipeFD) IOSequence(count int64) usermem.IOSequence { + return usermem.IOSequence{ + IO: fd, + Addrs: usermem.AddrRangeSeqOf(usermem.AddrRange{0, usermem.Addr(count)}), + } +} + +// CopyIn implements usermem.IO.CopyIn. +func (fd *VFSPipeFD) CopyIn(ctx context.Context, addr usermem.Addr, dst []byte, opts usermem.IOOpts) (int, error) { + origCount := int64(len(dst)) + n, err := fd.pipe.read(ctx, readOps{ + left: func() int64 { + return int64(len(dst)) + }, + limit: func(l int64) { + dst = dst[:l] + }, + read: func(view *buffer.View) (int64, error) { + n, err := view.ReadAt(dst, 0) + view.TrimFront(int64(n)) + return int64(n), err + }, + }) + if n > 0 { + fd.pipe.Notify(waiter.EventOut) + } + if err == nil && n != origCount { + return int(n), syserror.ErrWouldBlock + } + return int(n), err +} + +// CopyOut implements usermem.IO.CopyOut. +func (fd *VFSPipeFD) CopyOut(ctx context.Context, addr usermem.Addr, src []byte, opts usermem.IOOpts) (int, error) { + origCount := int64(len(src)) + n, err := fd.pipe.write(ctx, writeOps{ + left: func() int64 { + return int64(len(src)) + }, + limit: func(l int64) { + src = src[:l] + }, + write: func(view *buffer.View) (int64, error) { + view.Append(src) + return int64(len(src)), nil + }, + }) + if n > 0 { + fd.pipe.Notify(waiter.EventIn) + } + if err == nil && n != origCount { + return int(n), syserror.ErrWouldBlock + } + return int(n), err +} + +// ZeroOut implements usermem.IO.ZeroOut. +func (fd *VFSPipeFD) ZeroOut(ctx context.Context, addr usermem.Addr, toZero int64, opts usermem.IOOpts) (int64, error) { + origCount := toZero + n, err := fd.pipe.write(ctx, writeOps{ + left: func() int64 { + return toZero + }, + limit: func(l int64) { + toZero = l + }, + write: func(view *buffer.View) (int64, error) { + view.Grow(view.Size()+toZero, true /* zero */) + return toZero, nil + }, + }) + if n > 0 { + fd.pipe.Notify(waiter.EventIn) + } + if err == nil && n != origCount { + return n, syserror.ErrWouldBlock + } + return n, err +} + +// CopyInTo implements usermem.IO.CopyInTo. 
+func (fd *VFSPipeFD) CopyInTo(ctx context.Context, ars usermem.AddrRangeSeq, dst safemem.Writer, opts usermem.IOOpts) (int64, error) { + count := ars.NumBytes() + if count == 0 { + return 0, nil + } + origCount := count + n, err := fd.pipe.read(ctx, readOps{ + left: func() int64 { + return count + }, + limit: func(l int64) { + count = l + }, + read: func(view *buffer.View) (int64, error) { + n, err := view.ReadToSafememWriter(dst, uint64(count)) + view.TrimFront(int64(n)) + return int64(n), err + }, + }) + if n > 0 { + fd.pipe.Notify(waiter.EventOut) + } + if err == nil && n != origCount { + return n, syserror.ErrWouldBlock + } + return n, err +} + +// CopyOutFrom implements usermem.IO.CopyOutFrom. +func (fd *VFSPipeFD) CopyOutFrom(ctx context.Context, ars usermem.AddrRangeSeq, src safemem.Reader, opts usermem.IOOpts) (int64, error) { + count := ars.NumBytes() + if count == 0 { + return 0, nil + } + origCount := count + n, err := fd.pipe.write(ctx, writeOps{ + left: func() int64 { + return count + }, + limit: func(l int64) { + count = l + }, + write: func(view *buffer.View) (int64, error) { + n, err := view.WriteFromSafememReader(src, uint64(count)) + return int64(n), err + }, + }) + if n > 0 { + fd.pipe.Notify(waiter.EventIn) + } + if err == nil && n != origCount { + return n, syserror.ErrWouldBlock + } + return n, err +} + +// SwapUint32 implements usermem.IO.SwapUint32. +func (fd *VFSPipeFD) SwapUint32(ctx context.Context, addr usermem.Addr, new uint32, opts usermem.IOOpts) (uint32, error) { + // How did a pipe get passed as the virtual address space to futex(2)? + panic("VFSPipeFD.SwapUint32 called unexpectedly") +} + +// CompareAndSwapUint32 implements usermem.IO.CompareAndSwapUint32. +func (fd *VFSPipeFD) CompareAndSwapUint32(ctx context.Context, addr usermem.Addr, old, new uint32, opts usermem.IOOpts) (uint32, error) { + panic("VFSPipeFD.CompareAndSwapUint32 called unexpectedly") +} + +// LoadUint32 implements usermem.IO.LoadUint32. +func (fd *VFSPipeFD) LoadUint32(ctx context.Context, addr usermem.Addr, opts usermem.IOOpts) (uint32, error) { + panic("VFSPipeFD.LoadUint32 called unexpectedly") +} + +// Splice reads up to count bytes from src and writes them to dst. It returns +// the number of bytes moved. +// +// Preconditions: count > 0. +func Splice(ctx context.Context, dst, src *VFSPipeFD, count int64) (int64, error) { + return spliceOrTee(ctx, dst, src, count, true /* removeFromSrc */) +} + +// Tee reads up to count bytes from src and writes them to dst, without +// removing the read bytes from src. It returns the number of bytes copied. +// +// Preconditions: count > 0. +func Tee(ctx context.Context, dst, src *VFSPipeFD, count int64) (int64, error) { + return spliceOrTee(ctx, dst, src, count, false /* removeFromSrc */) +} + +// Preconditions: count > 0. 
+func spliceOrTee(ctx context.Context, dst, src *VFSPipeFD, count int64, removeFromSrc bool) (int64, error) { + if dst.pipe == src.pipe { + return 0, syserror.EINVAL + } + + lockTwoPipes(dst.pipe, src.pipe) + defer dst.pipe.mu.Unlock() + defer src.pipe.mu.Unlock() + + n, err := dst.pipe.writeLocked(ctx, writeOps{ + left: func() int64 { + return count + }, + limit: func(l int64) { + count = l + }, + write: func(dstView *buffer.View) (int64, error) { + return src.pipe.readLocked(ctx, readOps{ + left: func() int64 { + return count + }, + limit: func(l int64) { + count = l + }, + read: func(srcView *buffer.View) (int64, error) { + n, err := srcView.ReadToSafememWriter(dstView, uint64(count)) + if n > 0 && removeFromSrc { + srcView.TrimFront(int64(n)) + } + return int64(n), err + }, + }) + }, + }) + if n > 0 { + dst.pipe.Notify(waiter.EventIn) + src.pipe.Notify(waiter.EventOut) + } + return n, err +} + +// LockPOSIX implements vfs.FileDescriptionImpl.LockPOSIX. +func (fd *VFSPipeFD) LockPOSIX(ctx context.Context, uid fslock.UniqueID, t fslock.LockType, start, length uint64, whence int16, block fslock.Blocker) error { + return fd.Locks().LockPOSIX(ctx, &fd.vfsfd, uid, t, start, length, whence, block) +} + +// UnlockPOSIX implements vfs.FileDescriptionImpl.UnlockPOSIX. +func (fd *VFSPipeFD) UnlockPOSIX(ctx context.Context, uid fslock.UniqueID, start, length uint64, whence int16) error { + return fd.Locks().UnlockPOSIX(ctx, &fd.vfsfd, uid, start, length, whence) +} diff --git a/pkg/sentry/kernel/pipe/writer.go b/pkg/sentry/kernel/pipe/writer.go index 5bc6aa931..ef4b70ca3 100644 --- a/pkg/sentry/kernel/pipe/writer.go +++ b/pkg/sentry/kernel/pipe/writer.go @@ -15,6 +15,7 @@ package pipe import ( + "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/waiter" ) @@ -29,7 +30,7 @@ type Writer struct { // Release implements fs.FileOperations.Release. // // This overrides ReaderWriter.Release. -func (w *Writer) Release() { +func (w *Writer) Release(context.Context) { w.Pipe.wClose() // Wake up readers. |
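The new pipe_unsafe.go above acquires the two pipes' mutexes in order of increasing pointer address, so that concurrent splice(2)/tee(2) calls in opposite directions cannot deadlock. A minimal standalone sketch of the same pattern, using a hypothetical pipe stand-in and lockTwo helper rather than the real Pipe type:

package main

import (
	"fmt"
	"sync"
	"unsafe"
)

// pipe is a stand-in for the real Pipe type; only the mutex matters here.
type pipe struct {
	mu   sync.Mutex
	data []byte
}

// lockTwo acquires both mutexes in order of increasing address, so that
// lockTwo(a, b) and lockTwo(b, a) agree on the locking order and cannot
// deadlock against each other. Precondition: x != y.
func lockTwo(x, y *pipe) {
	if uintptr(unsafe.Pointer(x)) < uintptr(unsafe.Pointer(y)) {
		x.mu.Lock()
		y.mu.Lock()
	} else {
		y.mu.Lock()
		x.mu.Lock()
	}
}

func main() {
	a, b := &pipe{}, &pipe{data: []byte("payload")}
	lockTwo(a, b)
	a.data = append(a.data, b.data...) // Both locked: safe to move data across.
	b.mu.Unlock()
	a.mu.Unlock()
	fmt.Println(string(a.data))
}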
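writeLocked in pipe.go also encodes the PIPE_BUF rule: a write no larger than atomicIOBytes either fits entirely or returns EWOULDBLOCK, while larger writes may be truncated to the remaining capacity. A sketch of that decision in isolation; limitPipeWrite is an illustrative name, not part of the change above:

// limitPipeWrite shows how the PIPE_BUF atomicity rule limits a pipe write,
// given the requested size, the pipe's free capacity, and atomicIOBytes.
func limitPipeWrite(wanted, avail, atomicIOBytes int64) (allowed int64, wouldBlock bool) {
	if wanted <= avail {
		// The whole write fits in the remaining capacity.
		return wanted, false
	}
	if wanted <= atomicIOBytes {
		// Writes of at most PIPE_BUF bytes must be atomic: nothing is
		// written now and the caller sees EWOULDBLOCK.
		return 0, true
	}
	// Larger writes may be split; accept whatever capacity remains.
	return avail, false
}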