// Copyright 2018 The gVisor Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package pipe provides an in-memory implementation of a unidirectional // pipe. // // The goal of this pipe is to emulate the pipe syscall in all of its // edge cases and guarantees of atomic IO. package pipe import ( "fmt" "sync" "sync/atomic" "syscall" "gvisor.googlesource.com/gvisor/pkg/sentry/context" "gvisor.googlesource.com/gvisor/pkg/sentry/fs" "gvisor.googlesource.com/gvisor/pkg/sentry/usermem" "gvisor.googlesource.com/gvisor/pkg/syserror" "gvisor.googlesource.com/gvisor/pkg/waiter" ) // DefaultPipeSize is the system-wide default size of a pipe in bytes. const DefaultPipeSize = 65536 // Pipe is an encapsulation of a platform-independent pipe. // It manages a buffered byte queue shared between a reader/writer // pair. // // +stateify savable type Pipe struct { waiter.Queue `state:"nosave"` // Whether this is a named or anonymous pipe. isNamed bool // The dirent backing this pipe. Shared by all readers and writers. Dirent *fs.Dirent // The buffered byte queue. data bufferList // Max size of the pipe in bytes. When this max has been reached, // writers will get EWOULDBLOCK. max int // Current size of the pipe in bytes. size int // Max number of bytes the pipe can guarantee to read or write // atomically. atomicIOBytes int // The number of active readers for this pipe. Load/store atomically. readers int32 // The number of active writes for this pipe. Load/store atomically. 
writers int32 // This flag indicates if this pipe ever had a writer. Note that this does // not necessarily indicate there is *currently* a writer, just that there // has been a writer at some point since the pipe was created. // // Protected by mu. hadWriter bool // Lock protecting all pipe internal state. mu sync.Mutex `state:"nosave"` } // NewPipe initializes and returns a pipe. A pipe created by this function is // persistent, and will remain valid even without any open fds to it. Named // pipes for mknod(2) are created via this function. Note that the // implementation of blocking semantics for opening the read and write ends of a // named pipe are left to filesystems. func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe { p := &Pipe{ isNamed: isNamed, max: sizeBytes, atomicIOBytes: atomicIOBytes, } // Build the fs.Dirent of this pipe, shared by all fs.Files associated // with this pipe. perms := fs.FilePermissions{ User: fs.PermMask{Read: true, Write: true}, } iops := NewInodeOperations(ctx, perms, p) ino := pipeDevice.NextIno() sattr := fs.StableAttr{ Type: fs.Pipe, DeviceID: pipeDevice.DeviceID(), InodeID: ino, BlockSize: int64(atomicIOBytes), } ms := fs.NewPseudoMountSource() p.Dirent = fs.NewDirent(fs.NewInode(iops, ms, sattr), fmt.Sprintf("pipe:[%d]", ino)) return p } // NewConnectedPipe initializes a pipe and returns a pair of objects (which // implement kio.File) representing the read and write ends of the pipe. A pipe // created by this function becomes invalid as soon as either the read or write // end is closed, and errors on subsequent operations on either end. Pipes // for pipe(2) and pipe2(2) are generally created this way. func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) { p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes) return p.ROpen(ctx), p.WOpen(ctx) } // ROpen opens the pipe for reading. 
func (p *Pipe) ROpen(ctx context.Context) *fs.File { p.rOpen() return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true}, &Reader{ ReaderWriter: ReaderWriter{Pipe: p}, }) } // WOpen opens the pipe for writing. func (p *Pipe) WOpen(ctx context.Context) *fs.File { p.wOpen() return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Write: true}, &Writer{ ReaderWriter: ReaderWriter{Pipe: p}, }) } // RWOpen opens the pipe for both reading and writing. func (p *Pipe) RWOpen(ctx context.Context) *fs.File { p.rOpen() p.wOpen() return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{ Pipe: p, }) } // read reads data from the pipe into dst and returns the number of bytes // read, or returns ErrWouldBlock if the pipe is empty. func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) { if !p.HasReaders() { return 0, syscall.EBADF } // Don't block for a zero-length read even if the pipe is empty. if dst.NumBytes() == 0 { return 0, nil } p.mu.Lock() defer p.mu.Unlock() // If there is nothing to read at the moment but there is a writer, tell the // caller to block. if p.size == 0 { if !p.HasWriters() { // There are no writers, return EOF. return 0, nil } return 0, syserror.ErrWouldBlock } var n int64 for buffer := p.data.Front(); buffer != nil; buffer = p.data.Front() { n0, err := dst.CopyOut(ctx, buffer.bytes()) n += int64(n0) p.size -= n0 if buffer.truncate(n0) == 0 { p.data.Remove(buffer) } dst = dst.DropFirst(n0) if dst.NumBytes() == 0 || err != nil { return n, err } } return n, nil } // write writes data from sv into the pipe and returns the number of bytes // written. If no bytes are written because the pipe is full (or has less than // atomicIOBytes free capacity), write returns ErrWouldBlock. 
func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) { p.mu.Lock() defer p.mu.Unlock() if !p.HasWriters() { return 0, syscall.EBADF } if !p.HasReaders() { return 0, syscall.EPIPE } // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be // atomic, but requires no atomicity for writes larger than this. However, // Linux appears to provide stronger semantics than this in practice: // unmerged writes are done one PAGE_SIZE buffer at a time, so for larger // writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement // this by writing at most atomicIOBytes at a time if we can't service the // write in its entirety. canWrite := src.NumBytes() if canWrite > int64(p.max-p.size) { if p.max-p.size >= p.atomicIOBytes { canWrite = int64(p.atomicIOBytes) } else { return 0, syserror.ErrWouldBlock } } // Copy data from user memory into a pipe-owned buffer. buf := make([]byte, canWrite) n, err := src.CopyIn(ctx, buf) if n > 0 { p.data.PushBack(newBuffer(buf[:n])) p.size += n } if int64(n) < src.NumBytes() && err == nil { // Partial write due to full pipe. err = syserror.ErrWouldBlock } return int64(n), err } // rOpen signals a new reader of the pipe. func (p *Pipe) rOpen() { atomic.AddInt32(&p.readers, 1) } // wOpen signals a new writer of the pipe. func (p *Pipe) wOpen() { p.mu.Lock() defer p.mu.Unlock() p.hadWriter = true atomic.AddInt32(&p.writers, 1) } // rClose signals that a reader has closed their end of the pipe. func (p *Pipe) rClose() { newReaders := atomic.AddInt32(&p.readers, -1) if newReaders < 0 { panic(fmt.Sprintf("Refcounting bug, pipe has negative readers: %v", newReaders)) } } // wClose signals that a writer has closed their end of the pipe. func (p *Pipe) wClose() { newWriters := atomic.AddInt32(&p.writers, -1) if newWriters < 0 { panic(fmt.Sprintf("Refcounting bug, pipe has negative writers: %v.", newWriters)) } } // HasReaders returns whether the pipe has any active readers. 
func (p *Pipe) HasReaders() bool {
	count := atomic.LoadInt32(&p.readers)
	return count > 0
}

// HasWriters reports whether the pipe's active writer count is above zero.
func (p *Pipe) HasWriters() bool {
	count := atomic.LoadInt32(&p.writers)
	return count > 0
}

// rReadinessLocked builds the event mask for the read end of the pipe. The
// callers in this file hold p.mu while calling it.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
	var mask waiter.EventMask

	// Readable: a reader exists and at least one buffer is queued.
	if p.HasReaders() && p.data.Front() != nil {
		mask |= waiter.EventIn
	}

	// POLLHUP is withheld until the pipe has had a writer at least once.
	// Otherwise a reader that polls before any writer opens the pipe would
	// see an immediate hang-up and could mistake it for a writer that opened
	// and then closed the pipe.
	if !p.HasWriters() && p.hadWriter {
		mask |= waiter.EventHUp
	}

	return mask
}

