// Copyright 2019 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipe

import (
	"io"
	"math"
	"syscall"

	"gvisor.dev/gvisor/pkg/abi/linux"
	"gvisor.dev/gvisor/pkg/amutex"
	"gvisor.dev/gvisor/pkg/context"
	"gvisor.dev/gvisor/pkg/marshal/primitive"
	"gvisor.dev/gvisor/pkg/safemem"
	"gvisor.dev/gvisor/pkg/sentry/arch"
	"gvisor.dev/gvisor/pkg/sync"
	"gvisor.dev/gvisor/pkg/usermem"
	"gvisor.dev/gvisor/pkg/waiter"
)

// This file contains Pipe file functionality that is tied to neither VFS nor
// the old fs architecture.

// Release cleans up the pipe's state: it invokes rClose and wClose and then
// notifies waiters on the pipe. The context argument is unused.
func (p *Pipe) Release(context.Context) {
	p.rClose()
	p.wClose()

	// Signal both event directions so that blocked readers and writers are
	// woken up.
	p.Notify(waiter.EventOut | waiter.EventIn)
}

// Read copies data from the Pipe into dst, using the Pipe itself as the
// source reader. It returns the number of bytes copied and any error reported
// by the copy.
func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
	copied, err := dst.CopyOutFrom(ctx, p)
	if copied <= 0 {
		// Nothing was transferred, so there is nobody to notify.
		return copied, err
	}

	// Data was read; raise EventOut for waiters.
	p.Notify(waiter.EventOut)
	return copied, err
}

// ReadToBlocks implements safemem.Reader.ReadToBlocks for Pipe.Read.
//
// It reads up to dsts.NumBytes() bytes from the pipe, copying them into dsts
// and removing them from the pipe.
func (p *Pipe) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
	// copyToDsts moves the pipe-provided source blocks into the caller's
	// destination blocks.
	copyToDsts := func(srcs safemem.BlockSeq) (uint64, error) {
		return safemem.CopySeq(dsts, srcs)
	}

	want := int64(dsts.NumBytes())
	done, err := p.read(want, copyToDsts, true /* removeFromSrc */)
	return uint64(done), err
}

// read peeks at up to count bytes of pipe data while holding p.mu, handing
// the data to f. If removeFromSrc is set and f handled a positive number of
// bytes, those bytes are then consumed from the pipe. It returns the byte
// count and error produced by peekLocked.
func (p *Pipe) read(count int64, f func(srcs safemem.BlockSeq) (uint64, error), removeFromSrc bool) (int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	done, err := p.peekLocked(count, f)
	if removeFromSrc && done > 0 {
		// Only drop what was actually handed to f.
		p.consumeLocked(done)
	}
	return done, err
}

// WriteTo writes up to count bytes from the Pipe to w.
//
// When dup is true the data is left in the pipe and no notification is sent;
// otherwise the written bytes are removed from the pipe and, if any were
// written, EventOut is raised.
func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) (int64, error) {
	// Data is only consumed from the pipe when it is not being duplicated.
	consume := !dup

	// drain forwards the pipe's source blocks to w.
	drain := func(srcs safemem.BlockSeq) (uint64, error) {
		return safemem.FromIOWriter{w}.WriteFromBlocks(srcs)
	}

	moved, err := p.read(count, drain, consume /* removeFromSrc */)
	if consume && moved > 0 {
		p.Notify(waiter.EventOut)
	}
	return moved, err
}

// Write copies data from src into the Pipe, using the Pipe itself as the
// destination writer. It returns the number of bytes copied and any error
// reported by the copy.
func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error) {
	written, err := src.CopyInTo(ctx, p)
	if written <= 0 {
		// Nothing was transferred, so there is nobody to notify.
		return written, err
	}

	// Data was written; raise EventIn for waiters.
	p.Notify(waiter.EventIn)
	return written, err
}

// WriteFromBlocks implements safemem.Writer.WriteFromBlocks for Pipe.Write.
//
// It writes up to srcs.NumBytes() bytes into the pipe, copying them from
// srcs.
func (p *Pipe) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) {
	// copyFromSrcs fills the pipe-provided destination blocks from the
	// caller's source blocks.
	copyFromSrcs := func(dsts safemem.BlockSeq) (uint64, error) {
		return safemem.CopySeq(dsts, srcs)
	}

	total := int64(srcs.NumBytes())
	written, err := p.write(total, copyFromSrcs)
	return uint64(written), err
}

