summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-22 22:22:08 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-22 17:28:38 +0900
commitf097981a33d107a6d31c4da0f25b90957f3d25fe (patch)
tree25ebf98379bb5eec7131500bb31221698107a04a /server
parent3343bcec3565da1aa40fa1aa1ecc6feee7e902f6 (diff)
bmp uses the new Watch API
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r--server/bmp.go327
-rw-r--r--server/grpc_server.go28
-rw-r--r--server/server.go259
3 files changed, 232 insertions, 382 deletions
diff --git a/server/bmp.go b/server/bmp.go
index 4542ef2d..0c7bc61d 100644
--- a/server/bmp.go
+++ b/server/bmp.go
@@ -1,4 +1,4 @@
-// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
+// Copyright (C) 2015-2016 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -22,211 +22,114 @@ import (
"github.com/osrg/gobgp/packet/bgp"
"github.com/osrg/gobgp/packet/bmp"
"github.com/osrg/gobgp/table"
- "gopkg.in/tomb.v2"
"net"
"strconv"
"time"
)
-type bmpServer struct {
- conn *net.TCPConn
- host string
- typ config.BmpRouteMonitoringPolicyType
-}
-
-type bmpConfig struct {
- config config.BmpServerConfig
- del bool
- errCh chan error
-}
-
-type bmpWatcher struct {
- t tomb.Tomb
- ch chan watcherEvent
- apiCh chan *GrpcRequest
- newConnCh chan *net.TCPConn
- endCh chan *net.TCPConn
- connMap map[string]*bmpServer
- ctlCh chan *bmpConfig
-}
-
-func (w *bmpWatcher) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE {
- return w.ch
- }
- return nil
-}
-
-func (w *bmpWatcher) stop() {
- w.t.Kill(nil)
-}
-
-func (w *bmpWatcher) tryConnect(server *bmpServer) {
+func (b *bmpClient) tryConnect() *net.TCPConn {
interval := 1
- host := server.host
for {
- log.Debug("connecting bmp server: ", host)
- conn, err := net.Dial("tcp", host)
+ log.Debug("connecting bmp server: ", b.host)
+ conn, err := net.Dial("tcp", b.host)
if err != nil {
+ select {
+ case <-b.dead:
+ return nil
+ default:
+ }
time.Sleep(time.Duration(interval) * time.Second)
if interval < 30 {
interval *= 2
}
} else {
- log.Info("bmp server is connected, ", host)
- w.newConnCh <- conn.(*net.TCPConn)
- break
+ log.Info("bmp server is connected, ", b.host)
+ return conn.(*net.TCPConn)
}
}
}
-func (w *bmpWatcher) loop() error {
+func (b *bmpClient) Stop() {
+ close(b.dead)
+}
+
+func (b *bmpClient) loop() {
for {
- select {
- case <-w.t.Dying():
- for _, server := range w.connMap {
- if server.conn != nil {
- server.conn.Close()
- }
+ conn := b.tryConnect()
+ if conn == nil {
+ break
+ }
+
+ if func() bool {
+ ops := []WatchOption{WatchPeerState(true)}
+ if b.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY {
+ ops = append(ops, WatchUpdate(true))
+ } else if b.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY {
+ ops = append(ops, WatchPostUpdate(true))
}
- return nil
- case m := <-w.ctlCh:
- c := m.config
- if m.del {
- host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
- if _, y := w.connMap[host]; !y {
- m.errCh <- fmt.Errorf("bmp server %s doesn't exists", host)
- continue
- }
- conn := w.connMap[host].conn
- delete(w.connMap, host)
- conn.Close()
- } else {
- host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
- if _, y := w.connMap[host]; y {
- m.errCh <- fmt.Errorf("bmp server %s already exists", host)
- continue
+ w := b.s.Watch(ops...)
+ defer w.Stop()
+
+ write := func(msg *bmp.BMPMessage) error {
+ buf, _ := msg.Serialize()
+ _, err := conn.Write(buf)
+ if err != nil {
+ log.Warnf("failed to write to bmp server %s", b.host)
}
- server := &bmpServer{
- host: host,
- typ: c.RouteMonitoringPolicy,
- }
- w.connMap[host] = server
- go w.tryConnect(server)
- }
- m.errCh <- nil
- close(m.errCh)
- case newConn := <-w.newConnCh:
- server, y := w.connMap[newConn.RemoteAddr().String()]
- if !y {
- log.Warnf("Can't find bmp server %s", newConn.RemoteAddr().String())
- break
+ return err
}
- i := bmp.NewBMPInitiation([]bmp.BMPTLV{})
- buf, _ := i.Serialize()
- if _, err := newConn.Write(buf); err != nil {
- log.Warnf("failed to write to bmp server %s", server.host)
- go w.tryConnect(server)
- break
- }
- req := &GrpcRequest{
- RequestType: REQ_BMP_NEIGHBORS,
- ResponseCh: make(chan *GrpcResponse, 1),
+
+ if err := write(bmp.NewBMPInitiation([]bmp.BMPTLV{})); err != nil {
+ return false
}
- w.apiCh <- req
- write := func(req *GrpcRequest) error {
- for res := range req.ResponseCh {
- for _, msg := range res.Data.([]*bmp.BMPMessage) {
- buf, _ = msg.Serialize()
- if _, err := newConn.Write(buf); err != nil {
- log.Warnf("failed to write to bmp server %s %s", server.host, err)
- go w.tryConnect(server)
- return err
+
+ for {
+ select {
+ case ev := <-w.Event():
+ switch msg := ev.(type) {
+ case *watcherEventUpdateMsg:
+ info := &table.PeerInfo{
+ Address: msg.peerAddress,
+ AS: msg.peerAS,
+ ID: msg.peerID,
}
- }
- }
- return nil
- }
- if err := write(req); err != nil {
- break
- }
- if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY {
- req = &GrpcRequest{
- RequestType: REQ_BMP_ADJ_IN,
- ResponseCh: make(chan *GrpcResponse, 1),
- }
- w.apiCh <- req
- if err := write(req); err != nil {
- break
- }
- }
- if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY {
- req = &GrpcRequest{
- RequestType: REQ_BMP_GLOBAL,
- ResponseCh: make(chan *GrpcResponse, 1),
- }
- w.apiCh <- req
- if err := write(req); err != nil {
- break
- }
- }
- server.conn = newConn
- case ev := <-w.ch:
- switch msg := ev.(type) {
- case *watcherEventUpdateMsg:
- info := &table.PeerInfo{
- Address: msg.peerAddress,
- AS: msg.peerAS,
- ID: msg.peerID,
- }
- buf, _ := bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload).Serialize()
- for _, server := range w.connMap {
- if server.conn != nil {
- send := server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY && !msg.postPolicy
- send = send || (server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY && msg.postPolicy)
- if send {
- _, err := server.conn.Write(buf)
- if err != nil {
- log.Warnf("failed to write to bmp server %s", server.host)
- }
+ if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload)); err != nil {
+ return false
}
- }
- }
- case *watcherEventStateChangedMsg:
- var bmpmsg *bmp.BMPMessage
- info := &table.PeerInfo{
- Address: msg.peerAddress,
- AS: msg.peerAS,
- ID: msg.peerID,
- }
- if msg.state == bgp.BGP_FSM_ESTABLISHED {
- bmpmsg = bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())
- } else {
- bmpmsg = bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())
- }
- buf, _ := bmpmsg.Serialize()
- for _, server := range w.connMap {
- if server.conn != nil {
- _, err := server.conn.Write(buf)
- if err != nil {
- log.Warnf("failed to write to bmp server %s", server.host)
+ case *watcherEventStateChangedMsg:
+ info := &table.PeerInfo{
+ Address: msg.peerAddress,
+ AS: msg.peerAS,
+ ID: msg.peerID,
+ }
+ if msg.state == bgp.BGP_FSM_ESTABLISHED {
+ if err := write(bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil {
+ return false
+ }
+ } else {
+ if err := write(bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil {
+ return false
+ }
}
}
+ case <-b.dead:
+ conn.Close()
+ return true
}
- default:
- log.Warnf("unknown watcher event")
- }
- case conn := <-w.endCh:
- host := conn.RemoteAddr().String()
- log.Debugf("bmp connection to %s killed", host)
- if _, y := w.connMap[host]; y {
- w.connMap[host].conn = nil
- go w.tryConnect(w.connMap[host])
}
+ }() {
+ return
}
}
}
+type bmpClient struct {
+ s *BgpServer
+ dead chan struct{}
+ host string
+ typ config.BmpRouteMonitoringPolicyType
+}
+
func bmpPeerUp(laddr string, lport, rport uint16, sent, recv *bgp.BGPMessage, t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64) *bmp.BMPMessage {
ph := bmp.NewBMPPeerHeader(t, policy, pd, peeri.Address.String(), peeri.AS, peeri.ID.String(), float64(timestamp))
return bmp.NewBMPPeerUpNotification(*ph, laddr, lport, rport, sent, recv)
@@ -245,62 +148,40 @@ func bmpPeerRoute(t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timest
return m
}
-func (w *bmpWatcher) addServer(c config.BmpServerConfig) error {
- ch := make(chan error)
- w.ctlCh <- &bmpConfig{
- config: c,
- errCh: ch,
+func (b *bmpClientManager) addServer(c *config.BmpServerConfig) error {
+ host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
+ if _, y := b.clientMap[host]; y {
+ return fmt.Errorf("bmp client %s is already configured", host)
}
- return <-ch
+ b.clientMap[host] = &bmpClient{
+ s: b.s,
+ dead: make(chan struct{}),
+ host: host,
+ typ: c.RouteMonitoringPolicy,
+ }
+ go b.clientMap[host].loop()
+ return nil
}
-func (w *bmpWatcher) deleteServer(c config.BmpServerConfig) error {
- ch := make(chan error)
- w.ctlCh <- &bmpConfig{
- config: c,
- del: true,
- errCh: ch,
+func (b *bmpClientManager) deleteServer(c *config.BmpServerConfig) error {
+ host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
+ if c, y := b.clientMap[host]; !y {
+ return fmt.Errorf("bmp client %s isn't found", host)
+ } else {
+ c.Stop()
+ delete(b.clientMap, host)
}
- return <-ch
+ return nil
}
-func (w *bmpWatcher) watchingEventTypes() []watcherEventType {
- state := false
- pre := false
- post := false
- for _, server := range w.connMap {
- if server.conn != nil {
- state = true
- if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY {
- pre = true
- }
- if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY {
- post = true
- }
- }
- }
- types := make([]watcherEventType, 0, 3)
- if state {
- types = append(types, WATCHER_EVENT_STATE_CHANGE)
- }
- if pre {
- types = append(types, WATCHER_EVENT_UPDATE_MSG)
- }
- if post {
- types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG)
- }
- return types
+type bmpClientManager struct {
+ s *BgpServer
+ clientMap map[string]*bmpClient
}
-func newBmpWatcher(grpcCh chan *GrpcRequest) (*bmpWatcher, error) {
- w := &bmpWatcher{
- ch: make(chan watcherEvent),
- apiCh: grpcCh,
- newConnCh: make(chan *net.TCPConn),
- endCh: make(chan *net.TCPConn),
- connMap: make(map[string]*bmpServer),
- ctlCh: make(chan *bmpConfig),
+func newBmpClientManager(s *BgpServer) *bmpClientManager {
+ return &bmpClientManager{
+ s: s,
+ clientMap: make(map[string]*bmpClient),
}
- w.t.Go(w.loop)
- return w, nil
}
diff --git a/server/grpc_server.go b/server/grpc_server.go
index ebe8e972..27d0a6d1 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -93,9 +93,6 @@ const (
REQ_ADD_POLICY_ASSIGNMENT
REQ_DELETE_POLICY_ASSIGNMENT
REQ_REPLACE_POLICY_ASSIGNMENT
- REQ_BMP_NEIGHBORS
- REQ_BMP_GLOBAL
- REQ_BMP_ADJ_IN
REQ_DEFERRAL_TIMER_EXPIRED
REQ_RELOAD_POLICY
REQ_INITIALIZE_ZEBRA
@@ -348,9 +345,9 @@ func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer
return s.bgpServer.Watch(WatchBestPath()), nil
case api.Resource_ADJ_IN:
if arg.PostPolicy {
- return s.bgpServer.Watch(WatchPostUpdate()), nil
+ return s.bgpServer.Watch(WatchPostUpdate(false)), nil
}
- return s.bgpServer.Watch(WatchUpdate()), nil
+ return s.bgpServer.Watch(WatchUpdate(false)), nil
default:
return nil, fmt.Errorf("unsupported resource type: %v", arg.Type)
}
@@ -414,7 +411,7 @@ func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer
func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.GobgpApi_MonitorPeerStateServer) error {
return func() error {
- w := s.bgpServer.Watch(WatchPeerState())
+ w := s.bgpServer.Watch(WatchPeerState(false))
defer func() { w.Stop() }()
for {
@@ -657,19 +654,18 @@ func (s *Server) InjectMrt(stream api.GobgpApi_InjectMrtServer) error {
}
func (s *Server) AddBmp(ctx context.Context, arg *api.AddBmpRequest) (*api.AddBmpResponse, error) {
- d, err := s.get(REQ_ADD_BMP, arg)
- if err != nil {
- return nil, err
- }
- return d.(*api.AddBmpResponse), err
+ return &api.AddBmpResponse{}, s.bgpServer.AddBmp(&config.BmpServerConfig{
+ Address: arg.Address,
+ Port: arg.Port,
+ RouteMonitoringPolicy: config.BmpRouteMonitoringPolicyType(arg.Type),
+ })
}
func (s *Server) DeleteBmp(ctx context.Context, arg *api.DeleteBmpRequest) (*api.DeleteBmpResponse, error) {
- d, err := s.get(REQ_DELETE_BMP, arg)
- if err != nil {
- return nil, err
- }
- return d.(*api.DeleteBmpResponse), err
+ return &api.DeleteBmpResponse{}, s.bgpServer.DeleteBmp(&config.BmpServerConfig{
+ Address: arg.Address,
+ Port: arg.Port,
+ })
}
func (s *Server) ValidateRib(ctx context.Context, arg *api.ValidateRibRequest) (*api.ValidateRibResponse, error) {
diff --git a/server/server.go b/server/server.go
index 4cd71ad6..0aeead21 100644
--- a/server/server.go
+++ b/server/server.go
@@ -30,7 +30,6 @@ import (
api "github.com/osrg/gobgp/api"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet/bgp"
- "github.com/osrg/gobgp/packet/bmp"
"github.com/osrg/gobgp/table"
"github.com/satori/go.uuid"
)
@@ -107,11 +106,12 @@ type BgpServer struct {
watchers *watcherManager
watcherMap map[watchType][]*Watcher
zclient *zebraClient
+ bmpManager *bmpClientManager
}
func NewBgpServer() *BgpServer {
roaManager, _ := NewROAManager(0)
- return &BgpServer{
+ s := &BgpServer{
GrpcReqCh: make(chan *GrpcRequest, 1),
neighborMap: make(map[string]*Peer),
policy: table.NewRoutingPolicy(),
@@ -120,6 +120,8 @@ func NewBgpServer() *BgpServer {
mgmtCh: make(chan func(), 1),
watcherMap: make(map[watchType][]*Watcher),
}
+ s.bmpManager = newBmpClientManager(s)
+ return s
}
func (server *BgpServer) Listeners(addr string) []*net.TCPListener {
@@ -383,27 +385,31 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
}
}
+func createWatcherEventStateChange(peer *Peer) *watcherEventStateChangedMsg {
+ _, rport := peer.fsm.RemoteHostPort()
+ laddr, lport := peer.fsm.LocalHostPort()
+ sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
+ recvOpen := peer.fsm.recvOpen
+ return &watcherEventStateChangedMsg{
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(laddr),
+ peerPort: rport,
+ localPort: lport,
+ peerID: peer.fsm.peerInfo.ID,
+ sentOpen: sentOpen,
+ recvOpen: recvOpen,
+ state: peer.fsm.state,
+ adminState: peer.fsm.adminState,
+ timestamp: time.Now(),
+ }
+}
+
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
newState := peer.fsm.state
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
- _, rport := peer.fsm.RemoteHostPort()
- laddr, lport := peer.fsm.LocalHostPort()
- sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
- recvOpen := peer.fsm.recvOpen
- ev := &watcherEventStateChangedMsg{
- peerAS: peer.fsm.peerInfo.AS,
- localAS: peer.fsm.peerInfo.LocalAS,
- peerAddress: peer.fsm.peerInfo.Address,
- localAddress: net.ParseIP(laddr),
- peerPort: rport,
- localPort: lport,
- peerID: peer.fsm.peerInfo.ID,
- sentOpen: sentOpen,
- recvOpen: recvOpen,
- state: newState,
- adminState: peer.fsm.adminState,
- timestamp: time.Now(),
- }
+ ev := createWatcherEventStateChange(peer)
if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) {
server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev)
}
@@ -892,19 +898,28 @@ func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) error {
return nil
}
-func (server *BgpServer) SetBmpConfig(c []config.BmpServer) error {
- for _, s := range c {
- ch := make(chan *GrpcResponse)
- server.GrpcReqCh <- &GrpcRequest{
- RequestType: REQ_ADD_BMP,
- Data: &s.Config,
- ResponseCh: ch,
- }
- if err := (<-ch).Err(); err != nil {
- return err
- }
+func (s *BgpServer) AddBmp(c *config.BmpServerConfig) (err error) {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ s.mgmtCh <- func() {
+ defer close(ch)
+
+ err = s.bmpManager.addServer(c)
}
- return nil
+ return err
+}
+
+func (s *BgpServer) DeleteBmp(c *config.BmpServerConfig) (err error) {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ s.mgmtCh <- func() {
+ defer close(ch)
+
+ err = s.bmpManager.deleteServer(c)
+ }
+ return err
}
func (server *BgpServer) SetMrtConfig(c []config.Mrt) error {
@@ -1512,37 +1527,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
Data: dsts,
}
close(grpcReq.ResponseCh)
- case REQ_BMP_GLOBAL:
- paths := server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist())
- bmpmsgs := make([]*bmp.BMPMessage, 0, len(paths))
- for _, path := range paths {
- msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
- buf, _ := msgs[0].Serialize()
- bmpmsgs = append(bmpmsgs, bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, true, 0, path.GetSource(), path.GetTimestamp().Unix(), buf))
- }
- grpcReq.ResponseCh <- &GrpcResponse{
- Data: bmpmsgs,
- }
- close(grpcReq.ResponseCh)
- case REQ_BMP_NEIGHBORS:
- //TODO: merge REQ_NEIGHBORS and REQ_BMP_NEIGHBORS
- msgs := make([]*bmp.BMPMessage, 0, len(server.neighborMap))
- for _, peer := range server.neighborMap {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
- continue
- }
- laddr, lport := peer.fsm.LocalHostPort()
- _, rport := peer.fsm.RemoteHostPort()
- sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
- info := peer.fsm.peerInfo
- timestamp := peer.fsm.pConf.Timers.State.Uptime
- msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.fsm.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp)
- msgs = append(msgs, msg)
- }
- grpcReq.ResponseCh <- &GrpcResponse{
- Data: msgs,
- }
- close(grpcReq.ResponseCh)
case REQ_NEIGHBOR:
l := make([]*config.Neighbor, 0)
for _, peer := range server.neighborMap {
@@ -1613,22 +1597,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
Data: paths,
}
close(grpcReq.ResponseCh)
- case REQ_BMP_ADJ_IN:
- bmpmsgs := make([]*bmp.BMPMessage, 0)
- for _, peer := range server.neighborMap {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
- continue
- }
- for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) {
- msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
- buf, _ := msgs[0].Serialize()
- bmpmsgs = append(bmpmsgs, bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, path.GetTimestamp().Unix(), buf))
- }
- }
- grpcReq.ResponseCh <- &GrpcResponse{
- Data: bmpmsgs,
- }
- close(grpcReq.ResponseCh)
case REQ_NEIGHBOR_SHUTDOWN:
peers, err := reqToPeers(grpcReq)
if err != nil {
@@ -1806,10 +1774,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
server.handleEnableMrtRequest(grpcReq)
case REQ_DISABLE_MRT:
server.handleDisableMrtRequest(grpcReq)
- case REQ_ADD_BMP:
- server.handleAddBmp(grpcReq)
- case REQ_DELETE_BMP:
- server.handleDeleteBmp(grpcReq)
case REQ_VALIDATE_RIB:
server.handleValidateRib(grpcReq)
case REQ_INITIALIZE_RPKI:
@@ -2686,57 +2650,6 @@ func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) {
close(grpcReq.ResponseCh)
}
-func (server *BgpServer) handleAddBmp(grpcReq *GrpcRequest) {
- var c *config.BmpServerConfig
- switch arg := grpcReq.Data.(type) {
- case *api.AddBmpRequest:
- c = &config.BmpServerConfig{
- Address: arg.Address,
- Port: arg.Port,
- RouteMonitoringPolicy: config.BmpRouteMonitoringPolicyType(arg.Type),
- }
- case *config.BmpServerConfig:
- c = arg
- }
-
- w, y := server.watchers.watcher(WATCHER_BMP)
- if !y {
- w, _ = newBmpWatcher(server.GrpcReqCh)
- server.watchers.addWatcher(WATCHER_BMP, w)
- }
-
- err := w.(*bmpWatcher).addServer(*c)
- grpcReq.ResponseCh <- &GrpcResponse{
- ResponseErr: err,
- Data: &api.AddBmpResponse{},
- }
- close(grpcReq.ResponseCh)
-}
-
-func (server *BgpServer) handleDeleteBmp(grpcReq *GrpcRequest) {
- var c *config.BmpServerConfig
- switch arg := grpcReq.Data.(type) {
- case *api.DeleteBmpRequest:
- c = &config.BmpServerConfig{
- Address: arg.Address,
- Port: arg.Port,
- }
- case *config.BmpServerConfig:
- c = arg
- }
-
- if w, y := server.watchers.watcher(WATCHER_BMP); y {
- err := w.(*bmpWatcher).deleteServer(*c)
- grpcReq.ResponseCh <- &GrpcResponse{
- ResponseErr: err,
- Data: &api.DeleteBmpResponse{},
- }
- close(grpcReq.ResponseCh)
- } else {
- grpcDone(grpcReq, fmt.Errorf("bmp not configured"))
- }
-}
-
func (server *BgpServer) handleValidateRib(grpcReq *GrpcRequest) {
arg := grpcReq.Data.(*api.ValidateRibRequest)
for _, rf := range server.globalRib.GetRFlist() {
@@ -2796,10 +2709,13 @@ const (
)
type watchOptions struct {
- bestpath bool
- preUpdate bool
- postUpdate bool
- peerState bool
+ bestpath bool
+ preUpdate bool
+ postUpdate bool
+ peerState bool
+ initUpdate bool
+ initPostUpdate bool
+ initPeerState bool
}
type WatchOption func(*watchOptions)
@@ -2810,21 +2726,30 @@ func WatchBestPath() WatchOption {
}
}
-func WatchUpdate() WatchOption {
+func WatchUpdate(current bool) WatchOption {
return func(o *watchOptions) {
o.preUpdate = true
+ if current {
+ o.initUpdate = true
+ }
}
}
-func WatchPostUpdate() WatchOption {
+func WatchPostUpdate(current bool) WatchOption {
return func(o *watchOptions) {
o.postUpdate = true
+ if current {
+ o.initPostUpdate = true
+ }
}
}
-func WatchPeerState() WatchOption {
+func WatchPeerState(current bool) WatchOption {
return func(o *watchOptions) {
o.peerState = true
+ if current {
+ o.initPeerState = true
+ }
}
}
@@ -2859,10 +2784,8 @@ func (w *Watcher) loop() {
func (w *Watcher) Stop() {
ch := make(chan struct{})
defer func() { <-ch }()
-
w.s.mgmtCh <- func() {
defer close(ch)
-
for k, l := range w.s.watcherMap {
for i, v := range l {
if w == v {
@@ -2919,6 +2842,56 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
if w.opts.peerState {
register(WATCH_TYPE_PEER_STATE, w)
}
+ if w.opts.initPeerState {
+ for _, peer := range s.neighborMap {
+ if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ continue
+ }
+ w.notify(createWatcherEventStateChange(peer))
+ }
+ }
+ if w.opts.initUpdate {
+ for _, peer := range s.neighborMap {
+ if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ continue
+ }
+ for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) {
+ msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
+ buf, _ := msgs[0].Serialize()
+ _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
+ l, _ := peer.fsm.LocalHostPort()
+ w.notify(&watcherEventUpdateMsg{
+ message: msgs[0],
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(l),
+ peerID: peer.fsm.peerInfo.ID,
+ fourBytesAs: y,
+ timestamp: path.GetTimestamp(),
+ payload: buf,
+ postPolicy: false,
+ })
+ }
+ }
+ }
+ if w.opts.postUpdate {
+ for _, path := range s.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, s.globalRib.GetRFlist()) {
+ msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
+ buf, _ := msgs[0].Serialize()
+ w.notify(&watcherEventUpdateMsg{
+ peerAS: path.GetSource().AS,
+ peerAddress: path.GetSource().Address,
+ peerID: path.GetSource().ID,
+ message: msgs[0],
+ timestamp: path.GetTimestamp(),
+ payload: buf,
+ postPolicy: true,
+ })
+ }
+
+ }
+
go w.loop()
}
return w