summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-08-19 11:38:45 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-08-19 13:21:01 +0900
commit22b43f7ae7daf381520a529157be4c71c11c2bc8 (patch)
treef089db0dca3b7e92319dfc46c88cf00f44f5cdcd
parent7ba78e831c23352179e846e096a2a3ecbd3f8f37 (diff)
server: fix grpc deadlock
When grpc client stops reading, gobgpd becomes in deadlock because gobgpd tries to write to grpc request's channel for a response but nobody reads from the channel. This patch fixes this issue. Another problem that this patch fixes is that grpc client reads slowly, gobgpd would sleep on grpc request's channel for a response. That should not happen. So instead grpc creates responses first, then run goroutine to write the responses to the client. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--server/grpc_server.go122
-rw-r--r--server/server.go39
2 files changed, 65 insertions, 96 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go
index 84d953e8..b56e2e14 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -119,24 +119,31 @@ func (s *Server) GetNeighbor(ctx context.Context, arg *api.Arguments) (*api.Peer
return res.Data.(*api.Peer), nil
}
-func (s *Server) GetNeighbors(_ *api.Arguments, stream api.Grpc_GetNeighborsServer) error {
- var rf bgp.RouteFamily
- req := NewGrpcRequest(REQ_NEIGHBORS, "", rf, nil)
- s.bgpServerCh <- req
-
+func handleMultipleResponses(req *GrpcRequest, f func(*GrpcResponse) error) error {
for res := range req.ResponseCh {
if err := res.Err(); err != nil {
log.Debug(err.Error())
+ req.EndCh <- struct{}{}
return err
}
- if err := stream.Send(res.Data.(*api.Peer)); err != nil {
+ if err := f(res); err != nil {
+ req.EndCh <- struct{}{}
return err
}
}
-
return nil
}
+func (s *Server) GetNeighbors(_ *api.Arguments, stream api.Grpc_GetNeighborsServer) error {
+ var rf bgp.RouteFamily
+ req := NewGrpcRequest(REQ_NEIGHBORS, "", rf, nil)
+ s.bgpServerCh <- req
+
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Peer))
+ })
+}
+
func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error {
var reqType int
switch arg.Resource {
@@ -157,20 +164,12 @@ func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error
req := NewGrpcRequest(reqType, arg.Name, bgp.RouteFamily(arg.Rf), nil)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err := res.Err(); err != nil {
- log.Debug(err.Error())
- return err
- }
- if err := stream.Send(res.Data.(*api.Destination)); err != nil {
- return err
- }
- }
- return nil
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Destination))
+ })
}
func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorBestChangedServer) error {
- var err error
var reqType int
switch arg.Resource {
case api.Resource_GLOBAL:
@@ -182,18 +181,9 @@ func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorB
req := NewGrpcRequest(reqType, "", bgp.RouteFamily(arg.Rf), nil)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err = res.Err(); err != nil {
- log.Debug(err.Error())
- goto END
- }
- if err = stream.Send(res.Data.(*api.Destination)); err != nil {
- goto END
- }
- }
-END:
- req.EndCh <- struct{}{}
- return err
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Destination))
+ })
}
func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPeerStateServer) error {
@@ -201,20 +191,9 @@ func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPee
req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil)
s.bgpServerCh <- req
- var err error
-
- for res := range req.ResponseCh {
- if err = res.Err(); err != nil {
- log.Debug(err.Error())
- goto END
- }
- if err = stream.Send(res.Data.(*api.Peer)); err != nil {
- goto END
- }
- }
-END:
- req.EndCh <- struct{}{}
- return err
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Peer))
+ })
}
func (s *Server) neighbor(reqType int, arg *api.Arguments) (*api.Error, error) {
@@ -462,16 +441,9 @@ func (s *Server) GetPolicyRoutePolicies(arg *api.PolicyArguments, stream api.Grp
}
req := NewGrpcRequest(reqType, "", rf, nil)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err := res.Err(); err != nil {
- log.Debug(err.Error())
- return err
- }
- if err := stream.(api.Grpc_GetPolicyRoutePoliciesServer).Send(res.Data.(*api.PolicyDefinition)); err != nil {
- return err
- }
- }
- return nil
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.PolicyDefinition))
+ })
}
func (s *Server) GetPolicyRoutePolicy(ctx context.Context, arg *api.PolicyArguments) (*api.PolicyDefinition, error) {
@@ -520,7 +492,6 @@ func (s *Server) ModPolicyRoutePolicy(stream api.Grpc_ModPolicyRoutePolicyServer
}
func (s *Server) GetMrt(arg *api.MrtArguments, stream api.Grpc_GetMrtServer) error {
- var err error
var reqType int
switch arg.Resource {
case api.Resource_GLOBAL:
@@ -532,50 +503,27 @@ func (s *Server) GetMrt(arg *api.MrtArguments, stream api.Grpc_GetMrtServer) err
}
req := NewGrpcRequest(reqType, arg.NeighborAddress, bgp.RouteFamily(arg.Rf), arg.Interval)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err = res.Err(); err != nil {
- log.Debug(err.Error())
- goto END
- }
- if err = stream.Send(res.Data.(*api.MrtMessage)); err != nil {
- goto END
- }
- }
-END:
- req.EndCh <- struct{}{}
- return err
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.MrtMessage))
+ })
}
func (s *Server) GetRPKI(arg *api.Arguments, stream api.Grpc_GetRPKIServer) error {
req := NewGrpcRequest(REQ_RPKI, "", bgp.RouteFamily(arg.Rf), nil)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err := res.Err(); err != nil {
- log.Debug(err.Error())
- return err
- }
- if err := stream.Send(res.Data.(*api.ROA)); err != nil {
- return err
- }
- }
- return nil
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.ROA))
+ })
}
func (s *Server) GetVrfs(arg *api.Arguments, stream api.Grpc_GetVrfsServer) error {
req := NewGrpcRequest(REQ_VRFS, "", bgp.RouteFamily(0), nil)
s.bgpServerCh <- req
- for res := range req.ResponseCh {
- if err := res.Err(); err != nil {
- log.Debug(err.Error())
- return err
- }
- if err := stream.Send(res.Data.(*api.Vrf)); err != nil {
- return err
- }
- }
- return nil
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Vrf))
+ })
}
func (s *Server) ModVrf(ctx context.Context, arg *api.ModVrfArguments) (*api.Error, error) {
diff --git a/server/server.go b/server/server.go
index 366708a3..11fd807a 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1085,19 +1085,33 @@ END:
return msgs
}
+func sendMultipleResponses(grpcReq *GrpcRequest, results []*GrpcResponse) {
+ defer close(grpcReq.ResponseCh)
+ for _, r := range results {
+ select {
+ case grpcReq.ResponseCh <- r:
+ case <-grpcReq.EndCh:
+ return
+ }
+ }
+}
+
func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
var msgs []*SenderMsg
switch grpcReq.RequestType {
case REQ_GLOBAL_RIB:
if t, ok := server.localRibMap[GLOBAL_RIB_NAME].rib.Tables[grpcReq.RouteFamily]; ok {
+ results := make([]*GrpcResponse, len(t.GetDestinations()))
+ i := 0
for _, dst := range t.GetDestinations() {
result := &GrpcResponse{}
result.Data = dst.ToApiStruct()
- grpcReq.ResponseCh <- result
+ results[i] = result
+ i++
}
+ go sendMultipleResponses(grpcReq, results)
}
- close(grpcReq.ResponseCh)
case REQ_MOD_PATH:
pi := &table.PeerInfo{
@@ -1112,13 +1126,16 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
}
case REQ_NEIGHBORS:
+ results := make([]*GrpcResponse, len(server.neighborMap))
+ i := 0
for _, peer := range server.neighborMap {
result := &GrpcResponse{
Data: peer.ToApiStruct(),
}
- grpcReq.ResponseCh <- result
+ results[i] = result
+ i++
}
- close(grpcReq.ResponseCh)
+ go sendMultipleResponses(grpcReq, results)
case REQ_NEIGHBOR:
peer, err := server.checkNeighborRequest(grpcReq)
@@ -1139,14 +1156,17 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
if peer.isRouteServerClient() && peer.fsm.adminState != ADMIN_STATE_DOWN {
remoteAddr := grpcReq.Name
if t, ok := server.localRibMap[remoteAddr].rib.Tables[grpcReq.RouteFamily]; ok {
+ results := make([]*GrpcResponse, len(t.GetDestinations()))
+ i := 0
for _, dst := range t.GetDestinations() {
result := &GrpcResponse{}
result.Data = dst.ToApiStruct()
- grpcReq.ResponseCh <- result
+ results[i] = result
+ i++
}
+ go sendMultipleResponses(grpcReq, results)
}
}
- close(grpcReq.ResponseCh)
case REQ_ADJ_RIB_IN, REQ_ADJ_RIB_OUT:
peer, err := server.checkNeighborRequest(grpcReq)
@@ -1164,16 +1184,17 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
log.Debugf("RouteFamily=%v adj-rib-out found : %d", rf.String(), len(paths))
}
- for _, p := range paths {
+ results := make([]*GrpcResponse, len(paths))
+ for i, p := range paths {
result := &GrpcResponse{
Data: &api.Destination{
Prefix: p.GetNlri().String(),
Paths: []*api.Path{p.ToApiStruct()},
},
}
- grpcReq.ResponseCh <- result
+ results[i] = result
}
- close(grpcReq.ResponseCh)
+ go sendMultipleResponses(grpcReq, results)
case REQ_NEIGHBOR_SHUTDOWN:
peer, err := server.checkNeighborRequest(grpcReq)