// Copyright 2020 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fuse

import (
	"sync"

	"gvisor.dev/gvisor/pkg/abi/linux"
	"gvisor.dev/gvisor/pkg/context"
	"gvisor.dev/gvisor/pkg/errors/linuxerr"
	"gvisor.dev/gvisor/pkg/log"
	"gvisor.dev/gvisor/pkg/sentry/kernel"
	"gvisor.dev/gvisor/pkg/waiter"
)

const (
	// fuseDefaultMaxBackground is the default value for MaxBackground.
	fuseDefaultMaxBackground = 12

	// fuseDefaultCongestionThreshold is the default value for
	// CongestionThreshold, and is 75% of the default value of MaxBackground.
	fuseDefaultCongestionThreshold = (fuseDefaultMaxBackground * 3 / 4)

	// fuseDefaultMaxPagesPerReq is the default value for MaxPagesPerReq.
	fuseDefaultMaxPagesPerReq = 32
)

// connection is the struct by which the sentry communicates with the FUSE server daemon.
//
// Lock order:
// - conn.fd.mu
// - conn.mu
// - conn.asyncMu
//
// +stateify savable
type connection struct {
	// fd is the FUSE device file descriptor that requests are queued on.
	fd *DeviceFD

	// mu protects access to struct members.
	mu sync.Mutex `state:"nosave"`

	// attributeVersion is the version of connection's attributes.
	attributeVersion uint64

	// We target FUSE 7.23.
	// The following FUSE_INIT flags are currently unsupported by this implementation:
	// - FUSE_EXPORT_SUPPORT
	// - FUSE_POSIX_LOCKS: requires POSIX locks
	// - FUSE_FLOCK_LOCKS: requires POSIX locks
	// - FUSE_AUTO_INVAL_DATA: requires page caching eviction
	// - FUSE_DO_READDIRPLUS/FUSE_READDIRPLUS_AUTO: requires FUSE_READDIRPLUS implementation
	// - FUSE_ASYNC_DIO
	// - FUSE_PARALLEL_DIROPS (7.25)
	// - FUSE_HANDLE_KILLPRIV (7.26)
	// - FUSE_POSIX_ACL: affects defaultPermissions, posixACL, xattr handler (7.26)
	// - FUSE_ABORT_ERROR (7.27)
	// - FUSE_CACHE_SYMLINKS (7.28)
	// - FUSE_NO_OPENDIR_SUPPORT (7.29)
	// - FUSE_EXPLICIT_INVAL_DATA: requires page caching eviction (7.30)
	// - FUSE_MAP_ALIGNMENT (7.31)

	// initialized is set after receiving the FUSE_INIT reply.
	// Until it is set, sending FUSE requests is suspended.
	// Use SetInitialized() and Initialized() for atomic access.
	initialized int32

	// initializedChan is used to block requests before initialization.
	// It is saved and restored as a bool recording whether the channel is
	// closed (see saveInitializedChan and loadInitializedChan).
	initializedChan chan struct{} `state:".(bool)"`

	// connected (connection established) is set to true when a new FUSE
	// file system is created.
	// Set to false when:
	//   umount,
	//   connection abort,
	//   device release.
	connected bool

	// connInitError is true if FUSE_INIT encountered an error (major version mismatch).
	// Only set in INIT.
	connInitError bool

	// connInitSuccess is true if FUSE_INIT is successful.
	// Only set in INIT.
	// Used for destroy (not yet implemented).
	connInitSuccess bool

	// aborted via sysfs, and will send ECONNABORTED to read after disconnection (instead of ENODEV).
	// Set only if abortErr is true and via fuse control fs (not yet implemented).
	// TODO(gvisor.dev/issue/3525): set this to true when user aborts.
	aborted bool

	// numWaiting is the number of requests waiting to be
	// sent to FUSE device or being processed by FUSE daemon.
	numWaiting uint32

	// Terminology note:
	//
	// - `asyncNumMax` is the `MaxBackground` in the FUSE_INIT_IN struct.
	//
	// - `asyncCongestionThreshold` is the `CongestionThreshold` in the FUSE_INIT_IN struct.
	//
	// What unix terminology calls "background" requests, we call async requests.
	// What unix terminology calls "async requests" are our async requests that
	// expect a reply, i.e. `!request.noReply`.

	// asyncMu protects the async request fields.
	asyncMu sync.Mutex `state:"nosave"`

	// asyncNum is the number of async requests.
	// Protected by asyncMu.
	asyncNum uint16

	// asyncCongestionThreshold is the congestion threshold for the number of
	// async requests.
	// Negotiated in FUSE_INIT as "CongestionThreshold".
	// TODO(gvisor.dev/issue/3529): add congestion control.
	// Protected by asyncMu.
	asyncCongestionThreshold uint16

	// asyncNumMax is the maximum value of asyncNum.
	// Connection blocks the async requests when it is reached.
	// Negotiated in FUSE_INIT as "MaxBackground".
	// Protected by asyncMu.
	asyncNumMax uint16

	// maxRead is the maximum size of a read buffer in bytes.
	// Initialized from a fuse fs parameter.
	maxRead uint32

	// maxWrite is the maximum size of a write buffer in bytes.
	// Negotiated in FUSE_INIT.
	maxWrite uint32

	// maxPages is the maximum number of pages for a single request to use.
	// Negotiated in FUSE_INIT.
	maxPages uint16

	// minor is the minor version of the FUSE protocol.
	// Negotiated and only set in INIT.
	minor uint32

	// atomicOTrunc is true when FUSE does not send a separate SETATTR request
	// before open with O_TRUNC flag.
	// Negotiated and only set in INIT.
	atomicOTrunc bool

	// asyncRead is true if pages are read asynchronously.
	// Negotiated and only set in INIT.
	asyncRead bool

	// writebackCache is true for write-back cache policy,
	// false for write-through policy.
	// Negotiated and only set in INIT.
	writebackCache bool

	// bigWrites is true if doing multi-page cached writes.
	// Negotiated and only set in INIT.
	bigWrites bool

	// dontMask is true if the filesystem does not apply umask to creation modes.
	// Negotiated in INIT.
	dontMask bool

	// noOpen is true if the FUSE server doesn't support the open operation.
	// This flag only influences performance, not correctness of the program.
	noOpen bool
}

