summaryrefslogtreecommitdiffhomepage
path: root/server/server.go
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2015-12-30 20:50:27 +0900
committerISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2016-01-09 22:51:23 +0900
commitaff1c244ad0d88a814f2ce573800717ccd08b450 (patch)
tree711b432961c52c76a70cd76a980d00ad956c0d5b /server/server.go
parentf4c07da88154dd4b21012576a4ceb205715f4b3e (diff)
bmp: use watcher infra to implement bmp feature
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r--server/server.go232
1 files changed, 129 insertions, 103 deletions
diff --git a/server/server.go b/server/server.go
index 8de474a2..cd86da38 100644
--- a/server/server.go
+++ b/server/server.go
@@ -74,6 +74,19 @@ func (m *broadcastBGPMsg) send() {
m.ch <- m
}
+type Watchers map[watcherType]watcher
+
+func (ws Watchers) watching(typ watcherEventType) bool {
+ for _, w := range ws {
+ for _, ev := range w.watchingEventTypes() {
+ if ev == typ {
+ return true
+ }
+ }
+ }
+ return false
+}
+
type BgpServer struct {
bgpConfig config.Bgp
globalTypeCh chan config.Global
@@ -83,7 +96,6 @@ type BgpServer struct {
fsmincomingCh chan *FsmMsg
fsmStateCh chan *FsmMsg
rpkiConfigCh chan []config.RpkiServer
- bmpConfigCh chan []config.BmpServer
GrpcReqCh chan *GrpcRequest
policyUpdateCh chan config.RoutingPolicy
@@ -95,10 +107,8 @@ type BgpServer struct {
globalRib *table.TableManager
zclient *zebra.Client
roaManager *roaManager
- bmpClient *bmpClient
- bmpConnCh chan *bmpConn
shutdown bool
- watchers map[watcherType]watcher
+ watchers Watchers
}
func NewBgpServer() *BgpServer {
@@ -108,12 +118,10 @@ func NewBgpServer() *BgpServer {
b.deletedPeerCh = make(chan config.Neighbor)
b.updatedPeerCh = make(chan config.Neighbor)
b.rpkiConfigCh = make(chan []config.RpkiServer)
- b.bmpConfigCh = make(chan []config.BmpServer)
- b.bmpConnCh = make(chan *bmpConn)
b.GrpcReqCh = make(chan *GrpcRequest, 1)
b.policyUpdateCh = make(chan config.RoutingPolicy)
b.neighborMap = make(map[string]*Peer)
- b.watchers = make(map[watcherType]watcher)
+ b.watchers = Watchers(make(map[watcherType]watcher))
b.roaManager, _ = newROAManager(0, nil)
b.policy = table.NewRoutingPolicy()
return &b
@@ -143,6 +151,18 @@ func listenAndAccept(proto string, port uint32, ch chan *net.TCPConn) (*net.TCPL
return l, nil
}
+func (server *BgpServer) notify2watchers(typ watcherEventType, ev watcherEvent) error {
+ for _, watcher := range server.watchers {
+ if ch := watcher.notify(typ); ch != nil {
+ server.broadcastMsgs = append(server.broadcastMsgs, &broadcastWatcherMsg{
+ ch: ch,
+ event: ev,
+ })
+ }
+ }
+ return nil
+}
+
func (server *BgpServer) Serve() {
var g config.Global
for {
@@ -166,7 +186,6 @@ func (server *BgpServer) Serve() {
}
}
- server.bmpClient, _ = newBMPClient(nil, server.bmpConnCh)
server.roaManager, _ = newROAManager(g.Config.As, nil)
if g.Mrt.FileName != "" {
@@ -178,6 +197,20 @@ func (server *BgpServer) Serve() {
}
}
+ if len(g.BmpServers) > 0 {
+ w, err := newBmpWatcher(server.GrpcReqCh)
+ if err != nil {
+ log.Warn(err)
+ } else {
+ for _, server := range g.BmpServers {
+ if err := w.addServer(server.Config); err != nil {
+ log.Warn(err)
+ }
+ }
+ server.watchers[WATCHER_BMP] = w
+ }
+ }
+
if g.Zebra.Enabled == true {
if g.Zebra.Url == "" {
g.Zebra.Url = "unix:/var/run/quagga/zserv.api"
@@ -343,34 +376,6 @@ func (server *BgpServer) Serve() {
select {
case c := <-server.rpkiConfigCh:
server.roaManager, _ = newROAManager(server.bgpConfig.Global.Config.As, c)
- case c := <-server.bmpConfigCh:
- server.bmpClient, _ = newBMPClient(c, server.bmpConnCh)
- case c := <-server.bmpConnCh:
- bmpMsgList := []*bgp.BMPMessage{}
- for _, targetPeer := range server.neighborMap {
- if targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
- continue
- }
- for _, p := range targetPeer.adjRibIn.PathList(targetPeer.configuredRFlist(), false) {
- // avoid to merge for timestamp
- u := table.CreateUpdateMsgFromPaths([]*table.Path{p})
- buf, _ := u[0].Serialize()
- bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, targetPeer.fsm.peerInfo, p.GetTimestamp().Unix(), buf))
- }
- }
-
- for _, p := range server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist()) {
- u := table.CreateUpdateMsgFromPaths([]*table.Path{p})
- buf, _ := u[0].Serialize()
- bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, p.GetSource(), p.GetTimestamp().Unix(), buf))
- }
-
- m := &broadcastBMPMsg{
- ch: server.bmpClient.send(),
- conn: c.conn,
- msgList: bmpMsgList,
- }
- server.broadcastMsgs = append(server.broadcastMsgs, m)
case rmsg := <-server.roaManager.recieveROA():
server.roaManager.handleROAEvent(rmsg)
case zmsg := <-zapiMsgCh:
@@ -408,7 +413,7 @@ func (server *BgpServer) Serve() {
}
server.neighborMap[addr] = peer
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
- server.broadcastPeerState(peer)
+ server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE)
case config := <-server.deletedPeerCh:
addr := config.Config.NeighborAddress
SetTcpMD5SigSockopts(listener(addr), addr, "")
@@ -682,7 +687,7 @@ func (server *BgpServer) broadcastBests(bests []*table.Path) {
}
}
-func (server *BgpServer) broadcastPeerState(peer *Peer) {
+func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
result := &GrpcResponse{
Data: peer.ToApiStruct(),
}
@@ -707,6 +712,29 @@ func (server *BgpServer) broadcastPeerState(peer *Peer) {
remainReqs = append(remainReqs, req)
}
server.broadcastReqs = remainReqs
+ newState := peer.fsm.state
+ if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
+ if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) {
+ _, rport := peer.fsm.RemoteHostPort()
+ laddr, lport := peer.fsm.LocalHostPort()
+ sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
+ recvOpen := peer.fsm.recvOpen
+ ev := &watcherEventStateChangedMsg{
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(laddr),
+ peerPort: rport,
+ localPort: lport,
+ peerID: peer.fsm.peerInfo.ID,
+ sentOpen: sentOpen,
+ recvOpen: recvOpen,
+ state: newState,
+ timestamp: time.Now(),
+ }
+ server.notify2watchers(WATCHER_EVENT_STATE_CHANGE, ev)
+ }
+ }
}
func (server *BgpServer) RSimportPaths(peer *Peer, pathList []*table.Path) []*table.Path {
@@ -817,13 +845,6 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
peer.fsm.StateChange(nextState)
if oldState == bgp.BGP_FSM_ESTABLISHED {
- if ch := server.bmpClient.send(); ch != nil {
- m := &broadcastBMPMsg{
- ch: ch,
- msgList: []*bgp.BMPMessage{bmpPeerDown(bgp.BMP_PEER_DOWN_REASON_UNKNOWN, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, peer.conf.Timers.State.Downtime)},
- }
- server.broadcastMsgs = append(server.broadcastMsgs, m)
- }
t := time.Now()
if t.Sub(time.Unix(peer.conf.Timers.State.Uptime, 0)) < FLOP_THRESHOLD {
peer.conf.State.Flops++
@@ -838,16 +859,8 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
peer.outgoing = make(chan *bgp.BGPMessage, 128)
if nextState == bgp.BGP_FSM_ESTABLISHED {
// update for export policy
- laddr, lport := peer.fsm.LocalHostPort()
+ laddr, _ := peer.fsm.LocalHostPort()
peer.conf.Transport.Config.LocalAddress = laddr
- if ch := server.bmpClient.send(); ch != nil {
- _, rport := peer.fsm.RemoteHostPort()
- m := &broadcastBMPMsg{
- ch: ch,
- msgList: []*bgp.BMPMessage{bmpPeerUp(laddr, lport, rport, buildopen(peer.fsm.gConf, peer.fsm.pConf), peer.fsm.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, peer.conf.Timers.State.Uptime)},
- }
- server.broadcastMsgs = append(server.broadcastMsgs, m)
- }
pathList, _ := peer.getBestFromLocal(peer.configuredRFlist())
if len(pathList) > 0 {
peer.adjRibOut.Update(pathList)
@@ -874,55 +887,36 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
peer.conf.Timers.State = config.TimersState{}
}
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
- server.broadcastPeerState(peer)
+ server.broadcastPeerState(peer, oldState)
case FSM_MSG_BGP_MESSAGE:
switch m := e.MsgData.(type) {
case *bgp.MessageError:
msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)}))
case *bgp.BGPMessage:
- if m.Header.Type == bgp.BGP_MSG_UPDATE {
- listener := make(map[watcher]chan watcherEvent)
- for _, watcher := range server.watchers {
- if ch := watcher.notify(WATCHER_EVENT_UPDATE_MSG); ch != nil {
- listener[watcher] = ch
- }
- }
- if len(listener) > 0 {
- _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
- l, _ := peer.fsm.LocalHostPort()
- ev := &watcherEventUpdateMsg{
- message: m,
- peerAS: peer.fsm.peerInfo.AS,
- localAS: peer.fsm.peerInfo.LocalAS,
- peerAddress: peer.fsm.peerInfo.Address,
- localAddress: net.ParseIP(l),
- fourBytesAs: y,
- timestamp: e.timestamp,
- payload: e.payload,
- }
- for _, ch := range listener {
- bm := &broadcastWatcherMsg{
- ch: ch,
- event: ev,
- }
- server.broadcastMsgs = append(server.broadcastMsgs, bm)
- }
- }
-
- if ch := server.bmpClient.send(); ch != nil {
- bm := &broadcastBMPMsg{
- ch: ch,
- msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, e.timestamp.Unix(), e.payload)},
- }
- server.broadcastMsgs = append(server.broadcastMsgs, bm)
+ if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) {
+ _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
+ l, _ := peer.fsm.LocalHostPort()
+ ev := &watcherEventUpdateMsg{
+ message: m,
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(l),
+ peerID: peer.fsm.peerInfo.ID,
+ fourBytesAs: y,
+ timestamp: e.timestamp,
+ payload: e.payload,
+ postPolicy: false,
}
+ server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev)
}
pathList, msgList := peer.handleBGPmessage(e)
if len(msgList) > 0 {
msgs = append(msgs, newSenderMsg(peer, msgList))
}
+
if len(pathList) > 0 {
isMonitor := func() bool {
if len(server.broadcastReqs) > 0 {
@@ -936,15 +930,23 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
}
m, altered := server.propagateUpdate(peer, pathList)
msgs = append(msgs, m...)
-
- if ch := server.bmpClient.send(); ch != nil {
+ if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) {
+ _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
+ l, _ := peer.fsm.LocalHostPort()
+ ev := &watcherEventUpdateMsg{
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(l),
+ peerID: peer.fsm.peerInfo.ID,
+ fourBytesAs: y,
+ timestamp: e.timestamp,
+ postPolicy: true,
+ }
for _, u := range table.CreateUpdateMsgFromPaths(altered) {
payload, _ := u.Serialize()
- bm := &broadcastBMPMsg{
- ch: ch,
- msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, peer.fsm.peerInfo, e.timestamp.Unix(), payload)},
- }
- server.broadcastMsgs = append(server.broadcastMsgs, bm)
+ ev.payload = payload
+ server.notify2watchers(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev)
}
}
}
@@ -969,10 +971,6 @@ func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) {
server.rpkiConfigCh <- c
}
-func (server *BgpServer) SetBmpConfig(c []config.BmpServer) {
- server.bmpConfigCh <- c
-}
-
func (server *BgpServer) PeerAdd(peer config.Neighbor) {
server.addedPeerCh <- peer
}
@@ -1635,6 +1633,18 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
Data: d,
}
close(grpcReq.ResponseCh)
+ case REQ_BMP_GLOBAL:
+ paths := server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist())
+ bmpmsgs := make([]*bgp.BMPMessage, 0, len(paths))
+ for _, path := range paths {
+ msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
+ buf, _ := msgs[0].Serialize()
+ bmpmsgs = append(bmpmsgs, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, true, 0, path.GetSource(), path.GetTimestamp().Unix(), buf))
+ }
+ grpcReq.ResponseCh <- &GrpcResponse{
+ Data: bmpmsgs,
+ }
+ close(grpcReq.ResponseCh)
case REQ_MOD_PATH:
pathList := server.handleModPathRequest(grpcReq)
if len(pathList) > 0 {
@@ -1670,7 +1680,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
info := peer.fsm.peerInfo
timestamp := peer.conf.Timers.State.Uptime
- msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp)
+ msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.fsm.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp)
msgs = append(msgs, msg)
}
grpcReq.ResponseCh <- &GrpcResponse{
@@ -1750,6 +1760,22 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
Data: d,
}
close(grpcReq.ResponseCh)
+ case REQ_BMP_ADJ_IN:
+ bmpmsgs := make([]*bgp.BMPMessage, 0)
+ for _, peer := range server.neighborMap {
+ if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ continue
+ }
+ for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) {
+ msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
+ buf, _ := msgs[0].Serialize()
+ bmpmsgs = append(bmpmsgs, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, path.GetTimestamp().Unix(), buf))
+ }
+ }
+ grpcReq.ResponseCh <- &GrpcResponse{
+ Data: bmpmsgs,
+ }
+ close(grpcReq.ResponseCh)
case REQ_NEIGHBOR_SHUTDOWN:
peers, err := reqToPeers(grpcReq)
if err != nil {
@@ -2129,7 +2155,7 @@ func (server *BgpServer) handleGrpcModNeighbor(grpcReq *GrpcRequest) (sMsgs []*S
}
server.neighborMap[addr] = peer
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
- server.broadcastPeerState(peer)
+ server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE)
case api.Operation_DEL:
SetTcpMD5SigSockopts(listener(net.ParseIP(addr)), addr, "")
log.Info("Delete a peer configuration for ", addr)