summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/grpc_server.go1
-rw-r--r--server/server.go63
-rw-r--r--server/watcher.go191
3 files changed, 43 insertions, 212 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go
index 86d1267b..f8c411ad 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -96,7 +96,6 @@ const (
REQ_DEFERRAL_TIMER_EXPIRED
REQ_RELOAD_POLICY
REQ_INITIALIZE_ZEBRA
- REQ_WATCHER_ADJ_RIB_IN // FIXME
)
type Server struct {
diff --git a/server/server.go b/server/server.go
index 7e3943fe..74aa73b8 100644
--- a/server/server.go
+++ b/server/server.go
@@ -103,7 +103,6 @@ type BgpServer struct {
globalRib *table.TableManager
roaManager *roaManager
shutdown bool
- watchers *watcherManager
watcherMap map[watchType][]*Watcher
zclient *zebraClient
bmpManager *bmpClientManager
@@ -117,7 +116,6 @@ func NewBgpServer() *BgpServer {
neighborMap: make(map[string]*Peer),
policy: table.NewRoutingPolicy(),
roaManager: roaManager,
- watchers: newWatcherManager(),
mgmtCh: make(chan func(), 1),
watcherMap: make(map[watchType][]*Watcher),
}
@@ -369,7 +367,6 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
best, _, multipath := server.globalRib.DeletePathsByPeer(ids, peer.fsm.peerInfo, rf)
if !peer.isRouteServerClient() {
- server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath})
for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] {
w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath})
}
@@ -411,9 +408,6 @@ func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
newState := peer.fsm.state
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
ev := createWatcherEventStateChange(peer)
- if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) {
- server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev)
- }
for _, w := range server.watcherMap[WATCH_TYPE_PEER_STATE] {
w.notify(ev)
}
@@ -527,7 +521,6 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []*
if len(best[table.GLOBAL_RIB_NAME]) == 0 {
return alteredPathList
}
- server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi})
for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] {
w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi})
}
@@ -674,7 +667,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
sendFsmOutgoingMsg(peer, nil, notification, true)
return
}
- if m.Header.Type == bgp.BGP_MSG_UPDATE && (server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_PRE_UPDATE]) > 0) {
+ if m.Header.Type == bgp.BGP_MSG_UPDATE && len(server.watcherMap[WATCH_TYPE_PRE_UPDATE]) > 0 {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &watcherEventUpdateMsg{
@@ -690,7 +683,6 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
postPolicy: false,
pathList: pathList,
}
- server.watchers.notify(WATCHER_EVENT_UPDATE_MSG, ev)
for _, w := range server.watcherMap[WATCH_TYPE_PRE_UPDATE] {
w.notify(ev)
}
@@ -699,7 +691,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
if len(pathList) > 0 {
var altered []*table.Path
altered = server.propagateUpdate(peer, pathList)
- if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_POST_UPDATE]) > 0 {
+ if len(server.watcherMap[WATCH_TYPE_POST_UPDATE]) > 0 {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &watcherEventUpdateMsg{
@@ -716,7 +708,6 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
for _, u := range table.CreateUpdateMsgFromPaths(altered) {
payload, _ := u.Serialize()
ev.payload = payload
- server.watchers.notify(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev)
for _, w := range server.watcherMap[WATCH_TYPE_POST_UPDATE] {
w.notify(ev)
}
@@ -1761,15 +1752,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
if len(pathList) > 0 {
server.propagateUpdate(nil, pathList)
}
- case REQ_WATCHER_ADJ_RIB_IN:
- pathList := make([]*table.Path, 0)
- for _, peer := range server.neighborMap {
- pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...)
- }
-
- grpcReq.ResponseCh <- &GrpcResponse{}
- close(grpcReq.ResponseCh)
- server.watchers.notify(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList})
default:
err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType)
goto ERROR
@@ -2883,3 +2865,44 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
}
return w
}
+
+type watcherEvent interface {
+}
+
+type watcherEventUpdateMsg struct {
+ message *bgp.BGPMessage
+ peerAS uint32
+ localAS uint32
+ peerAddress net.IP
+ localAddress net.IP
+ peerID net.IP
+ fourBytesAs bool
+ timestamp time.Time
+ payload []byte
+ postPolicy bool
+ pathList []*table.Path
+}
+
+type watcherEventStateChangedMsg struct {
+ peerAS uint32
+ localAS uint32
+ peerAddress net.IP
+ localAddress net.IP
+ peerPort uint16
+ localPort uint16
+ peerID net.IP
+ sentOpen *bgp.BGPMessage
+ recvOpen *bgp.BGPMessage
+ state bgp.FSMState
+ adminState AdminState
+ timestamp time.Time
+}
+
+type watcherEventAdjInMsg struct {
+ pathList []*table.Path
+}
+
+type watcherEventBestPathMsg struct {
+ pathList []*table.Path
+ multiPathList [][]*table.Path
+}
diff --git a/server/watcher.go b/server/watcher.go
deleted file mode 100644
index 674206d9..00000000
--- a/server/watcher.go
+++ /dev/null
@@ -1,191 +0,0 @@
-// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-// implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package server
-
-import (
- "fmt"
- "net"
- "sync"
- "time"
-
- log "github.com/Sirupsen/logrus"
- "github.com/eapache/channels"
- "github.com/osrg/gobgp/packet/bgp"
- "github.com/osrg/gobgp/table"
- "gopkg.in/tomb.v2"
-)
-
-type watcherType uint8
-
-const (
- _ watcherType = iota
- WATCHER_MRT // UPDATE MSG
- WATCHER_BMP
- WATCHER_ZEBRA
- WATCHER_COLLECTOR
- WATCHER_GRPC_MONITOR
-)
-
-type watcherEventType uint8
-
-const (
- _ watcherEventType = iota
- WATCHER_EVENT_UPDATE_MSG
- WATCHER_EVENT_STATE_CHANGE
- WATCHER_EVENT_BESTPATH_CHANGE
- WATCHER_EVENT_POST_POLICY_UPDATE_MSG
- WATCHER_EVENT_ADJ_IN
-)
-
-type watcherEvent interface {
-}
-
-type watcherEventUpdateMsg struct {
- message *bgp.BGPMessage
- peerAS uint32
- localAS uint32
- peerAddress net.IP
- localAddress net.IP
- peerID net.IP
- fourBytesAs bool
- timestamp time.Time
- payload []byte
- postPolicy bool
- pathList []*table.Path
-}
-
-type watcherEventStateChangedMsg struct {
- peerAS uint32
- localAS uint32
- peerAddress net.IP
- localAddress net.IP
- peerPort uint16
- localPort uint16
- peerID net.IP
- sentOpen *bgp.BGPMessage
- recvOpen *bgp.BGPMessage
- state bgp.FSMState
- adminState AdminState
- timestamp time.Time
-}
-
-type watcherEventAdjInMsg struct {
- pathList []*table.Path
-}
-
-type watcherEventBestPathMsg struct {
- pathList []*table.Path
- multiPathList [][]*table.Path
-}
-
-type watcher interface {
- notify(watcherEventType) chan watcherEvent
- stop()
- watchingEventTypes() []watcherEventType
-}
-
-type watcherMsg struct {
- typ watcherEventType
- ev watcherEvent
-}
-
-type watcherManager struct {
- t tomb.Tomb
- mu sync.RWMutex
- m map[watcherType]watcher
- ch *channels.InfiniteChannel
-}
-
-func (m *watcherManager) watching(typ watcherEventType) bool {
- for _, w := range m.m {
- for _, ev := range w.watchingEventTypes() {
- if ev == typ {
- return true
- }
- }
- }
- return false
-}
-
-// this will be called from server's main goroutine.
-// shouldn't block.
-func (m *watcherManager) notify(typ watcherEventType, ev watcherEvent) {
- m.ch.In() <- &watcherMsg{typ, ev}
-}
-
-func (m *watcherManager) loop() error {
- for {
- select {
- case i, ok := <-m.ch.Out():
- if !ok {
- continue
- }
- msg := i.(*watcherMsg)
- m.mu.RLock()
- for _, w := range m.m {
- if ch := w.notify(msg.typ); ch != nil {
- t := time.NewTimer(time.Second)
- select {
- case ch <- msg.ev:
- case <-t.C:
- log.WithFields(log.Fields{
- "Topic": "Watcher",
- }).Warnf("notification to %s timeout expired")
- }
- }
- }
- m.mu.RUnlock()
- }
- }
-}
-
-func (m *watcherManager) watcher(typ watcherType) (watcher, bool) {
- m.mu.RLock()
- defer m.mu.RUnlock()
- w, y := m.m[typ]
- return w, y
-}
-
-func (m *watcherManager) addWatcher(typ watcherType, w watcher) error {
- m.mu.Lock()
- defer m.mu.Unlock()
- if _, y := m.m[typ]; y {
- return fmt.Errorf("already exists %s watcher", typ)
- }
- m.m[typ] = w
- return nil
-}
-
-func (m *watcherManager) delWatcher(typ watcherType) error {
- m.mu.Lock()
- defer m.mu.Unlock()
- if _, y := m.m[typ]; !y {
- return fmt.Errorf("not found %s watcher", typ)
- }
- w := m.m[typ]
- w.stop()
- delete(m.m, typ)
- return nil
-}
-
-func newWatcherManager() *watcherManager {
- m := &watcherManager{
- m: make(map[watcherType]watcher),
- ch: channels.NewInfiniteChannel(),
- }
- m.t.Go(m.loop)
- return m
-}