diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 17:27:29 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 17:27:29 +0900 |
commit | 5aec36b646e2a3c01434828c0f0cc6f3e8566578 (patch) | |
tree | ab954b6214259f2c427e8e002515b606feff7961 /server/server.go | |
parent | 43dc07d72353fc8bcb79a18a5739ea0a90dda6bb (diff) |
move gRPC-related code for REQ_MONITOR_RIB and REQ_MONITOR_NEIGHBOR_PEER_STATE to grpc_server.go
Add new Watch API.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r-- | server/server.go | 209 |
1 files changed, 176 insertions, 33 deletions
diff --git a/server/server.go b/server/server.go index 54702226..e6d78112 100644 --- a/server/server.go +++ b/server/server.go @@ -105,6 +105,7 @@ type BgpServer struct { roaManager *roaManager shutdown bool watchers *watcherManager + watcherMap map[watchType][]*Watcher } func NewBgpServer() *BgpServer { @@ -116,6 +117,7 @@ func NewBgpServer() *BgpServer { roaManager: roaManager, watchers: newWatcherManager(), mgmtCh: make(chan func(), 1), + watcherMap: make(map[watchType][]*Watcher), } } @@ -133,9 +135,6 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener { } func (server *BgpServer) Serve() { - w, _ := newGrpcWatcher() - server.watchers.addWatcher(WATCHER_GRPC_MONITOR, w) - server.listeners = make([]*TCPListener, 0, 2) server.fsmincomingCh = channels.NewInfiniteChannel() server.fsmStateCh = make(chan *FsmMsg, 4096) @@ -367,6 +366,9 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil if !peer.isRouteServerClient() { server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath}) + for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] { + w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath}) + } } for _, targetPeer := range server.neighborMap { @@ -383,27 +385,30 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { newState := peer.fsm.state if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { + _, rport := peer.fsm.RemoteHostPort() + laddr, lport := peer.fsm.LocalHostPort() + sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) + recvOpen := peer.fsm.recvOpen + ev := &watcherEventStateChangedMsg{ + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(laddr), + peerPort: rport, + localPort: lport, + peerID: peer.fsm.peerInfo.ID, + sentOpen: sentOpen, + recvOpen: recvOpen, + state: newState, + adminState: peer.fsm.adminState, + timestamp: time.Now(), + } if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) { - _, rport := peer.fsm.RemoteHostPort() - laddr, lport := peer.fsm.LocalHostPort() - sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) - recvOpen := peer.fsm.recvOpen - ev := &watcherEventStateChangedMsg{ - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(laddr), - peerPort: rport, - localPort: lport, - peerID: peer.fsm.peerInfo.ID, - sentOpen: sentOpen, - recvOpen: recvOpen, - state: newState, - adminState: peer.fsm.adminState, - timestamp: time.Now(), - } server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev) } + for _, w := range server.watcherMap[WATCH_TYPE_PEER_STATE] { + w.notify(ev) + } } } @@ -515,6 +520,10 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []* return alteredPathList } server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi}) + for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] { + w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi}) + } + } for _, targetPeer := range server.neighborMap { @@ -657,7 +666,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { sendFsmOutgoingMsg(peer, nil, notification, true) return } - if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) { + if m.Header.Type == bgp.BGP_MSG_UPDATE && (server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_PRE_UPDATE]) > 0) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &watcherEventUpdateMsg{ @@ -674,12 +683,15 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { pathList: pathList, } server.watchers.notify(WATCHER_EVENT_UPDATE_MSG, ev) + for _, w := range server.watcherMap[WATCH_TYPE_PRE_UPDATE] { + w.notify(ev) + } } if len(pathList) > 0 { var altered []*table.Path altered = server.propagateUpdate(peer, pathList) - if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) { + if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_POST_UPDATE]) > 0 { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &watcherEventUpdateMsg{ @@ -697,6 +709,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { payload, _ := u.Serialize() ev.payload = payload server.watchers.notify(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev) + for _, w := range server.watcherMap[WATCH_TYPE_POST_UPDATE] { + w.notify(ev) + } } } } @@ -1780,16 +1795,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { } grpcReq.ResponseCh <- result close(grpcReq.ResponseCh) - case REQ_MONITOR_RIB, REQ_MONITOR_NEIGHBOR_PEER_STATE: - if grpcReq.Name != "" { - if _, err = server.checkNeighborRequest(grpcReq); err != nil { - break - } - } - w, y := server.watchers.watcher(WATCHER_GRPC_MONITOR) - if y { - go w.(*grpcWatcher).addRequest(grpcReq) - } case REQ_ENABLE_MRT: server.handleEnableMrtRequest(grpcReq) case REQ_DISABLE_MRT: @@ -2787,3 +2792,141 @@ func (server *BgpServer) handleModRpki(grpcReq *GrpcRequest) { done(grpcReq, &api.SoftResetRpkiResponse{}, server.roaManager.SoftReset(arg.Address)) } } + +type watchType string + +const ( + WATCH_TYPE_BESTPATH watchType = "bestpath" + WATCH_TYPE_PRE_UPDATE watchType = "preupdate" + WATCH_TYPE_POST_UPDATE watchType = "postupdate" + WATCH_TYPE_PEER_STATE watchType = "peerstate" +) + +type watchOptions struct { + bestpath bool + preUpdate bool + postUpdate bool + peerState bool +} + +type WatchOption func(*watchOptions) + +func WatchBestPath() WatchOption { + return func(o *watchOptions) { + o.bestpath = true + } +} + +func WatchUpdate() WatchOption { + return func(o *watchOptions) { + o.preUpdate = true + } +} + +func WatchPostUpdate() WatchOption { + return func(o *watchOptions) { + o.postUpdate = true + } +} + +func WatchPeerState() WatchOption { + return func(o *watchOptions) { + o.peerState = true + } +} + +type Watcher struct { + opts watchOptions + realCh chan watcherEvent + ch *channels.InfiniteChannel + s *BgpServer +} + +func (w *Watcher) Event() <-chan watcherEvent { + return w.realCh +} + +func (w *Watcher) notify(v watcherEvent) { + w.ch.In() <- v +} + +func (w *Watcher) loop() { + for { + select { + case ev, ok := <-w.ch.Out(): + if !ok { + close(w.realCh) + return + } + w.realCh <- ev.(watcherEvent) + } + } +} + +func (w *Watcher) Stop() { + ch := make(chan struct{}) + defer func() { <-ch }() + + w.s.mgmtCh <- func() { + defer close(ch) + + for k, l := range w.s.watcherMap { + for i, v := range l { + if w == v { + w.s.watcherMap[k] = append(l[:i], l[i+1:]...) + break + } + } + } + + w.ch.Close() + // make sure the loop function finishes + func() { + for { + select { + case <-w.realCh: + default: + return + } + } + }() + } +} + +func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + w = &Watcher{ + s: s, + realCh: make(chan watcherEvent, 8), + ch: channels.NewInfiniteChannel(), + } + + for _, opt := range opts { + opt(&w.opts) + } + + register := func(t watchType, w *Watcher) { + s.watcherMap[t] = append(s.watcherMap[t], w) + } + + if w.opts.bestpath { + register(WATCH_TYPE_BESTPATH, w) + } + if w.opts.preUpdate { + register(WATCH_TYPE_PRE_UPDATE, w) + } + if w.opts.postUpdate { + register(WATCH_TYPE_POST_UPDATE, w) + } + if w.opts.peerState { + register(WATCH_TYPE_PEER_STATE, w) + } + go w.loop() + } + return w +} |