// Copyright 2020 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stack

import (
	"fmt"

	"gvisor.dev/gvisor/pkg/sync"
	"gvisor.dev/gvisor/pkg/tcpip"
)

const (
	// maxPendingResolutions is the maximum number of pending link-address
	// resolutions. Once that many cancel channels are outstanding,
	// newCancelChannel closes the oldest one before handing out another.
	//
	// maxPendingPacketsPerResolution is the maximum number of packets queued
	// for a single resolution. When a queue is full, enqueue drops its oldest
	// packet to make room for the new one.
	maxPendingResolutions          = 64
	maxPendingPacketsPerResolution = 256
)

// pendingPacket is a packet held in a forwardQueue until the link-address
// resolution it is waiting on finishes. It records what the flush in
// forwardQueue.enqueue needs: nic forwards the packet (and its stack's
// OutgoingPacketErrors counter is incremented if the packet is dropped), route
// is resolved before forwarding and released afterwards, and proto and pkt are
// passed to nic.forwardPacket as-is.
type pendingPacket struct {
	nic   *NIC
	route *Route
	proto tcpip.NetworkProtocolNumber
	pkt   tcpip.PacketBuffer
}

// forwardQueue holds packets that cannot be forwarded until a link-address
// resolution completes, grouped by the channel that signals that resolution.
//
// The embedded Mutex is held around every access to packets and cancelChans
// made by the methods in this file.
type forwardQueue struct {
	sync.Mutex

	// packets maps the channel signalling a resolution to the packets to send
	// once the resolver completes. enqueue starts a waiting goroutine when it
	// adds a new key, and that goroutine deletes the key when it flushes the
	// queued packets.
	packets map[<-chan struct{}][]*pendingPacket

	// cancelChans is a FIFO of channels used to cancel the oldest goroutine
	// waiting for link-address resolution. newCancelChannel appends to the
	// back, and closes and removes the front entry when the FIFO is full.
	cancelChans []chan struct{}
}

// newForwardQueue returns an empty forwardQueue whose packets map is
// initialized and ready for enqueue; cancelChans starts out nil.
func newForwardQueue() *forwardQueue {
	q := new(forwardQueue)
	q.packets = make(map[<-chan struct{}][]*pendingPacket)
	return q
}

// enqueue queues pkt to be forwarded by n over r once the link-address
// resolution signalled by ch completes.
//
// Packets are grouped by ch. The first packet queued for a given ch starts a
// goroutine that blocks until ch becomes readable, or until its wait is
// cancelled because too many resolutions are pending, and then flushes every
// packet queued for ch. Later packets for the same ch are only appended to the
// queue. If the queue for ch already holds maxPendingPacketsPerResolution
// packets, the oldest one is dropped and counted as an outgoing packet error.
//
// Every queued packet's route is released (Route.Release) once the packet has
// been forwarded or dropped.
func (f *forwardQueue) enqueue(ch <-chan struct{}, n *NIC, r *Route, protocol tcpip.NetworkProtocolNumber, pkt tcpip.PacketBuffer) {
	f.Lock()
	// An existing map entry means a goroutine is already waiting on ch.
	packets, waiting := f.packets[ch]
	for len(packets) == maxPendingPacketsPerResolution {
		// The queue is full: drop the oldest packet to make room.
		p := packets[0]
		// Clear the slot so the backing array does not keep the dropped packet
		// reachable after it has been removed from the queue.
		packets[0] = nil
		packets = packets[1:]
		p.nic.stack.stats.IP.OutgoingPacketErrors.Increment()
		p.route.Release()
	}
	if l := len(packets); l >= maxPendingPacketsPerResolution {
		panic(fmt.Sprintf("max pending packets for resolution reached; got %d packets, max = %d", l, maxPendingPacketsPerResolution))
	}
	f.packets[ch] = append(packets, &pendingPacket{
		nic:   n,
		route: r,
		proto: protocol,
		pkt:   pkt,
	})
	f.Unlock()

	if waiting {
		// The goroutine already waiting on ch will flush this packet too.
		return
	}

	// Wait for the link-address resolution to complete.
	// Start a goroutine with a forwarding-cancel channel so that we can
	// limit the maximum number of goroutines running concurrently.
	cancel := f.newCancelChannel()
	go func() {
		cancelled := false
		select {
		case <-ch:
		case <-cancel:
			cancelled = true
		}

		f.Lock()
		packets := f.packets[ch]
		delete(f.packets, ch)
		if !cancelled {
			// The wait ended because ch fired, so this goroutine's cancel
			// channel may still be registered. Unregister it: a stale entry
			// would occupy a slot in the FIFO and could cause a resolution
			// that is still pending to be cancelled even though fewer than
			// maxPendingResolutions are actually outstanding. (When the wait
			// was cancelled, newCancelChannel has already removed it.)
			for i, c := range f.cancelChans {
				if c != cancel {
					continue
				}
				copy(f.cancelChans[i:], f.cancelChans[i+1:])
				last := len(f.cancelChans) - 1
				f.cancelChans[last] = nil
				f.cancelChans = f.cancelChans[:last]
				break
			}
		}
		f.Unlock()

		// Flush outside the lock: forward each packet if the resolution
		// succeeded, otherwise count it as an outgoing packet error.
		for _, p := range packets {
			if cancelled {
				p.nic.stack.stats.IP.OutgoingPacketErrors.Increment()
			} else if _, err := p.route.Resolve(nil); err != nil {
				p.nic.stack.stats.IP.OutgoingPacketErrors.Increment()
			} else {
				p.nic.forwardPacket(p.route, p.proto, p.pkt)
			}
			p.route.Release()
		}
	}()
}

// newCancelChannel registers and returns a fresh channel whose closure tells
// a pending forwarding goroutine to give up. If maxPendingResolutions channels
// are already registered, the oldest one is closed and unregistered first so
// that the number of registered channels never exceeds the limit.
func (f *forwardQueue) newCancelChannel() chan struct{} {
	f.Lock()
	defer f.Unlock()

	if len(f.cancelChans) == maxPendingResolutions {
		// The FIFO is full: evict the front entry and signal its waiter.
		oldest := f.cancelChans[0]
		f.cancelChans = f.cancelChans[1:]
		close(oldest)
	}

	// Invariant: after a possible eviction there must be room for one more.
	active := len(f.cancelChans)
	if active >= maxPendingResolutions {
		panic(fmt.Sprintf("max pending resolutions reached; got %d active resolutions, max = %d", active, maxPendingResolutions))
	}

	fresh := make(chan struct{})
	f.cancelChans = append(f.cancelChans, fresh)
	return fresh
}