diff options
Diffstat (limited to 'pkg/urpc')
-rw-r--r-- | pkg/urpc/urpc.go | 43 |
1 files changed, 27 insertions, 16 deletions
diff --git a/pkg/urpc/urpc.go b/pkg/urpc/urpc.go index 0e9a829f6..7872d6fa1 100644 --- a/pkg/urpc/urpc.go +++ b/pkg/urpc/urpc.go @@ -20,6 +20,7 @@ package urpc import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -458,29 +459,39 @@ func (s *Server) StartHandling(client *unet.Socket) { // No new requests should be initiated after calling Stop. Existing clients // will be closed after completing any pending RPCs. This method will block // until all clients have disconnected. -func (s *Server) Stop() { - // Wait for all outstanding requests. - defer s.wg.Wait() +func (s *Server) Stop(ctx context.Context) { + done := make(chan bool) // Call any Stop callbacks. for _, stopper := range s.stoppers { stopper.Stop() } + go func() { + select { + case <-done: + return + case <-ctx.Done(): + } - // Close all known clients. - s.mu.Lock() - defer s.mu.Unlock() - for client, state := range s.clients { - switch state { - case idle: - // Close connection now. - client.Close() - s.clients[client] = closed - case processing: - // Request close when done. - s.clients[client] = closeRequested + // Close all known clients. + s.mu.Lock() + defer s.mu.Unlock() + for client, state := range s.clients { + switch state { + case idle: + // Close connection now. + client.Close() + s.clients[client] = closed + case processing: + // Request close when done. + s.clients[client] = closeRequested + } } - } + }() + // Wait for all outstanding requests. + s.wg.Wait() + done <- true + } // Client is a urpc client. |