// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package pipe provides a pipe implementation.
package pipe

import (
	"fmt"
	"io"
	"sync/atomic"

	"golang.org/x/sys/unix"
	"gvisor.dev/gvisor/pkg/context"
	"gvisor.dev/gvisor/pkg/safemem"
	"gvisor.dev/gvisor/pkg/sentry/fs"
	"gvisor.dev/gvisor/pkg/sync"
	"gvisor.dev/gvisor/pkg/syserror"
	"gvisor.dev/gvisor/pkg/usermem"
	"gvisor.dev/gvisor/pkg/waiter"
)

const (
	// MinimumPipeSize is a hard limit of the minimum size of a pipe.
	// It corresponds to fs/pipe.c:pipe_min_size.
	MinimumPipeSize = usermem.PageSize

	// MaximumPipeSize is a hard limit on the maximum size of a pipe.
	// It corresponds to fs/pipe.c:pipe_max_size.
	MaximumPipeSize = 1048576

	// DefaultPipeSize is the system-wide default size of a pipe in bytes.
	// It corresponds to pipe_fs_i.h:PIPE_DEF_BUFFERS.
	DefaultPipeSize = 16 * usermem.PageSize

	// atomicIOBytes is the maximum number of bytes that the pipe will
	// guarantee atomic reads or writes atomically.
	// It corresponds to limits.h:PIPE_BUF.
	atomicIOBytes = 4096
)

// Pipe is an encapsulation of a platform-independent pipe.
// It manages a buffered byte queue shared between a reader/writer
// pair.
//
// +stateify savable
type Pipe struct {
	waiter.Queue `state:"nosave"`

	// isNamed indicates whether this is a named pipe.
	//
	// This value is immutable.
	isNamed bool

	// The number of active readers for this pipe.
	//
	// Access atomically.
	readers int32

	// The number of active writes for this pipe.
	//
	// Access atomically.
	writers int32

	// mu protects all pipe internal state below.
	mu sync.Mutex `state:"nosave"`

	// buf holds the pipe's data. buf is a circular buffer; the first valid
	// byte in buf is at offset off, and the pipe contains size valid bytes.
	// bufBlocks contains two identical safemem.Blocks representing buf; this
	// avoids needing to heap-allocate a new safemem.Block slice when buf is
	// resized. bufBlockSeq is a safemem.BlockSeq representing bufBlocks.
	//
	// These fields are protected by mu.
	buf         []byte
	bufBlocks   [2]safemem.Block `state:"nosave"`
	bufBlockSeq safemem.BlockSeq `state:"nosave"`
	off         int64
	size        int64

	// max is the maximum size of the pipe in bytes. When this max has been
	// reached, writers will get EWOULDBLOCK.
	//
	// This is protected by mu.
	max int64

	// hadWriter indicates if this pipe ever had a writer. Note that this
	// does not necessarily indicate there is *currently* a writer, just
	// that there has been a writer at some point since the pipe was
	// created.
	//
	// This is protected by mu.
	hadWriter bool
}

// NewPipe initializes and returns a pipe.
//
// N.B. The size will be bounded.
func NewPipe(isNamed bool, sizeBytes int64) *Pipe {
	var p Pipe
	initPipe(&p, isNamed, sizeBytes)
	return &p
}

func initPipe(pipe *Pipe, isNamed bool, sizeBytes int64) {
	if sizeBytes < MinimumPipeSize {
		sizeBytes = MinimumPipeSize
	}
	if sizeBytes > MaximumPipeSize {
		sizeBytes = MaximumPipeSize
	}
	pipe.isNamed = isNamed
	pipe.max = sizeBytes
}

// NewConnectedPipe initializes a pipe and returns a pair of objects
// representing the read and write ends of the pipe.
func NewConnectedPipe(ctx context.Context, sizeBytes int64) (*fs.File, *fs.File) {
	p := NewPipe(false /* isNamed */, sizeBytes)

	// Build an fs.Dirent for the pipe which will be shared by both
	// returned files.
	perms := fs.FilePermissions{
		User: fs.PermMask{Read: true, Write: true},
	}
	iops := NewInodeOperations(ctx, perms, p)
	ino := pipeDevice.NextIno()
	sattr := fs.StableAttr{
		Type:      fs.Pipe,
		DeviceID:  pipeDevice.DeviceID(),
		InodeID:   ino,
		BlockSize: int64(atomicIOBytes),
	}
	ms := fs.NewPseudoMountSource(ctx)
	d := fs.NewDirent(ctx, fs.NewInode(ctx, iops, ms, sattr), fmt.Sprintf("pipe:[%d]", ino))
	// The p.Open calls below will each take a reference on the Dirent. We
	// must drop the one we already have.
	defer d.DecRef(ctx)
	return p.Open(ctx, d, fs.FileFlags{Read: true}), p.Open(ctx, d, fs.FileFlags{Write: true})
}

