diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/control/server/server.go | 5 | ||||
-rw-r--r-- | pkg/urpc/urpc.go | 51 |
2 files changed, 37 insertions, 19 deletions
diff --git a/pkg/control/server/server.go b/pkg/control/server/server.go index 629dae8f4..889568177 100644 --- a/pkg/control/server/server.go +++ b/pkg/control/server/server.go @@ -22,6 +22,7 @@ package server import ( "os" + "time" "gvisor.dev/gvisor/pkg/log" "gvisor.dev/gvisor/pkg/sync" @@ -65,13 +66,13 @@ func (s *Server) Wait() { // Stop stops the server. Note that this function should only be called once // and the server should not be used afterwards. -func (s *Server) Stop() { +func (s *Server) Stop(timeout time.Duration) { s.socket.Close() s.Wait() // This will cause existing clients to be terminated safely. If the // registered handlers have a Stop callback, it will be called. - s.server.Stop() + s.server.Stop(timeout) } // StartServing starts listening for connect and spawns the main service diff --git a/pkg/urpc/urpc.go b/pkg/urpc/urpc.go index 0e9a829f6..0ef635a2f 100644 --- a/pkg/urpc/urpc.go +++ b/pkg/urpc/urpc.go @@ -27,6 +27,7 @@ import ( "os" "reflect" "runtime" + "time" "gvisor.dev/gvisor/pkg/fd" "gvisor.dev/gvisor/pkg/log" @@ -458,29 +459,45 @@ func (s *Server) StartHandling(client *unet.Socket) { // No new requests should be initiated after calling Stop. Existing clients // will be closed after completing any pending RPCs. This method will block // until all clients have disconnected. -func (s *Server) Stop() { - // Wait for all outstanding requests. - defer s.wg.Wait() - +// +// timeout is the time for clients to complete ongoing RPCs. +func (s *Server) Stop(timeout time.Duration) { // Call any Stop callbacks. for _, stopper := range s.stoppers { stopper.Stop() } - // Close all known clients. - s.mu.Lock() - defer s.mu.Unlock() - for client, state := range s.clients { - switch state { - case idle: - // Close connection now. - client.Close() - s.clients[client] = closed - case processing: - // Request close when done. - s.clients[client] = closeRequested + done := make(chan bool, 1) + go func() { + if timeout != 0 { + timer := time.NewTicker(timeout) + defer timer.Stop() + select { + case <-done: + return + case <-timer.C: + } } - } + + // Close all known clients. + s.mu.Lock() + defer s.mu.Unlock() + for client, state := range s.clients { + switch state { + case idle: + // Close connection now. + client.Close() + s.clients[client] = closed + case processing: + // Request close when done. + s.clients[client] = closeRequested + } + } + }() + + // Wait for all outstanding requests. + s.wg.Wait() + done <- true } // Client is a urpc client. |