summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--config/bgp_configs.go56
-rw-r--r--config/default.go4
-rw-r--r--docs/sources/bmp.md4
-rw-r--r--gobgpd/main.go1
-rw-r--r--server/bmp.go314
-rw-r--r--server/grpc_server.go2
-rw-r--r--server/server.go232
-rw-r--r--server/watcher.go22
-rw-r--r--tools/pyang_plugins/gobgp.yang2
9 files changed, 405 insertions, 232 deletions
diff --git a/config/bgp_configs.go b/config/bgp_configs.go
index d5f6975d..24fc2535 100644
--- a/config/bgp_configs.go
+++ b/config/bgp_configs.go
@@ -443,32 +443,6 @@ func (v RpkiValidationResultType) Validate() error {
return nil
}
-//struct for container gobgp:state
-type BmpServerState struct {
-}
-
-//struct for container gobgp:config
-type BmpServerConfig struct {
- // original -> gobgp:address
- //gobgp:address's original type is inet:ip-address
- Address string `mapstructure:"address"`
- // original -> gobgp:port
- Port uint32 `mapstructure:"port"`
- // original -> gobgp:route-monitoring-policy
- RouteMonitoringPolicy BmpRouteMonitoringPolicyType `mapstructure:"route-monitoring-policy"`
-}
-
-//struct for container gobgp:bmp-server
-type BmpServer struct {
- // original -> gobgp:address
- //gobgp:address's original type is inet:ip-address
- Address string `mapstructure:"address"`
- // original -> gobgp:bmp-server-config
- Config BmpServerConfig `mapstructure:"config"`
- // original -> gobgp:bmp-server-state
- State BmpServerState `mapstructure:"state"`
-}
-
//struct for container gobgp:rpki-received
type RpkiReceived struct {
// original -> gobgp:serial-notify
@@ -1119,6 +1093,32 @@ type Mrt struct {
FileName string `mapstructure:"file-name"`
}
+//struct for container gobgp:state
+type BmpServerState struct {
+}
+
+//struct for container gobgp:config
+type BmpServerConfig struct {
+ // original -> gobgp:address
+ //gobgp:address's original type is inet:ip-address
+ Address string `mapstructure:"address"`
+ // original -> gobgp:port
+ Port uint32 `mapstructure:"port"`
+ // original -> gobgp:route-monitoring-policy
+ RouteMonitoringPolicy BmpRouteMonitoringPolicyType `mapstructure:"route-monitoring-policy"`
+}
+
+//struct for container gobgp:bmp-server
+type BmpServer struct {
+ // original -> gobgp:address
+ //gobgp:address's original type is inet:ip-address
+ Address string `mapstructure:"address"`
+ // original -> gobgp:bmp-server-config
+ Config BmpServerConfig `mapstructure:"config"`
+ // original -> gobgp:bmp-server-state
+ State BmpServerState `mapstructure:"state"`
+}
+
//struct for container bgp-mp:l2vpn-evpn
type L2vpnEvpn struct {
// original -> bgp-mp:prefix-limit
@@ -1649,6 +1649,8 @@ type Global struct {
AfiSafis []AfiSafi `mapstructure:"afi-safis"`
// original -> rpol:apply-policy
ApplyPolicy ApplyPolicy `mapstructure:"apply-policy"`
+ // original -> gobgp:bmp-servers
+ BmpServers []BmpServer `mapstructure:"bmp-servers"`
// original -> gobgp:mrt
Mrt Mrt `mapstructure:"mrt"`
// original -> gobgp:zebra
@@ -1669,8 +1671,6 @@ type Bgp struct {
PeerGroups []PeerGroup `mapstructure:"peer-groups"`
// original -> gobgp:rpki-servers
RpkiServers []RpkiServer `mapstructure:"rpki-servers"`
- // original -> gobgp:bmp-servers
- BmpServers []BmpServer `mapstructure:"bmp-servers"`
}
//struct for container bgp-pol:set-ext-community-method
diff --git a/config/default.go b/config/default.go
index 82a17446..c0764c44 100644
--- a/config/default.go
+++ b/config/default.go
@@ -39,11 +39,11 @@ func SetDefaultConfigValues(v *viper.Viper, b *Bgp) error {
b.Global.ListenConfig.Port = bgp.BGP_PORT
}
- for idx, server := range b.BmpServers {
+ for idx, server := range b.Global.BmpServers {
if server.Config.Port == 0 {
server.Config.Port = bgp.BMP_DEFAULT_PORT
}
- b.BmpServers[idx] = server
+ b.Global.BmpServers[idx] = server
}
if !v.IsSet("global.mpls-label-range.min-label") {
diff --git a/docs/sources/bmp.md b/docs/sources/bmp.md
index b8333cd4..f798ed26 100644
--- a/docs/sources/bmp.md
+++ b/docs/sources/bmp.md
@@ -12,14 +12,14 @@ Assume you finished [Getting Started](https://github.com/osrg/gobgp/blob/master/
## <a name="config"> Configuration
-Add `[bmp-servers]` section to enable BMP like below.
+Add `[bmp-servers]` section under `[global]` to enable BMP like below.
```toml
[global.config]
as = 64512
router-id = "192.168.255.1"
-[[bmp-servers]]
+[[global.bmp-servers]]
[bmp-servers.config]
address = "127.0.0.1"
port=11019
diff --git a/gobgpd/main.go b/gobgpd/main.go
index e32af2e8..ca950d29 100644
--- a/gobgpd/main.go
+++ b/gobgpd/main.go
@@ -186,7 +186,6 @@ func main() {
bgpServer.SetGlobalType(newConfig.Bgp.Global)
bgpConfig = &newConfig.Bgp
bgpServer.SetRpkiConfig(newConfig.Bgp.RpkiServers)
- bgpServer.SetBmpConfig(newConfig.Bgp.BmpServers)
added = newConfig.Bgp.Neighbors
deleted = []config.Neighbor{}
updated = []config.Neighbor{}
diff --git a/server/bmp.go b/server/bmp.go
index 3d8b50f6..90928acd 100644
--- a/server/bmp.go
+++ b/server/bmp.go
@@ -16,10 +16,12 @@
package server
import (
+ "fmt"
log "github.com/Sirupsen/logrus"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
"github.com/osrg/gobgp/table"
+ "gopkg.in/tomb.v2"
"net"
"strconv"
"time"
@@ -38,126 +40,198 @@ func (m *broadcastBMPMsg) send() {
type bmpServer struct {
conn *net.TCPConn
host string
+ typ config.BmpRouteMonitoringPolicyType
}
-type bmpClient struct {
- ch chan *broadcastBMPMsg
- connCh chan *bmpConn
+type bmpConfig struct {
+ config config.BmpServerConfig
+ del bool
+ errCh chan error
}
-func newBMPClient(servers []config.BmpServer, connCh chan *bmpConn) (*bmpClient, error) {
- b := &bmpClient{}
- if len(servers) == 0 {
- return b, nil
+type bmpWatcher struct {
+ t tomb.Tomb
+ ch chan watcherEvent
+ apiCh chan *GrpcRequest
+ newServerCh chan *bmpServer
+ endCh chan *net.TCPConn
+ connMap map[string]*bmpServer
+ ctlCh chan *bmpConfig
+}
+
+func (w *bmpWatcher) notify(t watcherEventType) chan watcherEvent {
+ if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE {
+ return w.ch
}
+ return nil
+}
- b.ch = make(chan *broadcastBMPMsg)
- b.connCh = connCh
+func (w *bmpWatcher) stop() {
+ w.t.Kill(nil)
+}
- endCh := make(chan *net.TCPConn)
+func (w *bmpWatcher) tryConnect(server *bmpServer) {
+ interval := 1
+ host := server.host
+ for {
+ log.Debug("connecting bmp server: ", host)
+ conn, err := net.Dial("tcp", host)
+ if err != nil {
+ time.Sleep(time.Duration(interval) * time.Second)
+ if interval < 30 {
+ interval *= 2
+ }
+ } else {
+ log.Info("bmp server is connected, ", host)
+ server.conn = conn.(*net.TCPConn)
+ go func() {
+ buf := make([]byte, 1)
+ for {
+ _, err := conn.Read(buf)
+ if err != nil {
+ w.endCh <- conn.(*net.TCPConn)
+ return
+ }
+ }
+ }()
+ w.newServerCh <- server
+ break
+ }
+ }
+}
- tryConnect := func(host string) {
- interval := 1
- for {
- log.Debug("connecting bmp server: ", host)
- conn, err := net.Dial("tcp", host)
- if err != nil {
- time.Sleep(time.Duration(interval) * time.Second)
- if interval < 30 {
- interval *= 2
+func (w *bmpWatcher) loop() error {
+ for {
+ select {
+ case <-w.t.Dying():
+ for _, server := range w.connMap {
+ if server.conn != nil {
+ server.conn.Close()
}
+ }
+ return nil
+ case m := <-w.ctlCh:
+ c := m.config
+ if m.del {
+ host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
+ if _, y := w.connMap[host]; !y {
+ m.errCh <- fmt.Errorf("bmp server %s doesn't exists", host)
+ continue
+ }
+ conn := w.connMap[host].conn
+ delete(w.connMap, host)
+ conn.Close()
} else {
- log.Info("bmp server is connected, ", host)
- go func() {
- buf := make([]byte, 1)
- for {
- _, err := conn.Read(buf)
+ host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
+ if _, y := w.connMap[host]; y {
+ m.errCh <- fmt.Errorf("bmp server %s already exists", host)
+ continue
+ }
+ server := &bmpServer{
+ host: host,
+ typ: c.RouteMonitoringPolicy,
+ }
+ w.connMap[host] = server
+ go w.tryConnect(server)
+ }
+ m.errCh <- nil
+ close(m.errCh)
+ case server := <-w.newServerCh:
+ i := bgp.NewBMPInitiation([]bgp.BMPTLV{})
+ buf, _ := i.Serialize()
+ _, err := server.conn.Write(buf)
+ if err != nil {
+ log.Warnf("failed to write to bmp server %s", server.host)
+ }
+ req := &GrpcRequest{
+ RequestType: REQ_BMP_NEIGHBORS,
+ ResponseCh: make(chan *GrpcResponse, 1),
+ }
+ w.apiCh <- req
+ write := func(req *GrpcRequest) {
+ for res := range req.ResponseCh {
+ for _, msg := range res.Data.([]*bgp.BMPMessage) {
+ buf, _ = msg.Serialize()
+ _, err := server.conn.Write(buf)
if err != nil {
- endCh <- conn.(*net.TCPConn)
- return
+ log.Warnf("failed to write to bmp server %s", server.host)
}
}
- }()
- connCh <- &bmpConn{
- conn: conn.(*net.TCPConn),
- host: host,
}
- break
}
- }
- }
-
- for _, c := range servers {
- b := c.Config
- go tryConnect(net.JoinHostPort(b.Address, strconv.Itoa(int(b.Port))))
- }
-
- go func() {
- connMap := make(map[string]*net.TCPConn)
- for {
- select {
- case m := <-b.ch:
- if m.conn != nil {
- i := bgp.NewBMPInitiation([]bgp.BMPTLV{})
- buf, _ := i.Serialize()
- _, err := m.conn.Write(buf)
- if err == nil {
- connMap[m.conn.RemoteAddr().String()] = m.conn
- }
+ write(req)
+ if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY {
+ req = &GrpcRequest{
+ RequestType: REQ_BMP_ADJ_IN,
+ ResponseCh: make(chan *GrpcResponse, 1),
}
-
- for host, conn := range connMap {
- if m.conn != nil && m.conn != conn {
- continue
- }
-
- for _, msg := range m.msgList {
- if msg.Header.Type == bgp.BMP_MSG_ROUTE_MONITORING {
- c := func() *config.BmpServerConfig {
- for _, c := range servers {
- b := &c.Config
- if host == net.JoinHostPort(b.Address, strconv.Itoa(int(b.Port))) {
- return b
- }
- }
- return nil
- }()
- if c == nil {
- log.Fatal(host)
- }
- ph := msg.PeerHeader
- switch c.RouteMonitoringPolicy {
- case config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY:
- if ph.IsPostPolicy != false {
- continue
- }
- case config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY:
- if ph.IsPostPolicy != true {
- continue
- }
+ w.apiCh <- req
+ write(req)
+ }
+ if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY {
+ req = &GrpcRequest{
+ RequestType: REQ_BMP_GLOBAL,
+ ResponseCh: make(chan *GrpcResponse, 1),
+ }
+ w.apiCh <- req
+ write(req)
+ }
+ case ev := <-w.ch:
+ switch msg := ev.(type) {
+ case *watcherEventUpdateMsg:
+ info := &table.PeerInfo{
+ Address: msg.peerAddress,
+ AS: msg.peerAS,
+ ID: msg.peerID,
+ }
+ buf, _ := bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload).Serialize()
+ for _, server := range w.connMap {
+ if server.conn != nil {
+ send := server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY && !msg.postPolicy
+ send = send || (server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY && msg.postPolicy)
+ if send {
+ _, err := server.conn.Write(buf)
+ if err != nil {
+ log.Warnf("failed to write to bmp server %s", server.host)
}
-
}
- b, _ := msg.Serialize()
- if _, err := conn.Write(b); err != nil {
- break
+ }
+ }
+ case *watcherEventStateChangedMsg:
+ var bmpmsg *bgp.BMPMessage
+ info := &table.PeerInfo{
+ Address: msg.peerAddress,
+ AS: msg.peerAS,
+ ID: msg.peerID,
+ }
+ if msg.state == bgp.BGP_FSM_ESTABLISHED {
+ bmpmsg = bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())
+ } else {
+ bmpmsg = bmpPeerDown(bgp.BMP_PEER_DOWN_REASON_UNKNOWN, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())
+ }
+ buf, _ := bmpmsg.Serialize()
+ for _, server := range w.connMap {
+ if server.conn != nil {
+ _, err := server.conn.Write(buf)
+ if err != nil {
+ log.Warnf("failed to write to bmp server %s", server.host)
}
}
}
- case conn := <-endCh:
- host := conn.RemoteAddr().String()
- log.Debugf("bmp connection to %s killed", host)
- delete(connMap, host)
- go tryConnect(host)
+ default:
+ log.Warnf("unknown watcher event")
}
+ case conn := <-w.endCh:
+ host := conn.RemoteAddr().String()
+ log.Debugf("bmp connection to %s killed", host)
+ w.connMap[host].conn = nil
+ go w.tryConnect(w.connMap[host])
}
- }()
-
- return b, nil
+ }
}
-func (c *bmpClient) send() chan *broadcastBMPMsg {
- return c.ch
+func (w *bmpWatcher) restart(string) error {
+ return nil
}
func bmpPeerUp(laddr string, lport, rport uint16, sent, recv *bgp.BGPMessage, t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64) *bgp.BMPMessage {
@@ -177,3 +251,53 @@ func bmpPeerRoute(t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timest
body.BGPUpdatePayload = payload
return m
}
+
+func (w *bmpWatcher) addServer(c config.BmpServerConfig) error {
+ ch := make(chan error)
+ w.ctlCh <- &bmpConfig{
+ config: c,
+ errCh: ch,
+ }
+ return <-ch
+}
+
+func (w *bmpWatcher) watchingEventTypes() []watcherEventType {
+ state := false
+ pre := false
+ post := false
+ for _, server := range w.connMap {
+ if server.conn != nil {
+ state = true
+ if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY {
+ pre = true
+ }
+ if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY {
+ post = true
+ }
+ }
+ }
+ types := make([]watcherEventType, 0, 3)
+ if state {
+ types = append(types, WATCHER_EVENT_STATE_CHANGE)
+ }
+ if pre {
+ types = append(types, WATCHER_EVENT_UPDATE_MSG)
+ }
+ if post {
+ types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG)
+ }
+ return types
+}
+
+func newBmpWatcher(grpcCh chan *GrpcRequest) (*bmpWatcher, error) {
+ w := &bmpWatcher{
+ ch: make(chan watcherEvent),
+ apiCh: grpcCh,
+ newServerCh: make(chan *bmpServer),
+ endCh: make(chan *net.TCPConn),
+ connMap: make(map[string]*bmpServer),
+ ctlCh: make(chan *bmpConfig),
+ }
+ w.t.Go(w.loop)
+ return w, nil
+}
diff --git a/server/grpc_server.go b/server/grpc_server.go
index 63d6e1ad..554c62a9 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -67,6 +67,8 @@ const (
REQ_POLICY_ASSIGNMENT
REQ_MOD_POLICY_ASSIGNMENT
REQ_BMP_NEIGHBORS
+ REQ_BMP_GLOBAL
+ REQ_BMP_ADJ_IN
)
type Server struct {
diff --git a/server/server.go b/server/server.go
index 8de474a2..cd86da38 100644
--- a/server/server.go
+++ b/server/server.go
@@ -74,6 +74,19 @@ func (m *broadcastBGPMsg) send() {
m.ch <- m
}
+type Watchers map[watcherType]watcher
+
+func (ws Watchers) watching(typ watcherEventType) bool {
+ for _, w := range ws {
+ for _, ev := range w.watchingEventTypes() {
+ if ev == typ {
+ return true
+ }
+ }
+ }
+ return false
+}
+
type BgpServer struct {
bgpConfig config.Bgp
globalTypeCh chan config.Global
@@ -83,7 +96,6 @@ type BgpServer struct {
fsmincomingCh chan *FsmMsg
fsmStateCh chan *FsmMsg
rpkiConfigCh chan []config.RpkiServer
- bmpConfigCh chan []config.BmpServer
GrpcReqCh chan *GrpcRequest
policyUpdateCh chan config.RoutingPolicy
@@ -95,10 +107,8 @@ type BgpServer struct {
globalRib *table.TableManager
zclient *zebra.Client
roaManager *roaManager
- bmpClient *bmpClient
- bmpConnCh chan *bmpConn
shutdown bool
- watchers map[watcherType]watcher
+ watchers Watchers
}
func NewBgpServer() *BgpServer {
@@ -108,12 +118,10 @@ func NewBgpServer() *BgpServer {
b.deletedPeerCh = make(chan config.Neighbor)
b.updatedPeerCh = make(chan config.Neighbor)
b.rpkiConfigCh = make(chan []config.RpkiServer)
- b.bmpConfigCh = make(chan []config.BmpServer)
- b.bmpConnCh = make(chan *bmpConn)
b.GrpcReqCh = make(chan *GrpcRequest, 1)
b.policyUpdateCh = make(chan config.RoutingPolicy)
b.neighborMap = make(map[string]*Peer)
- b.watchers = make(map[watcherType]watcher)
+ b.watchers = Watchers(make(map[watcherType]watcher))
b.roaManager, _ = newROAManager(0, nil)
b.policy = table.NewRoutingPolicy()
return &b
@@ -143,6 +151,18 @@ func listenAndAccept(proto string, port uint32, ch chan *net.TCPConn) (*net.TCPL
return l, nil
}
+func (server *BgpServer) notify2watchers(typ watcherEventType, ev watcherEvent) error {
+ for _, watcher := range server.watchers {
+ if ch := watcher.notify(typ); ch != nil {
+ server.broadcastMsgs = append(server.broadcastMsgs, &broadcastWatcherMsg{
+ ch: ch,
+ event: ev,
+ })
+ }
+ }
+ return nil
+}
+
func (server *BgpServer) Serve() {
var g config.Global
for {
@@ -166,7 +186,6 @@ func (server *BgpServer) Serve() {
}
}
- server.bmpClient, _ = newBMPClient(nil, server.bmpConnCh)
server.roaManager, _ = newROAManager(g.Config.As, nil)
if g.Mrt.FileName != "" {
@@ -178,6 +197,20 @@ func (server *BgpServer) Serve() {
}
}
+ if len(g.BmpServers) > 0 {
+ w, err := newBmpWatcher(server.GrpcReqCh)
+ if err != nil {
+ log.Warn(err)
+ } else {
+ for _, server := range g.BmpServers {
+ if err := w.addServer(server.Config); err != nil {
+ log.Warn(err)
+ }
+ }
+ server.watchers[WATCHER_BMP] = w
+ }
+ }
+
if g.Zebra.Enabled == true {
if g.Zebra.Url == "" {
g.Zebra.Url = "unix:/var/run/quagga/zserv.api"
@@ -343,34 +376,6 @@ func (server *BgpServer) Serve() {
select {
case c := <-server.rpkiConfigCh:
server.roaManager, _ = newROAManager(server.bgpConfig.Global.Config.As, c)
- case c := <-server.bmpConfigCh:
- server.bmpClient, _ = newBMPClient(c, server.bmpConnCh)
- case c := <-server.bmpConnCh:
- bmpMsgList := []*bgp.BMPMessage{}
- for _, targetPeer := range server.neighborMap {
- if targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
- continue
- }
- for _, p := range targetPeer.adjRibIn.PathList(targetPeer.configuredRFlist(), false) {
- // avoid to merge for timestamp
- u := table.CreateUpdateMsgFromPaths([]*table.Path{p})
- buf, _ := u[0].Serialize()
- bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, targetPeer.fsm.peerInfo, p.GetTimestamp().Unix(), buf))
- }
- }
-
- for _, p := range server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist()) {
- u := table.CreateUpdateMsgFromPaths([]*table.Path{p})
- buf, _ := u[0].Serialize()
- bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, p.GetSource(), p.GetTimestamp().Unix(), buf))
- }
-
- m := &broadcastBMPMsg{
- ch: server.bmpClient.send(),
- conn: c.conn,
- msgList: bmpMsgList,
- }
- server.broadcastMsgs = append(server.broadcastMsgs, m)
case rmsg := <-server.roaManager.recieveROA():
server.roaManager.handleROAEvent(rmsg)
case zmsg := <-zapiMsgCh:
@@ -408,7 +413,7 @@ func (server *BgpServer) Serve() {
}
server.neighborMap[addr] = peer
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
- server.broadcastPeerState(peer)
+ server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE)
case config := <-server.deletedPeerCh:
addr := config.Config.NeighborAddress
SetTcpMD5SigSockopts(listener(addr), addr, "")
@@ -682,7 +687,7 @@ func (server *BgpServer) broadcastBests(bests []*table.Path) {
}
}
-func (server *BgpServer) broadcastPeerState(peer *Peer) {
+func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
result := &GrpcResponse{
Data: peer.ToApiStruct(),
}
@@ -707,6 +712,29 @@ func (server *BgpServer) broadcastPeerState(peer *Peer) {
remainReqs = append(remainReqs, req)
}
server.broadcastReqs = remainReqs
+ newState := peer.fsm.state
+ if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
+ if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) {
+ _, rport := peer.fsm.RemoteHostPort()
+ laddr, lport := peer.fsm.LocalHostPort()
+ sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
+ recvOpen := peer.fsm.recvOpen
+ ev := &watcherEventStateChangedMsg{
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(laddr),
+ peerPort: rport,
+ localPort: lport,
+ peerID: peer.fsm.peerInfo.ID,
+ sentOpen: sentOpen,
+ recvOpen: recvOpen,
+ state: newState,
+ timestamp: time.Now(),
+ }
+ server.notify2watchers(WATCHER_EVENT_STATE_CHANGE, ev)
+ }
+ }
}
func (server *BgpServer) RSimportPaths(peer *Peer, pathList []*table.Path) []*table.Path {
@@ -817,13 +845,6 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
peer.fsm.StateChange(nextState)
if oldState == bgp.BGP_FSM_ESTABLISHED {
- if ch := server.bmpClient.send(); ch != nil {
- m := &broadcastBMPMsg{
- ch: ch,
- msgList: []*bgp.BMPMessage{bmpPeerDown(bgp.BMP_PEER_DOWN_REASON_UNKNOWN, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, peer.conf.Timers.State.Downtime)},
- }
- server.broadcastMsgs = append(server.broadcastMsgs, m)
- }
t := time.Now()
if t.Sub(time.Unix(peer.conf.Timers.State.Uptime, 0)) < FLOP_THRESHOLD {
peer.conf.State.Flops++
@@ -838,16 +859,8 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
peer.outgoing = make(chan *bgp.BGPMessage, 128)
if nextState == bgp.BGP_FSM_ESTABLISHED {
// update for export policy
- laddr, lport := peer.fsm.LocalHostPort()
+ laddr, _ := peer.fsm.LocalHostPort()
peer.conf.Transport.Config.LocalAddress = laddr
- if ch := server.bmpClient.send(); ch != nil {
- _, rport := peer.fsm.RemoteHostPort()
- m := &broadcastBMPMsg{
- ch: ch,
- msgList: []*bgp.BMPMessage{bmpPeerUp(laddr, lport, rport, buildopen(peer.fsm.gConf, peer.fsm.pConf), peer.fsm.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, peer.conf.Timers.State.Uptime)},
- }
- server.broadcastMsgs = append(server.broadcastMsgs, m)
- }
pathList, _ := peer.getBestFromLocal(peer.configuredRFlist())
if len(pathList) > 0 {
peer.adjRibOut.Update(pathList)
@@ -874,55 +887,36 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
peer.conf.Timers.State = config.TimersState{}
}
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
- server.broadcastPeerState(peer)
+ server.broadcastPeerState(peer, oldState)
case FSM_MSG_BGP_MESSAGE:
switch m := e.MsgData.(type) {
case *bgp.MessageError:
msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)}))
case *bgp.BGPMessage:
- if m.Header.Type == bgp.BGP_MSG_UPDATE {
- listener := make(map[watcher]chan watcherEvent)
- for _, watcher := range server.watchers {
- if ch := watcher.notify(WATCHER_EVENT_UPDATE_MSG); ch != nil {
- listener[watcher] = ch
- }
- }
- if len(listener) > 0 {
- _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
- l, _ := peer.fsm.LocalHostPort()
- ev := &watcherEventUpdateMsg{
- message: m,
- peerAS: peer.fsm.peerInfo.AS,
- localAS: peer.fsm.peerInfo.LocalAS,
- peerAddress: peer.fsm.peerInfo.Address,
- localAddress: net.ParseIP(l),
- fourBytesAs: y,
- timestamp: e.timestamp,
- payload: e.payload,
- }
- for _, ch := range listener {
- bm := &broadcastWatcherMsg{
- ch: ch,
- event: ev,
- }
- server.broadcastMsgs = append(server.broadcastMsgs, bm)
- }
- }
-
- if ch := server.bmpClient.send(); ch != nil {
- bm := &broadcastBMPMsg{
- ch: ch,
- msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, e.timestamp.Unix(), e.payload)},
- }
- server.broadcastMsgs = append(server.broadcastMsgs, bm)
+ if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) {
+ _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
+ l, _ := peer.fsm.LocalHostPort()
+ ev := &watcherEventUpdateMsg{
+ message: m,
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(l),
+ peerID: peer.fsm.peerInfo.ID,
+ fourBytesAs: y,
+ timestamp: e.timestamp,
+ payload: e.payload,
+ postPolicy: false,
}
+ server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev)
}
pathList, msgList := peer.handleBGPmessage(e)
if len(msgList) > 0 {
msgs = append(msgs, newSenderMsg(peer, msgList))
}
+
if len(pathList) > 0 {
isMonitor := func() bool {
if len(server.broadcastReqs) > 0 {
@@ -936,15 +930,23 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
}
m, altered := server.propagateUpdate(peer, pathList)
msgs = append(msgs, m...)
-
- if ch := server.bmpClient.send(); ch != nil {
+ if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) {
+ _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
+ l, _ := peer.fsm.LocalHostPort()
+ ev := &watcherEventUpdateMsg{
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(l),
+ peerID: peer.fsm.peerInfo.ID,
+ fourBytesAs: y,
+ timestamp: e.timestamp,
+ postPolicy: true,
+ }
for _, u := range table.CreateUpdateMsgFromPaths(altered) {
payload, _ := u.Serialize()
- bm := &broadcastBMPMsg{
- ch: ch,
- msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, peer.fsm.peerInfo, e.timestamp.Unix(), payload)},
- }
- server.broadcastMsgs = append(server.broadcastMsgs, bm)
+ ev.payload = payload
+ server.notify2watchers(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev)
}
}
}
@@ -969,10 +971,6 @@ func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) {
server.rpkiConfigCh <- c
}
-func (server *BgpServer) SetBmpConfig(c []config.BmpServer) {
- server.bmpConfigCh <- c
-}
-
func (server *BgpServer) PeerAdd(peer config.Neighbor) {
server.addedPeerCh <- peer
}
@@ -1635,6 +1633,18 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
Data: d,
}
close(grpcReq.ResponseCh)
+ case REQ_BMP_GLOBAL:
+ paths := server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist())
+ bmpmsgs := make([]*bgp.BMPMessage, 0, len(paths))
+ for _, path := range paths {
+ msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
+ buf, _ := msgs[0].Serialize()
+ bmpmsgs = append(bmpmsgs, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, path.GetSource(), path.GetTimestamp().Unix(), buf))
+ }
+ grpcReq.ResponseCh <- &GrpcResponse{
+ Data: bmpmsgs,
+ }
+ close(grpcReq.ResponseCh)
case REQ_MOD_PATH:
pathList := server.handleModPathRequest(grpcReq)
if len(pathList) > 0 {
@@ -1670,7 +1680,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
info := peer.fsm.peerInfo
timestamp := peer.conf.Timers.State.Uptime
- msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp)
+ msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.fsm.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp)
msgs = append(msgs, msg)
}
grpcReq.ResponseCh <- &GrpcResponse{
@@ -1750,6 +1760,22 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
Data: d,
}
close(grpcReq.ResponseCh)
+ case REQ_BMP_ADJ_IN:
+ bmpmsgs := make([]*bgp.BMPMessage, 0)
+ for _, peer := range server.neighborMap {
+ if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ continue
+ }
+ for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) {
+ msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
+ buf, _ := msgs[0].Serialize()
+ bmpmsgs = append(bmpmsgs, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, path.GetTimestamp().Unix(), buf))
+ }
+ }
+ grpcReq.ResponseCh <- &GrpcResponse{
+ Data: bmpmsgs,
+ }
+ close(grpcReq.ResponseCh)
case REQ_NEIGHBOR_SHUTDOWN:
peers, err := reqToPeers(grpcReq)
if err != nil {
@@ -2129,7 +2155,7 @@ func (server *BgpServer) handleGrpcModNeighbor(grpcReq *GrpcRequest) (sMsgs []*S
}
server.neighborMap[addr] = peer
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
- server.broadcastPeerState(peer)
+ server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE)
case api.Operation_DEL:
SetTcpMD5SigSockopts(listener(net.ParseIP(addr)), addr, "")
log.Info("Delete a peer configuration for ", addr)
diff --git a/server/watcher.go b/server/watcher.go
index d186e9cf..6cf2abc8 100644
--- a/server/watcher.go
+++ b/server/watcher.go
@@ -51,6 +51,7 @@ const (
WATCHER_EVENT_UPDATE_MSG
WATCHER_EVENT_STATE_CHANGE
WATCHER_EVENT_BESTPATH_CHANGE
+ WATCHER_EVENT_POST_POLICY_UPDATE_MSG
)
type watcherEvent interface {
@@ -62,15 +63,32 @@ type watcherEventUpdateMsg struct {
localAS uint32
peerAddress net.IP
localAddress net.IP
+ peerID net.IP
fourBytesAs bool
timestamp time.Time
payload []byte
+ postPolicy bool
+}
+
+type watcherEventStateChangedMsg struct {
+ peerAS uint32
+ localAS uint32
+ peerAddress net.IP
+ localAddress net.IP
+ peerPort uint16
+ localPort uint16
+ peerID net.IP
+ sentOpen *bgp.BGPMessage
+ recvOpen *bgp.BGPMessage
+ state bgp.FSMState
+ timestamp time.Time
}
type watcher interface {
notify(watcherEventType) chan watcherEvent
restart(string) error
stop()
+ watchingEventTypes() []watcherEventType
}
type mrtWatcherOp struct {
@@ -173,6 +191,10 @@ func (w *mrtWatcher) loop() error {
}
}
+func (w *mrtWatcher) watchingEventTypes() []watcherEventType {
+ return []watcherEventType{WATCHER_EVENT_UPDATE_MSG}
+}
+
func mrtFileOpen(filename string) (*os.File, error) {
file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
diff --git a/tools/pyang_plugins/gobgp.yang b/tools/pyang_plugins/gobgp.yang
index ff1099d5..1cf34800 100644
--- a/tools/pyang_plugins/gobgp.yang
+++ b/tools/pyang_plugins/gobgp.yang
@@ -598,7 +598,7 @@ module gobgp {
uses gobgp-rpki-servers;
}
- augment "/bgp:bgp" {
+ augment "/bgp:bgp/bgp:global" {
description "additional bmp configuration";
uses gobgp-bmp-servers;
}