summaryrefslogtreecommitdiffhomepage
path: root/pkg/server/grpc_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/grpc_server.go')
-rw-r--r--pkg/server/grpc_server.go107
1 files changed, 52 insertions, 55 deletions
diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go
index 89f3c534..98b28b00 100644
--- a/pkg/server/grpc_server.go
+++ b/pkg/server/grpc_server.go
@@ -90,13 +90,15 @@ func (s *server) serve() error {
}
func (s *server) ListPeer(r *api.ListPeerRequest, stream api.GobgpApi_ListPeerServer) error {
- l, err := s.bgpServer.ListPeer(context.Background(), r)
- for _, e := range l {
- if err := stream.Send(&api.ListPeerResponse{Peer: e}); err != nil {
- return err
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fn := func(p *api.Peer) {
+ if err := stream.Send(&api.ListPeerResponse{Peer: p}); err != nil {
+ cancel()
+ return
}
}
- return err
+ return s.bgpServer.ListPeer(ctx, r, fn)
}
func newValidationFromTableStruct(v *table.Validation) *api.RPKIValidation {
@@ -153,13 +155,14 @@ func getValidation(v []*table.Validation, i int) *table.Validation {
}
func (s *server) ListPath(r *api.ListPathRequest, stream api.GobgpApi_ListPathServer) error {
- dsts, err := s.bgpServer.ListPath(context.Background(), r)
- for _, d := range dsts {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fn := func(d *api.Destination) {
if err := stream.Send(&api.ListPathResponse{Destination: d}); err != nil {
- return err
+ cancel()
}
}
- return err
+ return s.bgpServer.ListPath(ctx, r, fn)
}
func (s *server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_MonitorTableServer) error {
@@ -380,29 +383,25 @@ func (s *server) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) (*empty
}
func (s *server) ListRpki(r *api.ListRpkiRequest, stream api.GobgpApi_ListRpkiServer) error {
- servers, err := s.bgpServer.ListRpki(context.Background(), r)
- if err != nil {
- return err
- }
- for _, rpki := range servers {
- if err := stream.Send(&api.ListRpkiResponse{Server: rpki}); err != nil {
- return err
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fn := func(r *api.Rpki) {
+ if err := stream.Send(&api.ListRpkiResponse{Server: r}); err != nil {
+ cancel()
}
}
- return nil
+ return s.bgpServer.ListRpki(ctx, r, fn)
}
func (s *server) ListRpkiTable(r *api.ListRpkiTableRequest, stream api.GobgpApi_ListRpkiTableServer) error {
- roas, err := s.bgpServer.ListRpkiTable(context.Background(), r)
- if err != nil {
- return err
- }
- for _, roa := range roas {
- if err := stream.Send(&api.ListRpkiTableResponse{Roa: roa}); err != nil {
- return err
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fn := func(r *api.Roa) {
+ if err := stream.Send(&api.ListRpkiTableResponse{Roa: r}); err != nil {
+ cancel()
}
}
- return nil
+ return s.bgpServer.ListRpkiTable(ctx, r, fn)
}
func (s *server) EnableZebra(ctx context.Context, r *api.EnableZebraRequest) (*empty.Empty, error) {
@@ -410,12 +409,14 @@ func (s *server) EnableZebra(ctx context.Context, r *api.EnableZebraRequest) (*e
}
func (s *server) ListVrf(r *api.ListVrfRequest, stream api.GobgpApi_ListVrfServer) error {
- for _, v := range s.bgpServer.ListVrf(context.Background(), r) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fn := func(v *api.Vrf) {
if err := stream.Send(&api.ListVrfResponse{Vrf: v}); err != nil {
- return err
+ cancel()
}
}
- return nil
+ return s.bgpServer.ListVrf(ctx, r, fn)
}
func (s *server) AddVrf(ctx context.Context, r *api.AddVrfRequest) (*empty.Empty, error) {
@@ -939,16 +940,14 @@ func newDefinedSetFromApiStruct(a *api.DefinedSet) (table.DefinedSet, error) {
var _regexpPrefixMaskLengthRange = regexp.MustCompile(`(\d+)\.\.(\d+)`)
func (s *server) ListDefinedSet(r *api.ListDefinedSetRequest, stream api.GobgpApi_ListDefinedSetServer) error {
- sets, err := s.bgpServer.ListDefinedSet(context.Background(), r)
- if err != nil {
- return err
- }
- for _, set := range sets {
- if err := stream.Send(&api.ListDefinedSetResponse{DefinedSet: set}); err != nil {
- return err
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fn := func(d *api.DefinedSet) {
+ if err := stream.Send(&api.ListDefinedSetResponse{DefinedSet: d}); err != nil {
+ cancel()
}
}
- return nil
+ return s.bgpServer.ListDefinedSet(ctx, r, fn)
}
func (s *server) AddDefinedSet(ctx context.Context, r *api.AddDefinedSetRequest) (*empty.Empty, error) {
@@ -1502,16 +1501,14 @@ func newStatementFromApiStruct(a *api.Statement) (*table.Statement, error) {
}
func (s *server) ListStatement(r *api.ListStatementRequest, stream api.GobgpApi_ListStatementServer) error {
- l, err := s.bgpServer.ListStatement(context.Background(), r)
- if err != nil {
- for _, st := range l {
- err = stream.Send(&api.ListStatementResponse{Statement: st})
- if err != nil {
- return err
- }
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fn := func(s *api.Statement) {
+ if err := stream.Send(&api.ListStatementResponse{Statement: s}); err != nil {
+ cancel()
}
}
- return err
+ return s.bgpServer.ListStatement(ctx, r, fn)
}
func (s *server) AddStatement(ctx context.Context, r *api.AddStatementRequest) (*empty.Empty, error) {
@@ -1585,13 +1582,14 @@ func newRoaListFromTableStructList(origin []*table.ROA) []*api.Roa {
}
func (s *server) ListPolicy(r *api.ListPolicyRequest, stream api.GobgpApi_ListPolicyServer) error {
- l, err := s.bgpServer.ListPolicy(context.Background(), r)
- for _, p := range l {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fn := func(p *api.Policy) {
if err := stream.Send(&api.ListPolicyResponse{Policy: p}); err != nil {
- return err
+ cancel()
}
}
- return err
+ return s.bgpServer.ListPolicy(ctx, r, fn)
}
func (s *server) AddPolicy(ctx context.Context, r *api.AddPolicyRequest) (*empty.Empty, error) {
@@ -1603,15 +1601,14 @@ func (s *server) DeletePolicy(ctx context.Context, r *api.DeletePolicyRequest) (
}
func (s *server) ListPolicyAssignment(r *api.ListPolicyAssignmentRequest, stream api.GobgpApi_ListPolicyAssignmentServer) error {
- l, err := s.bgpServer.ListPolicyAssignment(context.Background(), r)
- if err == nil {
- for _, a := range l {
- if err := stream.Send(&api.ListPolicyAssignmentResponse{Assignment: a}); err != nil {
- return err
- }
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fn := func(a *api.PolicyAssignment) {
+ if err := stream.Send(&api.ListPolicyAssignmentResponse{Assignment: a}); err != nil {
+ cancel()
}
}
- return err
+ return s.bgpServer.ListPolicyAssignment(ctx, r, fn)
}
func defaultRouteType(d api.RouteAction) table.RouteType {