diff options
-rw-r--r-- | pkg/sentry/socket/unix/transport/connectioned.go | 5 | ||||
-rw-r--r-- | pkg/sentry/socket/unix/transport/connectionless.go | 5 | ||||
-rw-r--r-- | pkg/tcpip/socketops.go | 11 | ||||
-rw-r--r-- | pkg/tcpip/stack/stack_state_autogen.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/stack/tcp.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 66 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 6 |
7 files changed, 96 insertions, 6 deletions
diff --git a/pkg/sentry/socket/unix/transport/connectioned.go b/pkg/sentry/socket/unix/transport/connectioned.go index 33f9aeb06..9a398c3b5 100644 --- a/pkg/sentry/socket/unix/transport/connectioned.go +++ b/pkg/sentry/socket/unix/transport/connectioned.go @@ -129,9 +129,9 @@ func newConnectioned(ctx context.Context, stype linux.SockType, uid UniqueIDProv stype: stype, } + ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits, getReceiveBufferLimits) ep.ops.SetSendBufferSize(defaultBufferSize, false /* notify */) ep.ops.SetReceiveBufferSize(defaultBufferSize, false /* notify */) - ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits, getReceiveBufferLimits) return ep } @@ -517,3 +517,6 @@ func (e *connectionedEndpoint) OnSetSendBufferSize(v int64) (newSz int64) { } return v } + +// WakeupWriters implements tcpip.SocketOptionsHandler.WakeupWriters. +func (e *connectionedEndpoint) WakeupWriters() {} diff --git a/pkg/sentry/socket/unix/transport/connectionless.go b/pkg/sentry/socket/unix/transport/connectionless.go index 61338728a..61311718e 100644 --- a/pkg/sentry/socket/unix/transport/connectionless.go +++ b/pkg/sentry/socket/unix/transport/connectionless.go @@ -44,9 +44,9 @@ func NewConnectionless(ctx context.Context) Endpoint { q := queue{ReaderQueue: ep.Queue, WriterQueue: &waiter.Queue{}, limit: defaultBufferSize} q.InitRefs() ep.receiver = &queueReceiver{readQueue: &q} + ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits, getReceiveBufferLimits) ep.ops.SetSendBufferSize(defaultBufferSize, false /* notify */) ep.ops.SetReceiveBufferSize(defaultBufferSize, false /* notify */) - ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits, getReceiveBufferLimits) return ep } @@ -227,3 +227,6 @@ func (e *connectionlessEndpoint) OnSetSendBufferSize(v int64) (newSz int64) { } return v } + +// WakeupWriters implements tcpip.SocketOptionsHandler.WakeupWriters. +func (e *connectionlessEndpoint) WakeupWriters() {} diff --git a/pkg/tcpip/socketops.go b/pkg/tcpip/socketops.go index 6bce3af04..34ac62444 100644 --- a/pkg/tcpip/socketops.go +++ b/pkg/tcpip/socketops.go @@ -57,6 +57,11 @@ type SocketOptionsHandler interface { // OnSetReceiveBufferSize is invoked by SO_RCVBUF and SO_RCVBUFFORCE. OnSetReceiveBufferSize(v, oldSz int64) (newSz int64) + + // WakeupWriters is invoked when the send buffer size for an endpoint is + // changed. The handler notifies the writers if the send buffer size is + // increased with setsockopt(2) for TCP endpoints. + WakeupWriters() } // DefaultSocketOptionsHandler is an embeddable type that implements no-op @@ -98,6 +103,9 @@ func (*DefaultSocketOptionsHandler) OnSetSendBufferSize(v int64) (newSz int64) { return v } +// WakeupWriters implements SocketOptionsHandler.WakeupWriters. +func (*DefaultSocketOptionsHandler) WakeupWriters() {} + // OnSetReceiveBufferSize implements SocketOptionsHandler.OnSetReceiveBufferSize. func (*DefaultSocketOptionsHandler) OnSetReceiveBufferSize(v, oldSz int64) (newSz int64) { return v @@ -626,6 +634,9 @@ func (so *SocketOptions) SetSendBufferSize(sendBufferSize int64, notify bool) { sendBufferSize = so.handler.OnSetSendBufferSize(sendBufferSize) } so.sendBufferSize.Store(sendBufferSize) + if notify { + so.handler.WakeupWriters() + } } // GetReceiveBufferSize gets value for SO_RCVBUF option. diff --git a/pkg/tcpip/stack/stack_state_autogen.go b/pkg/tcpip/stack/stack_state_autogen.go index 47c7e386e..037c7b745 100644 --- a/pkg/tcpip/stack/stack_state_autogen.go +++ b/pkg/tcpip/stack/stack_state_autogen.go @@ -1047,6 +1047,7 @@ func (t *TCPSndBufState) StateFields() []string { "SndClosed", "PacketTooBigCount", "SndMTU", + "AutoTuneSndBufDisabled", } } @@ -1060,6 +1061,7 @@ func (t *TCPSndBufState) StateSave(stateSinkObject state.Sink) { stateSinkObject.Save(2, &t.SndClosed) stateSinkObject.Save(3, &t.PacketTooBigCount) stateSinkObject.Save(4, &t.SndMTU) + stateSinkObject.Save(5, &t.AutoTuneSndBufDisabled) } func (t *TCPSndBufState) afterLoad() {} @@ -1071,6 +1073,7 @@ func (t *TCPSndBufState) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(2, &t.SndClosed) stateSourceObject.Load(3, &t.PacketTooBigCount) stateSourceObject.Load(4, &t.SndMTU) + stateSourceObject.Load(5, &t.AutoTuneSndBufDisabled) } func (t *TCPEndpointStateInner) StateTypeName() string { diff --git a/pkg/tcpip/stack/tcp.go b/pkg/tcpip/stack/tcp.go index 90a8ba6cf..93ea83cdc 100644 --- a/pkg/tcpip/stack/tcp.go +++ b/pkg/tcpip/stack/tcp.go @@ -386,6 +386,12 @@ type TCPSndBufState struct { // SndMTU is the smallest MTU seen in the control packets received. SndMTU int + + // AutoTuneSndBufDisabled indicates that the auto tuning of send buffer + // is disabled. + // + // Must be accessed using atomic operations. + AutoTuneSndBufDisabled uint32 } // TCPEndpointStateInner contains the members of TCPEndpointState used directly diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 044123185..355719beb 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -1717,6 +1717,27 @@ func (e *endpoint) OnSetReceiveBufferSize(rcvBufSz, oldSz int64) (newSz int64) { return rcvBufSz } +// OnSetSendBufferSize implements tcpip.SocketOptionsHandler.OnSetSendBufferSize. +func (e *endpoint) OnSetSendBufferSize(sz int64) int64 { + atomic.StoreUint32(&e.sndQueueInfo.TCPSndBufState.AutoTuneSndBufDisabled, 1) + return sz +} + +// WakeupWriters implements tcpip.SocketOptionsHandler.WakeupWriters. +func (e *endpoint) WakeupWriters() { + e.LockUser() + defer e.UnlockUser() + + sendBufferSize := e.getSendBufferSize() + e.sndQueueInfo.sndQueueMu.Lock() + notify := (sendBufferSize - e.sndQueueInfo.SndBufUsed) >= e.sndQueueInfo.SndBufUsed>>1 + e.sndQueueInfo.sndQueueMu.Unlock() + + if notify { + e.waiterQueue.Notify(waiter.WritableEvents) + } +} + // SetSockOptInt sets a socket option. func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error { // Lower 2 bits represents ECN bits. RFC 3168, section 23.1 @@ -2329,6 +2350,9 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp e.segmentQueue.mu.Unlock() e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0) e.setEndpointState(StateEstablished) + // Set the new auto tuned send buffer size after entering + // established state. + e.ops.SetSendBufferSize(e.computeTCPSendBufferSize(), false /* notify */) } if run { @@ -2763,13 +2787,20 @@ func (e *endpoint) updateSndBufferUsage(v int) { e.sndQueueInfo.sndQueueMu.Lock() notify := e.sndQueueInfo.SndBufUsed >= sendBufferSize>>1 e.sndQueueInfo.SndBufUsed -= v + + // Get the new send buffer size with auto tuning, but do not set it + // unless we decide to notify the writers. + newSndBufSz := e.computeTCPSendBufferSize() + // We only notify when there is half the sendBufferSize available after // a full buffer event occurs. This ensures that we don't wake up // writers to queue just 1-2 segments and go back to sleep. - notify = notify && e.sndQueueInfo.SndBufUsed < sendBufferSize>>1 + notify = notify && e.sndQueueInfo.SndBufUsed < int(newSndBufSz)>>1 e.sndQueueInfo.sndQueueMu.Unlock() if notify { + // Set the new send buffer size calculated from auto tuning. + e.ops.SetSendBufferSize(newSndBufSz, false /* notify */) e.waiterQueue.Notify(waiter.WritableEvents) } } @@ -3091,3 +3122,36 @@ func GetTCPReceiveBufferLimits(s tcpip.StackHandler) tcpip.ReceiveBufferSizeOpti Max: ss.Max, } } + +// computeTCPSendBufferSize implements auto tuning of send buffer size and +// returns the new send buffer size. +func (e *endpoint) computeTCPSendBufferSize() int64 { + curSndBufSz := int64(e.getSendBufferSize()) + + // Auto tuning is disabled when the user explicitly sets the send + // buffer size with SO_SNDBUF option. + if disabled := atomic.LoadUint32(&e.sndQueueInfo.TCPSndBufState.AutoTuneSndBufDisabled); disabled == 1 { + return curSndBufSz + } + + const packetOverheadFactor = 2 + curMSS := e.snd.MaxPayloadSize + numSeg := InitialCwnd + if numSeg < e.snd.SndCwnd { + numSeg = e.snd.SndCwnd + } + + // SndCwnd indicates the number of segments that can be sent. This means + // that the sender can send upto #SndCwnd segments and the send buffer + // size should be set to SndCwnd*MSS to accommodate sending of all the + // segments. + newSndBufSz := int64(numSeg * curMSS * packetOverheadFactor) + if newSndBufSz < curSndBufSz { + return curSndBufSz + } + if ss := GetTCPSendBufferLimits(e.stack); int64(ss.Max) < newSndBufSz { + newSndBufSz = int64(ss.Max) + } + + return newSndBufSz +} diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 92a66f17e..64302f576 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -1415,9 +1415,6 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { ackLeft -= datalen } - // Update the send buffer usage and notify potential waiters. - s.ep.updateSndBufferUsage(int(acked)) - // Clear SACK information for all acked data. s.ep.scoreboard.Delete(s.SndUna) @@ -1437,6 +1434,9 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { } } + // Update the send buffer usage and notify potential waiters. + s.ep.updateSndBufferUsage(int(acked)) + // It is possible for s.outstanding to drop below zero if we get // a retransmit timeout, reset outstanding to zero but later // get an ack that cover previously sent data. |