summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server/grpc_server.go122
-rw-r--r--server/server.go39
2 files changed, 65 insertions, 96 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go
index 84d953e8..b56e2e14 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -119,24 +119,31 @@ func (s *Server) GetNeighbor(ctx context.Context, arg *api.Arguments) (*api.Peer
return res.Data.(*api.Peer), nil
}
-func (s *Server) GetNeighbors(_ *api.Arguments, stream api.Grpc_GetNeighborsServer) error {
- var rf bgp.RouteFamily
- req := NewGrpcRequest(REQ_NEIGHBORS, "", rf, nil)
- s.bgpServerCh <- req
-
+func handleMultipleResponses(req *GrpcRequest, f func(*GrpcResponse) error) error {
for res := range req.ResponseCh {
if err := res.Err(); err != nil {
log.Debug(err.Error())
+ req.EndCh <- struct{}{}
return err
}
- if err := stream.Send(res.Data.(*api.Peer)); err != nil {
+ if err := f(res); err != nil {
+ req.EndCh <- struct{}{}
return err
}
}
-
return nil
}
+func (s *Server) GetNeighbors(_ *api.Arguments, stream api.Grpc_GetNeighborsServer) error {
+ var rf bgp.RouteFamily
+ req := NewGrpcRequest(REQ_NEIGHBORS, "", rf, nil)
+ s.bgpServerCh <- req
+
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Peer))
+ })
+}
+
func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error {
var reqType int
switch arg.Resource {
@@ -157,20 +164,12 @@ func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error
req := NewGrpcRequest(reqType, arg.Name, bgp.RouteFamily(arg.Rf), nil)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err := res.Err(); err != nil {
- log.Debug(err.Error())
- return err
- }
- if err := stream.Send(res.Data.(*api.Destination)); err != nil {
- return err
- }
- }
- return nil
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Destination))
+ })
}
func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorBestChangedServer) error {
- var err error
var reqType int
switch arg.Resource {
case api.Resource_GLOBAL:
@@ -182,18 +181,9 @@ func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorB
req := NewGrpcRequest(reqType, "", bgp.RouteFamily(arg.Rf), nil)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err = res.Err(); err != nil {
- log.Debug(err.Error())
- goto END
- }
- if err = stream.Send(res.Data.(*api.Destination)); err != nil {
- goto END
- }
- }
-END:
- req.EndCh <- struct{}{}
- return err
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Destination))
+ })
}
func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPeerStateServer) error {
@@ -201,20 +191,9 @@ func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPee
req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil)
s.bgpServerCh <- req
- var err error
-
- for res := range req.ResponseCh {
- if err = res.Err(); err != nil {
- log.Debug(err.Error())
- goto END
- }
- if err = stream.Send(res.Data.(*api.Peer)); err != nil {
- goto END
- }
- }
-END:
- req.EndCh <- struct{}{}
- return err
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Peer))
+ })
}
func (s *Server) neighbor(reqType int, arg *api.Arguments) (*api.Error, error) {
@@ -462,16 +441,9 @@ func (s *Server) GetPolicyRoutePolicies(arg *api.PolicyArguments, stream api.Grp
}
req := NewGrpcRequest(reqType, "", rf, nil)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err := res.Err(); err != nil {
- log.Debug(err.Error())
- return err
- }
- if err := stream.(api.Grpc_GetPolicyRoutePoliciesServer).Send(res.Data.(*api.PolicyDefinition)); err != nil {
- return err
- }
- }
- return nil
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.PolicyDefinition))
+ })
}
func (s *Server) GetPolicyRoutePolicy(ctx context.Context, arg *api.PolicyArguments) (*api.PolicyDefinition, error) {
@@ -520,7 +492,6 @@ func (s *Server) ModPolicyRoutePolicy(stream api.Grpc_ModPolicyRoutePolicyServer
}
func (s *Server) GetMrt(arg *api.MrtArguments, stream api.Grpc_GetMrtServer) error {
- var err error
var reqType int
switch arg.Resource {
case api.Resource_GLOBAL:
@@ -532,50 +503,27 @@ func (s *Server) GetMrt(arg *api.MrtArguments, stream api.Grpc_GetMrtServer) err
}
req := NewGrpcRequest(reqType, arg.NeighborAddress, bgp.RouteFamily(arg.Rf), arg.Interval)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err = res.Err(); err != nil {
- log.Debug(err.Error())
- goto END
- }
- if err = stream.Send(res.Data.(*api.MrtMessage)); err != nil {
- goto END
- }
- }
-END:
- req.EndCh <- struct{}{}
- return err
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.MrtMessage))
+ })
}
func (s *Server) GetRPKI(arg *api.Arguments, stream api.Grpc_GetRPKIServer) error {
req := NewGrpcRequest(REQ_RPKI, "", bgp.RouteFamily(arg.Rf), nil)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err := res.Err(); err != nil {
- log.Debug(err.Error())
- return err
- }
- if err := stream.Send(res.Data.(*api.ROA)); err != nil {
- return err
- }
- }
- return nil
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.ROA))
+ })
}
func (s *Server) GetVrfs(arg *api.Arguments, stream api.Grpc_GetVrfsServer) error {
req := NewGrpcRequest(REQ_VRFS, "", bgp.RouteFamily(0), nil)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err := res.Err(); err != nil {
- log.Debug(err.Error())
- return err
- }
- if err := stream.Send(res.Data.(*api.Vrf)); err != nil {
- return err
- }
- }
- return nil
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Vrf))
+ })
}
func (s *Server) ModVrf(ctx context.Context, arg *api.ModVrfArguments) (*api.Error, error) {
diff --git a/server/server.go b/server/server.go
index 366708a3..11fd807a 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1085,19 +1085,33 @@ END:
return msgs
}
+func sendMultipleResponses(grpcReq *GrpcRequest, results []*GrpcResponse) {
+ defer close(grpcReq.ResponseCh)
+ for _, r := range results {
+ select {
+ case grpcReq.ResponseCh <- r:
+ case <-grpcReq.EndCh:
+ return
+ }
+ }
+}
+
func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
var msgs []*SenderMsg
switch grpcReq.RequestType {
case REQ_GLOBAL_RIB:
if t, ok := server.localRibMap[GLOBAL_RIB_NAME].rib.Tables[grpcReq.RouteFamily]; ok {
+ results := make([]*GrpcResponse, len(t.GetDestinations()))
+ i := 0
for _, dst := range t.GetDestinations() {
result := &GrpcResponse{}
result.Data = dst.ToApiStruct()
- grpcReq.ResponseCh <- result
+ results[i] = result
+ i++
}
+ go sendMultipleResponses(grpcReq, results)
}
- close(grpcReq.ResponseCh)
case REQ_MOD_PATH:
pi := &table.PeerInfo{
@@ -1112,13 +1126,16 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
}
case REQ_NEIGHBORS:
+ results := make([]*GrpcResponse, len(server.neighborMap))
+ i := 0
for _, peer := range server.neighborMap {
result := &GrpcResponse{
Data: peer.ToApiStruct(),
}
- grpcReq.ResponseCh <- result
+ results[i] = result
+ i++
}
- close(grpcReq.ResponseCh)
+ go sendMultipleResponses(grpcReq, results)
case REQ_NEIGHBOR:
peer, err := server.checkNeighborRequest(grpcReq)
@@ -1139,14 +1156,17 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
if peer.isRouteServerClient() && peer.fsm.adminState != ADMIN_STATE_DOWN {
remoteAddr := grpcReq.Name
if t, ok := server.localRibMap[remoteAddr].rib.Tables[grpcReq.RouteFamily]; ok {
+ results := make([]*GrpcResponse, len(t.GetDestinations()))
+ i := 0
for _, dst := range t.GetDestinations() {
result := &GrpcResponse{}
result.Data = dst.ToApiStruct()
- grpcReq.ResponseCh <- result
+ results[i] = result
+ i++
}
+ go sendMultipleResponses(grpcReq, results)
}
}
- close(grpcReq.ResponseCh)
case REQ_ADJ_RIB_IN, REQ_ADJ_RIB_OUT:
peer, err := server.checkNeighborRequest(grpcReq)
@@ -1164,16 +1184,17 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
log.Debugf("RouteFamily=%v adj-rib-out found : %d", rf.String(), len(paths))
}
- for _, p := range paths {
+ results := make([]*GrpcResponse, len(paths))
+ for i, p := range paths {
result := &GrpcResponse{
Data: &api.Destination{
Prefix: p.GetNlri().String(),
Paths: []*api.Path{p.ToApiStruct()},
},
}
- grpcReq.ResponseCh <- result
+ results[i] = result
}
- close(grpcReq.ResponseCh)
+ go sendMultipleResponses(grpcReq, results)
case REQ_NEIGHBOR_SHUTDOWN:
peer, err := server.checkNeighborRequest(grpcReq)