// write acquires p.mu and delegates to writeLocked, passing count and f
// through unchanged. It returns whatever writeLocked returns.
func (p *Pipe) write(count int64, f func(safemem.BlockSeq) (uint64, error)) (int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	written, err := p.writeLocked(count, f)
	return written, err
}

// ReadFrom reads up to count bytes from r into the Pipe. If any bytes were
// added, EventIn is raised. It returns the number of bytes added and any
// error from the write.
func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, error) {
	// fill populates the pipe's destination blocks with data read from r.
	fill := func(dsts safemem.BlockSeq) (uint64, error) {
		return safemem.FromIOReader{r}.ReadToBlocks(dsts)
	}

	added, err := p.write(count, fill)
	if added > 0 {
		p.Notify(waiter.EventIn)
	}
	return added, err
}

// Readiness reports which of the events in mask are currently ready on the
// underlying pipe.
func (p *Pipe) Readiness(mask waiter.EventMask) waiter.EventMask {
	ready := p.rwReadiness()
	// Restrict the result to the events the caller asked about.
	return mask & ready
}

// Ioctl implements ioctls on the Pipe.
//
// The request number is taken from args[1]. Only FIONREAD is handled: the
// value of p.queued(), capped at math.MaxInt32, is copied out as an int32 to
// the address in args[2]. Any other request fails with ENOTTY.
func (p *Pipe) Ioctl(ctx context.Context, io usermem.IO, args arch.SyscallArguments) (uintptr, error) {
	// Reject everything except FIONREAD up front.
	if req := int(args[1].Int()); req != linux.FIONREAD {
		return 0, syscall.ENOTTY
	}

	queued := p.queued()
	if queued > math.MaxInt32 {
		// Silently truncate so the value fits in the int32 result.
		queued = math.MaxInt32
	}

	// Copy the result to userspace.
	copyCtx := primitive.IOCopyContext{
		IO:   io,
		Ctx:  ctx,
		Opts: usermem.IOOpts{AddressSpaceActive: true},
	}
	_, err := primitive.CopyInt32Out(&copyCtx, args[2].Pointer(), int32(queued))
	return 0, err
}

// waitFor blocks until a new reader/writer for the underlying pipe is
// announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this
// function will block for either readers or writers, depending on where
// 'wakeupChan' points.
//
// It returns true if the wakeup channel captured on entry was signalled by the
// time mu has been re-acquired, and false otherwise (i.e. the sleep was
// cancelled without a wakeup).
//
// mu must be held by the caller. waitFor returns with mu held, but it will
// drop mu before blocking for any reader/writers.
func waitFor(mu *sync.Mutex, wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool {
	// Ideally this function would simply use a condition variable. However, the
	// wait needs to be interruptible via 'sleeper', so we must synchronize via a
	// channel. The synchronization below relies on the fact that closing a
	// channel unblocks all receives on the channel.

	// Does an appropriate wakeup channel already exist? If not, create a new
	// one. This is all done under mu to avoid races.
	if *wakeupChan == nil {
		*wakeupChan = make(chan struct{})
	}

	// Grab a local reference to the wakeup channel since it may disappear as
	// soon as we drop mu (newHandleLocked closes it and resets *wakeupChan to
	// nil).
	wakeup := *wakeupChan

	// Drop the lock and prepare to sleep.
	mu.Unlock()
	cancel := sleeper.SleepStart()

	// Wait for either a new reader/writer to be signalled via 'wakeup', or
	// for the sleep to be cancelled. SleepFinish is told whether the sleep
	// ended because of a wakeup (true) or a cancellation (false).
	select {
	case <-wakeup:
		sleeper.SleepFinish(true)
	case <-cancel:
		sleeper.SleepFinish(false)
	}

	// Take the lock and check if we were woken. If we were woken and
	// interrupted, the former takes priority: the non-blocking receive below
	// succeeds whenever 'wakeup' has been closed, regardless of which case
	// fired above.
	mu.Lock()
	select {
	case <-wakeup:
		return true
	default:
		return false
	}
}

// newHandleLocked announces a new pipe reader or writer, depending on where
// 'wakeupChan' points. If a wakeup channel is currently installed, it is
// closed, which unblocks every receiver waiting on it (see waitFor), and the
// slot is reset to nil. If no channel is installed, this is a no-op.
//
// Precondition: the mutex protecting wakeupChan must be held.
func newHandleLocked(wakeupChan *chan struct{}) {
	pending := *wakeupChan
	if pending == nil {
		// Nobody is waiting; nothing to signal.
		return
	}

	// Clear the slot and close the channel to release all waiters.
	*wakeupChan = nil
	close(pending)
}