diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-26 10:23:51 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-26 10:23:51 +0900 |
commit | ebac86e07ac40d19037ca100c42bac7ba94aae12 (patch) | |
tree | c46d8b94dfcd98b99bb9e20fc28279c33fd6664d /server | |
parent | 511f487dd1dcdf836d15a231293189aaf0dbf528 (diff) |
export Watch feature
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r-- | server/bmp.go | 24 | ||||
-rw-r--r-- | server/collector.go | 36 | ||||
-rw-r--r-- | server/grpc_server.go | 40 | ||||
-rw-r--r-- | server/mrt.go | 16 | ||||
-rw-r--r-- | server/server.go | 254 | ||||
-rw-r--r-- | server/zclient.go | 6 |
6 files changed, 188 insertions, 188 deletions
diff --git a/server/bmp.go b/server/bmp.go index 0c7bc61d..084066ab 100644 --- a/server/bmp.go +++ b/server/bmp.go @@ -87,27 +87,27 @@ func (b *bmpClient) loop() { select { case ev := <-w.Event(): switch msg := ev.(type) { - case *watcherEventUpdateMsg: + case *WatchEventUpdate: info := &table.PeerInfo{ - Address: msg.peerAddress, - AS: msg.peerAS, - ID: msg.peerID, + Address: msg.PeerAddress, + AS: msg.PeerAS, + ID: msg.PeerID, } - if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload)); err != nil { + if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.PostPolicy, 0, info, msg.Timestamp.Unix(), msg.Payload)); err != nil { return false } - case *watcherEventStateChangedMsg: + case *WatchEventPeerState: info := &table.PeerInfo{ - Address: msg.peerAddress, - AS: msg.peerAS, - ID: msg.peerID, + Address: msg.PeerAddress, + AS: msg.PeerAS, + ID: msg.PeerID, } - if msg.state == bgp.BGP_FSM_ESTABLISHED { - if err := write(bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil { + if msg.State == bgp.BGP_FSM_ESTABLISHED { + if err := write(bmpPeerUp(msg.LocalAddress.String(), msg.LocalPort, msg.PeerPort, msg.SentOpen, msg.RecvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.Timestamp.Unix())); err != nil { return false } } else { - if err := write(bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil { + if err := write(bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.Timestamp.Unix())); err != nil { return false } } diff --git a/server/collector.go b/server/collector.go index b4fdb061..583a1999 100644 --- a/server/collector.go +++ b/server/collector.go @@ -47,28 +47,28 @@ func (c *Collector) writePoints(points []*client.Point) error { return c.client.Write(bp) } -func (c *Collector) writePeer(msg *watcherEventStateChangedMsg) error { +func (c *Collector) writePeer(msg *WatchEventPeerState) error { var state string - switch msg.state { + switch msg.State { case bgp.BGP_FSM_ESTABLISHED: state = "Established" case bgp.BGP_FSM_IDLE: state = "Idle" default: - return fmt.Errorf("unexpected fsm state %v", msg.state) + return fmt.Errorf("unexpected fsm state %v", msg.State) } tags := map[string]string{ - "PeerAddress": msg.peerAddress.String(), - "PeerAS": fmt.Sprintf("%v", msg.peerAS), + "PeerAddress": msg.PeerAddress.String(), + "PeerAS": fmt.Sprintf("%v", msg.PeerAS), "State": state, } fields := map[string]interface{}{ - "PeerID": msg.peerID.String(), + "PeerID": msg.PeerID.String(), } - pt, err := client.NewPoint(MEATUREMENT_PEER, tags, fields, msg.timestamp) + pt, err := client.NewPoint(MEATUREMENT_PEER, tags, fields, msg.Timestamp) if err != nil { return err } @@ -121,14 +121,14 @@ func path2data(path *table.Path) (map[string]interface{}, map[string]string) { return fields, tags } -func (c *Collector) writeUpdate(msg *watcherEventUpdateMsg) error { - if len(msg.pathList) == 0 { +func (c *Collector) writeUpdate(msg *WatchEventUpdate) error { + if len(msg.PathList) == 0 { // EOR return nil } now := time.Now() - points := make([]*client.Point, 0, len(msg.pathList)) - for _, path := range msg.pathList { + points := make([]*client.Point, 0, len(msg.PathList)) + for _, path := range msg.PathList { fields, tags := path2data(path) tags["Withdraw"] = fmt.Sprintf("%v", path.IsWithdraw) pt, err := client.NewPoint(MEATUREMENT_UPDATE, tags, fields, now) @@ -140,10 +140,10 @@ func (c *Collector) writeUpdate(msg *watcherEventUpdateMsg) error { return c.writePoints(points) } -func (c *Collector) writeTable(msg *watcherEventAdjInMsg) error { +func (c *Collector) writeTable(msg *WatchEventAdjIn) error { now := time.Now() - points := make([]*client.Point, 0, len(msg.pathList)) - for _, path := range msg.pathList { + points := make([]*client.Point, 0, len(msg.PathList)) + for _, path := range msg.PathList { fields, tags := path2data(path) pt, err := client.NewPoint(MEATUREMENT_TABLE, tags, fields, now) if err != nil { @@ -168,18 +168,18 @@ func (c *Collector) loop() { for { select { case <-ticker.C: - w.Generate(WATCH_TYPE_PRE_UPDATE) + w.Generate(WATCH_EVENT_TYPE_PRE_UPDATE) case ev := <-w.Event(): switch msg := ev.(type) { - case *watcherEventUpdateMsg: + case *WatchEventUpdate: if err := c.writeUpdate(msg); err != nil { log.Error(err) } - case *watcherEventStateChangedMsg: + case *WatchEventPeerState: if err := c.writePeer(msg); err != nil { log.Error(err) } - case *watcherEventAdjInMsg: + case *WatchEventAdjIn: if err := c.writeTable(msg); err != nil { log.Error(err) } diff --git a/server/grpc_server.go b/server/grpc_server.go index f8c411ad..61a80f77 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -383,22 +383,22 @@ func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer select { case ev := <-w.Event(): switch msg := ev.(type) { - case *watcherEventBestPathMsg: + case *WatchEventBestPath: if err := sendPath(func() []*table.Path { - if len(msg.multiPathList) > 0 { + if len(msg.MultiPathList) > 0 { l := make([]*table.Path, 0) - for _, p := range msg.multiPathList { + for _, p := range msg.MultiPathList { l = append(l, p...) } return l } else { - return msg.pathList + return msg.PathList } }()); err != nil { return err } - case *watcherEventUpdateMsg: - if err := sendPath(msg.pathList); err != nil { + case *WatchEventUpdate: + if err := sendPath(msg.PathList); err != nil { return err } } @@ -416,28 +416,28 @@ func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.GobgpApi_Monito select { case ev := <-w.Event(): switch msg := ev.(type) { - case *watcherEventStateChangedMsg: - if len(arg.Name) > 0 && arg.Name != msg.peerAddress.String() { + case *WatchEventPeerState: + if len(arg.Name) > 0 && arg.Name != msg.PeerAddress.String() { continue } if err := stream.Send(&api.Peer{ Conf: &api.PeerConf{ - PeerAs: msg.peerAS, - LocalAs: msg.localAS, - NeighborAddress: msg.peerAddress.String(), - Id: msg.peerID.String(), + PeerAs: msg.PeerAS, + LocalAs: msg.LocalAS, + NeighborAddress: msg.PeerAddress.String(), + Id: msg.PeerID.String(), }, Info: &api.PeerState{ - PeerAs: msg.peerAS, - LocalAs: msg.localAS, - NeighborAddress: msg.peerAddress.String(), - BgpState: msg.state.String(), - AdminState: msg.adminState.String(), + PeerAs: msg.PeerAS, + LocalAs: msg.LocalAS, + NeighborAddress: msg.PeerAddress.String(), + BgpState: msg.State.String(), + AdminState: msg.AdminState.String(), }, Transport: &api.Transport{ - LocalAddress: msg.localAddress.String(), - LocalPort: uint32(msg.localPort), - RemotePort: uint32(msg.peerPort), + LocalAddress: msg.LocalAddress.String(), + LocalPort: uint32(msg.LocalPort), + RemotePort: uint32(msg.PeerPort), }, }); err != nil { return err diff --git a/server/mrt.go b/server/mrt.go index 9c2673be..dcbd3542 100644 --- a/server/mrt.go +++ b/server/mrt.go @@ -56,15 +56,15 @@ func (m *mrtWriter) loop() error { }() for { - serialize := func(ev watcherEvent) ([]byte, error) { - m := ev.(*watcherEventUpdateMsg) + serialize := func(ev WatchEvent) ([]byte, error) { + m := ev.(*WatchEventUpdate) subtype := mrt.MESSAGE_AS4 - mp := mrt.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil) - mp.BGPMessagePayload = m.payload - if m.fourBytesAs == false { + mp := mrt.NewBGP4MPMessage(m.PeerAS, m.LocalAS, 0, m.PeerAddress.String(), m.LocalAddress.String(), m.FourBytesAs, nil) + mp.BGPMessagePayload = m.Payload + if m.FourBytesAs == false { subtype = mrt.MESSAGE } - bm, err := mrt.NewMRTMessage(uint32(m.timestamp.Unix()), mrt.BGP4MP, subtype, mp) + bm, err := mrt.NewMRTMessage(uint32(m.Timestamp.Unix()), mrt.BGP4MP, subtype, mp) if err != nil { log.WithFields(log.Fields{ "Topic": "mrt", @@ -75,8 +75,8 @@ func (m *mrtWriter) loop() error { return bm.Serialize() } - drain := func(ev watcherEvent) { - events := make([]watcherEvent, 0, 1+len(w.Event())) + drain := func(ev WatchEvent) { + events := make([]WatchEvent, 0, 1+len(w.Event())) if ev != nil { events = append(events, ev) } diff --git a/server/server.go b/server/server.go index 37ea1f8d..dc8ab200 100644 --- a/server/server.go +++ b/server/server.go @@ -103,7 +103,7 @@ type BgpServer struct { globalRib *table.TableManager roaManager *roaManager shutdown bool - watcherMap map[watchType][]*Watcher + watcherMap map[WatchEventType][]*Watcher zclient *zebraClient bmpManager *bmpClientManager mrt *mrtWriter @@ -117,7 +117,7 @@ func NewBgpServer() *BgpServer { policy: table.NewRoutingPolicy(), roaManager: roaManager, mgmtCh: make(chan func(), 1), - watcherMap: make(map[watchType][]*Watcher), + watcherMap: make(map[WatchEventType][]*Watcher), } s.bmpManager = newBmpClientManager(s) return s @@ -381,7 +381,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil for i, pathList := range multipath { clonedMpath[i] = clonePathList(pathList) } - server.notifyWatcher(WATCH_TYPE_BESTPATH, &watcherEventBestPathMsg{pathList: clonePathList(best[table.GLOBAL_RIB_NAME]), multiPathList: clonedMpath}) + server.notifyWatcher(WATCH_EVENT_TYPE_BEST_PATH, &WatchEventBestPath{PathList: clonePathList(best[table.GLOBAL_RIB_NAME]), MultiPathList: clonedMpath}) } for _, targetPeer := range server.neighborMap { @@ -395,31 +395,31 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil } } -func createWatcherEventStateChange(peer *Peer) *watcherEventStateChangedMsg { +func createWatchEventPeerState(peer *Peer) *WatchEventPeerState { _, rport := peer.fsm.RemoteHostPort() laddr, lport := peer.fsm.LocalHostPort() sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) recvOpen := peer.fsm.recvOpen - return &watcherEventStateChangedMsg{ - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(laddr), - peerPort: rport, - localPort: lport, - peerID: peer.fsm.peerInfo.ID, - sentOpen: sentOpen, - recvOpen: recvOpen, - state: peer.fsm.state, - adminState: peer.fsm.adminState, - timestamp: time.Now(), + return &WatchEventPeerState{ + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(laddr), + PeerPort: rport, + LocalPort: lport, + PeerID: peer.fsm.peerInfo.ID, + SentOpen: sentOpen, + RecvOpen: recvOpen, + State: peer.fsm.state, + AdminState: peer.fsm.adminState, + Timestamp: time.Now(), } } func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { newState := peer.fsm.state if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { - server.notifyWatcher(WATCH_TYPE_PEER_STATE, createWatcherEventStateChange(peer)) + server.notifyWatcher(WATCH_EVENT_TYPE_PEER_STATE, createWatchEventPeerState(peer)) } } @@ -534,7 +534,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []* for i, pathList := range multipath { clonedMpath[i] = clonePathList(pathList) } - server.notifyWatcher(WATCH_TYPE_BESTPATH, &watcherEventBestPathMsg{pathList: clonePathList(best[table.GLOBAL_RIB_NAME]), multiPathList: clonedMpath}) + server.notifyWatcher(WATCH_EVENT_TYPE_BEST_PATH, &WatchEventBestPath{PathList: clonePathList(best[table.GLOBAL_RIB_NAME]), MultiPathList: clonedMpath}) } @@ -678,46 +678,46 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { sendFsmOutgoingMsg(peer, nil, notification, true) return } - if m.Header.Type == bgp.BGP_MSG_UPDATE && server.isWatched(WATCH_TYPE_PRE_UPDATE) { + if m.Header.Type == bgp.BGP_MSG_UPDATE && server.isWatched(WATCH_EVENT_TYPE_PRE_UPDATE) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - ev := &watcherEventUpdateMsg{ - message: m, - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(l), - peerID: peer.fsm.peerInfo.ID, - fourBytesAs: y, - timestamp: e.timestamp, - payload: e.payload, - postPolicy: false, - pathList: clonePathList(pathList), + ev := &WatchEventUpdate{ + Message: m, + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(l), + PeerID: peer.fsm.peerInfo.ID, + FourBytesAs: y, + Timestamp: e.timestamp, + Payload: e.payload, + PostPolicy: false, + PathList: clonePathList(pathList), } - server.notifyWatcher(WATCH_TYPE_PRE_UPDATE, ev) + server.notifyWatcher(WATCH_EVENT_TYPE_PRE_UPDATE, ev) } if len(pathList) > 0 { var altered []*table.Path altered = server.propagateUpdate(peer, pathList) - if server.isWatched(WATCH_TYPE_POST_UPDATE) { + if server.isWatched(WATCH_EVENT_TYPE_POST_UPDATE) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - ev := &watcherEventUpdateMsg{ - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(l), - peerID: peer.fsm.peerInfo.ID, - fourBytesAs: y, - timestamp: e.timestamp, - postPolicy: true, - pathList: clonePathList(altered), + ev := &WatchEventUpdate{ + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(l), + PeerID: peer.fsm.peerInfo.ID, + FourBytesAs: y, + Timestamp: e.timestamp, + PostPolicy: true, + PathList: clonePathList(altered), } for _, u := range table.CreateUpdateMsgFromPaths(altered) { payload, _ := u.Serialize() - ev.payload = payload - server.notifyWatcher(WATCH_TYPE_POST_UPDATE, ev) + ev.Payload = payload + server.notifyWatcher(WATCH_EVENT_TYPE_POST_UPDATE, ev) } } } @@ -2653,15 +2653,56 @@ func (server *BgpServer) handleModRpki(grpcReq *GrpcRequest) { } } -type watchType string +type WatchEventType string const ( - WATCH_TYPE_BESTPATH watchType = "bestpath" - WATCH_TYPE_PRE_UPDATE watchType = "preupdate" - WATCH_TYPE_POST_UPDATE watchType = "postupdate" - WATCH_TYPE_PEER_STATE watchType = "peerstate" + WATCH_EVENT_TYPE_BEST_PATH WatchEventType = "bestpath" + WATCH_EVENT_TYPE_PRE_UPDATE WatchEventType = "preupdate" + WATCH_EVENT_TYPE_POST_UPDATE WatchEventType = "postupdate" + WATCH_EVENT_TYPE_PEER_STATE WatchEventType = "peerstate" ) +type WatchEvent interface { +} + +type WatchEventUpdate struct { + Message *bgp.BGPMessage + PeerAS uint32 + LocalAS uint32 + PeerAddress net.IP + LocalAddress net.IP + PeerID net.IP + FourBytesAs bool + Timestamp time.Time + Payload []byte + PostPolicy bool + PathList []*table.Path +} + +type WatchEventPeerState struct { + PeerAS uint32 + LocalAS uint32 + PeerAddress net.IP + LocalAddress net.IP + PeerPort uint16 + LocalPort uint16 + PeerID net.IP + SentOpen *bgp.BGPMessage + RecvOpen *bgp.BGPMessage + State bgp.FSMState + AdminState AdminState + Timestamp time.Time +} + +type WatchEventAdjIn struct { + PathList []*table.Path +} + +type WatchEventBestPath struct { + PathList []*table.Path + MultiPathList [][]*table.Path +} + type watchOptions struct { bestpath bool preUpdate bool @@ -2709,16 +2750,16 @@ func WatchPeerState(current bool) WatchOption { type Watcher struct { opts watchOptions - realCh chan watcherEvent + realCh chan WatchEvent ch *channels.InfiniteChannel s *BgpServer } -func (w *Watcher) Event() <-chan watcherEvent { +func (w *Watcher) Event() <-chan WatchEvent { return w.realCh } -func (w *Watcher) Generate(t watchType) (err error) { +func (w *Watcher) Generate(t WatchEventType) (err error) { ch := make(chan struct{}) defer func() { <-ch }() @@ -2726,7 +2767,7 @@ func (w *Watcher) Generate(t watchType) (err error) { defer close(ch) switch t { - case WATCH_TYPE_PRE_UPDATE: + case WATCH_EVENT_TYPE_PRE_UPDATE: default: err = fmt.Errorf("unsupported type ", t) return @@ -2735,12 +2776,12 @@ func (w *Watcher) Generate(t watchType) (err error) { for _, peer := range w.s.neighborMap { pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...) } - w.notify(&watcherEventAdjInMsg{pathList: clonePathList(pathList)}) + w.notify(&WatchEventAdjIn{PathList: clonePathList(pathList)}) } return err } -func (w *Watcher) notify(v watcherEvent) { +func (w *Watcher) notify(v WatchEvent) { w.ch.In() <- v } @@ -2752,7 +2793,7 @@ func (w *Watcher) loop() { close(w.realCh) return } - w.realCh <- ev.(watcherEvent) + w.realCh <- ev.(WatchEvent) } } } @@ -2785,11 +2826,11 @@ func (w *Watcher) Stop() { } } -func (s *BgpServer) isWatched(typ watchType) bool { +func (s *BgpServer) isWatched(typ WatchEventType) bool { return len(s.watcherMap[typ]) != 0 } -func (s *BgpServer) notifyWatcher(typ watchType, ev watcherEvent) { +func (s *BgpServer) notifyWatcher(typ WatchEventType, ev WatchEvent) { for _, w := range s.watcherMap[typ] { w.notify(ev) } @@ -2804,7 +2845,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { w = &Watcher{ s: s, - realCh: make(chan watcherEvent, 8), + realCh: make(chan WatchEvent, 8), ch: channels.NewInfiniteChannel(), } @@ -2812,28 +2853,28 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { opt(&w.opts) } - register := func(t watchType, w *Watcher) { + register := func(t WatchEventType, w *Watcher) { s.watcherMap[t] = append(s.watcherMap[t], w) } if w.opts.bestpath { - register(WATCH_TYPE_BESTPATH, w) + register(WATCH_EVENT_TYPE_BEST_PATH, w) } if w.opts.preUpdate { - register(WATCH_TYPE_PRE_UPDATE, w) + register(WATCH_EVENT_TYPE_PRE_UPDATE, w) } if w.opts.postUpdate { - register(WATCH_TYPE_POST_UPDATE, w) + register(WATCH_EVENT_TYPE_POST_UPDATE, w) } if w.opts.peerState { - register(WATCH_TYPE_PEER_STATE, w) + register(WATCH_EVENT_TYPE_PEER_STATE, w) } if w.opts.initPeerState { for _, peer := range s.neighborMap { if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { continue } - w.notify(createWatcherEventStateChange(peer)) + w.notify(createWatchEventPeerState(peer)) } } if w.opts.initUpdate { @@ -2846,17 +2887,17 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { buf, _ := msgs[0].Serialize() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - w.notify(&watcherEventUpdateMsg{ - message: msgs[0], - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(l), - peerID: peer.fsm.peerInfo.ID, - fourBytesAs: y, - timestamp: path.GetTimestamp(), - payload: buf, - postPolicy: false, + w.notify(&WatchEventUpdate{ + Message: msgs[0], + PeerAS: peer.fsm.peerInfo.AS, + LocalAS: peer.fsm.peerInfo.LocalAS, + PeerAddress: peer.fsm.peerInfo.Address, + LocalAddress: net.ParseIP(l), + PeerID: peer.fsm.peerInfo.ID, + FourBytesAs: y, + Timestamp: path.GetTimestamp(), + Payload: buf, + PostPolicy: false, }) } } @@ -2865,14 +2906,14 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { for _, path := range s.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, s.globalRib.GetRFlist()) { msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) buf, _ := msgs[0].Serialize() - w.notify(&watcherEventUpdateMsg{ - peerAS: path.GetSource().AS, - peerAddress: path.GetSource().Address, - peerID: path.GetSource().ID, - message: msgs[0], - timestamp: path.GetTimestamp(), - payload: buf, - postPolicy: true, + w.notify(&WatchEventUpdate{ + PeerAS: path.GetSource().AS, + PeerAddress: path.GetSource().Address, + PeerID: path.GetSource().ID, + Message: msgs[0], + Timestamp: path.GetTimestamp(), + Payload: buf, + PostPolicy: true, }) } @@ -2882,44 +2923,3 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { } return w } - -type watcherEvent interface { -} - -type watcherEventUpdateMsg struct { - message *bgp.BGPMessage - peerAS uint32 - localAS uint32 - peerAddress net.IP - localAddress net.IP - peerID net.IP - fourBytesAs bool - timestamp time.Time - payload []byte - postPolicy bool - pathList []*table.Path -} - -type watcherEventStateChangedMsg struct { - peerAS uint32 - localAS uint32 - peerAddress net.IP - localAddress net.IP - peerPort uint16 - localPort uint16 - peerID net.IP - sentOpen *bgp.BGPMessage - recvOpen *bgp.BGPMessage - state bgp.FSMState - adminState AdminState - timestamp time.Time -} - -type watcherEventAdjInMsg struct { - pathList []*table.Path -} - -type watcherEventBestPathMsg struct { - pathList []*table.Path - multiPathList [][]*table.Path -} diff --git a/server/zclient.go b/server/zclient.go index 23b74963..0e20c7f9 100644 --- a/server/zclient.go +++ b/server/zclient.go @@ -176,15 +176,15 @@ func (z *zebraClient) loop() { } } case ev := <-w.Event(): - msg := ev.(*watcherEventBestPathMsg) + msg := ev.(*WatchEventBestPath) if table.UseMultiplePaths.Enabled { - for _, dst := range msg.multiPathList { + for _, dst := range msg.MultiPathList { if m := newIPRouteMessage(dst); m != nil { z.client.Send(m) } } } else { - for _, path := range msg.pathList { + for _, path := range msg.PathList { if m := newIPRouteMessage([]*table.Path{path}); m != nil { z.client.Send(m) } |