summaryrefslogtreecommitdiffhomepage
path: root/pkg/p9/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/p9/server.go')
-rw-r--r--pkg/p9/server.go170
1 files changed, 77 insertions, 93 deletions
diff --git a/pkg/p9/server.go b/pkg/p9/server.go
index 60cf94fa1..3736f12a3 100644
--- a/pkg/p9/server.go
+++ b/pkg/p9/server.go
@@ -60,12 +60,6 @@ type connState struct {
// server is the backing server.
server *Server
- // sendMu is the send lock.
- sendMu sync.Mutex
-
- // conn is the connection.
- conn *unet.Socket
-
// fids is the set of active FIDs.
//
// This is used to find FIDs for files.
@@ -87,16 +81,30 @@ type connState struct {
// version 0 implies 9P2000.L.
version uint32
+ // pendingWg counts requests that are still being handled.
+ pendingWg sync.WaitGroup
+
// -- below relates to the legacy handler --
- // recvOkay indicates that a receive may start.
- recvOkay chan bool
+ // recvMu serializes receiving from conn.
+ recvMu sync.Mutex
+
+ // recvIdle is the number of goroutines in handleRequests() attempting to
+ // lock recvMu so that they can receive from conn. recvIdle is accessed
+ // using atomic memory operations.
+ recvIdle int32
- // recvDone is signalled when a message is received.
- recvDone chan error
+ // If recvShutdown is true, at least one goroutine has observed a
+ // connection error while receiving from conn, and all goroutines in
+ // handleRequests() should exit immediately. recvShutdown is protected by
+ // recvMu.
+ recvShutdown bool
- // sendDone is signalled when a send is finished.
- sendDone chan error
+ // sendMu serializes sending to conn.
+ sendMu sync.Mutex
+
+ // conn is the connection used by the legacy transport.
+ conn *unet.Socket
// -- below relates to the flipcall handler --
@@ -479,7 +487,9 @@ func (cs *connState) lookupChannel(id uint32) *channel {
// handle handles a single message.
func (cs *connState) handle(m message) (r message) {
+ cs.pendingWg.Add(1)
defer func() {
+ cs.pendingWg.Done()
if r == nil {
// Don't allow a panic to propagate.
err := recover()
@@ -503,11 +513,21 @@ func (cs *connState) handle(m message) (r message) {
return
}
-// handleRequest handles a single request.
-//
-// The recvDone channel is signaled when recv is done (with a error if
-// necessary). The sendDone channel is signaled with the result of the send.
-func (cs *connState) handleRequest() {
+// handleRequest handles a single request. It returns true if the caller should
+// continue handling requests and false if it should terminate.
+func (cs *connState) handleRequest() bool {
+ // Obtain the right to receive a message from cs.conn.
+ atomic.AddInt32(&cs.recvIdle, 1)
+ cs.recvMu.Lock()
+ atomic.AddInt32(&cs.recvIdle, -1)
+
+ if cs.recvShutdown {
+ // Another goroutine already detected a connection problem; exit
+ // immediately.
+ cs.recvMu.Unlock()
+ return false
+ }
+
messageSize := atomic.LoadUint32(&cs.messageSize)
if messageSize == 0 {
// Default or not yet negotiated.
@@ -518,12 +538,17 @@ func (cs *connState) handleRequest() {
tag, m, err := recv(cs.conn, messageSize, msgRegistry.get)
if errSocket, ok := err.(ErrSocket); ok {
// Connection problem; stop serving.
- cs.recvDone <- errSocket.error
- return
+ log.Debugf("p9.recv: %v", errSocket.error)
+ cs.recvShutdown = true
+ cs.recvMu.Unlock()
+ return false
}
- // Signal receive is done.
- cs.recvDone <- nil
+ // Ensure that another goroutine is available to receive from cs.conn.
+ if atomic.LoadInt32(&cs.recvIdle) == 0 {
+ go cs.handleRequests() // S/R-SAFE: Irrelevant.
+ }
+ cs.recvMu.Unlock()
// Deal with other errors.
if err != nil && err != io.EOF {
@@ -532,16 +557,17 @@ func (cs *connState) handleRequest() {
cs.sendMu.Lock()
err := send(cs.conn, tag, newErr(err))
cs.sendMu.Unlock()
- cs.sendDone <- err
- return
+ if err != nil {
+ log.Debugf("p9.send: %v", err)
+ }
+ return true
}
// Try to start the tag.
if !cs.StartTag(tag) {
// Nothing we can do at this point; client is bogus.
log.Debugf("no valid tag [%05d]", tag)
- cs.sendDone <- ErrNoValidMessage
- return
+ return true
}
// Handle the message.
@@ -555,23 +581,29 @@ func (cs *connState) handleRequest() {
cs.sendMu.Lock()
err = send(cs.conn, tag, r)
cs.sendMu.Unlock()
- cs.sendDone <- err
+ if err != nil {
+ log.Debugf("p9.send: %v", err)
+ }
// Return the message to the cache.
msgRegistry.put(m)
+
+ return true
}
func (cs *connState) handleRequests() {
- for range cs.recvOkay {
- cs.handleRequest()
+ for {
+ if !cs.handleRequest() {
+ return
+ }
}
}
func (cs *connState) stop() {
- // Close all channels.
- close(cs.recvOkay)
- close(cs.recvDone)
- close(cs.sendDone)
+ // Wait for completion of all inflight requests. This is mostly so that if
+ // a request is stuck, the sandbox supervisor has the opportunity to kill
+ // us with SIGABRT to get a stack dump of the offending handler.
+ cs.pendingWg.Wait()
// Free the channels.
cs.channelMu.Lock()
@@ -590,6 +622,9 @@ func (cs *connState) stop() {
cs.channelAlloc.Destroy()
}
+ // Ensure the connection is closed.
+ cs.conn.Close()
+
// Close all remaining fids.
for fid, fidRef := range cs.fids {
delete(cs.fids, fid)
@@ -599,74 +634,23 @@ func (cs *connState) stop() {
// handlers running via the wait for Pending => 0 below.
fidRef.DecRef()
}
-
- // Ensure the connection is closed.
- cs.conn.Close()
-}
-
-// service services requests concurrently.
-func (cs *connState) service() error {
- // Pending is the number of handlers that have finished receiving but
- // not finished processing requests. These must be waiting on properly
- // below. See the next comment for an explanation of the loop.
- pending := 0
-
- // Start the first request handler.
- go cs.handleRequests() // S/R-SAFE: Irrelevant.
- cs.recvOkay <- true
-
- // We loop and make sure there's always one goroutine waiting for a new
- // request. We process all the data for a single request in one
- // goroutine however, to ensure the best turnaround time possible.
- for {
- select {
- case err := <-cs.recvDone:
- if err != nil {
- // Wait for pending handlers.
- for i := 0; i < pending; i++ {
- <-cs.sendDone
- }
- return nil
- }
-
- // This handler is now pending.
- pending++
-
- // Kick the next receiver, or start a new handler
- // if no receiver is currently waiting.
- select {
- case cs.recvOkay <- true:
- default:
- go cs.handleRequests() // S/R-SAFE: Irrelevant.
- cs.recvOkay <- true
- }
-
- case <-cs.sendDone:
- // This handler is finished.
- pending--
-
- // Error sending a response? Nothing can be done.
- //
- // We don't terminate on a send error though, since
- // we still have a pending receive. The error would
- // have been logged above, we just ignore it here.
- }
- }
}
// Handle handles a single connection.
func (s *Server) Handle(conn *unet.Socket) error {
cs := &connState{
- server: s,
- conn: conn,
- fids: make(map[FID]*fidRef),
- tags: make(map[Tag]chan struct{}),
- recvOkay: make(chan bool),
- recvDone: make(chan error, 10),
- sendDone: make(chan error, 10),
+ server: s,
+ fids: make(map[FID]*fidRef),
+ tags: make(map[Tag]chan struct{}),
+ conn: conn,
}
defer cs.stop()
- return cs.service()
+
+ // Serve requests from conn in the current goroutine; handleRequests() will
+ // create more goroutines as needed.
+ cs.handleRequests()
+
+ return nil
}
// Serve handles requests from the bound socket.