summaryrefslogtreecommitdiffhomepage
path: root/server/server.go
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-22 17:27:29 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-22 17:27:29 +0900
commit5aec36b646e2a3c01434828c0f0cc6f3e8566578 (patch)
treeab954b6214259f2c427e8e002515b606feff7961 /server/server.go
parent43dc07d72353fc8bcb79a18a5739ea0a90dda6bb (diff)
move gRPC-related code for REQ_MONITOR_RIB and REQ_MONITOR_NEIGHBOR_PEER_STATE to grpc_server.go
Add new Watch API. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r--server/server.go209
1 files changed, 176 insertions, 33 deletions
diff --git a/server/server.go b/server/server.go
index 54702226..e6d78112 100644
--- a/server/server.go
+++ b/server/server.go
@@ -105,6 +105,7 @@ type BgpServer struct {
roaManager *roaManager
shutdown bool
watchers *watcherManager
+ watcherMap map[watchType][]*Watcher
}
func NewBgpServer() *BgpServer {
@@ -116,6 +117,7 @@ func NewBgpServer() *BgpServer {
roaManager: roaManager,
watchers: newWatcherManager(),
mgmtCh: make(chan func(), 1),
+ watcherMap: make(map[watchType][]*Watcher),
}
}
@@ -133,9 +135,6 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener {
}
func (server *BgpServer) Serve() {
- w, _ := newGrpcWatcher()
- server.watchers.addWatcher(WATCHER_GRPC_MONITOR, w)
-
server.listeners = make([]*TCPListener, 0, 2)
server.fsmincomingCh = channels.NewInfiniteChannel()
server.fsmStateCh = make(chan *FsmMsg, 4096)
@@ -367,6 +366,9 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
if !peer.isRouteServerClient() {
server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath})
+ for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] {
+ w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath})
+ }
}
for _, targetPeer := range server.neighborMap {
@@ -383,27 +385,30 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
newState := peer.fsm.state
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
+ _, rport := peer.fsm.RemoteHostPort()
+ laddr, lport := peer.fsm.LocalHostPort()
+ sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
+ recvOpen := peer.fsm.recvOpen
+ ev := &watcherEventStateChangedMsg{
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(laddr),
+ peerPort: rport,
+ localPort: lport,
+ peerID: peer.fsm.peerInfo.ID,
+ sentOpen: sentOpen,
+ recvOpen: recvOpen,
+ state: newState,
+ adminState: peer.fsm.adminState,
+ timestamp: time.Now(),
+ }
if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) {
- _, rport := peer.fsm.RemoteHostPort()
- laddr, lport := peer.fsm.LocalHostPort()
- sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
- recvOpen := peer.fsm.recvOpen
- ev := &watcherEventStateChangedMsg{
- peerAS: peer.fsm.peerInfo.AS,
- localAS: peer.fsm.peerInfo.LocalAS,
- peerAddress: peer.fsm.peerInfo.Address,
- localAddress: net.ParseIP(laddr),
- peerPort: rport,
- localPort: lport,
- peerID: peer.fsm.peerInfo.ID,
- sentOpen: sentOpen,
- recvOpen: recvOpen,
- state: newState,
- adminState: peer.fsm.adminState,
- timestamp: time.Now(),
- }
server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev)
}
+ for _, w := range server.watcherMap[WATCH_TYPE_PEER_STATE] {
+ w.notify(ev)
+ }
}
}
@@ -515,6 +520,10 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []*
return alteredPathList
}
server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi})
+ for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] {
+ w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi})
+ }
+
}
for _, targetPeer := range server.neighborMap {
@@ -657,7 +666,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
sendFsmOutgoingMsg(peer, nil, notification, true)
return
}
- if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) {
+ if m.Header.Type == bgp.BGP_MSG_UPDATE && (server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_PRE_UPDATE]) > 0) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &watcherEventUpdateMsg{
@@ -674,12 +683,15 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
pathList: pathList,
}
server.watchers.notify(WATCHER_EVENT_UPDATE_MSG, ev)
+ for _, w := range server.watcherMap[WATCH_TYPE_PRE_UPDATE] {
+ w.notify(ev)
+ }
}
if len(pathList) > 0 {
var altered []*table.Path
altered = server.propagateUpdate(peer, pathList)
- if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) {
+ if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_POST_UPDATE]) > 0 {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &watcherEventUpdateMsg{
@@ -697,6 +709,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
payload, _ := u.Serialize()
ev.payload = payload
server.watchers.notify(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev)
+ for _, w := range server.watcherMap[WATCH_TYPE_POST_UPDATE] {
+ w.notify(ev)
+ }
}
}
}
@@ -1780,16 +1795,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
}
grpcReq.ResponseCh <- result
close(grpcReq.ResponseCh)
- case REQ_MONITOR_RIB, REQ_MONITOR_NEIGHBOR_PEER_STATE:
- if grpcReq.Name != "" {
- if _, err = server.checkNeighborRequest(grpcReq); err != nil {
- break
- }
- }
- w, y := server.watchers.watcher(WATCHER_GRPC_MONITOR)
- if y {
- go w.(*grpcWatcher).addRequest(grpcReq)
- }
case REQ_ENABLE_MRT:
server.handleEnableMrtRequest(grpcReq)
case REQ_DISABLE_MRT:
@@ -2787,3 +2792,141 @@ func (server *BgpServer) handleModRpki(grpcReq *GrpcRequest) {
done(grpcReq, &api.SoftResetRpkiResponse{}, server.roaManager.SoftReset(arg.Address))
}
}
+
+type watchType string
+
+const (
+ WATCH_TYPE_BESTPATH watchType = "bestpath"
+ WATCH_TYPE_PRE_UPDATE watchType = "preupdate"
+ WATCH_TYPE_POST_UPDATE watchType = "postupdate"
+ WATCH_TYPE_PEER_STATE watchType = "peerstate"
+)
+
+type watchOptions struct {
+ bestpath bool
+ preUpdate bool
+ postUpdate bool
+ peerState bool
+}
+
+type WatchOption func(*watchOptions)
+
+func WatchBestPath() WatchOption {
+ return func(o *watchOptions) {
+ o.bestpath = true
+ }
+}
+
+func WatchUpdate() WatchOption {
+ return func(o *watchOptions) {
+ o.preUpdate = true
+ }
+}
+
+func WatchPostUpdate() WatchOption {
+ return func(o *watchOptions) {
+ o.postUpdate = true
+ }
+}
+
+func WatchPeerState() WatchOption {
+ return func(o *watchOptions) {
+ o.peerState = true
+ }
+}
+
+type Watcher struct {
+ opts watchOptions
+ realCh chan watcherEvent
+ ch *channels.InfiniteChannel
+ s *BgpServer
+}
+
+func (w *Watcher) Event() <-chan watcherEvent {
+ return w.realCh
+}
+
+func (w *Watcher) notify(v watcherEvent) {
+ w.ch.In() <- v
+}
+
+func (w *Watcher) loop() {
+ for {
+ select {
+ case ev, ok := <-w.ch.Out():
+ if !ok {
+ close(w.realCh)
+ return
+ }
+ w.realCh <- ev.(watcherEvent)
+ }
+ }
+}
+
+func (w *Watcher) Stop() {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ w.s.mgmtCh <- func() {
+ defer close(ch)
+
+ for k, l := range w.s.watcherMap {
+ for i, v := range l {
+ if w == v {
+ w.s.watcherMap[k] = append(l[:i], l[i+1:]...)
+ break
+ }
+ }
+ }
+
+ w.ch.Close()
+ // make sure the loop function finishes
+ func() {
+ for {
+ select {
+ case <-w.realCh:
+ default:
+ return
+ }
+ }
+ }()
+ }
+}
+
+func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ s.mgmtCh <- func() {
+ defer close(ch)
+
+ w = &Watcher{
+ s: s,
+ realCh: make(chan watcherEvent, 8),
+ ch: channels.NewInfiniteChannel(),
+ }
+
+ for _, opt := range opts {
+ opt(&w.opts)
+ }
+
+ register := func(t watchType, w *Watcher) {
+ s.watcherMap[t] = append(s.watcherMap[t], w)
+ }
+
+ if w.opts.bestpath {
+ register(WATCH_TYPE_BESTPATH, w)
+ }
+ if w.opts.preUpdate {
+ register(WATCH_TYPE_PRE_UPDATE, w)
+ }
+ if w.opts.postUpdate {
+ register(WATCH_TYPE_POST_UPDATE, w)
+ }
+ if w.opts.peerState {
+ register(WATCH_TYPE_PEER_STATE, w)
+ }
+ go w.loop()
+ }
+ return w
+}