diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-26 10:23:51 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-26 10:23:51 +0900 |
commit | ebac86e07ac40d19037ca100c42bac7ba94aae12 (patch) | |
tree | c46d8b94dfcd98b99bb9e20fc28279c33fd6664d /server/server.go | |
parent | 511f487dd1dcdf836d15a231293189aaf0dbf528 (diff) |
export Watch feature
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r-- | server/server.go | 254 |
1 files changed, 127 insertions, 127 deletions
diff --git a/server/server.go b/server/server.go index 37ea1f8d..dc8ab200 100644 --- a/server/server.go +++ b/server/server.go @@ -103,7 +103,7 @@ type BgpServer struct { globalRib *table.TableManager roaManager *roaManager shutdown bool - watcherMap map[watchType][]*Watcher + watcherMap map[WatchEventType][]*Watcher zclient *zebraClient bmpManager *bmpClientManager mrt *mrtWriter @@ -117,7 +117,7 @@ func NewBgpServer() *BgpServer { policy: table.NewRoutingPolicy(), roaManager: roaManager, mgmtCh: make(chan func(), 1), - watcherMap: make(map[watchType][]*Watcher), + watcherMap: make(map[WatchEventType][]*Watcher), } s.bmpManager = newBmpClientManager(s) return s @@ -381,7 +381,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil for i, pathList := range multipath { clonedMpath[i] = clonePathList(pathList) } - server.notifyWatcher(WATCH_TYPE_BESTPATH, &watcherEventBestPathMsg{pathList: clonePathList(best[table.GLOBAL_RIB_NAME]), multiPathList: clonedMpath}) + server.notifyWatcher(WATCH_EVENT_TYPE_BEST_PATH, &WatchEventBestPath{PathList: clonePathList(best[table.GLOBAL_RIB_NAME]), MultiPathList: clonedMpath}) } for _, targetPeer := range server.neighborMap { @@ -395,31 +395,31 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil } } -func createWatcherEventStateChange(peer *Peer) *watcherEventStateChangedMsg { +func createWatchEventPeerState(peer *Peer) *WatchEventPeerState { _, rport := peer.fsm.RemoteHostPort() laddr, lport := peer.fsm.LocalHostPort() sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) recvOpen := peer.fsm.recvOpen - return &watcherEventStateChangedMsg{ - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(laddr), - peerPort: rport, - localPort: lport, - peerID: peer.fsm.peerInfo.ID, - sentOpen: sentOpen, - recvOpen: recvOpen, - state: peer.fsm.state, - adminState: peer.fsm.adminState, - timestamp: time.Now(), + return &WatchEventPeerState{ + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(laddr), + PeerPort: rport, + LocalPort: lport, + PeerID: peer.fsm.peerInfo.ID, + SentOpen: sentOpen, + RecvOpen: recvOpen, + State: peer.fsm.state, + AdminState: peer.fsm.adminState, + Timestamp: time.Now(), } } func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { newState := peer.fsm.state if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { - server.notifyWatcher(WATCH_TYPE_PEER_STATE, createWatcherEventStateChange(peer)) + server.notifyWatcher(WATCH_EVENT_TYPE_PEER_STATE, createWatchEventPeerState(peer)) } } @@ -534,7 +534,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []* for i, pathList := range multipath { clonedMpath[i] = clonePathList(pathList) } - server.notifyWatcher(WATCH_TYPE_BESTPATH, &watcherEventBestPathMsg{pathList: clonePathList(best[table.GLOBAL_RIB_NAME]), multiPathList: clonedMpath}) + server.notifyWatcher(WATCH_EVENT_TYPE_BEST_PATH, &WatchEventBestPath{PathList: clonePathList(best[table.GLOBAL_RIB_NAME]), MultiPathList: clonedMpath}) } @@ -678,46 +678,46 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { sendFsmOutgoingMsg(peer, nil, notification, true) return } - if m.Header.Type == bgp.BGP_MSG_UPDATE && server.isWatched(WATCH_TYPE_PRE_UPDATE) { + if m.Header.Type == bgp.BGP_MSG_UPDATE && server.isWatched(WATCH_EVENT_TYPE_PRE_UPDATE) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - ev := &watcherEventUpdateMsg{ - message: m, - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(l), - peerID: peer.fsm.peerInfo.ID, - fourBytesAs: y, - timestamp: e.timestamp, - payload: e.payload, - postPolicy: false, - pathList: clonePathList(pathList), + ev := &WatchEventUpdate{ + Message: m, + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(l), + PeerID: peer.fsm.peerInfo.ID, + FourBytesAs: y, + Timestamp: e.timestamp, + Payload: e.payload, + PostPolicy: false, + PathList: clonePathList(pathList), } - server.notifyWatcher(WATCH_TYPE_PRE_UPDATE, ev) + server.notifyWatcher(WATCH_EVENT_TYPE_PRE_UPDATE, ev) } if len(pathList) > 0 { var altered []*table.Path altered = server.propagateUpdate(peer, pathList) - if server.isWatched(WATCH_TYPE_POST_UPDATE) { + if server.isWatched(WATCH_EVENT_TYPE_POST_UPDATE) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - ev := &watcherEventUpdateMsg{ - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(l), - peerID: peer.fsm.peerInfo.ID, - fourBytesAs: y, - timestamp: e.timestamp, - postPolicy: true, - pathList: clonePathList(altered), + ev := &WatchEventUpdate{ + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(l), + PeerID: peer.fsm.peerInfo.ID, + FourBytesAs: y, + Timestamp: e.timestamp, + PostPolicy: true, + PathList: clonePathList(altered), } for _, u := range table.CreateUpdateMsgFromPaths(altered) { payload, _ := u.Serialize() - ev.payload = payload - server.notifyWatcher(WATCH_TYPE_POST_UPDATE, ev) + ev.Payload = payload + server.notifyWatcher(WATCH_EVENT_TYPE_POST_UPDATE, ev) } } } @@ -2653,15 +2653,56 @@ func (server *BgpServer) handleModRpki(grpcReq *GrpcRequest) { } } -type watchType string +type WatchEventType string const ( - WATCH_TYPE_BESTPATH watchType = "bestpath" - WATCH_TYPE_PRE_UPDATE watchType = "preupdate" - WATCH_TYPE_POST_UPDATE watchType = "postupdate" - WATCH_TYPE_PEER_STATE watchType = "peerstate" + WATCH_EVENT_TYPE_BEST_PATH WatchEventType = "bestpath" + WATCH_EVENT_TYPE_PRE_UPDATE WatchEventType = "preupdate" + WATCH_EVENT_TYPE_POST_UPDATE WatchEventType = "postupdate" + WATCH_EVENT_TYPE_PEER_STATE WatchEventType = "peerstate" ) +type WatchEvent interface { +} + +type WatchEventUpdate struct { + Message *bgp.BGPMessage + PeerAS uint32 + LocalAS uint32 + PeerAddress net.IP + LocalAddress net.IP + PeerID net.IP + FourBytesAs bool + Timestamp time.Time + Payload []byte + PostPolicy bool + PathList []*table.Path +} + +type WatchEventPeerState struct { + PeerAS uint32 + LocalAS uint32 + PeerAddress net.IP + LocalAddress net.IP + PeerPort uint16 + LocalPort uint16 + PeerID net.IP + SentOpen *bgp.BGPMessage + RecvOpen *bgp.BGPMessage + State bgp.FSMState + AdminState AdminState + Timestamp time.Time +} + +type WatchEventAdjIn struct { + PathList []*table.Path +} + +type WatchEventBestPath struct { + PathList []*table.Path + MultiPathList [][]*table.Path +} + type watchOptions struct { bestpath bool preUpdate bool @@ -2709,16 +2750,16 @@ func WatchPeerState(current bool) WatchOption { type Watcher struct { opts watchOptions - realCh chan watcherEvent + realCh chan WatchEvent ch *channels.InfiniteChannel s *BgpServer } -func (w *Watcher) Event() <-chan watcherEvent { +func (w *Watcher) Event() <-chan WatchEvent { return w.realCh } -func (w *Watcher) Generate(t watchType) (err error) { +func (w *Watcher) Generate(t WatchEventType) (err error) { ch := make(chan struct{}) defer func() { <-ch }() @@ -2726,7 +2767,7 @@ func (w *Watcher) Generate(t watchType) (err error) { defer close(ch) switch t { - case WATCH_TYPE_PRE_UPDATE: + case WATCH_EVENT_TYPE_PRE_UPDATE: default: err = fmt.Errorf("unsupported type ", t) return @@ -2735,12 +2776,12 @@ func (w *Watcher) Generate(t watchType) (err error) { for _, peer := range w.s.neighborMap { pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...) } - w.notify(&watcherEventAdjInMsg{pathList: clonePathList(pathList)}) + w.notify(&WatchEventAdjIn{PathList: clonePathList(pathList)}) } return err } -func (w *Watcher) notify(v watcherEvent) { +func (w *Watcher) notify(v WatchEvent) { w.ch.In() <- v } @@ -2752,7 +2793,7 @@ func (w *Watcher) loop() { close(w.realCh) return } - w.realCh <- ev.(watcherEvent) + w.realCh <- ev.(WatchEvent) } } } @@ -2785,11 +2826,11 @@ func (w *Watcher) Stop() { } } -func (s *BgpServer) isWatched(typ watchType) bool { +func (s *BgpServer) isWatched(typ WatchEventType) bool { return len(s.watcherMap[typ]) != 0 } -func (s *BgpServer) notifyWatcher(typ watchType, ev watcherEvent) { +func (s *BgpServer) notifyWatcher(typ WatchEventType, ev WatchEvent) { for _, w := range s.watcherMap[typ] { w.notify(ev) } @@ -2804,7 +2845,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { w = &Watcher{ s: s, - realCh: make(chan watcherEvent, 8), + realCh: make(chan WatchEvent, 8), ch: channels.NewInfiniteChannel(), } @@ -2812,28 +2853,28 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { opt(&w.opts) } - register := func(t watchType, w *Watcher) { + register := func(t WatchEventType, w *Watcher) { s.watcherMap[t] = append(s.watcherMap[t], w) } if w.opts.bestpath { - register(WATCH_TYPE_BESTPATH, w) + register(WATCH_EVENT_TYPE_BEST_PATH, w) } if w.opts.preUpdate { - register(WATCH_TYPE_PRE_UPDATE, w) + register(WATCH_EVENT_TYPE_PRE_UPDATE, w) } if w.opts.postUpdate { - register(WATCH_TYPE_POST_UPDATE, w) + register(WATCH_EVENT_TYPE_POST_UPDATE, w) } if w.opts.peerState { - register(WATCH_TYPE_PEER_STATE, w) + register(WATCH_EVENT_TYPE_PEER_STATE, w) } if w.opts.initPeerState { for _, peer := range s.neighborMap { if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { continue } - w.notify(createWatcherEventStateChange(peer)) + w.notify(createWatchEventPeerState(peer)) } } if w.opts.initUpdate { @@ -2846,17 +2887,17 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { buf, _ := msgs[0].Serialize() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - w.notify(&watcherEventUpdateMsg{ - message: msgs[0], - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(l), - peerID: peer.fsm.peerInfo.ID, - fourBytesAs: y, - timestamp: path.GetTimestamp(), - payload: buf, - postPolicy: false, + w.notify(&WatchEventUpdate{ + Message: msgs[0], + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(l), + PeerID: peer.fsm.peerInfo.ID, + FourBytesAs: y, + Timestamp: path.GetTimestamp(), + Payload: buf, + PostPolicy: false, }) } } @@ -2865,14 +2906,14 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { for _, path := range s.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, s.globalRib.GetRFlist()) { msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) buf, _ := msgs[0].Serialize() - w.notify(&watcherEventUpdateMsg{ - peerAS: path.GetSource().AS, - peerAddress: path.GetSource().Address, - peerID: path.GetSource().ID, - message: msgs[0], - timestamp: path.GetTimestamp(), - payload: buf, - postPolicy: true, + w.notify(&WatchEventUpdate{ + PeerAS: path.GetSource().AS, + PeerAddress: path.GetSource().Address, + PeerID: path.GetSource().ID, + Message: msgs[0], + Timestamp: path.GetTimestamp(), + Payload: buf, + PostPolicy: true, }) } @@ -2882,44 +2923,3 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { } return w } - -type watcherEvent interface { -} - -type watcherEventUpdateMsg struct { - message *bgp.BGPMessage - peerAS uint32 - localAS uint32 - peerAddress net.IP - localAddress net.IP - peerID net.IP - fourBytesAs bool - timestamp time.Time - payload []byte - postPolicy bool - pathList []*table.Path -} - -type watcherEventStateChangedMsg struct { - peerAS uint32 - localAS uint32 - peerAddress net.IP - localAddress net.IP - peerPort uint16 - localPort uint16 - peerID net.IP - sentOpen *bgp.BGPMessage - recvOpen *bgp.BGPMessage - state bgp.FSMState - adminState AdminState - timestamp time.Time -} - -type watcherEventAdjInMsg struct { - pathList []*table.Path -} - -type watcherEventBestPathMsg struct { - pathList []*table.Path - multiPathList [][]*table.Path -} |