summaryrefslogtreecommitdiffhomepage
path: root/pkg/p9
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/p9')
-rw-r--r--pkg/p9/client_file.go38
-rw-r--r--pkg/p9/client_test.go2
-rw-r--r--pkg/p9/file.go24
-rw-r--r--pkg/p9/handlers.go31
-rw-r--r--pkg/p9/messages.go60
-rw-r--r--pkg/p9/messages_test.go24
-rw-r--r--pkg/p9/p9.go162
-rw-r--r--pkg/p9/p9test/client_test.go23
-rw-r--r--pkg/p9/server.go170
-rw-r--r--pkg/p9/transport.go64
-rw-r--r--pkg/p9/transport_test.go2
-rw-r--r--pkg/p9/version.go8
12 files changed, 390 insertions, 218 deletions
diff --git a/pkg/p9/client_file.go b/pkg/p9/client_file.go
index 2ee07b664..28fe081d6 100644
--- a/pkg/p9/client_file.go
+++ b/pkg/p9/client_file.go
@@ -54,6 +54,8 @@ func (c *Client) newFile(fid FID) *clientFile {
//
// This proxies all of the interfaces found in file.go.
type clientFile struct {
+ DisallowServerCalls
+
// client is the originating client.
client *Client
@@ -283,6 +285,39 @@ func (c *clientFile) Close() error {
return nil
}
+// SetAttrClose implements File.SetAttrClose.
+func (c *clientFile) SetAttrClose(valid SetAttrMask, attr SetAttr) error {
+ if !versionSupportsTsetattrclunk(c.client.version) {
+ setAttrErr := c.SetAttr(valid, attr)
+
+ // Try to close file even in case of failure above. Since the state of the
+ // file is unknown to the caller, it will not attempt to close the file
+ // again.
+ if err := c.Close(); err != nil {
+ return err
+ }
+
+ return setAttrErr
+ }
+
+ // Avoid double close.
+ if !atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
+ return syscall.EBADF
+ }
+
+ // Send the message.
+ if err := c.client.sendRecv(&Tsetattrclunk{FID: c.fid, Valid: valid, SetAttr: attr}, &Rsetattrclunk{}); err != nil {
+ // If an error occurred, we toss away the FID. This isn't ideal,
+ // but I'm not sure what else makes sense in this context.
+ log.Warningf("Tsetattrclunk failed, losing FID %v: %v", c.fid, err)
+ return err
+ }
+
+ // Return the FID to the pool.
+ c.client.fidPool.Put(uint64(c.fid))
+ return nil
+}
+
// Open implements File.Open.
func (c *clientFile) Open(flags OpenFlags) (*fd.FD, QID, uint32, error) {
if atomic.LoadUint32(&c.closed) != 0 {
@@ -681,6 +716,3 @@ func (c *clientFile) Flush() error {
return c.client.sendRecv(&Tflushf{FID: c.fid}, &Rflushf{})
}
-
-// Renamed implements File.Renamed.
-func (c *clientFile) Renamed(newDir File, newName string) {}
diff --git a/pkg/p9/client_test.go b/pkg/p9/client_test.go
index c757583e0..b78fdab7a 100644
--- a/pkg/p9/client_test.go
+++ b/pkg/p9/client_test.go
@@ -62,6 +62,8 @@ func TestVersion(t *testing.T) {
}
func benchmarkSendRecv(b *testing.B, fn func(c *Client) func(message, message) error) {
+ b.ReportAllocs()
+
// See above.
serverSocket, clientSocket, err := unet.SocketPair(false)
if err != nil {
diff --git a/pkg/p9/file.go b/pkg/p9/file.go
index cab35896f..c2e3a3f98 100644
--- a/pkg/p9/file.go
+++ b/pkg/p9/file.go
@@ -135,6 +135,14 @@ type File interface {
// On the server, Close has no concurrency guarantee.
Close() error
+ // SetAttrClose is the equivalent of calling SetAttr() followed by Close().
+ // This can be used to set file times before closing the file in a single
+ // operation.
+ //
+ // On the server, SetAttr has a write concurrency guarantee.
+ // On the server, Close has no concurrency guarantee.
+ SetAttrClose(valid SetAttrMask, attr SetAttr) error
+
// Open must be called prior to using Read, Write or Readdir. Once Open
// is called, some operations, such as Walk, will no longer work.
//
@@ -286,3 +294,19 @@ type DefaultWalkGetAttr struct{}
func (DefaultWalkGetAttr) WalkGetAttr([]string) ([]QID, File, AttrMask, Attr, error) {
return nil, nil, AttrMask{}, Attr{}, syscall.ENOSYS
}
+
+// DisallowClientCalls panics if a client-only function is called.
+type DisallowClientCalls struct{}
+
+// SetAttrClose implements File.SetAttrClose.
+func (DisallowClientCalls) SetAttrClose(SetAttrMask, SetAttr) error {
+ panic("SetAttrClose should not be called on the server")
+}
+
+// DisallowServerCalls panics if a server-only function is called.
+type DisallowServerCalls struct{}
+
+// Renamed implements File.Renamed.
+func (*clientFile) Renamed(File, string) {
+ panic("Renamed should not be called on the client")
+}
diff --git a/pkg/p9/handlers.go b/pkg/p9/handlers.go
index 1db5797dd..abd237f46 100644
--- a/pkg/p9/handlers.go
+++ b/pkg/p9/handlers.go
@@ -123,6 +123,37 @@ func (t *Tclunk) handle(cs *connState) message {
return &Rclunk{}
}
+func (t *Tsetattrclunk) handle(cs *connState) message {
+ ref, ok := cs.LookupFID(t.FID)
+ if !ok {
+ return newErr(syscall.EBADF)
+ }
+ defer ref.DecRef()
+
+ setAttrErr := ref.safelyWrite(func() error {
+ // We don't allow setattr on files that have been deleted.
+ // This might be technically incorrect, as it's possible that
+ // there were multiple links and you can still change the
+ // corresponding inode information.
+ if ref.isDeleted() {
+ return syscall.EINVAL
+ }
+
+ // Set the attributes.
+ return ref.file.SetAttr(t.Valid, t.SetAttr)
+ })
+
+ // Try to delete FID even in case of failure above. Since the state of the
+ // file is unknown to the caller, it will not attempt to close the file again.
+ if !cs.DeleteFID(t.FID) {
+ return newErr(syscall.EBADF)
+ }
+ if setAttrErr != nil {
+ return newErr(setAttrErr)
+ }
+ return &Rsetattrclunk{}
+}
+
// handle implements handler.handle.
func (t *Tremove) handle(cs *connState) message {
ref, ok := cs.LookupFID(t.FID)
diff --git a/pkg/p9/messages.go b/pkg/p9/messages.go
index 2cb59f934..cf13cbb69 100644
--- a/pkg/p9/messages.go
+++ b/pkg/p9/messages.go
@@ -317,6 +317,64 @@ func (r *Rclunk) String() string {
return "Rclunk{}"
}
+// Tsetattrclunk is a setattr+close request.
+type Tsetattrclunk struct {
+ // FID is the FID to change.
+ FID FID
+
+ // Valid is the set of bits which will be used.
+ Valid SetAttrMask
+
+ // SetAttr is the set request.
+ SetAttr SetAttr
+}
+
+// decode implements encoder.decode.
+func (t *Tsetattrclunk) decode(b *buffer) {
+ t.FID = b.ReadFID()
+ t.Valid.decode(b)
+ t.SetAttr.decode(b)
+}
+
+// encode implements encoder.encode.
+func (t *Tsetattrclunk) encode(b *buffer) {
+ b.WriteFID(t.FID)
+ t.Valid.encode(b)
+ t.SetAttr.encode(b)
+}
+
+// Type implements message.Type.
+func (*Tsetattrclunk) Type() MsgType {
+ return MsgTsetattrclunk
+}
+
+// String implements fmt.Stringer.
+func (t *Tsetattrclunk) String() string {
+ return fmt.Sprintf("Tsetattrclunk{FID: %d, Valid: %v, SetAttr: %s}", t.FID, t.Valid, t.SetAttr)
+}
+
+// Rsetattrclunk is a setattr+close response.
+type Rsetattrclunk struct {
+}
+
+// decode implements encoder.decode.
+func (*Rsetattrclunk) decode(*buffer) {
+}
+
+// encode implements encoder.encode.
+func (*Rsetattrclunk) encode(*buffer) {
+}
+
+// Type implements message.Type.
+func (*Rsetattrclunk) Type() MsgType {
+ return MsgRsetattrclunk
+}
+
+// String implements fmt.Stringer.
+func (r *Rsetattrclunk) String() string {
+ return "Rsetattrclunk{}"
+}
+
// Tremove is a remove request.
//
// This will eventually be replaced by Tunlinkat.
@@ -2657,6 +2715,8 @@ func init() {
msgRegistry.register(MsgRlconnect, func() message { return &Rlconnect{} })
msgRegistry.register(MsgTallocate, func() message { return &Tallocate{} })
msgRegistry.register(MsgRallocate, func() message { return &Rallocate{} })
+ msgRegistry.register(MsgTsetattrclunk, func() message { return &Tsetattrclunk{} })
+ msgRegistry.register(MsgRsetattrclunk, func() message { return &Rsetattrclunk{} })
msgRegistry.register(MsgTchannel, func() message { return &Tchannel{} })
msgRegistry.register(MsgRchannel, func() message { return &Rchannel{} })
}
diff --git a/pkg/p9/messages_test.go b/pkg/p9/messages_test.go
index 7facc9f5e..bfeb6c236 100644
--- a/pkg/p9/messages_test.go
+++ b/pkg/p9/messages_test.go
@@ -376,6 +376,30 @@ func TestEncodeDecode(t *testing.T) {
&Rumknod{
Rmknod{QID: QID{Type: 1}},
},
+ &Tsetattrclunk{
+ FID: 1,
+ Valid: SetAttrMask{
+ Permissions: true,
+ UID: true,
+ GID: true,
+ Size: true,
+ ATime: true,
+ MTime: true,
+ CTime: true,
+ ATimeNotSystemTime: true,
+ MTimeNotSystemTime: true,
+ },
+ SetAttr: SetAttr{
+ Permissions: 1,
+ UID: 2,
+ GID: 3,
+ Size: 4,
+ ATimeSeconds: 5,
+ ATimeNanoSeconds: 6,
+ MTimeSeconds: 7,
+ MTimeNanoSeconds: 8,
+ },
+ },
}
for _, enc := range objs {
diff --git a/pkg/p9/p9.go b/pkg/p9/p9.go
index 122c457d2..2235f8968 100644
--- a/pkg/p9/p9.go
+++ b/pkg/p9/p9.go
@@ -315,86 +315,88 @@ type MsgType uint8
// MsgType declarations.
const (
- MsgTlerror MsgType = 6
- MsgRlerror = 7
- MsgTstatfs = 8
- MsgRstatfs = 9
- MsgTlopen = 12
- MsgRlopen = 13
- MsgTlcreate = 14
- MsgRlcreate = 15
- MsgTsymlink = 16
- MsgRsymlink = 17
- MsgTmknod = 18
- MsgRmknod = 19
- MsgTrename = 20
- MsgRrename = 21
- MsgTreadlink = 22
- MsgRreadlink = 23
- MsgTgetattr = 24
- MsgRgetattr = 25
- MsgTsetattr = 26
- MsgRsetattr = 27
- MsgTlistxattr = 28
- MsgRlistxattr = 29
- MsgTxattrwalk = 30
- MsgRxattrwalk = 31
- MsgTxattrcreate = 32
- MsgRxattrcreate = 33
- MsgTgetxattr = 34
- MsgRgetxattr = 35
- MsgTsetxattr = 36
- MsgRsetxattr = 37
- MsgTremovexattr = 38
- MsgRremovexattr = 39
- MsgTreaddir = 40
- MsgRreaddir = 41
- MsgTfsync = 50
- MsgRfsync = 51
- MsgTlink = 70
- MsgRlink = 71
- MsgTmkdir = 72
- MsgRmkdir = 73
- MsgTrenameat = 74
- MsgRrenameat = 75
- MsgTunlinkat = 76
- MsgRunlinkat = 77
- MsgTversion = 100
- MsgRversion = 101
- MsgTauth = 102
- MsgRauth = 103
- MsgTattach = 104
- MsgRattach = 105
- MsgTflush = 108
- MsgRflush = 109
- MsgTwalk = 110
- MsgRwalk = 111
- MsgTread = 116
- MsgRread = 117
- MsgTwrite = 118
- MsgRwrite = 119
- MsgTclunk = 120
- MsgRclunk = 121
- MsgTremove = 122
- MsgRremove = 123
- MsgTflushf = 124
- MsgRflushf = 125
- MsgTwalkgetattr = 126
- MsgRwalkgetattr = 127
- MsgTucreate = 128
- MsgRucreate = 129
- MsgTumkdir = 130
- MsgRumkdir = 131
- MsgTumknod = 132
- MsgRumknod = 133
- MsgTusymlink = 134
- MsgRusymlink = 135
- MsgTlconnect = 136
- MsgRlconnect = 137
- MsgTallocate = 138
- MsgRallocate = 139
- MsgTchannel = 250
- MsgRchannel = 251
+ MsgTlerror MsgType = 6
+ MsgRlerror MsgType = 7
+ MsgTstatfs MsgType = 8
+ MsgRstatfs MsgType = 9
+ MsgTlopen MsgType = 12
+ MsgRlopen MsgType = 13
+ MsgTlcreate MsgType = 14
+ MsgRlcreate MsgType = 15
+ MsgTsymlink MsgType = 16
+ MsgRsymlink MsgType = 17
+ MsgTmknod MsgType = 18
+ MsgRmknod MsgType = 19
+ MsgTrename MsgType = 20
+ MsgRrename MsgType = 21
+ MsgTreadlink MsgType = 22
+ MsgRreadlink MsgType = 23
+ MsgTgetattr MsgType = 24
+ MsgRgetattr MsgType = 25
+ MsgTsetattr MsgType = 26
+ MsgRsetattr MsgType = 27
+ MsgTlistxattr MsgType = 28
+ MsgRlistxattr MsgType = 29
+ MsgTxattrwalk MsgType = 30
+ MsgRxattrwalk MsgType = 31
+ MsgTxattrcreate MsgType = 32
+ MsgRxattrcreate MsgType = 33
+ MsgTgetxattr MsgType = 34
+ MsgRgetxattr MsgType = 35
+ MsgTsetxattr MsgType = 36
+ MsgRsetxattr MsgType = 37
+ MsgTremovexattr MsgType = 38
+ MsgRremovexattr MsgType = 39
+ MsgTreaddir MsgType = 40
+ MsgRreaddir MsgType = 41
+ MsgTfsync MsgType = 50
+ MsgRfsync MsgType = 51
+ MsgTlink MsgType = 70
+ MsgRlink MsgType = 71
+ MsgTmkdir MsgType = 72
+ MsgRmkdir MsgType = 73
+ MsgTrenameat MsgType = 74
+ MsgRrenameat MsgType = 75
+ MsgTunlinkat MsgType = 76
+ MsgRunlinkat MsgType = 77
+ MsgTversion MsgType = 100
+ MsgRversion MsgType = 101
+ MsgTauth MsgType = 102
+ MsgRauth MsgType = 103
+ MsgTattach MsgType = 104
+ MsgRattach MsgType = 105
+ MsgTflush MsgType = 108
+ MsgRflush MsgType = 109
+ MsgTwalk MsgType = 110
+ MsgRwalk MsgType = 111
+ MsgTread MsgType = 116
+ MsgRread MsgType = 117
+ MsgTwrite MsgType = 118
+ MsgRwrite MsgType = 119
+ MsgTclunk MsgType = 120
+ MsgRclunk MsgType = 121
+ MsgTremove MsgType = 122
+ MsgRremove MsgType = 123
+ MsgTflushf MsgType = 124
+ MsgRflushf MsgType = 125
+ MsgTwalkgetattr MsgType = 126
+ MsgRwalkgetattr MsgType = 127
+ MsgTucreate MsgType = 128
+ MsgRucreate MsgType = 129
+ MsgTumkdir MsgType = 130
+ MsgRumkdir MsgType = 131
+ MsgTumknod MsgType = 132
+ MsgRumknod MsgType = 133
+ MsgTusymlink MsgType = 134
+ MsgRusymlink MsgType = 135
+ MsgTlconnect MsgType = 136
+ MsgRlconnect MsgType = 137
+ MsgTallocate MsgType = 138
+ MsgRallocate MsgType = 139
+ MsgTsetattrclunk MsgType = 140
+ MsgRsetattrclunk MsgType = 141
+ MsgTchannel MsgType = 250
+ MsgRchannel MsgType = 251
)
// QIDType represents the file type for QIDs.
diff --git a/pkg/p9/p9test/client_test.go b/pkg/p9/p9test/client_test.go
index 6e7bb3db2..6e605b14c 100644
--- a/pkg/p9/p9test/client_test.go
+++ b/pkg/p9/p9test/client_test.go
@@ -1225,22 +1225,31 @@ func TestOpen(t *testing.T) {
func TestClose(t *testing.T) {
type closeTest struct {
name string
- closeFn func(backend *Mock, f p9.File)
+ closeFn func(backend *Mock, f p9.File) error
}
cases := []closeTest{
{
name: "close",
- closeFn: func(_ *Mock, f p9.File) {
- f.Close()
+ closeFn: func(_ *Mock, f p9.File) error {
+ return f.Close()
},
},
{
name: "remove",
- closeFn: func(backend *Mock, f p9.File) {
+ closeFn: func(backend *Mock, f p9.File) error {
// Allow the rename call in the parent, automatically translated.
backend.parent.EXPECT().UnlinkAt(gomock.Any(), gomock.Any()).Times(1)
- f.(deprecatedRemover).Remove()
+ return f.(deprecatedRemover).Remove()
+ },
+ },
+ {
+ name: "setAttrClose",
+ closeFn: func(backend *Mock, f p9.File) error {
+ valid := p9.SetAttrMask{ATime: true}
+ attr := p9.SetAttr{ATimeSeconds: 1, ATimeNanoSeconds: 2}
+ backend.EXPECT().SetAttr(valid, attr).Times(1)
+ return f.SetAttrClose(valid, attr)
},
},
}
@@ -1258,7 +1267,9 @@ func TestClose(t *testing.T) {
_, backend, f := walkHelper(h, name, root)
// Close via the prescribed method.
- tc.closeFn(backend, f)
+ if err := tc.closeFn(backend, f); err != nil {
+ t.Fatalf("closeFn failed: %v", err)
+ }
// Everything should fail with EBADF.
if _, _, err := f.Walk(nil); err != syscall.EBADF {
diff --git a/pkg/p9/server.go b/pkg/p9/server.go
index 60cf94fa1..3736f12a3 100644
--- a/pkg/p9/server.go
+++ b/pkg/p9/server.go
@@ -60,12 +60,6 @@ type connState struct {
// server is the backing server.
server *Server
- // sendMu is the send lock.
- sendMu sync.Mutex
-
- // conn is the connection.
- conn *unet.Socket
-
// fids is the set of active FIDs.
//
// This is used to find FIDs for files.
@@ -87,16 +81,30 @@ type connState struct {
// version 0 implies 9P2000.L.
version uint32
+ // pendingWg counts requests that are still being handled.
+ pendingWg sync.WaitGroup
+
// -- below relates to the legacy handler --
- // recvOkay indicates that a receive may start.
- recvOkay chan bool
+ // recvMu serializes receiving from conn.
+ recvMu sync.Mutex
+
+ // recvIdle is the number of goroutines in handleRequests() attempting to
+ // lock recvMu so that they can receive from conn. recvIdle is accessed
+ // using atomic memory operations.
+ recvIdle int32
- // recvDone is signalled when a message is received.
- recvDone chan error
+ // If recvShutdown is true, at least one goroutine has observed a
+ // connection error while receiving from conn, and all goroutines in
+ // handleRequests() should exit immediately. recvShutdown is protected by
+ // recvMu.
+ recvShutdown bool
- // sendDone is signalled when a send is finished.
- sendDone chan error
+ // sendMu serializes sending to conn.
+ sendMu sync.Mutex
+
+ // conn is the connection used by the legacy transport.
+ conn *unet.Socket
// -- below relates to the flipcall handler --
@@ -479,7 +487,9 @@ func (cs *connState) lookupChannel(id uint32) *channel {
// handle handles a single message.
func (cs *connState) handle(m message) (r message) {
+ cs.pendingWg.Add(1)
defer func() {
+ cs.pendingWg.Done()
if r == nil {
// Don't allow a panic to propagate.
err := recover()
@@ -503,11 +513,21 @@ func (cs *connState) handle(m message) (r message) {
return
}
-// handleRequest handles a single request.
-//
-// The recvDone channel is signaled when recv is done (with a error if
-// necessary). The sendDone channel is signaled with the result of the send.
-func (cs *connState) handleRequest() {
+// handleRequest handles a single request. It returns true if the caller should
+// continue handling requests and false if it should terminate.
+func (cs *connState) handleRequest() bool {
+ // Obtain the right to receive a message from cs.conn.
+ atomic.AddInt32(&cs.recvIdle, 1)
+ cs.recvMu.Lock()
+ atomic.AddInt32(&cs.recvIdle, -1)
+
+ if cs.recvShutdown {
+ // Another goroutine already detected a connection problem; exit
+ // immediately.
+ cs.recvMu.Unlock()
+ return false
+ }
+
messageSize := atomic.LoadUint32(&cs.messageSize)
if messageSize == 0 {
// Default or not yet negotiated.
@@ -518,12 +538,17 @@ func (cs *connState) handleRequest() {
tag, m, err := recv(cs.conn, messageSize, msgRegistry.get)
if errSocket, ok := err.(ErrSocket); ok {
// Connection problem; stop serving.
- cs.recvDone <- errSocket.error
- return
+ log.Debugf("p9.recv: %v", errSocket.error)
+ cs.recvShutdown = true
+ cs.recvMu.Unlock()
+ return false
}
- // Signal receive is done.
- cs.recvDone <- nil
+ // Ensure that another goroutine is available to receive from cs.conn.
+ if atomic.LoadInt32(&cs.recvIdle) == 0 {
+ go cs.handleRequests() // S/R-SAFE: Irrelevant.
+ }
+ cs.recvMu.Unlock()
// Deal with other errors.
if err != nil && err != io.EOF {
@@ -532,16 +557,17 @@ func (cs *connState) handleRequest() {
cs.sendMu.Lock()
err := send(cs.conn, tag, newErr(err))
cs.sendMu.Unlock()
- cs.sendDone <- err
- return
+ if err != nil {
+ log.Debugf("p9.send: %v", err)
+ }
+ return true
}
// Try to start the tag.
if !cs.StartTag(tag) {
// Nothing we can do at this point; client is bogus.
log.Debugf("no valid tag [%05d]", tag)
- cs.sendDone <- ErrNoValidMessage
- return
+ return true
}
// Handle the message.
@@ -555,23 +581,29 @@ func (cs *connState) handleRequest() {
cs.sendMu.Lock()
err = send(cs.conn, tag, r)
cs.sendMu.Unlock()
- cs.sendDone <- err
+ if err != nil {
+ log.Debugf("p9.send: %v", err)
+ }
// Return the message to the cache.
msgRegistry.put(m)
+
+ return true
}
func (cs *connState) handleRequests() {
- for range cs.recvOkay {
- cs.handleRequest()
+ for {
+ if !cs.handleRequest() {
+ return
+ }
}
}
func (cs *connState) stop() {
- // Close all channels.
- close(cs.recvOkay)
- close(cs.recvDone)
- close(cs.sendDone)
+ // Wait for completion of all inflight requests. This is mostly so that if
+ // a request is stuck, the sandbox supervisor has the opportunity to kill
+ // us with SIGABRT to get a stack dump of the offending handler.
+ cs.pendingWg.Wait()
// Free the channels.
cs.channelMu.Lock()
@@ -590,6 +622,9 @@ func (cs *connState) stop() {
cs.channelAlloc.Destroy()
}
+ // Ensure the connection is closed.
+ cs.conn.Close()
+
// Close all remaining fids.
for fid, fidRef := range cs.fids {
delete(cs.fids, fid)
@@ -599,74 +634,23 @@ func (cs *connState) stop() {
// handlers running via the wait for Pending => 0 below.
fidRef.DecRef()
}
-
- // Ensure the connection is closed.
- cs.conn.Close()
-}
-
-// service services requests concurrently.
-func (cs *connState) service() error {
- // Pending is the number of handlers that have finished receiving but
- // not finished processing requests. These must be waiting on properly
- // below. See the next comment for an explanation of the loop.
- pending := 0
-
- // Start the first request handler.
- go cs.handleRequests() // S/R-SAFE: Irrelevant.
- cs.recvOkay <- true
-
- // We loop and make sure there's always one goroutine waiting for a new
- // request. We process all the data for a single request in one
- // goroutine however, to ensure the best turnaround time possible.
- for {
- select {
- case err := <-cs.recvDone:
- if err != nil {
- // Wait for pending handlers.
- for i := 0; i < pending; i++ {
- <-cs.sendDone
- }
- return nil
- }
-
- // This handler is now pending.
- pending++
-
- // Kick the next receiver, or start a new handler
- // if no receiver is currently waiting.
- select {
- case cs.recvOkay <- true:
- default:
- go cs.handleRequests() // S/R-SAFE: Irrelevant.
- cs.recvOkay <- true
- }
-
- case <-cs.sendDone:
- // This handler is finished.
- pending--
-
- // Error sending a response? Nothing can be done.
- //
- // We don't terminate on a send error though, since
- // we still have a pending receive. The error would
- // have been logged above, we just ignore it here.
- }
- }
}
// Handle handles a single connection.
func (s *Server) Handle(conn *unet.Socket) error {
cs := &connState{
- server: s,
- conn: conn,
- fids: make(map[FID]*fidRef),
- tags: make(map[Tag]chan struct{}),
- recvOkay: make(chan bool),
- recvDone: make(chan error, 10),
- sendDone: make(chan error, 10),
+ server: s,
+ fids: make(map[FID]*fidRef),
+ tags: make(map[Tag]chan struct{}),
+ conn: conn,
}
defer cs.stop()
- return cs.service()
+
+ // Serve requests from conn in the current goroutine; handleRequests() will
+ // create more goroutines as needed.
+ cs.handleRequests()
+
+ return nil
}
// Serve handles requests from the bound socket.
diff --git a/pkg/p9/transport.go b/pkg/p9/transport.go
index 7cec0e86d..02e665345 100644
--- a/pkg/p9/transport.go
+++ b/pkg/p9/transport.go
@@ -66,14 +66,17 @@ const (
var dataPool = sync.Pool{
New: func() interface{} {
// These buffers are used for decoding without a payload.
- return make([]byte, initialBufferLength)
+ // We need to return a pointer to avoid unnecessary allocations
+ // (see https://staticcheck.io/docs/checks#SA6002).
+ b := make([]byte, initialBufferLength)
+ return &b
},
}
// send sends the given message over the socket.
func send(s *unet.Socket, tag Tag, m message) error {
- data := dataPool.Get().([]byte)
- dataBuf := buffer{data: data[:0]}
+ data := dataPool.Get().(*[]byte)
+ dataBuf := buffer{data: (*data)[:0]}
if log.IsLogging(log.Debug) {
log.Debugf("send [FD %d] [Tag %06d] %s", s.FD(), tag, m.String())
@@ -141,7 +144,7 @@ func send(s *unet.Socket, tag Tag, m message) error {
}
// All set.
- dataPool.Put(dataBuf.data)
+ dataPool.Put(&dataBuf.data)
return nil
}
@@ -227,12 +230,29 @@ func recv(s *unet.Socket, msize uint32, lookup lookupTagAndType) (Tag, message,
// Not yet initialized.
var dataBuf buffer
+ var vecs [][]byte
+
+ appendBuffer := func(size int) *[]byte {
+ // Pull a data buffer from the pool.
+ datap := dataPool.Get().(*[]byte)
+ data := *datap
+ if size > len(data) {
+ // Create a larger data buffer.
+ data = make([]byte, size)
+ datap = &data
+ } else {
+ // Limit the data buffer.
+ data = data[:size]
+ }
+ dataBuf = buffer{data: data}
+ vecs = append(vecs, data)
+ return datap
+ }
// Read the rest of the payload.
//
// This requires some special care to ensure that the vectors all line
// up the way they should. We do this to minimize copying data around.
- var vecs [][]byte
if payloader, ok := m.(payloader); ok {
fixedSize := payloader.FixedSize()
@@ -246,22 +266,8 @@ func recv(s *unet.Socket, msize uint32, lookup lookupTagAndType) (Tag, message,
}
if fixedSize != 0 {
- // Pull a data buffer from the pool.
- data := dataPool.Get().([]byte)
- if int(fixedSize) > len(data) {
- // Create a larger data buffer, ensuring
- // sufficient capicity for the message.
- data = make([]byte, fixedSize)
- defer dataPool.Put(data)
- dataBuf = buffer{data: data}
- vecs = append(vecs, data)
- } else {
- // Limit the data buffer, and make sure it
- // gets filled before the payload buffer.
- defer dataPool.Put(data)
- dataBuf = buffer{data: data[:fixedSize]}
- vecs = append(vecs, data[:fixedSize])
- }
+ datap := appendBuffer(int(fixedSize))
+ defer dataPool.Put(datap)
}
// Include the payload.
@@ -274,20 +280,8 @@ func recv(s *unet.Socket, msize uint32, lookup lookupTagAndType) (Tag, message,
vecs = append(vecs, p)
}
} else if remaining != 0 {
- // Pull a data buffer from the pool.
- data := dataPool.Get().([]byte)
- if int(remaining) > len(data) {
- // Create a larger data buffer.
- data = make([]byte, remaining)
- defer dataPool.Put(data)
- dataBuf = buffer{data: data}
- vecs = append(vecs, data)
- } else {
- // Limit the data buffer.
- defer dataPool.Put(data)
- dataBuf = buffer{data: data[:remaining]}
- vecs = append(vecs, data[:remaining])
- }
+ datap := appendBuffer(int(remaining))
+ defer dataPool.Put(datap)
}
if len(vecs) > 0 {
diff --git a/pkg/p9/transport_test.go b/pkg/p9/transport_test.go
index 3668fcad7..e7406b374 100644
--- a/pkg/p9/transport_test.go
+++ b/pkg/p9/transport_test.go
@@ -182,6 +182,8 @@ func TestSendClosed(t *testing.T) {
}
func BenchmarkSendRecv(b *testing.B) {
+ b.ReportAllocs()
+
server, client, err := unet.SocketPair(false)
if err != nil {
b.Fatalf("socketpair got err %v expected nil", err)
diff --git a/pkg/p9/version.go b/pkg/p9/version.go
index 09cde9f5a..8d7168ef5 100644
--- a/pkg/p9/version.go
+++ b/pkg/p9/version.go
@@ -26,7 +26,7 @@ const (
//
// Clients are expected to start requesting this version number and
// to continuously decrement it until a Tversion request succeeds.
- highestSupportedVersion uint32 = 11
+ highestSupportedVersion uint32 = 12
// lowestSupportedVersion is the lowest supported version X in a
// version string of the format 9P2000.L.Google.X.
@@ -173,3 +173,9 @@ func versionSupportsGetSetXattr(v uint32) bool {
func versionSupportsListRemoveXattr(v uint32) bool {
return v >= 11
}
+
+// versionSupportsTsetattrclunk returns true if version v supports
+// the Tsetattrclunk message.
+func versionSupportsTsetattrclunk(v uint32) bool {
+ return v >= 12
+}