diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2015-12-30 20:50:27 +0900 |
---|---|---|
committer | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2016-01-09 22:51:23 +0900 |
commit | aff1c244ad0d88a814f2ce573800717ccd08b450 (patch) | |
tree | 711b432961c52c76a70cd76a980d00ad956c0d5b /server/server.go | |
parent | f4c07da88154dd4b21012576a4ceb205715f4b3e (diff) |
bmp: use watcher infra to implement bmp feature
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r-- | server/server.go | 232 |
1 files changed, 129 insertions, 103 deletions
diff --git a/server/server.go b/server/server.go index 8de474a2..cd86da38 100644 --- a/server/server.go +++ b/server/server.go @@ -74,6 +74,19 @@ func (m *broadcastBGPMsg) send() { m.ch <- m } +type Watchers map[watcherType]watcher + +func (ws Watchers) watching(typ watcherEventType) bool { + for _, w := range ws { + for _, ev := range w.watchingEventTypes() { + if ev == typ { + return true + } + } + } + return false +} + type BgpServer struct { bgpConfig config.Bgp globalTypeCh chan config.Global @@ -83,7 +96,6 @@ type BgpServer struct { fsmincomingCh chan *FsmMsg fsmStateCh chan *FsmMsg rpkiConfigCh chan []config.RpkiServer - bmpConfigCh chan []config.BmpServer GrpcReqCh chan *GrpcRequest policyUpdateCh chan config.RoutingPolicy @@ -95,10 +107,8 @@ type BgpServer struct { globalRib *table.TableManager zclient *zebra.Client roaManager *roaManager - bmpClient *bmpClient - bmpConnCh chan *bmpConn shutdown bool - watchers map[watcherType]watcher + watchers Watchers } func NewBgpServer() *BgpServer { @@ -108,12 +118,10 @@ func NewBgpServer() *BgpServer { b.deletedPeerCh = make(chan config.Neighbor) b.updatedPeerCh = make(chan config.Neighbor) b.rpkiConfigCh = make(chan []config.RpkiServer) - b.bmpConfigCh = make(chan []config.BmpServer) - b.bmpConnCh = make(chan *bmpConn) b.GrpcReqCh = make(chan *GrpcRequest, 1) b.policyUpdateCh = make(chan config.RoutingPolicy) b.neighborMap = make(map[string]*Peer) - b.watchers = make(map[watcherType]watcher) + b.watchers = Watchers(make(map[watcherType]watcher)) b.roaManager, _ = newROAManager(0, nil) b.policy = table.NewRoutingPolicy() return &b @@ -143,6 +151,18 @@ func listenAndAccept(proto string, port uint32, ch chan *net.TCPConn) (*net.TCPL return l, nil } +func (server *BgpServer) notify2watchers(typ watcherEventType, ev watcherEvent) error { + for _, watcher := range server.watchers { + if ch := watcher.notify(typ); ch != nil { + server.broadcastMsgs = append(server.broadcastMsgs, &broadcastWatcherMsg{ + ch: ch, + event: ev, + }) + } + } + return nil +} + func (server *BgpServer) Serve() { var g config.Global for { @@ -166,7 +186,6 @@ func (server *BgpServer) Serve() { } } - server.bmpClient, _ = newBMPClient(nil, server.bmpConnCh) server.roaManager, _ = newROAManager(g.Config.As, nil) if g.Mrt.FileName != "" { @@ -178,6 +197,20 @@ func (server *BgpServer) Serve() { } } + if len(g.BmpServers) > 0 { + w, err := newBmpWatcher(server.GrpcReqCh) + if err != nil { + log.Warn(err) + } else { + for _, server := range g.BmpServers { + if err := w.addServer(server.Config); err != nil { + log.Warn(err) + } + } + server.watchers[WATCHER_BMP] = w + } + } + if g.Zebra.Enabled == true { if g.Zebra.Url == "" { g.Zebra.Url = "unix:/var/run/quagga/zserv.api" @@ -343,34 +376,6 @@ func (server *BgpServer) Serve() { select { case c := <-server.rpkiConfigCh: server.roaManager, _ = newROAManager(server.bgpConfig.Global.Config.As, c) - case c := <-server.bmpConfigCh: - server.bmpClient, _ = newBMPClient(c, server.bmpConnCh) - case c := <-server.bmpConnCh: - bmpMsgList := []*bgp.BMPMessage{} - for _, targetPeer := range server.neighborMap { - if targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { - continue - } - for _, p := range targetPeer.adjRibIn.PathList(targetPeer.configuredRFlist(), false) { - // avoid to merge for timestamp - u := table.CreateUpdateMsgFromPaths([]*table.Path{p}) - buf, _ := u[0].Serialize() - bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, targetPeer.fsm.peerInfo, p.GetTimestamp().Unix(), buf)) - } - } - - for _, p := range server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist()) { - u := table.CreateUpdateMsgFromPaths([]*table.Path{p}) - buf, _ := u[0].Serialize() - bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, p.GetSource(), p.GetTimestamp().Unix(), buf)) - } - - m := &broadcastBMPMsg{ - ch: server.bmpClient.send(), - conn: c.conn, - msgList: bmpMsgList, - } - server.broadcastMsgs = append(server.broadcastMsgs, m) case rmsg := <-server.roaManager.recieveROA(): server.roaManager.handleROAEvent(rmsg) case zmsg := <-zapiMsgCh: @@ -408,7 +413,7 @@ func (server *BgpServer) Serve() { } server.neighborMap[addr] = peer peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) - server.broadcastPeerState(peer) + server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE) case config := <-server.deletedPeerCh: addr := config.Config.NeighborAddress SetTcpMD5SigSockopts(listener(addr), addr, "") @@ -682,7 +687,7 @@ func (server *BgpServer) broadcastBests(bests []*table.Path) { } } -func (server *BgpServer) broadcastPeerState(peer *Peer) { +func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { result := &GrpcResponse{ Data: peer.ToApiStruct(), } @@ -707,6 +712,29 @@ func (server *BgpServer) broadcastPeerState(peer *Peer) { remainReqs = append(remainReqs, req) } server.broadcastReqs = remainReqs + newState := peer.fsm.state + if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { + if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) { + _, rport := peer.fsm.RemoteHostPort() + laddr, lport := peer.fsm.LocalHostPort() + sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) + recvOpen := peer.fsm.recvOpen + ev := &watcherEventStateChangedMsg{ + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(laddr), + peerPort: rport, + localPort: lport, + peerID: peer.fsm.peerInfo.ID, + sentOpen: sentOpen, + recvOpen: recvOpen, + state: newState, + timestamp: time.Now(), + } + server.notify2watchers(WATCHER_EVENT_STATE_CHANGE, ev) + } + } } func (server *BgpServer) RSimportPaths(peer *Peer, pathList []*table.Path) []*table.Path { @@ -817,13 +845,6 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { peer.fsm.StateChange(nextState) if oldState == bgp.BGP_FSM_ESTABLISHED { - if ch := server.bmpClient.send(); ch != nil { - m := &broadcastBMPMsg{ - ch: ch, - msgList: []*bgp.BMPMessage{bmpPeerDown(bgp.BMP_PEER_DOWN_REASON_UNKNOWN, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, peer.conf.Timers.State.Downtime)}, - } - server.broadcastMsgs = append(server.broadcastMsgs, m) - } t := time.Now() if t.Sub(time.Unix(peer.conf.Timers.State.Uptime, 0)) < FLOP_THRESHOLD { peer.conf.State.Flops++ @@ -838,16 +859,8 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { peer.outgoing = make(chan *bgp.BGPMessage, 128) if nextState == bgp.BGP_FSM_ESTABLISHED { // update for export policy - laddr, lport := peer.fsm.LocalHostPort() + laddr, _ := peer.fsm.LocalHostPort() peer.conf.Transport.Config.LocalAddress = laddr - if ch := server.bmpClient.send(); ch != nil { - _, rport := peer.fsm.RemoteHostPort() - m := &broadcastBMPMsg{ - ch: ch, - msgList: []*bgp.BMPMessage{bmpPeerUp(laddr, lport, rport, buildopen(peer.fsm.gConf, peer.fsm.pConf), peer.fsm.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, peer.conf.Timers.State.Uptime)}, - } - server.broadcastMsgs = append(server.broadcastMsgs, m) - } pathList, _ := peer.getBestFromLocal(peer.configuredRFlist()) if len(pathList) > 0 { peer.adjRibOut.Update(pathList) @@ -874,55 +887,36 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { peer.conf.Timers.State = config.TimersState{} } peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) - server.broadcastPeerState(peer) + server.broadcastPeerState(peer, oldState) case FSM_MSG_BGP_MESSAGE: switch m := e.MsgData.(type) { case *bgp.MessageError: msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)})) case *bgp.BGPMessage: - if m.Header.Type == bgp.BGP_MSG_UPDATE { - listener := make(map[watcher]chan watcherEvent) - for _, watcher := range server.watchers { - if ch := watcher.notify(WATCHER_EVENT_UPDATE_MSG); ch != nil { - listener[watcher] = ch - } - } - if len(listener) > 0 { - _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] - l, _ := peer.fsm.LocalHostPort() - ev := &watcherEventUpdateMsg{ - message: m, - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(l), - fourBytesAs: y, - timestamp: e.timestamp, - payload: e.payload, - } - for _, ch := range listener { - bm := &broadcastWatcherMsg{ - ch: ch, - event: ev, - } - server.broadcastMsgs = append(server.broadcastMsgs, bm) - } - } - - if ch := server.bmpClient.send(); ch != nil { - bm := &broadcastBMPMsg{ - ch: ch, - msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, e.timestamp.Unix(), e.payload)}, - } - server.broadcastMsgs = append(server.broadcastMsgs, bm) + if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) { + _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] + l, _ := peer.fsm.LocalHostPort() + ev := &watcherEventUpdateMsg{ + message: m, + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(l), + peerID: peer.fsm.peerInfo.ID, + fourBytesAs: y, + timestamp: e.timestamp, + payload: e.payload, + postPolicy: false, } + server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev) } pathList, msgList := peer.handleBGPmessage(e) if len(msgList) > 0 { msgs = append(msgs, newSenderMsg(peer, msgList)) } + if len(pathList) > 0 { isMonitor := func() bool { if len(server.broadcastReqs) > 0 { @@ -936,15 +930,23 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { } m, altered := server.propagateUpdate(peer, pathList) msgs = append(msgs, m...) - - if ch := server.bmpClient.send(); ch != nil { + if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) { + _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] + l, _ := peer.fsm.LocalHostPort() + ev := &watcherEventUpdateMsg{ + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(l), + peerID: peer.fsm.peerInfo.ID, + fourBytesAs: y, + timestamp: e.timestamp, + postPolicy: true, + } for _, u := range table.CreateUpdateMsgFromPaths(altered) { payload, _ := u.Serialize() - bm := &broadcastBMPMsg{ - ch: ch, - msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, peer.fsm.peerInfo, e.timestamp.Unix(), payload)}, - } - server.broadcastMsgs = append(server.broadcastMsgs, bm) + ev.payload = payload + server.notify2watchers(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev) } } } @@ -969,10 +971,6 @@ func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) { server.rpkiConfigCh <- c } -func (server *BgpServer) SetBmpConfig(c []config.BmpServer) { - server.bmpConfigCh <- c -} - func (server *BgpServer) PeerAdd(peer config.Neighbor) { server.addedPeerCh <- peer } @@ -1635,6 +1633,18 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { Data: d, } close(grpcReq.ResponseCh) + case REQ_BMP_GLOBAL: + paths := server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist()) + bmpmsgs := make([]*bgp.BMPMessage, 0, len(paths)) + for _, path := range paths { + msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) + buf, _ := msgs[0].Serialize() + bmpmsgs = append(bmpmsgs, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, path.GetSource(), path.GetTimestamp().Unix(), buf)) + } + grpcReq.ResponseCh <- &GrpcResponse{ + Data: bmpmsgs, + } + close(grpcReq.ResponseCh) case REQ_MOD_PATH: pathList := server.handleModPathRequest(grpcReq) if len(pathList) > 0 { @@ -1670,7 +1680,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) info := peer.fsm.peerInfo timestamp := peer.conf.Timers.State.Uptime - msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp) + msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.fsm.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp) msgs = append(msgs, msg) } grpcReq.ResponseCh <- &GrpcResponse{ @@ -1750,6 +1760,22 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { Data: d, } close(grpcReq.ResponseCh) + case REQ_BMP_ADJ_IN: + bmpmsgs := make([]*bgp.BMPMessage, 0) + for _, peer := range server.neighborMap { + if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + continue + } + for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) { + msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) + buf, _ := msgs[0].Serialize() + bmpmsgs = append(bmpmsgs, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, path.GetTimestamp().Unix(), buf)) + } + } + grpcReq.ResponseCh <- &GrpcResponse{ + Data: bmpmsgs, + } + close(grpcReq.ResponseCh) case REQ_NEIGHBOR_SHUTDOWN: peers, err := reqToPeers(grpcReq) if err != nil { @@ -2129,7 +2155,7 @@ func (server *BgpServer) handleGrpcModNeighbor(grpcReq *GrpcRequest) (sMsgs []*S } server.neighborMap[addr] = peer peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) - server.broadcastPeerState(peer) + server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE) case api.Operation_DEL: SetTcpMD5SigSockopts(listener(net.ParseIP(addr)), addr, "") log.Info("Delete a peer configuration for ", addr) |