// Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
	"encoding/json"
	"fmt"
	log "github.com/Sirupsen/logrus"
	"github.com/eapache/channels"
	api "github.com/osrg/gobgp/api"
	"github.com/osrg/gobgp/config"
	"github.com/osrg/gobgp/packet/bgp"
	"github.com/osrg/gobgp/table"
	"net"
	"time"
)

const (
	FLOP_THRESHOLD    = time.Second * 30
	MIN_CONNECT_RETRY = 10
)

type Peer struct {
	tableId           string
	fsm               *FSM
	adjRibIn          *table.AdjRib
	adjRibOut         *table.AdjRib
	outgoing          chan *FsmOutgoingMsg
	policy            *table.RoutingPolicy
	localRib          *table.TableManager
	prefixLimitWarned map[bgp.RouteFamily]bool
}

func NewPeer(g *config.Global, conf *config.Neighbor, loc *table.TableManager, policy *table.RoutingPolicy) *Peer {
	peer := &Peer{
		outgoing:          make(chan *FsmOutgoingMsg, 128),
		localRib:          loc,
		policy:            policy,
		fsm:               NewFSM(g, conf, policy),
		prefixLimitWarned: make(map[bgp.RouteFamily]bool),
	}
	if peer.isRouteServerClient() {
		peer.tableId = conf.Config.NeighborAddress
	} else {
		peer.tableId = table.GLOBAL_RIB_NAME
	}
	rfs, _ := config.AfiSafis(conf.AfiSafis).ToRfList()
	peer.adjRibIn = table.NewAdjRib(peer.ID(), rfs)
	peer.adjRibOut = table.NewAdjRib(peer.ID(), rfs)
	return peer
}

func (peer *Peer) ID() string {
	return peer.fsm.pConf.Config.NeighborAddress
}

func (peer *Peer) TableID() string {
	return peer.tableId
}

func (peer *Peer) isIBGPPeer() bool {
	return peer.fsm.pConf.Config.PeerAs == peer.fsm.gConf.Config.As
}

func (peer *Peer) isRouteServerClient() bool {
	return peer.fsm.pConf.RouteServer.Config.RouteServerClient
}

func (peer *Peer) isRouteReflectorClient() bool {
	return peer.fsm.pConf.RouteReflector.Config.RouteReflectorClient
}

func (peer *Peer) isGracefulRestartEnabled() bool {
	return peer.fsm.pConf.GracefulRestart.State.Enabled
}

func (peer *Peer) recvedAllEOR() bool {
	for _, a := range peer.fsm.pConf.AfiSafis {
		if s := a.MpGracefulRestart.State; s.Enabled && !s.EndOfRibReceived {
			return false
		}
	}
	return true
}

func (peer *Peer) configuredRFlist() []bgp.RouteFamily {
	rfs, _ := config.AfiSafis(peer.fsm.pConf.AfiSafis).ToRfList()
	return rfs
}

func (peer *Peer) forwardingPreservedFamilies() ([]bgp.RouteFamily, []bgp.RouteFamily) {
	list := []bgp.RouteFamily{}
	for _, a := range peer.fsm.pConf.AfiSafis {
		if s := a.MpGracefulRestart.State; s.Enabled && s.Received {
			f, _ := bgp.GetRouteFamily(string(a.Config.AfiSafiName))
			list = append(list, f)
		}
	}
	preserved := []bgp.RouteFamily{}
	notPreserved := []bgp.RouteFamily{}
	for _, f := range peer.configuredRFlist() {
		p := true
		for _, g := range list {
			if f == g {
				p = false
				preserved = append(preserved, f)
			}
		}
		if p {
			notPreserved = append(notPreserved, f)
		}
	}
	return preserved, notPreserved
}

func (peer *Peer) getAccepted(rfList []bgp.RouteFamily) []*table.Path {
	return peer.adjRibIn.PathList(rfList, true)
}

func (peer *Peer) getBestFromLocal(rfList []bgp.RouteFamily) ([]*table.Path, []*table.Path) {
	pathList := []*table.Path{}
	filtered := []*table.Path{}
	options := &table.PolicyOptions{
		Neighbor: peer.fsm.peerInfo.Address,
	}
	for _, path := range peer.localRib.GetBestPathList(peer.TableID(), rfList) {
		p := peer.policy.ApplyPolicy(peer.TableID(), table.POLICY_DIRECTION_EXPORT, filterpath(peer, path), options)
		if p == nil {
			filtered = append(filtered, path)
			continue
		}
		if !peer.isRouteServerClient() {
			p = p.Clone(p.IsWithdraw)
			p.UpdatePathAttrs(peer.fsm.gConf, peer.fsm.pConf)
		}
		pathList = append(pathList, p)
	}
	if peer.isGracefulRestartEnabled() {
		for _, family := range rfList {
			pathList = append(pathList, table.NewEOR(family))
		}
	}
	return pathList, filtered
}

