From 99b272438618c33fecc74ae8514c6a10f3de8f8f Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Tue, 13 Nov 2018 10:29:38 +0900 Subject: server: change List API design https://github.com/osrg/gobgp/issues/1763#issuecomment-437594975 Follow Chris's proposal; more consistent with gRPC streaming API. Also supports context properly. Signed-off-by: FUJITA Tomonori --- pkg/server/grpc_server.go | 107 ++++++++++++++++++++++------------------------ 1 file changed, 52 insertions(+), 55 deletions(-) (limited to 'pkg/server/grpc_server.go') diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index 89f3c534..98b28b00 100644 --- a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -90,13 +90,15 @@ func (s *server) serve() error { } func (s *server) ListPeer(r *api.ListPeerRequest, stream api.GobgpApi_ListPeerServer) error { - l, err := s.bgpServer.ListPeer(context.Background(), r) - for _, e := range l { - if err := stream.Send(&api.ListPeerResponse{Peer: e}); err != nil { - return err + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(p *api.Peer) { + if err := stream.Send(&api.ListPeerResponse{Peer: p}); err != nil { + cancel() + return } } - return err + return s.bgpServer.ListPeer(ctx, r, fn) } func newValidationFromTableStruct(v *table.Validation) *api.RPKIValidation { @@ -153,13 +155,14 @@ func getValidation(v []*table.Validation, i int) *table.Validation { } func (s *server) ListPath(r *api.ListPathRequest, stream api.GobgpApi_ListPathServer) error { - dsts, err := s.bgpServer.ListPath(context.Background(), r) - for _, d := range dsts { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(d *api.Destination) { if err := stream.Send(&api.ListPathResponse{Destination: d}); err != nil { - return err + cancel() } } - return err + return s.bgpServer.ListPath(ctx, r, fn) } func (s *server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_MonitorTableServer) error { @@ -380,29 +383,25 @@ func (s *server) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) (*empty } func (s *server) ListRpki(r *api.ListRpkiRequest, stream api.GobgpApi_ListRpkiServer) error { - servers, err := s.bgpServer.ListRpki(context.Background(), r) - if err != nil { - return err - } - for _, rpki := range servers { - if err := stream.Send(&api.ListRpkiResponse{Server: rpki}); err != nil { - return err + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(r *api.Rpki) { + if err := stream.Send(&api.ListRpkiResponse{Server: r}); err != nil { + cancel() } } - return nil + return s.bgpServer.ListRpki(ctx, r, fn) } func (s *server) ListRpkiTable(r *api.ListRpkiTableRequest, stream api.GobgpApi_ListRpkiTableServer) error { - roas, err := s.bgpServer.ListRpkiTable(context.Background(), r) - if err != nil { - return err - } - for _, roa := range roas { - if err := stream.Send(&api.ListRpkiTableResponse{Roa: roa}); err != nil { - return err + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(r *api.Roa) { + if err := stream.Send(&api.ListRpkiTableResponse{Roa: r}); err != nil { + cancel() } } - return nil + return s.bgpServer.ListRpkiTable(ctx, r, fn) } func (s *server) EnableZebra(ctx context.Context, r *api.EnableZebraRequest) (*empty.Empty, error) { @@ -410,12 +409,14 @@ func (s *server) EnableZebra(ctx context.Context, r *api.EnableZebraRequest) (*e } func (s *server) ListVrf(r *api.ListVrfRequest, stream api.GobgpApi_ListVrfServer) error { - for _, v := range s.bgpServer.ListVrf(context.Background(), r) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(v *api.Vrf) { if err := stream.Send(&api.ListVrfResponse{Vrf: v}); err != nil { - return err + cancel() } } - return nil + return s.bgpServer.ListVrf(ctx, r, fn) } func (s *server) AddVrf(ctx context.Context, r *api.AddVrfRequest) (*empty.Empty, error) { @@ -939,16 +940,14 @@ func newDefinedSetFromApiStruct(a *api.DefinedSet) (table.DefinedSet, error) { var _regexpPrefixMaskLengthRange = regexp.MustCompile(`(\d+)\.\.(\d+)`) func (s *server) ListDefinedSet(r *api.ListDefinedSetRequest, stream api.GobgpApi_ListDefinedSetServer) error { - sets, err := s.bgpServer.ListDefinedSet(context.Background(), r) - if err != nil { - return err - } - for _, set := range sets { - if err := stream.Send(&api.ListDefinedSetResponse{DefinedSet: set}); err != nil { - return err + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(d *api.DefinedSet) { + if err := stream.Send(&api.ListDefinedSetResponse{DefinedSet: d}); err != nil { + cancel() } } - return nil + return s.bgpServer.ListDefinedSet(ctx, r, fn) } func (s *server) AddDefinedSet(ctx context.Context, r *api.AddDefinedSetRequest) (*empty.Empty, error) { @@ -1502,16 +1501,14 @@ func newStatementFromApiStruct(a *api.Statement) (*table.Statement, error) { } func (s *server) ListStatement(r *api.ListStatementRequest, stream api.GobgpApi_ListStatementServer) error { - l, err := s.bgpServer.ListStatement(context.Background(), r) - if err != nil { - for _, st := range l { - err = stream.Send(&api.ListStatementResponse{Statement: st}) - if err != nil { - return err - } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(s *api.Statement) { + if err := stream.Send(&api.ListStatementResponse{Statement: s}); err != nil { + cancel() } } - return err + return s.bgpServer.ListStatement(ctx, r, fn) } func (s *server) AddStatement(ctx context.Context, r *api.AddStatementRequest) (*empty.Empty, error) { @@ -1585,13 +1582,14 @@ func newRoaListFromTableStructList(origin []*table.ROA) []*api.Roa { } func (s *server) ListPolicy(r *api.ListPolicyRequest, stream api.GobgpApi_ListPolicyServer) error { - l, err := s.bgpServer.ListPolicy(context.Background(), r) - for _, p := range l { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(p *api.Policy) { if err := stream.Send(&api.ListPolicyResponse{Policy: p}); err != nil { - return err + cancel() } } - return err + return s.bgpServer.ListPolicy(ctx, r, fn) } func (s *server) AddPolicy(ctx context.Context, r *api.AddPolicyRequest) (*empty.Empty, error) { @@ -1603,15 +1601,14 @@ func (s *server) DeletePolicy(ctx context.Context, r *api.DeletePolicyRequest) ( } func (s *server) ListPolicyAssignment(r *api.ListPolicyAssignmentRequest, stream api.GobgpApi_ListPolicyAssignmentServer) error { - l, err := s.bgpServer.ListPolicyAssignment(context.Background(), r) - if err == nil { - for _, a := range l { - if err := stream.Send(&api.ListPolicyAssignmentResponse{Assignment: a}); err != nil { - return err - } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(a *api.PolicyAssignment) { + if err := stream.Send(&api.ListPolicyAssignmentResponse{Assignment: a}); err != nil { + cancel() } } - return err + return s.bgpServer.ListPolicyAssignment(ctx, r, fn) } func defaultRouteType(d api.RouteAction) table.RouteType { -- cgit v1.2.3