diff options
Diffstat (limited to 'server/grpc_server.go')
-rw-r--r-- | server/grpc_server.go | 122 |
1 files changed, 35 insertions, 87 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index 84d953e8..b56e2e14 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -119,24 +119,31 @@ func (s *Server) GetNeighbor(ctx context.Context, arg *api.Arguments) (*api.Peer return res.Data.(*api.Peer), nil } -func (s *Server) GetNeighbors(_ *api.Arguments, stream api.Grpc_GetNeighborsServer) error { - var rf bgp.RouteFamily - req := NewGrpcRequest(REQ_NEIGHBORS, "", rf, nil) - s.bgpServerCh <- req - +func handleMultipleResponses(req *GrpcRequest, f func(*GrpcResponse) error) error { for res := range req.ResponseCh { if err := res.Err(); err != nil { log.Debug(err.Error()) + req.EndCh <- struct{}{} return err } - if err := stream.Send(res.Data.(*api.Peer)); err != nil { + if err := f(res); err != nil { + req.EndCh <- struct{}{} return err } } - return nil } +func (s *Server) GetNeighbors(_ *api.Arguments, stream api.Grpc_GetNeighborsServer) error { + var rf bgp.RouteFamily + req := NewGrpcRequest(REQ_NEIGHBORS, "", rf, nil) + s.bgpServerCh <- req + + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Peer)) + }) +} + func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error { var reqType int switch arg.Resource { @@ -157,20 +164,12 @@ func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error req := NewGrpcRequest(reqType, arg.Name, bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.Send(res.Data.(*api.Destination)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Destination)) + }) } func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorBestChangedServer) error { - var err error var reqType int switch arg.Resource { case api.Resource_GLOBAL: @@ -182,18 +181,9 @@ func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorB req := NewGrpcRequest(reqType, "", bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err = res.Err(); err != nil { - log.Debug(err.Error()) - goto END - } - if err = stream.Send(res.Data.(*api.Destination)); err != nil { - goto END - } - } -END: - req.EndCh <- struct{}{} - return err + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Destination)) + }) } func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPeerStateServer) error { @@ -201,20 +191,9 @@ func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPee req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil) s.bgpServerCh <- req - var err error - - for res := range req.ResponseCh { - if err = res.Err(); err != nil { - log.Debug(err.Error()) - goto END - } - if err = stream.Send(res.Data.(*api.Peer)); err != nil { - goto END - } - } -END: - req.EndCh <- struct{}{} - return err + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Peer)) + }) } func (s *Server) neighbor(reqType int, arg *api.Arguments) (*api.Error, error) { @@ -462,16 +441,9 @@ func (s *Server) GetPolicyRoutePolicies(arg *api.PolicyArguments, stream api.Grp } req := NewGrpcRequest(reqType, "", rf, nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.(api.Grpc_GetPolicyRoutePoliciesServer).Send(res.Data.(*api.PolicyDefinition)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.PolicyDefinition)) + }) } func (s *Server) GetPolicyRoutePolicy(ctx context.Context, arg *api.PolicyArguments) (*api.PolicyDefinition, error) { @@ -520,7 +492,6 @@ func (s *Server) ModPolicyRoutePolicy(stream api.Grpc_ModPolicyRoutePolicyServer } func (s *Server) GetMrt(arg *api.MrtArguments, stream api.Grpc_GetMrtServer) error { - var err error var reqType int switch arg.Resource { case api.Resource_GLOBAL: @@ -532,50 +503,27 @@ func (s *Server) GetMrt(arg *api.MrtArguments, stream api.Grpc_GetMrtServer) err } req := NewGrpcRequest(reqType, arg.NeighborAddress, bgp.RouteFamily(arg.Rf), arg.Interval) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err = res.Err(); err != nil { - log.Debug(err.Error()) - goto END - } - if err = stream.Send(res.Data.(*api.MrtMessage)); err != nil { - goto END - } - } -END: - req.EndCh <- struct{}{} - return err + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.MrtMessage)) + }) } func (s *Server) GetRPKI(arg *api.Arguments, stream api.Grpc_GetRPKIServer) error { req := NewGrpcRequest(REQ_RPKI, "", bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.Send(res.Data.(*api.ROA)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.ROA)) + }) } func (s *Server) GetVrfs(arg *api.Arguments, stream api.Grpc_GetVrfsServer) error { req := NewGrpcRequest(REQ_VRFS, "", bgp.RouteFamily(0), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.Send(res.Data.(*api.Vrf)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Vrf)) + }) } func (s *Server) ModVrf(ctx context.Context, arg *api.ModVrfArguments) (*api.Error, error) { |