diff options
Diffstat (limited to 'pkg/sentry/socket/unix/transport')
-rw-r--r-- | pkg/sentry/socket/unix/transport/connectioned.go | 30 | ||||
-rw-r--r-- | pkg/sentry/socket/unix/transport/connectioned_state.go | 5 | ||||
-rw-r--r-- | pkg/sentry/socket/unix/transport/connectionless.go | 13 | ||||
-rw-r--r-- | pkg/sentry/socket/unix/transport/connectionless_state.go | 20 | ||||
-rw-r--r-- | pkg/sentry/socket/unix/transport/queue.go | 9 | ||||
-rw-r--r-- | pkg/sentry/socket/unix/transport/transport_state_autogen.go | 6 | ||||
-rw-r--r-- | pkg/sentry/socket/unix/transport/unix.go | 77 |
7 files changed, 124 insertions, 36 deletions
diff --git a/pkg/sentry/socket/unix/transport/connectioned.go b/pkg/sentry/socket/unix/transport/connectioned.go index fc5b823b0..809c95429 100644 --- a/pkg/sentry/socket/unix/transport/connectioned.go +++ b/pkg/sentry/socket/unix/transport/connectioned.go @@ -128,7 +128,9 @@ func newConnectioned(ctx context.Context, stype linux.SockType, uid UniqueIDProv idGenerator: uid, stype: stype, } - ep.ops.InitHandler(ep, nil, nil) + + ep.ops.SetSendBufferSize(defaultBufferSize, false /* notify */) + ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits) return ep } @@ -137,9 +139,9 @@ func NewPair(ctx context.Context, stype linux.SockType, uid UniqueIDProvider) (E a := newConnectioned(ctx, stype, uid) b := newConnectioned(ctx, stype, uid) - q1 := &queue{ReaderQueue: a.Queue, WriterQueue: b.Queue, limit: initialLimit} + q1 := &queue{ReaderQueue: a.Queue, WriterQueue: b.Queue, limit: defaultBufferSize} q1.InitRefs() - q2 := &queue{ReaderQueue: b.Queue, WriterQueue: a.Queue, limit: initialLimit} + q2 := &queue{ReaderQueue: b.Queue, WriterQueue: a.Queue, limit: defaultBufferSize} q2.InitRefs() if stype == linux.SOCK_STREAM { @@ -173,7 +175,8 @@ func NewExternal(ctx context.Context, stype linux.SockType, uid UniqueIDProvider idGenerator: uid, stype: stype, } - ep.ops.InitHandler(ep, nil, nil) + ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits) + ep.ops.SetSendBufferSize(connected.SendMaxQueueSize(), false /* notify */) return ep } @@ -296,16 +299,18 @@ func (e *connectionedEndpoint) BidirectionalConnect(ctx context.Context, ce Conn idGenerator: e.idGenerator, stype: e.stype, } - ne.ops.InitHandler(ne, nil, nil) + ne.ops.InitHandler(ne, &stackHandler{}, getSendBufferLimits) + ne.ops.SetSendBufferSize(defaultBufferSize, false /* notify */) - readQueue := &queue{ReaderQueue: ce.WaiterQueue(), WriterQueue: ne.Queue, limit: initialLimit} + readQueue := &queue{ReaderQueue: ce.WaiterQueue(), WriterQueue: ne.Queue, limit: defaultBufferSize} readQueue.InitRefs() ne.connected = &connectedEndpoint{ endpoint: ce, writeQueue: readQueue, } - writeQueue := &queue{ReaderQueue: ne.Queue, WriterQueue: ce.WaiterQueue(), limit: initialLimit} + // Make sure the accepted endpoint inherits this listening socket's SO_SNDBUF. + writeQueue := &queue{ReaderQueue: ne.Queue, WriterQueue: ce.WaiterQueue(), limit: e.ops.GetSendBufferSize()} writeQueue.InitRefs() if e.stype == linux.SOCK_STREAM { ne.receiver = &streamQueueReceiver{queueReceiver: queueReceiver{readQueue: writeQueue}} @@ -357,6 +362,9 @@ func (e *connectionedEndpoint) Connect(ctx context.Context, server BoundEndpoint returnConnect := func(r Receiver, ce ConnectedEndpoint) { e.receiver = r e.connected = ce + // Make sure the newly created connected endpoint's write queue is updated + // to reflect this endpoint's send buffer size. + e.connected.SetSendBufferSize(e.ops.GetSendBufferSize()) } return server.BidirectionalConnect(ctx, e, returnConnect) @@ -495,3 +503,11 @@ func (e *connectionedEndpoint) State() uint32 { } return linux.SS_UNCONNECTED } + +// OnSetSendBufferSize implements tcpip.SocketOptionsHandler.OnSetSendBufferSize. +func (e *connectionedEndpoint) OnSetSendBufferSize(v int64) (newSz int64) { + if e.Connected() { + return e.baseEndpoint.connected.SetSendBufferSize(v) + } + return v +} diff --git a/pkg/sentry/socket/unix/transport/connectioned_state.go b/pkg/sentry/socket/unix/transport/connectioned_state.go index 7e02a5db8..590b0bd01 100644 --- a/pkg/sentry/socket/unix/transport/connectioned_state.go +++ b/pkg/sentry/socket/unix/transport/connectioned_state.go @@ -51,3 +51,8 @@ func (e *connectionedEndpoint) loadAcceptedChan(acceptedSlice []*connectionedEnd } } } + +// afterLoad is invoked by stateify. +func (e *connectionedEndpoint) afterLoad() { + e.ops.InitHandler(e, &stackHandler{}, getSendBufferLimits) +} diff --git a/pkg/sentry/socket/unix/transport/connectionless.go b/pkg/sentry/socket/unix/transport/connectionless.go index 20fa8b874..0be78480c 100644 --- a/pkg/sentry/socket/unix/transport/connectionless.go +++ b/pkg/sentry/socket/unix/transport/connectionless.go @@ -41,10 +41,11 @@ var ( // NewConnectionless creates a new unbound dgram endpoint. func NewConnectionless(ctx context.Context) Endpoint { ep := &connectionlessEndpoint{baseEndpoint{Queue: &waiter.Queue{}}} - q := queue{ReaderQueue: ep.Queue, WriterQueue: &waiter.Queue{}, limit: initialLimit} + q := queue{ReaderQueue: ep.Queue, WriterQueue: &waiter.Queue{}, limit: defaultBufferSize} q.InitRefs() ep.receiver = &queueReceiver{readQueue: &q} - ep.ops.InitHandler(ep, nil, nil) + ep.ops.SetSendBufferSize(defaultBufferSize, false /* notify */) + ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits) return ep } @@ -217,3 +218,11 @@ func (e *connectionlessEndpoint) State() uint32 { return linux.SS_DISCONNECTING } } + +// OnSetSendBufferSize implements tcpip.SocketOptionsHandler.OnSetSendBufferSize. +func (e *connectionlessEndpoint) OnSetSendBufferSize(v int64) (newSz int64) { + if e.Connected() { + return e.baseEndpoint.connected.SetSendBufferSize(v) + } + return v +} diff --git a/pkg/sentry/socket/unix/transport/connectionless_state.go b/pkg/sentry/socket/unix/transport/connectionless_state.go new file mode 100644 index 000000000..2ef337ec8 --- /dev/null +++ b/pkg/sentry/socket/unix/transport/connectionless_state.go @@ -0,0 +1,20 @@ +// Copyright 2021 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport + +// afterLoad is invoked by stateify. +func (e *connectionlessEndpoint) afterLoad() { + e.ops.InitHandler(e, &stackHandler{}, getSendBufferLimits) +} diff --git a/pkg/sentry/socket/unix/transport/queue.go b/pkg/sentry/socket/unix/transport/queue.go index 342def28f..698a9a82c 100644 --- a/pkg/sentry/socket/unix/transport/queue.go +++ b/pkg/sentry/socket/unix/transport/queue.go @@ -237,9 +237,18 @@ func (q *queue) QueuedSize() int64 { // MaxQueueSize returns the maximum number of bytes storable in the queue. func (q *queue) MaxQueueSize() int64 { + q.mu.Lock() + defer q.mu.Unlock() return q.limit } +// SetMaxQueueSize sets the maximum number of bytes storable in the queue. +func (q *queue) SetMaxQueueSize(v int64) { + q.mu.Lock() + defer q.mu.Unlock() + q.limit = v +} + // CloseUnread sets flag to indicate that the peer is closed (not shutdown) // with unread data. So if read on this queue shall return ECONNRESET error. func (q *queue) CloseUnread() { diff --git a/pkg/sentry/socket/unix/transport/transport_state_autogen.go b/pkg/sentry/socket/unix/transport/transport_state_autogen.go index 2aeff0256..ab00f8858 100644 --- a/pkg/sentry/socket/unix/transport/transport_state_autogen.go +++ b/pkg/sentry/socket/unix/transport/transport_state_autogen.go @@ -32,14 +32,13 @@ func (e *connectionedEndpoint) StateSave(stateSinkObject state.Sink) { stateSinkObject.Save(3, &e.stype) } -func (e *connectionedEndpoint) afterLoad() {} - func (e *connectionedEndpoint) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(0, &e.baseEndpoint) stateSourceObject.Load(1, &e.id) stateSourceObject.Load(2, &e.idGenerator) stateSourceObject.Load(3, &e.stype) stateSourceObject.LoadValue(4, new([]*connectionedEndpoint), func(y interface{}) { e.loadAcceptedChan(y.([]*connectionedEndpoint)) }) + stateSourceObject.AfterLoad(e.afterLoad) } func (e *connectionlessEndpoint) StateTypeName() string { @@ -59,10 +58,9 @@ func (e *connectionlessEndpoint) StateSave(stateSinkObject state.Sink) { stateSinkObject.Save(0, &e.baseEndpoint) } -func (e *connectionlessEndpoint) afterLoad() {} - func (e *connectionlessEndpoint) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(0, &e.baseEndpoint) + stateSourceObject.AfterLoad(e.afterLoad) } func (q *queue) StateTypeName() string { diff --git a/pkg/sentry/socket/unix/transport/unix.go b/pkg/sentry/socket/unix/transport/unix.go index 70227bbd2..ceada54a8 100644 --- a/pkg/sentry/socket/unix/transport/unix.go +++ b/pkg/sentry/socket/unix/transport/unix.go @@ -26,8 +26,16 @@ import ( "gvisor.dev/gvisor/pkg/waiter" ) -// initialLimit is the starting limit for the socket buffers. -const initialLimit = 16 * 1024 +const ( + // The minimum size of the send/receive buffers. + minimumBufferSize = 4 << 10 // 4 KiB (match default in linux) + + // The default size of the send/receive buffers. + defaultBufferSize = 208 << 10 // 208 KiB (default in linux for net.core.wmem_default) + + // The maximum permitted size for the send/receive buffers. + maxBufferSize = 4 << 20 // 4 MiB 4 MiB (default in linux for net.core.wmem_max) +) // A RightsControlMessage is a control message containing FDs. // @@ -627,6 +635,10 @@ type ConnectedEndpoint interface { // CloseUnread sets the fact that this end is closed with unread data to // the peer socket. CloseUnread() + + // SetSendBufferSize is called when the endpoint's send buffer size is + // changed. + SetSendBufferSize(v int64) (newSz int64) } // +stateify savable @@ -722,6 +734,14 @@ func (e *connectedEndpoint) CloseUnread() { e.writeQueue.CloseUnread() } +// SetSendBufferSize implements ConnectedEndpoint.SetSendBufferSize. +// SetSendBufferSize sets the send buffer size for the write queue to the +// specified value. +func (e *connectedEndpoint) SetSendBufferSize(v int64) (newSz int64) { + e.writeQueue.SetMaxQueueSize(v) + return v +} + // baseEndpoint is an embeddable unix endpoint base used in both the connected and connectionless // unix domain socket Endpoint implementations. // @@ -849,27 +869,6 @@ func (e *baseEndpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error { return nil } -// IsUnixSocket implements tcpip.SocketOptionsHandler.IsUnixSocket. -func (e *baseEndpoint) IsUnixSocket() bool { - return true -} - -// GetSendBufferSize implements tcpip.SocketOptionsHandler.GetSendBufferSize. -func (e *baseEndpoint) GetSendBufferSize() (int64, tcpip.Error) { - e.Lock() - defer e.Unlock() - - if !e.Connected() { - return -1, &tcpip.ErrNotConnected{} - } - - v := e.connected.SendMaxQueueSize() - if v < 0 { - return -1, &tcpip.ErrQueueSizeNotSupported{} - } - return v, nil -} - func (e *baseEndpoint) GetSockOptInt(opt tcpip.SockOptInt) (int, tcpip.Error) { switch opt { case tcpip.ReceiveQueueSizeOption: @@ -987,3 +986,35 @@ func (e *baseEndpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { func (*baseEndpoint) Release(context.Context) { // Binding a baseEndpoint doesn't take a reference. } + +// stackHandler is just a stub implementation of tcpip.StackHandler to provide +// when initializing socketoptions. +type stackHandler struct { +} + +// Option implements tcpip.StackHandler. +func (h *stackHandler) Option(option interface{}) tcpip.Error { + panic("unimplemented") +} + +// TransportProtocolOption implements tcpip.StackHandler. +func (h *stackHandler) TransportProtocolOption(proto tcpip.TransportProtocolNumber, option tcpip.GettableTransportProtocolOption) tcpip.Error { + panic("unimplemented") +} + +// getSendBufferLimits implements tcpip.GetSendBufferLimits. +// +// AF_UNIX sockets buffer sizes are not tied to the networking stack/namespace +// in linux but are bound by net.core.(wmem|rmem)_(max|default). +// +// In gVisor net.core sysctls today are not exposed or if exposed are currently +// tied to the networking stack in use. This makes it complicated for AF_UNIX +// when we are in a new namespace w/ no networking stack. As a result for now we +// define default/max values here in the unix socket implementation itself. +func getSendBufferLimits(tcpip.StackHandler) tcpip.SendBufferSizeOption { + return tcpip.SendBufferSizeOption{ + Min: minimumBufferSize, + Default: defaultBufferSize, + Max: maxBufferSize, + } +} |