func (peer *Peer) processOutgoingPaths(paths, withdrawals []*table.Path) []*table.Path {
	if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
		return nil
	}
	if peer.fsm.pConf.GracefulRestart.State.LocalRestarting {
		log.WithFields(log.Fields{
			"Topic": "Peer",
			"Key":   peer.fsm.pConf.Config.NeighborAddress,
		}).Debug("now syncing, suppress sending updates")
		return nil
	}

	outgoing := make([]*table.Path, 0, len(paths))
	// Note: multiple paths having the same prefix could exist the
	// withdrawals list in the case of Route Server setup with
	// import policies modifying paths. In such case, gobgp sends
	// duplicated update messages; withdraw messages for the same
	// prefix.
	// However, currently we don't support local path for Route
	// Server setup so this is NOT the case.
	for _, path := range withdrawals {
		if path.IsLocal() {
			if _, ok := peer.fsm.rfMap[path.GetRouteFamily()]; ok {
				outgoing = append(outgoing, path)
			}
		}
	}

	options := &table.PolicyOptions{
		Neighbor: peer.fsm.peerInfo.Address,
	}
	for _, path := range paths {
		path = peer.policy.ApplyPolicy(peer.TableID(), table.POLICY_DIRECTION_EXPORT, filterpath(peer, path), options)
		if path == nil {
			continue
		}
		if !peer.isRouteServerClient() {
			path = path.Clone(path.IsWithdraw)
			path.UpdatePathAttrs(peer.fsm.gConf, peer.fsm.pConf)
		}
		outgoing = append(outgoing, path)
	}

	peer.adjRibOut.Update(outgoing)
	return outgoing
}

func (peer *Peer) handleRouteRefresh(e *FsmMsg) []*table.Path {
	m := e.MsgData.(*bgp.BGPMessage)
	rr := m.Body.(*bgp.BGPRouteRefresh)
	rf := bgp.AfiSafiToRouteFamily(rr.AFI, rr.SAFI)
	if _, ok := peer.fsm.rfMap[rf]; !ok {
		log.WithFields(log.Fields{
			"Topic": "Peer",
			"Key":   peer.ID(),
			"Data":  rf,
		}).Warn("Route family isn't supported")
		return nil
	}
	if _, ok := peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]; !ok {
		log.WithFields(log.Fields{
			"Topic": "Peer",
			"Key":   peer.ID(),
		}).Warn("ROUTE_REFRESH received but the capability wasn't advertised")
		return nil
	}
	rfList := []bgp.RouteFamily{rf}
	peer.adjRibOut.Drop(rfList)
	accepted, filtered := peer.getBestFromLocal(rfList)
	peer.adjRibOut.Update(accepted)
	for _, path := range filtered {
		path.IsWithdraw = true
		accepted = append(accepted, path)
	}
	return accepted
}

func (peer *Peer) doPrefixLimit(k bgp.RouteFamily, c *config.PrefixLimitConfig) *bgp.BGPMessage {
	if maxPrefixes := int(c.MaxPrefixes); maxPrefixes > 0 {
		count := peer.adjRibIn.Count([]bgp.RouteFamily{k})
		pct := int(c.ShutdownThresholdPct)
		if pct > 0 && !peer.prefixLimitWarned[k] && count > (maxPrefixes*pct/100) {
			peer.prefixLimitWarned[k] = true
			log.WithFields(log.Fields{
				"Topic":         "Peer",
				"Key":           peer.ID(),
				"AddressFamily": k.String(),
			}).Warnf("prefix limit %d%% reached", pct)
		}
		if count > maxPrefixes {
			log.WithFields(log.Fields{
				"Topic":         "Peer",
				"Key":           peer.ID(),
				"AddressFamily": k.String(),
			}).Warnf("prefix limit reached")
			return bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_MAXIMUM_NUMBER_OF_PREFIXES_REACHED, nil)
		}
	}
	return nil

}

