diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 22:22:08 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 17:28:38 +0900 |
commit | f097981a33d107a6d31c4da0f25b90957f3d25fe (patch) | |
tree | 25ebf98379bb5eec7131500bb31221698107a04a /server/server.go | |
parent | 3343bcec3565da1aa40fa1aa1ecc6feee7e902f6 (diff) |
bmp uses the new Watch API
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r-- | server/server.go | 259 |
1 files changed, 116 insertions, 143 deletions
diff --git a/server/server.go b/server/server.go index 4cd71ad6..0aeead21 100644 --- a/server/server.go +++ b/server/server.go @@ -30,7 +30,6 @@ import ( api "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet/bgp" - "github.com/osrg/gobgp/packet/bmp" "github.com/osrg/gobgp/table" "github.com/satori/go.uuid" ) @@ -107,11 +106,12 @@ type BgpServer struct { watchers *watcherManager watcherMap map[watchType][]*Watcher zclient *zebraClient + bmpManager *bmpClientManager } func NewBgpServer() *BgpServer { roaManager, _ := NewROAManager(0) - return &BgpServer{ + s := &BgpServer{ GrpcReqCh: make(chan *GrpcRequest, 1), neighborMap: make(map[string]*Peer), policy: table.NewRoutingPolicy(), @@ -120,6 +120,8 @@ func NewBgpServer() *BgpServer { mgmtCh: make(chan func(), 1), watcherMap: make(map[watchType][]*Watcher), } + s.bmpManager = newBmpClientManager(s) + return s } func (server *BgpServer) Listeners(addr string) []*net.TCPListener { @@ -383,27 +385,31 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil } } +func createWatcherEventStateChange(peer *Peer) *watcherEventStateChangedMsg { + _, rport := peer.fsm.RemoteHostPort() + laddr, lport := peer.fsm.LocalHostPort() + sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) + recvOpen := peer.fsm.recvOpen + return &watcherEventStateChangedMsg{ + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(laddr), + peerPort: rport, + localPort: lport, + peerID: peer.fsm.peerInfo.ID, + sentOpen: sentOpen, + recvOpen: recvOpen, + state: peer.fsm.state, + adminState: peer.fsm.adminState, + timestamp: time.Now(), + } +} + func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { newState := peer.fsm.state if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { - _, rport := peer.fsm.RemoteHostPort() - laddr, lport := peer.fsm.LocalHostPort() - sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) - recvOpen := peer.fsm.recvOpen - ev := &watcherEventStateChangedMsg{ - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(laddr), - peerPort: rport, - localPort: lport, - peerID: peer.fsm.peerInfo.ID, - sentOpen: sentOpen, - recvOpen: recvOpen, - state: newState, - adminState: peer.fsm.adminState, - timestamp: time.Now(), - } + ev := createWatcherEventStateChange(peer) if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) { server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev) } @@ -892,19 +898,28 @@ func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) error { return nil } -func (server *BgpServer) SetBmpConfig(c []config.BmpServer) error { - for _, s := range c { - ch := make(chan *GrpcResponse) - server.GrpcReqCh <- &GrpcRequest{ - RequestType: REQ_ADD_BMP, - Data: &s.Config, - ResponseCh: ch, - } - if err := (<-ch).Err(); err != nil { - return err - } +func (s *BgpServer) AddBmp(c *config.BmpServerConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + err = s.bmpManager.addServer(c) } - return nil + return err +} + +func (s *BgpServer) DeleteBmp(c *config.BmpServerConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + err = s.bmpManager.deleteServer(c) + } + return err } func (server *BgpServer) SetMrtConfig(c []config.Mrt) error { @@ -1512,37 +1527,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { Data: dsts, } close(grpcReq.ResponseCh) - case REQ_BMP_GLOBAL: - paths := server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist()) - bmpmsgs := make([]*bmp.BMPMessage, 0, len(paths)) - for _, path := range paths { - msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) - buf, _ := msgs[0].Serialize() - bmpmsgs = append(bmpmsgs, bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, true, 0, path.GetSource(), path.GetTimestamp().Unix(), buf)) - } - grpcReq.ResponseCh <- &GrpcResponse{ - Data: bmpmsgs, - } - close(grpcReq.ResponseCh) - case REQ_BMP_NEIGHBORS: - //TODO: merge REQ_NEIGHBORS and REQ_BMP_NEIGHBORS - msgs := make([]*bmp.BMPMessage, 0, len(server.neighborMap)) - for _, peer := range server.neighborMap { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { - continue - } - laddr, lport := peer.fsm.LocalHostPort() - _, rport := peer.fsm.RemoteHostPort() - sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) - info := peer.fsm.peerInfo - timestamp := peer.fsm.pConf.Timers.State.Uptime - msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.fsm.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp) - msgs = append(msgs, msg) - } - grpcReq.ResponseCh <- &GrpcResponse{ - Data: msgs, - } - close(grpcReq.ResponseCh) case REQ_NEIGHBOR: l := make([]*config.Neighbor, 0) for _, peer := range server.neighborMap { @@ -1613,22 +1597,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { Data: paths, } close(grpcReq.ResponseCh) - case REQ_BMP_ADJ_IN: - bmpmsgs := make([]*bmp.BMPMessage, 0) - for _, peer := range server.neighborMap { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { - continue - } - for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) { - msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) - buf, _ := msgs[0].Serialize() - bmpmsgs = append(bmpmsgs, bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, path.GetTimestamp().Unix(), buf)) - } - } - grpcReq.ResponseCh <- &GrpcResponse{ - Data: bmpmsgs, - } - close(grpcReq.ResponseCh) case REQ_NEIGHBOR_SHUTDOWN: peers, err := reqToPeers(grpcReq) if err != nil { @@ -1806,10 +1774,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { server.handleEnableMrtRequest(grpcReq) case REQ_DISABLE_MRT: server.handleDisableMrtRequest(grpcReq) - case REQ_ADD_BMP: - server.handleAddBmp(grpcReq) - case REQ_DELETE_BMP: - server.handleDeleteBmp(grpcReq) case REQ_VALIDATE_RIB: server.handleValidateRib(grpcReq) case REQ_INITIALIZE_RPKI: @@ -2686,57 +2650,6 @@ func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) { close(grpcReq.ResponseCh) } -func (server *BgpServer) handleAddBmp(grpcReq *GrpcRequest) { - var c *config.BmpServerConfig - switch arg := grpcReq.Data.(type) { - case *api.AddBmpRequest: - c = &config.BmpServerConfig{ - Address: arg.Address, - Port: arg.Port, - RouteMonitoringPolicy: config.BmpRouteMonitoringPolicyType(arg.Type), - } - case *config.BmpServerConfig: - c = arg - } - - w, y := server.watchers.watcher(WATCHER_BMP) - if !y { - w, _ = newBmpWatcher(server.GrpcReqCh) - server.watchers.addWatcher(WATCHER_BMP, w) - } - - err := w.(*bmpWatcher).addServer(*c) - grpcReq.ResponseCh <- &GrpcResponse{ - ResponseErr: err, - Data: &api.AddBmpResponse{}, - } - close(grpcReq.ResponseCh) -} - -func (server *BgpServer) handleDeleteBmp(grpcReq *GrpcRequest) { - var c *config.BmpServerConfig - switch arg := grpcReq.Data.(type) { - case *api.DeleteBmpRequest: - c = &config.BmpServerConfig{ - Address: arg.Address, - Port: arg.Port, - } - case *config.BmpServerConfig: - c = arg - } - - if w, y := server.watchers.watcher(WATCHER_BMP); y { - err := w.(*bmpWatcher).deleteServer(*c) - grpcReq.ResponseCh <- &GrpcResponse{ - ResponseErr: err, - Data: &api.DeleteBmpResponse{}, - } - close(grpcReq.ResponseCh) - } else { - grpcDone(grpcReq, fmt.Errorf("bmp not configured")) - } -} - func (server *BgpServer) handleValidateRib(grpcReq *GrpcRequest) { arg := grpcReq.Data.(*api.ValidateRibRequest) for _, rf := range server.globalRib.GetRFlist() { @@ -2796,10 +2709,13 @@ const ( ) type watchOptions struct { - bestpath bool - preUpdate bool - postUpdate bool - peerState bool + bestpath bool + preUpdate bool + postUpdate bool + peerState bool + initUpdate bool + initPostUpdate bool + initPeerState bool } type WatchOption func(*watchOptions) @@ -2810,21 +2726,30 @@ func WatchBestPath() WatchOption { } } -func WatchUpdate() WatchOption { +func WatchUpdate(current bool) WatchOption { return func(o *watchOptions) { o.preUpdate = true + if current { + o.initUpdate = true + } } } -func WatchPostUpdate() WatchOption { +func WatchPostUpdate(current bool) WatchOption { return func(o *watchOptions) { o.postUpdate = true + if current { + o.initPostUpdate = true + } } } -func WatchPeerState() WatchOption { +func WatchPeerState(current bool) WatchOption { return func(o *watchOptions) { o.peerState = true + if current { + o.initPeerState = true + } } } @@ -2859,10 +2784,8 @@ func (w *Watcher) loop() { func (w *Watcher) Stop() { ch := make(chan struct{}) defer func() { <-ch }() - w.s.mgmtCh <- func() { defer close(ch) - for k, l := range w.s.watcherMap { for i, v := range l { if w == v { @@ -2919,6 +2842,56 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { if w.opts.peerState { register(WATCH_TYPE_PEER_STATE, w) } + if w.opts.initPeerState { + for _, peer := range s.neighborMap { + if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + continue + } + w.notify(createWatcherEventStateChange(peer)) + } + } + if w.opts.initUpdate { + for _, peer := range s.neighborMap { + if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + continue + } + for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) { + msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) + buf, _ := msgs[0].Serialize() + _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] + l, _ := peer.fsm.LocalHostPort() + w.notify(&watcherEventUpdateMsg{ + message: msgs[0], + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(l), + peerID: peer.fsm.peerInfo.ID, + fourBytesAs: y, + timestamp: path.GetTimestamp(), + payload: buf, + postPolicy: false, + }) + } + } + } + if w.opts.postUpdate { + for _, path := range s.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, s.globalRib.GetRFlist()) { + msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path}) + buf, _ := msgs[0].Serialize() + w.notify(&watcherEventUpdateMsg{ + peerAS: path.GetSource().AS, + peerAddress: path.GetSource().Address, + peerID: path.GetSource().ID, + message: msgs[0], + timestamp: path.GetTimestamp(), + payload: buf, + postPolicy: true, + }) + } + + } + go w.loop() } return w |