// Open opens the pipe and returns a new file.
//
// Precondition: at least one of flags.Read or flags.Write must be set.
func (p *Pipe) Open(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) *fs.File {
	flags.NonSeekable = true
	switch {
	case flags.Read && flags.Write:
		p.rOpen()
		p.wOpen()
		return fs.NewFile(ctx, d, flags, &ReaderWriter{
			Pipe: p,
		})
	case flags.Read:
		p.rOpen()
		return fs.NewFile(ctx, d, flags, &Reader{
			ReaderWriter: ReaderWriter{Pipe: p},
		})
	case flags.Write:
		p.wOpen()
		return fs.NewFile(ctx, d, flags, &Writer{
			ReaderWriter: ReaderWriter{Pipe: p},
		})
	default:
		// Precondition violated.
		panic("invalid pipe flags")
	}
}

// peekLocked passes the first count bytes in the pipe to f and returns its
// result. If fewer than count bytes are available, the safemem.BlockSeq passed
// to f will be less than count bytes in length.
//
// peekLocked does not mutate the pipe; if the read consumes bytes from the
// pipe, then the caller is responsible for calling p.consumeLocked() and
// p.Notify(waiter.EventOut). (The latter must be called with p.mu unlocked.)
//
// Preconditions:
// * p.mu must be locked.
// * This pipe must have readers.
func (p *Pipe) peekLocked(count int64, f func(safemem.BlockSeq) (uint64, error)) (int64, error) {
	// Don't block for a zero-length read even if the pipe is empty.
	if count == 0 {
		return 0, nil
	}

	// Limit the amount of data read to the amount of data in the pipe.
	if count > p.size {
		if p.size == 0 {
			if !p.HasWriters() {
				return 0, io.EOF
			}
			return 0, syserror.ErrWouldBlock
		}
		count = p.size
	}

	// Prepare the view of the data to be read.
	bs := p.bufBlockSeq.DropFirst64(uint64(p.off)).TakeFirst64(uint64(count))

	// Perform the read.
	done, err := f(bs)
	return int64(done), err
}

// consumeLocked consumes the first n bytes in the pipe, such that they will no
// longer be visible to future reads.
//
// Preconditions:
// * p.mu must be locked.
// * The pipe must contain at least n bytes.
func (p *Pipe) consumeLocked(n int64) {
	p.off += n
	if max := int64(len(p.buf)); p.off >= max {
		p.off -= max
	}
	p.size -= n
}

// writeLocked passes a safemem.BlockSeq representing the first count bytes of
// unused space in the pipe to f and returns the result. If fewer than count
// bytes are free, the safemem.BlockSeq passed to f will be less than count
// bytes in length. If the pipe is full or otherwise cannot accomodate a write
// of any number of bytes up to count, writeLocked returns ErrWouldBlock
// without calling f.
//
// Unlike peekLocked, writeLocked assumes that f returns the number of bytes
// written to the pipe, and increases the number of bytes stored in the pipe
// accordingly. Callers are still responsible for calling
// p.Notify(waiter.EventIn) with p.mu unlocked.
//
// Preconditions:
// * p.mu must be locked.
func (p *Pipe) writeLocked(count int64, f func(safemem.BlockSeq) (uint64, error)) (int64, error) {
	// Can't write to a pipe with no readers.
	if !p.HasReaders() {
		return 0, unix.EPIPE
	}

	avail := p.max - p.size
	if avail == 0 {
		return 0, syserror.ErrWouldBlock
	}
	short := false
	if count > avail {
		// POSIX requires that a write smaller than atomicIOBytes
		// (PIPE_BUF) be atomic, but requires no atomicity for writes
		// larger than this.
		if count <= atomicIOBytes {
			return 0, syserror.ErrWouldBlock
		}
		count = avail
		short = true
	}

	// Ensure that the buffer is big enough.
	if newLen, oldCap := p.size+count, int64(len(p.buf)); newLen > oldCap {
		// Allocate a new buffer.
		newCap := oldCap * 2
		if oldCap == 0 {
			newCap = 8 // arbitrary; sending individual integers across pipes is relatively common
		}
		for newLen > newCap {
			newCap *= 2
		}
		if newCap > p.max {
			newCap = p.max
		}
		newBuf := make([]byte, newCap)
		// Copy the old buffer's contents to the beginning of the new one.
		safemem.CopySeq(
			safemem.BlockSeqOf(safemem.BlockFromSafeSlice(newBuf)),
			p.bufBlockSeq.DropFirst64(uint64(p.off)).TakeFirst64(uint64(p.size)))
		// Switch to the new buffer.
		p.buf = newBuf
		p.bufBlocks[0] = safemem.BlockFromSafeSlice(newBuf)
		p.bufBlocks[1] = p.bufBlocks[0]
		p.bufBlockSeq = safemem.BlockSeqFromSlice(p.bufBlocks[:])
		p.off = 0
	}

	// Prepare the view of the space to be written.
	woff := p.off + p.size
	if woff >= int64(len(p.buf)) {
		woff -= int64(len(p.buf))
	}
	bs := p.bufBlockSeq.DropFirst64(uint64(woff)).TakeFirst64(uint64(count))

	// Perform the write.
	doneU64, err := f(bs)
	done := int64(doneU64)
	p.size += done
	if done < count || err != nil {
		return done, err
	}

	// If we shortened the write, adjust the returned error appropriately.
	if short {
		return done, syserror.ErrWouldBlock
	}

	return done, nil
}

