diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/grpc_server.go | 34 | ||||
-rw-r--r-- | server/server.go | 26 |
2 files changed, 60 insertions, 0 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index b14845b2..aa36a264 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -48,6 +48,7 @@ const ( REQ_GLOBAL_RIB REQ_GLOBAL_ADD REQ_GLOBAL_DELETE + REQ_GLOBAL_MONITOR_BEST_CHANGED REQ_POLICY_PREFIX REQ_POLICY_PREFIXES REQ_POLICY_PREFIX_ADD @@ -205,6 +206,37 @@ func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error return nil } +func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorBestChangedServer) error { + var reqType int + switch arg.Resource { + case api.Resource_GLOBAL: + reqType = REQ_GLOBAL_MONITOR_BEST_CHANGED + default: + return fmt.Errorf("unsupported resource type: %v", arg.Resource) + } + + rf, err := convertAf2Rf(arg.Af) + if err != nil { + return err + } + + req := NewGrpcRequest(reqType, "", rf, nil) + s.bgpServerCh <- req + + for res := range req.ResponseCh { + if err = res.Err(); err != nil { + log.Debug(err.Error()) + goto END + } + if err = stream.Send(res.Data.(*api.Path)); err != nil { + goto END + } + } +END: + req.EndCh <- struct{}{} + return err +} + func (s *Server) neighbor(reqType int, arg *api.Arguments) (*api.Error, error) { rf, err := convertAf2Rf(arg.Af) if err != nil { @@ -512,6 +544,7 @@ type GrpcRequest struct { RemoteAddr string RouteFamily bgp.RouteFamily ResponseCh chan *GrpcResponse + EndCh chan struct{} Err error Data interface{} } @@ -522,6 +555,7 @@ func NewGrpcRequest(reqType int, remoteAddr string, rf bgp.RouteFamily, d interf RouteFamily: rf, RemoteAddr: remoteAddr, ResponseCh: make(chan *GrpcResponse), + EndCh: make(chan struct{}), Data: d, } return r diff --git a/server/server.go b/server/server.go index 939c2f56..cc19ecc7 100644 --- a/server/server.go +++ b/server/server.go @@ -52,6 +52,7 @@ type BgpServer struct { policyUpdateCh chan config.RoutingPolicy policyMap map[string]*policy.Policy routingPolicy config.RoutingPolicy + broadcastReqs []*GrpcRequest neighborMap map[string]*Peer localRibMap map[string]*LocalRib @@ -354,6 +355,9 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer) []*SenderMsg { if len(pathList) == 0 { continue } + + server.broadcastBests(pathList) + msgList := table.CreateUpdateMsgFromPaths(pathList) for _, targetPeer := range server.neighborMap { if targetPeer.isRouteServerClient() || targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { @@ -404,6 +408,24 @@ func applyPolicies(peer *Peer, loc *LocalRib, isExport bool, pathList []table.Pa return ret } +func (server *BgpServer) broadcastBests(bests []table.Path) { + for _, path := range bests { + result := &GrpcResponse{ + Data: path.ToApiStruct(), + } + remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs)) + for _, req := range server.broadcastReqs { + select { + case <-req.EndCh: + continue + case req.ResponseCh <- result: + } + remainReqs = append(remainReqs, req) + } + server.broadcastReqs = remainReqs + } +} + func (server *BgpServer) propagateUpdate(neighborAddress string, RouteServerClient bool, pathList []table.Path) []*SenderMsg { msgs := make([]*SenderMsg, 0) @@ -432,6 +454,8 @@ func (server *BgpServer) propagateUpdate(neighborAddress string, RouteServerClie return msgs } + server.broadcastBests(sendPathList) + for _, targetPeer := range server.neighborMap { if targetPeer.isRouteServerClient() || targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { continue @@ -790,6 +814,8 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { close(grpcReq.ResponseCh) } + case REQ_GLOBAL_MONITOR_BEST_CHANGED: + server.broadcastReqs = append(server.broadcastReqs, grpcReq) case REQ_NEIGHBORS: for _, peer := range server.neighborMap { result := &GrpcResponse{ |