func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) ([]*SenderMsg, error) {
	x := peer.fsm.pConf.AfiSafis
	y := c
	if len(x) != len(y) {
		return nil, fmt.Errorf("changing supported afi-safi is not allowed")
	}
	m := make(map[bgp.RouteFamily]config.PrefixLimitConfig)
	for _, e := range x {
		k, err := bgp.GetRouteFamily(string(e.Config.AfiSafiName))
		if err != nil {
			return nil, err
		}
		m[k] = e.PrefixLimit.Config
	}
	msgs := make([]*SenderMsg, 0, len(y))
	for _, e := range y {
		k, err := bgp.GetRouteFamily(string(e.Config.AfiSafiName))
		if err != nil {
			return nil, err
		}
		if p, ok := m[k]; !ok {
			return nil, fmt.Errorf("changing supported afi-safi is not allowed")
		} else if !p.Equal(&e.PrefixLimit.Config) {
			log.WithFields(log.Fields{
				"Topic":                   "Peer",
				"Key":                     peer.ID(),
				"AddressFamily":           e.Config.AfiSafiName,
				"OldMaxPrefixes":          p.MaxPrefixes,
				"NewMaxPrefixes":          e.PrefixLimit.Config.MaxPrefixes,
				"OldShutdownThresholdPct": p.ShutdownThresholdPct,
				"NewShutdownThresholdPct": e.PrefixLimit.Config.ShutdownThresholdPct,
			}).Warnf("update prefix limit configuration")
			peer.prefixLimitWarned[k] = false
			if msg := peer.doPrefixLimit(k, &e.PrefixLimit.Config); msg != nil {
				msgs = append(msgs, newSenderMsg(peer, nil, msg, true))
			}
		}
	}
	peer.fsm.pConf.AfiSafis = c
	return msgs, nil
}

func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bgp.BGPMessage) {
	m := e.MsgData.(*bgp.BGPMessage)
	log.WithFields(log.Fields{
		"Topic": "Peer",
		"Key":   peer.fsm.pConf.Config.NeighborAddress,
		"data":  m,
	}).Debug("received")
	peer.fsm.pConf.Timers.State.UpdateRecvTime = time.Now().Unix()
	if len(e.PathList) > 0 {
		peer.adjRibIn.Update(e.PathList)
		for _, family := range peer.fsm.pConf.AfiSafis {
			k, _ := bgp.GetRouteFamily(string(family.Config.AfiSafiName))
			if msg := peer.doPrefixLimit(k, &family.PrefixLimit.Config); msg != nil {
				return nil, nil, msg
			}
		}
		paths := make([]*table.Path, 0, len(e.PathList))
		eor := []bgp.RouteFamily{}
		for _, path := range e.PathList {
			if path.IsEOR() {
				family := path.GetRouteFamily()
				log.WithFields(log.Fields{
					"Topic":         "Peer",
					"Key":           peer.ID(),
					"AddressFamily": family,
				}).Debug("EOR received")
				eor = append(eor, family)
				continue
			}
			if path.Filtered(peer.ID()) != table.POLICY_DIRECTION_IN {
				paths = append(paths, path)
			}
		}
		return paths, eor, nil
	}
	return nil, nil, nil
}

func (peer *Peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *FsmMsg) {
	peer.fsm.h = NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing)
}

func (peer *Peer) StaleAll(rfList []bgp.RouteFamily) {
	peer.adjRibIn.StaleAll(rfList)
}

func (peer *Peer) PassConn(conn *net.TCPConn) {
	select {
	case peer.fsm.connCh <- conn:
	default:
		conn.Close()
		log.WithFields(log.Fields{
			"Topic": "Peer",
			"Key":   peer.ID(),
		}).Warn("accepted conn is closed to avoid be blocked")
	}
}

func (peer *Peer) MarshalJSON() ([]byte, error) {
	return json.Marshal(peer.ToApiStruct())
}

