summaryrefslogtreecommitdiffhomepage
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/sentry/socket/unix/transport/connectioned.go5
-rw-r--r--pkg/sentry/socket/unix/transport/connectionless.go5
-rw-r--r--pkg/tcpip/socketops.go11
-rw-r--r--pkg/tcpip/stack/tcp.go6
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go66
-rw-r--r--pkg/tcpip/transport/tcp/snd.go6
-rw-r--r--pkg/tcpip/transport/tcp/tcp_test.go92
7 files changed, 185 insertions, 6 deletions
diff --git a/pkg/sentry/socket/unix/transport/connectioned.go b/pkg/sentry/socket/unix/transport/connectioned.go
index 33f9aeb06..9a398c3b5 100644
--- a/pkg/sentry/socket/unix/transport/connectioned.go
+++ b/pkg/sentry/socket/unix/transport/connectioned.go
@@ -129,9 +129,9 @@ func newConnectioned(ctx context.Context, stype linux.SockType, uid UniqueIDProv
stype: stype,
}
+ ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits, getReceiveBufferLimits)
ep.ops.SetSendBufferSize(defaultBufferSize, false /* notify */)
ep.ops.SetReceiveBufferSize(defaultBufferSize, false /* notify */)
- ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits, getReceiveBufferLimits)
return ep
}
@@ -517,3 +517,6 @@ func (e *connectionedEndpoint) OnSetSendBufferSize(v int64) (newSz int64) {
}
return v
}
+
+// WakeupWriters implements tcpip.SocketOptionsHandler.WakeupWriters.
+func (e *connectionedEndpoint) WakeupWriters() {}
diff --git a/pkg/sentry/socket/unix/transport/connectionless.go b/pkg/sentry/socket/unix/transport/connectionless.go
index 61338728a..61311718e 100644
--- a/pkg/sentry/socket/unix/transport/connectionless.go
+++ b/pkg/sentry/socket/unix/transport/connectionless.go
@@ -44,9 +44,9 @@ func NewConnectionless(ctx context.Context) Endpoint {
q := queue{ReaderQueue: ep.Queue, WriterQueue: &waiter.Queue{}, limit: defaultBufferSize}
q.InitRefs()
ep.receiver = &queueReceiver{readQueue: &q}
+ ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits, getReceiveBufferLimits)
ep.ops.SetSendBufferSize(defaultBufferSize, false /* notify */)
ep.ops.SetReceiveBufferSize(defaultBufferSize, false /* notify */)
- ep.ops.InitHandler(ep, &stackHandler{}, getSendBufferLimits, getReceiveBufferLimits)
return ep
}
@@ -227,3 +227,6 @@ func (e *connectionlessEndpoint) OnSetSendBufferSize(v int64) (newSz int64) {
}
return v
}
+
+// WakeupWriters implements tcpip.SocketOptionsHandler.WakeupWriters.
+func (e *connectionlessEndpoint) WakeupWriters() {}
diff --git a/pkg/tcpip/socketops.go b/pkg/tcpip/socketops.go
index 6bce3af04..34ac62444 100644
--- a/pkg/tcpip/socketops.go
+++ b/pkg/tcpip/socketops.go
@@ -57,6 +57,11 @@ type SocketOptionsHandler interface {
// OnSetReceiveBufferSize is invoked by SO_RCVBUF and SO_RCVBUFFORCE.
OnSetReceiveBufferSize(v, oldSz int64) (newSz int64)
+
+ // WakeupWriters is invoked when the send buffer size for an endpoint is
+ // changed. The handler notifies the writers if the send buffer size is
+ // increased with setsockopt(2) for TCP endpoints.
+ WakeupWriters()
}
// DefaultSocketOptionsHandler is an embeddable type that implements no-op
@@ -98,6 +103,9 @@ func (*DefaultSocketOptionsHandler) OnSetSendBufferSize(v int64) (newSz int64) {
return v
}
+// WakeupWriters implements SocketOptionsHandler.WakeupWriters.
+func (*DefaultSocketOptionsHandler) WakeupWriters() {}
+
// OnSetReceiveBufferSize implements SocketOptionsHandler.OnSetReceiveBufferSize.
func (*DefaultSocketOptionsHandler) OnSetReceiveBufferSize(v, oldSz int64) (newSz int64) {
return v
@@ -626,6 +634,9 @@ func (so *SocketOptions) SetSendBufferSize(sendBufferSize int64, notify bool) {
sendBufferSize = so.handler.OnSetSendBufferSize(sendBufferSize)
}
so.sendBufferSize.Store(sendBufferSize)
+ if notify {
+ so.handler.WakeupWriters()
+ }
}
// GetReceiveBufferSize gets value for SO_RCVBUF option.
diff --git a/pkg/tcpip/stack/tcp.go b/pkg/tcpip/stack/tcp.go
index 90a8ba6cf..93ea83cdc 100644
--- a/pkg/tcpip/stack/tcp.go
+++ b/pkg/tcpip/stack/tcp.go
@@ -386,6 +386,12 @@ type TCPSndBufState struct {
// SndMTU is the smallest MTU seen in the control packets received.
SndMTU int
+
+ // AutoTuneSndBufDisabled indicates that the auto tuning of send buffer
+ // is disabled.
+ //
+ // Must be accessed using atomic operations.
+ AutoTuneSndBufDisabled uint32
}
// TCPEndpointStateInner contains the members of TCPEndpointState used directly
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 044123185..355719beb 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -1717,6 +1717,27 @@ func (e *endpoint) OnSetReceiveBufferSize(rcvBufSz, oldSz int64) (newSz int64) {
return rcvBufSz
}
+// OnSetSendBufferSize implements tcpip.SocketOptionsHandler.OnSetSendBufferSize.
+func (e *endpoint) OnSetSendBufferSize(sz int64) int64 {
+ atomic.StoreUint32(&e.sndQueueInfo.TCPSndBufState.AutoTuneSndBufDisabled, 1)
+ return sz
+}
+
+// WakeupWriters implements tcpip.SocketOptionsHandler.WakeupWriters.
+func (e *endpoint) WakeupWriters() {
+ e.LockUser()
+ defer e.UnlockUser()
+
+ sendBufferSize := e.getSendBufferSize()
+ e.sndQueueInfo.sndQueueMu.Lock()
+ notify := (sendBufferSize - e.sndQueueInfo.SndBufUsed) >= e.sndQueueInfo.SndBufUsed>>1
+ e.sndQueueInfo.sndQueueMu.Unlock()
+
+ if notify {
+ e.waiterQueue.Notify(waiter.WritableEvents)
+ }
+}
+
// SetSockOptInt sets a socket option.
func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error {
// Lower 2 bits represents ECN bits. RFC 3168, section 23.1
@@ -2329,6 +2350,9 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
e.segmentQueue.mu.Unlock()
e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0)
e.setEndpointState(StateEstablished)
+ // Set the new auto tuned send buffer size after entering
+ // established state.
+ e.ops.SetSendBufferSize(e.computeTCPSendBufferSize(), false /* notify */)
}
if run {
@@ -2763,13 +2787,20 @@ func (e *endpoint) updateSndBufferUsage(v int) {
e.sndQueueInfo.sndQueueMu.Lock()
notify := e.sndQueueInfo.SndBufUsed >= sendBufferSize>>1
e.sndQueueInfo.SndBufUsed -= v
+
+ // Get the new send buffer size with auto tuning, but do not set it
+ // unless we decide to notify the writers.
+ newSndBufSz := e.computeTCPSendBufferSize()
+
// We only notify when there is half the sendBufferSize available after
// a full buffer event occurs. This ensures that we don't wake up
// writers to queue just 1-2 segments and go back to sleep.
- notify = notify && e.sndQueueInfo.SndBufUsed < sendBufferSize>>1
+ notify = notify && e.sndQueueInfo.SndBufUsed < int(newSndBufSz)>>1
e.sndQueueInfo.sndQueueMu.Unlock()
if notify {
+ // Set the new send buffer size calculated from auto tuning.
+ e.ops.SetSendBufferSize(newSndBufSz, false /* notify */)
e.waiterQueue.Notify(waiter.WritableEvents)
}
}
@@ -3091,3 +3122,36 @@ func GetTCPReceiveBufferLimits(s tcpip.StackHandler) tcpip.ReceiveBufferSizeOpti
Max: ss.Max,
}
}
+
+// computeTCPSendBufferSize implements auto tuning of send buffer size and
+// returns the new send buffer size.
+func (e *endpoint) computeTCPSendBufferSize() int64 {
+ curSndBufSz := int64(e.getSendBufferSize())
+
+ // Auto tuning is disabled when the user explicitly sets the send
+ // buffer size with SO_SNDBUF option.
+ if disabled := atomic.LoadUint32(&e.sndQueueInfo.TCPSndBufState.AutoTuneSndBufDisabled); disabled == 1 {
+ return curSndBufSz
+ }
+
+ const packetOverheadFactor = 2
+ curMSS := e.snd.MaxPayloadSize
+ numSeg := InitialCwnd
+ if numSeg < e.snd.SndCwnd {
+ numSeg = e.snd.SndCwnd
+ }
+
+ // SndCwnd indicates the number of segments that can be sent. This means
+ // that the sender can send upto #SndCwnd segments and the send buffer
+ // size should be set to SndCwnd*MSS to accommodate sending of all the
+ // segments.
+ newSndBufSz := int64(numSeg * curMSS * packetOverheadFactor)
+ if newSndBufSz < curSndBufSz {
+ return curSndBufSz
+ }
+ if ss := GetTCPSendBufferLimits(e.stack); int64(ss.Max) < newSndBufSz {
+ newSndBufSz = int64(ss.Max)
+ }
+
+ return newSndBufSz
+}
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index 92a66f17e..64302f576 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -1415,9 +1415,6 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
ackLeft -= datalen
}
- // Update the send buffer usage and notify potential waiters.
- s.ep.updateSndBufferUsage(int(acked))
-
// Clear SACK information for all acked data.
s.ep.scoreboard.Delete(s.SndUna)
@@ -1437,6 +1434,9 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
}
}
+ // Update the send buffer usage and notify potential waiters.
+ s.ep.updateSndBufferUsage(int(acked))
+
// It is possible for s.outstanding to drop below zero if we get
// a retransmit timeout, reset outstanding to zero but later
// get an ack that cover previously sent data.
diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go
index 031f01357..db6b0955a 100644
--- a/pkg/tcpip/transport/tcp/tcp_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_test.go
@@ -7964,3 +7964,95 @@ func generateRandomPayload(t *testing.T, n int) []byte {
}
return buf
}
+
+func TestSendBufferTuning(t *testing.T) {
+ const maxPayload = 536
+ const mtu = header.TCPMinimumSize + header.IPv4MinimumSize + maxTCPOptionSize + maxPayload
+ const packetOverheadFactor = 2
+
+ testCases := []struct {
+ name string
+ autoTuningDisabled bool
+ }{
+ {"autoTuningDisabled", true},
+ {"autoTuningEnabled", false},
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ c := context.New(t, mtu)
+ defer c.Cleanup()
+
+ // Set the stack option for send buffer size.
+ const defaultSndBufSz = maxPayload * tcp.InitialCwnd
+ const maxSndBufSz = defaultSndBufSz * 10
+ {
+ opt := tcpip.TCPSendBufferSizeRangeOption{Min: 1, Default: defaultSndBufSz, Max: maxSndBufSz}
+ if err := c.Stack().SetTransportProtocolOption(tcp.ProtocolNumber, &opt); err != nil {
+ t.Fatalf("SetTransportProtocolOption(%d, &%#v): %s", tcp.ProtocolNumber, opt, err)
+ }
+ }
+
+ c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */)
+
+ oldSz := c.EP.SocketOptions().GetSendBufferSize()
+ if oldSz != defaultSndBufSz {
+ t.Fatalf("Wrong send buffer size got %d want %d", oldSz, defaultSndBufSz)
+ }
+
+ if tc.autoTuningDisabled {
+ c.EP.SocketOptions().SetSendBufferSize(defaultSndBufSz, true /* notify */)
+ }
+
+ data := make([]byte, maxPayload)
+ for i := range data {
+ data[i] = byte(i)
+ }
+
+ w, ch := waiter.NewChannelEntry(nil)
+ c.WQ.EventRegister(&w, waiter.WritableEvents)
+ defer c.WQ.EventUnregister(&w)
+
+ bytesRead := 0
+ for {
+ // Packets will be sent till the send buffer
+ // size is reached.
+ var r bytes.Reader
+ r.Reset(data[bytesRead : bytesRead+maxPayload])
+ _, err := c.EP.Write(&r, tcpip.WriteOptions{})
+ if cmp.Equal(&tcpip.ErrWouldBlock{}, err) {
+ break
+ }
+
+ c.ReceiveAndCheckPacketWithOptions(data, bytesRead, maxPayload, 0)
+ bytesRead += maxPayload
+ data = append(data, data...)
+ }
+
+ // Send an ACK and wait for connection to become writable again.
+ c.SendAck(seqnum.Value(context.TestInitialSequenceNumber).Add(1), bytesRead)
+ select {
+ case <-ch:
+ if err := c.EP.LastError(); err != nil {
+ t.Fatalf("Write failed: %s", err)
+ }
+ case <-time.After(1 * time.Second):
+ t.Fatalf("Timed out waiting for connection")
+ }
+
+ outSz := int64(defaultSndBufSz)
+ if !tc.autoTuningDisabled {
+ // Calculate the new auto tuned send buffer.
+ var info tcpip.TCPInfoOption
+ if err := c.EP.GetSockOpt(&info); err != nil {
+ t.Fatalf("GetSockOpt failed: %v", err)
+ }
+ outSz = (int64(info.SndCwnd) * packetOverheadFactor * (maxPayload))
+ }
+
+ if newSz := c.EP.SocketOptions().GetSendBufferSize(); newSz != outSz {
+ t.Fatalf("Wrong send buffer size, got %d want %d", newSz, outSz)
+ }
+ })
+ }
+}