diff options
Diffstat (limited to 'pkg/p9/server.go')
-rw-r--r-- | pkg/p9/server.go | 380 |
1 files changed, 380 insertions, 0 deletions
diff --git a/pkg/p9/server.go b/pkg/p9/server.go new file mode 100644 index 000000000..2965ae16e --- /dev/null +++ b/pkg/p9/server.go @@ -0,0 +1,380 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package p9 + +import ( + "sync" + "sync/atomic" + "syscall" + + "gvisor.googlesource.com/gvisor/pkg/log" + "gvisor.googlesource.com/gvisor/pkg/unet" +) + +// Server is a 9p2000.L server. +type Server struct { + // attacher provides the attach function. + attacher Attacher +} + +// NewServer returns a new server. +// +func NewServer(attacher Attacher) *Server { + return &Server{ + attacher: attacher, + } +} + +// connState is the state for a single connection. +type connState struct { + // server is the backing server. + server *Server + + // sendMu is the send lock. + sendMu sync.Mutex + + // conn is the connection. + conn *unet.Socket + + // fids is the set of active FIDs. + // + // This is used to find FIDs for files. + fidMu sync.Mutex + fids map[FID]*fidRef + + // tags is the set of active tags. + // + // The given channel is closed when the + // tag is finished with processing. + tagMu sync.Mutex + tags map[Tag]chan struct{} + + // messageSize is the maximum message size. The server does not + // do automatic splitting of messages. + messageSize uint32 + + // version is the agreed upon version X of 9P2000.L.Google.X. + // version 0 implies 9P2000.L. + version uint32 + + // recvOkay indicates that a receive may start. + recvOkay chan bool + + // recvDone is signalled when a message is received. + recvDone chan error + + // sendDone is signalled when a send is finished. + sendDone chan error +} + +// fidRef wraps a node and tracks references. +type fidRef struct { + // file is the associated File. + file File + + // refs is an active refence count. + // + // The node above will be closed only when refs reaches zero. + refs int64 + + // openedMu protects opened and openFlags. + openedMu sync.Mutex + + // opened indicates whether this has been opened already. + // + // This is updated in handlers.go. + opened bool + + // openFlags is the mode used in the open. + // + // This is updated in handlers.go. + openFlags OpenFlags +} + +// OpenFlags returns the flags the file was opened with and true iff the fid was opened previously. +func (f *fidRef) OpenFlags() (OpenFlags, bool) { + f.openedMu.Lock() + defer f.openedMu.Unlock() + return f.openFlags, f.opened +} + +// DecRef should be called when you're finished with a fid. +func (f *fidRef) DecRef() { + if atomic.AddInt64(&f.refs, -1) == 0 { + f.file.Close() + } +} + +// LookupFID finds the given FID. +// +// You should call fid.DecRef when you are finished using the fid. +func (cs *connState) LookupFID(fid FID) (*fidRef, bool) { + cs.fidMu.Lock() + defer cs.fidMu.Unlock() + fidRef, ok := cs.fids[fid] + if ok { + atomic.AddInt64(&fidRef.refs, 1) + return fidRef, true + } + return nil, false +} + +// InsertFID installs the given FID. +// +// This fid starts with a reference count of one. If a FID exists in +// the slot already it is closed, per the specification. +func (cs *connState) InsertFID(fid FID, newRef *fidRef) { + cs.fidMu.Lock() + defer cs.fidMu.Unlock() + origRef, ok := cs.fids[fid] + if ok { + defer origRef.DecRef() + } + atomic.AddInt64(&newRef.refs, 1) + cs.fids[fid] = newRef +} + +// DeleteFID removes the given FID. +// +// This simply removes it from the map and drops a reference. +func (cs *connState) DeleteFID(fid FID) bool { + cs.fidMu.Lock() + defer cs.fidMu.Unlock() + fidRef, ok := cs.fids[fid] + if !ok { + return false + } + delete(cs.fids, fid) + fidRef.DecRef() + return true +} + +// StartTag starts handling the tag. +// +// False is returned if this tag is already active. +func (cs *connState) StartTag(t Tag) bool { + cs.tagMu.Lock() + defer cs.tagMu.Unlock() + _, ok := cs.tags[t] + if ok { + return false + } + cs.tags[t] = make(chan struct{}) + return true +} + +// ClearTag finishes handling a tag. +func (cs *connState) ClearTag(t Tag) { + cs.tagMu.Lock() + defer cs.tagMu.Unlock() + ch, ok := cs.tags[t] + if !ok { + // Should never happen. + panic("unused tag cleared") + } + delete(cs.tags, t) + + // Notify. + close(ch) +} + +// WaitTag waits for a tag to finish. +func (cs *connState) WaitTag(t Tag) { + cs.tagMu.Lock() + ch, ok := cs.tags[t] + cs.tagMu.Unlock() + if !ok { + return + } + + // Wait for close. + <-ch +} + +// handleRequest handles a single request. +// +// The recvDone channel is signaled when recv is done (with a error if +// necessary). The sendDone channel is signaled with the result of the send. +func (cs *connState) handleRequest() { + messageSize := atomic.LoadUint32(&cs.messageSize) + if messageSize == 0 { + // Default or not yet negotiated. + messageSize = maximumLength + } + + // Receive a message. + tag, m, err := recv(cs.conn, messageSize, messageByType) + if errSocket, ok := err.(ErrSocket); ok { + // Connection problem; stop serving. + cs.recvDone <- errSocket.error + return + } + + // Signal receive is done. + cs.recvDone <- nil + + // Deal with other errors. + if err != nil { + // If it's not a connection error, but some other protocol error, + // we can send a response immediately. + log.Debugf("err [%05d] %v", tag, err) + cs.sendMu.Lock() + err := send(cs.conn, tag, newErr(err)) + cs.sendMu.Unlock() + cs.sendDone <- err + return + } + + // Try to start the tag. + if !cs.StartTag(tag) { + // Nothing we can do at this point; client is bogus. + cs.sendDone <- ErrNoValidMessage + return + } + + // Handle the message. + var r message + if handler, ok := m.(handler); ok { + // Call the message handler. + r = handler.handle(cs) + } else { + // Produce an ENOSYS error. + r = newErr(syscall.ENOSYS) + } + + // Clear the tag before sending. That's because as soon + // as this hits the wire, the client can legally send + // another message with the same tag. + cs.ClearTag(tag) + + // Send back the result. + cs.sendMu.Lock() + err = send(cs.conn, tag, r) + cs.sendMu.Unlock() + cs.sendDone <- err + return +} + +func (cs *connState) handleRequests() { + for range cs.recvOkay { + cs.handleRequest() + } +} + +func (cs *connState) stop() { + // Close all channels. + close(cs.recvOkay) + close(cs.recvDone) + close(cs.sendDone) + + for _, fidRef := range cs.fids { + // Drop final reference in the FID table. Note this should + // always close the file, since we've ensured that there are no + // handlers running via the wait for Pending => 0 below. + fidRef.DecRef() + } + + // Ensure the connection is closed. + cs.conn.Close() +} + +// service services requests concurrently. +func (cs *connState) service() error { + // Pending is the number of handlers that have finished receiving but + // not finished processing requests. These must be waiting on properly + // below. See the next comment for an explanation of the loop. + pending := 0 + + // Start the first request handler. + go cs.handleRequests() // S/R-SAFE: Irrelevant. + cs.recvOkay <- true + + // We loop and make sure there's always one goroutine waiting for a new + // request. We process all the data for a single request in one + // goroutine however, to ensure the best turnaround time possible. + for { + select { + case err := <-cs.recvDone: + if err != nil { + // Wait for pending handlers. + for i := 0; i < pending; i++ { + <-cs.sendDone + } + return err + } + + // This handler is now pending. + pending++ + + // Kick the next receiver, or start a new handler + // if no receiver is currently waiting. + select { + case cs.recvOkay <- true: + default: + go cs.handleRequests() // S/R-SAFE: Irrelevant. + cs.recvOkay <- true + } + + case <-cs.sendDone: + // This handler is finished. + pending-- + + // Error sending a response? Nothing can be done. + // + // We don't terminate on a send error though, since + // we still have a pending receive. The error would + // have been logged above, we just ignore it here. + } + } +} + +// Handle handles a single connection. +func (s *Server) Handle(conn *unet.Socket) error { + cs := &connState{ + server: s, + conn: conn, + fids: make(map[FID]*fidRef), + tags: make(map[Tag]chan struct{}), + recvOkay: make(chan bool), + recvDone: make(chan error, 10), + sendDone: make(chan error, 10), + } + defer cs.stop() + return cs.service() +} + +// Serve handles requests from the bound socket. +// +// The passed serverSocket _must_ be created in packet mode. +func (s *Server) Serve(serverSocket *unet.ServerSocket) error { + var wg sync.WaitGroup + defer wg.Wait() + + for { + conn, err := serverSocket.Accept() + if err != nil { + // Something went wrong. + // + // Socket closed? + return err + } + + wg.Add(1) + go func(conn *unet.Socket) { // S/R-SAFE: Irrelevant. + s.Handle(conn) + wg.Done() + }(conn) + } +} |