diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 22:22:08 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 17:28:38 +0900 |
commit | f097981a33d107a6d31c4da0f25b90957f3d25fe (patch) | |
tree | 25ebf98379bb5eec7131500bb31221698107a04a /server | |
parent | 3343bcec3565da1aa40fa1aa1ecc6feee7e902f6 (diff) |
bmp uses the new Watch API
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r-- | server/bmp.go | 327 | ||||
-rw-r--r-- | server/grpc_server.go | 28 | ||||
-rw-r--r-- | server/server.go | 259 |
3 files changed, 232 insertions, 382 deletions
diff --git a/server/bmp.go b/server/bmp.go index 4542ef2d..0c7bc61d 100644 --- a/server/bmp.go +++ b/server/bmp.go @@ -1,4 +1,4 @@ -// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// Copyright (C) 2015-2016 Nippon Telegraph and Telephone Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,211 +22,114 @@ import ( "github.com/osrg/gobgp/packet/bgp" "github.com/osrg/gobgp/packet/bmp" "github.com/osrg/gobgp/table" - "gopkg.in/tomb.v2" "net" "strconv" "time" ) -type bmpServer struct { - conn *net.TCPConn - host string - typ config.BmpRouteMonitoringPolicyType -} - -type bmpConfig struct { - config config.BmpServerConfig - del bool - errCh chan error -} - -type bmpWatcher struct { - t tomb.Tomb - ch chan watcherEvent - apiCh chan *GrpcRequest - newConnCh chan *net.TCPConn - endCh chan *net.TCPConn - connMap map[string]*bmpServer - ctlCh chan *bmpConfig -} - -func (w *bmpWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE { - return w.ch - } - return nil -} - -func (w *bmpWatcher) stop() { - w.t.Kill(nil) -} - -func (w *bmpWatcher) tryConnect(server *bmpServer) { +func (b *bmpClient) tryConnect() *net.TCPConn { interval := 1 - host := server.host for { - log.Debug("connecting bmp server: ", host) - conn, err := net.Dial("tcp", host) + log.Debug("connecting bmp server: ", b.host) + conn, err := net.Dial("tcp", b.host) if err != nil { + select { + case <-b.dead: + return nil + default: + } time.Sleep(time.Duration(interval) * time.Second) if interval < 30 { interval *= 2 } } else { - log.Info("bmp server is connected, ", host) - w.newConnCh <- conn.(*net.TCPConn) - break + log.Info("bmp server is connected, ", b.host) + return conn.(*net.TCPConn) } } } -func (w *bmpWatcher) loop() error { +func (b *bmpClient) Stop() { + close(b.dead) +} + +func (b *bmpClient) loop() { for { - select { - case <-w.t.Dying(): - for _, server := range w.connMap { - if server.conn != nil { - server.conn.Close() - } + conn := b.tryConnect() + if conn == nil { + break + } + + if func() bool { + ops := []WatchOption{WatchPeerState(true)} + if b.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY { + ops = append(ops, WatchUpdate(true)) + } else if b.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY { + ops = append(ops, WatchPostUpdate(true)) } - return nil - case m := <-w.ctlCh: - c := m.config - if m.del { - host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))) - if _, y := w.connMap[host]; !y { - m.errCh <- fmt.Errorf("bmp server %s doesn't exists", host) - continue - } - conn := w.connMap[host].conn - delete(w.connMap, host) - conn.Close() - } else { - host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))) - if _, y := w.connMap[host]; y { - m.errCh <- fmt.Errorf("bmp server %s already exists", host) - continue + w := b.s.Watch(ops...) + defer w.Stop() + + write := func(msg *bmp.BMPMessage) error { + buf, _ := msg.Serialize() + _, err := conn.Write(buf) + if err != nil { + log.Warnf("failed to write to bmp server %s", b.host) } - server := &bmpServer{ - host: host, - typ: c.RouteMonitoringPolicy, - } - w.connMap[host] = server - go w.tryConnect(server) - } - m.errCh <- nil - close(m.errCh) - case newConn := <-w.newConnCh: - server, y := w.connMap[newConn.RemoteAddr().String()] - if !y { - log.Warnf("Can't find bmp server %s", newConn.RemoteAddr().String()) - break + return err } - i := bmp.NewBMPInitiation([]bmp.BMPTLV{}) - buf, _ := i.Serialize() - if _, err := newConn.Write(buf); err != nil { - log.Warnf("failed to write to bmp server %s", server.host) - go w.tryConnect(server) - break - } - req := &GrpcRequest{ - RequestType: REQ_BMP_NEIGHBORS, - ResponseCh: make(chan *GrpcResponse, 1), + + if err := write(bmp.NewBMPInitiation([]bmp.BMPTLV{})); err != nil { + return false } - w.apiCh <- req - write := func(req *GrpcRequest) error { - for res := range req.ResponseCh { - for _, msg := range res.Data.([]*bmp.BMPMessage) { - buf, _ = msg.Serialize() - if _, err := newConn.Write(buf); err != nil { - log.Warnf("failed to write to bmp server %s %s", server.host, err) - go w.tryConnect(server) - return err + + for { + select { + case ev := <-w.Event(): + switch msg := ev.(type) { + case *watcherEventUpdateMsg: + info := &table.PeerInfo{ + Address: msg.peerAddress, + AS: msg.peerAS, + ID: msg.peerID, } - } - } - return nil - } - if err := write(req); err != nil { - break - } - if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY { - req = &GrpcRequest{ - RequestType: REQ_BMP_ADJ_IN, - ResponseCh: make(chan *GrpcResponse, 1), - } - w.apiCh <- req - if err := write(req); err != nil { - break - } - } - if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY { - req = &GrpcRequest{ - RequestType: REQ_BMP_GLOBAL, - ResponseCh: make(chan *GrpcResponse, 1), - } - w.apiCh <- req - if err := write(req); err != nil { - break - } - } - server.conn = newConn - case ev := <-w.ch: - switch msg := ev.(type) { - case *watcherEventUpdateMsg: - info := &table.PeerInfo{ - Address: msg.peerAddress, - AS: msg.peerAS, - ID: msg.peerID, - } - buf, _ := bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload).Serialize() - for _, server := range w.connMap { - if server.conn != nil { - send := server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY && !msg.postPolicy - send = send || (server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY && msg.postPolicy) - if send { - _, err := server.conn.Write(buf) - if err != nil { - log.Warnf("failed to write to bmp server %s", server.host) - } + if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload)); err != nil { + return false } - } - } - case *watcherEventStateChangedMsg: - var bmpmsg *bmp.BMPMessage - info := &table.PeerInfo{ - Address: msg.peerAddress, - AS: msg.peerAS, - ID: msg.peerID, - } - if msg.state == bgp.BGP_FSM_ESTABLISHED { - bmpmsg = bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix()) - } else { - bmpmsg = bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix()) - } - buf, _ := bmpmsg.Serialize() - for _, server := range w.connMap { - if server.conn != nil { - _, err := server.conn.Write(buf) - if err != nil { - log.Warnf("failed to write to bmp server %s", server.host) + case *watcherEventStateChangedMsg: + info := &table.PeerInfo{ + Address: msg.peerAddress, + AS: msg.peerAS, + ID: msg.peerID, + } + if msg.state == bgp.BGP_FSM_ESTABLISHED { + if err := write(bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil { + return false + } + } else { + if err := write(bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil { + return false + } } } + case <-b.dead: + conn.Close() + return true } - default: - log.Warnf("unknown watcher event") - } - case conn := <-w.endCh: - host := conn.RemoteAddr().String() - log.Debugf("bmp connection to %s killed", host) - if _, y := w.connMap[host]; y { - w.connMap[host].conn = nil - go w.tryConnect(w.connMap[host]) } + }() { + return } } } +type bmpClient struct { + s *BgpServer + dead chan struct{} + host string + typ config.BmpRouteMonitoringPolicyType +} + func bmpPeerUp(laddr string, lport, rport uint16, sent, recv *bgp.BGPMessage, t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64) *bmp.BMPMessage { ph := bmp.NewBMPPeerHeader(t, policy, pd, peeri.Address.String(), peeri.AS, peeri.ID.String(), float64(timestamp)) return bmp.NewBMPPeerUpNotification(*ph, laddr, lport, rport, sent, recv) @@ -245,62 +148,40 @@ func bmpPeerRoute(t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timest return m } -func (w *bmpWatcher) addServer(c config.BmpServerConfig) error { - ch := make(chan error) - w.ctlCh <- &bmpConfig{ - config: c, - errCh: ch, +func (b *bmpClientManager) addServer(c *config.BmpServerConfig) error { + host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))) + if _, y := b.clientMap[host]; y { + return fmt.Errorf("bmp client %s is already configured", host) } - return <-ch + b.clientMap[host] = &bmpClient{ + s: b.s, + dead: make(chan struct{}), + host: host, + typ: c.RouteMonitoringPolicy, + } + go b.clientMap[host].loop() + return nil } -func (w *bmpWatcher) deleteServer(c config.BmpServerConfig) error { - ch := make(chan error) - w.ctlCh <- &bmpConfig{ - config: c, - del: true, - errCh: ch, +func (b *bmpClientManager) deleteServer(c *config.BmpServerConfig) error { + host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))) + if c, y := b.clientMap[host]; !y { + return fmt.Errorf("bmp client %s isn't found", host) + } else { + c.Stop() + delete(b.clientMap, host) } - return <-ch + return nil } -func (w *bmpWatcher) watchingEventTypes() []watcherEventType { - state := false - pre := false - post := false - for _, server := range w.connMap { - if server.conn != nil { - state = true - if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY { - pre = true - } - if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY { - post = true - } - } - } - types := make([]watcherEventType, 0, 3) - if state { - types = append(types, WATCHER_EVENT_STATE_CHANGE) - } - if pre { - types = append(types, WATCHER_EVENT_UPDATE_MSG) - } - if post { - types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG) - } - return types +type bmpClientManager struct { + s *BgpServer + clientMap map[string]*bmpClient } -func newBmpWatcher(grpcCh chan *GrpcRequest) (*bmpWatcher, error) { - w := &bmpWatcher{ - ch: make(chan watcherEvent), - apiCh: grpcCh, - newConnCh: make(chan *net.TCPConn), - endCh: make(chan *net.TCPConn), - connMap: make(map[string]*bmpServer), - ctlCh: make(chan *bmpConfig), +func newBmpClientManager(s *BgpServer) *bmpClientManager { + return &bmpClientManager{ + s: s, + clientMap: make(map[string]*bmpClient), } - w.t.Go(w.loop) - return w, nil } diff --git a/server/grpc_server.go b/server/grpc_server.go index ebe8e972..27d0a6d1 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -93,9 +93,6 @@ const ( REQ_ADD_POLICY_ASSIGNMENT REQ_DELETE_POLICY_ASSIGNMENT REQ_REPLACE_POLICY_ASSIGNMENT - REQ_BMP_NEIGHBORS - REQ_BMP_GLOBAL - REQ_BMP_ADJ_IN REQ_DEFERRAL_TIMER_EXPIRED REQ_RELOAD_POLICY REQ_INITIALIZE_ZEBRA @@ -348,9 +345,9 @@ func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer return s.bgpServer.Watch(WatchBestPath()), nil case api.Resource_ADJ_IN: if arg.PostPolicy { - return s.bgpServer.Watch(WatchPostUpdate()), nil + return s.bgpServer.Watch(WatchPostUpdate(false)), nil } - return s.bgpServer.Watch(WatchUpdate()), nil + return s.bgpServer.Watch(WatchUpdate(false)), nil default: return nil, fmt.Errorf("unsupported resource type: %v", arg.Type) } @@ -414,7 +411,7 @@ func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.GobgpApi_MonitorPeerStateServer) error { return func() error { - w := s.bgpServer.Watch(WatchPeerState()) + w := s.bgpServer.Watch(WatchPeerState(false)) defer func() { w.Stop() }() for { @@ -657,19 +654,18 @@ func (s *Server) InjectMrt(stream api.GobgpApi_InjectMrtServer) error { } func (s *Server) AddBmp(ctx context.Context, arg *api.AddBmpRequest) (*api.AddBmpResponse, error) { - d, err := s.get(REQ_ADD_BMP, arg) - if err != nil { - return nil, err - } - return d.(*api.AddBmpResponse), err + return &api.AddBmpResponse{}, s.bgpServer.AddBmp(&config.BmpServerConfig{ + Address: arg.Address, + Port: arg.Port, + RouteMonitoringPolicy: config.BmpRouteMonitoringPolicyType(arg.Type), + }) } func (s *Server) DeleteBmp(ctx context.Context, arg *api.DeleteBmpRequest) (*api.DeleteBmpResponse, error) { - d, err := s.get(REQ_DELETE_BMP, arg) - if err != nil { - return nil, err - } - return d.(*api.DeleteBmpResponse), err + return &api.DeleteBmpResponse{}, s.bgpServer.DeleteBmp(&config.BmpServerConfig{ + Address: arg.Address, + Port: arg.Port, + }) } func (s *Server) ValidateRib(ctx context.Context, arg *api.ValidateRibRequest) (*api.ValidateRibResponse, error) { diff --git a/server/server.go b/server/server.go index 4cd71ad6..0aeead21 100644 --- a/server/server.go +++ b/server/server.go @@ -30,7 +30,6 @@ import ( api "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet/bgp" - "github.com/osrg/gobgp/packet/bmp" "github.com/osrg/gobgp/table" "github.com/satori/go.uuid" ) @@ -107,11 +106,12 @@ type BgpServer struct { watchers *watcherManager watcherMap map[watchType][]*Watcher zclient *zebraClient + bmpManager *bmpClientManager } func NewBgpServer() *BgpServer { roaManager, _ := NewROAManager(0) - return &BgpServer{ + s := &BgpServer{ GrpcReqCh: make(chan *GrpcRequest, 1), neighborMap: make(map[string]*Peer), policy: table.NewRoutingPolicy(), @@ -120,6 +120,8 @@ func NewBgpServer() *BgpServer { mgmtCh: make(chan func(), 1), watcherMap: make(map[watchType][]*Watcher), } + s.bmpManager = newBmpClientManager(s) + return s } func (server *BgpServer) Listeners(addr string) []*net.TCPListener { @@ -383,27 +385,31 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil } } +func createWatcherEventStateChange(peer *Peer) *watcherEventStateChangedMsg { + _, rport := peer.fsm.RemoteHostPort() + laddr, lport := peer.fsm.LocalHostPort() + sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) + recvOpen := peer.fsm.recvOpen + return &watcherEventStateChangedMsg{ + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(laddr), + peerPort: rport, + localPort: lport, + peerID: peer.fsm.peerInfo.ID, + sentOpen: sentOpen, + recvOpen: recvOpen, + state: peer.fsm.state, + adminState: peer.fsm.adminState, + timestamp: time.Now(), + } +} + func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { newState := peer.fsm.state if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { - _, rport := peer.fsm.RemoteHostPort() - laddr, lport := peer.fsm.LocalHostPort() - sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) - recvOpen := peer.fsm.recvOpen - ev := &watcherEventStateChangedMsg{ - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(laddr), - peerPort: rport, - localPort: lport, - peerID: peer.fsm.peerInfo.ID, - sentOpen: sentOpen, - recvOpen: recvOpen, - state: newState, - adminState: peer.fsm.adminState, - timestamp: time.Now(), - } + ev := createWatcherEventStateChange(peer) if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) { server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev) } @@ -892,19 +898,28 @@ func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) error { return nil } -func (server *BgpServer) SetBmpConfig(c []config.BmpServer) error { - for _, s := range c { - ch := make(chan *GrpcResponse) - server.GrpcReqCh <- &GrpcRequest{ - RequestType: REQ_ADD_BMP, - Data: &s.Config, - ResponseCh: ch, - } - if err := (<-ch).Err(); err != nil { - return err - } +func (s *BgpServer) AddBmp(c *config.BmpServerConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + err = s.bmpManager.addServer(c) } - return nil + return err +} + +func (s *BgpServer) DeleteBmp(c *config.BmpServerConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + err = s.bmpManager.deleteServer(c) + } + return err } func (server *BgpServer) SetMrtConfig(c []config.Mrt) error { @@ -1512,37 +1527,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { Data: dsts, } close(grpcReq.ResponseCh) - case REQ_BMP_GLOBAL: - paths := server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist()) - bmpmsgs := make([]*bmp.BMPMessage, 0, len(paths)) - for _, path := range paths { - msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) - buf, _ := msgs[0].Serialize() - bmpmsgs = append(bmpmsgs, bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, true, 0, path.GetSource(), path.GetTimestamp().Unix(), buf)) - } - grpcReq.ResponseCh <- &GrpcResponse{ - Data: bmpmsgs, - } - close(grpcReq.ResponseCh) - case REQ_BMP_NEIGHBORS: - //TODO: merge REQ_NEIGHBORS and REQ_BMP_NEIGHBORS - msgs := make([]*bmp.BMPMessage, 0, len(server.neighborMap)) - for _, peer := range server.neighborMap { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { - continue - } - laddr, lport := peer.fsm.LocalHostPort() - _, rport := peer.fsm.RemoteHostPort() - sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) - info := peer.fsm.peerInfo - timestamp := peer.fsm.pConf.Timers.State.Uptime - msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.fsm.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp) - msgs = append(msgs, msg) - } - grpcReq.ResponseCh <- &GrpcResponse{ - Data: msgs, - } - close(grpcReq.ResponseCh) case REQ_NEIGHBOR: l := make([]*config.Neighbor, 0) for _, peer := range server.neighborMap { @@ -1613,22 +1597,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { Data: paths, } close(grpcReq.ResponseCh) - case REQ_BMP_ADJ_IN: - bmpmsgs := make([]*bmp.BMPMessage, 0) - for _, peer := range server.neighborMap { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { - continue - } - for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) { - msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) - buf, _ := msgs[0].Serialize() - bmpmsgs = append(bmpmsgs, bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, path.GetTimestamp().Unix(), buf)) - } - } - grpcReq.ResponseCh <- &GrpcResponse{ - Data: bmpmsgs, - } - close(grpcReq.ResponseCh) case REQ_NEIGHBOR_SHUTDOWN: peers, err := reqToPeers(grpcReq) if err != nil { @@ -1806,10 +1774,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { server.handleEnableMrtRequest(grpcReq) case REQ_DISABLE_MRT: server.handleDisableMrtRequest(grpcReq) - case REQ_ADD_BMP: - server.handleAddBmp(grpcReq) - case REQ_DELETE_BMP: - server.handleDeleteBmp(grpcReq) case REQ_VALIDATE_RIB: server.handleValidateRib(grpcReq) case REQ_INITIALIZE_RPKI: @@ -2686,57 +2650,6 @@ func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) { close(grpcReq.ResponseCh) } -func (server *BgpServer) handleAddBmp(grpcReq *GrpcRequest) { - var c *config.BmpServerConfig - switch arg := grpcReq.Data.(type) { - case *api.AddBmpRequest: - c = &config.BmpServerConfig{ - Address: arg.Address, - Port: arg.Port, - RouteMonitoringPolicy: config.BmpRouteMonitoringPolicyType(arg.Type), - } - case *config.BmpServerConfig: - c = arg - } - - w, y := server.watchers.watcher(WATCHER_BMP) - if !y { - w, _ = newBmpWatcher(server.GrpcReqCh) - server.watchers.addWatcher(WATCHER_BMP, w) - } - - err := w.(*bmpWatcher).addServer(*c) - grpcReq.ResponseCh <- &GrpcResponse{ - ResponseErr: err, - Data: &api.AddBmpResponse{}, - } - close(grpcReq.ResponseCh) -} - -func (server *BgpServer) handleDeleteBmp(grpcReq *GrpcRequest) { - var c *config.BmpServerConfig - switch arg := grpcReq.Data.(type) { - case *api.DeleteBmpRequest: - c = &config.BmpServerConfig{ - Address: arg.Address, - Port: arg.Port, - } - case *config.BmpServerConfig: - c = arg - } - - if w, y := server.watchers.watcher(WATCHER_BMP); y { - err := w.(*bmpWatcher).deleteServer(*c) - grpcReq.ResponseCh <- &GrpcResponse{ - ResponseErr: err, - Data: &api.DeleteBmpResponse{}, - } - close(grpcReq.ResponseCh) - } else { - grpcDone(grpcReq, fmt.Errorf("bmp not configured")) - } -} - func (server *BgpServer) handleValidateRib(grpcReq *GrpcRequest) { arg := grpcReq.Data.(*api.ValidateRibRequest) for _, rf := range server.globalRib.GetRFlist() { @@ -2796,10 +2709,13 @@ const ( ) type watchOptions struct { - bestpath bool - preUpdate bool - postUpdate bool - peerState bool + bestpath bool + preUpdate bool + postUpdate bool + peerState bool + initUpdate bool + initPostUpdate bool + initPeerState bool } type WatchOption func(*watchOptions) @@ -2810,21 +2726,30 @@ func WatchBestPath() WatchOption { } } -func WatchUpdate() WatchOption { +func WatchUpdate(current bool) WatchOption { return func(o *watchOptions) { o.preUpdate = true + if current { + o.initUpdate = true + } } } -func WatchPostUpdate() WatchOption { +func WatchPostUpdate(current bool) WatchOption { return func(o *watchOptions) { o.postUpdate = true + if current { + o.initPostUpdate = true + } } } -func WatchPeerState() WatchOption { +func WatchPeerState(current bool) WatchOption { return func(o *watchOptions) { o.peerState = true + if current { + o.initPeerState = true + } } } @@ -2859,10 +2784,8 @@ func (w *Watcher) loop() { func (w *Watcher) Stop() { ch := make(chan struct{}) defer func() { <-ch }() - w.s.mgmtCh <- func() { defer close(ch) - for k, l := range w.s.watcherMap { for i, v := range l { if w == v { @@ -2919,6 +2842,56 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { if w.opts.peerState { register(WATCH_TYPE_PEER_STATE, w) } + if w.opts.initPeerState { + for _, peer := range s.neighborMap { + if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + continue + } + w.notify(createWatcherEventStateChange(peer)) + } + } + if w.opts.initUpdate { + for _, peer := range s.neighborMap { + if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + continue + } + for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) { + msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) + buf, _ := msgs[0].Serialize() + _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] + l, _ := peer.fsm.LocalHostPort() + w.notify(&watcherEventUpdateMsg{ + message: msgs[0], + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(l), + peerID: peer.fsm.peerInfo.ID, + fourBytesAs: y, + timestamp: path.GetTimestamp(), + payload: buf, + postPolicy: false, + }) + } + } + } + if w.opts.postUpdate { + for _, path := range s.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, s.globalRib.GetRFlist()) { + msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) + buf, _ := msgs[0].Serialize() + w.notify(&watcherEventUpdateMsg{ + peerAS: path.GetSource().AS, + peerAddress: path.GetSource().Address, + peerID: path.GetSource().ID, + message: msgs[0], + timestamp: path.GetTimestamp(), + payload: buf, + postPolicy: true, + }) + } + + } + go w.loop() } return w |