// saveInitializedChan is the state-save hook for initializedChan. It reports
// whether the channel is currently closed, using a non-blocking receive so
// that an open channel never stalls the save.
func (conn *connection) saveInitializedChan() bool {
	closed := false
	select {
	case <-conn.initializedChan:
		// The receive completed without blocking: the channel is closed.
		closed = true
	default:
		// The receive would block: the channel is still open.
	}
	return closed
}

// loadInitializedChan is the state-load hook for initializedChan. It installs
// a fresh channel and, when closed is true, closes it immediately so that the
// restored connection reports the same state that was saved.
//
// NOTE(review): the channel is created with capacity 1 here, whereas
// newFUSEConnection creates it unbuffered. In this file it is only observed
// via close/receive, so the difference looks harmless -- confirm.
func (conn *connection) loadInitializedChan(closed bool) {
	ch := make(chan struct{}, 1)
	if closed {
		close(ch)
	}
	conn.initializedChan = ch
}

// newFUSEConnection creates a FUSE connection to fuseFD.
//
// It (re)initializes fuseFD's write buffer, completion map, full-queue
// channel and write cursor, then returns a connection that is marked as
// connected and carries the default async limits together with the mount's
// maxRead option. The returned error is always nil.
func newFUSEConnection(_ context.Context, fuseFD *DeviceFD, opts *filesystemOptions) (*connection, error) {
	// FIXME(gvisor.dev/issue/4813): fuseFD's fields are accessed without
	// synchronization and without checking if fuseFD has already been used to
	// mount another filesystem.

	// The write buffer starts out just large enough to hold a reply header.
	var hdr *linux.FUSEHeaderOut
	fuseFD.writeBuf = make([]byte, uint32(hdr.SizeBytes()))
	fuseFD.completions = make(map[linux.FUSEOpID]*futureResponse)
	fuseFD.fullQueueCh = make(chan struct{}, opts.maxActiveRequests)
	fuseFD.writeCursor = 0

	conn := &connection{
		fd:              fuseFD,
		initializedChan: make(chan struct{}),
		connected:       true,
	}
	// Defaults that apply until FUSE_INIT negotiates other values.
	conn.asyncNumMax = fuseDefaultMaxBackground
	conn.asyncCongestionThreshold = fuseDefaultCongestionThreshold
	conn.maxPages = fuseDefaultMaxPagesPerReq
	// maxRead comes straight from the filesystem options.
	conn.maxRead = opts.maxRead
	return conn, nil
}