// rReadiness returns the event mask describing whether the read end of the
// pipe is ready for reading, taking p.mu for the duration of the check.
func (p *Pipe) rReadiness() waiter.EventMask {
	p.mu.Lock()
	mask := p.rReadinessLocked()
	p.mu.Unlock()
	return mask
}

// wReadinessLocked builds the event mask for the write end of the pipe. The
// callers in this file hold p.mu while calling it.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
	var mask waiter.EventMask

	// Writable: a writer exists and the pipe is below its maximum size.
	if p.HasWriters() && p.size < p.max {
		mask |= waiter.EventOut
	}

	// With no reader left, the write end reports an error condition.
	if !p.HasReaders() {
		mask |= waiter.EventErr
	}

	return mask
}

// wReadiness returns the event mask describing whether the write end of the
// pipe is ready for writing, taking p.mu for the duration of the check.
func (p *Pipe) wReadiness() waiter.EventMask {
	p.mu.Lock()
	mask := p.wReadinessLocked()
	p.mu.Unlock()
	return mask
}

// rwReadiness returns the combined read-end and write-end event mask for a
// read-write handle to the pipe, computed under a single hold of p.mu.
func (p *Pipe) rwReadiness() waiter.EventMask {
	p.mu.Lock()
	readMask := p.rReadinessLocked()
	writeMask := p.wReadinessLocked()
	p.mu.Unlock()
	return readMask | writeMask
}

// queuedSize returns the pipe's current size in bytes, read under p.mu.
func (p *Pipe) queuedSize() int {
	p.mu.Lock()
	queued := p.size
	p.mu.Unlock()
	return queued
}