summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-26 10:23:51 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-26 10:23:51 +0900
commitebac86e07ac40d19037ca100c42bac7ba94aae12 (patch)
treec46d8b94dfcd98b99bb9e20fc28279c33fd6664d
parent511f487dd1dcdf836d15a231293189aaf0dbf528 (diff)
export Watch feature
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--server/bmp.go24
-rw-r--r--server/collector.go36
-rw-r--r--server/grpc_server.go40
-rw-r--r--server/mrt.go16
-rw-r--r--server/server.go254
-rw-r--r--server/zclient.go6
6 files changed, 188 insertions, 188 deletions
diff --git a/server/bmp.go b/server/bmp.go
index 0c7bc61d..084066ab 100644
--- a/server/bmp.go
+++ b/server/bmp.go
@@ -87,27 +87,27 @@ func (b *bmpClient) loop() {
select {
case ev := <-w.Event():
switch msg := ev.(type) {
- case *watcherEventUpdateMsg:
+ case *WatchEventUpdate:
info := &table.PeerInfo{
- Address: msg.peerAddress,
- AS: msg.peerAS,
- ID: msg.peerID,
+ Address: msg.PeerAddress,
+ AS: msg.PeerAS,
+ ID: msg.PeerID,
}
- if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload)); err != nil {
+ if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.PostPolicy, 0, info, msg.Timestamp.Unix(), msg.Payload)); err != nil {
return false
}
- case *watcherEventStateChangedMsg:
+ case *WatchEventPeerState:
info := &table.PeerInfo{
- Address: msg.peerAddress,
- AS: msg.peerAS,
- ID: msg.peerID,
+ Address: msg.PeerAddress,
+ AS: msg.PeerAS,
+ ID: msg.PeerID,
}
- if msg.state == bgp.BGP_FSM_ESTABLISHED {
- if err := write(bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil {
+ if msg.State == bgp.BGP_FSM_ESTABLISHED {
+ if err := write(bmpPeerUp(msg.LocalAddress.String(), msg.LocalPort, msg.PeerPort, msg.SentOpen, msg.RecvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.Timestamp.Unix())); err != nil {
return false
}
} else {
- if err := write(bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil {
+ if err := write(bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.Timestamp.Unix())); err != nil {
return false
}
}
diff --git a/server/collector.go b/server/collector.go
index b4fdb061..583a1999 100644
--- a/server/collector.go
+++ b/server/collector.go
@@ -47,28 +47,28 @@ func (c *Collector) writePoints(points []*client.Point) error {
return c.client.Write(bp)
}
-func (c *Collector) writePeer(msg *watcherEventStateChangedMsg) error {
+func (c *Collector) writePeer(msg *WatchEventPeerState) error {
var state string
- switch msg.state {
+ switch msg.State {
case bgp.BGP_FSM_ESTABLISHED:
state = "Established"
case bgp.BGP_FSM_IDLE:
state = "Idle"
default:
- return fmt.Errorf("unexpected fsm state %v", msg.state)
+ return fmt.Errorf("unexpected fsm state %v", msg.State)
}
tags := map[string]string{
- "PeerAddress": msg.peerAddress.String(),
- "PeerAS": fmt.Sprintf("%v", msg.peerAS),
+ "PeerAddress": msg.PeerAddress.String(),
+ "PeerAS": fmt.Sprintf("%v", msg.PeerAS),
"State": state,
}
fields := map[string]interface{}{
- "PeerID": msg.peerID.String(),
+ "PeerID": msg.PeerID.String(),
}
- pt, err := client.NewPoint(MEATUREMENT_PEER, tags, fields, msg.timestamp)
+ pt, err := client.NewPoint(MEATUREMENT_PEER, tags, fields, msg.Timestamp)
if err != nil {
return err
}
@@ -121,14 +121,14 @@ func path2data(path *table.Path) (map[string]interface{}, map[string]string) {
return fields, tags
}
-func (c *Collector) writeUpdate(msg *watcherEventUpdateMsg) error {
- if len(msg.pathList) == 0 {
+func (c *Collector) writeUpdate(msg *WatchEventUpdate) error {
+ if len(msg.PathList) == 0 {
// EOR
return nil
}
now := time.Now()
- points := make([]*client.Point, 0, len(msg.pathList))
- for _, path := range msg.pathList {
+ points := make([]*client.Point, 0, len(msg.PathList))
+ for _, path := range msg.PathList {
fields, tags := path2data(path)
tags["Withdraw"] = fmt.Sprintf("%v", path.IsWithdraw)
pt, err := client.NewPoint(MEATUREMENT_UPDATE, tags, fields, now)
@@ -140,10 +140,10 @@ func (c *Collector) writeUpdate(msg *watcherEventUpdateMsg) error {
return c.writePoints(points)
}
-func (c *Collector) writeTable(msg *watcherEventAdjInMsg) error {
+func (c *Collector) writeTable(msg *WatchEventAdjIn) error {
now := time.Now()
- points := make([]*client.Point, 0, len(msg.pathList))
- for _, path := range msg.pathList {
+ points := make([]*client.Point, 0, len(msg.PathList))
+ for _, path := range msg.PathList {
fields, tags := path2data(path)
pt, err := client.NewPoint(MEATUREMENT_TABLE, tags, fields, now)
if err != nil {
@@ -168,18 +168,18 @@ func (c *Collector) loop() {
for {
select {
case <-ticker.C:
- w.Generate(WATCH_TYPE_PRE_UPDATE)
+ w.Generate(WATCH_EVENT_TYPE_PRE_UPDATE)
case ev := <-w.Event():
switch msg := ev.(type) {
- case *watcherEventUpdateMsg:
+ case *WatchEventUpdate:
if err := c.writeUpdate(msg); err != nil {
log.Error(err)
}
- case *watcherEventStateChangedMsg:
+ case *WatchEventPeerState:
if err := c.writePeer(msg); err != nil {
log.Error(err)
}
- case *watcherEventAdjInMsg:
+ case *WatchEventAdjIn:
if err := c.writeTable(msg); err != nil {
log.Error(err)
}
diff --git a/server/grpc_server.go b/server/grpc_server.go
index f8c411ad..61a80f77 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -383,22 +383,22 @@ func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer
select {
case ev := <-w.Event():
switch msg := ev.(type) {
- case *watcherEventBestPathMsg:
+ case *WatchEventBestPath:
if err := sendPath(func() []*table.Path {
- if len(msg.multiPathList) > 0 {
+ if len(msg.MultiPathList) > 0 {
l := make([]*table.Path, 0)
- for _, p := range msg.multiPathList {
+ for _, p := range msg.MultiPathList {
l = append(l, p...)
}
return l
} else {
- return msg.pathList
+ return msg.PathList
}
}()); err != nil {
return err
}
- case *watcherEventUpdateMsg:
- if err := sendPath(msg.pathList); err != nil {
+ case *WatchEventUpdate:
+ if err := sendPath(msg.PathList); err != nil {
return err
}
}
@@ -416,28 +416,28 @@ func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.GobgpApi_Monito
select {
case ev := <-w.Event():
switch msg := ev.(type) {
- case *watcherEventStateChangedMsg:
- if len(arg.Name) > 0 && arg.Name != msg.peerAddress.String() {
+ case *WatchEventPeerState:
+ if len(arg.Name) > 0 && arg.Name != msg.PeerAddress.String() {
continue
}
if err := stream.Send(&api.Peer{
Conf: &api.PeerConf{
- PeerAs: msg.peerAS,
- LocalAs: msg.localAS,
- NeighborAddress: msg.peerAddress.String(),
- Id: msg.peerID.String(),
+ PeerAs: msg.PeerAS,
+ LocalAs: msg.LocalAS,
+ NeighborAddress: msg.PeerAddress.String(),
+ Id: msg.PeerID.String(),
},
Info: &api.PeerState{
- PeerAs: msg.peerAS,
- LocalAs: msg.localAS,
- NeighborAddress: msg.peerAddress.String(),
- BgpState: msg.state.String(),
- AdminState: msg.adminState.String(),
+ PeerAs: msg.PeerAS,
+ LocalAs: msg.LocalAS,
+ NeighborAddress: msg.PeerAddress.String(),
+ BgpState: msg.State.String(),
+ AdminState: msg.AdminState.String(),
},
Transport: &api.Transport{
- LocalAddress: msg.localAddress.String(),
- LocalPort: uint32(msg.localPort),
- RemotePort: uint32(msg.peerPort),
+ LocalAddress: msg.LocalAddress.String(),
+ LocalPort: uint32(msg.LocalPort),
+ RemotePort: uint32(msg.PeerPort),
},
}); err != nil {
return err
diff --git a/server/mrt.go b/server/mrt.go
index 9c2673be..dcbd3542 100644
--- a/server/mrt.go
+++ b/server/mrt.go
@@ -56,15 +56,15 @@ func (m *mrtWriter) loop() error {
}()
for {
- serialize := func(ev watcherEvent) ([]byte, error) {
- m := ev.(*watcherEventUpdateMsg)
+ serialize := func(ev WatchEvent) ([]byte, error) {
+ m := ev.(*WatchEventUpdate)
subtype := mrt.MESSAGE_AS4
- mp := mrt.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil)
- mp.BGPMessagePayload = m.payload
- if m.fourBytesAs == false {
+ mp := mrt.NewBGP4MPMessage(m.PeerAS, m.LocalAS, 0, m.PeerAddress.String(), m.LocalAddress.String(), m.FourBytesAs, nil)
+ mp.BGPMessagePayload = m.Payload
+ if m.FourBytesAs == false {
subtype = mrt.MESSAGE
}
- bm, err := mrt.NewMRTMessage(uint32(m.timestamp.Unix()), mrt.BGP4MP, subtype, mp)
+ bm, err := mrt.NewMRTMessage(uint32(m.Timestamp.Unix()), mrt.BGP4MP, subtype, mp)
if err != nil {
log.WithFields(log.Fields{
"Topic": "mrt",
@@ -75,8 +75,8 @@ func (m *mrtWriter) loop() error {
return bm.Serialize()
}
- drain := func(ev watcherEvent) {
- events := make([]watcherEvent, 0, 1+len(w.Event()))
+ drain := func(ev WatchEvent) {
+ events := make([]WatchEvent, 0, 1+len(w.Event()))
if ev != nil {
events = append(events, ev)
}
diff --git a/server/server.go b/server/server.go
index 37ea1f8d..dc8ab200 100644
--- a/server/server.go
+++ b/server/server.go
@@ -103,7 +103,7 @@ type BgpServer struct {
globalRib *table.TableManager
roaManager *roaManager
shutdown bool
- watcherMap map[watchType][]*Watcher
+ watcherMap map[WatchEventType][]*Watcher
zclient *zebraClient
bmpManager *bmpClientManager
mrt *mrtWriter
@@ -117,7 +117,7 @@ func NewBgpServer() *BgpServer {
policy: table.NewRoutingPolicy(),
roaManager: roaManager,
mgmtCh: make(chan func(), 1),
- watcherMap: make(map[watchType][]*Watcher),
+ watcherMap: make(map[WatchEventType][]*Watcher),
}
s.bmpManager = newBmpClientManager(s)
return s
@@ -381,7 +381,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
for i, pathList := range multipath {
clonedMpath[i] = clonePathList(pathList)
}
- server.notifyWatcher(WATCH_TYPE_BESTPATH, &watcherEventBestPathMsg{pathList: clonePathList(best[table.GLOBAL_RIB_NAME]), multiPathList: clonedMpath})
+ server.notifyWatcher(WATCH_EVENT_TYPE_BEST_PATH, &WatchEventBestPath{PathList: clonePathList(best[table.GLOBAL_RIB_NAME]), MultiPathList: clonedMpath})
}
for _, targetPeer := range server.neighborMap {
@@ -395,31 +395,31 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
}
}
-func createWatcherEventStateChange(peer *Peer) *watcherEventStateChangedMsg {
+func createWatchEventPeerState(peer *Peer) *WatchEventPeerState {
_, rport := peer.fsm.RemoteHostPort()
laddr, lport := peer.fsm.LocalHostPort()
sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
recvOpen := peer.fsm.recvOpen
- return &watcherEventStateChangedMsg{
- peerAS: peer.fsm.peerInfo.AS,
- localAS: peer.fsm.peerInfo.LocalAS,
- peerAddress: peer.fsm.peerInfo.Address,
- localAddress: net.ParseIP(laddr),
- peerPort: rport,
- localPort: lport,
- peerID: peer.fsm.peerInfo.ID,
- sentOpen: sentOpen,
- recvOpen: recvOpen,
- state: peer.fsm.state,
- adminState: peer.fsm.adminState,
- timestamp: time.Now(),
+ return &WatchEventPeerState{
+ PeerAS: peer.fsm.peerInfo.AS,
+ LocalAS: peer.fsm.peerInfo.LocalAS,
+ PeerAddress: peer.fsm.peerInfo.Address,
+ LocalAddress: net.ParseIP(laddr),
+ PeerPort: rport,
+ LocalPort: lport,
+ PeerID: peer.fsm.peerInfo.ID,
+ SentOpen: sentOpen,
+ RecvOpen: recvOpen,
+ State: peer.fsm.state,
+ AdminState: peer.fsm.adminState,
+ Timestamp: time.Now(),
}
}
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
newState := peer.fsm.state
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
- server.notifyWatcher(WATCH_TYPE_PEER_STATE, createWatcherEventStateChange(peer))
+ server.notifyWatcher(WATCH_EVENT_TYPE_PEER_STATE, createWatchEventPeerState(peer))
}
}
@@ -534,7 +534,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []*
for i, pathList := range multipath {
clonedMpath[i] = clonePathList(pathList)
}
- server.notifyWatcher(WATCH_TYPE_BESTPATH, &watcherEventBestPathMsg{pathList: clonePathList(best[table.GLOBAL_RIB_NAME]), multiPathList: clonedMpath})
+ server.notifyWatcher(WATCH_EVENT_TYPE_BEST_PATH, &WatchEventBestPath{PathList: clonePathList(best[table.GLOBAL_RIB_NAME]), MultiPathList: clonedMpath})
}
@@ -678,46 +678,46 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
sendFsmOutgoingMsg(peer, nil, notification, true)
return
}
- if m.Header.Type == bgp.BGP_MSG_UPDATE && server.isWatched(WATCH_TYPE_PRE_UPDATE) {
+ if m.Header.Type == bgp.BGP_MSG_UPDATE && server.isWatched(WATCH_EVENT_TYPE_PRE_UPDATE) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- ev := &watcherEventUpdateMsg{
- message: m,
- peerAS: peer.fsm.peerInfo.AS,
- localAS: peer.fsm.peerInfo.LocalAS,
- peerAddress: peer.fsm.peerInfo.Address,
- localAddress: net.ParseIP(l),
- peerID: peer.fsm.peerInfo.ID,
- fourBytesAs: y,
- timestamp: e.timestamp,
- payload: e.payload,
- postPolicy: false,
- pathList: clonePathList(pathList),
+ ev := &WatchEventUpdate{
+ Message: m,
+ PeerAS: peer.fsm.peerInfo.AS,
+ LocalAS: peer.fsm.peerInfo.LocalAS,
+ PeerAddress: peer.fsm.peerInfo.Address,
+ LocalAddress: net.ParseIP(l),
+ PeerID: peer.fsm.peerInfo.ID,
+ FourBytesAs: y,
+ Timestamp: e.timestamp,
+ Payload: e.payload,
+ PostPolicy: false,
+ PathList: clonePathList(pathList),
}
- server.notifyWatcher(WATCH_TYPE_PRE_UPDATE, ev)
+ server.notifyWatcher(WATCH_EVENT_TYPE_PRE_UPDATE, ev)
}
if len(pathList) > 0 {
var altered []*table.Path
altered = server.propagateUpdate(peer, pathList)
- if server.isWatched(WATCH_TYPE_POST_UPDATE) {
+ if server.isWatched(WATCH_EVENT_TYPE_POST_UPDATE) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- ev := &watcherEventUpdateMsg{
- peerAS: peer.fsm.peerInfo.AS,
- localAS: peer.fsm.peerInfo.LocalAS,
- peerAddress: peer.fsm.peerInfo.Address,
- localAddress: net.ParseIP(l),
- peerID: peer.fsm.peerInfo.ID,
- fourBytesAs: y,
- timestamp: e.timestamp,
- postPolicy: true,
- pathList: clonePathList(altered),
+ ev := &WatchEventUpdate{
+ PeerAS: peer.fsm.peerInfo.AS,
+ LocalAS: peer.fsm.peerInfo.LocalAS,
+ PeerAddress: peer.fsm.peerInfo.Address,
+ LocalAddress: net.ParseIP(l),
+ PeerID: peer.fsm.peerInfo.ID,
+ FourBytesAs: y,
+ Timestamp: e.timestamp,
+ PostPolicy: true,
+ PathList: clonePathList(altered),
}
for _, u := range table.CreateUpdateMsgFromPaths(altered) {
payload, _ := u.Serialize()
- ev.payload = payload
- server.notifyWatcher(WATCH_TYPE_POST_UPDATE, ev)
+ ev.Payload = payload
+ server.notifyWatcher(WATCH_EVENT_TYPE_POST_UPDATE, ev)
}
}
}
@@ -2653,15 +2653,56 @@ func (server *BgpServer) handleModRpki(grpcReq *GrpcRequest) {
}
}
-type watchType string
+type WatchEventType string
const (
- WATCH_TYPE_BESTPATH watchType = "bestpath"
- WATCH_TYPE_PRE_UPDATE watchType = "preupdate"
- WATCH_TYPE_POST_UPDATE watchType = "postupdate"
- WATCH_TYPE_PEER_STATE watchType = "peerstate"
+ WATCH_EVENT_TYPE_BEST_PATH WatchEventType = "bestpath"
+ WATCH_EVENT_TYPE_PRE_UPDATE WatchEventType = "preupdate"
+ WATCH_EVENT_TYPE_POST_UPDATE WatchEventType = "postupdate"
+ WATCH_EVENT_TYPE_PEER_STATE WatchEventType = "peerstate"
)
+type WatchEvent interface {
+}
+
+type WatchEventUpdate struct {
+ Message *bgp.BGPMessage
+ PeerAS uint32
+ LocalAS uint32
+ PeerAddress net.IP
+ LocalAddress net.IP
+ PeerID net.IP
+ FourBytesAs bool
+ Timestamp time.Time
+ Payload []byte
+ PostPolicy bool
+ PathList []*table.Path
+}
+
+type WatchEventPeerState struct {
+ PeerAS uint32
+ LocalAS uint32
+ PeerAddress net.IP
+ LocalAddress net.IP
+ PeerPort uint16
+ LocalPort uint16
+ PeerID net.IP
+ SentOpen *bgp.BGPMessage
+ RecvOpen *bgp.BGPMessage
+ State bgp.FSMState
+ AdminState AdminState
+ Timestamp time.Time
+}
+
+type WatchEventAdjIn struct {
+ PathList []*table.Path
+}
+
+type WatchEventBestPath struct {
+ PathList []*table.Path
+ MultiPathList [][]*table.Path
+}
+
type watchOptions struct {
bestpath bool
preUpdate bool
@@ -2709,16 +2750,16 @@ func WatchPeerState(current bool) WatchOption {
type Watcher struct {
opts watchOptions
- realCh chan watcherEvent
+ realCh chan WatchEvent
ch *channels.InfiniteChannel
s *BgpServer
}
-func (w *Watcher) Event() <-chan watcherEvent {
+func (w *Watcher) Event() <-chan WatchEvent {
return w.realCh
}
-func (w *Watcher) Generate(t watchType) (err error) {
+func (w *Watcher) Generate(t WatchEventType) (err error) {
ch := make(chan struct{})
defer func() { <-ch }()
@@ -2726,7 +2767,7 @@ func (w *Watcher) Generate(t watchType) (err error) {
defer close(ch)
switch t {
- case WATCH_TYPE_PRE_UPDATE:
+ case WATCH_EVENT_TYPE_PRE_UPDATE:
default:
err = fmt.Errorf("unsupported type ", t)
return
@@ -2735,12 +2776,12 @@ func (w *Watcher) Generate(t watchType) (err error) {
for _, peer := range w.s.neighborMap {
pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...)
}
- w.notify(&watcherEventAdjInMsg{pathList: clonePathList(pathList)})
+ w.notify(&WatchEventAdjIn{PathList: clonePathList(pathList)})
}
return err
}
-func (w *Watcher) notify(v watcherEvent) {
+func (w *Watcher) notify(v WatchEvent) {
w.ch.In() <- v
}
@@ -2752,7 +2793,7 @@ func (w *Watcher) loop() {
close(w.realCh)
return
}
- w.realCh <- ev.(watcherEvent)
+ w.realCh <- ev.(WatchEvent)
}
}
}
@@ -2785,11 +2826,11 @@ func (w *Watcher) Stop() {
}
}
-func (s *BgpServer) isWatched(typ watchType) bool {
+func (s *BgpServer) isWatched(typ WatchEventType) bool {
return len(s.watcherMap[typ]) != 0
}
-func (s *BgpServer) notifyWatcher(typ watchType, ev watcherEvent) {
+func (s *BgpServer) notifyWatcher(typ WatchEventType, ev WatchEvent) {
for _, w := range s.watcherMap[typ] {
w.notify(ev)
}
@@ -2804,7 +2845,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
w = &Watcher{
s: s,
- realCh: make(chan watcherEvent, 8),
+ realCh: make(chan WatchEvent, 8),
ch: channels.NewInfiniteChannel(),
}
@@ -2812,28 +2853,28 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
opt(&w.opts)
}
- register := func(t watchType, w *Watcher) {
+ register := func(t WatchEventType, w *Watcher) {
s.watcherMap[t] = append(s.watcherMap[t], w)
}
if w.opts.bestpath {
- register(WATCH_TYPE_BESTPATH, w)
+ register(WATCH_EVENT_TYPE_BEST_PATH, w)
}
if w.opts.preUpdate {
- register(WATCH_TYPE_PRE_UPDATE, w)
+ register(WATCH_EVENT_TYPE_PRE_UPDATE, w)
}
if w.opts.postUpdate {
- register(WATCH_TYPE_POST_UPDATE, w)
+ register(WATCH_EVENT_TYPE_POST_UPDATE, w)
}
if w.opts.peerState {
- register(WATCH_TYPE_PEER_STATE, w)
+ register(WATCH_EVENT_TYPE_PEER_STATE, w)
}
if w.opts.initPeerState {
for _, peer := range s.neighborMap {
if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
continue
}
- w.notify(createWatcherEventStateChange(peer))
+ w.notify(createWatchEventPeerState(peer))
}
}
if w.opts.initUpdate {
@@ -2846,17 +2887,17 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
buf, _ := msgs[0].Serialize()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- w.notify(&watcherEventUpdateMsg{
- message: msgs[0],
- peerAS: peer.fsm.peerInfo.AS,
- localAS: peer.fsm.peerInfo.LocalAS,
- peerAddress: peer.fsm.peerInfo.Address,
- localAddress: net.ParseIP(l),
- peerID: peer.fsm.peerInfo.ID,
- fourBytesAs: y,
- timestamp: path.GetTimestamp(),
- payload: buf,
- postPolicy: false,
+ w.notify(&WatchEventUpdate{
+ Message: msgs[0],
+ PeerAS: peer.fsm.peerInfo.AS,
+ LocalAS: peer.fsm.peerInfo.LocalAS,
+ PeerAddress: peer.fsm.peerInfo.Address,
+ LocalAddress: net.ParseIP(l),
+ PeerID: peer.fsm.peerInfo.ID,
+ FourBytesAs: y,
+ Timestamp: path.GetTimestamp(),
+ Payload: buf,
+ PostPolicy: false,
})
}
}
@@ -2865,14 +2906,14 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
for _, path := range s.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, s.globalRib.GetRFlist()) {
msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
buf, _ := msgs[0].Serialize()
- w.notify(&watcherEventUpdateMsg{
- peerAS: path.GetSource().AS,
- peerAddress: path.GetSource().Address,
- peerID: path.GetSource().ID,
- message: msgs[0],
- timestamp: path.GetTimestamp(),
- payload: buf,
- postPolicy: true,
+ w.notify(&WatchEventUpdate{
+ PeerAS: path.GetSource().AS,
+ PeerAddress: path.GetSource().Address,
+ PeerID: path.GetSource().ID,
+ Message: msgs[0],
+ Timestamp: path.GetTimestamp(),
+ Payload: buf,
+ PostPolicy: true,
})
}
@@ -2882,44 +2923,3 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
}
return w
}
-
-type watcherEvent interface {
-}
-
-type watcherEventUpdateMsg struct {
- message *bgp.BGPMessage
- peerAS uint32
- localAS uint32
- peerAddress net.IP
- localAddress net.IP
- peerID net.IP
- fourBytesAs bool
- timestamp time.Time
- payload []byte
- postPolicy bool
- pathList []*table.Path
-}
-
-type watcherEventStateChangedMsg struct {
- peerAS uint32
- localAS uint32
- peerAddress net.IP
- localAddress net.IP
- peerPort uint16
- localPort uint16
- peerID net.IP
- sentOpen *bgp.BGPMessage
- recvOpen *bgp.BGPMessage
- state bgp.FSMState
- adminState AdminState
- timestamp time.Time
-}
-
-type watcherEventAdjInMsg struct {
- pathList []*table.Path
-}
-
-type watcherEventBestPathMsg struct {
- pathList []*table.Path
- multiPathList [][]*table.Path
-}
diff --git a/server/zclient.go b/server/zclient.go
index 23b74963..0e20c7f9 100644
--- a/server/zclient.go
+++ b/server/zclient.go
@@ -176,15 +176,15 @@ func (z *zebraClient) loop() {
}
}
case ev := <-w.Event():
- msg := ev.(*watcherEventBestPathMsg)
+ msg := ev.(*WatchEventBestPath)
if table.UseMultiplePaths.Enabled {
- for _, dst := range msg.multiPathList {
+ for _, dst := range msg.MultiPathList {
if m := newIPRouteMessage(dst); m != nil {
z.client.Send(m)
}
}
} else {
- for _, path := range msg.pathList {
+ for _, path := range msg.PathList {
if m := newIPRouteMessage([]*table.Path{path}); m != nil {
z.client.Send(m)
}