summaryrefslogtreecommitdiffhomepage
path: root/pkg/p9
diff options
context:
space:
mode:
authorJamie Liu <jamieliu@google.com>2020-08-19 11:50:54 -0700
committergVisor bot <gvisor-bot@google.com>2020-08-19 11:58:59 -0700
commit3fd4b83fa325de5e4e49c6fdcaa95a1a3db486ed (patch)
tree28ca2ac8a3ebead11ab66f7dbc8aab8610782a69 /pkg/p9
parent41777e90ea6a48eadbfdedc71029a634e1a23b5f (diff)
Remove use of channels from p9.connState legacy transport.
- Remove sendDone, which currently does nothing whatsoever (errors sent to the channel are completely unused). Instead, have request handlers log errors they get from p9.send() inline. - Replace recvOkay and recvDone with recvMu/recvIdle/recvShutdown. In addition to being slightly clearer (IMO), this eliminates the p9.connState.service() goroutine, significantly reducing the overhead involved in passing connection receive access between goroutines (from buffered chan send/recv + unbuffered chan send/recv to just a mutex unlock/lock). PiperOrigin-RevId: 327476755
Diffstat (limited to 'pkg/p9')
-rw-r--r--pkg/p9/server.go147
1 files changed, 68 insertions, 79 deletions
diff --git a/pkg/p9/server.go b/pkg/p9/server.go
index b9f15e4ed..3736f12a3 100644
--- a/pkg/p9/server.go
+++ b/pkg/p9/server.go
@@ -60,12 +60,6 @@ type connState struct {
// server is the backing server.
server *Server
- // sendMu is the send lock.
- sendMu sync.Mutex
-
- // conn is the connection.
- conn *unet.Socket
-
// fids is the set of active FIDs.
//
// This is used to find FIDs for files.
@@ -92,14 +86,25 @@ type connState struct {
// -- below relates to the legacy handler --
- // recvOkay indicates that a receive may start.
- recvOkay chan bool
+ // recvMu serializes receiving from conn.
+ recvMu sync.Mutex
+
+ // recvIdle is the number of goroutines in handleRequests() attempting to
+ // lock recvMu so that they can receive from conn. recvIdle is accessed
+ // using atomic memory operations.
+ recvIdle int32
+
+ // If recvShutdown is true, at least one goroutine has observed a
+ // connection error while receiving from conn, and all goroutines in
+ // handleRequests() should exit immediately. recvShutdown is protected by
+ // recvMu.
+ recvShutdown bool
- // recvDone is signalled when a message is received.
- recvDone chan error
+ // sendMu serializes sending to conn.
+ sendMu sync.Mutex
- // sendDone is signalled when a send is finished.
- sendDone chan error
+ // conn is the connection used by the legacy transport.
+ conn *unet.Socket
// -- below relates to the flipcall handler --
@@ -508,11 +513,21 @@ func (cs *connState) handle(m message) (r message) {
return
}
-// handleRequest handles a single request.
-//
-// The recvDone channel is signaled when recv is done (with a error if
-// necessary). The sendDone channel is signaled with the result of the send.
-func (cs *connState) handleRequest() {
+// handleRequest handles a single request. It returns true if the caller should
+// continue handling requests and false if it should terminate.
+func (cs *connState) handleRequest() bool {
+ // Obtain the right to receive a message from cs.conn.
+ atomic.AddInt32(&cs.recvIdle, 1)
+ cs.recvMu.Lock()
+ atomic.AddInt32(&cs.recvIdle, -1)
+
+ if cs.recvShutdown {
+ // Another goroutine already detected a connection problem; exit
+ // immediately.
+ cs.recvMu.Unlock()
+ return false
+ }
+
messageSize := atomic.LoadUint32(&cs.messageSize)
if messageSize == 0 {
// Default or not yet negotiated.
@@ -523,12 +538,17 @@ func (cs *connState) handleRequest() {
tag, m, err := recv(cs.conn, messageSize, msgRegistry.get)
if errSocket, ok := err.(ErrSocket); ok {
// Connection problem; stop serving.
- cs.recvDone <- errSocket.error
- return
+ log.Debugf("p9.recv: %v", errSocket.error)
+ cs.recvShutdown = true
+ cs.recvMu.Unlock()
+ return false
}
- // Signal receive is done.
- cs.recvDone <- nil
+ // Ensure that another goroutine is available to receive from cs.conn.
+ if atomic.LoadInt32(&cs.recvIdle) == 0 {
+ go cs.handleRequests() // S/R-SAFE: Irrelevant.
+ }
+ cs.recvMu.Unlock()
// Deal with other errors.
if err != nil && err != io.EOF {
@@ -537,16 +557,17 @@ func (cs *connState) handleRequest() {
cs.sendMu.Lock()
err := send(cs.conn, tag, newErr(err))
cs.sendMu.Unlock()
- cs.sendDone <- err
- return
+ if err != nil {
+ log.Debugf("p9.send: %v", err)
+ }
+ return true
}
// Try to start the tag.
if !cs.StartTag(tag) {
// Nothing we can do at this point; client is bogus.
log.Debugf("no valid tag [%05d]", tag)
- cs.sendDone <- ErrNoValidMessage
- return
+ return true
}
// Handle the message.
@@ -560,15 +581,21 @@ func (cs *connState) handleRequest() {
cs.sendMu.Lock()
err = send(cs.conn, tag, r)
cs.sendMu.Unlock()
- cs.sendDone <- err
+ if err != nil {
+ log.Debugf("p9.send: %v", err)
+ }
// Return the message to the cache.
msgRegistry.put(m)
+
+ return true
}
func (cs *connState) handleRequests() {
- for range cs.recvOkay {
- cs.handleRequest()
+ for {
+ if !cs.handleRequest() {
+ return
+ }
}
}
@@ -578,11 +605,6 @@ func (cs *connState) stop() {
// us with SIGABRT to get a stack dump of the offending handler.
cs.pendingWg.Wait()
- // Close all channels.
- close(cs.recvOkay)
- close(cs.recvDone)
- close(cs.sendDone)
-
// Free the channels.
cs.channelMu.Lock()
for _, ch := range cs.channels {
@@ -600,6 +622,9 @@ func (cs *connState) stop() {
cs.channelAlloc.Destroy()
}
+ // Ensure the connection is closed.
+ cs.conn.Close()
+
// Close all remaining fids.
for fid, fidRef := range cs.fids {
delete(cs.fids, fid)
@@ -609,59 +634,23 @@ func (cs *connState) stop() {
// handlers running via the wait for Pending => 0 below.
fidRef.DecRef()
}
-
- // Ensure the connection is closed.
- cs.conn.Close()
-}
-
-// service services requests concurrently.
-func (cs *connState) service() error {
- // Start the first request handler.
- go cs.handleRequests() // S/R-SAFE: Irrelevant.
- cs.recvOkay <- true
-
- // We loop and make sure there's always one goroutine waiting for a new
- // request. We process all the data for a single request in one
- // goroutine however, to ensure the best turnaround time possible.
- for {
- select {
- case err := <-cs.recvDone:
- if err != nil {
- return err
- }
-
- // Kick the next receiver, or start a new handler
- // if no receiver is currently waiting.
- select {
- case cs.recvOkay <- true:
- default:
- go cs.handleRequests() // S/R-SAFE: Irrelevant.
- cs.recvOkay <- true
- }
-
- case <-cs.sendDone:
- // Error sending a response? Nothing can be done.
- //
- // We don't terminate on a send error though, since
- // we still have a pending receive. The error would
- // have been logged above, we just ignore it here.
- }
- }
}
// Handle handles a single connection.
func (s *Server) Handle(conn *unet.Socket) error {
cs := &connState{
- server: s,
- conn: conn,
- fids: make(map[FID]*fidRef),
- tags: make(map[Tag]chan struct{}),
- recvOkay: make(chan bool),
- recvDone: make(chan error, 10),
- sendDone: make(chan error, 10),
+ server: s,
+ fids: make(map[FID]*fidRef),
+ tags: make(map[Tag]chan struct{}),
+ conn: conn,
}
defer cs.stop()
- return cs.service()
+
+ // Serve requests from conn in the current goroutine; handleRequests() will
+ // create more goroutines as needed.
+ cs.handleRequests()
+
+ return nil
}
// Serve handles requests from the bound socket.