func (peer *Peer) ToApiStruct() *api.Peer {

	f := peer.fsm
	c := f.pConf

	remoteCap := make([][]byte, 0, len(peer.fsm.capMap))
	for _, c := range peer.fsm.capMap {
		for _, m := range c {
			buf, _ := m.Serialize()
			remoteCap = append(remoteCap, buf)
		}
	}

	caps := capabilitiesFromConfig(peer.fsm.pConf)
	localCap := make([][]byte, 0, len(caps))
	for _, c := range caps {
		buf, _ := c.Serialize()
		localCap = append(localCap, buf)
	}

	prefixLimits := make([]*api.PrefixLimit, 0, len(peer.fsm.pConf.AfiSafis))
	for _, family := range peer.fsm.pConf.AfiSafis {
		if c := family.PrefixLimit.Config; c.MaxPrefixes > 0 {
			k, _ := bgp.GetRouteFamily(string(family.Config.AfiSafiName))
			prefixLimits = append(prefixLimits, &api.PrefixLimit{
				Family:               uint32(k),
				MaxPrefixes:          c.MaxPrefixes,
				ShutdownThresholdPct: uint32(c.ShutdownThresholdPct),
			})
		}
	}

	conf := &api.PeerConf{
		NeighborAddress:  c.Config.NeighborAddress,
		Id:               peer.fsm.peerInfo.ID.To4().String(),
		PeerAs:           c.Config.PeerAs,
		LocalAs:          c.Config.LocalAs,
		PeerType:         uint32(c.Config.PeerType.ToInt()),
		AuthPassword:     c.Config.AuthPassword,
		RemovePrivateAs:  uint32(c.Config.RemovePrivateAs.ToInt()),
		RouteFlapDamping: c.Config.RouteFlapDamping,
		SendCommunity:    uint32(c.Config.SendCommunity.ToInt()),
		Description:      c.Config.Description,
		PeerGroup:        c.Config.PeerGroup,
		RemoteCap:        remoteCap,
		LocalCap:         localCap,
		PrefixLimits:     prefixLimits,
	}

	timer := c.Timers
	s := c.State

	advertised := uint32(0)
	received := uint32(0)
	accepted := uint32(0)
	if f.state == bgp.BGP_FSM_ESTABLISHED {
		rfList := peer.configuredRFlist()
		advertised = uint32(peer.adjRibOut.Count(rfList))
		received = uint32(peer.adjRibIn.Count(rfList))
		accepted = uint32(peer.adjRibIn.Accepted(rfList))
	}

	uptime := int64(0)
	if timer.State.Uptime != 0 {
		uptime = timer.State.Uptime
	}
	downtime := int64(0)
	if timer.State.Downtime != 0 {
		downtime = timer.State.Downtime
	}

	timerconf := &api.TimersConfig{
		ConnectRetry:      uint64(timer.Config.ConnectRetry),
		HoldTime:          uint64(timer.Config.HoldTime),
		KeepaliveInterval: uint64(timer.Config.KeepaliveInterval),
	}

	timerstate := &api.TimersState{
		KeepaliveInterval:  uint64(timer.State.KeepaliveInterval),
		NegotiatedHoldTime: uint64(timer.State.NegotiatedHoldTime),
		Uptime:             uint64(uptime),
		Downtime:           uint64(downtime),
	}

	apitimer := &api.Timers{
		Config: timerconf,
		State:  timerstate,
	}
	msgrcv := &api.Message{
		NOTIFICATION: s.Messages.Received.Notification,
		UPDATE:       s.Messages.Received.Update,
		OPEN:         s.Messages.Received.Open,
		KEEPALIVE:    s.Messages.Received.Keepalive,
		REFRESH:      s.Messages.Received.Refresh,
		DISCARDED:    s.Messages.Received.Discarded,
		TOTAL:        s.Messages.Received.Total,
	}
	msgsnt := &api.Message{
		NOTIFICATION: s.Messages.Sent.Notification,
		UPDATE:       s.Messages.Sent.Update,
		OPEN:         s.Messages.Sent.Open,
		KEEPALIVE:    s.Messages.Sent.Keepalive,
		REFRESH:      s.Messages.Sent.Refresh,
		DISCARDED:    s.Messages.Sent.Discarded,
		TOTAL:        s.Messages.Sent.Total,
	}
	msg := &api.Messages{
		Received: msgrcv,
		Sent:     msgsnt,
	}
	info := &api.PeerState{
		BgpState:   f.state.String(),
		AdminState: f.adminState.String(),
		Messages:   msg,
		Received:   received,
		Accepted:   accepted,
		Advertised: advertised,
	}

	return &api.Peer{
		Conf:   conf,
		Info:   info,
		Timers: apitimer,
	}
}

func (peer *Peer) DropAll(rfList []bgp.RouteFamily) {
	peer.adjRibIn.Drop(rfList)
	peer.adjRibOut.Drop(rfList)
}