summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--pkg/abi/linux/linux_abi_autogen_unsafe.go36
-rw-r--r--pkg/p9/server.go147
2 files changed, 86 insertions, 97 deletions
diff --git a/pkg/abi/linux/linux_abi_autogen_unsafe.go b/pkg/abi/linux/linux_abi_autogen_unsafe.go
index 2e5798873..d26f8879f 100644
--- a/pkg/abi/linux/linux_abi_autogen_unsafe.go
+++ b/pkg/abi/linux/linux_abi_autogen_unsafe.go
@@ -157,7 +157,7 @@ func (s *Statx) Packed() bool {
// MarshalUnsafe implements marshal.Marshallable.MarshalUnsafe.
func (s *Statx) MarshalUnsafe(dst []byte) {
- if s.Btime.Packed() && s.Ctime.Packed() && s.Mtime.Packed() && s.Atime.Packed() {
+ if s.Atime.Packed() && s.Btime.Packed() && s.Ctime.Packed() && s.Mtime.Packed() {
safecopy.CopyIn(dst, unsafe.Pointer(s))
} else {
// Type Statx doesn't have a packed layout in memory, fallback to MarshalBytes.
@@ -167,7 +167,7 @@ func (s *Statx) MarshalUnsafe(dst []byte) {
// UnmarshalUnsafe implements marshal.Marshallable.UnmarshalUnsafe.
func (s *Statx) UnmarshalUnsafe(src []byte) {
- if s.Btime.Packed() && s.Ctime.Packed() && s.Mtime.Packed() && s.Atime.Packed() {
+ if s.Atime.Packed() && s.Btime.Packed() && s.Ctime.Packed() && s.Mtime.Packed() {
safecopy.CopyOut(unsafe.Pointer(s), src)
} else {
// Type Statx doesn't have a packed layout in memory, fallback to UnmarshalBytes.
@@ -178,7 +178,7 @@ func (s *Statx) UnmarshalUnsafe(src []byte) {
// CopyOutN implements marshal.Marshallable.CopyOutN.
//go:nosplit
func (s *Statx) CopyOutN(task marshal.Task, addr usermem.Addr, limit int) (int, error) {
- if !s.Atime.Packed() && s.Btime.Packed() && s.Ctime.Packed() && s.Mtime.Packed() {
+ if !s.Mtime.Packed() && s.Atime.Packed() && s.Btime.Packed() && s.Ctime.Packed() {
// Type Statx doesn't have a packed layout in memory, fall back to MarshalBytes.
buf := task.CopyScratchBuffer(s.SizeBytes()) // escapes: okay.
s.MarshalBytes(buf) // escapes: fallback.
@@ -627,7 +627,7 @@ func (f *FUSEHeaderIn) UnmarshalBytes(src []byte) {
// Packed implements marshal.Marshallable.Packed.
//go:nosplit
func (f *FUSEHeaderIn) Packed() bool {
- return f.Unique.Packed() && f.Opcode.Packed()
+ return f.Opcode.Packed() && f.Unique.Packed()
}
// MarshalUnsafe implements marshal.Marshallable.MarshalUnsafe.
@@ -642,7 +642,7 @@ func (f *FUSEHeaderIn) MarshalUnsafe(dst []byte) {
// UnmarshalUnsafe implements marshal.Marshallable.UnmarshalUnsafe.
func (f *FUSEHeaderIn) UnmarshalUnsafe(src []byte) {
- if f.Opcode.Packed() && f.Unique.Packed() {
+ if f.Unique.Packed() && f.Opcode.Packed() {
safecopy.CopyOut(unsafe.Pointer(f), src)
} else {
// Type FUSEHeaderIn doesn't have a packed layout in memory, fallback to UnmarshalBytes.
@@ -683,7 +683,7 @@ func (f *FUSEHeaderIn) CopyOut(task marshal.Task, addr usermem.Addr) (int, error
// CopyIn implements marshal.Marshallable.CopyIn.
//go:nosplit
func (f *FUSEHeaderIn) CopyIn(task marshal.Task, addr usermem.Addr) (int, error) {
- if !f.Opcode.Packed() && f.Unique.Packed() {
+ if !f.Unique.Packed() && f.Opcode.Packed() {
// Type FUSEHeaderIn doesn't have a packed layout in memory, fall back to UnmarshalBytes.
buf := task.CopyScratchBuffer(f.SizeBytes()) // escapes: okay.
length, err := task.CopyInBytes(addr, buf) // escapes: okay.
@@ -709,7 +709,7 @@ func (f *FUSEHeaderIn) CopyIn(task marshal.Task, addr usermem.Addr) (int, error)
// WriteTo implements io.WriterTo.WriteTo.
func (f *FUSEHeaderIn) WriteTo(w io.Writer) (int64, error) {
- if !f.Opcode.Packed() && f.Unique.Packed() {
+ if !f.Unique.Packed() && f.Opcode.Packed() {
// Type FUSEHeaderIn doesn't have a packed layout in memory, fall back to MarshalBytes.
buf := make([]byte, f.SizeBytes())
f.MarshalBytes(buf)
@@ -2025,7 +2025,7 @@ func (i *IPTEntry) Packed() bool {
// MarshalUnsafe implements marshal.Marshallable.MarshalUnsafe.
func (i *IPTEntry) MarshalUnsafe(dst []byte) {
- if i.Counters.Packed() && i.IP.Packed() {
+ if i.IP.Packed() && i.Counters.Packed() {
safecopy.CopyIn(dst, unsafe.Pointer(i))
} else {
// Type IPTEntry doesn't have a packed layout in memory, fallback to MarshalBytes.
@@ -2035,7 +2035,7 @@ func (i *IPTEntry) MarshalUnsafe(dst []byte) {
// UnmarshalUnsafe implements marshal.Marshallable.UnmarshalUnsafe.
func (i *IPTEntry) UnmarshalUnsafe(src []byte) {
- if i.IP.Packed() && i.Counters.Packed() {
+ if i.Counters.Packed() && i.IP.Packed() {
safecopy.CopyOut(unsafe.Pointer(i), src)
} else {
// Type IPTEntry doesn't have a packed layout in memory, fallback to UnmarshalBytes.
@@ -2076,7 +2076,7 @@ func (i *IPTEntry) CopyOut(task marshal.Task, addr usermem.Addr) (int, error) {
// CopyIn implements marshal.Marshallable.CopyIn.
//go:nosplit
func (i *IPTEntry) CopyIn(task marshal.Task, addr usermem.Addr) (int, error) {
- if !i.Counters.Packed() && i.IP.Packed() {
+ if !i.IP.Packed() && i.Counters.Packed() {
// Type IPTEntry doesn't have a packed layout in memory, fall back to UnmarshalBytes.
buf := task.CopyScratchBuffer(i.SizeBytes()) // escapes: okay.
length, err := task.CopyInBytes(addr, buf) // escapes: okay.
@@ -2208,7 +2208,7 @@ func (i *IPTIP) UnmarshalBytes(src []byte) {
// Packed implements marshal.Marshallable.Packed.
//go:nosplit
func (i *IPTIP) Packed() bool {
- return i.Src.Packed() && i.Dst.Packed() && i.SrcMask.Packed() && i.DstMask.Packed()
+ return i.DstMask.Packed() && i.Src.Packed() && i.Dst.Packed() && i.SrcMask.Packed()
}
// MarshalUnsafe implements marshal.Marshallable.MarshalUnsafe.
@@ -2234,7 +2234,7 @@ func (i *IPTIP) UnmarshalUnsafe(src []byte) {
// CopyOutN implements marshal.Marshallable.CopyOutN.
//go:nosplit
func (i *IPTIP) CopyOutN(task marshal.Task, addr usermem.Addr, limit int) (int, error) {
- if !i.SrcMask.Packed() && i.DstMask.Packed() && i.Src.Packed() && i.Dst.Packed() {
+ if !i.DstMask.Packed() && i.Src.Packed() && i.Dst.Packed() && i.SrcMask.Packed() {
// Type IPTIP doesn't have a packed layout in memory, fall back to MarshalBytes.
buf := task.CopyScratchBuffer(i.SizeBytes()) // escapes: okay.
i.MarshalBytes(buf) // escapes: fallback.
@@ -2264,7 +2264,7 @@ func (i *IPTIP) CopyOut(task marshal.Task, addr usermem.Addr) (int, error) {
// CopyIn implements marshal.Marshallable.CopyIn.
//go:nosplit
func (i *IPTIP) CopyIn(task marshal.Task, addr usermem.Addr) (int, error) {
- if !i.SrcMask.Packed() && i.DstMask.Packed() && i.Src.Packed() && i.Dst.Packed() {
+ if !i.Src.Packed() && i.Dst.Packed() && i.SrcMask.Packed() && i.DstMask.Packed() {
// Type IPTIP doesn't have a packed layout in memory, fall back to UnmarshalBytes.
buf := task.CopyScratchBuffer(i.SizeBytes()) // escapes: okay.
length, err := task.CopyInBytes(addr, buf) // escapes: okay.
@@ -3004,7 +3004,7 @@ func (i *IP6TEntry) Packed() bool {
// MarshalUnsafe implements marshal.Marshallable.MarshalUnsafe.
func (i *IP6TEntry) MarshalUnsafe(dst []byte) {
- if i.IPv6.Packed() && i.Counters.Packed() {
+ if i.Counters.Packed() && i.IPv6.Packed() {
safecopy.CopyIn(dst, unsafe.Pointer(i))
} else {
// Type IP6TEntry doesn't have a packed layout in memory, fallback to MarshalBytes.
@@ -3196,12 +3196,12 @@ func (i *IP6TIP) UnmarshalBytes(src []byte) {
// Packed implements marshal.Marshallable.Packed.
//go:nosplit
func (i *IP6TIP) Packed() bool {
- return i.SrcMask.Packed() && i.DstMask.Packed() && i.Src.Packed() && i.Dst.Packed()
+ return i.Dst.Packed() && i.SrcMask.Packed() && i.DstMask.Packed() && i.Src.Packed()
}
// MarshalUnsafe implements marshal.Marshallable.MarshalUnsafe.
func (i *IP6TIP) MarshalUnsafe(dst []byte) {
- if i.Dst.Packed() && i.SrcMask.Packed() && i.DstMask.Packed() && i.Src.Packed() {
+ if i.Src.Packed() && i.Dst.Packed() && i.SrcMask.Packed() && i.DstMask.Packed() {
safecopy.CopyIn(dst, unsafe.Pointer(i))
} else {
// Type IP6TIP doesn't have a packed layout in memory, fallback to MarshalBytes.
@@ -3211,7 +3211,7 @@ func (i *IP6TIP) MarshalUnsafe(dst []byte) {
// UnmarshalUnsafe implements marshal.Marshallable.UnmarshalUnsafe.
func (i *IP6TIP) UnmarshalUnsafe(src []byte) {
- if i.Src.Packed() && i.Dst.Packed() && i.SrcMask.Packed() && i.DstMask.Packed() {
+ if i.Dst.Packed() && i.SrcMask.Packed() && i.DstMask.Packed() && i.Src.Packed() {
safecopy.CopyOut(unsafe.Pointer(i), src)
} else {
// Type IP6TIP doesn't have a packed layout in memory, fallback to UnmarshalBytes.
@@ -3278,7 +3278,7 @@ func (i *IP6TIP) CopyIn(task marshal.Task, addr usermem.Addr) (int, error) {
// WriteTo implements io.WriterTo.WriteTo.
func (i *IP6TIP) WriteTo(w io.Writer) (int64, error) {
- if !i.SrcMask.Packed() && i.DstMask.Packed() && i.Src.Packed() && i.Dst.Packed() {
+ if !i.DstMask.Packed() && i.Src.Packed() && i.Dst.Packed() && i.SrcMask.Packed() {
// Type IP6TIP doesn't have a packed layout in memory, fall back to MarshalBytes.
buf := make([]byte, i.SizeBytes())
i.MarshalBytes(buf)
diff --git a/pkg/p9/server.go b/pkg/p9/server.go
index b9f15e4ed..3736f12a3 100644
--- a/pkg/p9/server.go
+++ b/pkg/p9/server.go
@@ -60,12 +60,6 @@ type connState struct {
// server is the backing server.
server *Server
- // sendMu is the send lock.
- sendMu sync.Mutex
-
- // conn is the connection.
- conn *unet.Socket
-
// fids is the set of active FIDs.
//
// This is used to find FIDs for files.
@@ -92,14 +86,25 @@ type connState struct {
// -- below relates to the legacy handler --
- // recvOkay indicates that a receive may start.
- recvOkay chan bool
+ // recvMu serializes receiving from conn.
+ recvMu sync.Mutex
+
+ // recvIdle is the number of goroutines in handleRequests() attempting to
+ // lock recvMu so that they can receive from conn. recvIdle is accessed
+ // using atomic memory operations.
+ recvIdle int32
+
+ // If recvShutdown is true, at least one goroutine has observed a
+ // connection error while receiving from conn, and all goroutines in
+ // handleRequests() should exit immediately. recvShutdown is protected by
+ // recvMu.
+ recvShutdown bool
- // recvDone is signalled when a message is received.
- recvDone chan error
+ // sendMu serializes sending to conn.
+ sendMu sync.Mutex
- // sendDone is signalled when a send is finished.
- sendDone chan error
+ // conn is the connection used by the legacy transport.
+ conn *unet.Socket
// -- below relates to the flipcall handler --
@@ -508,11 +513,21 @@ func (cs *connState) handle(m message) (r message) {
return
}
-// handleRequest handles a single request.
-//
-// The recvDone channel is signaled when recv is done (with a error if
-// necessary). The sendDone channel is signaled with the result of the send.
-func (cs *connState) handleRequest() {
+// handleRequest handles a single request. It returns true if the caller should
+// continue handling requests and false if it should terminate.
+func (cs *connState) handleRequest() bool {
+ // Obtain the right to receive a message from cs.conn.
+ atomic.AddInt32(&cs.recvIdle, 1)
+ cs.recvMu.Lock()
+ atomic.AddInt32(&cs.recvIdle, -1)
+
+ if cs.recvShutdown {
+ // Another goroutine already detected a connection problem; exit
+ // immediately.
+ cs.recvMu.Unlock()
+ return false
+ }
+
messageSize := atomic.LoadUint32(&cs.messageSize)
if messageSize == 0 {
// Default or not yet negotiated.
@@ -523,12 +538,17 @@ func (cs *connState) handleRequest() {
tag, m, err := recv(cs.conn, messageSize, msgRegistry.get)
if errSocket, ok := err.(ErrSocket); ok {
// Connection problem; stop serving.
- cs.recvDone <- errSocket.error
- return
+ log.Debugf("p9.recv: %v", errSocket.error)
+ cs.recvShutdown = true
+ cs.recvMu.Unlock()
+ return false
}
- // Signal receive is done.
- cs.recvDone <- nil
+ // Ensure that another goroutine is available to receive from cs.conn.
+ if atomic.LoadInt32(&cs.recvIdle) == 0 {
+ go cs.handleRequests() // S/R-SAFE: Irrelevant.
+ }
+ cs.recvMu.Unlock()
// Deal with other errors.
if err != nil && err != io.EOF {
@@ -537,16 +557,17 @@ func (cs *connState) handleRequest() {
cs.sendMu.Lock()
err := send(cs.conn, tag, newErr(err))
cs.sendMu.Unlock()
- cs.sendDone <- err
- return
+ if err != nil {
+ log.Debugf("p9.send: %v", err)
+ }
+ return true
}
// Try to start the tag.
if !cs.StartTag(tag) {
// Nothing we can do at this point; client is bogus.
log.Debugf("no valid tag [%05d]", tag)
- cs.sendDone <- ErrNoValidMessage
- return
+ return true
}
// Handle the message.
@@ -560,15 +581,21 @@ func (cs *connState) handleRequest() {
cs.sendMu.Lock()
err = send(cs.conn, tag, r)
cs.sendMu.Unlock()
- cs.sendDone <- err
+ if err != nil {
+ log.Debugf("p9.send: %v", err)
+ }
// Return the message to the cache.
msgRegistry.put(m)
+
+ return true
}
func (cs *connState) handleRequests() {
- for range cs.recvOkay {
- cs.handleRequest()
+ for {
+ if !cs.handleRequest() {
+ return
+ }
}
}
@@ -578,11 +605,6 @@ func (cs *connState) stop() {
// us with SIGABRT to get a stack dump of the offending handler.
cs.pendingWg.Wait()
- // Close all channels.
- close(cs.recvOkay)
- close(cs.recvDone)
- close(cs.sendDone)
-
// Free the channels.
cs.channelMu.Lock()
for _, ch := range cs.channels {
@@ -600,6 +622,9 @@ func (cs *connState) stop() {
cs.channelAlloc.Destroy()
}
+ // Ensure the connection is closed.
+ cs.conn.Close()
+
// Close all remaining fids.
for fid, fidRef := range cs.fids {
delete(cs.fids, fid)
@@ -609,59 +634,23 @@ func (cs *connState) stop() {
// handlers running via the wait for Pending => 0 below.
fidRef.DecRef()
}
-
- // Ensure the connection is closed.
- cs.conn.Close()
-}
-
-// service services requests concurrently.
-func (cs *connState) service() error {
- // Start the first request handler.
- go cs.handleRequests() // S/R-SAFE: Irrelevant.
- cs.recvOkay <- true
-
- // We loop and make sure there's always one goroutine waiting for a new
- // request. We process all the data for a single request in one
- // goroutine however, to ensure the best turnaround time possible.
- for {
- select {
- case err := <-cs.recvDone:
- if err != nil {
- return err
- }
-
- // Kick the next receiver, or start a new handler
- // if no receiver is currently waiting.
- select {
- case cs.recvOkay <- true:
- default:
- go cs.handleRequests() // S/R-SAFE: Irrelevant.
- cs.recvOkay <- true
- }
-
- case <-cs.sendDone:
- // Error sending a response? Nothing can be done.
- //
- // We don't terminate on a send error though, since
- // we still have a pending receive. The error would
- // have been logged above, we just ignore it here.
- }
- }
}
// Handle handles a single connection.
func (s *Server) Handle(conn *unet.Socket) error {
cs := &connState{
- server: s,
- conn: conn,
- fids: make(map[FID]*fidRef),
- tags: make(map[Tag]chan struct{}),
- recvOkay: make(chan bool),
- recvDone: make(chan error, 10),
- sendDone: make(chan error, 10),
+ server: s,
+ fids: make(map[FID]*fidRef),
+ tags: make(map[Tag]chan struct{}),
+ conn: conn,
}
defer cs.stop()
- return cs.service()
+
+ // Serve requests from conn in the current goroutine; handleRequests() will
+ // create more goroutines as needed.
+ cs.handleRequests()
+
+ return nil
}
// Serve handles requests from the bound socket.