diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-08-19 11:38:45 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-08-19 13:21:01 +0900 |
commit | 22b43f7ae7daf381520a529157be4c71c11c2bc8 (patch) | |
tree | f089db0dca3b7e92319dfc46c88cf00f44f5cdcd /server/grpc_server.go | |
parent | 7ba78e831c23352179e846e096a2a3ecbd3f8f37 (diff) |
server: fix grpc deadlock
When grpc client stops reading, gobgpd becomes in deadlock because
gobgpd tries to write to grpc request's channel for a response but
nobody reads from the channel. This patch fixes this issue.
Another problem that this patch fixes is that grpc client reads
slowly, gobgpd would sleep on grpc request's channel for a
response. That should not happen. So instead grpc creates responses
first, then run goroutine to write the responses to the client.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/grpc_server.go')
-rw-r--r-- | server/grpc_server.go | 122 |
1 files changed, 35 insertions, 87 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index 84d953e8..b56e2e14 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -119,24 +119,31 @@ func (s *Server) GetNeighbor(ctx context.Context, arg *api.Arguments) (*api.Peer return res.Data.(*api.Peer), nil } -func (s *Server) GetNeighbors(_ *api.Arguments, stream api.Grpc_GetNeighborsServer) error { - var rf bgp.RouteFamily - req := NewGrpcRequest(REQ_NEIGHBORS, "", rf, nil) - s.bgpServerCh <- req - +func handleMultipleResponses(req *GrpcRequest, f func(*GrpcResponse) error) error { for res := range req.ResponseCh { if err := res.Err(); err != nil { log.Debug(err.Error()) + req.EndCh <- struct{}{} return err } - if err := stream.Send(res.Data.(*api.Peer)); err != nil { + if err := f(res); err != nil { + req.EndCh <- struct{}{} return err } } - return nil } +func (s *Server) GetNeighbors(_ *api.Arguments, stream api.Grpc_GetNeighborsServer) error { + var rf bgp.RouteFamily + req := NewGrpcRequest(REQ_NEIGHBORS, "", rf, nil) + s.bgpServerCh <- req + + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Peer)) + }) +} + func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error { var reqType int switch arg.Resource { @@ -157,20 +164,12 @@ func (s *Server) GetRib(arg *api.Arguments, stream api.Grpc_GetRibServer) error req := NewGrpcRequest(reqType, arg.Name, bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.Send(res.Data.(*api.Destination)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Destination)) + }) } func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorBestChangedServer) error { - var err error var reqType int switch arg.Resource { case api.Resource_GLOBAL: @@ -182,18 +181,9 @@ func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorB req := NewGrpcRequest(reqType, "", bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err = res.Err(); err != nil { - log.Debug(err.Error()) - goto END - } - if err = stream.Send(res.Data.(*api.Destination)); err != nil { - goto END - } - } -END: - req.EndCh <- struct{}{} - return err + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Destination)) + }) } func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPeerStateServer) error { @@ -201,20 +191,9 @@ func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPee req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil) s.bgpServerCh <- req - var err error - - for res := range req.ResponseCh { - if err = res.Err(); err != nil { - log.Debug(err.Error()) - goto END - } - if err = stream.Send(res.Data.(*api.Peer)); err != nil { - goto END - } - } -END: - req.EndCh <- struct{}{} - return err + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Peer)) + }) } func (s *Server) neighbor(reqType int, arg *api.Arguments) (*api.Error, error) { @@ -462,16 +441,9 @@ func (s *Server) GetPolicyRoutePolicies(arg *api.PolicyArguments, stream api.Grp } req := NewGrpcRequest(reqType, "", rf, nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.(api.Grpc_GetPolicyRoutePoliciesServer).Send(res.Data.(*api.PolicyDefinition)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.PolicyDefinition)) + }) } func (s *Server) GetPolicyRoutePolicy(ctx context.Context, arg *api.PolicyArguments) (*api.PolicyDefinition, error) { @@ -520,7 +492,6 @@ func (s *Server) ModPolicyRoutePolicy(stream api.Grpc_ModPolicyRoutePolicyServer } func (s *Server) GetMrt(arg *api.MrtArguments, stream api.Grpc_GetMrtServer) error { - var err error var reqType int switch arg.Resource { case api.Resource_GLOBAL: @@ -532,50 +503,27 @@ func (s *Server) GetMrt(arg *api.MrtArguments, stream api.Grpc_GetMrtServer) err } req := NewGrpcRequest(reqType, arg.NeighborAddress, bgp.RouteFamily(arg.Rf), arg.Interval) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err = res.Err(); err != nil { - log.Debug(err.Error()) - goto END - } - if err = stream.Send(res.Data.(*api.MrtMessage)); err != nil { - goto END - } - } -END: - req.EndCh <- struct{}{} - return err + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.MrtMessage)) + }) } func (s *Server) GetRPKI(arg *api.Arguments, stream api.Grpc_GetRPKIServer) error { req := NewGrpcRequest(REQ_RPKI, "", bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.Send(res.Data.(*api.ROA)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.ROA)) + }) } func (s *Server) GetVrfs(arg *api.Arguments, stream api.Grpc_GetVrfsServer) error { req := NewGrpcRequest(REQ_VRFS, "", bgp.RouteFamily(0), nil) s.bgpServerCh <- req - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - return err - } - if err := stream.Send(res.Data.(*api.Vrf)); err != nil { - return err - } - } - return nil + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Vrf)) + }) } func (s *Server) ModVrf(ctx context.Context, arg *api.ModVrfArguments) (*api.Error, error) { |