diff options
Diffstat (limited to 'pkg/p9/server.go')
-rw-r--r-- | pkg/p9/server.go | 170 |
1 files changed, 77 insertions, 93 deletions
diff --git a/pkg/p9/server.go b/pkg/p9/server.go index 60cf94fa1..3736f12a3 100644 --- a/pkg/p9/server.go +++ b/pkg/p9/server.go @@ -60,12 +60,6 @@ type connState struct { // server is the backing server. server *Server - // sendMu is the send lock. - sendMu sync.Mutex - - // conn is the connection. - conn *unet.Socket - // fids is the set of active FIDs. // // This is used to find FIDs for files. @@ -87,16 +81,30 @@ type connState struct { // version 0 implies 9P2000.L. version uint32 + // pendingWg counts requests that are still being handled. + pendingWg sync.WaitGroup + // -- below relates to the legacy handler -- - // recvOkay indicates that a receive may start. - recvOkay chan bool + // recvMu serializes receiving from conn. + recvMu sync.Mutex + + // recvIdle is the number of goroutines in handleRequests() attempting to + // lock recvMu so that they can receive from conn. recvIdle is accessed + // using atomic memory operations. + recvIdle int32 - // recvDone is signalled when a message is received. - recvDone chan error + // If recvShutdown is true, at least one goroutine has observed a + // connection error while receiving from conn, and all goroutines in + // handleRequests() should exit immediately. recvShutdown is protected by + // recvMu. + recvShutdown bool - // sendDone is signalled when a send is finished. - sendDone chan error + // sendMu serializes sending to conn. + sendMu sync.Mutex + + // conn is the connection used by the legacy transport. + conn *unet.Socket // -- below relates to the flipcall handler -- @@ -479,7 +487,9 @@ func (cs *connState) lookupChannel(id uint32) *channel { // handle handles a single message. func (cs *connState) handle(m message) (r message) { + cs.pendingWg.Add(1) defer func() { + cs.pendingWg.Done() if r == nil { // Don't allow a panic to propagate. err := recover() @@ -503,11 +513,21 @@ func (cs *connState) handle(m message) (r message) { return } -// handleRequest handles a single request. -// -// The recvDone channel is signaled when recv is done (with a error if -// necessary). The sendDone channel is signaled with the result of the send. -func (cs *connState) handleRequest() { +// handleRequest handles a single request. It returns true if the caller should +// continue handling requests and false if it should terminate. +func (cs *connState) handleRequest() bool { + // Obtain the right to receive a message from cs.conn. + atomic.AddInt32(&cs.recvIdle, 1) + cs.recvMu.Lock() + atomic.AddInt32(&cs.recvIdle, -1) + + if cs.recvShutdown { + // Another goroutine already detected a connection problem; exit + // immediately. + cs.recvMu.Unlock() + return false + } + messageSize := atomic.LoadUint32(&cs.messageSize) if messageSize == 0 { // Default or not yet negotiated. @@ -518,12 +538,17 @@ func (cs *connState) handleRequest() { tag, m, err := recv(cs.conn, messageSize, msgRegistry.get) if errSocket, ok := err.(ErrSocket); ok { // Connection problem; stop serving. - cs.recvDone <- errSocket.error - return + log.Debugf("p9.recv: %v", errSocket.error) + cs.recvShutdown = true + cs.recvMu.Unlock() + return false } - // Signal receive is done. - cs.recvDone <- nil + // Ensure that another goroutine is available to receive from cs.conn. + if atomic.LoadInt32(&cs.recvIdle) == 0 { + go cs.handleRequests() // S/R-SAFE: Irrelevant. + } + cs.recvMu.Unlock() // Deal with other errors. if err != nil && err != io.EOF { @@ -532,16 +557,17 @@ func (cs *connState) handleRequest() { cs.sendMu.Lock() err := send(cs.conn, tag, newErr(err)) cs.sendMu.Unlock() - cs.sendDone <- err - return + if err != nil { + log.Debugf("p9.send: %v", err) + } + return true } // Try to start the tag. if !cs.StartTag(tag) { // Nothing we can do at this point; client is bogus. log.Debugf("no valid tag [%05d]", tag) - cs.sendDone <- ErrNoValidMessage - return + return true } // Handle the message. @@ -555,23 +581,29 @@ func (cs *connState) handleRequest() { cs.sendMu.Lock() err = send(cs.conn, tag, r) cs.sendMu.Unlock() - cs.sendDone <- err + if err != nil { + log.Debugf("p9.send: %v", err) + } // Return the message to the cache. msgRegistry.put(m) + + return true } func (cs *connState) handleRequests() { - for range cs.recvOkay { - cs.handleRequest() + for { + if !cs.handleRequest() { + return + } } } func (cs *connState) stop() { - // Close all channels. - close(cs.recvOkay) - close(cs.recvDone) - close(cs.sendDone) + // Wait for completion of all inflight requests. This is mostly so that if + // a request is stuck, the sandbox supervisor has the opportunity to kill + // us with SIGABRT to get a stack dump of the offending handler. + cs.pendingWg.Wait() // Free the channels. cs.channelMu.Lock() @@ -590,6 +622,9 @@ func (cs *connState) stop() { cs.channelAlloc.Destroy() } + // Ensure the connection is closed. + cs.conn.Close() + // Close all remaining fids. for fid, fidRef := range cs.fids { delete(cs.fids, fid) @@ -599,74 +634,23 @@ func (cs *connState) stop() { // handlers running via the wait for Pending => 0 below. fidRef.DecRef() } - - // Ensure the connection is closed. - cs.conn.Close() -} - -// service services requests concurrently. -func (cs *connState) service() error { - // Pending is the number of handlers that have finished receiving but - // not finished processing requests. These must be waiting on properly - // below. See the next comment for an explanation of the loop. - pending := 0 - - // Start the first request handler. - go cs.handleRequests() // S/R-SAFE: Irrelevant. - cs.recvOkay <- true - - // We loop and make sure there's always one goroutine waiting for a new - // request. We process all the data for a single request in one - // goroutine however, to ensure the best turnaround time possible. - for { - select { - case err := <-cs.recvDone: - if err != nil { - // Wait for pending handlers. - for i := 0; i < pending; i++ { - <-cs.sendDone - } - return nil - } - - // This handler is now pending. - pending++ - - // Kick the next receiver, or start a new handler - // if no receiver is currently waiting. - select { - case cs.recvOkay <- true: - default: - go cs.handleRequests() // S/R-SAFE: Irrelevant. - cs.recvOkay <- true - } - - case <-cs.sendDone: - // This handler is finished. - pending-- - - // Error sending a response? Nothing can be done. - // - // We don't terminate on a send error though, since - // we still have a pending receive. The error would - // have been logged above, we just ignore it here. - } - } } // Handle handles a single connection. func (s *Server) Handle(conn *unet.Socket) error { cs := &connState{ - server: s, - conn: conn, - fids: make(map[FID]*fidRef), - tags: make(map[Tag]chan struct{}), - recvOkay: make(chan bool), - recvDone: make(chan error, 10), - sendDone: make(chan error, 10), + server: s, + fids: make(map[FID]*fidRef), + tags: make(map[Tag]chan struct{}), + conn: conn, } defer cs.stop() - return cs.service() + + // Serve requests from conn in the current goroutine; handleRequests() will + // create more goroutines as needed. + cs.handleRequests() + + return nil } // Serve handles requests from the bound socket. |