// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
	"fmt"
	"net"
	"sync"
	"time"

	log "github.com/Sirupsen/logrus"
	"github.com/eapache/channels"
	"github.com/osrg/gobgp/packet/bgp"
	"github.com/osrg/gobgp/table"
	"gopkg.in/tomb.v2"
)

type watcherType uint8

const (
	_           watcherType = iota
	WATCHER_MRT             // UPDATE MSG
	WATCHER_BMP
	WATCHER_ZEBRA
	WATCHER_COLLECTOR
	WATCHER_GRPC_MONITOR
)

type watcherEventType uint8

const (
	_ watcherEventType = iota
	WATCHER_EVENT_UPDATE_MSG
	WATCHER_EVENT_STATE_CHANGE
	WATCHER_EVENT_BESTPATH_CHANGE
	WATCHER_EVENT_POST_POLICY_UPDATE_MSG
	WATCHER_EVENT_ADJ_IN
)

type watcherEvent interface {
}

type watcherEventUpdateMsg struct {
	message      *bgp.BGPMessage
	peerAS       uint32
	localAS      uint32
	peerAddress  net.IP
	localAddress net.IP
	peerID       net.IP
	fourBytesAs  bool
	timestamp    time.Time
	payload      []byte
	postPolicy   bool
	pathList     []*table.Path
}

type watcherEventStateChangedMsg struct {
	peerAS       uint32
	localAS      uint32
	peerAddress  net.IP
	localAddress net.IP
	peerPort     uint16
	localPort    uint16
	peerID       net.IP
	sentOpen     *bgp.BGPMessage
	recvOpen     *bgp.BGPMessage
	state        bgp.FSMState
	adminState   AdminState
	timestamp    time.Time
}

type watcherEventAdjInMsg struct {
	pathList []*table.Path
}

type watcherEventBestPathMsg struct {
	pathList      []*table.Path
	multiPathList [][]*table.Path
}

type watcher interface {
	notify(watcherEventType) chan watcherEvent
	stop()
	watchingEventTypes() []watcherEventType
}

type watcherMsg struct {
	typ watcherEventType
	ev  watcherEvent
}

type watcherManager struct {
	t  tomb.Tomb
	mu sync.RWMutex
	m  map[watcherType]watcher
	ch *channels.InfiniteChannel
}

func (m *watcherManager) watching(typ watcherEventType) bool {
	for _, w := range m.m {
		for _, ev := range w.watchingEventTypes() {
			if ev == typ {
				return true
			}
		}
	}
	return false
}

// this will be called from server's main goroutine.
// shouldn't block.
func (m *watcherManager) notify(typ watcherEventType, ev watcherEvent) {
	m.ch.In() <- &watcherMsg{typ, ev}
}

func (m *watcherManager) loop() error {
	for {
		select {
		case i, ok := <-m.ch.Out():
			if !ok {
				continue
			}
			msg := i.(*watcherMsg)
			m.mu.RLock()
			for _, w := range m.m {
				if ch := w.notify(msg.typ); ch != nil {
					t := time.NewTimer(time.Second)
					select {
					case ch <- msg.ev:
					case <-t.C:
						log.WithFields(log.Fields{
							"Topic": "Watcher",
						}).Warnf("notification to %s timeout expired")
					}
				}
			}
			m.mu.RUnlock()
		}
	}
}

func (m *watcherManager) watcher(typ watcherType) (watcher, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	w, y := m.m[typ]
	return w, y
}

func (m *watcherManager) addWatcher(typ watcherType, w watcher) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, y := m.m[typ]; y {
		return fmt.Errorf("already exists %s watcher", typ)
	}
	m.m[typ] = w
	return nil
}

func (m *watcherManager) delWatcher(typ watcherType) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, y := m.m[typ]; !y {
		return fmt.Errorf("not found %s watcher", typ)
	}
	w := m.m[typ]
	w.stop()
	delete(m.m, typ)
	return nil
}

func newWatcherManager() *watcherManager {
	m := &watcherManager{
		m:  make(map[watcherType]watcher),
		ch: channels.NewInfiniteChannel(),
	}
	m.t.Go(m.loop)
	return m
}