diff options
Diffstat (limited to 'server/server.go')
-rw-r--r-- | server/server.go | 86 |
1 files changed, 74 insertions, 12 deletions
diff --git a/server/server.go b/server/server.go index 1699a5cb..9861ee7b 100644 --- a/server/server.go +++ b/server/server.go @@ -90,6 +90,7 @@ type BgpServer struct { deletedPeerCh chan config.Neighbor updatedPeerCh chan config.Neighbor rpkiConfigCh chan config.RpkiServers + bmpConfigCh chan config.BmpServers dumper *dumper GrpcReqCh chan *GrpcRequest @@ -103,6 +104,8 @@ type BgpServer struct { localRibMap map[string]*LocalRib zclient *zebra.Client roaClient *roaClient + bmpClient *bmpClient + bmpConnCh chan *bmpConn } func NewBgpServer(port int) *BgpServer { @@ -112,6 +115,8 @@ func NewBgpServer(port int) *BgpServer { b.deletedPeerCh = make(chan config.Neighbor) b.updatedPeerCh = make(chan config.Neighbor) b.rpkiConfigCh = make(chan config.RpkiServers) + b.bmpConfigCh = make(chan config.BmpServers) + b.bmpConnCh = make(chan *bmpConn) b.GrpcReqCh = make(chan *GrpcRequest, 1) b.policyUpdateCh = make(chan config.RoutingPolicy) b.localRibMap = make(map[string]*LocalRib) @@ -252,6 +257,32 @@ func (server *BgpServer) Serve() { select { case c := <-server.rpkiConfigCh: server.roaClient, _ = newROAClient(c) + case c := <-server.bmpConfigCh: + server.bmpClient, _ = newBMPClient(c, server.bmpConnCh) + case c := <-server.bmpConnCh: + bmpMsgList := []*bgp.BMPMessage{} + for _, targetPeer := range server.neighborMap { + pathList := make([]*table.Path, 0) + if targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + continue + } + for _, rf := range targetPeer.configuredRFlist() { + pathList = append(pathList, targetPeer.adjRib.GetInPathList(rf)...) + } + for _, p := range pathList { + // avoid to merge for timestamp + u := table.CreateUpdateMsgFromPaths([]*table.Path{p}) + bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, targetPeer.peerInfo, p.GetTimestamp().Unix(), u[0])) + } + } + + m := &broadcastBMPMsg{ + ch: server.bmpClient.send(), + conn: c.conn, + addr: c.addr, + msgList: bmpMsgList, + } + server.broadcastMsgs = append(server.broadcastMsgs, m) case rmsg := <-server.roaClient.recieveROA(): server.roaClient.handleRTRMsg(rmsg) case zmsg := <-zapiMsgCh: @@ -733,6 +764,13 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan * globalRib := server.localRibMap[GLOBAL_RIB_NAME] if oldState == bgp.BGP_FSM_ESTABLISHED { + if ch := server.bmpClient.send(); ch != nil { + m := &broadcastBMPMsg{ + ch: ch, + msgList: []*bgp.BMPMessage{bmpPeerDown(bgp.BMP_PEER_DOWN_REASON_UNKNOWN, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.peerInfo, peer.conf.Timers.TimersState.Downtime)}, + } + server.broadcastMsgs = append(server.broadcastMsgs, m) + } t := time.Now() if t.Sub(time.Unix(peer.conf.Timers.TimersState.Uptime, 0)) < FLOP_THRESHOLD { peer.conf.NeighborState.Flops++ @@ -748,12 +786,22 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan * close(peer.outgoing) peer.outgoing = make(chan *bgp.BGPMessage, 128) if nextState == bgp.BGP_FSM_ESTABLISHED { + if ch := server.bmpClient.send(); ch != nil { + laddr, lport := peer.fsm.LocalHostPort() + _, rport := peer.fsm.RemoteHostPort() + m := &broadcastBMPMsg{ + ch: ch, + msgList: []*bgp.BMPMessage{bmpPeerUp(laddr, lport, rport, buildopen(peer.fsm.gConf, peer.fsm.pConf), peer.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.peerInfo, peer.conf.Timers.TimersState.Uptime)}, + } + server.broadcastMsgs = append(server.broadcastMsgs, m) + } pathList := make([]*table.Path, 0) if peer.isRouteServerClient() { loc := server.localRibMap[peer.conf.NeighborConfig.NeighborAddress.String()] pathList = applyPolicies(peer, loc, POLICY_DIRECTION_EXPORT, peer.getBests(loc)) } else { - peer.conf.Transport.TransportConfig.LocalAddress = peer.fsm.LocalAddr() + l, _ := peer.fsm.LocalHostPort() + peer.conf.Transport.TransportConfig.LocalAddress = net.ParseIP(l) for _, path := range filterpath(peer, peer.getBests(globalRib)) { p := path.Clone(path.IsWithdraw) p.UpdatePathAttrs(&server.bgpConfig.Global, &peer.conf) @@ -796,18 +844,28 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan * server.roaClient.validate(pathList) } } - if m.Header.Type == bgp.BGP_MSG_UPDATE && server.dumper != nil { - _, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] - bm := &broadcastBGPMsg{ - message: m, - peerAS: peer.peerInfo.AS, - localAS: peer.peerInfo.LocalAS, - peerAddress: peer.peerInfo.Address, - localAddress: peer.fsm.LocalAddr(), - fourBytesAs: y, - ch: server.dumper.sendCh(), + if m.Header.Type == bgp.BGP_MSG_UPDATE { + if server.dumper != nil { + _, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] + l, _ := peer.fsm.LocalHostPort() + bm := &broadcastBGPMsg{ + message: m, + peerAS: peer.peerInfo.AS, + localAS: peer.peerInfo.LocalAS, + peerAddress: peer.peerInfo.Address, + localAddress: net.ParseIP(l), + fourBytesAs: y, + ch: server.dumper.sendCh(), + } + server.broadcastMsgs = append(server.broadcastMsgs, bm) + } + if ch := server.bmpClient.send(); ch != nil { + bm := &broadcastBMPMsg{ + ch: ch, + msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.peerInfo, time.Now().Unix(), m)}, + } + server.broadcastMsgs = append(server.broadcastMsgs, bm) } - server.broadcastMsgs = append(server.broadcastMsgs, bm) } msgs = append(msgs, server.propagateUpdate(peer, pathList)...) default: @@ -829,6 +887,10 @@ func (server *BgpServer) SetRpkiConfig(c config.RpkiServers) { server.rpkiConfigCh <- c } +func (server *BgpServer) SetBmpConfig(c config.BmpServers) { + server.bmpConfigCh <- c +} + func (server *BgpServer) PeerAdd(peer config.Neighbor) { server.addedPeerCh <- peer } |