summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport
diff options
context:
space:
mode:
authorIan Gudger <igudger@google.com>2018-10-17 11:36:32 -0700
committerShentubot <shentubot@google.com>2018-10-17 11:37:51 -0700
commit6cba410df0ea2eabb87bad5074a8a79ed89312b8 (patch)
treef0e67873a5d25b9b241f33b3a128a2a66955afc8 /pkg/tcpip/transport
parent8cbca46b6d99bcf0b2647ffa247b0963f872916b (diff)
Move Unix transport out of netstack
PiperOrigin-RevId: 217557656 Change-Id: I63d27635b1a6c12877279995d2d9847b6a19da9b
Diffstat (limited to 'pkg/tcpip/transport')
-rw-r--r--pkg/tcpip/transport/queue/BUILD15
-rw-r--r--pkg/tcpip/transport/queue/queue.go227
-rw-r--r--pkg/tcpip/transport/unix/BUILD22
-rw-r--r--pkg/tcpip/transport/unix/connectioned.go454
-rw-r--r--pkg/tcpip/transport/unix/connectioned_state.go53
-rw-r--r--pkg/tcpip/transport/unix/connectionless.go192
-rw-r--r--pkg/tcpip/transport/unix/unix.go953
7 files changed, 0 insertions, 1916 deletions
diff --git a/pkg/tcpip/transport/queue/BUILD b/pkg/tcpip/transport/queue/BUILD
deleted file mode 100644
index 6dcec312e..000000000
--- a/pkg/tcpip/transport/queue/BUILD
+++ /dev/null
@@ -1,15 +0,0 @@
-package(licenses = ["notice"]) # Apache 2.0
-
-load("//tools/go_stateify:defs.bzl", "go_library")
-
-go_library(
- name = "queue",
- srcs = ["queue.go"],
- importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/transport/queue",
- visibility = ["//:sandbox"],
- deps = [
- "//pkg/ilist",
- "//pkg/tcpip",
- "//pkg/waiter",
- ],
-)
diff --git a/pkg/tcpip/transport/queue/queue.go b/pkg/tcpip/transport/queue/queue.go
deleted file mode 100644
index b3d2ea68b..000000000
--- a/pkg/tcpip/transport/queue/queue.go
+++ /dev/null
@@ -1,227 +0,0 @@
-// Copyright 2018 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package queue provides the implementation of buffer queue
-// and interface of queue entry with Length method.
-package queue
-
-import (
- "sync"
-
- "gvisor.googlesource.com/gvisor/pkg/ilist"
- "gvisor.googlesource.com/gvisor/pkg/tcpip"
- "gvisor.googlesource.com/gvisor/pkg/waiter"
-)
-
-// Entry implements Linker interface and has additional required methods.
-type Entry interface {
- ilist.Linker
-
- // Length returns the number of bytes stored in the entry.
- Length() int64
-
- // Release releases any resources held by the entry.
- Release()
-
- // Peek returns a copy of the entry. It must be Released separately.
- Peek() Entry
-
- // Truncate reduces the number of bytes stored in the entry to n bytes.
- //
- // Preconditions: n <= Length().
- Truncate(n int64)
-}
-
-// Queue is a buffer queue.
-//
-// +stateify savable
-type Queue struct {
- ReaderQueue *waiter.Queue
- WriterQueue *waiter.Queue
-
- mu sync.Mutex `state:"nosave"`
- closed bool
- used int64
- limit int64
- dataList ilist.List
-}
-
-// New allocates and initializes a new queue.
-func New(ReaderQueue *waiter.Queue, WriterQueue *waiter.Queue, limit int64) *Queue {
- return &Queue{ReaderQueue: ReaderQueue, WriterQueue: WriterQueue, limit: limit}
-}
-
-// Close closes q for reading and writing. It is immediately not writable and
-// will become unreadable when no more data is pending.
-//
-// Both the read and write queues must be notified after closing:
-// q.ReaderQueue.Notify(waiter.EventIn)
-// q.WriterQueue.Notify(waiter.EventOut)
-func (q *Queue) Close() {
- q.mu.Lock()
- q.closed = true
- q.mu.Unlock()
-}
-
-// Reset empties the queue and Releases all of the Entries.
-//
-// Both the read and write queues must be notified after resetting:
-// q.ReaderQueue.Notify(waiter.EventIn)
-// q.WriterQueue.Notify(waiter.EventOut)
-func (q *Queue) Reset() {
- q.mu.Lock()
- for cur := q.dataList.Front(); cur != nil; cur = cur.Next() {
- cur.(Entry).Release()
- }
- q.dataList.Reset()
- q.used = 0
- q.mu.Unlock()
-}
-
-// IsReadable determines if q is currently readable.
-func (q *Queue) IsReadable() bool {
- q.mu.Lock()
- defer q.mu.Unlock()
-
- return q.closed || q.dataList.Front() != nil
-}
-
-// bufWritable returns true if there is space for writing.
-//
-// N.B. Linux only considers a unix socket "writable" if >75% of the buffer is
-// free.
-//
-// See net/unix/af_unix.c:unix_writeable.
-func (q *Queue) bufWritable() bool {
- return 4*q.used < q.limit
-}
-
-// IsWritable determines if q is currently writable.
-func (q *Queue) IsWritable() bool {
- q.mu.Lock()
- defer q.mu.Unlock()
-
- return q.closed || q.bufWritable()
-}
-
-// Enqueue adds an entry to the data queue if room is available.
-//
-// If truncate is true, Enqueue may truncate the message beforing enqueuing it.
-// Otherwise, the entire message must fit. If n < e.Length(), err indicates why.
-//
-// If notify is true, ReaderQueue.Notify must be called:
-// q.ReaderQueue.Notify(waiter.EventIn)
-func (q *Queue) Enqueue(e Entry, truncate bool) (l int64, notify bool, err *tcpip.Error) {
- q.mu.Lock()
-
- if q.closed {
- q.mu.Unlock()
- return 0, false, tcpip.ErrClosedForSend
- }
-
- free := q.limit - q.used
-
- l = e.Length()
-
- if l > free && truncate {
- if free == 0 {
- // Message can't fit right now.
- q.mu.Unlock()
- return 0, false, tcpip.ErrWouldBlock
- }
-
- e.Truncate(free)
- l = e.Length()
- err = tcpip.ErrWouldBlock
- }
-
- if l > q.limit {
- // Message is too big to ever fit.
- q.mu.Unlock()
- return 0, false, tcpip.ErrMessageTooLong
- }
-
- if l > free {
- // Message can't fit right now.
- q.mu.Unlock()
- return 0, false, tcpip.ErrWouldBlock
- }
-
- notify = q.dataList.Front() == nil
- q.used += l
- q.dataList.PushBack(e)
-
- q.mu.Unlock()
-
- return l, notify, err
-}
-
-// Dequeue removes the first entry in the data queue, if one exists.
-//
-// If notify is true, WriterQueue.Notify must be called:
-// q.WriterQueue.Notify(waiter.EventOut)
-func (q *Queue) Dequeue() (e Entry, notify bool, err *tcpip.Error) {
- q.mu.Lock()
-
- if q.dataList.Front() == nil {
- err := tcpip.ErrWouldBlock
- if q.closed {
- err = tcpip.ErrClosedForReceive
- }
- q.mu.Unlock()
-
- return nil, false, err
- }
-
- notify = !q.bufWritable()
-
- e = q.dataList.Front().(Entry)
- q.dataList.Remove(e)
- q.used -= e.Length()
-
- notify = notify && q.bufWritable()
-
- q.mu.Unlock()
-
- return e, notify, nil
-}
-
-// Peek returns the first entry in the data queue, if one exists.
-func (q *Queue) Peek() (Entry, *tcpip.Error) {
- q.mu.Lock()
- defer q.mu.Unlock()
-
- if q.dataList.Front() == nil {
- err := tcpip.ErrWouldBlock
- if q.closed {
- err = tcpip.ErrClosedForReceive
- }
- return nil, err
- }
-
- return q.dataList.Front().(Entry).Peek(), nil
-}
-
-// QueuedSize returns the number of bytes currently in the queue, that is, the
-// number of readable bytes.
-func (q *Queue) QueuedSize() int64 {
- q.mu.Lock()
- defer q.mu.Unlock()
- return q.used
-}
-
-// MaxQueueSize returns the maximum number of bytes storable in the queue.
-func (q *Queue) MaxQueueSize() int64 {
- return q.limit
-}
diff --git a/pkg/tcpip/transport/unix/BUILD b/pkg/tcpip/transport/unix/BUILD
deleted file mode 100644
index dae0bd079..000000000
--- a/pkg/tcpip/transport/unix/BUILD
+++ /dev/null
@@ -1,22 +0,0 @@
-package(licenses = ["notice"]) # Apache 2.0
-
-load("//tools/go_stateify:defs.bzl", "go_library")
-
-go_library(
- name = "unix",
- srcs = [
- "connectioned.go",
- "connectioned_state.go",
- "connectionless.go",
- "unix.go",
- ],
- importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/transport/unix",
- visibility = ["//:sandbox"],
- deps = [
- "//pkg/ilist",
- "//pkg/tcpip",
- "//pkg/tcpip/buffer",
- "//pkg/tcpip/transport/queue",
- "//pkg/waiter",
- ],
-)
diff --git a/pkg/tcpip/transport/unix/connectioned.go b/pkg/tcpip/transport/unix/connectioned.go
deleted file mode 100644
index e319b3bb8..000000000
--- a/pkg/tcpip/transport/unix/connectioned.go
+++ /dev/null
@@ -1,454 +0,0 @@
-// Copyright 2018 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package unix
-
-import (
- "sync"
-
- "gvisor.googlesource.com/gvisor/pkg/tcpip"
- "gvisor.googlesource.com/gvisor/pkg/tcpip/transport/queue"
- "gvisor.googlesource.com/gvisor/pkg/waiter"
-)
-
-// UniqueIDProvider generates a sequence of unique identifiers useful for,
-// among other things, lock ordering.
-type UniqueIDProvider interface {
- // UniqueID returns a new unique identifier.
- UniqueID() uint64
-}
-
-// A ConnectingEndpoint is a connectioned unix endpoint that is attempting to
-// establish a bidirectional connection with a BoundEndpoint.
-type ConnectingEndpoint interface {
- // ID returns the endpoint's globally unique identifier. This identifier
- // must be used to determine locking order if more than one endpoint is
- // to be locked in the same codepath. The endpoint with the smaller
- // identifier must be locked before endpoints with larger identifiers.
- ID() uint64
-
- // Passcred implements socket.Credentialer.Passcred.
- Passcred() bool
-
- // Type returns the socket type, typically either SockStream or
- // SockSeqpacket. The connection attempt must be aborted if this
- // value doesn't match the ConnectableEndpoint's type.
- Type() SockType
-
- // GetLocalAddress returns the bound path.
- GetLocalAddress() (tcpip.FullAddress, *tcpip.Error)
-
- // Locker protects the following methods. While locked, only the holder of
- // the lock can change the return value of the protected methods.
- sync.Locker
-
- // Connected returns true iff the ConnectingEndpoint is in the connected
- // state. ConnectingEndpoints can only be connected to a single endpoint,
- // so the connection attempt must be aborted if this returns true.
- Connected() bool
-
- // Listening returns true iff the ConnectingEndpoint is in the listening
- // state. ConnectingEndpoints cannot make connections while listening, so
- // the connection attempt must be aborted if this returns true.
- Listening() bool
-
- // WaiterQueue returns a pointer to the endpoint's waiter queue.
- WaiterQueue() *waiter.Queue
-}
-
-// connectionedEndpoint is a Unix-domain connected or connectable endpoint and implements
-// ConnectingEndpoint, ConnectableEndpoint and tcpip.Endpoint.
-//
-// connectionedEndpoints must be in connected state in order to transfer data.
-//
-// This implementation includes STREAM and SEQPACKET Unix sockets created with
-// socket(2), accept(2) or socketpair(2) and dgram unix sockets created with
-// socketpair(2). See unix_connectionless.go for the implementation of DGRAM
-// Unix sockets created with socket(2).
-//
-// The state is much simpler than a TCP endpoint, so it is not encoded
-// explicitly. Instead we enforce the following invariants:
-//
-// receiver != nil, connected != nil => connected.
-// path != "" && acceptedChan == nil => bound, not listening.
-// path != "" && acceptedChan != nil => bound and listening.
-//
-// Only one of these will be true at any moment.
-//
-// +stateify savable
-type connectionedEndpoint struct {
- baseEndpoint
-
- // id is the unique endpoint identifier. This is used exclusively for
- // lock ordering within connect.
- id uint64
-
- // idGenerator is used to generate new unique endpoint identifiers.
- idGenerator UniqueIDProvider
-
- // stype is used by connecting sockets to ensure that they are the
- // same type. The value is typically either tcpip.SockSeqpacket or
- // tcpip.SockStream.
- stype SockType
-
- // acceptedChan is per the TCP endpoint implementation. Note that the
- // sockets in this channel are _already in the connected state_, and
- // have another associated connectionedEndpoint.
- //
- // If nil, then no listen call has been made.
- acceptedChan chan *connectionedEndpoint `state:".([]*connectionedEndpoint)"`
-}
-
-// NewConnectioned creates a new unbound connectionedEndpoint.
-func NewConnectioned(stype SockType, uid UniqueIDProvider) Endpoint {
- return &connectionedEndpoint{
- baseEndpoint: baseEndpoint{Queue: &waiter.Queue{}},
- id: uid.UniqueID(),
- idGenerator: uid,
- stype: stype,
- }
-}
-
-// NewPair allocates a new pair of connected unix-domain connectionedEndpoints.
-func NewPair(stype SockType, uid UniqueIDProvider) (Endpoint, Endpoint) {
- a := &connectionedEndpoint{
- baseEndpoint: baseEndpoint{Queue: &waiter.Queue{}},
- id: uid.UniqueID(),
- idGenerator: uid,
- stype: stype,
- }
- b := &connectionedEndpoint{
- baseEndpoint: baseEndpoint{Queue: &waiter.Queue{}},
- id: uid.UniqueID(),
- idGenerator: uid,
- stype: stype,
- }
-
- q1 := queue.New(a.Queue, b.Queue, initialLimit)
- q2 := queue.New(b.Queue, a.Queue, initialLimit)
-
- if stype == SockStream {
- a.receiver = &streamQueueReceiver{queueReceiver: queueReceiver{q1}}
- b.receiver = &streamQueueReceiver{queueReceiver: queueReceiver{q2}}
- } else {
- a.receiver = &queueReceiver{q1}
- b.receiver = &queueReceiver{q2}
- }
-
- a.connected = &connectedEndpoint{
- endpoint: b,
- writeQueue: q2,
- }
- b.connected = &connectedEndpoint{
- endpoint: a,
- writeQueue: q1,
- }
-
- return a, b
-}
-
-// NewExternal creates a new externally backed Endpoint. It behaves like a
-// socketpair.
-func NewExternal(stype SockType, uid UniqueIDProvider, queue *waiter.Queue, receiver Receiver, connected ConnectedEndpoint) Endpoint {
- return &connectionedEndpoint{
- baseEndpoint: baseEndpoint{Queue: queue, receiver: receiver, connected: connected},
- id: uid.UniqueID(),
- idGenerator: uid,
- stype: stype,
- }
-}
-
-// ID implements ConnectingEndpoint.ID.
-func (e *connectionedEndpoint) ID() uint64 {
- return e.id
-}
-
-// Type implements ConnectingEndpoint.Type and Endpoint.Type.
-func (e *connectionedEndpoint) Type() SockType {
- return e.stype
-}
-
-// WaiterQueue implements ConnectingEndpoint.WaiterQueue.
-func (e *connectionedEndpoint) WaiterQueue() *waiter.Queue {
- return e.Queue
-}
-
-// isBound returns true iff the connectionedEndpoint is bound (but not
-// listening).
-func (e *connectionedEndpoint) isBound() bool {
- return e.path != "" && e.acceptedChan == nil
-}
-
-// Listening implements ConnectingEndpoint.Listening.
-func (e *connectionedEndpoint) Listening() bool {
- return e.acceptedChan != nil
-}
-
-// Close puts the connectionedEndpoint in a closed state and frees all
-// resources associated with it.
-//
-// The socket will be a fresh state after a call to close and may be reused.
-// That is, close may be used to "unbind" or "disconnect" the socket in error
-// paths.
-func (e *connectionedEndpoint) Close() {
- e.Lock()
- var c ConnectedEndpoint
- var r Receiver
- switch {
- case e.Connected():
- e.connected.CloseSend()
- e.receiver.CloseRecv()
- c = e.connected
- r = e.receiver
- e.connected = nil
- e.receiver = nil
- case e.isBound():
- e.path = ""
- case e.Listening():
- close(e.acceptedChan)
- for n := range e.acceptedChan {
- n.Close()
- }
- e.acceptedChan = nil
- e.path = ""
- }
- e.Unlock()
- if c != nil {
- c.CloseNotify()
- c.Release()
- }
- if r != nil {
- r.CloseNotify()
- r.Release()
- }
-}
-
-// BidirectionalConnect implements BoundEndpoint.BidirectionalConnect.
-func (e *connectionedEndpoint) BidirectionalConnect(ce ConnectingEndpoint, returnConnect func(Receiver, ConnectedEndpoint)) *tcpip.Error {
- if ce.Type() != e.stype {
- return tcpip.ErrConnectionRefused
- }
-
- // Check if ce is e to avoid a deadlock.
- if ce, ok := ce.(*connectionedEndpoint); ok && ce == e {
- return tcpip.ErrInvalidEndpointState
- }
-
- // Do a dance to safely acquire locks on both endpoints.
- if e.id < ce.ID() {
- e.Lock()
- ce.Lock()
- } else {
- ce.Lock()
- e.Lock()
- }
-
- // Check connecting state.
- if ce.Connected() {
- e.Unlock()
- ce.Unlock()
- return tcpip.ErrAlreadyConnected
- }
- if ce.Listening() {
- e.Unlock()
- ce.Unlock()
- return tcpip.ErrInvalidEndpointState
- }
-
- // Check bound state.
- if !e.Listening() {
- e.Unlock()
- ce.Unlock()
- return tcpip.ErrConnectionRefused
- }
-
- // Create a newly bound connectionedEndpoint.
- ne := &connectionedEndpoint{
- baseEndpoint: baseEndpoint{
- path: e.path,
- Queue: &waiter.Queue{},
- },
- id: e.idGenerator.UniqueID(),
- idGenerator: e.idGenerator,
- stype: e.stype,
- }
- readQueue := queue.New(ce.WaiterQueue(), ne.Queue, initialLimit)
- writeQueue := queue.New(ne.Queue, ce.WaiterQueue(), initialLimit)
- ne.connected = &connectedEndpoint{
- endpoint: ce,
- writeQueue: readQueue,
- }
- if e.stype == SockStream {
- ne.receiver = &streamQueueReceiver{queueReceiver: queueReceiver{readQueue: writeQueue}}
- } else {
- ne.receiver = &queueReceiver{readQueue: writeQueue}
- }
-
- select {
- case e.acceptedChan <- ne:
- // Commit state.
- connected := &connectedEndpoint{
- endpoint: ne,
- writeQueue: writeQueue,
- }
- if e.stype == SockStream {
- returnConnect(&streamQueueReceiver{queueReceiver: queueReceiver{readQueue: readQueue}}, connected)
- } else {
- returnConnect(&queueReceiver{readQueue: readQueue}, connected)
- }
-
- // Notify can deadlock if we are holding these locks.
- e.Unlock()
- ce.Unlock()
-
- // Notify on both ends.
- e.Notify(waiter.EventIn)
- ce.WaiterQueue().Notify(waiter.EventOut)
-
- return nil
- default:
- // Busy; return ECONNREFUSED per spec.
- ne.Close()
- e.Unlock()
- ce.Unlock()
- return tcpip.ErrConnectionRefused
- }
-}
-
-// UnidirectionalConnect implements BoundEndpoint.UnidirectionalConnect.
-func (e *connectionedEndpoint) UnidirectionalConnect() (ConnectedEndpoint, *tcpip.Error) {
- return nil, tcpip.ErrConnectionRefused
-}
-
-// Connect attempts to directly connect to another Endpoint.
-// Implements Endpoint.Connect.
-func (e *connectionedEndpoint) Connect(server BoundEndpoint) *tcpip.Error {
- returnConnect := func(r Receiver, ce ConnectedEndpoint) {
- e.receiver = r
- e.connected = ce
- }
-
- return server.BidirectionalConnect(e, returnConnect)
-}
-
-// Listen starts listening on the connection.
-func (e *connectionedEndpoint) Listen(backlog int) *tcpip.Error {
- e.Lock()
- defer e.Unlock()
- if e.Listening() {
- // Adjust the size of the channel iff we can fix existing
- // pending connections into the new one.
- if len(e.acceptedChan) > backlog {
- return tcpip.ErrInvalidEndpointState
- }
- origChan := e.acceptedChan
- e.acceptedChan = make(chan *connectionedEndpoint, backlog)
- close(origChan)
- for ep := range origChan {
- e.acceptedChan <- ep
- }
- return nil
- }
- if !e.isBound() {
- return tcpip.ErrInvalidEndpointState
- }
-
- // Normal case.
- e.acceptedChan = make(chan *connectionedEndpoint, backlog)
- return nil
-}
-
-// Accept accepts a new connection.
-func (e *connectionedEndpoint) Accept() (Endpoint, *tcpip.Error) {
- e.Lock()
- defer e.Unlock()
-
- if !e.Listening() {
- return nil, tcpip.ErrInvalidEndpointState
- }
-
- select {
- case ne := <-e.acceptedChan:
- return ne, nil
-
- default:
- // Nothing left.
- return nil, tcpip.ErrWouldBlock
- }
-}
-
-// Bind binds the connection.
-//
-// For Unix connectionedEndpoints, this _only sets the address associated with
-// the socket_. Work associated with sockets in the filesystem or finding those
-// sockets must be done by a higher level.
-//
-// Bind will fail only if the socket is connected, bound or the passed address
-// is invalid (the empty string).
-func (e *connectionedEndpoint) Bind(addr tcpip.FullAddress, commit func() *tcpip.Error) *tcpip.Error {
- e.Lock()
- defer e.Unlock()
- if e.isBound() || e.Listening() {
- return tcpip.ErrAlreadyBound
- }
- if addr.Addr == "" {
- // The empty string is not permitted.
- return tcpip.ErrBadLocalAddress
- }
- if commit != nil {
- if err := commit(); err != nil {
- return err
- }
- }
-
- // Save the bound address.
- e.path = string(addr.Addr)
- return nil
-}
-
-// SendMsg writes data and a control message to the endpoint's peer.
-// This method does not block if the data cannot be written.
-func (e *connectionedEndpoint) SendMsg(data [][]byte, c ControlMessages, to BoundEndpoint) (uintptr, *tcpip.Error) {
- // Stream sockets do not support specifying the endpoint. Seqpacket
- // sockets ignore the passed endpoint.
- if e.stype == SockStream && to != nil {
- return 0, tcpip.ErrNotSupported
- }
- return e.baseEndpoint.SendMsg(data, c, to)
-}
-
-// Readiness returns the current readiness of the connectionedEndpoint. For
-// example, if waiter.EventIn is set, the connectionedEndpoint is immediately
-// readable.
-func (e *connectionedEndpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
- e.Lock()
- defer e.Unlock()
-
- ready := waiter.EventMask(0)
- switch {
- case e.Connected():
- if mask&waiter.EventIn != 0 && e.receiver.Readable() {
- ready |= waiter.EventIn
- }
- if mask&waiter.EventOut != 0 && e.connected.Writable() {
- ready |= waiter.EventOut
- }
- case e.Listening():
- if mask&waiter.EventIn != 0 && len(e.acceptedChan) > 0 {
- ready |= waiter.EventIn
- }
- }
-
- return ready
-}
diff --git a/pkg/tcpip/transport/unix/connectioned_state.go b/pkg/tcpip/transport/unix/connectioned_state.go
deleted file mode 100644
index 39e0ca2d6..000000000
--- a/pkg/tcpip/transport/unix/connectioned_state.go
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright 2018 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package unix
-
-// saveAcceptedChan is invoked by stateify.
-func (e *connectionedEndpoint) saveAcceptedChan() []*connectionedEndpoint {
- // If acceptedChan is nil (i.e. we are not listening) then we will save nil.
- // Otherwise we create a (possibly empty) slice of the values in acceptedChan and
- // save that.
- var acceptedSlice []*connectionedEndpoint
- if e.acceptedChan != nil {
- // Swap out acceptedChan with a new empty channel of the same capacity.
- saveChan := e.acceptedChan
- e.acceptedChan = make(chan *connectionedEndpoint, cap(saveChan))
-
- // Create a new slice with the same len and capacity as the channel.
- acceptedSlice = make([]*connectionedEndpoint, len(saveChan), cap(saveChan))
- // Drain acceptedChan into saveSlice, and fill up the new acceptChan at the
- // same time.
- for i := range acceptedSlice {
- ep := <-saveChan
- acceptedSlice[i] = ep
- e.acceptedChan <- ep
- }
- close(saveChan)
- }
- return acceptedSlice
-}
-
-// loadAcceptedChan is invoked by stateify.
-func (e *connectionedEndpoint) loadAcceptedChan(acceptedSlice []*connectionedEndpoint) {
- // If acceptedSlice is nil, then acceptedChan should also be nil.
- if acceptedSlice != nil {
- // Otherwise, create a new channel with the same capacity as acceptedSlice.
- e.acceptedChan = make(chan *connectionedEndpoint, cap(acceptedSlice))
- // Seed the channel with values from acceptedSlice.
- for _, ep := range acceptedSlice {
- e.acceptedChan <- ep
- }
- }
-}
diff --git a/pkg/tcpip/transport/unix/connectionless.go b/pkg/tcpip/transport/unix/connectionless.go
deleted file mode 100644
index ae93c61d7..000000000
--- a/pkg/tcpip/transport/unix/connectionless.go
+++ /dev/null
@@ -1,192 +0,0 @@
-// Copyright 2018 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package unix
-
-import (
- "gvisor.googlesource.com/gvisor/pkg/tcpip"
- "gvisor.googlesource.com/gvisor/pkg/tcpip/transport/queue"
- "gvisor.googlesource.com/gvisor/pkg/waiter"
-)
-
-// connectionlessEndpoint is a unix endpoint for unix sockets that support operating in
-// a conectionless fashon.
-//
-// Specifically, this means datagram unix sockets not created with
-// socketpair(2).
-//
-// +stateify savable
-type connectionlessEndpoint struct {
- baseEndpoint
-}
-
-// NewConnectionless creates a new unbound dgram endpoint.
-func NewConnectionless() Endpoint {
- ep := &connectionlessEndpoint{baseEndpoint{Queue: &waiter.Queue{}}}
- ep.receiver = &queueReceiver{readQueue: queue.New(&waiter.Queue{}, ep.Queue, initialLimit)}
- return ep
-}
-
-// isBound returns true iff the endpoint is bound.
-func (e *connectionlessEndpoint) isBound() bool {
- return e.path != ""
-}
-
-// Close puts the endpoint in a closed state and frees all resources associated
-// with it.
-//
-// The socket will be a fresh state after a call to close and may be reused.
-// That is, close may be used to "unbind" or "disconnect" the socket in error
-// paths.
-func (e *connectionlessEndpoint) Close() {
- e.Lock()
- var r Receiver
- if e.Connected() {
- e.receiver.CloseRecv()
- r = e.receiver
- e.receiver = nil
-
- e.connected.Release()
- e.connected = nil
- }
- if e.isBound() {
- e.path = ""
- }
- e.Unlock()
- if r != nil {
- r.CloseNotify()
- r.Release()
- }
-}
-
-// BidirectionalConnect implements BoundEndpoint.BidirectionalConnect.
-func (e *connectionlessEndpoint) BidirectionalConnect(ce ConnectingEndpoint, returnConnect func(Receiver, ConnectedEndpoint)) *tcpip.Error {
- return tcpip.ErrConnectionRefused
-}
-
-// UnidirectionalConnect implements BoundEndpoint.UnidirectionalConnect.
-func (e *connectionlessEndpoint) UnidirectionalConnect() (ConnectedEndpoint, *tcpip.Error) {
- e.Lock()
- r := e.receiver
- e.Unlock()
- if r == nil {
- return nil, tcpip.ErrConnectionRefused
- }
- return &connectedEndpoint{
- endpoint: e,
- writeQueue: r.(*queueReceiver).readQueue,
- }, nil
-}
-
-// SendMsg writes data and a control message to the specified endpoint.
-// This method does not block if the data cannot be written.
-func (e *connectionlessEndpoint) SendMsg(data [][]byte, c ControlMessages, to BoundEndpoint) (uintptr, *tcpip.Error) {
- if to == nil {
- return e.baseEndpoint.SendMsg(data, c, nil)
- }
-
- connected, err := to.UnidirectionalConnect()
- if err != nil {
- return 0, tcpip.ErrInvalidEndpointState
- }
- defer connected.Release()
-
- e.Lock()
- n, notify, err := connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)})
- e.Unlock()
-
- if notify {
- connected.SendNotify()
- }
-
- return n, err
-}
-
-// Type implements Endpoint.Type.
-func (e *connectionlessEndpoint) Type() SockType {
- return SockDgram
-}
-
-// Connect attempts to connect directly to server.
-func (e *connectionlessEndpoint) Connect(server BoundEndpoint) *tcpip.Error {
- connected, err := server.UnidirectionalConnect()
- if err != nil {
- return err
- }
-
- e.Lock()
- e.connected = connected
- e.Unlock()
-
- return nil
-}
-
-// Listen starts listening on the connection.
-func (e *connectionlessEndpoint) Listen(int) *tcpip.Error {
- return tcpip.ErrNotSupported
-}
-
-// Accept accepts a new connection.
-func (e *connectionlessEndpoint) Accept() (Endpoint, *tcpip.Error) {
- return nil, tcpip.ErrNotSupported
-}
-
-// Bind binds the connection.
-//
-// For Unix endpoints, this _only sets the address associated with the socket_.
-// Work associated with sockets in the filesystem or finding those sockets must
-// be done by a higher level.
-//
-// Bind will fail only if the socket is connected, bound or the passed address
-// is invalid (the empty string).
-func (e *connectionlessEndpoint) Bind(addr tcpip.FullAddress, commit func() *tcpip.Error) *tcpip.Error {
- e.Lock()
- defer e.Unlock()
- if e.isBound() {
- return tcpip.ErrAlreadyBound
- }
- if addr.Addr == "" {
- // The empty string is not permitted.
- return tcpip.ErrBadLocalAddress
- }
- if commit != nil {
- if err := commit(); err != nil {
- return err
- }
- }
-
- // Save the bound address.
- e.path = string(addr.Addr)
- return nil
-}
-
-// Readiness returns the current readiness of the endpoint. For example, if
-// waiter.EventIn is set, the endpoint is immediately readable.
-func (e *connectionlessEndpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
- e.Lock()
- defer e.Unlock()
-
- ready := waiter.EventMask(0)
- if mask&waiter.EventIn != 0 && e.receiver.Readable() {
- ready |= waiter.EventIn
- }
-
- if e.Connected() {
- if mask&waiter.EventOut != 0 && e.connected.Writable() {
- ready |= waiter.EventOut
- }
- }
-
- return ready
-}
diff --git a/pkg/tcpip/transport/unix/unix.go b/pkg/tcpip/transport/unix/unix.go
deleted file mode 100644
index 1bca4b0b4..000000000
--- a/pkg/tcpip/transport/unix/unix.go
+++ /dev/null
@@ -1,953 +0,0 @@
-// Copyright 2018 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package unix contains the implementation of Unix endpoints.
-package unix
-
-import (
- "sync"
- "sync/atomic"
-
- "gvisor.googlesource.com/gvisor/pkg/ilist"
- "gvisor.googlesource.com/gvisor/pkg/tcpip"
- "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer"
- "gvisor.googlesource.com/gvisor/pkg/tcpip/transport/queue"
- "gvisor.googlesource.com/gvisor/pkg/waiter"
-)
-
-// initialLimit is the starting limit for the socket buffers.
-const initialLimit = 16 * 1024
-
-// A SockType is a type (as opposed to family) of sockets. These are enumerated
-// in the syscall package as syscall.SOCK_* constants.
-type SockType int
-
-const (
- // SockStream corresponds to syscall.SOCK_STREAM.
- SockStream SockType = 1
- // SockDgram corresponds to syscall.SOCK_DGRAM.
- SockDgram SockType = 2
- // SockRaw corresponds to syscall.SOCK_RAW.
- SockRaw SockType = 3
- // SockSeqpacket corresponds to syscall.SOCK_SEQPACKET.
- SockSeqpacket SockType = 5
-)
-
-// A RightsControlMessage is a control message containing FDs.
-type RightsControlMessage interface {
- // Clone returns a copy of the RightsControlMessage.
- Clone() RightsControlMessage
-
- // Release releases any resources owned by the RightsControlMessage.
- Release()
-}
-
-// A CredentialsControlMessage is a control message containing Unix credentials.
-type CredentialsControlMessage interface {
- // Equals returns true iff the two messages are equal.
- Equals(CredentialsControlMessage) bool
-}
-
-// A ControlMessages represents a collection of socket control messages.
-//
-// +stateify savable
-type ControlMessages struct {
- // Rights is a control message containing FDs.
- Rights RightsControlMessage
-
- // Credentials is a control message containing Unix credentials.
- Credentials CredentialsControlMessage
-}
-
-// Empty returns true iff the ControlMessages does not contain either
-// credentials or rights.
-func (c *ControlMessages) Empty() bool {
- return c.Rights == nil && c.Credentials == nil
-}
-
-// Clone clones both the credentials and the rights.
-func (c *ControlMessages) Clone() ControlMessages {
- cm := ControlMessages{}
- if c.Rights != nil {
- cm.Rights = c.Rights.Clone()
- }
- cm.Credentials = c.Credentials
- return cm
-}
-
-// Release releases both the credentials and the rights.
-func (c *ControlMessages) Release() {
- if c.Rights != nil {
- c.Rights.Release()
- }
- *c = ControlMessages{}
-}
-
-// Endpoint is the interface implemented by Unix transport protocol
-// implementations that expose functionality like sendmsg, recvmsg, connect,
-// etc. to Unix socket implementations.
-type Endpoint interface {
- Credentialer
- waiter.Waitable
-
- // Close puts the endpoint in a closed state and frees all resources
- // associated with it.
- Close()
-
- // RecvMsg reads data and a control message from the endpoint. This method
- // does not block if there is no data pending.
- //
- // creds indicates if credential control messages are requested by the
- // caller. This is useful for determining if control messages can be
- // coalesced. creds is a hint and can be safely ignored by the
- // implementation if no coalescing is possible. It is fine to return
- // credential control messages when none were requested or to not return
- // credential control messages when they were requested.
- //
- // numRights is the number of SCM_RIGHTS FDs requested by the caller. This
- // is useful if one must allocate a buffer to receive a SCM_RIGHTS message
- // or determine if control messages can be coalesced. numRights is a hint
- // and can be safely ignored by the implementation if the number of
- // available SCM_RIGHTS FDs is known and no coalescing is possible. It is
- // fine for the returned number of SCM_RIGHTS FDs to be either higher or
- // lower than the requested number.
- //
- // If peek is true, no data should be consumed from the Endpoint. Any and
- // all data returned from a peek should be available in the next call to
- // RecvMsg.
- //
- // recvLen is the number of bytes copied into data.
- //
- // msgLen is the length of the read message consumed for datagram Endpoints.
- // msgLen is always the same as recvLen for stream Endpoints.
- RecvMsg(data [][]byte, creds bool, numRights uintptr, peek bool, addr *tcpip.FullAddress) (recvLen, msgLen uintptr, cm ControlMessages, err *tcpip.Error)
-
- // SendMsg writes data and a control message to the endpoint's peer.
- // This method does not block if the data cannot be written.
- //
- // SendMsg does not take ownership of any of its arguments on error.
- SendMsg([][]byte, ControlMessages, BoundEndpoint) (uintptr, *tcpip.Error)
-
- // Connect connects this endpoint directly to another.
- //
- // This should be called on the client endpoint, and the (bound)
- // endpoint passed in as a parameter.
- //
- // The error codes are the same as Connect.
- Connect(server BoundEndpoint) *tcpip.Error
-
- // Shutdown closes the read and/or write end of the endpoint connection
- // to its peer.
- Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error
-
- // Listen puts the endpoint in "listen" mode, which allows it to accept
- // new connections.
- Listen(backlog int) *tcpip.Error
-
- // Accept returns a new endpoint if a peer has established a connection
- // to an endpoint previously set to listen mode. This method does not
- // block if no new connections are available.
- //
- // The returned Queue is the wait queue for the newly created endpoint.
- Accept() (Endpoint, *tcpip.Error)
-
- // Bind binds the endpoint to a specific local address and port.
- // Specifying a NIC is optional.
- //
- // An optional commit function will be executed atomically with respect
- // to binding the endpoint. If this returns an error, the bind will not
- // occur and the error will be propagated back to the caller.
- Bind(address tcpip.FullAddress, commit func() *tcpip.Error) *tcpip.Error
-
- // Type return the socket type, typically either SockStream, SockDgram
- // or SockSeqpacket.
- Type() SockType
-
- // GetLocalAddress returns the address to which the endpoint is bound.
- GetLocalAddress() (tcpip.FullAddress, *tcpip.Error)
-
- // GetRemoteAddress returns the address to which the endpoint is
- // connected.
- GetRemoteAddress() (tcpip.FullAddress, *tcpip.Error)
-
- // SetSockOpt sets a socket option. opt should be one of the tcpip.*Option
- // types.
- SetSockOpt(opt interface{}) *tcpip.Error
-
- // GetSockOpt gets a socket option. opt should be a pointer to one of the
- // tcpip.*Option types.
- GetSockOpt(opt interface{}) *tcpip.Error
-}
-
-// A Credentialer is a socket or endpoint that supports the SO_PASSCRED socket
-// option.
-type Credentialer interface {
- // Passcred returns whether or not the SO_PASSCRED socket option is
- // enabled on this end.
- Passcred() bool
-
- // ConnectedPasscred returns whether or not the SO_PASSCRED socket option
- // is enabled on the connected end.
- ConnectedPasscred() bool
-}
-
-// A BoundEndpoint is a unix endpoint that can be connected to.
-type BoundEndpoint interface {
- // BidirectionalConnect establishes a bi-directional connection between two
- // unix endpoints in an all-or-nothing manner. If an error occurs during
- // connecting, the state of neither endpoint should be modified.
- //
- // In order for an endpoint to establish such a bidirectional connection
- // with a BoundEndpoint, the endpoint calls the BidirectionalConnect method
- // on the BoundEndpoint and sends a representation of itself (the
- // ConnectingEndpoint) and a callback (returnConnect) to receive the
- // connection information (Receiver and ConnectedEndpoint) upon a
- // successful connect. The callback should only be called on a successful
- // connect.
- //
- // For a connection attempt to be successful, the ConnectingEndpoint must
- // be unconnected and not listening and the BoundEndpoint whose
- // BidirectionalConnect method is being called must be listening.
- //
- // This method will return tcpip.ErrConnectionRefused on endpoints with a
- // type that isn't SockStream or SockSeqpacket.
- BidirectionalConnect(ep ConnectingEndpoint, returnConnect func(Receiver, ConnectedEndpoint)) *tcpip.Error
-
- // UnidirectionalConnect establishes a write-only connection to a unix
- // endpoint.
- //
- // An endpoint which calls UnidirectionalConnect and supports it itself must
- // not hold its own lock when calling UnidirectionalConnect.
- //
- // This method will return tcpip.ErrConnectionRefused on a non-SockDgram
- // endpoint.
- UnidirectionalConnect() (ConnectedEndpoint, *tcpip.Error)
-
- // Release releases any resources held by the BoundEndpoint. It must be
- // called before dropping all references to a BoundEndpoint returned by a
- // function.
- Release()
-}
-
-// message represents a message passed over a Unix domain socket.
-//
-// +stateify savable
-type message struct {
- ilist.Entry
-
- // Data is the Message payload.
- Data buffer.View
-
- // Control is auxiliary control message data that goes along with the
- // data.
- Control ControlMessages
-
- // Address is the bound address of the endpoint that sent the message.
- //
- // If the endpoint that sent the message is not bound, the Address is
- // the empty string.
- Address tcpip.FullAddress
-}
-
-// Length returns number of bytes stored in the message.
-func (m *message) Length() int64 {
- return int64(len(m.Data))
-}
-
-// Release releases any resources held by the message.
-func (m *message) Release() {
- m.Control.Release()
-}
-
-// Peek returns a copy of the message.
-func (m *message) Peek() queue.Entry {
- return &message{Data: m.Data, Control: m.Control.Clone(), Address: m.Address}
-}
-
-// Truncate reduces the length of the message payload to n bytes.
-//
-// Preconditions: n <= m.Length().
-func (m *message) Truncate(n int64) {
- m.Data.CapLength(int(n))
-}
-
-// A Receiver can be used to receive Messages.
-type Receiver interface {
- // Recv receives a single message. This method does not block.
- //
- // See Endpoint.RecvMsg for documentation on shared arguments.
- //
- // notify indicates if RecvNotify should be called.
- Recv(data [][]byte, creds bool, numRights uintptr, peek bool) (recvLen, msgLen uintptr, cm ControlMessages, source tcpip.FullAddress, notify bool, err *tcpip.Error)
-
- // RecvNotify notifies the Receiver of a successful Recv. This must not be
- // called while holding any endpoint locks.
- RecvNotify()
-
- // CloseRecv prevents the receiving of additional Messages.
- //
- // After CloseRecv is called, CloseNotify must also be called.
- CloseRecv()
-
- // CloseNotify notifies the Receiver of recv being closed. This must not be
- // called while holding any endpoint locks.
- CloseNotify()
-
- // Readable returns if messages should be attempted to be received. This
- // includes when read has been shutdown.
- Readable() bool
-
- // RecvQueuedSize returns the total amount of data currently receivable.
- // RecvQueuedSize should return -1 if the operation isn't supported.
- RecvQueuedSize() int64
-
- // RecvMaxQueueSize returns maximum value for RecvQueuedSize.
- // RecvMaxQueueSize should return -1 if the operation isn't supported.
- RecvMaxQueueSize() int64
-
- // Release releases any resources owned by the Receiver. It should be
- // called before droping all references to a Receiver.
- Release()
-}
-
-// queueReceiver implements Receiver for datagram sockets.
-//
-// +stateify savable
-type queueReceiver struct {
- readQueue *queue.Queue
-}
-
-// Recv implements Receiver.Recv.
-func (q *queueReceiver) Recv(data [][]byte, creds bool, numRights uintptr, peek bool) (uintptr, uintptr, ControlMessages, tcpip.FullAddress, bool, *tcpip.Error) {
- var m queue.Entry
- var notify bool
- var err *tcpip.Error
- if peek {
- m, err = q.readQueue.Peek()
- } else {
- m, notify, err = q.readQueue.Dequeue()
- }
- if err != nil {
- return 0, 0, ControlMessages{}, tcpip.FullAddress{}, false, err
- }
- msg := m.(*message)
- src := []byte(msg.Data)
- var copied uintptr
- for i := 0; i < len(data) && len(src) > 0; i++ {
- n := copy(data[i], src)
- copied += uintptr(n)
- src = src[n:]
- }
- return copied, uintptr(len(msg.Data)), msg.Control, msg.Address, notify, nil
-}
-
-// RecvNotify implements Receiver.RecvNotify.
-func (q *queueReceiver) RecvNotify() {
- q.readQueue.WriterQueue.Notify(waiter.EventOut)
-}
-
-// CloseNotify implements Receiver.CloseNotify.
-func (q *queueReceiver) CloseNotify() {
- q.readQueue.ReaderQueue.Notify(waiter.EventIn)
- q.readQueue.WriterQueue.Notify(waiter.EventOut)
-}
-
-// CloseRecv implements Receiver.CloseRecv.
-func (q *queueReceiver) CloseRecv() {
- q.readQueue.Close()
-}
-
-// Readable implements Receiver.Readable.
-func (q *queueReceiver) Readable() bool {
- return q.readQueue.IsReadable()
-}
-
-// RecvQueuedSize implements Receiver.RecvQueuedSize.
-func (q *queueReceiver) RecvQueuedSize() int64 {
- return q.readQueue.QueuedSize()
-}
-
-// RecvMaxQueueSize implements Receiver.RecvMaxQueueSize.
-func (q *queueReceiver) RecvMaxQueueSize() int64 {
- return q.readQueue.MaxQueueSize()
-}
-
-// Release implements Receiver.Release.
-func (*queueReceiver) Release() {}
-
-// streamQueueReceiver implements Receiver for stream sockets.
-//
-// +stateify savable
-type streamQueueReceiver struct {
- queueReceiver
-
- mu sync.Mutex `state:"nosave"`
- buffer []byte
- control ControlMessages
- addr tcpip.FullAddress
-}
-
-func vecCopy(data [][]byte, buf []byte) (uintptr, [][]byte, []byte) {
- var copied uintptr
- for len(data) > 0 && len(buf) > 0 {
- n := copy(data[0], buf)
- copied += uintptr(n)
- buf = buf[n:]
- data[0] = data[0][n:]
- if len(data[0]) == 0 {
- data = data[1:]
- }
- }
- return copied, data, buf
-}
-
-// Readable implements Receiver.Readable.
-func (q *streamQueueReceiver) Readable() bool {
- q.mu.Lock()
- bl := len(q.buffer)
- r := q.readQueue.IsReadable()
- q.mu.Unlock()
- // We're readable if we have data in our buffer or if the queue receiver is
- // readable.
- return bl > 0 || r
-}
-
-// RecvQueuedSize implements Receiver.RecvQueuedSize.
-func (q *streamQueueReceiver) RecvQueuedSize() int64 {
- q.mu.Lock()
- bl := len(q.buffer)
- qs := q.readQueue.QueuedSize()
- q.mu.Unlock()
- return int64(bl) + qs
-}
-
-// RecvMaxQueueSize implements Receiver.RecvMaxQueueSize.
-func (q *streamQueueReceiver) RecvMaxQueueSize() int64 {
- // The RecvMaxQueueSize() is the readQueue's MaxQueueSize() plus the largest
- // message we can buffer which is also the largest message we can receive.
- return 2 * q.readQueue.MaxQueueSize()
-}
-
-// Recv implements Receiver.Recv.
-func (q *streamQueueReceiver) Recv(data [][]byte, wantCreds bool, numRights uintptr, peek bool) (uintptr, uintptr, ControlMessages, tcpip.FullAddress, bool, *tcpip.Error) {
- q.mu.Lock()
- defer q.mu.Unlock()
-
- var notify bool
-
- // If we have no data in the endpoint, we need to get some.
- if len(q.buffer) == 0 {
- // Load the next message into a buffer, even if we are peeking. Peeking
- // won't consume the message, so it will be still available to be read
- // the next time Recv() is called.
- m, n, err := q.readQueue.Dequeue()
- if err != nil {
- return 0, 0, ControlMessages{}, tcpip.FullAddress{}, false, err
- }
- notify = n
- msg := m.(*message)
- q.buffer = []byte(msg.Data)
- q.control = msg.Control
- q.addr = msg.Address
- }
-
- var copied uintptr
- if peek {
- // Don't consume control message if we are peeking.
- c := q.control.Clone()
-
- // Don't consume data since we are peeking.
- copied, data, _ = vecCopy(data, q.buffer)
-
- return copied, copied, c, q.addr, notify, nil
- }
-
- // Consume data and control message since we are not peeking.
- copied, data, q.buffer = vecCopy(data, q.buffer)
-
- // Save the original state of q.control.
- c := q.control
-
- // Remove rights from q.control and leave behind just the creds.
- q.control.Rights = nil
- if !wantCreds {
- c.Credentials = nil
- }
-
- if c.Rights != nil && numRights == 0 {
- c.Rights.Release()
- c.Rights = nil
- }
-
- haveRights := c.Rights != nil
-
- // If we have more capacity for data and haven't received any usable
- // rights.
- //
- // Linux never coalesces rights control messages.
- for !haveRights && len(data) > 0 {
- // Get a message from the readQueue.
- m, n, err := q.readQueue.Dequeue()
- if err != nil {
- // We already got some data, so ignore this error. This will
- // manifest as a short read to the user, which is what Linux
- // does.
- break
- }
- notify = notify || n
- msg := m.(*message)
- q.buffer = []byte(msg.Data)
- q.control = msg.Control
- q.addr = msg.Address
-
- if wantCreds {
- if (q.control.Credentials == nil) != (c.Credentials == nil) {
- // One message has credentials, the other does not.
- break
- }
-
- if q.control.Credentials != nil && c.Credentials != nil && !q.control.Credentials.Equals(c.Credentials) {
- // Both messages have credentials, but they don't match.
- break
- }
- }
-
- if numRights != 0 && c.Rights != nil && q.control.Rights != nil {
- // Both messages have rights.
- break
- }
-
- var cpd uintptr
- cpd, data, q.buffer = vecCopy(data, q.buffer)
- copied += cpd
-
- if cpd == 0 {
- // data was actually full.
- break
- }
-
- if q.control.Rights != nil {
- // Consume rights.
- if numRights == 0 {
- q.control.Rights.Release()
- } else {
- c.Rights = q.control.Rights
- haveRights = true
- }
- q.control.Rights = nil
- }
- }
- return copied, copied, c, q.addr, notify, nil
-}
-
-// A ConnectedEndpoint is an Endpoint that can be used to send Messages.
-type ConnectedEndpoint interface {
- // Passcred implements Endpoint.Passcred.
- Passcred() bool
-
- // GetLocalAddress implements Endpoint.GetLocalAddress.
- GetLocalAddress() (tcpip.FullAddress, *tcpip.Error)
-
- // Send sends a single message. This method does not block.
- //
- // notify indicates if SendNotify should be called.
- //
- // tcpip.ErrWouldBlock can be returned along with a partial write if
- // the caller should block to send the rest of the data.
- Send(data [][]byte, controlMessages ControlMessages, from tcpip.FullAddress) (n uintptr, notify bool, err *tcpip.Error)
-
- // SendNotify notifies the ConnectedEndpoint of a successful Send. This
- // must not be called while holding any endpoint locks.
- SendNotify()
-
- // CloseSend prevents the sending of additional Messages.
- //
- // After CloseSend is call, CloseNotify must also be called.
- CloseSend()
-
- // CloseNotify notifies the ConnectedEndpoint of send being closed. This
- // must not be called while holding any endpoint locks.
- CloseNotify()
-
- // Writable returns if messages should be attempted to be sent. This
- // includes when write has been shutdown.
- Writable() bool
-
- // EventUpdate lets the ConnectedEndpoint know that event registrations
- // have changed.
- EventUpdate()
-
- // SendQueuedSize returns the total amount of data currently queued for
- // sending. SendQueuedSize should return -1 if the operation isn't
- // supported.
- SendQueuedSize() int64
-
- // SendMaxQueueSize returns maximum value for SendQueuedSize.
- // SendMaxQueueSize should return -1 if the operation isn't supported.
- SendMaxQueueSize() int64
-
- // Release releases any resources owned by the ConnectedEndpoint. It should
- // be called before droping all references to a ConnectedEndpoint.
- Release()
-}
-
-// +stateify savable
-type connectedEndpoint struct {
- // endpoint represents the subset of the Endpoint functionality needed by
- // the connectedEndpoint. It is implemented by both connectionedEndpoint
- // and connectionlessEndpoint and allows the use of types which don't
- // fully implement Endpoint.
- endpoint interface {
- // Passcred implements Endpoint.Passcred.
- Passcred() bool
-
- // GetLocalAddress implements Endpoint.GetLocalAddress.
- GetLocalAddress() (tcpip.FullAddress, *tcpip.Error)
-
- // Type implements Endpoint.Type.
- Type() SockType
- }
-
- writeQueue *queue.Queue
-}
-
-// Passcred implements ConnectedEndpoint.Passcred.
-func (e *connectedEndpoint) Passcred() bool {
- return e.endpoint.Passcred()
-}
-
-// GetLocalAddress implements ConnectedEndpoint.GetLocalAddress.
-func (e *connectedEndpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) {
- return e.endpoint.GetLocalAddress()
-}
-
-// Send implements ConnectedEndpoint.Send.
-func (e *connectedEndpoint) Send(data [][]byte, controlMessages ControlMessages, from tcpip.FullAddress) (uintptr, bool, *tcpip.Error) {
- var l int64
- for _, d := range data {
- l += int64(len(d))
- }
-
- truncate := false
- if e.endpoint.Type() == SockStream {
- // Since stream sockets don't preserve message boundaries, we
- // can write only as much of the message as fits in the queue.
- truncate = true
-
- // Discard empty stream packets. Since stream sockets don't
- // preserve message boundaries, sending zero bytes is a no-op.
- // In Linux, the receiver actually uses a zero-length receive
- // as an indication that the stream was closed.
- if l == 0 {
- controlMessages.Release()
- return 0, false, nil
- }
- }
-
- v := make([]byte, 0, l)
- for _, d := range data {
- v = append(v, d...)
- }
-
- l, notify, err := e.writeQueue.Enqueue(&message{Data: buffer.View(v), Control: controlMessages, Address: from}, truncate)
- return uintptr(l), notify, err
-}
-
-// SendNotify implements ConnectedEndpoint.SendNotify.
-func (e *connectedEndpoint) SendNotify() {
- e.writeQueue.ReaderQueue.Notify(waiter.EventIn)
-}
-
-// CloseNotify implements ConnectedEndpoint.CloseNotify.
-func (e *connectedEndpoint) CloseNotify() {
- e.writeQueue.ReaderQueue.Notify(waiter.EventIn)
- e.writeQueue.WriterQueue.Notify(waiter.EventOut)
-}
-
-// CloseSend implements ConnectedEndpoint.CloseSend.
-func (e *connectedEndpoint) CloseSend() {
- e.writeQueue.Close()
-}
-
-// Writable implements ConnectedEndpoint.Writable.
-func (e *connectedEndpoint) Writable() bool {
- return e.writeQueue.IsWritable()
-}
-
-// EventUpdate implements ConnectedEndpoint.EventUpdate.
-func (*connectedEndpoint) EventUpdate() {}
-
-// SendQueuedSize implements ConnectedEndpoint.SendQueuedSize.
-func (e *connectedEndpoint) SendQueuedSize() int64 {
- return e.writeQueue.QueuedSize()
-}
-
-// SendMaxQueueSize implements ConnectedEndpoint.SendMaxQueueSize.
-func (e *connectedEndpoint) SendMaxQueueSize() int64 {
- return e.writeQueue.MaxQueueSize()
-}
-
-// Release implements ConnectedEndpoint.Release.
-func (*connectedEndpoint) Release() {}
-
-// baseEndpoint is an embeddable unix endpoint base used in both the connected and connectionless
-// unix domain socket Endpoint implementations.
-//
-// Not to be used on its own.
-//
-// +stateify savable
-type baseEndpoint struct {
- *waiter.Queue
-
- // passcred specifies whether SCM_CREDENTIALS socket control messages are
- // enabled on this endpoint. Must be accessed atomically.
- passcred int32
-
- // Mutex protects the below fields.
- sync.Mutex `state:"nosave"`
-
- // receiver allows Messages to be received.
- receiver Receiver
-
- // connected allows messages to be sent and state information about the
- // connected endpoint to be read.
- connected ConnectedEndpoint
-
- // path is not empty if the endpoint has been bound,
- // or may be used if the endpoint is connected.
- path string
-}
-
-// EventRegister implements waiter.Waitable.EventRegister.
-func (e *baseEndpoint) EventRegister(we *waiter.Entry, mask waiter.EventMask) {
- e.Queue.EventRegister(we, mask)
- e.Lock()
- if e.connected != nil {
- e.connected.EventUpdate()
- }
- e.Unlock()
-}
-
-// EventUnregister implements waiter.Waitable.EventUnregister.
-func (e *baseEndpoint) EventUnregister(we *waiter.Entry) {
- e.Queue.EventUnregister(we)
- e.Lock()
- if e.connected != nil {
- e.connected.EventUpdate()
- }
- e.Unlock()
-}
-
-// Passcred implements Credentialer.Passcred.
-func (e *baseEndpoint) Passcred() bool {
- return atomic.LoadInt32(&e.passcred) != 0
-}
-
-// ConnectedPasscred implements Credentialer.ConnectedPasscred.
-func (e *baseEndpoint) ConnectedPasscred() bool {
- e.Lock()
- defer e.Unlock()
- return e.connected != nil && e.connected.Passcred()
-}
-
-func (e *baseEndpoint) setPasscred(pc bool) {
- if pc {
- atomic.StoreInt32(&e.passcred, 1)
- } else {
- atomic.StoreInt32(&e.passcred, 0)
- }
-}
-
-// Connected implements ConnectingEndpoint.Connected.
-func (e *baseEndpoint) Connected() bool {
- return e.receiver != nil && e.connected != nil
-}
-
-// RecvMsg reads data and a control message from the endpoint.
-func (e *baseEndpoint) RecvMsg(data [][]byte, creds bool, numRights uintptr, peek bool, addr *tcpip.FullAddress) (uintptr, uintptr, ControlMessages, *tcpip.Error) {
- e.Lock()
-
- if e.receiver == nil {
- e.Unlock()
- return 0, 0, ControlMessages{}, tcpip.ErrNotConnected
- }
-
- recvLen, msgLen, cms, a, notify, err := e.receiver.Recv(data, creds, numRights, peek)
- e.Unlock()
- if err != nil {
- return 0, 0, ControlMessages{}, err
- }
-
- if notify {
- e.receiver.RecvNotify()
- }
-
- if addr != nil {
- *addr = a
- }
- return recvLen, msgLen, cms, nil
-}
-
-// SendMsg writes data and a control message to the endpoint's peer.
-// This method does not block if the data cannot be written.
-func (e *baseEndpoint) SendMsg(data [][]byte, c ControlMessages, to BoundEndpoint) (uintptr, *tcpip.Error) {
- e.Lock()
- if !e.Connected() {
- e.Unlock()
- return 0, tcpip.ErrNotConnected
- }
- if to != nil {
- e.Unlock()
- return 0, tcpip.ErrAlreadyConnected
- }
-
- n, notify, err := e.connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)})
- e.Unlock()
-
- if notify {
- e.connected.SendNotify()
- }
-
- return n, err
-}
-
-// SetSockOpt sets a socket option. Currently not supported.
-func (e *baseEndpoint) SetSockOpt(opt interface{}) *tcpip.Error {
- switch v := opt.(type) {
- case tcpip.PasscredOption:
- e.setPasscred(v != 0)
- return nil
- }
- return nil
-}
-
-// GetSockOpt implements tcpip.Endpoint.GetSockOpt.
-func (e *baseEndpoint) GetSockOpt(opt interface{}) *tcpip.Error {
- switch o := opt.(type) {
- case tcpip.ErrorOption:
- return nil
- case *tcpip.SendQueueSizeOption:
- e.Lock()
- if !e.Connected() {
- e.Unlock()
- return tcpip.ErrNotConnected
- }
- qs := tcpip.SendQueueSizeOption(e.connected.SendQueuedSize())
- e.Unlock()
- if qs < 0 {
- return tcpip.ErrQueueSizeNotSupported
- }
- *o = qs
- return nil
- case *tcpip.ReceiveQueueSizeOption:
- e.Lock()
- if !e.Connected() {
- e.Unlock()
- return tcpip.ErrNotConnected
- }
- qs := tcpip.ReceiveQueueSizeOption(e.receiver.RecvQueuedSize())
- e.Unlock()
- if qs < 0 {
- return tcpip.ErrQueueSizeNotSupported
- }
- *o = qs
- return nil
- case *tcpip.PasscredOption:
- if e.Passcred() {
- *o = tcpip.PasscredOption(1)
- } else {
- *o = tcpip.PasscredOption(0)
- }
- return nil
- case *tcpip.SendBufferSizeOption:
- e.Lock()
- if !e.Connected() {
- e.Unlock()
- return tcpip.ErrNotConnected
- }
- qs := tcpip.SendBufferSizeOption(e.connected.SendMaxQueueSize())
- e.Unlock()
- if qs < 0 {
- return tcpip.ErrQueueSizeNotSupported
- }
- *o = qs
- return nil
- case *tcpip.ReceiveBufferSizeOption:
- e.Lock()
- if e.receiver == nil {
- e.Unlock()
- return tcpip.ErrNotConnected
- }
- qs := tcpip.ReceiveBufferSizeOption(e.receiver.RecvMaxQueueSize())
- e.Unlock()
- if qs < 0 {
- return tcpip.ErrQueueSizeNotSupported
- }
- *o = qs
- return nil
- }
- return tcpip.ErrUnknownProtocolOption
-}
-
-// Shutdown closes the read and/or write end of the endpoint connection to its
-// peer.
-func (e *baseEndpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error {
- e.Lock()
- if !e.Connected() {
- e.Unlock()
- return tcpip.ErrNotConnected
- }
-
- if flags&tcpip.ShutdownRead != 0 {
- e.receiver.CloseRecv()
- }
-
- if flags&tcpip.ShutdownWrite != 0 {
- e.connected.CloseSend()
- }
-
- e.Unlock()
-
- if flags&tcpip.ShutdownRead != 0 {
- e.receiver.CloseNotify()
- }
-
- if flags&tcpip.ShutdownWrite != 0 {
- e.connected.CloseNotify()
- }
-
- return nil
-}
-
-// GetLocalAddress returns the bound path.
-func (e *baseEndpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) {
- e.Lock()
- defer e.Unlock()
- return tcpip.FullAddress{Addr: tcpip.Address(e.path)}, nil
-}
-
-// GetRemoteAddress returns the local address of the connected endpoint (if
-// available).
-func (e *baseEndpoint) GetRemoteAddress() (tcpip.FullAddress, *tcpip.Error) {
- e.Lock()
- c := e.connected
- e.Unlock()
- if c != nil {
- return c.GetLocalAddress()
- }
- return tcpip.FullAddress{}, tcpip.ErrNotConnected
-}
-
-// Release implements BoundEndpoint.Release.
-func (*baseEndpoint) Release() {}