diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2015-12-30 20:50:27 +0900 |
---|---|---|
committer | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2016-01-09 22:51:23 +0900 |
commit | aff1c244ad0d88a814f2ce573800717ccd08b450 (patch) | |
tree | 711b432961c52c76a70cd76a980d00ad956c0d5b | |
parent | f4c07da88154dd4b21012576a4ceb205715f4b3e (diff) |
bmp: use watcher infra to implement bmp feature
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
-rw-r--r-- | config/bgp_configs.go | 56 | ||||
-rw-r--r-- | config/default.go | 4 | ||||
-rw-r--r-- | docs/sources/bmp.md | 4 | ||||
-rw-r--r-- | gobgpd/main.go | 1 | ||||
-rw-r--r-- | server/bmp.go | 314 | ||||
-rw-r--r-- | server/grpc_server.go | 2 | ||||
-rw-r--r-- | server/server.go | 232 | ||||
-rw-r--r-- | server/watcher.go | 22 | ||||
-rw-r--r-- | tools/pyang_plugins/gobgp.yang | 2 |
9 files changed, 405 insertions, 232 deletions
diff --git a/config/bgp_configs.go b/config/bgp_configs.go index d5f6975d..24fc2535 100644 --- a/config/bgp_configs.go +++ b/config/bgp_configs.go @@ -443,32 +443,6 @@ func (v RpkiValidationResultType) Validate() error { return nil } -//struct for container gobgp:state -type BmpServerState struct { -} - -//struct for container gobgp:config -type BmpServerConfig struct { - // original -> gobgp:address - //gobgp:address's original type is inet:ip-address - Address string `mapstructure:"address"` - // original -> gobgp:port - Port uint32 `mapstructure:"port"` - // original -> gobgp:route-monitoring-policy - RouteMonitoringPolicy BmpRouteMonitoringPolicyType `mapstructure:"route-monitoring-policy"` -} - -//struct for container gobgp:bmp-server -type BmpServer struct { - // original -> gobgp:address - //gobgp:address's original type is inet:ip-address - Address string `mapstructure:"address"` - // original -> gobgp:bmp-server-config - Config BmpServerConfig `mapstructure:"config"` - // original -> gobgp:bmp-server-state - State BmpServerState `mapstructure:"state"` -} - //struct for container gobgp:rpki-received type RpkiReceived struct { // original -> gobgp:serial-notify @@ -1119,6 +1093,32 @@ type Mrt struct { FileName string `mapstructure:"file-name"` } +//struct for container gobgp:state +type BmpServerState struct { +} + +//struct for container gobgp:config +type BmpServerConfig struct { + // original -> gobgp:address + //gobgp:address's original type is inet:ip-address + Address string `mapstructure:"address"` + // original -> gobgp:port + Port uint32 `mapstructure:"port"` + // original -> gobgp:route-monitoring-policy + RouteMonitoringPolicy BmpRouteMonitoringPolicyType `mapstructure:"route-monitoring-policy"` +} + +//struct for container gobgp:bmp-server +type BmpServer struct { + // original -> gobgp:address + //gobgp:address's original type is inet:ip-address + Address string `mapstructure:"address"` + // original -> gobgp:bmp-server-config + Config BmpServerConfig `mapstructure:"config"` + // original -> gobgp:bmp-server-state + State BmpServerState `mapstructure:"state"` +} + //struct for container bgp-mp:l2vpn-evpn type L2vpnEvpn struct { // original -> bgp-mp:prefix-limit @@ -1649,6 +1649,8 @@ type Global struct { AfiSafis []AfiSafi `mapstructure:"afi-safis"` // original -> rpol:apply-policy ApplyPolicy ApplyPolicy `mapstructure:"apply-policy"` + // original -> gobgp:bmp-servers + BmpServers []BmpServer `mapstructure:"bmp-servers"` // original -> gobgp:mrt Mrt Mrt `mapstructure:"mrt"` // original -> gobgp:zebra @@ -1669,8 +1671,6 @@ type Bgp struct { PeerGroups []PeerGroup `mapstructure:"peer-groups"` // original -> gobgp:rpki-servers RpkiServers []RpkiServer `mapstructure:"rpki-servers"` - // original -> gobgp:bmp-servers - BmpServers []BmpServer `mapstructure:"bmp-servers"` } //struct for container bgp-pol:set-ext-community-method diff --git a/config/default.go b/config/default.go index 82a17446..c0764c44 100644 --- a/config/default.go +++ b/config/default.go @@ -39,11 +39,11 @@ func SetDefaultConfigValues(v *viper.Viper, b *Bgp) error { b.Global.ListenConfig.Port = bgp.BGP_PORT } - for idx, server := range b.BmpServers { + for idx, server := range b.Global.BmpServers { if server.Config.Port == 0 { server.Config.Port = bgp.BMP_DEFAULT_PORT } - b.BmpServers[idx] = server + b.Global.BmpServers[idx] = server } if !v.IsSet("global.mpls-label-range.min-label") { diff --git a/docs/sources/bmp.md b/docs/sources/bmp.md index b8333cd4..f798ed26 100644 --- a/docs/sources/bmp.md +++ b/docs/sources/bmp.md @@ -12,14 +12,14 @@ Assume you finished [Getting Started](https://github.com/osrg/gobgp/blob/master/ ## <a name="config"> Configuration -Add `[bmp-servers]` section to enable BMP like below. +Add `[bmp-servers]` section under `[global]` to enable BMP like below. ```toml [global.config] as = 64512 router-id = "192.168.255.1" -[[bmp-servers]] +[[global.bmp-servers]] [bmp-servers.config] address = "127.0.0.1" port=11019 diff --git a/gobgpd/main.go b/gobgpd/main.go index e32af2e8..ca950d29 100644 --- a/gobgpd/main.go +++ b/gobgpd/main.go @@ -186,7 +186,6 @@ func main() { bgpServer.SetGlobalType(newConfig.Bgp.Global) bgpConfig = &newConfig.Bgp bgpServer.SetRpkiConfig(newConfig.Bgp.RpkiServers) - bgpServer.SetBmpConfig(newConfig.Bgp.BmpServers) added = newConfig.Bgp.Neighbors deleted = []config.Neighbor{} updated = []config.Neighbor{} diff --git a/server/bmp.go b/server/bmp.go index 3d8b50f6..90928acd 100644 --- a/server/bmp.go +++ b/server/bmp.go @@ -16,10 +16,12 @@ package server import ( + "fmt" log "github.com/Sirupsen/logrus" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" + "gopkg.in/tomb.v2" "net" "strconv" "time" @@ -38,126 +40,198 @@ func (m *broadcastBMPMsg) send() { type bmpServer struct { conn *net.TCPConn host string + typ config.BmpRouteMonitoringPolicyType } -type bmpClient struct { - ch chan *broadcastBMPMsg - connCh chan *bmpConn +type bmpConfig struct { + config config.BmpServerConfig + del bool + errCh chan error } -func newBMPClient(servers []config.BmpServer, connCh chan *bmpConn) (*bmpClient, error) { - b := &bmpClient{} - if len(servers) == 0 { - return b, nil +type bmpWatcher struct { + t tomb.Tomb + ch chan watcherEvent + apiCh chan *GrpcRequest + newServerCh chan *bmpServer + endCh chan *net.TCPConn + connMap map[string]*bmpServer + ctlCh chan *bmpConfig +} + +func (w *bmpWatcher) notify(t watcherEventType) chan watcherEvent { + if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE { + return w.ch } + return nil +} - b.ch = make(chan *broadcastBMPMsg) - b.connCh = connCh +func (w *bmpWatcher) stop() { + w.t.Kill(nil) +} - endCh := make(chan *net.TCPConn) +func (w *bmpWatcher) tryConnect(server *bmpServer) { + interval := 1 + host := server.host + for { + log.Debug("connecting bmp server: ", host) + conn, err := net.Dial("tcp", host) + if err != nil { + time.Sleep(time.Duration(interval) * time.Second) + if interval < 30 { + interval *= 2 + } + } else { + log.Info("bmp server is connected, ", host) + server.conn = conn.(*net.TCPConn) + go func() { + buf := make([]byte, 1) + for { + _, err := conn.Read(buf) + if err != nil { + w.endCh <- conn.(*net.TCPConn) + return + } + } + }() + w.newServerCh <- server + break + } + } +} - tryConnect := func(host string) { - interval := 1 - for { - log.Debug("connecting bmp server: ", host) - conn, err := net.Dial("tcp", host) - if err != nil { - time.Sleep(time.Duration(interval) * time.Second) - if interval < 30 { - interval *= 2 +func (w *bmpWatcher) loop() error { + for { + select { + case <-w.t.Dying(): + for _, server := range w.connMap { + if server.conn != nil { + server.conn.Close() } + } + return nil + case m := <-w.ctlCh: + c := m.config + if m.del { + host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))) + if _, y := w.connMap[host]; !y { + m.errCh <- fmt.Errorf("bmp server %s doesn't exists", host) + continue + } + conn := w.connMap[host].conn + delete(w.connMap, host) + conn.Close() } else { - log.Info("bmp server is connected, ", host) - go func() { - buf := make([]byte, 1) - for { - _, err := conn.Read(buf) + host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))) + if _, y := w.connMap[host]; y { + m.errCh <- fmt.Errorf("bmp server %s already exists", host) + continue + } + server := &bmpServer{ + host: host, + typ: c.RouteMonitoringPolicy, + } + w.connMap[host] = server + go w.tryConnect(server) + } + m.errCh <- nil + close(m.errCh) + case server := <-w.newServerCh: + i := bgp.NewBMPInitiation([]bgp.BMPTLV{}) + buf, _ := i.Serialize() + _, err := server.conn.Write(buf) + if err != nil { + log.Warnf("failed to write to bmp server %s", server.host) + } + req := &GrpcRequest{ + RequestType: REQ_BMP_NEIGHBORS, + ResponseCh: make(chan *GrpcResponse, 1), + } + w.apiCh <- req + write := func(req *GrpcRequest) { + for res := range req.ResponseCh { + for _, msg := range res.Data.([]*bgp.BMPMessage) { + buf, _ = msg.Serialize() + _, err := server.conn.Write(buf) if err != nil { - endCh <- conn.(*net.TCPConn) - return + log.Warnf("failed to write to bmp server %s", server.host) } } - }() - connCh <- &bmpConn{ - conn: conn.(*net.TCPConn), - host: host, } - break } - } - } - - for _, c := range servers { - b := c.Config - go tryConnect(net.JoinHostPort(b.Address, strconv.Itoa(int(b.Port)))) - } - - go func() { - connMap := make(map[string]*net.TCPConn) - for { - select { - case m := <-b.ch: - if m.conn != nil { - i := bgp.NewBMPInitiation([]bgp.BMPTLV{}) - buf, _ := i.Serialize() - _, err := m.conn.Write(buf) - if err == nil { - connMap[m.conn.RemoteAddr().String()] = m.conn - } + write(req) + if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY { + req = &GrpcRequest{ + RequestType: REQ_BMP_ADJ_IN, + ResponseCh: make(chan *GrpcResponse, 1), } - - for host, conn := range connMap { - if m.conn != nil && m.conn != conn { - continue - } - - for _, msg := range m.msgList { - if msg.Header.Type == bgp.BMP_MSG_ROUTE_MONITORING { - c := func() *config.BmpServerConfig { - for _, c := range servers { - b := &c.Config - if host == net.JoinHostPort(b.Address, strconv.Itoa(int(b.Port))) { - return b - } - } - return nil - }() - if c == nil { - log.Fatal(host) - } - ph := msg.PeerHeader - switch c.RouteMonitoringPolicy { - case config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY: - if ph.IsPostPolicy != false { - continue - } - case config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY: - if ph.IsPostPolicy != true { - continue - } + w.apiCh <- req + write(req) + } + if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY { + req = &GrpcRequest{ + RequestType: REQ_BMP_GLOBAL, + ResponseCh: make(chan *GrpcResponse, 1), + } + w.apiCh <- req + write(req) + } + case ev := <-w.ch: + switch msg := ev.(type) { + case *watcherEventUpdateMsg: + info := &table.PeerInfo{ + Address: msg.peerAddress, + AS: msg.peerAS, + ID: msg.peerID, + } + buf, _ := bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload).Serialize() + for _, server := range w.connMap { + if server.conn != nil { + send := server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY && !msg.postPolicy + send = send || (server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY && msg.postPolicy) + if send { + _, err := server.conn.Write(buf) + if err != nil { + log.Warnf("failed to write to bmp server %s", server.host) } - } - b, _ := msg.Serialize() - if _, err := conn.Write(b); err != nil { - break + } + } + case *watcherEventStateChangedMsg: + var bmpmsg *bgp.BMPMessage + info := &table.PeerInfo{ + Address: msg.peerAddress, + AS: msg.peerAS, + ID: msg.peerID, + } + if msg.state == bgp.BGP_FSM_ESTABLISHED { + bmpmsg = bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix()) + } else { + bmpmsg = bmpPeerDown(bgp.BMP_PEER_DOWN_REASON_UNKNOWN, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix()) + } + buf, _ := bmpmsg.Serialize() + for _, server := range w.connMap { + if server.conn != nil { + _, err := server.conn.Write(buf) + if err != nil { + log.Warnf("failed to write to bmp server %s", server.host) } } } - case conn := <-endCh: - host := conn.RemoteAddr().String() - log.Debugf("bmp connection to %s killed", host) - delete(connMap, host) - go tryConnect(host) + default: + log.Warnf("unknown watcher event") } + case conn := <-w.endCh: + host := conn.RemoteAddr().String() + log.Debugf("bmp connection to %s killed", host) + w.connMap[host].conn = nil + go w.tryConnect(w.connMap[host]) } - }() - - return b, nil + } } -func (c *bmpClient) send() chan *broadcastBMPMsg { - return c.ch +func (w *bmpWatcher) restart(string) error { + return nil } func bmpPeerUp(laddr string, lport, rport uint16, sent, recv *bgp.BGPMessage, t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64) *bgp.BMPMessage { @@ -177,3 +251,53 @@ func bmpPeerRoute(t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timest body.BGPUpdatePayload = payload return m } + +func (w *bmpWatcher) addServer(c config.BmpServerConfig) error { + ch := make(chan error) + w.ctlCh <- &bmpConfig{ + config: c, + errCh: ch, + } + return <-ch +} + +func (w *bmpWatcher) watchingEventTypes() []watcherEventType { + state := false + pre := false + post := false + for _, server := range w.connMap { + if server.conn != nil { + state = true + if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY { + pre = true + } + if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY { + post = true + } + } + } + types := make([]watcherEventType, 0, 3) + if state { + types = append(types, WATCHER_EVENT_STATE_CHANGE) + } + if pre { + types = append(types, WATCHER_EVENT_UPDATE_MSG) + } + if post { + types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG) + } + return types +} + +func newBmpWatcher(grpcCh chan *GrpcRequest) (*bmpWatcher, error) { + w := &bmpWatcher{ + ch: make(chan watcherEvent), + apiCh: grpcCh, + newServerCh: make(chan *bmpServer), + endCh: make(chan *net.TCPConn), + connMap: make(map[string]*bmpServer), + ctlCh: make(chan *bmpConfig), + } + w.t.Go(w.loop) + return w, nil +} diff --git a/server/grpc_server.go b/server/grpc_server.go index 63d6e1ad..554c62a9 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -67,6 +67,8 @@ const ( REQ_POLICY_ASSIGNMENT REQ_MOD_POLICY_ASSIGNMENT REQ_BMP_NEIGHBORS + REQ_BMP_GLOBAL + REQ_BMP_ADJ_IN ) type Server struct { diff --git a/server/server.go b/server/server.go index 8de474a2..cd86da38 100644 --- a/server/server.go +++ b/server/server.go @@ -74,6 +74,19 @@ func (m *broadcastBGPMsg) send() { m.ch <- m } +type Watchers map[watcherType]watcher + +func (ws Watchers) watching(typ watcherEventType) bool { + for _, w := range ws { + for _, ev := range w.watchingEventTypes() { + if ev == typ { + return true + } + } + } + return false +} + type BgpServer struct { bgpConfig config.Bgp globalTypeCh chan config.Global @@ -83,7 +96,6 @@ type BgpServer struct { fsmincomingCh chan *FsmMsg fsmStateCh chan *FsmMsg rpkiConfigCh chan []config.RpkiServer - bmpConfigCh chan []config.BmpServer GrpcReqCh chan *GrpcRequest policyUpdateCh chan config.RoutingPolicy @@ -95,10 +107,8 @@ type BgpServer struct { globalRib *table.TableManager zclient *zebra.Client roaManager *roaManager - bmpClient *bmpClient - bmpConnCh chan *bmpConn shutdown bool - watchers map[watcherType]watcher + watchers Watchers } func NewBgpServer() *BgpServer { @@ -108,12 +118,10 @@ func NewBgpServer() *BgpServer { b.deletedPeerCh = make(chan config.Neighbor) b.updatedPeerCh = make(chan config.Neighbor) b.rpkiConfigCh = make(chan []config.RpkiServer) - b.bmpConfigCh = make(chan []config.BmpServer) - b.bmpConnCh = make(chan *bmpConn) b.GrpcReqCh = make(chan *GrpcRequest, 1) b.policyUpdateCh = make(chan config.RoutingPolicy) b.neighborMap = make(map[string]*Peer) - b.watchers = make(map[watcherType]watcher) + b.watchers = Watchers(make(map[watcherType]watcher)) b.roaManager, _ = newROAManager(0, nil) b.policy = table.NewRoutingPolicy() return &b @@ -143,6 +151,18 @@ func listenAndAccept(proto string, port uint32, ch chan *net.TCPConn) (*net.TCPL return l, nil } +func (server *BgpServer) notify2watchers(typ watcherEventType, ev watcherEvent) error { + for _, watcher := range server.watchers { + if ch := watcher.notify(typ); ch != nil { + server.broadcastMsgs = append(server.broadcastMsgs, &broadcastWatcherMsg{ + ch: ch, + event: ev, + }) + } + } + return nil +} + func (server *BgpServer) Serve() { var g config.Global for { @@ -166,7 +186,6 @@ func (server *BgpServer) Serve() { } } - server.bmpClient, _ = newBMPClient(nil, server.bmpConnCh) server.roaManager, _ = newROAManager(g.Config.As, nil) if g.Mrt.FileName != "" { @@ -178,6 +197,20 @@ func (server *BgpServer) Serve() { } } + if len(g.BmpServers) > 0 { + w, err := newBmpWatcher(server.GrpcReqCh) + if err != nil { + log.Warn(err) + } else { + for _, server := range g.BmpServers { + if err := w.addServer(server.Config); err != nil { + log.Warn(err) + } + } + server.watchers[WATCHER_BMP] = w + } + } + if g.Zebra.Enabled == true { if g.Zebra.Url == "" { g.Zebra.Url = "unix:/var/run/quagga/zserv.api" @@ -343,34 +376,6 @@ func (server *BgpServer) Serve() { select { case c := <-server.rpkiConfigCh: server.roaManager, _ = newROAManager(server.bgpConfig.Global.Config.As, c) - case c := <-server.bmpConfigCh: - server.bmpClient, _ = newBMPClient(c, server.bmpConnCh) - case c := <-server.bmpConnCh: - bmpMsgList := []*bgp.BMPMessage{} - for _, targetPeer := range server.neighborMap { - if targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { - continue - } - for _, p := range targetPeer.adjRibIn.PathList(targetPeer.configuredRFlist(), false) { - // avoid to merge for timestamp - u := table.CreateUpdateMsgFromPaths([]*table.Path{p}) - buf, _ := u[0].Serialize() - bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, targetPeer.fsm.peerInfo, p.GetTimestamp().Unix(), buf)) - } - } - - for _, p := range server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist()) { - u := table.CreateUpdateMsgFromPaths([]*table.Path{p}) - buf, _ := u[0].Serialize() - bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, p.GetSource(), p.GetTimestamp().Unix(), buf)) - } - - m := &broadcastBMPMsg{ - ch: server.bmpClient.send(), - conn: c.conn, - msgList: bmpMsgList, - } - server.broadcastMsgs = append(server.broadcastMsgs, m) case rmsg := <-server.roaManager.recieveROA(): server.roaManager.handleROAEvent(rmsg) case zmsg := <-zapiMsgCh: @@ -408,7 +413,7 @@ func (server *BgpServer) Serve() { } server.neighborMap[addr] = peer peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) - server.broadcastPeerState(peer) + server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE) case config := <-server.deletedPeerCh: addr := config.Config.NeighborAddress SetTcpMD5SigSockopts(listener(addr), addr, "") @@ -682,7 +687,7 @@ func (server *BgpServer) broadcastBests(bests []*table.Path) { } } -func (server *BgpServer) broadcastPeerState(peer *Peer) { +func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { result := &GrpcResponse{ Data: peer.ToApiStruct(), } @@ -707,6 +712,29 @@ func (server *BgpServer) broadcastPeerState(peer *Peer) { remainReqs = append(remainReqs, req) } server.broadcastReqs = remainReqs + newState := peer.fsm.state + if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { + if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) { + _, rport := peer.fsm.RemoteHostPort() + laddr, lport := peer.fsm.LocalHostPort() + sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) + recvOpen := peer.fsm.recvOpen + ev := &watcherEventStateChangedMsg{ + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(laddr), + peerPort: rport, + localPort: lport, + peerID: peer.fsm.peerInfo.ID, + sentOpen: sentOpen, + recvOpen: recvOpen, + state: newState, + timestamp: time.Now(), + } + server.notify2watchers(WATCHER_EVENT_STATE_CHANGE, ev) + } + } } func (server *BgpServer) RSimportPaths(peer *Peer, pathList []*table.Path) []*table.Path { @@ -817,13 +845,6 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { peer.fsm.StateChange(nextState) if oldState == bgp.BGP_FSM_ESTABLISHED { - if ch := server.bmpClient.send(); ch != nil { - m := &broadcastBMPMsg{ - ch: ch, - msgList: []*bgp.BMPMessage{bmpPeerDown(bgp.BMP_PEER_DOWN_REASON_UNKNOWN, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, peer.conf.Timers.State.Downtime)}, - } - server.broadcastMsgs = append(server.broadcastMsgs, m) - } t := time.Now() if t.Sub(time.Unix(peer.conf.Timers.State.Uptime, 0)) < FLOP_THRESHOLD { peer.conf.State.Flops++ @@ -838,16 +859,8 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { peer.outgoing = make(chan *bgp.BGPMessage, 128) if nextState == bgp.BGP_FSM_ESTABLISHED { // update for export policy - laddr, lport := peer.fsm.LocalHostPort() + laddr, _ := peer.fsm.LocalHostPort() peer.conf.Transport.Config.LocalAddress = laddr - if ch := server.bmpClient.send(); ch != nil { - _, rport := peer.fsm.RemoteHostPort() - m := &broadcastBMPMsg{ - ch: ch, - msgList: []*bgp.BMPMessage{bmpPeerUp(laddr, lport, rport, buildopen(peer.fsm.gConf, peer.fsm.pConf), peer.fsm.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, peer.conf.Timers.State.Uptime)}, - } - server.broadcastMsgs = append(server.broadcastMsgs, m) - } pathList, _ := peer.getBestFromLocal(peer.configuredRFlist()) if len(pathList) > 0 { peer.adjRibOut.Update(pathList) @@ -874,55 +887,36 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { peer.conf.Timers.State = config.TimersState{} } peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) - server.broadcastPeerState(peer) + server.broadcastPeerState(peer, oldState) case FSM_MSG_BGP_MESSAGE: switch m := e.MsgData.(type) { case *bgp.MessageError: msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)})) case *bgp.BGPMessage: - if m.Header.Type == bgp.BGP_MSG_UPDATE { - listener := make(map[watcher]chan watcherEvent) - for _, watcher := range server.watchers { - if ch := watcher.notify(WATCHER_EVENT_UPDATE_MSG); ch != nil { - listener[watcher] = ch - } - } - if len(listener) > 0 { - _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] - l, _ := peer.fsm.LocalHostPort() - ev := &watcherEventUpdateMsg{ - message: m, - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(l), - fourBytesAs: y, - timestamp: e.timestamp, - payload: e.payload, - } - for _, ch := range listener { - bm := &broadcastWatcherMsg{ - ch: ch, - event: ev, - } - server.broadcastMsgs = append(server.broadcastMsgs, bm) - } - } - - if ch := server.bmpClient.send(); ch != nil { - bm := &broadcastBMPMsg{ - ch: ch, - msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, e.timestamp.Unix(), e.payload)}, - } - server.broadcastMsgs = append(server.broadcastMsgs, bm) + if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) { + _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] + l, _ := peer.fsm.LocalHostPort() + ev := &watcherEventUpdateMsg{ + message: m, + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(l), + peerID: peer.fsm.peerInfo.ID, + fourBytesAs: y, + timestamp: e.timestamp, + payload: e.payload, + postPolicy: false, } + server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev) } pathList, msgList := peer.handleBGPmessage(e) if len(msgList) > 0 { msgs = append(msgs, newSenderMsg(peer, msgList)) } + if len(pathList) > 0 { isMonitor := func() bool { if len(server.broadcastReqs) > 0 { @@ -936,15 +930,23 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { } m, altered := server.propagateUpdate(peer, pathList) msgs = append(msgs, m...) - - if ch := server.bmpClient.send(); ch != nil { + if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) { + _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] + l, _ := peer.fsm.LocalHostPort() + ev := &watcherEventUpdateMsg{ + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(l), + peerID: peer.fsm.peerInfo.ID, + fourBytesAs: y, + timestamp: e.timestamp, + postPolicy: true, + } for _, u := range table.CreateUpdateMsgFromPaths(altered) { payload, _ := u.Serialize() - bm := &broadcastBMPMsg{ - ch: ch, - msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, peer.fsm.peerInfo, e.timestamp.Unix(), payload)}, - } - server.broadcastMsgs = append(server.broadcastMsgs, bm) + ev.payload = payload + server.notify2watchers(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev) } } } @@ -969,10 +971,6 @@ func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) { server.rpkiConfigCh <- c } -func (server *BgpServer) SetBmpConfig(c []config.BmpServer) { - server.bmpConfigCh <- c -} - func (server *BgpServer) PeerAdd(peer config.Neighbor) { server.addedPeerCh <- peer } @@ -1635,6 +1633,18 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { Data: d, } close(grpcReq.ResponseCh) + case REQ_BMP_GLOBAL: + paths := server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist()) + bmpmsgs := make([]*bgp.BMPMessage, 0, len(paths)) + for _, path := range paths { + msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) + buf, _ := msgs[0].Serialize() + bmpmsgs = append(bmpmsgs, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, path.GetSource(), path.GetTimestamp().Unix(), buf)) + } + grpcReq.ResponseCh <- &GrpcResponse{ + Data: bmpmsgs, + } + close(grpcReq.ResponseCh) case REQ_MOD_PATH: pathList := server.handleModPathRequest(grpcReq) if len(pathList) > 0 { @@ -1670,7 +1680,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) info := peer.fsm.peerInfo timestamp := peer.conf.Timers.State.Uptime - msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp) + msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.fsm.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp) msgs = append(msgs, msg) } grpcReq.ResponseCh <- &GrpcResponse{ @@ -1750,6 +1760,22 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { Data: d, } close(grpcReq.ResponseCh) + case REQ_BMP_ADJ_IN: + bmpmsgs := make([]*bgp.BMPMessage, 0) + for _, peer := range server.neighborMap { + if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + continue + } + for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) { + msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) + buf, _ := msgs[0].Serialize() + bmpmsgs = append(bmpmsgs, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, path.GetTimestamp().Unix(), buf)) + } + } + grpcReq.ResponseCh <- &GrpcResponse{ + Data: bmpmsgs, + } + close(grpcReq.ResponseCh) case REQ_NEIGHBOR_SHUTDOWN: peers, err := reqToPeers(grpcReq) if err != nil { @@ -2129,7 +2155,7 @@ func (server *BgpServer) handleGrpcModNeighbor(grpcReq *GrpcRequest) (sMsgs []*S } server.neighborMap[addr] = peer peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) - server.broadcastPeerState(peer) + server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE) case api.Operation_DEL: SetTcpMD5SigSockopts(listener(net.ParseIP(addr)), addr, "") log.Info("Delete a peer configuration for ", addr) diff --git a/server/watcher.go b/server/watcher.go index d186e9cf..6cf2abc8 100644 --- a/server/watcher.go +++ b/server/watcher.go @@ -51,6 +51,7 @@ const ( WATCHER_EVENT_UPDATE_MSG WATCHER_EVENT_STATE_CHANGE WATCHER_EVENT_BESTPATH_CHANGE + WATCHER_EVENT_POST_POLICY_UPDATE_MSG ) type watcherEvent interface { @@ -62,15 +63,32 @@ type watcherEventUpdateMsg struct { localAS uint32 peerAddress net.IP localAddress net.IP + peerID net.IP fourBytesAs bool timestamp time.Time payload []byte + postPolicy bool +} + +type watcherEventStateChangedMsg struct { + peerAS uint32 + localAS uint32 + peerAddress net.IP + localAddress net.IP + peerPort uint16 + localPort uint16 + peerID net.IP + sentOpen *bgp.BGPMessage + recvOpen *bgp.BGPMessage + state bgp.FSMState + timestamp time.Time } type watcher interface { notify(watcherEventType) chan watcherEvent restart(string) error stop() + watchingEventTypes() []watcherEventType } type mrtWatcherOp struct { @@ -173,6 +191,10 @@ func (w *mrtWatcher) loop() error { } } +func (w *mrtWatcher) watchingEventTypes() []watcherEventType { + return []watcherEventType{WATCHER_EVENT_UPDATE_MSG} +} + func mrtFileOpen(filename string) (*os.File, error) { file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) if err != nil { diff --git a/tools/pyang_plugins/gobgp.yang b/tools/pyang_plugins/gobgp.yang index ff1099d5..1cf34800 100644 --- a/tools/pyang_plugins/gobgp.yang +++ b/tools/pyang_plugins/gobgp.yang @@ -598,7 +598,7 @@ module gobgp { uses gobgp-rpki-servers; } - augment "/bgp:bgp" { + augment "/bgp:bgp/bgp:global" { description "additional bmp configuration"; uses gobgp-bmp-servers; } |