// CallAsync makes an async (aka background) request.
//
// It flags r as async and hands it to Call, discarding the response and
// returning only the error.
func (conn *connection) CallAsync(t *kernel.Task, r *Request) error {
	r.async = true
	if _, err := conn.Call(t, r); err != nil {
		return err
	}
	return nil
}

// Call makes a request to the server.
//
// Until the connection is initialized, every request other than FUSE_INIT
// blocks the task on initializedChan. Task should never be nil.
//
// For a sync request, it blocks the invoking task until the server responds.
//
// For an async request (one that does not expect a response immediately),
// it returns directly unless it is blocked either before initialization
// or because there are too many async requests ongoing.
//
// Examples of async requests:
// init, readahead, write, async read/write, fuse_notify_reply,
// non-sync release, interrupt, forget.
//
// The forget request does not have a reply,
// as documented in include/uapi/linux/fuse.h:FUSE_FORGET.
//
// Errors: ENOTCONN if the connection is not connected, ECONNREFUSED if
// FUSE_INIT failed, or whatever error blocking, queueing or resolving the
// request produces.
func (conn *connection) Call(t *kernel.Task, r *Request) (*Response, error) {
	// FUSE_INIT is the one request allowed through before initialization.
	mustWait := !conn.Initialized() && r.hdr.Opcode != linux.FUSE_INIT
	if mustWait {
		if err := t.Block(conn.initializedChan); err != nil {
			return nil, err
		}
	}

	// Refuse to queue anything on a dead or mis-negotiated connection.
	switch {
	case !conn.connected:
		return nil, linuxerr.ENOTCONN
	case conn.connInitError:
		return nil, linuxerr.ECONNREFUSED
	}

	pending, err := conn.callFuture(t, r)
	if err != nil {
		return nil, err
	}
	return pending.resolve(t)
}

// callFuture makes a request to the server and returns a future response.
// Call resolve() when the response needs to be fulfilled.
//
// It acquires conn.fd.mu for the duration of the call (releasing it only
// while blocked waiting for queue space) and returns an error if that wait
// is interrupted or if queueing the request fails.
func (conn *connection) callFuture(t *kernel.Task, r *Request) (*futureResponse, error) {
	conn.fd.mu.Lock()
	defer conn.fd.mu.Unlock()

	// Is the queue full?
	//
	// We must loop here until the request can be queued. The lock is not
	// held while blocking on fd.fullQueueCh - so after being signalled,
	// before we re-acquire the lock, it is possible that a barging task
	// enters and queues a request. As a result, upon acquiring the lock we
	// must check again whether room is available.
	//
	// This can potentially starve a request forever but this can only happen
	// if there are always too many ongoing requests all the time. The
	// supported maxActiveRequests setting should be really high to avoid this.
	for conn.fd.numActiveRequests == conn.fd.fs.opts.maxActiveRequests {
		log.Infof("Blocking request %v from being queued. Too many active requests: %v",
			r.id, conn.fd.numActiveRequests)
		// Drop the lock while blocked on fullQueueCh.
		conn.fd.mu.Unlock()
		err := t.Block(conn.fd.fullQueueCh)
		// Re-acquire before inspecting err: the deferred Unlock above expects
		// the lock to be held on every return path.
		conn.fd.mu.Lock()
		if err != nil {
			return nil, err
		}
	}

	return conn.callFutureLocked(t, r)
}

// callFutureLocked makes a request to the server and returns a future response.
//
// callFuture invokes it with conn.fd.mu held. It returns ECONNABORTED if the
// connection is no longer connected; otherwise it queues r on the device,
// registers a future response under r.id and notifies readers of the device.
func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureResponse, error) {
	// Re-read connected, this time under conn.mu.
	conn.mu.Lock()
	stillConnected := conn.connected
	conn.mu.Unlock()
	if !stillConnected {
		// Call already saw the connection as connected, so losing it here
		// must be due to an aborted connection.
		return nil, linuxerr.ECONNABORTED
	}

	fd := conn.fd
	fd.queue.PushBack(r)
	fd.numActiveRequests++
	fut := newFutureResponse(r)
	fd.completions[r.id] = fut

	// Wake up readers: there is now a request to read.
	fd.waitQueue.Notify(waiter.ReadableEvents)

	return fut, nil
}