// rOpen signals a new reader of the pipe.
func (p *Pipe) rOpen() {
	atomic.AddInt32(&p.readers, 1)
}

// wOpen signals a new writer of the pipe.
func (p *Pipe) wOpen() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.hadWriter = true
	atomic.AddInt32(&p.writers, 1)
}

// rClose signals that a reader has closed their end of the pipe.
func (p *Pipe) rClose() {
	newReaders := atomic.AddInt32(&p.readers, -1)
	if newReaders < 0 {
		panic(fmt.Sprintf("Refcounting bug, pipe has negative readers: %v", newReaders))
	}
}

// wClose signals that a writer has closed their end of the pipe.
func (p *Pipe) wClose() {
	newWriters := atomic.AddInt32(&p.writers, -1)
	if newWriters < 0 {
		panic(fmt.Sprintf("Refcounting bug, pipe has negative writers: %v.", newWriters))
	}
}

// HasReaders returns whether the pipe has any active readers.
func (p *Pipe) HasReaders() bool {
	return atomic.LoadInt32(&p.readers) > 0
}

// HasWriters returns whether the pipe has any active writers.
func (p *Pipe) HasWriters() bool {
	return atomic.LoadInt32(&p.writers) > 0
}

// rReadinessLocked calculates the read readiness.
//
// Precondition: mu must be held.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
	ready := waiter.EventMask(0)
	if p.HasReaders() && p.size != 0 {
		ready |= waiter.EventIn
	}
	if !p.HasWriters() && p.hadWriter {
		// POLLHUP must be suppressed until the pipe has had at least one writer
		// at some point. Otherwise a reader thread may poll and immediately get
		// a POLLHUP before the writer ever opens the pipe, which the reader may
		// interpret as the writer opening then closing the pipe.
		ready |= waiter.EventHUp
	}
	return ready
}

// rReadiness returns a mask that states whether the read end of the pipe is
// ready for reading.
func (p *Pipe) rReadiness() waiter.EventMask {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.rReadinessLocked()
}

// wReadinessLocked calculates the write readiness.
//
// Precondition: mu must be held.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
	ready := waiter.EventMask(0)
	if p.HasWriters() && p.size < p.max {
		ready |= waiter.EventOut
	}
	if !p.HasReaders() {
		ready |= waiter.EventErr
	}
	return ready
}

// wReadiness returns a mask that states whether the write end of the pipe
// is ready for writing.
func (p *Pipe) wReadiness() waiter.EventMask {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.wReadinessLocked()
}

// rwReadiness returns a mask that states whether a read-write handle to the
// pipe is ready for IO.
func (p *Pipe) rwReadiness() waiter.EventMask {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.rReadinessLocked() | p.wReadinessLocked()
}

// queued returns the amount of queued data.
func (p *Pipe) queued() int64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.queuedLocked()
}

func (p *Pipe) queuedLocked() int64 {
	return p.size
}

// FifoSize implements fs.FifoSizer.FifoSize.
func (p *Pipe) FifoSize(context.Context, *fs.File) (int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.max, nil
}

// SetFifoSize implements fs.FifoSizer.SetFifoSize.
func (p *Pipe) SetFifoSize(size int64) (int64, error) {
	if size < 0 {
		return 0, syserror.EINVAL
	}
	if size < MinimumPipeSize {
		size = MinimumPipeSize // Per spec.
	}
	if size > MaximumPipeSize {
		return 0, syserror.EPERM
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if size < p.size {
		return 0, syserror.EBUSY
	}
	p.max = size
	return size, nil
}