diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-10-29 18:59:09 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-11-07 20:19:23 +0900 |
commit | df8ad76b5ca5316ae2a9ca88c5aa6af5f2dc9b4e (patch) | |
tree | f811d7694ca0cf98613e6e09684356ad98322774 /pkg | |
parent | 96c129e5d0cc91a2b291527898e70093545e54b6 (diff) |
server: unexported Watcher stuff
Replace it with the new API using api/.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/server/bmp.go | 24 | ||||
-rw-r--r-- | pkg/server/grpc_server.go | 16 | ||||
-rw-r--r-- | pkg/server/mrt.go | 24 | ||||
-rw-r--r-- | pkg/server/server.go | 132 | ||||
-rw-r--r-- | pkg/server/server_test.go | 22 | ||||
-rw-r--r-- | pkg/server/zclient.go | 10 |
6 files changed, 114 insertions, 114 deletions
diff --git a/pkg/server/bmp.go b/pkg/server/bmp.go index b9821b58..a7d088bd 100644 --- a/pkg/server/bmp.go +++ b/pkg/server/bmp.go @@ -113,25 +113,25 @@ func (b *bmpClient) loop() { } if func() bool { - ops := []WatchOption{WatchPeerState(true)} + ops := []watchOption{watchPeerState(true)} if b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_BOTH { log.WithFields( log.Fields{"Topic": "bmp"}, ).Warn("both option for route-monitoring-policy is obsoleted") } if b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY || b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_ALL { - ops = append(ops, WatchUpdate(true)) + ops = append(ops, watchUpdate(true)) } if b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY || b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_ALL { - ops = append(ops, WatchPostUpdate(true)) + ops = append(ops, watchPostUpdate(true)) } if b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_LOCAL_RIB || b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_ALL { - ops = append(ops, WatchBestPath(true)) + ops = append(ops, watchBestPath(true)) } if b.c.RouteMirroringEnabled { - ops = append(ops, WatchMessage(false)) + ops = append(ops, watchMessage(false)) } - w := b.s.Watch(ops...) + w := b.s.watch(ops...) defer w.Stop() var tickerCh <-chan time.Time @@ -160,7 +160,7 @@ func (b *bmpClient) loop() { select { case ev := <-w.Event(): switch msg := ev.(type) { - case *WatchEventUpdate: + case *watchEventUpdate: info := &table.PeerInfo{ Address: msg.PeerAddress, AS: msg.PeerAS, @@ -190,7 +190,7 @@ func (b *bmpClient) loop() { return false } } - case *WatchEventBestPath: + case *watchEventBestPath: info := &table.PeerInfo{ Address: net.ParseIP("0.0.0.0").To4(), AS: b.s.bgpConfig.Global.Config.As, @@ -204,7 +204,7 @@ func (b *bmpClient) loop() { return false } } - case *WatchEventPeerState: + case *watchEventPeerState: if msg.State == bgp.BGP_FSM_ESTABLISHED { if err := write(bmpPeerUp(msg, bmp.BMP_PEER_TYPE_GLOBAL, false, 0)); err != nil { return false @@ -214,7 +214,7 @@ func (b *bmpClient) loop() { return false } } - case *WatchEventMessage: + case *watchEventMessage: info := &table.PeerInfo{ Address: msg.PeerAddress, AS: msg.PeerAS, @@ -259,7 +259,7 @@ type bmpClient struct { ribout ribout } -func bmpPeerUp(ev *WatchEventPeerState, t uint8, policy bool, pd uint64) *bmp.BMPMessage { +func bmpPeerUp(ev *watchEventPeerState, t uint8, policy bool, pd uint64) *bmp.BMPMessage { var flags uint8 = 0 if policy { flags |= bmp.BMP_PEER_FLAG_POST_POLICY @@ -268,7 +268,7 @@ func bmpPeerUp(ev *WatchEventPeerState, t uint8, policy bool, pd uint64) *bmp.BM return bmp.NewBMPPeerUpNotification(*ph, ev.LocalAddress.String(), ev.LocalPort, ev.PeerPort, ev.SentOpen, ev.RecvOpen) } -func bmpPeerDown(ev *WatchEventPeerState, t uint8, policy bool, pd uint64) *bmp.BMPMessage { +func bmpPeerDown(ev *watchEventPeerState, t uint8, policy bool, pd uint64) *bmp.BMPMessage { var flags uint8 = 0 if policy { flags |= bmp.BMP_PEER_FLAG_POST_POLICY diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index ccd3b613..3b6cf7a9 100644 --- a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -174,15 +174,15 @@ func (s *Server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_ if arg == nil { return fmt.Errorf("invalid request") } - w, err := func() (*Watcher, error) { + w, err := func() (*watcher, error) { switch arg.Type { case api.Resource_GLOBAL: - return s.bgpServer.Watch(WatchBestPath(arg.Current)), nil + return s.bgpServer.watch(watchBestPath(arg.Current)), nil case api.Resource_ADJ_IN: if arg.PostPolicy { - return s.bgpServer.Watch(WatchPostUpdate(arg.Current)), nil + return s.bgpServer.watch(watchPostUpdate(arg.Current)), nil } - return s.bgpServer.Watch(WatchUpdate(arg.Current)), nil + return s.bgpServer.watch(watchUpdate(arg.Current)), nil default: return nil, fmt.Errorf("unsupported resource type: %v", arg.Type) } @@ -209,7 +209,7 @@ func (s *Server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_ for ev := range w.Event() { switch msg := ev.(type) { - case *WatchEventBestPath: + case *watchEventBestPath: if err := sendPath(func() []*table.Path { if len(msg.MultiPathList) > 0 { l := make([]*table.Path, 0) @@ -223,7 +223,7 @@ func (s *Server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_ }()); err != nil { return err } - case *WatchEventUpdate: + case *watchEventUpdate: if err := sendPath(msg.PathList); err != nil { return err } @@ -238,12 +238,12 @@ func (s *Server) MonitorPeer(arg *api.MonitorPeerRequest, stream api.GobgpApi_Mo return fmt.Errorf("invalid request") } return func() error { - w := s.bgpServer.Watch(WatchPeerState(arg.Current)) + w := s.bgpServer.watch(watchPeerState(arg.Current)) defer func() { w.Stop() }() for ev := range w.Event() { switch msg := ev.(type) { - case *WatchEventPeerState: + case *watchEventPeerState: if len(arg.Address) > 0 && arg.Address != msg.PeerAddress.String() && arg.Address != msg.PeerInterface { continue } diff --git a/pkg/server/mrt.go b/pkg/server/mrt.go index ddcb931b..fae9b5cc 100644 --- a/pkg/server/mrt.go +++ b/pkg/server/mrt.go @@ -48,16 +48,16 @@ func (m *mrtWriter) Stop() { } func (m *mrtWriter) loop() error { - ops := []WatchOption{} + ops := []watchOption{} switch m.c.DumpType { case config.MRT_TYPE_UPDATES: - ops = append(ops, WatchUpdate(false)) + ops = append(ops, watchUpdate(false)) case config.MRT_TYPE_TABLE: if len(m.c.TableName) > 0 { - ops = append(ops, WatchTableName(m.c.TableName)) + ops = append(ops, watchTableName(m.c.TableName)) } } - w := m.s.Watch(ops...) + w := m.s.watch(ops...) rotator := func() *time.Ticker { if m.rotationInterval == 0 { return &time.Ticker{} @@ -85,10 +85,10 @@ func (m *mrtWriter) loop() error { }() for { - serialize := func(ev WatchEvent) []*mrt.MRTMessage { + serialize := func(ev watchEvent) []*mrt.MRTMessage { msg := make([]*mrt.MRTMessage, 0, 1) switch e := ev.(type) { - case *WatchEventUpdate: + case *watchEventUpdate: if e.Init { return nil } @@ -113,7 +113,7 @@ func (m *mrtWriter) loop() error { } else { msg = append(msg, bm) } - case *WatchEventTable: + case *watchEventTable: t := uint32(time.Now().Unix()) peers := make([]*mrt.Peer, 1, len(e.Neighbor)+1) @@ -125,7 +125,7 @@ func (m *mrtWriter) loop() error { neighborMap[pconf.State.NeighborAddress] = pconf } - if bm, err := mrt.NewMRTMessage(t, mrt.TABLE_DUMPv2, mrt.PEER_INDEX_TABLE, mrt.NewPeerIndexTable(e.RouterId, "", peers)); err != nil { + if bm, err := mrt.NewMRTMessage(t, mrt.TABLE_DUMPv2, mrt.PEER_INDEX_TABLE, mrt.NewPeerIndexTable(e.RouterID, "", peers)); err != nil { log.WithFields(log.Fields{ "Topic": "mrt", "Data": e, @@ -205,8 +205,8 @@ func (m *mrtWriter) loop() error { return msg } - drain := func(ev WatchEvent) { - events := make([]WatchEvent, 0, 1+len(w.Event())) + drain := func(ev watchEvent) { + events := make([]watchEvent, 0, 1+len(w.Event())) if ev != nil { events = append(events, ev) } @@ -274,10 +274,10 @@ func (m *mrtWriter) loop() error { if m.c.DumpType == config.MRT_TYPE_UPDATES { rotate() } else { - w.Generate(WATCH_EVENT_TYPE_TABLE) + w.Generate(watchEventTypeTable) } case <-dump.C: - w.Generate(WATCH_EVENT_TYPE_TABLE) + w.Generate(watchEventTypeTable) } } } diff --git a/pkg/server/server.go b/pkg/server/server.go index db6a9bb6..f5dfbeb6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -111,7 +111,7 @@ type BgpServer struct { rsRib *table.TableManager roaManager *roaManager shutdownWG *sync.WaitGroup - watcherMap map[WatchEventType][]*Watcher + watcherMap map[watchEventType][]*watcher zclient *zebraClient bmpManager *bmpClientManager mrtManager *mrtManager @@ -126,7 +126,7 @@ func NewBgpServer() *BgpServer { policy: table.NewRoutingPolicy(), roaManager: roaManager, mgmtCh: make(chan *mgmtOp, 1), - watcherMap: make(map[WatchEventType][]*Watcher), + watcherMap: make(map[watchEventType][]*watcher), uuidMap: make(map[uuid.UUID]string), } s.bmpManager = newBmpClientManager(s) @@ -627,11 +627,11 @@ func (server *BgpServer) notifyBestWatcher(best []*table.Path, multipath [][]*ta } } } - w := &WatchEventBestPath{PathList: clonedB, MultiPathList: clonedM} + w := &watchEventBestPath{PathList: clonedB, MultiPathList: clonedM} if len(m) > 0 { w.Vrf = m } - server.notifyWatcher(WATCH_EVENT_TYPE_BEST_PATH, w) + server.notifyWatcher(watchEventTypeBestPath, w) } func (s *BgpServer) toConfig(peer *peer, getAdvertised bool) *config.Neighbor { @@ -694,7 +694,7 @@ func (s *BgpServer) toConfig(peer *peer, getAdvertised bool) *config.Neighbor { } func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *peer, pathList []*table.Path, msg *bgp.BGPMessage, timestamp time.Time, payload []byte) { - if !server.isWatched(WATCH_EVENT_TYPE_PRE_UPDATE) || peer == nil { + if !server.isWatched(watchEventTypePreUpdate) || peer == nil { return } @@ -705,7 +705,7 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *peer, pathList []*ta peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - ev := &WatchEventUpdate{ + ev := &watchEventUpdate{ Message: msg, PeerAS: peer.fsm.peerInfo.AS, LocalAS: peer.fsm.peerInfo.LocalAS, @@ -720,11 +720,11 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *peer, pathList []*ta Neighbor: server.toConfig(peer, false), } peer.fsm.lock.RUnlock() - server.notifyWatcher(WATCH_EVENT_TYPE_PRE_UPDATE, ev) + server.notifyWatcher(watchEventTypePreUpdate, ev) } func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *peer, pathList []*table.Path) { - if !server.isWatched(WATCH_EVENT_TYPE_POST_UPDATE) || peer == nil { + if !server.isWatched(watchEventTypePostUpdate) || peer == nil { return } @@ -735,7 +735,7 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *peer, pathList []*t peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - ev := &WatchEventUpdate{ + ev := &watchEventUpdate{ PeerAS: peer.fsm.peerInfo.AS, LocalAS: peer.fsm.peerInfo.LocalAS, PeerAddress: peer.fsm.peerInfo.Address, @@ -748,16 +748,16 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *peer, pathList []*t Neighbor: server.toConfig(peer, false), } peer.fsm.lock.RUnlock() - server.notifyWatcher(WATCH_EVENT_TYPE_POST_UPDATE, ev) + server.notifyWatcher(watchEventTypePostUpdate, ev) } -func newWatchEventPeerState(peer *peer, m *fsmMsg) *WatchEventPeerState { +func newWatchEventPeerState(peer *peer, m *fsmMsg) *watchEventPeerState { _, rport := peer.fsm.RemoteHostPort() laddr, lport := peer.fsm.LocalHostPort() sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) peer.fsm.lock.RLock() recvOpen := peer.fsm.recvOpen - e := &WatchEventPeerState{ + e := &watchEventPeerState{ PeerAS: peer.fsm.peerInfo.AS, LocalAS: peer.fsm.peerInfo.LocalAS, PeerAddress: peer.fsm.peerInfo.Address, @@ -785,7 +785,7 @@ func (server *BgpServer) broadcastPeerState(peer *peer, oldState bgp.FSMState, e newState := peer.fsm.state peer.fsm.lock.RUnlock() if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { - server.notifyWatcher(WATCH_EVENT_TYPE_PEER_STATE, newWatchEventPeerState(peer, e)) + server.notifyWatcher(watchEventTypePeerState, newWatchEventPeerState(peer, e)) } } @@ -794,7 +794,7 @@ func (server *BgpServer) notifyMessageWatcher(peer *peer, timestamp time.Time, m peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - ev := &WatchEventMessage{ + ev := &watchEventMessage{ Message: msg, PeerAS: peer.fsm.peerInfo.AS, LocalAS: peer.fsm.peerInfo.LocalAS, @@ -807,12 +807,12 @@ func (server *BgpServer) notifyMessageWatcher(peer *peer, timestamp time.Time, m } peer.fsm.lock.RUnlock() if !isSent { - server.notifyWatcher(WATCH_EVENT_TYPE_RECV_MSG, ev) + server.notifyWatcher(watchEventTypeRecvMsg, ev) } } func (server *BgpServer) notifyRecvMessageWatcher(peer *peer, timestamp time.Time, msg *bgp.BGPMessage) { - if peer == nil || !server.isWatched(WATCH_EVENT_TYPE_RECV_MSG) { + if peer == nil || !server.isWatched(watchEventTypeRecvMsg) { return } server.notifyMessageWatcher(peer, timestamp, msg, false) @@ -3340,21 +3340,21 @@ func (s *BgpServer) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) erro }, false) } -type WatchEventType string +type watchEventType string const ( - WATCH_EVENT_TYPE_BEST_PATH WatchEventType = "bestpath" - WATCH_EVENT_TYPE_PRE_UPDATE WatchEventType = "preupdate" - WATCH_EVENT_TYPE_POST_UPDATE WatchEventType = "postupdate" - WATCH_EVENT_TYPE_PEER_STATE WatchEventType = "peerstate" - WATCH_EVENT_TYPE_TABLE WatchEventType = "table" - WATCH_EVENT_TYPE_RECV_MSG WatchEventType = "receivedmessage" + watchEventTypeBestPath watchEventType = "bestpath" + watchEventTypePreUpdate watchEventType = "preupdate" + watchEventTypePostUpdate watchEventType = "postupdate" + watchEventTypePeerState watchEventType = "peerstate" + watchEventTypeTable watchEventType = "table" + watchEventTypeRecvMsg watchEventType = "receivedmessage" ) -type WatchEvent interface { +type watchEvent interface { } -type WatchEventUpdate struct { +type watchEventUpdate struct { Message *bgp.BGPMessage PeerAS uint32 LocalAS uint32 @@ -3370,7 +3370,7 @@ type WatchEventUpdate struct { Neighbor *config.Neighbor } -type WatchEventPeerState struct { +type watchEventPeerState struct { PeerAS uint32 LocalAS uint32 PeerAddress net.IP @@ -3387,23 +3387,23 @@ type WatchEventPeerState struct { PeerInterface string } -type WatchEventAdjIn struct { +type watchEventAdjIn struct { PathList []*table.Path } -type WatchEventTable struct { - RouterId string +type watchEventTable struct { + RouterID string PathList map[string][]*table.Path Neighbor []*config.Neighbor } -type WatchEventBestPath struct { +type watchEventBestPath struct { PathList []*table.Path MultiPathList [][]*table.Path Vrf map[string]uint32 } -type WatchEventMessage struct { +type watchEventMessage struct { Message *bgp.BGPMessage PeerAS uint32 LocalAS uint32 @@ -3428,9 +3428,9 @@ type watchOptions struct { recvMessage bool } -type WatchOption func(*watchOptions) +type watchOption func(*watchOptions) -func WatchBestPath(current bool) WatchOption { +func watchBestPath(current bool) watchOption { return func(o *watchOptions) { o.bestpath = true if current { @@ -3439,7 +3439,7 @@ func WatchBestPath(current bool) WatchOption { } } -func WatchUpdate(current bool) WatchOption { +func watchUpdate(current bool) watchOption { return func(o *watchOptions) { o.preUpdate = true if current { @@ -3448,7 +3448,7 @@ func WatchUpdate(current bool) WatchOption { } } -func WatchPostUpdate(current bool) WatchOption { +func watchPostUpdate(current bool) watchOption { return func(o *watchOptions) { o.postUpdate = true if current { @@ -3457,7 +3457,7 @@ func WatchPostUpdate(current bool) WatchOption { } } -func WatchPeerState(current bool) WatchOption { +func watchPeerState(current bool) watchOption { return func(o *watchOptions) { o.peerState = true if current { @@ -3466,13 +3466,13 @@ func WatchPeerState(current bool) WatchOption { } } -func WatchTableName(name string) WatchOption { +func watchTableName(name string) watchOption { return func(o *watchOptions) { o.tableName = name } } -func WatchMessage(isSent bool) WatchOption { +func watchMessage(isSent bool) watchOption { return func(o *watchOptions) { if isSent { log.WithFields(log.Fields{ @@ -3485,27 +3485,27 @@ func WatchMessage(isSent bool) WatchOption { } } -type Watcher struct { +type watcher struct { opts watchOptions - realCh chan WatchEvent + realCh chan watchEvent ch *channels.InfiniteChannel s *BgpServer } -func (w *Watcher) Event() <-chan WatchEvent { +func (w *watcher) Event() <-chan watchEvent { return w.realCh } -func (w *Watcher) Generate(t WatchEventType) error { +func (w *watcher) Generate(t watchEventType) error { return w.s.mgmtOperation(func() error { switch t { - case WATCH_EVENT_TYPE_PRE_UPDATE: + case watchEventTypePreUpdate: pathList := make([]*table.Path, 0) for _, peer := range w.s.neighborMap { pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...) } - w.notify(&WatchEventAdjIn{PathList: clonePathList(pathList)}) - case WATCH_EVENT_TYPE_TABLE: + w.notify(&watchEventAdjIn{PathList: clonePathList(pathList)}) + case watchEventTypeTable: rib := w.s.globalRib as := uint32(0) id := table.GLOBAL_RIB_NAME @@ -3537,7 +3537,7 @@ func (w *Watcher) Generate(t WatchEventType) error { for _, peer := range w.s.neighborMap { l = append(l, w.s.toConfig(peer, false)) } - w.notify(&WatchEventTable{PathList: pathList, Neighbor: l}) + w.notify(&watchEventTable{PathList: pathList, Neighbor: l}) default: return fmt.Errorf("unsupported type %v", t) } @@ -3545,18 +3545,18 @@ func (w *Watcher) Generate(t WatchEventType) error { }, false) } -func (w *Watcher) notify(v WatchEvent) { +func (w *watcher) notify(v watchEvent) { w.ch.In() <- v } -func (w *Watcher) loop() { +func (w *watcher) loop() { for ev := range w.ch.Out() { - w.realCh <- ev.(WatchEvent) + w.realCh <- ev.(watchEvent) } close(w.realCh) } -func (w *Watcher) Stop() { +func (w *watcher) Stop() { w.s.mgmtOperation(func() error { for k, l := range w.s.watcherMap { for i, v := range l { @@ -3576,21 +3576,21 @@ func (w *Watcher) Stop() { }, false) } -func (s *BgpServer) isWatched(typ WatchEventType) bool { +func (s *BgpServer) isWatched(typ watchEventType) bool { return len(s.watcherMap[typ]) != 0 } -func (s *BgpServer) notifyWatcher(typ WatchEventType, ev WatchEvent) { +func (s *BgpServer) notifyWatcher(typ watchEventType, ev watchEvent) { for _, w := range s.watcherMap[typ] { w.notify(ev) } } -func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { +func (s *BgpServer) watch(opts ...watchOption) (w *watcher) { s.mgmtOperation(func() error { - w = &Watcher{ + w = &watcher{ s: s, - realCh: make(chan WatchEvent, 8), + realCh: make(chan watchEvent, 8), ch: channels.NewInfiniteChannel(), } @@ -3598,21 +3598,21 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { opt(&w.opts) } - register := func(t WatchEventType, w *Watcher) { + register := func(t watchEventType, w *watcher) { s.watcherMap[t] = append(s.watcherMap[t], w) } if w.opts.bestpath { - register(WATCH_EVENT_TYPE_BEST_PATH, w) + register(watchEventTypeBestPath, w) } if w.opts.preUpdate { - register(WATCH_EVENT_TYPE_PRE_UPDATE, w) + register(watchEventTypePreUpdate, w) } if w.opts.postUpdate { - register(WATCH_EVENT_TYPE_POST_UPDATE, w) + register(watchEventTypePostUpdate, w) } if w.opts.peerState { - register(WATCH_EVENT_TYPE_PEER_STATE, w) + register(watchEventTypePeerState, w) } if w.opts.initPeerState { for _, peer := range s.neighborMap { @@ -3626,7 +3626,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { } } if w.opts.initBest && s.active() == nil { - w.notify(&WatchEventBestPath{ + w.notify(&watchEventBestPath{ PathList: s.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, 0, nil), MultiPathList: s.globalRib.GetBestMultiPathList(table.GLOBAL_RIB_NAME, nil), }) @@ -3644,7 +3644,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - update := &WatchEventUpdate{ + update := &watchEventUpdate{ PeerAS: peer.fsm.peerInfo.AS, LocalAS: peer.fsm.peerInfo.LocalAS, PeerAddress: peer.fsm.peerInfo.Address, @@ -3662,7 +3662,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { eor := bgp.NewEndOfRib(rf) eorBuf, _ := eor.Serialize() peer.fsm.lock.RLock() - update = &WatchEventUpdate{ + update = &watchEventUpdate{ Message: eor, PeerAS: peer.fsm.peerInfo.AS, LocalAS: peer.fsm.peerInfo.LocalAS, @@ -3697,7 +3697,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { configNeighbor = w.s.toConfig(peer, false) } - w.notify(&WatchEventUpdate{ + w.notify(&watchEventUpdate{ PeerAS: peerInfo.AS, PeerAddress: peerInfo.Address, PeerID: peerInfo.ID, @@ -3709,7 +3709,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { eor := bgp.NewEndOfRib(rf) eorBuf, _ := eor.Serialize() - w.notify(&WatchEventUpdate{ + w.notify(&watchEventUpdate{ Message: eor, PeerAS: peerInfo.AS, PeerAddress: peerInfo.Address, @@ -3724,7 +3724,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { } } if w.opts.recvMessage { - register(WATCH_EVENT_TYPE_RECV_MSG, w) + register(watchEventTypeRecvMsg, w) } go w.loop() diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 164247f1..27f56353 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -229,7 +229,7 @@ func TestMonitor(test *testing.T) { } // Test WatchBestPath. - w := s.Watch(WatchBestPath(false)) + w := s.watch(watchBestPath(false)) // Advertises a route. attrs := []bgp.PathAttributeInterface{ @@ -240,7 +240,7 @@ func TestMonitor(test *testing.T) { log.Fatal(err) } ev := <-w.Event() - b := ev.(*WatchEventBestPath) + b := ev.(*watchEventBestPath) assert.Equal(1, len(b.PathList)) assert.Equal("10.0.0.0/24", b.PathList[0].GetNlri().String()) assert.False(b.PathList[0].IsWithdraw) @@ -251,7 +251,7 @@ func TestMonitor(test *testing.T) { log.Fatal(err) } ev = <-w.Event() - b = ev.(*WatchEventBestPath) + b = ev.(*watchEventBestPath) assert.Equal(1, len(b.PathList)) assert.Equal("10.0.0.0/24", b.PathList[0].GetNlri().String()) assert.True(b.PathList[0].IsWithdraw) @@ -276,16 +276,16 @@ func TestMonitor(test *testing.T) { } // Test WatchUpdate with "current" flag. - w = s.Watch(WatchUpdate(true)) + w = s.watch(watchUpdate(true)) // Test the initial route. ev = <-w.Event() - u := ev.(*WatchEventUpdate) + u := ev.(*watchEventUpdate) assert.Equal(1, len(u.PathList)) assert.Equal("10.1.0.0/24", u.PathList[0].GetNlri().String()) assert.False(u.PathList[0].IsWithdraw) ev = <-w.Event() - u = ev.(*WatchEventUpdate) + u = ev.(*watchEventUpdate) assert.Equal(len(u.PathList), 0) // End of RIB // Advertises an additional route. @@ -293,7 +293,7 @@ func TestMonitor(test *testing.T) { log.Fatal(err) } ev = <-w.Event() - u = ev.(*WatchEventUpdate) + u = ev.(*watchEventUpdate) assert.Equal(1, len(u.PathList)) assert.Equal("10.2.0.0/24", u.PathList[0].GetNlri().String()) assert.False(u.PathList[0].IsWithdraw) @@ -304,7 +304,7 @@ func TestMonitor(test *testing.T) { log.Fatal(err) } ev = <-w.Event() - u = ev.(*WatchEventUpdate) + u = ev.(*watchEventUpdate) assert.Equal(1, len(u.PathList)) assert.Equal("10.2.0.0/24", u.PathList[0].GetNlri().String()) assert.True(u.PathList[0].IsWithdraw) @@ -952,7 +952,7 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) { if err = peerServers(t, ctx, []*BgpServer{s1, s2}, []config.AfiSafiType{config.AFI_SAFI_TYPE_L3VPN_IPV4_UNICAST, config.AFI_SAFI_TYPE_RTC}); err != nil { t.Fatal(err) } - watcher := s1.Watch(WatchUpdate(true)) + watcher := s1.watch(watchUpdate(true)) // Add route to vrf1 on s2 attrs := []bgp.PathAttributeInterface{ @@ -976,7 +976,7 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) { select { case ev := <-watcher.Event(): switch msg := ev.(type) { - case *WatchEventUpdate: + case *watchEventUpdate: for _, path := range msg.PathList { log.Infof("tester received path: %s", path.String()) if vpnPath, ok := path.GetNlri().(*bgp.LabeledVPNIPAddrPrefix); ok { @@ -1019,7 +1019,7 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) { select { case ev := <-watcher.Event(): switch msg := ev.(type) { - case *WatchEventUpdate: + case *watchEventUpdate: for _, path := range msg.PathList { log.Infof("tester received path: %s", path.String()) if vpnPath, ok := path.GetNlri().(*bgp.LabeledVPNIPAddrPrefix); ok { diff --git a/pkg/server/zclient.go b/pkg/server/zclient.go index c4abd12b..33190591 100644 --- a/pkg/server/zclient.go +++ b/pkg/server/zclient.go @@ -331,9 +331,9 @@ func (z *zebraClient) updatePathByNexthopCache(paths []*table.Path) { } func (z *zebraClient) loop() { - w := z.server.Watch([]WatchOption{ - WatchBestPath(true), - WatchPostUpdate(true), + w := z.server.watch([]watchOption{ + watchBestPath(true), + watchPostUpdate(true), }...) defer w.Stop() @@ -368,7 +368,7 @@ func (z *zebraClient) loop() { } case ev := <-w.Event(): switch msg := ev.(type) { - case *WatchEventBestPath: + case *watchEventBestPath: if table.UseMultiplePaths.Enabled { for _, paths := range msg.MultiPathList { z.updatePathByNexthopCache(paths) @@ -398,7 +398,7 @@ func (z *zebraClient) loop() { } } } - case *WatchEventUpdate: + case *watchEventUpdate: if body := newNexthopRegisterBody(msg.PathList, z.nexthopCache); body != nil { vrfID := uint32(0) for _, vrf := range z.server.listVrf() { |