summaryrefslogtreecommitdiffhomepage
path: root/pkg/server
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2018-10-29 18:59:09 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2018-11-07 20:19:23 +0900
commitdf8ad76b5ca5316ae2a9ca88c5aa6af5f2dc9b4e (patch)
treef811d7694ca0cf98613e6e09684356ad98322774 /pkg/server
parent96c129e5d0cc91a2b291527898e70093545e54b6 (diff)
server: unexported Watcher stuff
Replace it with the new API using api/. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'pkg/server')
-rw-r--r--pkg/server/bmp.go24
-rw-r--r--pkg/server/grpc_server.go16
-rw-r--r--pkg/server/mrt.go24
-rw-r--r--pkg/server/server.go132
-rw-r--r--pkg/server/server_test.go22
-rw-r--r--pkg/server/zclient.go10
6 files changed, 114 insertions, 114 deletions
diff --git a/pkg/server/bmp.go b/pkg/server/bmp.go
index b9821b58..a7d088bd 100644
--- a/pkg/server/bmp.go
+++ b/pkg/server/bmp.go
@@ -113,25 +113,25 @@ func (b *bmpClient) loop() {
}
if func() bool {
- ops := []WatchOption{WatchPeerState(true)}
+ ops := []watchOption{watchPeerState(true)}
if b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_BOTH {
log.WithFields(
log.Fields{"Topic": "bmp"},
).Warn("both option for route-monitoring-policy is obsoleted")
}
if b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY || b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_ALL {
- ops = append(ops, WatchUpdate(true))
+ ops = append(ops, watchUpdate(true))
}
if b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY || b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_ALL {
- ops = append(ops, WatchPostUpdate(true))
+ ops = append(ops, watchPostUpdate(true))
}
if b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_LOCAL_RIB || b.c.RouteMonitoringPolicy == config.BMP_ROUTE_MONITORING_POLICY_TYPE_ALL {
- ops = append(ops, WatchBestPath(true))
+ ops = append(ops, watchBestPath(true))
}
if b.c.RouteMirroringEnabled {
- ops = append(ops, WatchMessage(false))
+ ops = append(ops, watchMessage(false))
}
- w := b.s.Watch(ops...)
+ w := b.s.watch(ops...)
defer w.Stop()
var tickerCh <-chan time.Time
@@ -160,7 +160,7 @@ func (b *bmpClient) loop() {
select {
case ev := <-w.Event():
switch msg := ev.(type) {
- case *WatchEventUpdate:
+ case *watchEventUpdate:
info := &table.PeerInfo{
Address: msg.PeerAddress,
AS: msg.PeerAS,
@@ -190,7 +190,7 @@ func (b *bmpClient) loop() {
return false
}
}
- case *WatchEventBestPath:
+ case *watchEventBestPath:
info := &table.PeerInfo{
Address: net.ParseIP("0.0.0.0").To4(),
AS: b.s.bgpConfig.Global.Config.As,
@@ -204,7 +204,7 @@ func (b *bmpClient) loop() {
return false
}
}
- case *WatchEventPeerState:
+ case *watchEventPeerState:
if msg.State == bgp.BGP_FSM_ESTABLISHED {
if err := write(bmpPeerUp(msg, bmp.BMP_PEER_TYPE_GLOBAL, false, 0)); err != nil {
return false
@@ -214,7 +214,7 @@ func (b *bmpClient) loop() {
return false
}
}
- case *WatchEventMessage:
+ case *watchEventMessage:
info := &table.PeerInfo{
Address: msg.PeerAddress,
AS: msg.PeerAS,
@@ -259,7 +259,7 @@ type bmpClient struct {
ribout ribout
}
-func bmpPeerUp(ev *WatchEventPeerState, t uint8, policy bool, pd uint64) *bmp.BMPMessage {
+func bmpPeerUp(ev *watchEventPeerState, t uint8, policy bool, pd uint64) *bmp.BMPMessage {
var flags uint8 = 0
if policy {
flags |= bmp.BMP_PEER_FLAG_POST_POLICY
@@ -268,7 +268,7 @@ func bmpPeerUp(ev *WatchEventPeerState, t uint8, policy bool, pd uint64) *bmp.BM
return bmp.NewBMPPeerUpNotification(*ph, ev.LocalAddress.String(), ev.LocalPort, ev.PeerPort, ev.SentOpen, ev.RecvOpen)
}
-func bmpPeerDown(ev *WatchEventPeerState, t uint8, policy bool, pd uint64) *bmp.BMPMessage {
+func bmpPeerDown(ev *watchEventPeerState, t uint8, policy bool, pd uint64) *bmp.BMPMessage {
var flags uint8 = 0
if policy {
flags |= bmp.BMP_PEER_FLAG_POST_POLICY
diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go
index ccd3b613..3b6cf7a9 100644
--- a/pkg/server/grpc_server.go
+++ b/pkg/server/grpc_server.go
@@ -174,15 +174,15 @@ func (s *Server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_
if arg == nil {
return fmt.Errorf("invalid request")
}
- w, err := func() (*Watcher, error) {
+ w, err := func() (*watcher, error) {
switch arg.Type {
case api.Resource_GLOBAL:
- return s.bgpServer.Watch(WatchBestPath(arg.Current)), nil
+ return s.bgpServer.watch(watchBestPath(arg.Current)), nil
case api.Resource_ADJ_IN:
if arg.PostPolicy {
- return s.bgpServer.Watch(WatchPostUpdate(arg.Current)), nil
+ return s.bgpServer.watch(watchPostUpdate(arg.Current)), nil
}
- return s.bgpServer.Watch(WatchUpdate(arg.Current)), nil
+ return s.bgpServer.watch(watchUpdate(arg.Current)), nil
default:
return nil, fmt.Errorf("unsupported resource type: %v", arg.Type)
}
@@ -209,7 +209,7 @@ func (s *Server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_
for ev := range w.Event() {
switch msg := ev.(type) {
- case *WatchEventBestPath:
+ case *watchEventBestPath:
if err := sendPath(func() []*table.Path {
if len(msg.MultiPathList) > 0 {
l := make([]*table.Path, 0)
@@ -223,7 +223,7 @@ func (s *Server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_
}()); err != nil {
return err
}
- case *WatchEventUpdate:
+ case *watchEventUpdate:
if err := sendPath(msg.PathList); err != nil {
return err
}
@@ -238,12 +238,12 @@ func (s *Server) MonitorPeer(arg *api.MonitorPeerRequest, stream api.GobgpApi_Mo
return fmt.Errorf("invalid request")
}
return func() error {
- w := s.bgpServer.Watch(WatchPeerState(arg.Current))
+ w := s.bgpServer.watch(watchPeerState(arg.Current))
defer func() { w.Stop() }()
for ev := range w.Event() {
switch msg := ev.(type) {
- case *WatchEventPeerState:
+ case *watchEventPeerState:
if len(arg.Address) > 0 && arg.Address != msg.PeerAddress.String() && arg.Address != msg.PeerInterface {
continue
}
diff --git a/pkg/server/mrt.go b/pkg/server/mrt.go
index ddcb931b..fae9b5cc 100644
--- a/pkg/server/mrt.go
+++ b/pkg/server/mrt.go
@@ -48,16 +48,16 @@ func (m *mrtWriter) Stop() {
}
func (m *mrtWriter) loop() error {
- ops := []WatchOption{}
+ ops := []watchOption{}
switch m.c.DumpType {
case config.MRT_TYPE_UPDATES:
- ops = append(ops, WatchUpdate(false))
+ ops = append(ops, watchUpdate(false))
case config.MRT_TYPE_TABLE:
if len(m.c.TableName) > 0 {
- ops = append(ops, WatchTableName(m.c.TableName))
+ ops = append(ops, watchTableName(m.c.TableName))
}
}
- w := m.s.Watch(ops...)
+ w := m.s.watch(ops...)
rotator := func() *time.Ticker {
if m.rotationInterval == 0 {
return &time.Ticker{}
@@ -85,10 +85,10 @@ func (m *mrtWriter) loop() error {
}()
for {
- serialize := func(ev WatchEvent) []*mrt.MRTMessage {
+ serialize := func(ev watchEvent) []*mrt.MRTMessage {
msg := make([]*mrt.MRTMessage, 0, 1)
switch e := ev.(type) {
- case *WatchEventUpdate:
+ case *watchEventUpdate:
if e.Init {
return nil
}
@@ -113,7 +113,7 @@ func (m *mrtWriter) loop() error {
} else {
msg = append(msg, bm)
}
- case *WatchEventTable:
+ case *watchEventTable:
t := uint32(time.Now().Unix())
peers := make([]*mrt.Peer, 1, len(e.Neighbor)+1)
@@ -125,7 +125,7 @@ func (m *mrtWriter) loop() error {
neighborMap[pconf.State.NeighborAddress] = pconf
}
- if bm, err := mrt.NewMRTMessage(t, mrt.TABLE_DUMPv2, mrt.PEER_INDEX_TABLE, mrt.NewPeerIndexTable(e.RouterId, "", peers)); err != nil {
+ if bm, err := mrt.NewMRTMessage(t, mrt.TABLE_DUMPv2, mrt.PEER_INDEX_TABLE, mrt.NewPeerIndexTable(e.RouterID, "", peers)); err != nil {
log.WithFields(log.Fields{
"Topic": "mrt",
"Data": e,
@@ -205,8 +205,8 @@ func (m *mrtWriter) loop() error {
return msg
}
- drain := func(ev WatchEvent) {
- events := make([]WatchEvent, 0, 1+len(w.Event()))
+ drain := func(ev watchEvent) {
+ events := make([]watchEvent, 0, 1+len(w.Event()))
if ev != nil {
events = append(events, ev)
}
@@ -274,10 +274,10 @@ func (m *mrtWriter) loop() error {
if m.c.DumpType == config.MRT_TYPE_UPDATES {
rotate()
} else {
- w.Generate(WATCH_EVENT_TYPE_TABLE)
+ w.Generate(watchEventTypeTable)
}
case <-dump.C:
- w.Generate(WATCH_EVENT_TYPE_TABLE)
+ w.Generate(watchEventTypeTable)
}
}
}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index db6a9bb6..f5dfbeb6 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -111,7 +111,7 @@ type BgpServer struct {
rsRib *table.TableManager
roaManager *roaManager
shutdownWG *sync.WaitGroup
- watcherMap map[WatchEventType][]*Watcher
+ watcherMap map[watchEventType][]*watcher
zclient *zebraClient
bmpManager *bmpClientManager
mrtManager *mrtManager
@@ -126,7 +126,7 @@ func NewBgpServer() *BgpServer {
policy: table.NewRoutingPolicy(),
roaManager: roaManager,
mgmtCh: make(chan *mgmtOp, 1),
- watcherMap: make(map[WatchEventType][]*Watcher),
+ watcherMap: make(map[watchEventType][]*watcher),
uuidMap: make(map[uuid.UUID]string),
}
s.bmpManager = newBmpClientManager(s)
@@ -627,11 +627,11 @@ func (server *BgpServer) notifyBestWatcher(best []*table.Path, multipath [][]*ta
}
}
}
- w := &WatchEventBestPath{PathList: clonedB, MultiPathList: clonedM}
+ w := &watchEventBestPath{PathList: clonedB, MultiPathList: clonedM}
if len(m) > 0 {
w.Vrf = m
}
- server.notifyWatcher(WATCH_EVENT_TYPE_BEST_PATH, w)
+ server.notifyWatcher(watchEventTypeBestPath, w)
}
func (s *BgpServer) toConfig(peer *peer, getAdvertised bool) *config.Neighbor {
@@ -694,7 +694,7 @@ func (s *BgpServer) toConfig(peer *peer, getAdvertised bool) *config.Neighbor {
}
func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *peer, pathList []*table.Path, msg *bgp.BGPMessage, timestamp time.Time, payload []byte) {
- if !server.isWatched(WATCH_EVENT_TYPE_PRE_UPDATE) || peer == nil {
+ if !server.isWatched(watchEventTypePreUpdate) || peer == nil {
return
}
@@ -705,7 +705,7 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *peer, pathList []*ta
peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- ev := &WatchEventUpdate{
+ ev := &watchEventUpdate{
Message: msg,
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
@@ -720,11 +720,11 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *peer, pathList []*ta
Neighbor: server.toConfig(peer, false),
}
peer.fsm.lock.RUnlock()
- server.notifyWatcher(WATCH_EVENT_TYPE_PRE_UPDATE, ev)
+ server.notifyWatcher(watchEventTypePreUpdate, ev)
}
func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *peer, pathList []*table.Path) {
- if !server.isWatched(WATCH_EVENT_TYPE_POST_UPDATE) || peer == nil {
+ if !server.isWatched(watchEventTypePostUpdate) || peer == nil {
return
}
@@ -735,7 +735,7 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *peer, pathList []*t
peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- ev := &WatchEventUpdate{
+ ev := &watchEventUpdate{
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
@@ -748,16 +748,16 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *peer, pathList []*t
Neighbor: server.toConfig(peer, false),
}
peer.fsm.lock.RUnlock()
- server.notifyWatcher(WATCH_EVENT_TYPE_POST_UPDATE, ev)
+ server.notifyWatcher(watchEventTypePostUpdate, ev)
}
-func newWatchEventPeerState(peer *peer, m *fsmMsg) *WatchEventPeerState {
+func newWatchEventPeerState(peer *peer, m *fsmMsg) *watchEventPeerState {
_, rport := peer.fsm.RemoteHostPort()
laddr, lport := peer.fsm.LocalHostPort()
sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
peer.fsm.lock.RLock()
recvOpen := peer.fsm.recvOpen
- e := &WatchEventPeerState{
+ e := &watchEventPeerState{
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
@@ -785,7 +785,7 @@ func (server *BgpServer) broadcastPeerState(peer *peer, oldState bgp.FSMState, e
newState := peer.fsm.state
peer.fsm.lock.RUnlock()
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
- server.notifyWatcher(WATCH_EVENT_TYPE_PEER_STATE, newWatchEventPeerState(peer, e))
+ server.notifyWatcher(watchEventTypePeerState, newWatchEventPeerState(peer, e))
}
}
@@ -794,7 +794,7 @@ func (server *BgpServer) notifyMessageWatcher(peer *peer, timestamp time.Time, m
peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- ev := &WatchEventMessage{
+ ev := &watchEventMessage{
Message: msg,
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
@@ -807,12 +807,12 @@ func (server *BgpServer) notifyMessageWatcher(peer *peer, timestamp time.Time, m
}
peer.fsm.lock.RUnlock()
if !isSent {
- server.notifyWatcher(WATCH_EVENT_TYPE_RECV_MSG, ev)
+ server.notifyWatcher(watchEventTypeRecvMsg, ev)
}
}
func (server *BgpServer) notifyRecvMessageWatcher(peer *peer, timestamp time.Time, msg *bgp.BGPMessage) {
- if peer == nil || !server.isWatched(WATCH_EVENT_TYPE_RECV_MSG) {
+ if peer == nil || !server.isWatched(watchEventTypeRecvMsg) {
return
}
server.notifyMessageWatcher(peer, timestamp, msg, false)
@@ -3340,21 +3340,21 @@ func (s *BgpServer) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) erro
}, false)
}
-type WatchEventType string
+type watchEventType string
const (
- WATCH_EVENT_TYPE_BEST_PATH WatchEventType = "bestpath"
- WATCH_EVENT_TYPE_PRE_UPDATE WatchEventType = "preupdate"
- WATCH_EVENT_TYPE_POST_UPDATE WatchEventType = "postupdate"
- WATCH_EVENT_TYPE_PEER_STATE WatchEventType = "peerstate"
- WATCH_EVENT_TYPE_TABLE WatchEventType = "table"
- WATCH_EVENT_TYPE_RECV_MSG WatchEventType = "receivedmessage"
+ watchEventTypeBestPath watchEventType = "bestpath"
+ watchEventTypePreUpdate watchEventType = "preupdate"
+ watchEventTypePostUpdate watchEventType = "postupdate"
+ watchEventTypePeerState watchEventType = "peerstate"
+ watchEventTypeTable watchEventType = "table"
+ watchEventTypeRecvMsg watchEventType = "receivedmessage"
)
-type WatchEvent interface {
+type watchEvent interface {
}
-type WatchEventUpdate struct {
+type watchEventUpdate struct {
Message *bgp.BGPMessage
PeerAS uint32
LocalAS uint32
@@ -3370,7 +3370,7 @@ type WatchEventUpdate struct {
Neighbor *config.Neighbor
}
-type WatchEventPeerState struct {
+type watchEventPeerState struct {
PeerAS uint32
LocalAS uint32
PeerAddress net.IP
@@ -3387,23 +3387,23 @@ type WatchEventPeerState struct {
PeerInterface string
}
-type WatchEventAdjIn struct {
+type watchEventAdjIn struct {
PathList []*table.Path
}
-type WatchEventTable struct {
- RouterId string
+type watchEventTable struct {
+ RouterID string
PathList map[string][]*table.Path
Neighbor []*config.Neighbor
}
-type WatchEventBestPath struct {
+type watchEventBestPath struct {
PathList []*table.Path
MultiPathList [][]*table.Path
Vrf map[string]uint32
}
-type WatchEventMessage struct {
+type watchEventMessage struct {
Message *bgp.BGPMessage
PeerAS uint32
LocalAS uint32
@@ -3428,9 +3428,9 @@ type watchOptions struct {
recvMessage bool
}
-type WatchOption func(*watchOptions)
+type watchOption func(*watchOptions)
-func WatchBestPath(current bool) WatchOption {
+func watchBestPath(current bool) watchOption {
return func(o *watchOptions) {
o.bestpath = true
if current {
@@ -3439,7 +3439,7 @@ func WatchBestPath(current bool) WatchOption {
}
}
-func WatchUpdate(current bool) WatchOption {
+func watchUpdate(current bool) watchOption {
return func(o *watchOptions) {
o.preUpdate = true
if current {
@@ -3448,7 +3448,7 @@ func WatchUpdate(current bool) WatchOption {
}
}
-func WatchPostUpdate(current bool) WatchOption {
+func watchPostUpdate(current bool) watchOption {
return func(o *watchOptions) {
o.postUpdate = true
if current {
@@ -3457,7 +3457,7 @@ func WatchPostUpdate(current bool) WatchOption {
}
}
-func WatchPeerState(current bool) WatchOption {
+func watchPeerState(current bool) watchOption {
return func(o *watchOptions) {
o.peerState = true
if current {
@@ -3466,13 +3466,13 @@ func WatchPeerState(current bool) WatchOption {
}
}
-func WatchTableName(name string) WatchOption {
+func watchTableName(name string) watchOption {
return func(o *watchOptions) {
o.tableName = name
}
}
-func WatchMessage(isSent bool) WatchOption {
+func watchMessage(isSent bool) watchOption {
return func(o *watchOptions) {
if isSent {
log.WithFields(log.Fields{
@@ -3485,27 +3485,27 @@ func WatchMessage(isSent bool) WatchOption {
}
}
-type Watcher struct {
+type watcher struct {
opts watchOptions
- realCh chan WatchEvent
+ realCh chan watchEvent
ch *channels.InfiniteChannel
s *BgpServer
}
-func (w *Watcher) Event() <-chan WatchEvent {
+func (w *watcher) Event() <-chan watchEvent {
return w.realCh
}
-func (w *Watcher) Generate(t WatchEventType) error {
+func (w *watcher) Generate(t watchEventType) error {
return w.s.mgmtOperation(func() error {
switch t {
- case WATCH_EVENT_TYPE_PRE_UPDATE:
+ case watchEventTypePreUpdate:
pathList := make([]*table.Path, 0)
for _, peer := range w.s.neighborMap {
pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...)
}
- w.notify(&WatchEventAdjIn{PathList: clonePathList(pathList)})
- case WATCH_EVENT_TYPE_TABLE:
+ w.notify(&watchEventAdjIn{PathList: clonePathList(pathList)})
+ case watchEventTypeTable:
rib := w.s.globalRib
as := uint32(0)
id := table.GLOBAL_RIB_NAME
@@ -3537,7 +3537,7 @@ func (w *Watcher) Generate(t WatchEventType) error {
for _, peer := range w.s.neighborMap {
l = append(l, w.s.toConfig(peer, false))
}
- w.notify(&WatchEventTable{PathList: pathList, Neighbor: l})
+ w.notify(&watchEventTable{PathList: pathList, Neighbor: l})
default:
return fmt.Errorf("unsupported type %v", t)
}
@@ -3545,18 +3545,18 @@ func (w *Watcher) Generate(t WatchEventType) error {
}, false)
}
-func (w *Watcher) notify(v WatchEvent) {
+func (w *watcher) notify(v watchEvent) {
w.ch.In() <- v
}
-func (w *Watcher) loop() {
+func (w *watcher) loop() {
for ev := range w.ch.Out() {
- w.realCh <- ev.(WatchEvent)
+ w.realCh <- ev.(watchEvent)
}
close(w.realCh)
}
-func (w *Watcher) Stop() {
+func (w *watcher) Stop() {
w.s.mgmtOperation(func() error {
for k, l := range w.s.watcherMap {
for i, v := range l {
@@ -3576,21 +3576,21 @@ func (w *Watcher) Stop() {
}, false)
}
-func (s *BgpServer) isWatched(typ WatchEventType) bool {
+func (s *BgpServer) isWatched(typ watchEventType) bool {
return len(s.watcherMap[typ]) != 0
}
-func (s *BgpServer) notifyWatcher(typ WatchEventType, ev WatchEvent) {
+func (s *BgpServer) notifyWatcher(typ watchEventType, ev watchEvent) {
for _, w := range s.watcherMap[typ] {
w.notify(ev)
}
}
-func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
+func (s *BgpServer) watch(opts ...watchOption) (w *watcher) {
s.mgmtOperation(func() error {
- w = &Watcher{
+ w = &watcher{
s: s,
- realCh: make(chan WatchEvent, 8),
+ realCh: make(chan watchEvent, 8),
ch: channels.NewInfiniteChannel(),
}
@@ -3598,21 +3598,21 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
opt(&w.opts)
}
- register := func(t WatchEventType, w *Watcher) {
+ register := func(t watchEventType, w *watcher) {
s.watcherMap[t] = append(s.watcherMap[t], w)
}
if w.opts.bestpath {
- register(WATCH_EVENT_TYPE_BEST_PATH, w)
+ register(watchEventTypeBestPath, w)
}
if w.opts.preUpdate {
- register(WATCH_EVENT_TYPE_PRE_UPDATE, w)
+ register(watchEventTypePreUpdate, w)
}
if w.opts.postUpdate {
- register(WATCH_EVENT_TYPE_POST_UPDATE, w)
+ register(watchEventTypePostUpdate, w)
}
if w.opts.peerState {
- register(WATCH_EVENT_TYPE_PEER_STATE, w)
+ register(watchEventTypePeerState, w)
}
if w.opts.initPeerState {
for _, peer := range s.neighborMap {
@@ -3626,7 +3626,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
}
}
if w.opts.initBest && s.active() == nil {
- w.notify(&WatchEventBestPath{
+ w.notify(&watchEventBestPath{
PathList: s.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, 0, nil),
MultiPathList: s.globalRib.GetBestMultiPathList(table.GLOBAL_RIB_NAME, nil),
})
@@ -3644,7 +3644,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- update := &WatchEventUpdate{
+ update := &watchEventUpdate{
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
@@ -3662,7 +3662,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
eor := bgp.NewEndOfRib(rf)
eorBuf, _ := eor.Serialize()
peer.fsm.lock.RLock()
- update = &WatchEventUpdate{
+ update = &watchEventUpdate{
Message: eor,
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
@@ -3697,7 +3697,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
configNeighbor = w.s.toConfig(peer, false)
}
- w.notify(&WatchEventUpdate{
+ w.notify(&watchEventUpdate{
PeerAS: peerInfo.AS,
PeerAddress: peerInfo.Address,
PeerID: peerInfo.ID,
@@ -3709,7 +3709,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
eor := bgp.NewEndOfRib(rf)
eorBuf, _ := eor.Serialize()
- w.notify(&WatchEventUpdate{
+ w.notify(&watchEventUpdate{
Message: eor,
PeerAS: peerInfo.AS,
PeerAddress: peerInfo.Address,
@@ -3724,7 +3724,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
}
}
if w.opts.recvMessage {
- register(WATCH_EVENT_TYPE_RECV_MSG, w)
+ register(watchEventTypeRecvMsg, w)
}
go w.loop()
diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go
index 164247f1..27f56353 100644
--- a/pkg/server/server_test.go
+++ b/pkg/server/server_test.go
@@ -229,7 +229,7 @@ func TestMonitor(test *testing.T) {
}
// Test WatchBestPath.
- w := s.Watch(WatchBestPath(false))
+ w := s.watch(watchBestPath(false))
// Advertises a route.
attrs := []bgp.PathAttributeInterface{
@@ -240,7 +240,7 @@ func TestMonitor(test *testing.T) {
log.Fatal(err)
}
ev := <-w.Event()
- b := ev.(*WatchEventBestPath)
+ b := ev.(*watchEventBestPath)
assert.Equal(1, len(b.PathList))
assert.Equal("10.0.0.0/24", b.PathList[0].GetNlri().String())
assert.False(b.PathList[0].IsWithdraw)
@@ -251,7 +251,7 @@ func TestMonitor(test *testing.T) {
log.Fatal(err)
}
ev = <-w.Event()
- b = ev.(*WatchEventBestPath)
+ b = ev.(*watchEventBestPath)
assert.Equal(1, len(b.PathList))
assert.Equal("10.0.0.0/24", b.PathList[0].GetNlri().String())
assert.True(b.PathList[0].IsWithdraw)
@@ -276,16 +276,16 @@ func TestMonitor(test *testing.T) {
}
// Test WatchUpdate with "current" flag.
- w = s.Watch(WatchUpdate(true))
+ w = s.watch(watchUpdate(true))
// Test the initial route.
ev = <-w.Event()
- u := ev.(*WatchEventUpdate)
+ u := ev.(*watchEventUpdate)
assert.Equal(1, len(u.PathList))
assert.Equal("10.1.0.0/24", u.PathList[0].GetNlri().String())
assert.False(u.PathList[0].IsWithdraw)
ev = <-w.Event()
- u = ev.(*WatchEventUpdate)
+ u = ev.(*watchEventUpdate)
assert.Equal(len(u.PathList), 0) // End of RIB
// Advertises an additional route.
@@ -293,7 +293,7 @@ func TestMonitor(test *testing.T) {
log.Fatal(err)
}
ev = <-w.Event()
- u = ev.(*WatchEventUpdate)
+ u = ev.(*watchEventUpdate)
assert.Equal(1, len(u.PathList))
assert.Equal("10.2.0.0/24", u.PathList[0].GetNlri().String())
assert.False(u.PathList[0].IsWithdraw)
@@ -304,7 +304,7 @@ func TestMonitor(test *testing.T) {
log.Fatal(err)
}
ev = <-w.Event()
- u = ev.(*WatchEventUpdate)
+ u = ev.(*watchEventUpdate)
assert.Equal(1, len(u.PathList))
assert.Equal("10.2.0.0/24", u.PathList[0].GetNlri().String())
assert.True(u.PathList[0].IsWithdraw)
@@ -952,7 +952,7 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) {
if err = peerServers(t, ctx, []*BgpServer{s1, s2}, []config.AfiSafiType{config.AFI_SAFI_TYPE_L3VPN_IPV4_UNICAST, config.AFI_SAFI_TYPE_RTC}); err != nil {
t.Fatal(err)
}
- watcher := s1.Watch(WatchUpdate(true))
+ watcher := s1.watch(watchUpdate(true))
// Add route to vrf1 on s2
attrs := []bgp.PathAttributeInterface{
@@ -976,7 +976,7 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) {
select {
case ev := <-watcher.Event():
switch msg := ev.(type) {
- case *WatchEventUpdate:
+ case *watchEventUpdate:
for _, path := range msg.PathList {
log.Infof("tester received path: %s", path.String())
if vpnPath, ok := path.GetNlri().(*bgp.LabeledVPNIPAddrPrefix); ok {
@@ -1019,7 +1019,7 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) {
select {
case ev := <-watcher.Event():
switch msg := ev.(type) {
- case *WatchEventUpdate:
+ case *watchEventUpdate:
for _, path := range msg.PathList {
log.Infof("tester received path: %s", path.String())
if vpnPath, ok := path.GetNlri().(*bgp.LabeledVPNIPAddrPrefix); ok {
diff --git a/pkg/server/zclient.go b/pkg/server/zclient.go
index c4abd12b..33190591 100644
--- a/pkg/server/zclient.go
+++ b/pkg/server/zclient.go
@@ -331,9 +331,9 @@ func (z *zebraClient) updatePathByNexthopCache(paths []*table.Path) {
}
func (z *zebraClient) loop() {
- w := z.server.Watch([]WatchOption{
- WatchBestPath(true),
- WatchPostUpdate(true),
+ w := z.server.watch([]watchOption{
+ watchBestPath(true),
+ watchPostUpdate(true),
}...)
defer w.Stop()
@@ -368,7 +368,7 @@ func (z *zebraClient) loop() {
}
case ev := <-w.Event():
switch msg := ev.(type) {
- case *WatchEventBestPath:
+ case *watchEventBestPath:
if table.UseMultiplePaths.Enabled {
for _, paths := range msg.MultiPathList {
z.updatePathByNexthopCache(paths)
@@ -398,7 +398,7 @@ func (z *zebraClient) loop() {
}
}
}
- case *WatchEventUpdate:
+ case *watchEventUpdate:
if body := newNexthopRegisterBody(msg.PathList, z.nexthopCache); body != nil {
vrfID := uint32(0)
for _, vrf := range z.server.listVrf() {