diff options
author | Googler <noreply@google.com> | 2018-04-27 10:37:02 -0700 |
---|---|---|
committer | Adin Scannell <ascannell@google.com> | 2018-04-28 01:44:26 -0400 |
commit | d02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch) | |
tree | 54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/tcpip/transport/unix | |
parent | f70210e742919f40aa2f0934a22f1c9ba6dada62 (diff) |
Check in gVisor.
PiperOrigin-RevId: 194583126
Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/tcpip/transport/unix')
-rw-r--r-- | pkg/tcpip/transport/unix/BUILD | 37 | ||||
-rw-r--r-- | pkg/tcpip/transport/unix/connectioned.go | 431 | ||||
-rw-r--r-- | pkg/tcpip/transport/unix/connectioned_state.go | 43 | ||||
-rw-r--r-- | pkg/tcpip/transport/unix/connectionless.go | 176 | ||||
-rw-r--r-- | pkg/tcpip/transport/unix/unix.go | 902 |
5 files changed, 1589 insertions, 0 deletions
diff --git a/pkg/tcpip/transport/unix/BUILD b/pkg/tcpip/transport/unix/BUILD new file mode 100644 index 000000000..47bc7a649 --- /dev/null +++ b/pkg/tcpip/transport/unix/BUILD @@ -0,0 +1,37 @@ +package(licenses = ["notice"]) # BSD + +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("//tools/go_stateify:defs.bzl", "go_stateify") + +go_stateify( + name = "unix_state", + srcs = [ + "connectioned.go", + "connectionless.go", + "unix.go", + ], + out = "unix_state.go", + package = "unix", +) + +go_library( + name = "unix", + srcs = [ + "connectioned.go", + "connectioned_state.go", + "connectionless.go", + "unix.go", + "unix_state.go", + ], + importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/transport/unix", + visibility = ["//:sandbox"], + deps = [ + "//pkg/ilist", + "//pkg/log", + "//pkg/state", + "//pkg/tcpip", + "//pkg/tcpip/buffer", + "//pkg/tcpip/transport/queue", + "//pkg/waiter", + ], +) diff --git a/pkg/tcpip/transport/unix/connectioned.go b/pkg/tcpip/transport/unix/connectioned.go new file mode 100644 index 000000000..def1b2c99 --- /dev/null +++ b/pkg/tcpip/transport/unix/connectioned.go @@ -0,0 +1,431 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package unix + +import ( + "sync" + + "gvisor.googlesource.com/gvisor/pkg/tcpip" + "gvisor.googlesource.com/gvisor/pkg/tcpip/transport/queue" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// UniqueIDProvider generates a sequence of unique identifiers useful for, +// among other things, lock ordering. +type UniqueIDProvider interface { + // UniqueID returns a new unique identifier. + UniqueID() uint64 +} + +// A ConnectingEndpoint is a connectioned unix endpoint that is attempting to +// establish a bidirectional connection with a BoundEndpoint. +type ConnectingEndpoint interface { + // ID returns the endpoint's globally unique identifier. This identifier + // must be used to determine locking order if more than one endpoint is + // to be locked in the same codepath. The endpoint with the smaller + // identifier must be locked before endpoints with larger identifiers. + ID() uint64 + + // Passcred implements socket.Credentialer.Passcred. + Passcred() bool + + // Type returns the socket type, typically either SockStream or + // SockSeqpacket. The connection attempt must be aborted if this + // value doesn't match the ConnectableEndpoint's type. + Type() SockType + + // GetLocalAddress returns the bound path. + GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) + + // Locker protects the following methods. While locked, only the holder of + // the lock can change the return value of the protected methods. + sync.Locker + + // Connected returns true iff the ConnectingEndpoint is in the connected + // state. ConnectingEndpoints can only be connected to a single endpoint, + // so the connection attempt must be aborted if this returns true. + Connected() bool + + // Listening returns true iff the ConnectingEndpoint is in the listening + // state. ConnectingEndpoints cannot make connections while listening, so + // the connection attempt must be aborted if this returns true. + Listening() bool + + // WaiterQueue returns a pointer to the endpoint's waiter queue. + WaiterQueue() *waiter.Queue +} + +// connectionedEndpoint is a Unix-domain connected or connectable endpoint and implements +// ConnectingEndpoint, ConnectableEndpoint and tcpip.Endpoint. +// +// connectionedEndpoints must be in connected state in order to transfer data. +// +// This implementation includes STREAM and SEQPACKET Unix sockets created with +// socket(2), accept(2) or socketpair(2) and dgram unix sockets created with +// socketpair(2). See unix_connectionless.go for the implementation of DGRAM +// Unix sockets created with socket(2). +// +// The state is much simpler than a TCP endpoint, so it is not encoded +// explicitly. Instead we enforce the following invariants: +// +// receiver != nil, connected != nil => connected. +// path != "" && acceptedChan == nil => bound, not listening. +// path != "" && acceptedChan != nil => bound and listening. +// +// Only one of these will be true at any moment. +type connectionedEndpoint struct { + baseEndpoint + + // id is the unique endpoint identifier. This is used exclusively for + // lock ordering within connect. + id uint64 + + // idGenerator is used to generate new unique endpoint identifiers. + idGenerator UniqueIDProvider + + // stype is used by connecting sockets to ensure that they are the + // same type. The value is typically either tcpip.SockSeqpacket or + // tcpip.SockStream. + stype SockType + + // acceptedChan is per the TCP endpoint implementation. Note that the + // sockets in this channel are _already in the connected state_, and + // have another associated connectionedEndpoint. + // + // If nil, then no listen call has been made. + acceptedChan chan *connectionedEndpoint `state:".([]*connectionedEndpoint)"` +} + +// NewConnectioned creates a new unbound connectionedEndpoint. +func NewConnectioned(stype SockType, uid UniqueIDProvider) Endpoint { + return &connectionedEndpoint{ + baseEndpoint: baseEndpoint{Queue: &waiter.Queue{}}, + id: uid.UniqueID(), + idGenerator: uid, + stype: stype, + } +} + +// NewPair allocates a new pair of connected unix-domain connectionedEndpoints. +func NewPair(stype SockType, uid UniqueIDProvider) (Endpoint, Endpoint) { + a := &connectionedEndpoint{ + baseEndpoint: baseEndpoint{Queue: &waiter.Queue{}}, + id: uid.UniqueID(), + idGenerator: uid, + stype: stype, + } + b := &connectionedEndpoint{ + baseEndpoint: baseEndpoint{Queue: &waiter.Queue{}}, + id: uid.UniqueID(), + idGenerator: uid, + stype: stype, + } + + q1 := queue.New(a.Queue, b.Queue, initialLimit) + q2 := queue.New(b.Queue, a.Queue, initialLimit) + + if stype == SockStream { + a.receiver = &streamQueueReceiver{queueReceiver: queueReceiver{q1}} + b.receiver = &streamQueueReceiver{queueReceiver: queueReceiver{q2}} + } else { + a.receiver = &queueReceiver{q1} + b.receiver = &queueReceiver{q2} + } + + a.connected = &connectedEndpoint{ + endpoint: b, + writeQueue: q2, + } + b.connected = &connectedEndpoint{ + endpoint: a, + writeQueue: q1, + } + + return a, b +} + +// ID implements ConnectingEndpoint.ID. +func (e *connectionedEndpoint) ID() uint64 { + return e.id +} + +// Type implements ConnectingEndpoint.Type and Endpoint.Type. +func (e *connectionedEndpoint) Type() SockType { + return e.stype +} + +// WaiterQueue implements ConnectingEndpoint.WaiterQueue. +func (e *connectionedEndpoint) WaiterQueue() *waiter.Queue { + return e.Queue +} + +// isBound returns true iff the connectionedEndpoint is bound (but not +// listening). +func (e *connectionedEndpoint) isBound() bool { + return e.path != "" && e.acceptedChan == nil +} + +// Listening implements ConnectingEndpoint.Listening. +func (e *connectionedEndpoint) Listening() bool { + return e.acceptedChan != nil +} + +// Close puts the connectionedEndpoint in a closed state and frees all +// resources associated with it. +// +// The socket will be a fresh state after a call to close and may be reused. +// That is, close may be used to "unbind" or "disconnect" the socket in error +// paths. +func (e *connectionedEndpoint) Close() { + e.Lock() + var c ConnectedEndpoint + var r Receiver + switch { + case e.Connected(): + e.connected.CloseSend() + e.receiver.CloseRecv() + c = e.connected + r = e.receiver + e.connected = nil + e.receiver = nil + case e.isBound(): + e.path = "" + case e.Listening(): + close(e.acceptedChan) + for n := range e.acceptedChan { + n.Close() + } + e.acceptedChan = nil + e.path = "" + } + e.Unlock() + if c != nil { + c.CloseNotify() + c.Release() + } + if r != nil { + r.CloseNotify() + r.Release() + } +} + +// BidirectionalConnect implements BoundEndpoint.BidirectionalConnect. +func (e *connectionedEndpoint) BidirectionalConnect(ce ConnectingEndpoint, returnConnect func(Receiver, ConnectedEndpoint)) *tcpip.Error { + if ce.Type() != e.stype { + return tcpip.ErrConnectionRefused + } + + // Check if ce is e to avoid a deadlock. + if ce, ok := ce.(*connectionedEndpoint); ok && ce == e { + return tcpip.ErrInvalidEndpointState + } + + // Do a dance to safely acquire locks on both endpoints. + if e.id < ce.ID() { + e.Lock() + ce.Lock() + } else { + ce.Lock() + e.Lock() + } + + // Check connecting state. + if ce.Connected() { + e.Unlock() + ce.Unlock() + return tcpip.ErrAlreadyConnected + } + if ce.Listening() { + e.Unlock() + ce.Unlock() + return tcpip.ErrInvalidEndpointState + } + + // Check bound state. + if !e.Listening() { + e.Unlock() + ce.Unlock() + return tcpip.ErrConnectionRefused + } + + // Create a newly bound connectionedEndpoint. + ne := &connectionedEndpoint{ + baseEndpoint: baseEndpoint{ + path: e.path, + Queue: &waiter.Queue{}, + }, + id: e.idGenerator.UniqueID(), + idGenerator: e.idGenerator, + stype: e.stype, + } + readQueue := queue.New(ce.WaiterQueue(), ne.Queue, initialLimit) + writeQueue := queue.New(ne.Queue, ce.WaiterQueue(), initialLimit) + ne.connected = &connectedEndpoint{ + endpoint: ce, + writeQueue: readQueue, + } + if e.stype == SockStream { + ne.receiver = &streamQueueReceiver{queueReceiver: queueReceiver{readQueue: writeQueue}} + } else { + ne.receiver = &queueReceiver{readQueue: writeQueue} + } + + select { + case e.acceptedChan <- ne: + // Commit state. + connected := &connectedEndpoint{ + endpoint: ne, + writeQueue: writeQueue, + } + if e.stype == SockStream { + returnConnect(&streamQueueReceiver{queueReceiver: queueReceiver{readQueue: readQueue}}, connected) + } else { + returnConnect(&queueReceiver{readQueue: readQueue}, connected) + } + + // Notify can deadlock if we are holding these locks. + e.Unlock() + ce.Unlock() + + // Notify on both ends. + e.Notify(waiter.EventIn) + ce.WaiterQueue().Notify(waiter.EventOut) + + return nil + default: + // Busy; return ECONNREFUSED per spec. + ne.Close() + e.Unlock() + ce.Unlock() + return tcpip.ErrConnectionRefused + } +} + +// UnidirectionalConnect implements BoundEndpoint.UnidirectionalConnect. +func (e *connectionedEndpoint) UnidirectionalConnect() (ConnectedEndpoint, *tcpip.Error) { + return nil, tcpip.ErrConnectionRefused +} + +// Connect attempts to directly connect to another Endpoint. +// Implements Endpoint.Connect. +func (e *connectionedEndpoint) Connect(server BoundEndpoint) *tcpip.Error { + returnConnect := func(r Receiver, ce ConnectedEndpoint) { + e.receiver = r + e.connected = ce + } + + return server.BidirectionalConnect(e, returnConnect) +} + +// Listen starts listening on the connection. +func (e *connectionedEndpoint) Listen(backlog int) *tcpip.Error { + e.Lock() + defer e.Unlock() + if e.Listening() { + // Adjust the size of the channel iff we can fix existing + // pending connections into the new one. + if len(e.acceptedChan) > backlog { + return tcpip.ErrInvalidEndpointState + } + origChan := e.acceptedChan + e.acceptedChan = make(chan *connectionedEndpoint, backlog) + close(origChan) + for ep := range origChan { + e.acceptedChan <- ep + } + return nil + } + if !e.isBound() { + return tcpip.ErrInvalidEndpointState + } + + // Normal case. + e.acceptedChan = make(chan *connectionedEndpoint, backlog) + return nil +} + +// Accept accepts a new connection. +func (e *connectionedEndpoint) Accept() (Endpoint, *tcpip.Error) { + e.Lock() + defer e.Unlock() + + if !e.Listening() { + return nil, tcpip.ErrInvalidEndpointState + } + + select { + case ne := <-e.acceptedChan: + return ne, nil + + default: + // Nothing left. + return nil, tcpip.ErrWouldBlock + } +} + +// Bind binds the connection. +// +// For Unix connectionedEndpoints, this _only sets the address associated with +// the socket_. Work associated with sockets in the filesystem or finding those +// sockets must be done by a higher level. +// +// Bind will fail only if the socket is connected, bound or the passed address +// is invalid (the empty string). +func (e *connectionedEndpoint) Bind(addr tcpip.FullAddress, commit func() *tcpip.Error) *tcpip.Error { + e.Lock() + defer e.Unlock() + if e.isBound() || e.Listening() { + return tcpip.ErrAlreadyBound + } + if addr.Addr == "" { + // The empty string is not permitted. + return tcpip.ErrBadLocalAddress + } + if commit != nil { + if err := commit(); err != nil { + return err + } + } + + // Save the bound address. + e.path = string(addr.Addr) + return nil +} + +// SendMsg writes data and a control message to the endpoint's peer. +// This method does not block if the data cannot be written. +func (e *connectionedEndpoint) SendMsg(data [][]byte, c ControlMessages, to BoundEndpoint) (uintptr, *tcpip.Error) { + // Stream sockets do not support specifying the endpoint. Seqpacket + // sockets ignore the passed endpoint. + if e.stype == SockStream && to != nil { + return 0, tcpip.ErrNotSupported + } + return e.baseEndpoint.SendMsg(data, c, to) +} + +// Readiness returns the current readiness of the connectionedEndpoint. For +// example, if waiter.EventIn is set, the connectionedEndpoint is immediately +// readable. +func (e *connectionedEndpoint) Readiness(mask waiter.EventMask) waiter.EventMask { + e.Lock() + defer e.Unlock() + + ready := waiter.EventMask(0) + switch { + case e.Connected(): + if mask&waiter.EventIn != 0 && e.receiver.Readable() { + ready |= waiter.EventIn + } + if mask&waiter.EventOut != 0 && e.connected.Writable() { + ready |= waiter.EventOut + } + case e.Listening(): + if mask&waiter.EventIn != 0 && len(e.acceptedChan) > 0 { + ready |= waiter.EventIn + } + } + + return ready +} diff --git a/pkg/tcpip/transport/unix/connectioned_state.go b/pkg/tcpip/transport/unix/connectioned_state.go new file mode 100644 index 000000000..5d835c8b2 --- /dev/null +++ b/pkg/tcpip/transport/unix/connectioned_state.go @@ -0,0 +1,43 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package unix + +// saveAcceptedChan is invoked by stateify. +func (e *connectionedEndpoint) saveAcceptedChan() []*connectionedEndpoint { + // If acceptedChan is nil (i.e. we are not listening) then we will save nil. + // Otherwise we create a (possibly empty) slice of the values in acceptedChan and + // save that. + var acceptedSlice []*connectionedEndpoint + if e.acceptedChan != nil { + // Swap out acceptedChan with a new empty channel of the same capacity. + saveChan := e.acceptedChan + e.acceptedChan = make(chan *connectionedEndpoint, cap(saveChan)) + + // Create a new slice with the same len and capacity as the channel. + acceptedSlice = make([]*connectionedEndpoint, len(saveChan), cap(saveChan)) + // Drain acceptedChan into saveSlice, and fill up the new acceptChan at the + // same time. + for i := range acceptedSlice { + ep := <-saveChan + acceptedSlice[i] = ep + e.acceptedChan <- ep + } + close(saveChan) + } + return acceptedSlice +} + +// loadAcceptedChan is invoked by stateify. +func (e *connectionedEndpoint) loadAcceptedChan(acceptedSlice []*connectionedEndpoint) { + // If acceptedSlice is nil, then acceptedChan should also be nil. + if acceptedSlice != nil { + // Otherwise, create a new channel with the same capacity as acceptedSlice. + e.acceptedChan = make(chan *connectionedEndpoint, cap(acceptedSlice)) + // Seed the channel with values from acceptedSlice. + for _, ep := range acceptedSlice { + e.acceptedChan <- ep + } + } +} diff --git a/pkg/tcpip/transport/unix/connectionless.go b/pkg/tcpip/transport/unix/connectionless.go new file mode 100644 index 000000000..34d34f99a --- /dev/null +++ b/pkg/tcpip/transport/unix/connectionless.go @@ -0,0 +1,176 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package unix + +import ( + "gvisor.googlesource.com/gvisor/pkg/tcpip" + "gvisor.googlesource.com/gvisor/pkg/tcpip/transport/queue" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// connectionlessEndpoint is a unix endpoint for unix sockets that support operating in +// a conectionless fashon. +// +// Specifically, this means datagram unix sockets not created with +// socketpair(2). +type connectionlessEndpoint struct { + baseEndpoint +} + +// NewConnectionless creates a new unbound dgram endpoint. +func NewConnectionless() Endpoint { + ep := &connectionlessEndpoint{baseEndpoint{Queue: &waiter.Queue{}}} + ep.receiver = &queueReceiver{readQueue: queue.New(&waiter.Queue{}, ep.Queue, initialLimit)} + return ep +} + +// isBound returns true iff the endpoint is bound. +func (e *connectionlessEndpoint) isBound() bool { + return e.path != "" +} + +// Close puts the endpoint in a closed state and frees all resources associated +// with it. +// +// The socket will be a fresh state after a call to close and may be reused. +// That is, close may be used to "unbind" or "disconnect" the socket in error +// paths. +func (e *connectionlessEndpoint) Close() { + e.Lock() + var r Receiver + if e.Connected() { + e.receiver.CloseRecv() + r = e.receiver + e.receiver = nil + + e.connected.Release() + e.connected = nil + } + if e.isBound() { + e.path = "" + } + e.Unlock() + if r != nil { + r.CloseNotify() + r.Release() + } +} + +// BidirectionalConnect implements BoundEndpoint.BidirectionalConnect. +func (e *connectionlessEndpoint) BidirectionalConnect(ce ConnectingEndpoint, returnConnect func(Receiver, ConnectedEndpoint)) *tcpip.Error { + return tcpip.ErrConnectionRefused +} + +// UnidirectionalConnect implements BoundEndpoint.UnidirectionalConnect. +func (e *connectionlessEndpoint) UnidirectionalConnect() (ConnectedEndpoint, *tcpip.Error) { + return &connectedEndpoint{ + endpoint: e, + writeQueue: e.receiver.(*queueReceiver).readQueue, + }, nil +} + +// SendMsg writes data and a control message to the specified endpoint. +// This method does not block if the data cannot be written. +func (e *connectionlessEndpoint) SendMsg(data [][]byte, c ControlMessages, to BoundEndpoint) (uintptr, *tcpip.Error) { + if to == nil { + return e.baseEndpoint.SendMsg(data, c, nil) + } + + connected, err := to.UnidirectionalConnect() + if err != nil { + return 0, tcpip.ErrInvalidEndpointState + } + defer connected.Release() + + e.Lock() + n, notify, err := connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)}) + e.Unlock() + if err != nil { + return 0, err + } + if notify { + connected.SendNotify() + } + + return n, nil +} + +// Type implements Endpoint.Type. +func (e *connectionlessEndpoint) Type() SockType { + return SockDgram +} + +// Connect attempts to connect directly to server. +func (e *connectionlessEndpoint) Connect(server BoundEndpoint) *tcpip.Error { + connected, err := server.UnidirectionalConnect() + if err != nil { + return err + } + + e.Lock() + e.connected = connected + e.Unlock() + + return nil +} + +// Listen starts listening on the connection. +func (e *connectionlessEndpoint) Listen(int) *tcpip.Error { + return tcpip.ErrNotSupported +} + +// Accept accepts a new connection. +func (e *connectionlessEndpoint) Accept() (Endpoint, *tcpip.Error) { + return nil, tcpip.ErrNotSupported +} + +// Bind binds the connection. +// +// For Unix endpoints, this _only sets the address associated with the socket_. +// Work associated with sockets in the filesystem or finding those sockets must +// be done by a higher level. +// +// Bind will fail only if the socket is connected, bound or the passed address +// is invalid (the empty string). +func (e *connectionlessEndpoint) Bind(addr tcpip.FullAddress, commit func() *tcpip.Error) *tcpip.Error { + e.Lock() + defer e.Unlock() + if e.isBound() { + return tcpip.ErrAlreadyBound + } + if addr.Addr == "" { + // The empty string is not permitted. + return tcpip.ErrBadLocalAddress + } + if commit != nil { + if err := commit(); err != nil { + return err + } + } + + // Save the bound address. + e.path = string(addr.Addr) + return nil +} + +// Readiness returns the current readiness of the endpoint. For example, if +// waiter.EventIn is set, the endpoint is immediately readable. +func (e *connectionlessEndpoint) Readiness(mask waiter.EventMask) waiter.EventMask { + e.Lock() + defer e.Unlock() + + ready := waiter.EventMask(0) + if mask&waiter.EventIn != 0 && e.receiver.Readable() { + ready |= waiter.EventIn + } + + if e.Connected() { + if mask&waiter.EventOut != 0 && e.connected.Writable() { + ready |= waiter.EventOut + } + } + + return ready +} diff --git a/pkg/tcpip/transport/unix/unix.go b/pkg/tcpip/transport/unix/unix.go new file mode 100644 index 000000000..5fe37eb71 --- /dev/null +++ b/pkg/tcpip/transport/unix/unix.go @@ -0,0 +1,902 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package unix contains the implementation of Unix endpoints. +package unix + +import ( + "sync" + "sync/atomic" + + "gvisor.googlesource.com/gvisor/pkg/ilist" + "gvisor.googlesource.com/gvisor/pkg/tcpip" + "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" + "gvisor.googlesource.com/gvisor/pkg/tcpip/transport/queue" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// initialLimit is the starting limit for the socket buffers. +const initialLimit = 16 * 1024 + +// A SockType is a type (as opposed to family) of sockets. These are enumerated +// in the syscall package as syscall.SOCK_* constants. +type SockType int + +const ( + // SockStream corresponds to syscall.SOCK_STREAM. + SockStream SockType = 1 + // SockDgram corresponds to syscall.SOCK_DGRAM. + SockDgram SockType = 2 + // SockRaw corresponds to syscall.SOCK_RAW. + SockRaw SockType = 3 + // SockSeqpacket corresponds to syscall.SOCK_SEQPACKET. + SockSeqpacket SockType = 5 +) + +// A RightsControlMessage is a control message containing FDs. +type RightsControlMessage interface { + // Clone returns a copy of the RightsControlMessage. + Clone() RightsControlMessage + + // Release releases any resources owned by the RightsControlMessage. + Release() +} + +// A CredentialsControlMessage is a control message containing Unix credentials. +type CredentialsControlMessage interface { + // Equals returns true iff the two messages are equal. + Equals(CredentialsControlMessage) bool +} + +// A ControlMessages represents a collection of socket control messages. +type ControlMessages struct { + // Rights is a control message containing FDs. + Rights RightsControlMessage + + // Credentials is a control message containing Unix credentials. + Credentials CredentialsControlMessage +} + +// Empty returns true iff the ControlMessages does not contain either +// credentials or rights. +func (c *ControlMessages) Empty() bool { + return c.Rights == nil && c.Credentials == nil +} + +// Clone clones both the credentials and the rights. +func (c *ControlMessages) Clone() ControlMessages { + cm := ControlMessages{} + if c.Rights != nil { + cm.Rights = c.Rights.Clone() + } + cm.Credentials = c.Credentials + return cm +} + +// Release releases both the credentials and the rights. +func (c *ControlMessages) Release() { + if c.Rights != nil { + c.Rights.Release() + } + *c = ControlMessages{} +} + +// Endpoint is the interface implemented by Unix transport protocol +// implementations that expose functionality like sendmsg, recvmsg, connect, +// etc. to Unix socket implementations. +type Endpoint interface { + Credentialer + waiter.Waitable + + // Close puts the endpoint in a closed state and frees all resources + // associated with it. + Close() + + // RecvMsg reads data and a control message from the endpoint. This method + // does not block if there is no data pending. + // + // creds indicates if credential control messages are requested by the + // caller. This is useful for determining if control messages can be + // coalesced. creds is a hint and can be safely ignored by the + // implementation if no coalescing is possible. It is fine to return + // credential control messages when none were requested or to not return + // credential control messages when they were requested. + // + // numRights is the number of SCM_RIGHTS FDs requested by the caller. This + // is useful if one must allocate a buffer to receive a SCM_RIGHTS message + // or determine if control messages can be coalesced. numRights is a hint + // and can be safely ignored by the implementation if the number of + // available SCM_RIGHTS FDs is known and no coalescing is possible. It is + // fine for the returned number of SCM_RIGHTS FDs to be either higher or + // lower than the requested number. + // + // If peek is true, no data should be consumed from the Endpoint. Any and + // all data returned from a peek should be available in the next call to + // RecvMsg. + // + // recvLen is the number of bytes copied into data. + // + // msgLen is the length of the read message consumed for datagram Endpoints. + // msgLen is always the same as recvLen for stream Endpoints. + RecvMsg(data [][]byte, creds bool, numRights uintptr, peek bool, addr *tcpip.FullAddress) (recvLen, msgLen uintptr, cm ControlMessages, err *tcpip.Error) + + // SendMsg writes data and a control message to the endpoint's peer. + // This method does not block if the data cannot be written. + // + // SendMsg does not take ownership of any of its arguments on error. + SendMsg([][]byte, ControlMessages, BoundEndpoint) (uintptr, *tcpip.Error) + + // Connect connects this endpoint directly to another. + // + // This should be called on the client endpoint, and the (bound) + // endpoint passed in as a parameter. + // + // The error codes are the same as Connect. + Connect(server BoundEndpoint) *tcpip.Error + + // Shutdown closes the read and/or write end of the endpoint connection + // to its peer. + Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error + + // Listen puts the endpoint in "listen" mode, which allows it to accept + // new connections. + Listen(backlog int) *tcpip.Error + + // Accept returns a new endpoint if a peer has established a connection + // to an endpoint previously set to listen mode. This method does not + // block if no new connections are available. + // + // The returned Queue is the wait queue for the newly created endpoint. + Accept() (Endpoint, *tcpip.Error) + + // Bind binds the endpoint to a specific local address and port. + // Specifying a NIC is optional. + // + // An optional commit function will be executed atomically with respect + // to binding the endpoint. If this returns an error, the bind will not + // occur and the error will be propagated back to the caller. + Bind(address tcpip.FullAddress, commit func() *tcpip.Error) *tcpip.Error + + // Type return the socket type, typically either SockStream, SockDgram + // or SockSeqpacket. + Type() SockType + + // GetLocalAddress returns the address to which the endpoint is bound. + GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) + + // GetRemoteAddress returns the address to which the endpoint is + // connected. + GetRemoteAddress() (tcpip.FullAddress, *tcpip.Error) + + // SetSockOpt sets a socket option. opt should be one of the tcpip.*Option + // types. + SetSockOpt(opt interface{}) *tcpip.Error + + // GetSockOpt gets a socket option. opt should be a pointer to one of the + // tcpip.*Option types. + GetSockOpt(opt interface{}) *tcpip.Error +} + +// A Credentialer is a socket or endpoint that supports the SO_PASSCRED socket +// option. +type Credentialer interface { + // Passcred returns whether or not the SO_PASSCRED socket option is + // enabled on this end. + Passcred() bool + + // ConnectedPasscred returns whether or not the SO_PASSCRED socket option + // is enabled on the connected end. + ConnectedPasscred() bool +} + +// A BoundEndpoint is a unix endpoint that can be connected to. +type BoundEndpoint interface { + // BidirectionalConnect establishes a bi-directional connection between two + // unix endpoints in an all-or-nothing manner. If an error occurs during + // connecting, the state of neither endpoint should be modified. + // + // In order for an endpoint to establish such a bidirectional connection + // with a BoundEndpoint, the endpoint calls the BidirectionalConnect method + // on the BoundEndpoint and sends a representation of itself (the + // ConnectingEndpoint) and a callback (returnConnect) to receive the + // connection information (Receiver and ConnectedEndpoint) upon a + // successful connect. The callback should only be called on a successful + // connect. + // + // For a connection attempt to be successful, the ConnectingEndpoint must + // be unconnected and not listening and the BoundEndpoint whose + // BidirectionalConnect method is being called must be listening. + // + // This method will return tcpip.ErrConnectionRefused on endpoints with a + // type that isn't SockStream or SockSeqpacket. + BidirectionalConnect(ep ConnectingEndpoint, returnConnect func(Receiver, ConnectedEndpoint)) *tcpip.Error + + // UnidirectionalConnect establishes a write-only connection to a unix endpoint. + // + // This method will return tcpip.ErrConnectionRefused on a non-SockDgram + // endpoint. + UnidirectionalConnect() (ConnectedEndpoint, *tcpip.Error) + + // Release releases any resources held by the BoundEndpoint. It must be + // called before dropping all references to a BoundEndpoint returned by a + // function. + Release() +} + +// message represents a message passed over a Unix domain socket. +type message struct { + ilist.Entry + + // Data is the Message payload. + Data buffer.View + + // Control is auxiliary control message data that goes along with the + // data. + Control ControlMessages + + // Address is the bound address of the endpoint that sent the message. + // + // If the endpoint that sent the message is not bound, the Address is + // the empty string. + Address tcpip.FullAddress +} + +// Length returns number of bytes stored in the Message. +func (m *message) Length() int64 { + return int64(len(m.Data)) +} + +// Release releases any resources held by the Message. +func (m *message) Release() { + m.Control.Release() +} + +func (m *message) Peek() queue.Entry { + return &message{Data: m.Data, Control: m.Control.Clone(), Address: m.Address} +} + +// A Receiver can be used to receive Messages. +type Receiver interface { + // Recv receives a single message. This method does not block. + // + // See Endpoint.RecvMsg for documentation on shared arguments. + // + // notify indicates if RecvNotify should be called. + Recv(data [][]byte, creds bool, numRights uintptr, peek bool) (recvLen, msgLen uintptr, cm ControlMessages, source tcpip.FullAddress, notify bool, err *tcpip.Error) + + // RecvNotify notifies the Receiver of a successful Recv. This must not be + // called while holding any endpoint locks. + RecvNotify() + + // CloseRecv prevents the receiving of additional Messages. + // + // After CloseRecv is called, CloseNotify must also be called. + CloseRecv() + + // CloseNotify notifies the Receiver of recv being closed. This must not be + // called while holding any endpoint locks. + CloseNotify() + + // Readable returns if messages should be attempted to be received. This + // includes when read has been shutdown. + Readable() bool + + // RecvQueuedSize returns the total amount of data currently receivable. + // RecvQueuedSize should return -1 if the operation isn't supported. + RecvQueuedSize() int64 + + // RecvMaxQueueSize returns maximum value for RecvQueuedSize. + // RecvMaxQueueSize should return -1 if the operation isn't supported. + RecvMaxQueueSize() int64 + + // Release releases any resources owned by the Receiver. It should be + // called before droping all references to a Receiver. + Release() +} + +// queueReceiver implements Receiver for datagram sockets. +type queueReceiver struct { + readQueue *queue.Queue +} + +// Recv implements Receiver.Recv. +func (q *queueReceiver) Recv(data [][]byte, creds bool, numRights uintptr, peek bool) (uintptr, uintptr, ControlMessages, tcpip.FullAddress, bool, *tcpip.Error) { + var m queue.Entry + var notify bool + var err *tcpip.Error + if peek { + m, err = q.readQueue.Peek() + } else { + m, notify, err = q.readQueue.Dequeue() + } + if err != nil { + return 0, 0, ControlMessages{}, tcpip.FullAddress{}, false, err + } + msg := m.(*message) + src := []byte(msg.Data) + var copied uintptr + for i := 0; i < len(data) && len(src) > 0; i++ { + n := copy(data[i], src) + copied += uintptr(n) + src = src[n:] + } + return copied, uintptr(len(msg.Data)), msg.Control, msg.Address, notify, nil +} + +// RecvNotify implements Receiver.RecvNotify. +func (q *queueReceiver) RecvNotify() { + q.readQueue.WriterQueue.Notify(waiter.EventOut) +} + +// CloseNotify implements Receiver.CloseNotify. +func (q *queueReceiver) CloseNotify() { + q.readQueue.ReaderQueue.Notify(waiter.EventIn) + q.readQueue.WriterQueue.Notify(waiter.EventOut) +} + +// CloseRecv implements Receiver.CloseRecv. +func (q *queueReceiver) CloseRecv() { + q.readQueue.Close() +} + +// Readable implements Receiver.Readable. +func (q *queueReceiver) Readable() bool { + return q.readQueue.IsReadable() +} + +// RecvQueuedSize implements Receiver.RecvQueuedSize. +func (q *queueReceiver) RecvQueuedSize() int64 { + return q.readQueue.QueuedSize() +} + +// RecvMaxQueueSize implements Receiver.RecvMaxQueueSize. +func (q *queueReceiver) RecvMaxQueueSize() int64 { + return q.readQueue.MaxQueueSize() +} + +// Release implements Receiver.Release. +func (*queueReceiver) Release() {} + +// streamQueueReceiver implements Receiver for stream sockets. +type streamQueueReceiver struct { + queueReceiver + + mu sync.Mutex `state:"nosave"` + buffer []byte + control ControlMessages + addr tcpip.FullAddress +} + +func vecCopy(data [][]byte, buf []byte) (uintptr, [][]byte, []byte) { + var copied uintptr + for len(data) > 0 && len(buf) > 0 { + n := copy(data[0], buf) + copied += uintptr(n) + buf = buf[n:] + data[0] = data[0][n:] + if len(data[0]) == 0 { + data = data[1:] + } + } + return copied, data, buf +} + +// Readable implements Receiver.Readable. +func (q *streamQueueReceiver) Readable() bool { + // We're readable if we have data in our buffer or if the queue receiver is + // readable. + return len(q.buffer) > 0 || q.readQueue.IsReadable() +} + +// RecvQueuedSize implements Receiver.RecvQueuedSize. +func (q *streamQueueReceiver) RecvQueuedSize() int64 { + return int64(len(q.buffer)) + q.readQueue.QueuedSize() +} + +// RecvMaxQueueSize implements Receiver.RecvMaxQueueSize. +func (q *streamQueueReceiver) RecvMaxQueueSize() int64 { + // The RecvMaxQueueSize() is the readQueue's MaxQueueSize() plus the largest + // message we can buffer which is also the largest message we can receive. + return 2 * q.readQueue.MaxQueueSize() +} + +// Recv implements Receiver.Recv. +func (q *streamQueueReceiver) Recv(data [][]byte, wantCreds bool, numRights uintptr, peek bool) (uintptr, uintptr, ControlMessages, tcpip.FullAddress, bool, *tcpip.Error) { + q.mu.Lock() + defer q.mu.Unlock() + + var notify bool + + // If we have no data in the endpoint, we need to get some. + if len(q.buffer) == 0 { + // Load the next message into a buffer, even if we are peeking. Peeking + // won't consume the message, so it will be still available to be read + // the next time Recv() is called. + m, n, err := q.readQueue.Dequeue() + if err != nil { + return 0, 0, ControlMessages{}, tcpip.FullAddress{}, false, err + } + notify = n + msg := m.(*message) + q.buffer = []byte(msg.Data) + q.control = msg.Control + q.addr = msg.Address + } + + var copied uintptr + if peek { + // Don't consume control message if we are peeking. + c := q.control.Clone() + + // Don't consume data since we are peeking. + copied, data, _ = vecCopy(data, q.buffer) + + return copied, copied, c, q.addr, notify, nil + } + + // Consume data and control message since we are not peeking. + copied, data, q.buffer = vecCopy(data, q.buffer) + + // Save the original state of q.control. + c := q.control + + // Remove rights from q.control and leave behind just the creds. + q.control.Rights = nil + if !wantCreds { + c.Credentials = nil + } + + if c.Rights != nil && numRights == 0 { + c.Rights.Release() + c.Rights = nil + } + + haveRights := c.Rights != nil + + // If we have more capacity for data and haven't received any usable + // rights. + // + // Linux never coalesces rights control messages. + for !haveRights && len(data) > 0 { + // Get a message from the readQueue. + m, n, err := q.readQueue.Dequeue() + if err != nil { + // We already got some data, so ignore this error. This will + // manifest as a short read to the user, which is what Linux + // does. + break + } + notify = notify || n + msg := m.(*message) + q.buffer = []byte(msg.Data) + q.control = msg.Control + q.addr = msg.Address + + if wantCreds { + if (q.control.Credentials == nil) != (c.Credentials == nil) { + // One message has credentials, the other does not. + break + } + + if q.control.Credentials != nil && c.Credentials != nil && !q.control.Credentials.Equals(c.Credentials) { + // Both messages have credentials, but they don't match. + break + } + } + + if numRights != 0 && c.Rights != nil && q.control.Rights != nil { + // Both messages have rights. + break + } + + var cpd uintptr + cpd, data, q.buffer = vecCopy(data, q.buffer) + copied += cpd + + if cpd == 0 { + // data was actually full. + break + } + + if q.control.Rights != nil { + // Consume rights. + if numRights == 0 { + q.control.Rights.Release() + } else { + c.Rights = q.control.Rights + haveRights = true + } + q.control.Rights = nil + } + } + return copied, copied, c, q.addr, notify, nil +} + +// A ConnectedEndpoint is an Endpoint that can be used to send Messages. +type ConnectedEndpoint interface { + // Passcred implements Endpoint.Passcred. + Passcred() bool + + // GetLocalAddress implements Endpoint.GetLocalAddress. + GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) + + // Send sends a single message. This method does not block. + // + // notify indicates if SendNotify should be called. + Send(data [][]byte, controlMessages ControlMessages, from tcpip.FullAddress) (n uintptr, notify bool, err *tcpip.Error) + + // SendNotify notifies the ConnectedEndpoint of a successful Send. This + // must not be called while holding any endpoint locks. + SendNotify() + + // CloseSend prevents the sending of additional Messages. + // + // After CloseSend is call, CloseNotify must also be called. + CloseSend() + + // CloseNotify notifies the ConnectedEndpoint of send being closed. This + // must not be called while holding any endpoint locks. + CloseNotify() + + // Writable returns if messages should be attempted to be sent. This + // includes when write has been shutdown. + Writable() bool + + // EventUpdate lets the ConnectedEndpoint know that event registrations + // have changed. + EventUpdate() + + // SendQueuedSize returns the total amount of data currently queued for + // sending. SendQueuedSize should return -1 if the operation isn't + // supported. + SendQueuedSize() int64 + + // SendMaxQueueSize returns maximum value for SendQueuedSize. + // SendMaxQueueSize should return -1 if the operation isn't supported. + SendMaxQueueSize() int64 + + // Release releases any resources owned by the ConnectedEndpoint. It should + // be called before droping all references to a ConnectedEndpoint. + Release() +} + +type connectedEndpoint struct { + // endpoint represents the subset of the Endpoint functionality needed by + // the connectedEndpoint. It is implemented by both connectionedEndpoint + // and connectionlessEndpoint and allows the use of types which don't + // fully implement Endpoint. + endpoint interface { + // Passcred implements Endpoint.Passcred. + Passcred() bool + + // GetLocalAddress implements Endpoint.GetLocalAddress. + GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) + + // Type implements Endpoint.Type. + Type() SockType + } + + writeQueue *queue.Queue +} + +// Passcred implements ConnectedEndpoint.Passcred. +func (e *connectedEndpoint) Passcred() bool { + return e.endpoint.Passcred() +} + +// GetLocalAddress implements ConnectedEndpoint.GetLocalAddress. +func (e *connectedEndpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) { + return e.endpoint.GetLocalAddress() +} + +// Send implements ConnectedEndpoint.Send. +func (e *connectedEndpoint) Send(data [][]byte, controlMessages ControlMessages, from tcpip.FullAddress) (uintptr, bool, *tcpip.Error) { + var l int + for _, d := range data { + l += len(d) + } + // Discard empty stream packets. Since stream sockets don't preserve + // message boundaries, sending zero bytes is a no-op. In Linux, the + // receiver actually uses a zero-length receive as an indication that the + // stream was closed. + if l == 0 && e.endpoint.Type() == SockStream { + controlMessages.Release() + return 0, false, nil + } + v := make([]byte, 0, l) + for _, d := range data { + v = append(v, d...) + } + notify, err := e.writeQueue.Enqueue(&message{Data: buffer.View(v), Control: controlMessages, Address: from}) + return uintptr(l), notify, err +} + +// SendNotify implements ConnectedEndpoint.SendNotify. +func (e *connectedEndpoint) SendNotify() { + e.writeQueue.ReaderQueue.Notify(waiter.EventIn) +} + +// CloseNotify implements ConnectedEndpoint.CloseNotify. +func (e *connectedEndpoint) CloseNotify() { + e.writeQueue.ReaderQueue.Notify(waiter.EventIn) + e.writeQueue.WriterQueue.Notify(waiter.EventOut) +} + +// CloseSend implements ConnectedEndpoint.CloseSend. +func (e *connectedEndpoint) CloseSend() { + e.writeQueue.Close() +} + +// Writable implements ConnectedEndpoint.Writable. +func (e *connectedEndpoint) Writable() bool { + return e.writeQueue.IsWritable() +} + +// EventUpdate implements ConnectedEndpoint.EventUpdate. +func (*connectedEndpoint) EventUpdate() {} + +// SendQueuedSize implements ConnectedEndpoint.SendQueuedSize. +func (e *connectedEndpoint) SendQueuedSize() int64 { + return e.writeQueue.QueuedSize() +} + +// SendMaxQueueSize implements ConnectedEndpoint.SendMaxQueueSize. +func (e *connectedEndpoint) SendMaxQueueSize() int64 { + return e.writeQueue.MaxQueueSize() +} + +// Release implements ConnectedEndpoint.Release. +func (*connectedEndpoint) Release() {} + +// baseEndpoint is an embeddable unix endpoint base used in both the connected and connectionless +// unix domain socket Endpoint implementations. +// +// Not to be used on its own. +type baseEndpoint struct { + *waiter.Queue + + // passcred specifies whether SCM_CREDENTIALS socket control messages are + // enabled on this endpoint. Must be accessed atomically. + passcred int32 + + // Mutex protects the below fields. + sync.Mutex `state:"nosave"` + + // receiver allows Messages to be received. + receiver Receiver + + // connected allows messages to be sent and state information about the + // connected endpoint to be read. + connected ConnectedEndpoint + + // path is not empty if the endpoint has been bound, + // or may be used if the endpoint is connected. + path string +} + +// EventRegister implements waiter.Waitable.EventRegister. +func (e *baseEndpoint) EventRegister(we *waiter.Entry, mask waiter.EventMask) { + e.Lock() + e.Queue.EventRegister(we, mask) + if e.connected != nil { + e.connected.EventUpdate() + } + e.Unlock() +} + +// EventUnregister implements waiter.Waitable.EventUnregister. +func (e *baseEndpoint) EventUnregister(we *waiter.Entry) { + e.Lock() + e.Queue.EventUnregister(we) + if e.connected != nil { + e.connected.EventUpdate() + } + e.Unlock() +} + +// Passcred implements Credentialer.Passcred. +func (e *baseEndpoint) Passcred() bool { + return atomic.LoadInt32(&e.passcred) != 0 +} + +// ConnectedPasscred implements Credentialer.ConnectedPasscred. +func (e *baseEndpoint) ConnectedPasscred() bool { + e.Lock() + defer e.Unlock() + return e.connected != nil && e.connected.Passcred() +} + +func (e *baseEndpoint) setPasscred(pc bool) { + if pc { + atomic.StoreInt32(&e.passcred, 1) + } else { + atomic.StoreInt32(&e.passcred, 0) + } +} + +// Connected implements ConnectingEndpoint.Connected. +func (e *baseEndpoint) Connected() bool { + return e.receiver != nil && e.connected != nil +} + +// RecvMsg reads data and a control message from the endpoint. +func (e *baseEndpoint) RecvMsg(data [][]byte, creds bool, numRights uintptr, peek bool, addr *tcpip.FullAddress) (uintptr, uintptr, ControlMessages, *tcpip.Error) { + e.Lock() + + if e.receiver == nil { + e.Unlock() + return 0, 0, ControlMessages{}, tcpip.ErrNotConnected + } + + recvLen, msgLen, cms, a, notify, err := e.receiver.Recv(data, creds, numRights, peek) + e.Unlock() + if err != nil { + return 0, 0, ControlMessages{}, err + } + + if notify { + e.receiver.RecvNotify() + } + + if addr != nil { + *addr = a + } + return recvLen, msgLen, cms, nil +} + +// SendMsg writes data and a control message to the endpoint's peer. +// This method does not block if the data cannot be written. +func (e *baseEndpoint) SendMsg(data [][]byte, c ControlMessages, to BoundEndpoint) (uintptr, *tcpip.Error) { + e.Lock() + if !e.Connected() { + e.Unlock() + return 0, tcpip.ErrNotConnected + } + if to != nil { + e.Unlock() + return 0, tcpip.ErrAlreadyConnected + } + + n, notify, err := e.connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)}) + e.Unlock() + if err != nil { + return 0, err + } + + if notify { + e.connected.SendNotify() + } + + return n, nil +} + +// SetSockOpt sets a socket option. Currently not supported. +func (e *baseEndpoint) SetSockOpt(opt interface{}) *tcpip.Error { + switch v := opt.(type) { + case tcpip.PasscredOption: + e.setPasscred(v != 0) + return nil + } + return nil +} + +// GetSockOpt implements tcpip.Endpoint.GetSockOpt. +func (e *baseEndpoint) GetSockOpt(opt interface{}) *tcpip.Error { + switch o := opt.(type) { + case tcpip.ErrorOption: + return nil + case *tcpip.SendQueueSizeOption: + e.Lock() + if !e.Connected() { + e.Unlock() + return tcpip.ErrNotConnected + } + qs := tcpip.SendQueueSizeOption(e.connected.SendQueuedSize()) + e.Unlock() + if qs < 0 { + return tcpip.ErrQueueSizeNotSupported + } + *o = qs + return nil + case *tcpip.ReceiveQueueSizeOption: + e.Lock() + if !e.Connected() { + e.Unlock() + return tcpip.ErrNotConnected + } + qs := tcpip.ReceiveQueueSizeOption(e.receiver.RecvQueuedSize()) + e.Unlock() + if qs < 0 { + return tcpip.ErrQueueSizeNotSupported + } + *o = qs + return nil + case *tcpip.PasscredOption: + if e.Passcred() { + *o = tcpip.PasscredOption(1) + } else { + *o = tcpip.PasscredOption(0) + } + return nil + case *tcpip.SendBufferSizeOption: + e.Lock() + if !e.Connected() { + e.Unlock() + return tcpip.ErrNotConnected + } + qs := tcpip.SendBufferSizeOption(e.connected.SendMaxQueueSize()) + e.Unlock() + if qs < 0 { + return tcpip.ErrQueueSizeNotSupported + } + *o = qs + return nil + case *tcpip.ReceiveBufferSizeOption: + e.Lock() + if e.receiver == nil { + e.Unlock() + return tcpip.ErrNotConnected + } + qs := tcpip.ReceiveBufferSizeOption(e.receiver.RecvMaxQueueSize()) + e.Unlock() + if qs < 0 { + return tcpip.ErrQueueSizeNotSupported + } + *o = qs + return nil + } + return tcpip.ErrUnknownProtocolOption +} + +// Shutdown closes the read and/or write end of the endpoint connection to its +// peer. +func (e *baseEndpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error { + e.Lock() + if !e.Connected() { + e.Unlock() + return tcpip.ErrNotConnected + } + + if flags&tcpip.ShutdownRead != 0 { + e.receiver.CloseRecv() + } + + if flags&tcpip.ShutdownWrite != 0 { + e.connected.CloseSend() + } + + e.Unlock() + + if flags&tcpip.ShutdownRead != 0 { + e.receiver.CloseNotify() + } + + if flags&tcpip.ShutdownWrite != 0 { + e.connected.CloseNotify() + } + + return nil +} + +// GetLocalAddress returns the bound path. +func (e *baseEndpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) { + e.Lock() + defer e.Unlock() + return tcpip.FullAddress{Addr: tcpip.Address(e.path)}, nil +} + +// GetRemoteAddress returns the local address of the connected endpoint (if +// available). +func (e *baseEndpoint) GetRemoteAddress() (tcpip.FullAddress, *tcpip.Error) { + e.Lock() + c := e.connected + e.Unlock() + if c != nil { + return c.GetLocalAddress() + } + return tcpip.FullAddress{}, tcpip.ErrNotConnected +} + +// Release implements BoundEndpoint.Release. +func (*baseEndpoint) Release() {} |