diff options
-rw-r--r-- | server/grpc_server.go | 122 | ||||
-rw-r--r-- | server/server.go | 39 |
2 files changed, 65 insertions, 96 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index 84d953e8..b56e2e14 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -119,24 +119,31 @@ func (s *Server) GetNeighbor(ctx context.Context, arg *api.Arguments) (*api.Peer return res.Data.(*api.Peer), nil } -func (s *Server) GetNeighbors(_ *api.Arguments, stream api.Grpc_GetNeighborsServer) error { - var rf bgp.RouteFamily - req := NewGrpcRequest(REQ_NEIGHBORS, "", rf, nil) - s.bgpServerCh <- req - +func handleMultipleResponses(req *GrpcRequest, f func(*GrpcResponse) error) error { for res := range req.ResponseCh { if err := res.Err(); err != nil { log.Debug(err.Error()) + req.EndCh <- struct{}{} return err } - if err := stream.Send(res.Data.(*api.Peer)); err != nil { + if err := f(res); err != nil { + req.EndCh <- struct{}{} return err } } - return nil } +func (s *Server) GetNeighbors(_ *api.Arguments, stream api.Grpc_GetNeighborsServer) error { + var rf bgp.RouteFamily + req := NewGrpcRequest(REQ_NEIGHBORS, "", rf, nil) + s.bgpServerCh <- req + + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Peer)) + }) +} + func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error { var reqType int switch arg.Resource { @@ -157,20 +164,12 @@ func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error req := NewGrpcRequest(reqType, arg.Name, bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.Send(res.Data.(*api.Destination)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Destination)) + }) } func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorBestChangedServer) error { - var err error var reqType int switch arg.Resource { case api.Resource_GLOBAL: @@ -182,18 +181,9 @@ func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorB req := NewGrpcRequest(reqType, "", bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err = res.Err(); err != nil { - log.Debug(err.Error()) - goto END - } - if err = stream.Send(res.Data.(*api.Destination)); err != nil { - goto END - } - } -END: - req.EndCh <- struct{}{} - return err + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Destination)) + }) } func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPeerStateServer) error { @@ -201,20 +191,9 @@ func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPee req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil) s.bgpServerCh <- req - var err error - - for res := range req.ResponseCh { - if err = res.Err(); err != nil { - log.Debug(err.Error()) - goto END - } - if err = stream.Send(res.Data.(*api.Peer)); err != nil { - goto END - } - } -END: - req.EndCh <- struct{}{} - return err + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Peer)) + }) } func (s *Server) neighbor(reqType int, arg *api.Arguments) (*api.Error, error) { @@ -462,16 +441,9 @@ func (s *Server) GetPolicyRoutePolicies(arg *api.PolicyArguments, stream api.Grp } req := NewGrpcRequest(reqType, "", rf, nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.(api.Grpc_GetPolicyRoutePoliciesServer).Send(res.Data.(*api.PolicyDefinition)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.PolicyDefinition)) + }) } func (s *Server) GetPolicyRoutePolicy(ctx context.Context, arg *api.PolicyArguments) (*api.PolicyDefinition, error) { @@ -520,7 +492,6 @@ func (s *Server) ModPolicyRoutePolicy(stream api.Grpc_ModPolicyRoutePolicyServer } func (s *Server) GetMrt(arg *api.MrtArguments, stream api.Grpc_GetMrtServer) error { - var err error var reqType int switch arg.Resource { case api.Resource_GLOBAL: @@ -532,50 +503,27 @@ func (s *Server) GetMrt(arg *api.MrtArguments, stream api.Grpc_GetMrtServer) err } req := NewGrpcRequest(reqType, arg.NeighborAddress, bgp.RouteFamily(arg.Rf), arg.Interval) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err = res.Err(); err != nil { - log.Debug(err.Error()) - goto END - } - if err = stream.Send(res.Data.(*api.MrtMessage)); err != nil { - goto END - } - } -END: - req.EndCh <- struct{}{} - return err + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.MrtMessage)) + }) } func (s *Server) GetRPKI(arg *api.Arguments, stream api.Grpc_GetRPKIServer) error { req := NewGrpcRequest(REQ_RPKI, "", bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.Send(res.Data.(*api.ROA)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.ROA)) + }) } func (s *Server) GetVrfs(arg *api.Arguments, stream api.Grpc_GetVrfsServer) error { req := NewGrpcRequest(REQ_VRFS, "", bgp.RouteFamily(0), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.Send(res.Data.(*api.Vrf)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Vrf)) + }) } func (s *Server) ModVrf(ctx context.Context, arg *api.ModVrfArguments) (*api.Error, error) { diff --git a/server/server.go b/server/server.go index 366708a3..11fd807a 100644 --- a/server/server.go +++ b/server/server.go @@ -1085,19 +1085,33 @@ END: return msgs } +func sendMultipleResponses(grpcReq *GrpcRequest, results []*GrpcResponse) { + defer close(grpcReq.ResponseCh) + for _, r := range results { + select { + case grpcReq.ResponseCh <- r: + case <-grpcReq.EndCh: + return + } + } +} + func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { var msgs []*SenderMsg switch grpcReq.RequestType { case REQ_GLOBAL_RIB: if t, ok := server.localRibMap[GLOBAL_RIB_NAME].rib.Tables[grpcReq.RouteFamily]; ok { + results := make([]*GrpcResponse, len(t.GetDestinations())) + i := 0 for _, dst := range t.GetDestinations() { result := &GrpcResponse{} result.Data = dst.ToApiStruct() - grpcReq.ResponseCh <- result + results[i] = result + i++ } + go sendMultipleResponses(grpcReq, results) } - close(grpcReq.ResponseCh) case REQ_MOD_PATH: pi := &table.PeerInfo{ @@ -1112,13 +1126,16 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { } case REQ_NEIGHBORS: + results := make([]*GrpcResponse, len(server.neighborMap)) + i := 0 for _, peer := range server.neighborMap { result := &GrpcResponse{ Data: peer.ToApiStruct(), } - grpcReq.ResponseCh <- result + results[i] = result + i++ } - close(grpcReq.ResponseCh) + go sendMultipleResponses(grpcReq, results) case REQ_NEIGHBOR: peer, err := server.checkNeighborRequest(grpcReq) @@ -1139,14 +1156,17 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { if peer.isRouteServerClient() && peer.fsm.adminState != ADMIN_STATE_DOWN { remoteAddr := grpcReq.Name if t, ok := server.localRibMap[remoteAddr].rib.Tables[grpcReq.RouteFamily]; ok { + results := make([]*GrpcResponse, len(t.GetDestinations())) + i := 0 for _, dst := range t.GetDestinations() { result := &GrpcResponse{} result.Data = dst.ToApiStruct() - grpcReq.ResponseCh <- result + results[i] = result + i++ } + go sendMultipleResponses(grpcReq, results) } } - close(grpcReq.ResponseCh) case REQ_ADJ_RIB_IN, REQ_ADJ_RIB_OUT: peer, err := server.checkNeighborRequest(grpcReq) @@ -1164,16 +1184,17 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { log.Debugf("RouteFamily=%v adj-rib-out found : %d", rf.String(), len(paths)) } - for _, p := range paths { + results := make([]*GrpcResponse, len(paths)) + for i, p := range paths { result := &GrpcResponse{ Data: &api.Destination{ Prefix: p.GetNlri().String(), Paths: []*api.Path{p.ToApiStruct()}, }, } - grpcReq.ResponseCh <- result + results[i] = result } - close(grpcReq.ResponseCh) + go sendMultipleResponses(grpcReq, results) case REQ_NEIGHBOR_SHUTDOWN: peer, err := server.checkNeighborRequest(grpcReq) |