diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-11-13 10:29:38 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-11-13 14:06:55 +0900 |
commit | 99b272438618c33fecc74ae8514c6a10f3de8f8f (patch) | |
tree | aa8ea9e10dec9811d5f09d46d8c113dc30a7ec00 /pkg/server | |
parent | 91dd60676b9da9a0c39e45f0a656ac2919b0d558 (diff) |
server: change List API design
https://github.com/osrg/gobgp/issues/1763#issuecomment-437594975
Follow Chris's proposal; more consistent with gRPC streaming API.
Also supports context properly.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'pkg/server')
-rw-r--r-- | pkg/server/grpc_server.go | 107 | ||||
-rw-r--r-- | pkg/server/server.go | 176 | ||||
-rw-r--r-- | pkg/server/server_test.go | 28 |
3 files changed, 204 insertions, 107 deletions
diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index 89f3c534..98b28b00 100644 --- a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -90,13 +90,15 @@ func (s *server) serve() error { } func (s *server) ListPeer(r *api.ListPeerRequest, stream api.GobgpApi_ListPeerServer) error { - l, err := s.bgpServer.ListPeer(context.Background(), r) - for _, e := range l { - if err := stream.Send(&api.ListPeerResponse{Peer: e}); err != nil { - return err + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(p *api.Peer) { + if err := stream.Send(&api.ListPeerResponse{Peer: p}); err != nil { + cancel() + return } } - return err + return s.bgpServer.ListPeer(ctx, r, fn) } func newValidationFromTableStruct(v *table.Validation) *api.RPKIValidation { @@ -153,13 +155,14 @@ func getValidation(v []*table.Validation, i int) *table.Validation { } func (s *server) ListPath(r *api.ListPathRequest, stream api.GobgpApi_ListPathServer) error { - dsts, err := s.bgpServer.ListPath(context.Background(), r) - for _, d := range dsts { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(d *api.Destination) { if err := stream.Send(&api.ListPathResponse{Destination: d}); err != nil { - return err + cancel() } } - return err + return s.bgpServer.ListPath(ctx, r, fn) } func (s *server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_MonitorTableServer) error { @@ -380,29 +383,25 @@ func (s *server) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) (*empty } func (s *server) ListRpki(r *api.ListRpkiRequest, stream api.GobgpApi_ListRpkiServer) error { - servers, err := s.bgpServer.ListRpki(context.Background(), r) - if err != nil { - return err - } - for _, rpki := range servers { - if err := stream.Send(&api.ListRpkiResponse{Server: rpki}); err != nil { - return err + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(r *api.Rpki) { + if err := stream.Send(&api.ListRpkiResponse{Server: r}); err != nil { + cancel() } } - return nil + return s.bgpServer.ListRpki(ctx, r, fn) } func (s *server) ListRpkiTable(r *api.ListRpkiTableRequest, stream api.GobgpApi_ListRpkiTableServer) error { - roas, err := s.bgpServer.ListRpkiTable(context.Background(), r) - if err != nil { - return err - } - for _, roa := range roas { - if err := stream.Send(&api.ListRpkiTableResponse{Roa: roa}); err != nil { - return err + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(r *api.Roa) { + if err := stream.Send(&api.ListRpkiTableResponse{Roa: r}); err != nil { + cancel() } } - return nil + return s.bgpServer.ListRpkiTable(ctx, r, fn) } func (s *server) EnableZebra(ctx context.Context, r *api.EnableZebraRequest) (*empty.Empty, error) { @@ -410,12 +409,14 @@ func (s *server) EnableZebra(ctx context.Context, r *api.EnableZebraRequest) (*e } func (s *server) ListVrf(r *api.ListVrfRequest, stream api.GobgpApi_ListVrfServer) error { - for _, v := range s.bgpServer.ListVrf(context.Background(), r) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(v *api.Vrf) { if err := stream.Send(&api.ListVrfResponse{Vrf: v}); err != nil { - return err + cancel() } } - return nil + return s.bgpServer.ListVrf(ctx, r, fn) } func (s *server) AddVrf(ctx context.Context, r *api.AddVrfRequest) (*empty.Empty, error) { @@ -939,16 +940,14 @@ func newDefinedSetFromApiStruct(a *api.DefinedSet) (table.DefinedSet, error) { var _regexpPrefixMaskLengthRange = regexp.MustCompile(`(\d+)\.\.(\d+)`) func (s *server) ListDefinedSet(r *api.ListDefinedSetRequest, stream api.GobgpApi_ListDefinedSetServer) error { - sets, err := s.bgpServer.ListDefinedSet(context.Background(), r) - if err != nil { - return err - } - for _, set := range sets { - if err := stream.Send(&api.ListDefinedSetResponse{DefinedSet: set}); err != nil { - return err + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(d *api.DefinedSet) { + if err := stream.Send(&api.ListDefinedSetResponse{DefinedSet: d}); err != nil { + cancel() } } - return nil + return s.bgpServer.ListDefinedSet(ctx, r, fn) } func (s *server) AddDefinedSet(ctx context.Context, r *api.AddDefinedSetRequest) (*empty.Empty, error) { @@ -1502,16 +1501,14 @@ func newStatementFromApiStruct(a *api.Statement) (*table.Statement, error) { } func (s *server) ListStatement(r *api.ListStatementRequest, stream api.GobgpApi_ListStatementServer) error { - l, err := s.bgpServer.ListStatement(context.Background(), r) - if err != nil { - for _, st := range l { - err = stream.Send(&api.ListStatementResponse{Statement: st}) - if err != nil { - return err - } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(s *api.Statement) { + if err := stream.Send(&api.ListStatementResponse{Statement: s}); err != nil { + cancel() } } - return err + return s.bgpServer.ListStatement(ctx, r, fn) } func (s *server) AddStatement(ctx context.Context, r *api.AddStatementRequest) (*empty.Empty, error) { @@ -1585,13 +1582,14 @@ func newRoaListFromTableStructList(origin []*table.ROA) []*api.Roa { } func (s *server) ListPolicy(r *api.ListPolicyRequest, stream api.GobgpApi_ListPolicyServer) error { - l, err := s.bgpServer.ListPolicy(context.Background(), r) - for _, p := range l { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(p *api.Policy) { if err := stream.Send(&api.ListPolicyResponse{Policy: p}); err != nil { - return err + cancel() } } - return err + return s.bgpServer.ListPolicy(ctx, r, fn) } func (s *server) AddPolicy(ctx context.Context, r *api.AddPolicyRequest) (*empty.Empty, error) { @@ -1603,15 +1601,14 @@ func (s *server) DeletePolicy(ctx context.Context, r *api.DeletePolicyRequest) ( } func (s *server) ListPolicyAssignment(r *api.ListPolicyAssignmentRequest, stream api.GobgpApi_ListPolicyAssignmentServer) error { - l, err := s.bgpServer.ListPolicyAssignment(context.Background(), r) - if err == nil { - for _, a := range l { - if err := stream.Send(&api.ListPolicyAssignmentResponse{Assignment: a}); err != nil { - return err - } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fn := func(a *api.PolicyAssignment) { + if err := stream.Send(&api.ListPolicyAssignmentResponse{Assignment: a}); err != nil { + cancel() } } - return err + return s.bgpServer.ListPolicyAssignment(ctx, r, fn) } func defaultRouteType(d api.RouteAction) table.RouteType { diff --git a/pkg/server/server.go b/pkg/server/server.go index 3a00c700..688a8e9f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -2011,7 +2011,7 @@ func (s *BgpServer) listVrf() (l []*table.Vrf) { return l } -func (s *BgpServer) ListVrf(ctx context.Context, _ *api.ListVrfRequest) (l []*api.Vrf) { +func (s *BgpServer) ListVrf(ctx context.Context, _ *api.ListVrfRequest, fn func(*api.Vrf)) error { toApi := func(v *table.Vrf) *api.Vrf { return &api.Vrf{ Name: v.Name, @@ -2021,6 +2021,7 @@ func (s *BgpServer) ListVrf(ctx context.Context, _ *api.ListVrfRequest) (l []*ap ExportRt: apiutil.MarshalRTs(v.ExportRt), } } + var l []*api.Vrf s.mgmtOperation(func() error { l = make([]*api.Vrf, 0, len(s.globalRib.Vrfs)) for _, vrf := range s.globalRib.Vrfs { @@ -2028,7 +2029,15 @@ func (s *BgpServer) ListVrf(ctx context.Context, _ *api.ListVrfRequest) (l []*ap } return nil }, true) - return l + for _, v := range l { + select { + case <-ctx.Done(): + return nil + default: + fn(v) + } + } + return nil } func (s *BgpServer) AddVrf(ctx context.Context, r *api.AddVrfRequest) error { @@ -2320,8 +2329,7 @@ func (s *BgpServer) getAdjRib(addr string, family bgp.RouteFamily, in bool, pref return } -func (s *BgpServer) ListPath(ctx context.Context, r *api.ListPathRequest) ([]*api.Destination, error) { - var dsts []*api.Destination +func (s *BgpServer) ListPath(ctx context.Context, r *api.ListPathRequest, fn func(*api.Destination)) error { var tbl *table.Table var v []*table.Validation @@ -2353,11 +2361,11 @@ func (s *BgpServer) ListPath(ctx context.Context, r *api.ListPathRequest) ([]*ap case api.Resource_VRF: tbl, err = s.getVrfRib(r.Name, family, []*table.LookupPrefix{}) default: - return nil, fmt.Errorf("unsupported resource type: %v", r.Type) + return fmt.Errorf("unsupported resource type: %v", r.Type) } if err != nil { - return nil, err + return err } idx := 0 @@ -2378,11 +2386,16 @@ func (s *BgpServer) ListPath(ctx context.Context, r *api.ListPathRequest) ([]*ap } d.Paths = append(d.Paths, p) } - dsts = append(dsts, &d) + select { + case <-ctx.Done(): + return nil + default: + fn(&d) + } } return nil }() - return dsts, err + return err } func (s *BgpServer) getRibInfo(addr string, family bgp.RouteFamily) (info *table.TableInfo, err error) { @@ -2500,7 +2513,7 @@ func (s *BgpServer) getNeighbor(address string, getAdvertised bool) []*config.Ne return l } -func (s *BgpServer) ListPeer(ctx context.Context, r *api.ListPeerRequest) ([]*api.Peer, error) { +func (s *BgpServer) ListPeer(ctx context.Context, r *api.ListPeerRequest, fn func(*api.Peer)) error { var l []*api.Peer s.mgmtOperation(func() error { address := r.Address @@ -2518,7 +2531,15 @@ func (s *BgpServer) ListPeer(ctx context.Context, r *api.ListPeerRequest) ([]*ap } return nil }, false) - return l, nil + for _, p := range l { + select { + case <-ctx.Done(): + return nil + default: + fn(p) + } + } + return nil } func (s *BgpServer) addPeerGroup(c *config.PeerGroup) error { @@ -2982,17 +3003,27 @@ func (s *BgpServer) DisablePeer(ctx context.Context, r *api.DisablePeerRequest) }, true) } -func (s *BgpServer) ListDefinedSet(ctx context.Context, r *api.ListDefinedSetRequest) (sets []*api.DefinedSet, err error) { +func (s *BgpServer) ListDefinedSet(ctx context.Context, r *api.ListDefinedSetRequest, fn func(*api.DefinedSet)) error { var cd *config.DefinedSets + var err error err = s.mgmtOperation(func() error { cd, err = s.policy.GetDefinedSet(table.DefinedType(r.Type), r.Name) return err }, false) if err != nil { - return nil, err + return err + } + exec := func(d *api.DefinedSet) bool { + select { + case <-ctx.Done(): + return true + default: + fn(d) + } + return false } - sets = make([]*api.DefinedSet, 0) + for _, cs := range cd.PrefixSets { ad := &api.DefinedSet{ Type: api.DefinedType_PREFIX, @@ -3009,8 +3040,9 @@ func (s *BgpServer) ListDefinedSet(ctx context.Context, r *api.ListDefinedSetReq return l }(), } - sets = append(sets, ad) - + if exec(ad) { + return nil + } } for _, cs := range cd.NeighborSets { ad := &api.DefinedSet{ @@ -3018,7 +3050,9 @@ func (s *BgpServer) ListDefinedSet(ctx context.Context, r *api.ListDefinedSetReq Name: cs.NeighborSetName, List: cs.NeighborInfoList, } - sets = append(sets, ad) + if exec(ad) { + return nil + } } for _, cs := range cd.BgpDefinedSets.CommunitySets { ad := &api.DefinedSet{ @@ -3026,7 +3060,9 @@ func (s *BgpServer) ListDefinedSet(ctx context.Context, r *api.ListDefinedSetReq Name: cs.CommunitySetName, List: cs.CommunityList, } - sets = append(sets, ad) + if exec(ad) { + return nil + } } for _, cs := range cd.BgpDefinedSets.ExtCommunitySets { ad := &api.DefinedSet{ @@ -3034,7 +3070,9 @@ func (s *BgpServer) ListDefinedSet(ctx context.Context, r *api.ListDefinedSetReq Name: cs.ExtCommunitySetName, List: cs.ExtCommunityList, } - sets = append(sets, ad) + if exec(ad) { + return nil + } } for _, cs := range cd.BgpDefinedSets.LargeCommunitySets { ad := &api.DefinedSet{ @@ -3042,7 +3080,9 @@ func (s *BgpServer) ListDefinedSet(ctx context.Context, r *api.ListDefinedSetReq Name: cs.LargeCommunitySetName, List: cs.LargeCommunityList, } - sets = append(sets, ad) + if exec(ad) { + return nil + } } for _, cs := range cd.BgpDefinedSets.AsPathSets { ad := &api.DefinedSet{ @@ -3050,9 +3090,11 @@ func (s *BgpServer) ListDefinedSet(ctx context.Context, r *api.ListDefinedSetReq Name: cs.AsPathSetName, List: cs.AsPathList, } - sets = append(sets, ad) + if exec(ad) { + return nil + } } - return sets, nil + return nil } func (s *BgpServer) AddDefinedSet(ctx context.Context, r *api.AddDefinedSetRequest) error { @@ -3081,15 +3123,25 @@ func (s *BgpServer) DeleteDefinedSet(ctx context.Context, r *api.DeleteDefinedSe }, false) } -func (s *BgpServer) ListStatement(ctx context.Context, r *api.ListStatementRequest) ([]*api.Statement, error) { - l := make([]*api.Statement, 0) +func (s *BgpServer) ListStatement(ctx context.Context, r *api.ListStatementRequest, fn func(*api.Statement)) error { + var l []*api.Statement s.mgmtOperation(func() error { - for _, st := range s.policy.GetStatement(r.Name) { + s := s.policy.GetStatement(r.Name) + l = make([]*api.Statement, len(s)) + for _, st := range s { l = append(l, toStatementApi(st)) } return nil }, false) - return l, nil + for _, s := range l { + select { + case <-ctx.Done(): + return nil + default: + fn(s) + } + } + return nil } func (s *BgpServer) AddStatement(ctx context.Context, r *api.AddStatementRequest) error { @@ -3118,15 +3170,25 @@ func (s *BgpServer) DeleteStatement(ctx context.Context, r *api.DeleteStatementR }, false) } -func (s *BgpServer) ListPolicy(ctx context.Context, r *api.ListPolicyRequest) ([]*api.Policy, error) { - l := make([]*api.Policy, 0) +func (s *BgpServer) ListPolicy(ctx context.Context, r *api.ListPolicyRequest, fn func(*api.Policy)) error { + var l []*api.Policy s.mgmtOperation(func() error { - for _, p := range s.policy.GetPolicy(r.Name) { + pl := s.policy.GetPolicy(r.Name) + l = make([]*api.Policy, 0, len(pl)) + for _, p := range pl { l = append(l, table.ToPolicyApi(p)) } return nil }, false) - return l, nil + for _, p := range l { + select { + case <-ctx.Done(): + return nil + default: + fn(p) + } + } + return nil } func (s *BgpServer) AddPolicy(ctx context.Context, r *api.AddPolicyRequest) error { @@ -3188,8 +3250,8 @@ func (s *BgpServer) toPolicyInfo(name string, dir api.PolicyDirection) (string, return "", table.POLICY_DIRECTION_NONE, fmt.Errorf("invalid policy type") } -func (s *BgpServer) ListPolicyAssignment(ctx context.Context, r *api.ListPolicyAssignmentRequest) ([]*api.PolicyAssignment, error) { - a := make([]*api.PolicyAssignment, 0) +func (s *BgpServer) ListPolicyAssignment(ctx context.Context, r *api.ListPolicyAssignmentRequest, fn func(*api.PolicyAssignment)) error { + var a []*api.PolicyAssignment err := s.mgmtOperation(func() error { if r == nil { return fmt.Errorf("invalid request") @@ -3211,6 +3273,7 @@ func (s *BgpServer) ListPolicyAssignment(ctx context.Context, r *api.ListPolicyA dirs = append(dirs, r.Direction) } + a = make([]*api.PolicyAssignment, 0, len(names)) for _, name := range names { for _, dir := range dirs { id, dir, err := s.toPolicyInfo(name, dir) @@ -3235,7 +3298,17 @@ func (s *BgpServer) ListPolicyAssignment(ctx context.Context, r *api.ListPolicyA } return nil }, false) - return a, err + if err == nil { + for _, p := range a { + select { + case <-ctx.Done(): + return nil + default: + fn(p) + } + } + } + return err } func (s *BgpServer) AddPolicyAssignment(ctx context.Context, r *api.AddPolicyAssignmentRequest) error { @@ -3294,8 +3367,9 @@ func (s *BgpServer) DisableMrt(ctx context.Context, r *api.DisableMrtRequest) er }, false) } -func (s *BgpServer) ListRpki(ctx context.Context, r *api.ListRpkiRequest) (l []*api.Rpki, err error) { - err = s.mgmtOperation(func() error { +func (s *BgpServer) ListRpki(ctx context.Context, r *api.ListRpkiRequest, fn func(*api.Rpki)) error { + var l []*api.Rpki + err := s.mgmtOperation(func() error { for _, r := range s.roaManager.GetServers() { received := &r.State.RpkiMessages.RpkiReceived sent := &r.State.RpkiMessages.RpkiSent @@ -3328,23 +3402,43 @@ func (s *BgpServer) ListRpki(ctx context.Context, r *api.ListRpkiRequest) (l []* } return nil }, false) - return l, err + if err == nil { + for _, r := range l { + select { + case <-ctx.Done(): + return nil + default: + fn(r) + } + } + } + return err } -func (s *BgpServer) ListRpkiTable(ctx context.Context, r *api.ListRpkiTableRequest) (l []*api.Roa, err error) { - s.mgmtOperation(func() error { - var roas []*table.ROA +func (s *BgpServer) ListRpkiTable(ctx context.Context, r *api.ListRpkiTableRequest, fn func(*api.Roa)) error { + var l []*api.Roa + err := s.mgmtOperation(func() error { family := bgp.RouteFamily(0) if r.Family != nil { family = bgp.AfiSafiToRouteFamily(uint16(r.Family.Afi), uint8(r.Family.Safi)) } - roas, err = s.roaManager.GetRoa(family) + roas, err := s.roaManager.GetRoa(family) if err == nil { l = append(l, newRoaListFromTableStructList(roas)...) } - return nil + return err }, false) - return + if err == nil { + for _, roa := range l { + select { + case <-ctx.Done(): + return nil + default: + fn(roa) + } + } + } + return err } func (s *BgpServer) AddRpki(ctx context.Context, r *api.AddRpkiRequest) error { diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 27f56353..c8975e41 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -78,9 +78,10 @@ func TestModPolicyAssign(t *testing.T) { err = s.AddPolicyAssignment(context.Background(), &api.AddPolicyAssignmentRequest{Assignment: r}) assert.Nil(err) - ps, err := s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{ + var ps []*api.PolicyAssignment + err = s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{ Name: table.GLOBAL_RIB_NAME, - Direction: api.PolicyDirection_IMPORT}) + Direction: api.PolicyDirection_IMPORT}, func(p *api.PolicyAssignment) { ps = append(ps, p) }) assert.Nil(err) assert.Equal(len(ps[0].Policies), 3) @@ -91,14 +92,16 @@ func TestModPolicyAssign(t *testing.T) { err = s.DeletePolicyAssignment(context.Background(), &api.DeletePolicyAssignmentRequest{Assignment: r}) assert.Nil(err) - ps, _ = s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{ + ps = []*api.PolicyAssignment{} + s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{ Name: table.GLOBAL_RIB_NAME, - Direction: api.PolicyDirection_IMPORT}) + Direction: api.PolicyDirection_IMPORT}, func(p *api.PolicyAssignment) { ps = append(ps, p) }) assert.Equal(len(ps[0].Policies), 2) - ps, _ = s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{ + ps = []*api.PolicyAssignment{} + s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{ Name: table.GLOBAL_RIB_NAME, - }) + }, func(p *api.PolicyAssignment) { ps = append(ps, p) }) assert.Equal(len(ps), 2) } @@ -145,19 +148,22 @@ func TestListPolicyAssignment(t *testing.T) { assert.Nil(err) } - ps, err := s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{ + ps := []*api.PolicyAssignment{} + err = s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{ Name: table.GLOBAL_RIB_NAME, - }) + }, func(p *api.PolicyAssignment) { ps = append(ps, p) }) assert.Nil(err) assert.Equal(len(ps), 0) - ps, err = s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{}) + ps = []*api.PolicyAssignment{} + err = s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{}, func(p *api.PolicyAssignment) { ps = append(ps, p) }) assert.Nil(err) assert.Equal(len(ps), 3) - ps, err = s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{ + ps = []*api.PolicyAssignment{} + err = s.ListPolicyAssignment(context.Background(), &api.ListPolicyAssignmentRequest{ Direction: api.PolicyDirection_EXPORT, - }) + }, func(p *api.PolicyAssignment) { ps = append(ps, p) }) assert.Nil(err) assert.Equal(len(ps), 0) } |