diff options
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/fdchannel/fdchannel_state_autogen.go | 4 | ||||
-rwxr-xr-x | pkg/fdchannel/fdchannel_unsafe.go | 146 | ||||
-rwxr-xr-x | pkg/flipcall/ctrl_futex.go | 176 | ||||
-rwxr-xr-x | pkg/flipcall/flipcall.go | 255 | ||||
-rwxr-xr-x | pkg/flipcall/flipcall_state_autogen.go | 4 | ||||
-rwxr-xr-x | pkg/flipcall/flipcall_unsafe.go | 87 | ||||
-rwxr-xr-x | pkg/flipcall/futex_linux.go | 118 | ||||
-rwxr-xr-x | pkg/flipcall/io.go | 113 | ||||
-rwxr-xr-x | pkg/flipcall/packet_window_allocator.go | 166 | ||||
-rw-r--r-- | pkg/p9/client.go | 280 | ||||
-rw-r--r-- | pkg/p9/handlers.go | 53 | ||||
-rw-r--r-- | pkg/p9/messages.go | 103 | ||||
-rw-r--r-- | pkg/p9/p9.go | 2 | ||||
-rw-r--r-- | pkg/p9/server.go | 178 | ||||
-rw-r--r-- | pkg/p9/transport.go | 5 | ||||
-rwxr-xr-x | pkg/p9/transport_flipcall.go | 254 | ||||
-rw-r--r-- | pkg/p9/version.go | 9 |
17 files changed, 1873 insertions, 80 deletions
diff --git a/pkg/fdchannel/fdchannel_state_autogen.go b/pkg/fdchannel/fdchannel_state_autogen.go new file mode 100755 index 000000000..8dbf80cba --- /dev/null +++ b/pkg/fdchannel/fdchannel_state_autogen.go @@ -0,0 +1,4 @@ +// automatically generated by stateify. + +package fdchannel + diff --git a/pkg/fdchannel/fdchannel_unsafe.go b/pkg/fdchannel/fdchannel_unsafe.go new file mode 100755 index 000000000..367235be5 --- /dev/null +++ b/pkg/fdchannel/fdchannel_unsafe.go @@ -0,0 +1,146 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris + +// Package fdchannel implements passing file descriptors between processes over +// Unix domain sockets. +package fdchannel + +import ( + "fmt" + "reflect" + "sync/atomic" + "syscall" + "unsafe" +) + +// int32 is the real type of a file descriptor. +const sizeofInt32 = int(unsafe.Sizeof(int32(0))) + +// NewConnectedSockets returns a pair of file descriptors, owned by the caller, +// representing connected sockets that may be passed to separate calls to +// NewEndpoint to create connected Endpoints. +func NewConnectedSockets() ([2]int, error) { + return syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET|syscall.SOCK_CLOEXEC, 0) +} + +// Endpoint sends file descriptors to, and receives them from, another +// connected Endpoint. +// +// Endpoint is not copyable or movable by value. +type Endpoint struct { + sockfd int32 // accessed using atomic memory operations + msghdr syscall.Msghdr + cmsg *syscall.Cmsghdr // followed by sizeofInt32 bytes of data +} + +// Init must be called on zero-value Endpoints before first use. sockfd must be +// a blocking AF_UNIX SOCK_SEQPACKET socket. +func (ep *Endpoint) Init(sockfd int) { + // "Datagram sockets in various domains (e.g., the UNIX and Internet + // domains) permit zero-length datagrams." - recv(2). Experimentally, + // sendmsg+recvmsg for a zero-length datagram is slightly faster than + // sendmsg+recvmsg for a single byte over a stream socket. + cmsgSlice := make([]byte, syscall.CmsgSpace(sizeofInt32)) + cmsgReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&cmsgSlice)) + ep.sockfd = int32(sockfd) + ep.msghdr.Control = (*byte)((unsafe.Pointer)(cmsgReflect.Data)) + ep.cmsg = (*syscall.Cmsghdr)((unsafe.Pointer)(cmsgReflect.Data)) + // ep.msghdr.Controllen and ep.cmsg.* are mutated by recvmsg(2), so they're + // set before calling sendmsg/recvmsg. +} + +// NewEndpoint is a convenience function that returns an initialized Endpoint +// allocated on the heap. +func NewEndpoint(sockfd int) *Endpoint { + ep := &Endpoint{} + ep.Init(sockfd) + return ep +} + +// Destroy releases resources owned by ep. No other Endpoint methods may be +// called after Destroy. +func (ep *Endpoint) Destroy() { + // These need not use sync/atomic since there must not be any concurrent + // calls to Endpoint methods. + if ep.sockfd >= 0 { + syscall.Close(int(ep.sockfd)) + ep.sockfd = -1 + } +} + +// Shutdown causes concurrent and future calls to ep.SendFD(), ep.RecvFD(), and +// ep.RecvFDNonblock(), as well as the same calls in the connected Endpoint, to +// unblock and return errors. It does not wait for concurrent calls to return. +// +// Shutdown is the only Endpoint method that may be called concurrently with +// other methods. +func (ep *Endpoint) Shutdown() { + if sockfd := int(atomic.SwapInt32(&ep.sockfd, -1)); sockfd >= 0 { + syscall.Shutdown(sockfd, syscall.SHUT_RDWR) + syscall.Close(sockfd) + } +} + +// SendFD sends the open file description represented by the given file +// descriptor to the connected Endpoint. +func (ep *Endpoint) SendFD(fd int) error { + cmsgLen := syscall.CmsgLen(sizeofInt32) + ep.cmsg.Level = syscall.SOL_SOCKET + ep.cmsg.Type = syscall.SCM_RIGHTS + ep.cmsg.SetLen(cmsgLen) + *ep.cmsgData() = int32(fd) + ep.msghdr.SetControllen(cmsgLen) + _, _, e := syscall.Syscall(syscall.SYS_SENDMSG, uintptr(atomic.LoadInt32(&ep.sockfd)), uintptr((unsafe.Pointer)(&ep.msghdr)), 0) + if e != 0 { + return e + } + return nil +} + +// RecvFD receives an open file description from the connected Endpoint and +// returns a file descriptor representing it, owned by the caller. +func (ep *Endpoint) RecvFD() (int, error) { + return ep.recvFD(0) +} + +// RecvFDNonblock receives an open file description from the connected Endpoint +// and returns a file descriptor representing it, owned by the caller. If there +// are no pending receivable open file descriptions, RecvFDNonblock returns +// (<unspecified>, EAGAIN or EWOULDBLOCK). +func (ep *Endpoint) RecvFDNonblock() (int, error) { + return ep.recvFD(syscall.MSG_DONTWAIT) +} + +func (ep *Endpoint) recvFD(flags uintptr) (int, error) { + cmsgLen := syscall.CmsgLen(sizeofInt32) + ep.msghdr.SetControllen(cmsgLen) + _, _, e := syscall.Syscall(syscall.SYS_RECVMSG, uintptr(atomic.LoadInt32(&ep.sockfd)), uintptr((unsafe.Pointer)(&ep.msghdr)), flags|syscall.MSG_TRUNC) + if e != 0 { + return -1, e + } + if int(ep.msghdr.Controllen) != cmsgLen { + return -1, fmt.Errorf("received control message has incorrect length: got %d, wanted %d", ep.msghdr.Controllen, cmsgLen) + } + if ep.cmsg.Level != syscall.SOL_SOCKET || ep.cmsg.Type != syscall.SCM_RIGHTS { + return -1, fmt.Errorf("received control message has incorrect (level, type): got (%v, %v), wanted (%v, %v)", ep.cmsg.Level, ep.cmsg.Type, syscall.SOL_SOCKET, syscall.SCM_RIGHTS) + } + return int(*ep.cmsgData()), nil +} + +func (ep *Endpoint) cmsgData() *int32 { + // syscall.CmsgLen(0) == syscall.cmsgAlignOf(syscall.SizeofCmsghdr) + return (*int32)((unsafe.Pointer)(uintptr((unsafe.Pointer)(ep.cmsg)) + uintptr(syscall.CmsgLen(0)))) +} diff --git a/pkg/flipcall/ctrl_futex.go b/pkg/flipcall/ctrl_futex.go new file mode 100755 index 000000000..8390915a2 --- /dev/null +++ b/pkg/flipcall/ctrl_futex.go @@ -0,0 +1,176 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "encoding/json" + "fmt" + "math" + "sync/atomic" + + "gvisor.dev/gvisor/pkg/log" +) + +type endpointControlImpl struct { + state int32 +} + +// Bits in endpointControlImpl.state. +const ( + epsBlocked = 1 << iota + epsShutdown +) + +func (ep *Endpoint) ctrlInit(opts ...EndpointOption) error { + if len(opts) != 0 { + return fmt.Errorf("unknown EndpointOption: %T", opts[0]) + } + return nil +} + +type ctrlHandshakeRequest struct{} + +type ctrlHandshakeResponse struct{} + +func (ep *Endpoint) ctrlConnect() error { + if err := ep.enterFutexWait(); err != nil { + return err + } + _, err := ep.futexConnect(&ctrlHandshakeRequest{}) + ep.exitFutexWait() + return err +} + +func (ep *Endpoint) ctrlWaitFirst() error { + if err := ep.enterFutexWait(); err != nil { + return err + } + defer ep.exitFutexWait() + + // Wait for the handshake request. + if err := ep.futexSwitchFromPeer(); err != nil { + return err + } + + // Read the handshake request. + reqLen := atomic.LoadUint32(ep.dataLen()) + if reqLen > ep.dataCap { + return fmt.Errorf("invalid handshake request length %d (maximum %d)", reqLen, ep.dataCap) + } + var req ctrlHandshakeRequest + if err := json.NewDecoder(ep.NewReader(reqLen)).Decode(&req); err != nil { + return fmt.Errorf("error reading handshake request: %v", err) + } + + // Write the handshake response. + w := ep.NewWriter() + if err := json.NewEncoder(w).Encode(ctrlHandshakeResponse{}); err != nil { + return fmt.Errorf("error writing handshake response: %v", err) + } + *ep.dataLen() = w.Len() + + // Return control to the client. + raceBecomeInactive() + if err := ep.futexSwitchToPeer(); err != nil { + return err + } + + // Wait for the first non-handshake message. + return ep.futexSwitchFromPeer() +} + +func (ep *Endpoint) ctrlRoundTrip() error { + if err := ep.futexSwitchToPeer(); err != nil { + return err + } + if err := ep.enterFutexWait(); err != nil { + return err + } + err := ep.futexSwitchFromPeer() + ep.exitFutexWait() + return err +} + +func (ep *Endpoint) ctrlWakeLast() error { + return ep.futexSwitchToPeer() +} + +func (ep *Endpoint) enterFutexWait() error { + switch eps := atomic.AddInt32(&ep.ctrl.state, epsBlocked); eps { + case epsBlocked: + return nil + case epsBlocked | epsShutdown: + atomic.AddInt32(&ep.ctrl.state, -epsBlocked) + return shutdownError{} + default: + // Most likely due to ep.enterFutexWait() being called concurrently + // from multiple goroutines. + panic(fmt.Sprintf("invalid flipcall.Endpoint.ctrl.state before flipcall.Endpoint.enterFutexWait(): %v", eps-epsBlocked)) + } +} + +func (ep *Endpoint) exitFutexWait() { + switch eps := atomic.AddInt32(&ep.ctrl.state, -epsBlocked); eps { + case 0: + return + case epsShutdown: + // ep.ctrlShutdown() was called while we were blocked, so we are + // repsonsible for indicating connection shutdown. + ep.shutdownConn() + default: + panic(fmt.Sprintf("invalid flipcall.Endpoint.ctrl.state after flipcall.Endpoint.exitFutexWait(): %v", eps+epsBlocked)) + } +} + +func (ep *Endpoint) ctrlShutdown() { + // Set epsShutdown to ensure that future calls to ep.enterFutexWait() fail. + if atomic.AddInt32(&ep.ctrl.state, epsShutdown)&epsBlocked != 0 { + // Wake the blocked thread. This must loop because it's possible that + // FUTEX_WAKE occurs after the waiter sets epsBlocked, but before it + // blocks in FUTEX_WAIT. + for { + // Wake MaxInt32 threads to prevent a broken or malicious peer from + // swallowing our wakeup by FUTEX_WAITing from multiple threads. + if err := ep.futexWakeConnState(math.MaxInt32); err != nil { + log.Warningf("failed to FUTEX_WAKE Endpoints: %v", err) + break + } + yieldThread() + if atomic.LoadInt32(&ep.ctrl.state)&epsBlocked == 0 { + break + } + } + } else { + // There is no blocked thread, so we are responsible for indicating + // connection shutdown. + ep.shutdownConn() + } +} + +func (ep *Endpoint) shutdownConn() { + switch cs := atomic.SwapUint32(ep.connState(), csShutdown); cs { + case ep.activeState: + if err := ep.futexWakeConnState(1); err != nil { + log.Warningf("failed to FUTEX_WAKE peer Endpoint for shutdown: %v", err) + } + case ep.inactiveState: + // The peer is currently active and will detect shutdown when it tries + // to update the connection state. + case csShutdown: + // The peer also called Endpoint.Shutdown(). + default: + log.Warningf("unexpected connection state before Endpoint.shutdownConn(): %v", cs) + } +} diff --git a/pkg/flipcall/flipcall.go b/pkg/flipcall/flipcall.go new file mode 100755 index 000000000..386cee42c --- /dev/null +++ b/pkg/flipcall/flipcall.go @@ -0,0 +1,255 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package flipcall implements a protocol providing Fast Local Interprocess +// Procedure Calls between mutually-distrusting processes. +package flipcall + +import ( + "fmt" + "math" + "sync/atomic" + "syscall" +) + +// An Endpoint provides the ability to synchronously transfer data and control +// to a connected peer Endpoint, which may be in another process. +// +// Since the Endpoint control transfer model is synchronous, at any given time +// one Endpoint "has control" (designated the active Endpoint), and the other +// is "waiting for control" (designated the inactive Endpoint). Users of the +// flipcall package designate one Endpoint as the client, which is initially +// active, and the other as the server, which is initially inactive. See +// flipcall_example_test.go for usage. +type Endpoint struct { + // packet is a pointer to the beginning of the packet window. (Since this + // is a raw OS memory mapping and not a Go object, it does not need to be + // represented as an unsafe.Pointer.) packet is immutable. + packet uintptr + + // dataCap is the size of the datagram part of the packet window in bytes. + // dataCap is immutable. + dataCap uint32 + + // activeState is csClientActive if this is a client Endpoint and + // csServerActive if this is a server Endpoint. + activeState uint32 + + // inactiveState is csServerActive if this is a client Endpoint and + // csClientActive if this is a server Endpoint. + inactiveState uint32 + + // shutdown is non-zero if Endpoint.Shutdown() has been called, or if the + // Endpoint has acknowledged shutdown initiated by the peer. shutdown is + // accessed using atomic memory operations. + shutdown uint32 + + ctrl endpointControlImpl +} + +// EndpointSide indicates which side of a connection an Endpoint belongs to. +type EndpointSide int + +const ( + // ClientSide indicates that an Endpoint is a client (initially-active; + // first method call should be Connect). + ClientSide EndpointSide = iota + + // ServerSide indicates that an Endpoint is a server (initially-inactive; + // first method call should be RecvFirst.) + ServerSide +) + +// Init must be called on zero-value Endpoints before first use. If it +// succeeds, ep.Destroy() must be called once the Endpoint is no longer in use. +// +// pwd represents the packet window used to exchange data with the peer +// Endpoint. FD may differ between Endpoints if they are in different +// processes, but must represent the same file. The packet window must +// initially be filled with zero bytes. +func (ep *Endpoint) Init(side EndpointSide, pwd PacketWindowDescriptor, opts ...EndpointOption) error { + switch side { + case ClientSide: + ep.activeState = csClientActive + ep.inactiveState = csServerActive + case ServerSide: + ep.activeState = csServerActive + ep.inactiveState = csClientActive + default: + return fmt.Errorf("invalid EndpointSide: %v", side) + } + if pwd.Length < pageSize { + return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize) + } + if pwd.Length > math.MaxUint32 { + return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, math.MaxUint32) + } + m, _, e := syscall.RawSyscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset)) + if e != 0 { + return fmt.Errorf("failed to mmap packet window: %v", e) + } + ep.packet = m + ep.dataCap = uint32(pwd.Length) - uint32(PacketHeaderBytes) + if err := ep.ctrlInit(opts...); err != nil { + ep.unmapPacket() + return err + } + return nil +} + +// NewEndpoint is a convenience function that returns an initialized Endpoint +// allocated on the heap. +func NewEndpoint(side EndpointSide, pwd PacketWindowDescriptor, opts ...EndpointOption) (*Endpoint, error) { + var ep Endpoint + if err := ep.Init(side, pwd, opts...); err != nil { + return nil, err + } + return &ep, nil +} + +// An EndpointOption configures an Endpoint. +type EndpointOption interface { + isEndpointOption() +} + +// Destroy releases resources owned by ep. No other Endpoint methods may be +// called after Destroy. +func (ep *Endpoint) Destroy() { + ep.unmapPacket() +} + +func (ep *Endpoint) unmapPacket() { + syscall.RawSyscall(syscall.SYS_MUNMAP, ep.packet, uintptr(ep.dataCap)+PacketHeaderBytes, 0) + ep.packet = 0 +} + +// Shutdown causes concurrent and future calls to ep.Connect(), ep.SendRecv(), +// ep.RecvFirst(), and ep.SendLast(), as well as the same calls in the peer +// Endpoint, to unblock and return errors. It does not wait for concurrent +// calls to return. Successive calls to Shutdown have no effect. +// +// Shutdown is the only Endpoint method that may be called concurrently with +// other methods on the same Endpoint. +func (ep *Endpoint) Shutdown() { + if atomic.SwapUint32(&ep.shutdown, 1) != 0 { + // ep.Shutdown() has previously been called. + return + } + ep.ctrlShutdown() +} + +// isShutdownLocally returns true if ep.Shutdown() has been called. +func (ep *Endpoint) isShutdownLocally() bool { + return atomic.LoadUint32(&ep.shutdown) != 0 +} + +type shutdownError struct{} + +// Error implements error.Error. +func (shutdownError) Error() string { + return "flipcall connection shutdown" +} + +// DataCap returns the maximum datagram size supported by ep. Equivalently, +// DataCap returns len(ep.Data()). +func (ep *Endpoint) DataCap() uint32 { + return ep.dataCap +} + +// Connection state. +const ( + // The client is, by definition, initially active, so this must be 0. + csClientActive = 0 + csServerActive = 1 + csShutdown = 2 +) + +// Connect blocks until the peer Endpoint has called Endpoint.RecvFirst(). +// +// Preconditions: ep is a client Endpoint. ep.Connect(), ep.RecvFirst(), +// ep.SendRecv(), and ep.SendLast() have never been called. +func (ep *Endpoint) Connect() error { + err := ep.ctrlConnect() + if err == nil { + raceBecomeActive() + } + return err +} + +// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then +// returns the datagram length specified by that call. +// +// Preconditions: ep is a server Endpoint. ep.SendRecv(), ep.RecvFirst(), and +// ep.SendLast() have never been called. +func (ep *Endpoint) RecvFirst() (uint32, error) { + if err := ep.ctrlWaitFirst(); err != nil { + return 0, err + } + raceBecomeActive() + recvDataLen := atomic.LoadUint32(ep.dataLen()) + if recvDataLen > ep.dataCap { + return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, ep.dataCap) + } + return recvDataLen, nil +} + +// SendRecv transfers control to the peer Endpoint, causing its call to +// Endpoint.SendRecv() or Endpoint.RecvFirst() to return with the given +// datagram length, then blocks until the peer Endpoint calls +// Endpoint.SendRecv() or Endpoint.SendLast(). +// +// Preconditions: dataLen <= ep.DataCap(). No previous call to ep.SendRecv() or +// ep.RecvFirst() has returned an error. ep.SendLast() has never been called. +// If ep is a client Endpoint, ep.Connect() has previously been called and +// returned nil. +func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) { + if dataLen > ep.dataCap { + panic(fmt.Sprintf("attempting to send packet with datagram length %d (maximum %d)", dataLen, ep.dataCap)) + } + // This store can safely be non-atomic: Under correct operation we should + // be the only thread writing ep.dataLen(), and ep.ctrlRoundTrip() will + // synchronize with the receiver. We will not read from ep.dataLen() until + // after ep.ctrlRoundTrip(), so if the peer is mutating it concurrently then + // they can only shoot themselves in the foot. + *ep.dataLen() = dataLen + raceBecomeInactive() + if err := ep.ctrlRoundTrip(); err != nil { + return 0, err + } + raceBecomeActive() + recvDataLen := atomic.LoadUint32(ep.dataLen()) + if recvDataLen > ep.dataCap { + return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, ep.dataCap) + } + return recvDataLen, nil +} + +// SendLast causes the peer Endpoint's call to Endpoint.SendRecv() or +// Endpoint.RecvFirst() to return with the given datagram length. +// +// Preconditions: dataLen <= ep.DataCap(). No previous call to ep.SendRecv() or +// ep.RecvFirst() has returned an error. ep.SendLast() has never been called. +// If ep is a client Endpoint, ep.Connect() has previously been called and +// returned nil. +func (ep *Endpoint) SendLast(dataLen uint32) error { + if dataLen > ep.dataCap { + panic(fmt.Sprintf("attempting to send packet with datagram length %d (maximum %d)", dataLen, ep.dataCap)) + } + *ep.dataLen() = dataLen + raceBecomeInactive() + if err := ep.ctrlWakeLast(); err != nil { + return err + } + return nil +} diff --git a/pkg/flipcall/flipcall_state_autogen.go b/pkg/flipcall/flipcall_state_autogen.go new file mode 100755 index 000000000..e9371b536 --- /dev/null +++ b/pkg/flipcall/flipcall_state_autogen.go @@ -0,0 +1,4 @@ +// automatically generated by stateify. + +package flipcall + diff --git a/pkg/flipcall/flipcall_unsafe.go b/pkg/flipcall/flipcall_unsafe.go new file mode 100755 index 000000000..a37952637 --- /dev/null +++ b/pkg/flipcall/flipcall_unsafe.go @@ -0,0 +1,87 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "reflect" + "unsafe" + + "gvisor.dev/gvisor/third_party/gvsync" +) + +// Packets consist of a 16-byte header followed by an arbitrarily-sized +// datagram. The header consists of: +// +// - A 4-byte native-endian connection state. +// +// - A 4-byte native-endian datagram length in bytes. +// +// - 8 reserved bytes. +const ( + // PacketHeaderBytes is the size of a flipcall packet header in bytes. The + // maximum datagram size supported by a flipcall connection is equal to the + // length of the packet window minus PacketHeaderBytes. + // + // PacketHeaderBytes is exported to support its use in constant + // expressions. Non-constant expressions may prefer to use + // PacketWindowLengthForDataCap(). + PacketHeaderBytes = 16 +) + +func (ep *Endpoint) connState() *uint32 { + return (*uint32)((unsafe.Pointer)(ep.packet)) +} + +func (ep *Endpoint) dataLen() *uint32 { + return (*uint32)((unsafe.Pointer)(ep.packet + 4)) +} + +// Data returns the datagram part of ep's packet window as a byte slice. +// +// Note that the packet window is shared with the potentially-untrusted peer +// Endpoint, which may concurrently mutate the contents of the packet window. +// Thus: +// +// - Readers must not assume that two reads of the same byte in Data() will +// return the same result. In other words, readers should read any given byte +// in Data() at most once. +// +// - Writers must not assume that they will read back the same data that they +// have written. In other words, writers should avoid reading from Data() at +// all. +func (ep *Endpoint) Data() []byte { + var bs []byte + bsReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&bs)) + bsReflect.Data = ep.packet + PacketHeaderBytes + bsReflect.Len = int(ep.dataCap) + bsReflect.Cap = int(ep.dataCap) + return bs +} + +// ioSync is a dummy variable used to indicate synchronization to the Go race +// detector. Compare syscall.ioSync. +var ioSync int64 + +func raceBecomeActive() { + if gvsync.RaceEnabled { + gvsync.RaceAcquire((unsafe.Pointer)(&ioSync)) + } +} + +func raceBecomeInactive() { + if gvsync.RaceEnabled { + gvsync.RaceReleaseMerge((unsafe.Pointer)(&ioSync)) + } +} diff --git a/pkg/flipcall/futex_linux.go b/pkg/flipcall/futex_linux.go new file mode 100755 index 000000000..b127a2bbb --- /dev/null +++ b/pkg/flipcall/futex_linux.go @@ -0,0 +1,118 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package flipcall + +import ( + "encoding/json" + "fmt" + "runtime" + "sync/atomic" + "syscall" + + "gvisor.dev/gvisor/pkg/abi/linux" +) + +func (ep *Endpoint) futexConnect(req *ctrlHandshakeRequest) (ctrlHandshakeResponse, error) { + var resp ctrlHandshakeResponse + + // Write the handshake request. + w := ep.NewWriter() + if err := json.NewEncoder(w).Encode(req); err != nil { + return resp, fmt.Errorf("error writing handshake request: %v", err) + } + *ep.dataLen() = w.Len() + + // Exchange control with the server. + if err := ep.futexSwitchToPeer(); err != nil { + return resp, err + } + if err := ep.futexSwitchFromPeer(); err != nil { + return resp, err + } + + // Read the handshake response. + respLen := atomic.LoadUint32(ep.dataLen()) + if respLen > ep.dataCap { + return resp, fmt.Errorf("invalid handshake response length %d (maximum %d)", respLen, ep.dataCap) + } + if err := json.NewDecoder(ep.NewReader(respLen)).Decode(&resp); err != nil { + return resp, fmt.Errorf("error reading handshake response: %v", err) + } + + return resp, nil +} + +func (ep *Endpoint) futexSwitchToPeer() error { + // Update connection state to indicate that the peer should be active. + if !atomic.CompareAndSwapUint32(ep.connState(), ep.activeState, ep.inactiveState) { + switch cs := atomic.LoadUint32(ep.connState()); cs { + case csShutdown: + return shutdownError{} + default: + return fmt.Errorf("unexpected connection state before FUTEX_WAKE: %v", cs) + } + } + + // Wake the peer's Endpoint.futexSwitchFromPeer(). + if err := ep.futexWakeConnState(1); err != nil { + return fmt.Errorf("failed to FUTEX_WAKE peer Endpoint: %v", err) + } + return nil +} + +func (ep *Endpoint) futexSwitchFromPeer() error { + for { + switch cs := atomic.LoadUint32(ep.connState()); cs { + case ep.activeState: + return nil + case ep.inactiveState: + if ep.isShutdownLocally() { + return shutdownError{} + } + if err := ep.futexWaitConnState(ep.inactiveState); err != nil { + return fmt.Errorf("failed to FUTEX_WAIT for peer Endpoint: %v", err) + } + continue + case csShutdown: + return shutdownError{} + default: + return fmt.Errorf("unexpected connection state before FUTEX_WAIT: %v", cs) + } + } +} + +func (ep *Endpoint) futexWakeConnState(numThreads int32) error { + if _, _, e := syscall.RawSyscall(syscall.SYS_FUTEX, ep.packet, linux.FUTEX_WAKE, uintptr(numThreads)); e != 0 { + return e + } + return nil +} + +func (ep *Endpoint) futexWaitConnState(curState uint32) error { + _, _, e := syscall.Syscall6(syscall.SYS_FUTEX, ep.packet, linux.FUTEX_WAIT, uintptr(curState), 0, 0, 0) + if e != 0 && e != syscall.EAGAIN && e != syscall.EINTR { + return e + } + return nil +} + +func yieldThread() { + syscall.Syscall(syscall.SYS_SCHED_YIELD, 0, 0, 0) + // The thread we're trying to yield to may be waiting for a Go runtime P. + // runtime.Gosched() will hand off ours if necessary. + runtime.Gosched() +} diff --git a/pkg/flipcall/io.go b/pkg/flipcall/io.go new file mode 100755 index 000000000..85e40b932 --- /dev/null +++ b/pkg/flipcall/io.go @@ -0,0 +1,113 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "fmt" + "io" +) + +// DatagramReader implements io.Reader by reading a datagram from an Endpoint's +// packet window. Its use is optional; users that can use Endpoint.Data() more +// efficiently are advised to do so. +type DatagramReader struct { + ep *Endpoint + off uint32 + end uint32 +} + +// Init must be called on zero-value DatagramReaders before first use. +// +// Preconditions: dataLen is 0, or was returned by a previous call to +// ep.RecvFirst() or ep.SendRecv(). +func (r *DatagramReader) Init(ep *Endpoint, dataLen uint32) { + r.ep = ep + r.Reset(dataLen) +} + +// Reset causes r to begin reading a new datagram of the given length from the +// associated Endpoint. +// +// Preconditions: dataLen is 0, or was returned by a previous call to the +// associated Endpoint's RecvFirst() or SendRecv() methods. +func (r *DatagramReader) Reset(dataLen uint32) { + if dataLen > r.ep.dataCap { + panic(fmt.Sprintf("invalid dataLen (%d) > ep.dataCap (%d)", dataLen, r.ep.dataCap)) + } + r.off = 0 + r.end = dataLen +} + +// NewReader is a convenience function that returns an initialized +// DatagramReader allocated on the heap. +// +// Preconditions: dataLen was returned by a previous call to ep.RecvFirst() or +// ep.SendRecv(). +func (ep *Endpoint) NewReader(dataLen uint32) *DatagramReader { + r := &DatagramReader{} + r.Init(ep, dataLen) + return r +} + +// Read implements io.Reader.Read. +func (r *DatagramReader) Read(dst []byte) (int, error) { + n := copy(dst, r.ep.Data()[r.off:r.end]) + r.off += uint32(n) + if r.off == r.end { + return n, io.EOF + } + return n, nil +} + +// DatagramWriter implements io.Writer by writing a datagram to an Endpoint's +// packet window. Its use is optional; users that can use Endpoint.Data() more +// efficiently are advised to do so. +type DatagramWriter struct { + ep *Endpoint + off uint32 +} + +// Init must be called on zero-value DatagramWriters before first use. +func (w *DatagramWriter) Init(ep *Endpoint) { + w.ep = ep +} + +// Reset causes w to begin writing a new datagram to the associated Endpoint. +func (w *DatagramWriter) Reset() { + w.off = 0 +} + +// NewWriter is a convenience function that returns an initialized +// DatagramWriter allocated on the heap. +func (ep *Endpoint) NewWriter() *DatagramWriter { + w := &DatagramWriter{} + w.Init(ep) + return w +} + +// Write implements io.Writer.Write. +func (w *DatagramWriter) Write(src []byte) (int, error) { + n := copy(w.ep.Data()[w.off:w.ep.dataCap], src) + w.off += uint32(n) + if n != len(src) { + return n, fmt.Errorf("datagram would exceed maximum size of %d bytes", w.ep.dataCap) + } + return n, nil +} + +// Len returns the length of the written datagram. +func (w *DatagramWriter) Len() uint32 { + return w.off +} diff --git a/pkg/flipcall/packet_window_allocator.go b/pkg/flipcall/packet_window_allocator.go new file mode 100755 index 000000000..ccb918fab --- /dev/null +++ b/pkg/flipcall/packet_window_allocator.go @@ -0,0 +1,166 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flipcall + +import ( + "fmt" + "math/bits" + "os" + "syscall" + + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/memutil" +) + +var ( + pageSize = os.Getpagesize() + pageMask = pageSize - 1 +) + +func init() { + if bits.OnesCount(uint(pageSize)) != 1 { + // This is depended on by roundUpToPage(). + panic(fmt.Sprintf("system page size (%d) is not a power of 2", pageSize)) + } + if uintptr(pageSize) < PacketHeaderBytes { + // This is required since Endpoint.Init() imposes a minimum packet + // window size of 1 page. + panic(fmt.Sprintf("system page size (%d) is less than packet header size (%d)", pageSize, PacketHeaderBytes)) + } +} + +// PacketWindowDescriptor represents a packet window, a range of pages in a +// shared memory file that is used to exchange packets between partner +// Endpoints. +type PacketWindowDescriptor struct { + // FD is the file descriptor representing the shared memory file. + FD int + + // Offset is the offset into the shared memory file at which the packet + // window begins. + Offset int64 + + // Length is the size of the packet window in bytes. + Length int +} + +// PacketWindowLengthForDataCap returns the minimum packet window size required +// to accommodate datagrams of the given size in bytes. +func PacketWindowLengthForDataCap(dataCap uint32) int { + return roundUpToPage(int(dataCap) + int(PacketHeaderBytes)) +} + +func roundUpToPage(x int) int { + return (x + pageMask) &^ pageMask +} + +// A PacketWindowAllocator owns a shared memory file, and allocates packet +// windows from it. +type PacketWindowAllocator struct { + fd int + nextAlloc int64 + fileSize int64 +} + +// Init must be called on zero-value PacketWindowAllocators before first use. +// If it succeeds, Destroy() must be called once the PacketWindowAllocator is +// no longer in use. +func (pwa *PacketWindowAllocator) Init() error { + fd, err := memutil.CreateMemFD("flipcall_packet_windows", linux.MFD_CLOEXEC|linux.MFD_ALLOW_SEALING) + if err != nil { + return fmt.Errorf("failed to create memfd: %v", err) + } + // Apply F_SEAL_SHRINK to prevent either party from causing SIGBUS in the + // other by truncating the file, and F_SEAL_SEAL to prevent either party + // from applying F_SEAL_GROW or F_SEAL_WRITE. + if _, _, e := syscall.RawSyscall(syscall.SYS_FCNTL, uintptr(fd), linux.F_ADD_SEALS, linux.F_SEAL_SHRINK|linux.F_SEAL_SEAL); e != 0 { + syscall.Close(fd) + return fmt.Errorf("failed to apply memfd seals: %v", e) + } + pwa.fd = fd + return nil +} + +// NewPacketWindowAllocator is a convenience function that returns an +// initialized PacketWindowAllocator allocated on the heap. +func NewPacketWindowAllocator() (*PacketWindowAllocator, error) { + var pwa PacketWindowAllocator + if err := pwa.Init(); err != nil { + return nil, err + } + return &pwa, nil +} + +// Destroy releases resources owned by pwa. This invalidates file descriptors +// previously returned by pwa.FD() and pwd.Allocate(). +func (pwa *PacketWindowAllocator) Destroy() { + syscall.Close(pwa.fd) +} + +// FD represents the file descriptor of the shared memory file backing pwa. +func (pwa *PacketWindowAllocator) FD() int { + return pwa.fd +} + +// Allocate allocates a new packet window of at least the given size and +// returns a PacketWindowDescriptor representing it. +// +// Preconditions: size > 0. +func (pwa *PacketWindowAllocator) Allocate(size int) (PacketWindowDescriptor, error) { + if size <= 0 { + return PacketWindowDescriptor{}, fmt.Errorf("invalid size: %d", size) + } + // Page-align size to ensure that pwa.nextAlloc remains page-aligned. + size = roundUpToPage(size) + if size <= 0 { + return PacketWindowDescriptor{}, fmt.Errorf("size %d overflows after rounding up to page size", size) + } + end := pwa.nextAlloc + int64(size) // overflow checked by ensureFileSize + if err := pwa.ensureFileSize(end); err != nil { + return PacketWindowDescriptor{}, err + } + start := pwa.nextAlloc + pwa.nextAlloc = end + return PacketWindowDescriptor{ + FD: pwa.fd, + Offset: start, + Length: size, + }, nil +} + +func (pwa *PacketWindowAllocator) ensureFileSize(min int64) error { + if min <= 0 { + return fmt.Errorf("file size would overflow") + } + if pwa.fileSize >= min { + return nil + } + newSize := 2 * pwa.fileSize + if newSize == 0 { + newSize = int64(pageSize) + } + for newSize < min { + newNewSize := newSize * 2 + if newNewSize <= 0 { + return fmt.Errorf("file size would overflow") + } + newSize = newNewSize + } + if err := syscall.Ftruncate(pwa.fd, newSize); err != nil { + return fmt.Errorf("ftruncate failed: %v", err) + } + pwa.fileSize = newSize + return nil +} diff --git a/pkg/p9/client.go b/pkg/p9/client.go index 7dc20aeef..123f54e29 100644 --- a/pkg/p9/client.go +++ b/pkg/p9/client.go @@ -20,6 +20,8 @@ import ( "sync" "syscall" + "golang.org/x/sys/unix" + "gvisor.dev/gvisor/pkg/flipcall" "gvisor.dev/gvisor/pkg/log" "gvisor.dev/gvisor/pkg/unet" ) @@ -77,6 +79,45 @@ type Client struct { // fidPool is the collection of available fids. fidPool pool + // messageSize is the maximum total size of a message. + messageSize uint32 + + // payloadSize is the maximum payload size of a read or write. + // + // For large reads and writes this means that the read or write is + // broken up into buffer-size/payloadSize requests. + payloadSize uint32 + + // version is the agreed upon version X of 9P2000.L.Google.X. + // version 0 implies 9P2000.L. + version uint32 + + // sendRecv is the transport function. + // + // This is determined dynamically based on whether or not the server + // supports flipcall channels (preferred as it is faster and more + // efficient, and does not require tags). + sendRecv func(message, message) error + + // -- below corresponds to sendRecvChannel -- + + // channelsMu protects channels. + channelsMu sync.Mutex + + // channelsWg is a wait group for active clients. + channelsWg sync.WaitGroup + + // channels are the set of initialized IPCs channels. + channels []*channel + + // inuse is set when the channels are actually in use. + // + // This is a fixed-size slice, and the entries will be nil when the + // corresponding channel is available. + inuse []*channel + + // -- below corresponds to sendRecvLegacy -- + // pending is the set of pending messages. pending map[Tag]*response pendingMu sync.Mutex @@ -89,19 +130,6 @@ type Client struct { // Whoever writes to this channel is permitted to call recv. When // finished calling recv, this channel should be emptied. recvr chan bool - - // messageSize is the maximum total size of a message. - messageSize uint32 - - // payloadSize is the maximum payload size of a read or write - // request. For large reads and writes this means that the - // read or write is broken up into buffer-size/payloadSize - // requests. - payloadSize uint32 - - // version is the agreed upon version X of 9P2000.L.Google.X. - // version 0 implies 9P2000.L. - version uint32 } // NewClient creates a new client. It performs a Tversion exchange with @@ -138,8 +166,15 @@ func NewClient(socket *unet.Socket, messageSize uint32, version string) (*Client return nil, ErrBadVersionString } for { + // Always exchange the version using the legacy version of the + // protocol. If the protocol supports flipcall, then we switch + // our sendRecv function to use that functionality. Otherwise, + // we stick to sendRecvLegacy. rversion := Rversion{} - err := c.sendRecv(&Tversion{Version: versionString(requested), MSize: messageSize}, &rversion) + err := c.sendRecvLegacy(&Tversion{ + Version: versionString(requested), + MSize: messageSize, + }, &rversion) // The server told us to try again with a lower version. if err == syscall.EAGAIN { @@ -165,9 +200,125 @@ func NewClient(socket *unet.Socket, messageSize uint32, version string) (*Client c.version = version break } + + // Can we switch to use the more advanced channels and create + // independent channels for communication? Prefer it if possible. + if versionSupportsFlipcall(c.version) { + // Attempt to initialize IPC-based communication. + for i := 0; i < channelsPerClient; i++ { + if err := c.openChannel(i); err != nil { + log.Warningf("error opening flipcall channel: %v", err) + break // Stop. + } + } + if len(c.channels) >= 1 { + // At least one channel created. + c.sendRecv = c.sendRecvChannel + + // If we are using channels for communication, then we must poll + // for shutdown events on the main socket. If the socket happens + // to shutdown, then we will close the channels as well. This is + // necessary because channels can hang forever if the server dies + // while we're expecting a response. + go c.watch(socket) // S/R-SAFE: not relevant. + } else { + // Channel setup failed; fallback. + c.sendRecv = c.sendRecvLegacy + } + } else { + // No channels available: use the legacy mechanism. + c.sendRecv = c.sendRecvLegacy + } + return c, nil } +// watch watches the given socket and calls Close on hang up events. +// +// This is intended to be called as a goroutine. +func (c *Client) watch(socket *unet.Socket) { + events := []unix.PollFd{ + unix.PollFd{ + Fd: int32(socket.FD()), + Events: unix.POLLHUP | unix.POLLRDHUP, + }, + } + + for { + // Wait for a shutdown event. + n, err := unix.Ppoll(events, nil, nil) + if n == 0 || err == syscall.EAGAIN { + continue + } + break + } + + // Close everything down: this will kick all active clients off any + // pending requests. Note that Close must be safe to call concurrently, + // and multiple times (see Close below). + c.Close() +} + +// openChannel attempts to open a client channel. +// +// Note that this function returns naked errors which should not be propagated +// directly to a caller. It is expected that the errors will be logged and a +// fallback path will be used instead. +func (c *Client) openChannel(id int) error { + var ( + rchannel0 Rchannel + rchannel1 Rchannel + res = new(channel) + ) + + // Open the data channel. + if err := c.sendRecvLegacy(&Tchannel{ + ID: uint32(id), + Control: 0, + }, &rchannel0); err != nil { + return fmt.Errorf("error handling Tchannel message: %v", err) + } + if rchannel0.FilePayload() == nil { + return fmt.Errorf("missing file descriptor on primary channel") + } + + // We don't need to hold this. + defer rchannel0.FilePayload().Close() + + // Open the channel for file descriptors. + if err := c.sendRecvLegacy(&Tchannel{ + ID: uint32(id), + Control: 1, + }, &rchannel1); err != nil { + return err + } + if rchannel1.FilePayload() == nil { + return fmt.Errorf("missing file descriptor on file descriptor channel") + } + + // Construct the endpoints. + res.desc = flipcall.PacketWindowDescriptor{ + FD: rchannel0.FilePayload().FD(), + Offset: int64(rchannel0.Offset), + Length: int(rchannel0.Length), + } + if err := res.data.Init(flipcall.ClientSide, res.desc); err != nil { + rchannel1.FilePayload().Close() + return err + } + + // The fds channel owns the control payload, and it will be closed when + // the channel object is closed. + res.fds.Init(rchannel1.FilePayload().Release()) + + // Save the channel. + c.channelsMu.Lock() + defer c.channelsMu.Unlock() + c.channels = append(c.channels, res) + c.inuse = append(c.inuse, nil) + return nil +} + // handleOne handles a single incoming message. // // This should only be called with the token from recvr. Note that the received @@ -247,10 +398,10 @@ func (c *Client) waitAndRecv(done chan error) error { } } -// sendRecv performs a roundtrip message exchange. +// sendRecvLegacy performs a roundtrip message exchange. // // This is called by internal functions. -func (c *Client) sendRecv(t message, r message) error { +func (c *Client) sendRecvLegacy(t message, r message) error { tag, ok := c.tagPool.Get() if !ok { return ErrOutOfTags @@ -296,12 +447,107 @@ func (c *Client) sendRecv(t message, r message) error { return nil } +// sendRecvChannel uses channels to send a message. +func (c *Client) sendRecvChannel(t message, r message) error { + c.channelsMu.Lock() + if len(c.channels) == 0 { + // No channel available. + c.channelsMu.Unlock() + return c.sendRecvLegacy(t, r) + } + + // Find the last used channel. + // + // Note that we must add one to the wait group while holding the + // channel mutex, in order for the Wait operation to be race-free + // below. The Wait operation shuts down all in use channels and + // waits for them to return, but must do so holding the mutex. + idx := len(c.channels) - 1 + ch := c.channels[idx] + c.channels = c.channels[:idx] + c.inuse[idx] = ch + c.channelsWg.Add(1) + c.channelsMu.Unlock() + + // Ensure that it's connected. + if !ch.connected { + ch.connected = true + if err := ch.data.Connect(); err != nil { + // The channel is unusable, so don't return it. + ch.Close() + c.channelsWg.Done() + return err + } + } + + // Send the message. + err := ch.sendRecv(c, t, r) + if err != nil { + // On shutdown, we'll see ENOENT. This is a normal situation, and + // we shouldn't generate a spurious warning message in that case. + log.Debugf("error calling sendRecvChannel: %v", err) + } + c.channelsWg.Done() + + // Return the channel. + // + // Note that we check the channel from the inuse slice here. This + // prevents a race where Close is called, which clears inuse, and + // means that we will not actually return the closed channel. + c.channelsMu.Lock() + if c.inuse[idx] != nil { + c.channels = append(c.channels, ch) + c.inuse[idx] = nil + } + c.channelsMu.Unlock() + + return err +} + // Version returns the negotiated 9P2000.L.Google version number. func (c *Client) Version() uint32 { return c.version } -// Close closes the underlying socket. +// Close closes the underlying socket and channels. +// +// Because Close may be called asynchronously from watch, it must be +// safe to call concurrently and multiple times. func (c *Client) Close() error { + c.channelsMu.Lock() + defer c.channelsMu.Unlock() + + // Close all inactive channels. + for _, ch := range c.channels { + ch.Shutdown() + ch.Close() + } + // Close all active channels. + for _, ch := range c.inuse { + if ch != nil { + log.Debugf("shutting down active channel@%p...", ch) + ch.Shutdown() + } + } + + // Wait for active users. + c.channelsWg.Wait() + + // Close all previously active channels. + for i, ch := range c.inuse { + if ch != nil { + ch.Close() + + // Clear the inuse entry here so that it will not be returned + // to the channel slice, which is cleared below. See the + // comment at the end of sendRecvChannel. + c.inuse[i] = nil + } + } + c.channels = nil // Prevent use again. + + // Close the main socket. Note that operation is safe to be called + // multiple times, unlikely the channel Close operations above, which + // we are careful to ensure aren't called twice. return c.socket.Close() } diff --git a/pkg/p9/handlers.go b/pkg/p9/handlers.go index 999b4f684..ba9a55d6d 100644 --- a/pkg/p9/handlers.go +++ b/pkg/p9/handlers.go @@ -305,7 +305,9 @@ func (t *Tlopen) handle(cs *connState) message { ref.opened = true ref.openFlags = t.Flags - return &Rlopen{QID: qid, IoUnit: ioUnit, File: osFile} + rlopen := &Rlopen{QID: qid, IoUnit: ioUnit} + rlopen.SetFilePayload(osFile) + return rlopen } func (t *Tlcreate) do(cs *connState, uid UID) (*Rlcreate, error) { @@ -364,7 +366,9 @@ func (t *Tlcreate) do(cs *connState, uid UID) (*Rlcreate, error) { // Replace the FID reference. cs.InsertFID(t.FID, newRef) - return &Rlcreate{Rlopen: Rlopen{QID: qid, IoUnit: ioUnit, File: osFile}}, nil + rlcreate := &Rlcreate{Rlopen: Rlopen{QID: qid, IoUnit: ioUnit}} + rlcreate.SetFilePayload(osFile) + return rlcreate, nil } // handle implements handler.handle. @@ -1287,5 +1291,48 @@ func (t *Tlconnect) handle(cs *connState) message { return newErr(err) } - return &Rlconnect{File: osFile} + rlconnect := &Rlconnect{} + rlconnect.SetFilePayload(osFile) + return rlconnect +} + +// handle implements handler.handle. +func (t *Tchannel) handle(cs *connState) message { + // Ensure that channels are enabled. + if err := cs.initializeChannels(); err != nil { + return newErr(err) + } + + // Lookup the given channel. + ch := cs.lookupChannel(t.ID) + if ch == nil { + return newErr(syscall.ENOSYS) + } + + // Return the payload. Note that we need to duplicate the file + // descriptor for the channel allocator, because sending is a + // destructive operation between sendRecvLegacy (and now the newer + // channel send operations). Same goes for the client FD. + rchannel := &Rchannel{ + Offset: uint64(ch.desc.Offset), + Length: uint64(ch.desc.Length), + } + switch t.Control { + case 0: + // Open the main data channel. + mfd, err := syscall.Dup(int(cs.channelAlloc.FD())) + if err != nil { + return newErr(err) + } + rchannel.SetFilePayload(fd.New(mfd)) + case 1: + cfd, err := syscall.Dup(ch.client.FD()) + if err != nil { + return newErr(err) + } + rchannel.SetFilePayload(fd.New(cfd)) + default: + return newErr(syscall.EINVAL) + } + return rchannel } diff --git a/pkg/p9/messages.go b/pkg/p9/messages.go index fd9eb1c5d..ffdd7e8c6 100644 --- a/pkg/p9/messages.go +++ b/pkg/p9/messages.go @@ -64,6 +64,21 @@ type filer interface { SetFilePayload(*fd.FD) } +// filePayload embeds a File object. +type filePayload struct { + File *fd.FD +} + +// FilePayload returns the file payload. +func (f *filePayload) FilePayload() *fd.FD { + return f.File +} + +// SetFilePayload sets the received file. +func (f *filePayload) SetFilePayload(file *fd.FD) { + f.File = file +} + // Tversion is a version request. type Tversion struct { // MSize is the message size to use. @@ -524,10 +539,7 @@ type Rlopen struct { // IoUnit is the recommended I/O unit. IoUnit uint32 - // File may be attached via the socket. - // - // This is an extension specific to this package. - File *fd.FD + filePayload } // Decode implements encoder.Decode. @@ -547,16 +559,6 @@ func (*Rlopen) Type() MsgType { return MsgRlopen } -// FilePayload returns the file payload. -func (r *Rlopen) FilePayload() *fd.FD { - return r.File -} - -// SetFilePayload sets the received file. -func (r *Rlopen) SetFilePayload(file *fd.FD) { - r.File = file -} - // String implements fmt.Stringer. func (r *Rlopen) String() string { return fmt.Sprintf("Rlopen{QID: %s, IoUnit: %d, File: %v}", r.QID, r.IoUnit, r.File) @@ -2171,8 +2173,7 @@ func (t *Tlconnect) String() string { // Rlconnect is a connect response. type Rlconnect struct { - // File is a host socket. - File *fd.FD + filePayload } // Decode implements encoder.Decode. @@ -2186,19 +2187,71 @@ func (*Rlconnect) Type() MsgType { return MsgRlconnect } -// FilePayload returns the file payload. -func (r *Rlconnect) FilePayload() *fd.FD { - return r.File +// String implements fmt.Stringer. +func (r *Rlconnect) String() string { + return fmt.Sprintf("Rlconnect{File: %v}", r.File) } -// SetFilePayload sets the received file. -func (r *Rlconnect) SetFilePayload(file *fd.FD) { - r.File = file +// Tchannel creates a new channel. +type Tchannel struct { + // ID is the channel ID. + ID uint32 + + // Control is 0 if the Rchannel response should provide the flipcall + // component of the channel, and 1 if the Rchannel response should + // provide the fdchannel component of the channel. + Control uint32 +} + +// Decode implements encoder.Decode. +func (t *Tchannel) Decode(b *buffer) { + t.ID = b.Read32() + t.Control = b.Read32() +} + +// Encode implements encoder.Encode. +func (t *Tchannel) Encode(b *buffer) { + b.Write32(t.ID) + b.Write32(t.Control) +} + +// Type implements message.Type. +func (*Tchannel) Type() MsgType { + return MsgTchannel } // String implements fmt.Stringer. -func (r *Rlconnect) String() string { - return fmt.Sprintf("Rlconnect{File: %v}", r.File) +func (t *Tchannel) String() string { + return fmt.Sprintf("Tchannel{ID: %d, Control: %d}", t.ID, t.Control) +} + +// Rchannel is the channel response. +type Rchannel struct { + Offset uint64 + Length uint64 + filePayload +} + +// Decode implements encoder.Decode. +func (r *Rchannel) Decode(b *buffer) { + r.Offset = b.Read64() + r.Length = b.Read64() +} + +// Encode implements encoder.Encode. +func (r *Rchannel) Encode(b *buffer) { + b.Write64(r.Offset) + b.Write64(r.Length) +} + +// Type implements message.Type. +func (*Rchannel) Type() MsgType { + return MsgRchannel +} + +// String implements fmt.Stringer. +func (r *Rchannel) String() string { + return fmt.Sprintf("Rchannel{Offset: %d, Length: %d}", r.Offset, r.Length) } const maxCacheSize = 3 @@ -2356,4 +2409,6 @@ func init() { msgRegistry.register(MsgRlconnect, func() message { return &Rlconnect{} }) msgRegistry.register(MsgTallocate, func() message { return &Tallocate{} }) msgRegistry.register(MsgRallocate, func() message { return &Rallocate{} }) + msgRegistry.register(MsgTchannel, func() message { return &Tchannel{} }) + msgRegistry.register(MsgRchannel, func() message { return &Rchannel{} }) } diff --git a/pkg/p9/p9.go b/pkg/p9/p9.go index e12831dbd..25530adca 100644 --- a/pkg/p9/p9.go +++ b/pkg/p9/p9.go @@ -378,6 +378,8 @@ const ( MsgRlconnect = 137 MsgTallocate = 138 MsgRallocate = 139 + MsgTchannel = 250 + MsgRchannel = 251 ) // QIDType represents the file type for QIDs. diff --git a/pkg/p9/server.go b/pkg/p9/server.go index b294efbb0..69c886a5d 100644 --- a/pkg/p9/server.go +++ b/pkg/p9/server.go @@ -21,6 +21,9 @@ import ( "sync/atomic" "syscall" + "gvisor.dev/gvisor/pkg/fd" + "gvisor.dev/gvisor/pkg/fdchannel" + "gvisor.dev/gvisor/pkg/flipcall" "gvisor.dev/gvisor/pkg/log" "gvisor.dev/gvisor/pkg/unet" ) @@ -45,7 +48,6 @@ type Server struct { } // NewServer returns a new server. -// func NewServer(attacher Attacher) *Server { return &Server{ attacher: attacher, @@ -85,6 +87,8 @@ type connState struct { // version 0 implies 9P2000.L. version uint32 + // -- below relates to the legacy handler -- + // recvOkay indicates that a receive may start. recvOkay chan bool @@ -93,6 +97,20 @@ type connState struct { // sendDone is signalled when a send is finished. sendDone chan error + + // -- below relates to the flipcall handler -- + + // channelMu protects below. + channelMu sync.Mutex + + // channelWg represents active workers. + channelWg sync.WaitGroup + + // channelAlloc allocates channel memory. + channelAlloc *flipcall.PacketWindowAllocator + + // channels are the set of initialized channels. + channels []*channel } // fidRef wraps a node and tracks references. @@ -386,6 +404,99 @@ func (cs *connState) WaitTag(t Tag) { <-ch } +// initializeChannels initializes all channels. +// +// This is a no-op if channels are already initialized. +func (cs *connState) initializeChannels() (err error) { + cs.channelMu.Lock() + defer cs.channelMu.Unlock() + + // Initialize our channel allocator. + if cs.channelAlloc == nil { + alloc, err := flipcall.NewPacketWindowAllocator() + if err != nil { + return err + } + cs.channelAlloc = alloc + } + + // Create all the channels. + for len(cs.channels) < channelsPerClient { + res := &channel{ + done: make(chan struct{}), + } + + res.desc, err = cs.channelAlloc.Allocate(channelSize) + if err != nil { + return err + } + if err := res.data.Init(flipcall.ServerSide, res.desc); err != nil { + return err + } + + socks, err := fdchannel.NewConnectedSockets() + if err != nil { + res.data.Destroy() // Cleanup. + return err + } + res.fds.Init(socks[0]) + res.client = fd.New(socks[1]) + + cs.channels = append(cs.channels, res) + + // Start servicing the channel. + // + // When we call stop, we will close all the channels and these + // routines should finish. We need the wait group to ensure + // that active handlers are actually finished before cleanup. + cs.channelWg.Add(1) + go func() { // S/R-SAFE: Server side. + defer cs.channelWg.Done() + res.service(cs) + }() + } + + return nil +} + +// lookupChannel looks up the channel with given id. +// +// The function returns nil if no such channel is available. +func (cs *connState) lookupChannel(id uint32) *channel { + cs.channelMu.Lock() + defer cs.channelMu.Unlock() + if id >= uint32(len(cs.channels)) { + return nil + } + return cs.channels[id] +} + +// handle handles a single message. +func (cs *connState) handle(m message) (r message) { + defer func() { + if r == nil { + // Don't allow a panic to propagate. + recover() + + // Include a useful log message. + log.Warningf("panic in handler: %s", debug.Stack()) + + // Wrap in an EFAULT error; we don't really have a + // better way to describe this kind of error. It will + // usually manifest as a result of the test framework. + r = newErr(syscall.EFAULT) + } + }() + if handler, ok := m.(handler); ok { + // Call the message handler. + r = handler.handle(cs) + } else { + // Produce an ENOSYS error. + r = newErr(syscall.ENOSYS) + } + return +} + // handleRequest handles a single request. // // The recvDone channel is signaled when recv is done (with a error if @@ -428,41 +539,20 @@ func (cs *connState) handleRequest() { } // Handle the message. - var r message // r is the response. - defer func() { - if r == nil { - // Don't allow a panic to propagate. - recover() + r := cs.handle(m) - // Include a useful log message. - log.Warningf("panic in handler: %s", debug.Stack()) + // Clear the tag before sending. That's because as soon as this hits + // the wire, the client can legally send the same tag. + cs.ClearTag(tag) - // Wrap in an EFAULT error; we don't really have a - // better way to describe this kind of error. It will - // usually manifest as a result of the test framework. - r = newErr(syscall.EFAULT) - } + // Send back the result. + cs.sendMu.Lock() + err = send(cs.conn, tag, r) + cs.sendMu.Unlock() + cs.sendDone <- err - // Clear the tag before sending. That's because as soon as this - // hits the wire, the client can legally send another message - // with the same tag. - cs.ClearTag(tag) - - // Send back the result. - cs.sendMu.Lock() - err = send(cs.conn, tag, r) - cs.sendMu.Unlock() - cs.sendDone <- err - }() - if handler, ok := m.(handler); ok { - // Call the message handler. - r = handler.handle(cs) - } else { - // Produce an ENOSYS error. - r = newErr(syscall.ENOSYS) - } + // Return the message to the cache. msgRegistry.put(m) - m = nil // 'm' should not be touched after this point. } func (cs *connState) handleRequests() { @@ -477,7 +567,27 @@ func (cs *connState) stop() { close(cs.recvDone) close(cs.sendDone) - for _, fidRef := range cs.fids { + // Free the channels. + cs.channelMu.Lock() + for _, ch := range cs.channels { + ch.Shutdown() + } + cs.channelWg.Wait() + for _, ch := range cs.channels { + ch.Close() + } + cs.channels = nil // Clear. + cs.channelMu.Unlock() + + // Free the channel memory. + if cs.channelAlloc != nil { + cs.channelAlloc.Destroy() + } + + // Close all remaining fids. + for fid, fidRef := range cs.fids { + delete(cs.fids, fid) + // Drop final reference in the FID table. Note this should // always close the file, since we've ensured that there are no // handlers running via the wait for Pending => 0 below. @@ -510,7 +620,7 @@ func (cs *connState) service() error { for i := 0; i < pending; i++ { <-cs.sendDone } - return err + return nil } // This handler is now pending. diff --git a/pkg/p9/transport.go b/pkg/p9/transport.go index 5648df589..6e8b4bbcd 100644 --- a/pkg/p9/transport.go +++ b/pkg/p9/transport.go @@ -54,7 +54,10 @@ const ( headerLength uint32 = 7 // maximumLength is the largest possible message. - maximumLength uint32 = 4 * 1024 * 1024 + maximumLength uint32 = 1 << 20 + + // DefaultMessageSize is a sensible default. + DefaultMessageSize uint32 = 64 << 10 // initialBufferLength is the initial data buffer we allocate. initialBufferLength uint32 = 64 diff --git a/pkg/p9/transport_flipcall.go b/pkg/p9/transport_flipcall.go new file mode 100755 index 000000000..aebb54959 --- /dev/null +++ b/pkg/p9/transport_flipcall.go @@ -0,0 +1,254 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package p9 + +import ( + "runtime" + "syscall" + + "gvisor.dev/gvisor/pkg/fd" + "gvisor.dev/gvisor/pkg/fdchannel" + "gvisor.dev/gvisor/pkg/flipcall" + "gvisor.dev/gvisor/pkg/log" +) + +// channelsPerClient is the number of channels to create per client. +// +// While the client and server will generally agree on this number, in reality +// it's completely up to the server. We simply define a minimum of 2, and a +// maximum of 4, and select the number of available processes as a tie-breaker. +// Note that we don't want the number of channels to be too large, because each +// will account for channelSize memory used, which can be large. +var channelsPerClient = func() int { + n := runtime.NumCPU() + if n < 2 { + return 2 + } + if n > 4 { + return 4 + } + return n +}() + +// channelSize is the channel size to create. +// +// We simply ensure that this is larger than the largest possible message size, +// plus the flipcall packet header, plus the two bytes we write below. +const channelSize = int(2 + flipcall.PacketHeaderBytes + 2 + maximumLength) + +// channel is a fast IPC channel. +// +// The same object is used by both the server and client implementations. In +// general, the client will use only the send and recv methods. +type channel struct { + desc flipcall.PacketWindowDescriptor + data flipcall.Endpoint + fds fdchannel.Endpoint + buf buffer + + // -- client only -- + connected bool + + // -- server only -- + client *fd.FD + done chan struct{} +} + +// reset resets the channel buffer. +func (ch *channel) reset(sz uint32) { + ch.buf.data = ch.data.Data()[:sz] +} + +// service services the channel. +func (ch *channel) service(cs *connState) error { + rsz, err := ch.data.RecvFirst() + if err != nil { + return err + } + for rsz > 0 { + m, err := ch.recv(nil, rsz) + if err != nil { + return err + } + r := cs.handle(m) + msgRegistry.put(m) + rsz, err = ch.send(r) + if err != nil { + return err + } + } + return nil // Done. +} + +// Shutdown shuts down the channel. +// +// This must be called before Close. +func (ch *channel) Shutdown() { + ch.data.Shutdown() +} + +// Close closes the channel. +// +// This must only be called once, and cannot return an error. Note that +// synchronization for this method is provided at a high-level, depending on +// whether it is the client or server. This cannot be called while there are +// active callers in either service or sendRecv. +// +// Precondition: the channel should be shutdown. +func (ch *channel) Close() error { + // Close all backing transports. + ch.fds.Destroy() + ch.data.Destroy() + if ch.client != nil { + ch.client.Close() + } + return nil +} + +// send sends the given message. +// +// The return value is the size of the received response. Not that in the +// server case, this is the size of the next request. +func (ch *channel) send(m message) (uint32, error) { + if log.IsLogging(log.Debug) { + log.Debugf("send [channel @%p] %s", ch, m.String()) + } + + // Send any file payload. + sentFD := false + if filer, ok := m.(filer); ok { + if f := filer.FilePayload(); f != nil { + if err := ch.fds.SendFD(f.FD()); err != nil { + return 0, syscall.EIO // Map everything to EIO. + } + f.Close() // Per sendRecvLegacy. + sentFD = true // To mark below. + } + } + + // Encode the message. + // + // Note that IPC itself encodes the length of messages, so we don't + // need to encode a standard 9P header. We write only the message type. + ch.reset(0) + + ch.buf.WriteMsgType(m.Type()) + if sentFD { + ch.buf.Write8(1) // Incoming FD. + } else { + ch.buf.Write8(0) // No incoming FD. + } + m.Encode(&ch.buf) + ssz := uint32(len(ch.buf.data)) // Updated below. + + // Is there a payload? + if payloader, ok := m.(payloader); ok { + p := payloader.Payload() + copy(ch.data.Data()[ssz:], p) + ssz += uint32(len(p)) + } + + // Perform the one-shot communication. + n, err := ch.data.SendRecv(ssz) + if err != nil { + if n > 0 { + return n, nil + } + return 0, syscall.EIO // See above. + } + + return n, nil +} + +// recv decodes a message that exists on the channel. +// +// If the passed r is non-nil, then the type must match or an error will be +// generated. If the passed r is nil, then a new message will be created and +// returned. +func (ch *channel) recv(r message, rsz uint32) (message, error) { + // Decode the response from the inline buffer. + ch.reset(rsz) + t := ch.buf.ReadMsgType() + hasFD := ch.buf.Read8() != 0 + if t == MsgRlerror { + // Change the message type. We check for this special case + // after decoding below, and transform into an error. + r = &Rlerror{} + } else if r == nil { + nr, err := msgRegistry.get(0, t) + if err != nil { + return nil, err + } + r = nr // New message. + } else if t != r.Type() { + // Not an error and not the expected response; propagate. + return nil, &ErrBadResponse{Got: t, Want: r.Type()} + } + + // Is there a payload? Set to the latter portion. + if payloader, ok := r.(payloader); ok { + fs := payloader.FixedSize() + payloader.SetPayload(ch.buf.data[fs:]) + ch.buf.data = ch.buf.data[:fs] + } + + r.Decode(&ch.buf) + if ch.buf.isOverrun() { + // Nothing valid was available. + log.Debugf("recv [got %d bytes, needed more]", rsz) + return nil, ErrNoValidMessage + } + + // Read any FD result. + if hasFD { + if rfd, err := ch.fds.RecvFDNonblock(); err == nil { + f := fd.New(rfd) + if filer, ok := r.(filer); ok { + // Set the payload. + filer.SetFilePayload(f) + } else { + // Don't want the FD. + f.Close() + } + } else { + // The header bit was set but nothing came in. + log.Warningf("expected FD, got err: %v", err) + } + } + + // Log a message. + if log.IsLogging(log.Debug) { + log.Debugf("recv [channel @%p] %s", ch, r.String()) + } + + // Convert errors appropriately; see above. + if rlerr, ok := r.(*Rlerror); ok { + return nil, syscall.Errno(rlerr.Error) + } + + return r, nil +} + +// sendRecv sends the given message over the channel. +// +// This is used by the client. +func (ch *channel) sendRecv(c *Client, m, r message) error { + rsz, err := ch.send(m) + if err != nil { + return err + } + _, err = ch.recv(r, rsz) + return err +} diff --git a/pkg/p9/version.go b/pkg/p9/version.go index c2a2885ae..f1ffdd23a 100644 --- a/pkg/p9/version.go +++ b/pkg/p9/version.go @@ -26,7 +26,7 @@ const ( // // Clients are expected to start requesting this version number and // to continuously decrement it until a Tversion request succeeds. - highestSupportedVersion uint32 = 7 + highestSupportedVersion uint32 = 8 // lowestSupportedVersion is the lowest supported version X in a // version string of the format 9P2000.L.Google.X. @@ -148,3 +148,10 @@ func VersionSupportsMultiUser(v uint32) bool { func versionSupportsTallocate(v uint32) bool { return v >= 7 } + +// versionSupportsFlipcall returns true if version v supports IPC channels from +// the flipcall package. Note that these must be negotiated, but this version +// string indicates that such a facility exists. +func versionSupportsFlipcall(v uint32) bool { + return v >= 8 +} |