summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp/endpoint.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/transport/tcp/endpoint.go')
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go152
1 files changed, 67 insertions, 85 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 884332828..f25dc781a 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -822,11 +822,11 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
windowClamp: DefaultReceiveBufferSize,
maxSynRetries: DefaultSynRetries,
}
- e.rcvQueueInfo.RcvBufSize = DefaultReceiveBufferSize
- e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits)
+ e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits, GetTCPReceiveBufferLimits)
e.ops.SetMulticastLoop(true)
e.ops.SetQuickAck(true)
e.ops.SetSendBufferSize(DefaultSendBufferSize, false /* notify */)
+ e.ops.SetReceiveBufferSize(DefaultReceiveBufferSize, false /* notify */)
var ss tcpip.TCPSendBufferSizeRangeOption
if err := s.TransportProtocolOption(ProtocolNumber, &ss); err == nil {
@@ -835,7 +835,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
var rs tcpip.TCPReceiveBufferSizeRangeOption
if err := s.TransportProtocolOption(ProtocolNumber, &rs); err == nil {
- e.rcvQueueInfo.RcvBufSize = rs.Default
+ e.ops.SetReceiveBufferSize(int64(rs.Default), false /* notify */)
}
var cs tcpip.CongestionControlOption
@@ -1228,11 +1228,12 @@ func (e *endpoint) ModerateRecvBuf(copied int) {
// We do not adjust downwards as that can cause the receiver to
// reject valid data that might already be in flight as the
// acceptable window will shrink.
- if rcvWnd > e.rcvQueueInfo.RcvBufSize {
- availBefore := wndFromSpace(e.receiveBufferAvailableLocked())
- e.rcvQueueInfo.RcvBufSize = rcvWnd
- availAfter := wndFromSpace(e.receiveBufferAvailableLocked())
- if crossed, above := e.windowCrossedACKThresholdLocked(availAfter - availBefore); crossed && above {
+ rcvBufSize := int(e.ops.GetReceiveBufferSize())
+ if rcvWnd > rcvBufSize {
+ availBefore := wndFromSpace(e.receiveBufferAvailableLocked(rcvBufSize))
+ e.ops.SetReceiveBufferSize(int64(rcvWnd), false /* notify */)
+ availAfter := wndFromSpace(e.receiveBufferAvailableLocked(rcvWnd))
+ if crossed, above := e.windowCrossedACKThresholdLocked(availAfter-availBefore, rcvBufSize); crossed && above {
e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
}
}
@@ -1424,7 +1425,7 @@ func (e *endpoint) commitRead(done int) *segment {
// enough buffer space, to either fit an aMSS or half a receive buffer
// (whichever smaller), then notify the protocol goroutine to send a
// window update.
- if crossed, above := e.windowCrossedACKThresholdLocked(memDelta); crossed && above {
+ if crossed, above := e.windowCrossedACKThresholdLocked(memDelta, int(e.ops.GetReceiveBufferSize())); crossed && above {
e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
}
}
@@ -1556,9 +1557,9 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp
// selectWindowLocked returns the new window without checking for shrinking or scaling
// applied.
// Precondition: e.mu and e.rcvQueueMu must be held.
-func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) {
- wndFromAvailable := wndFromSpace(e.receiveBufferAvailableLocked())
- maxWindow := wndFromSpace(e.rcvQueueInfo.RcvBufSize)
+func (e *endpoint) selectWindowLocked(rcvBufSize int) (wnd seqnum.Size) {
+ wndFromAvailable := wndFromSpace(e.receiveBufferAvailableLocked(rcvBufSize))
+ maxWindow := wndFromSpace(rcvBufSize)
wndFromUsedBytes := maxWindow - e.rcvQueueInfo.RcvBufUsed
// We take the lesser of the wndFromAvailable and wndFromUsedBytes because in
@@ -1580,7 +1581,7 @@ func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) {
// selectWindow invokes selectWindowLocked after acquiring e.rcvQueueMu.
func (e *endpoint) selectWindow() (wnd seqnum.Size) {
e.rcvQueueInfo.rcvQueueMu.Lock()
- wnd = e.selectWindowLocked()
+ wnd = e.selectWindowLocked(int(e.ops.GetReceiveBufferSize()))
e.rcvQueueInfo.rcvQueueMu.Unlock()
return wnd
}
@@ -1600,8 +1601,8 @@ func (e *endpoint) selectWindow() (wnd seqnum.Size) {
// otherwise.
//
// Precondition: e.mu and e.rcvQueueMu must be held.
-func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed bool, above bool) {
- newAvail := int(e.selectWindowLocked())
+func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int, rcvBufSize int) (crossed bool, above bool) {
+ newAvail := int(e.selectWindowLocked(rcvBufSize))
oldAvail := newAvail - deltaBefore
if oldAvail < 0 {
oldAvail = 0
@@ -1610,7 +1611,7 @@ func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed boo
// rcvBufFraction is the inverse of the fraction of receive buffer size that
// is used to decide if the available buffer space is now above it.
const rcvBufFraction = 2
- if wndThreshold := wndFromSpace(e.rcvQueueInfo.RcvBufSize / rcvBufFraction); threshold > wndThreshold {
+ if wndThreshold := wndFromSpace(rcvBufSize / rcvBufFraction); threshold > wndThreshold {
threshold = wndThreshold
}
switch {
@@ -1661,6 +1662,37 @@ func (e *endpoint) getSendBufferSize() int {
return int(e.ops.GetSendBufferSize())
}
+// OnSetReceiveBufferSize implements tcpip.SocketOptionsHandler.OnSetReceiveBufferSize.
+func (e *endpoint) OnSetReceiveBufferSize(rcvBufSz, oldSz int64) (newSz int64) {
+ e.LockUser()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+
+ // Make sure the receive buffer size allows us to send a
+ // non-zero window size.
+ scale := uint8(0)
+ if e.rcv != nil {
+ scale = e.rcv.RcvWndScale
+ }
+ if rcvBufSz>>scale == 0 {
+ rcvBufSz = 1 << scale
+ }
+
+ availBefore := wndFromSpace(e.receiveBufferAvailableLocked(int(oldSz)))
+ availAfter := wndFromSpace(e.receiveBufferAvailableLocked(int(rcvBufSz)))
+ e.rcvQueueInfo.RcvAutoParams.Disabled = true
+
+ // Immediately send an ACK to uncork the sender silly window
+ // syndrome prevetion, when our available space grows above aMSS
+ // or half receive buffer, whichever smaller.
+ if crossed, above := e.windowCrossedACKThresholdLocked(availAfter-availBefore, int(rcvBufSz)); crossed && above {
+ e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
+ }
+
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
+ e.UnlockUser()
+ return rcvBufSz
+}
+
// SetSockOptInt sets a socket option.
func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error {
// Lower 2 bits represents ECN bits. RFC 3168, section 23.1
@@ -1704,56 +1736,6 @@ func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error {
return &tcpip.ErrNotSupported{}
}
- case tcpip.ReceiveBufferSizeOption:
- // Make sure the receive buffer size is within the min and max
- // allowed.
- var rs tcpip.TCPReceiveBufferSizeRangeOption
- if err := e.stack.TransportProtocolOption(ProtocolNumber, &rs); err != nil {
- panic(fmt.Sprintf("e.stack.TransportProtocolOption(%d, %#v) = %s", ProtocolNumber, &rs, err))
- }
-
- if v > rs.Max {
- v = rs.Max
- }
-
- if v < math.MaxInt32/SegOverheadFactor {
- v *= SegOverheadFactor
- if v < rs.Min {
- v = rs.Min
- }
- } else {
- v = math.MaxInt32
- }
-
- e.LockUser()
- e.rcvQueueInfo.rcvQueueMu.Lock()
-
- // Make sure the receive buffer size allows us to send a
- // non-zero window size.
- scale := uint8(0)
- if e.rcv != nil {
- scale = e.rcv.RcvWndScale
- }
- if v>>scale == 0 {
- v = 1 << scale
- }
-
- availBefore := wndFromSpace(e.receiveBufferAvailableLocked())
- e.rcvQueueInfo.RcvBufSize = v
- availAfter := wndFromSpace(e.receiveBufferAvailableLocked())
-
- e.rcvQueueInfo.RcvAutoParams.Disabled = true
-
- // Immediately send an ACK to uncork the sender silly window
- // syndrome prevetion, when our available space grows above aMSS
- // or half receive buffer, whichever smaller.
- if crossed, above := e.windowCrossedACKThresholdLocked(availAfter - availBefore); crossed && above {
- e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
- }
-
- e.rcvQueueInfo.rcvQueueMu.Unlock()
- e.UnlockUser()
-
case tcpip.TTLOption:
e.LockUser()
e.ttl = uint8(v)
@@ -1939,12 +1921,6 @@ func (e *endpoint) GetSockOptInt(opt tcpip.SockOptInt) (int, tcpip.Error) {
case tcpip.ReceiveQueueSizeOption:
return e.readyReceiveSize()
- case tcpip.ReceiveBufferSizeOption:
- e.rcvQueueInfo.rcvQueueMu.Lock()
- v := e.rcvQueueInfo.RcvBufSize
- e.rcvQueueInfo.rcvQueueMu.Unlock()
- return v, nil
-
case tcpip.TTLOption:
e.LockUser()
v := int(e.ttl)
@@ -2780,15 +2756,15 @@ func (e *endpoint) readyToRead(s *segment) {
// receiveBufferAvailableLocked calculates how many bytes are still available
// in the receive buffer.
// rcvQueueMu must be held when this function is called.
-func (e *endpoint) receiveBufferAvailableLocked() int {
+func (e *endpoint) receiveBufferAvailableLocked(rcvBufSize int) int {
// We may use more bytes than the buffer size when the receive buffer
// shrinks.
memUsed := e.receiveMemUsed()
- if memUsed >= e.rcvQueueInfo.RcvBufSize {
+ if memUsed >= rcvBufSize {
return 0
}
- return e.rcvQueueInfo.RcvBufSize - memUsed
+ return rcvBufSize - memUsed
}
// receiveBufferAvailable calculates how many bytes are still available in the
@@ -2796,7 +2772,7 @@ func (e *endpoint) receiveBufferAvailableLocked() int {
// receive buffer/pending and segment queue.
func (e *endpoint) receiveBufferAvailable() int {
e.rcvQueueInfo.rcvQueueMu.Lock()
- available := e.receiveBufferAvailableLocked()
+ available := e.receiveBufferAvailableLocked(int(e.ops.GetReceiveBufferSize()))
e.rcvQueueInfo.rcvQueueMu.Unlock()
return available
}
@@ -2809,14 +2785,6 @@ func (e *endpoint) receiveBufferUsed() int {
return used
}
-// receiveBufferSize returns the current size of the receive buffer.
-func (e *endpoint) receiveBufferSize() int {
- e.rcvQueueInfo.rcvQueueMu.Lock()
- size := e.rcvQueueInfo.RcvBufSize
- e.rcvQueueInfo.rcvQueueMu.Unlock()
- return size
-}
-
// receiveMemUsed returns the total memory in use by segments held by this
// endpoint.
func (e *endpoint) receiveMemUsed() int {
@@ -2845,7 +2813,7 @@ func (e *endpoint) maxReceiveBufferSize() int {
// receiveBuffer otherwise we use the max permissible receive buffer size to
// compute the scale.
func (e *endpoint) rcvWndScaleForHandshake() int {
- bufSizeForScale := e.receiveBufferSize()
+ bufSizeForScale := e.ops.GetReceiveBufferSize()
e.rcvQueueInfo.rcvQueueMu.Lock()
autoTuningDisabled := e.rcvQueueInfo.RcvAutoParams.Disabled
@@ -3074,3 +3042,17 @@ func (e *endpoint) allowOutOfWindowAck() bool {
e.lastOutOfWindowAckTime = now
return true
}
+
+// GetTCPReceiveBufferLimits is used to get send buffer size limits for TCP.
+func GetTCPReceiveBufferLimits(s tcpip.StackHandler) tcpip.ReceiveBufferSizeOption {
+ var ss tcpip.TCPReceiveBufferSizeRangeOption
+ if err := s.TransportProtocolOption(header.TCPProtocolNumber, &ss); err != nil {
+ panic(fmt.Sprintf("s.TransportProtocolOption(%d, %#v) = %s", header.TCPProtocolNumber, ss, err))
+ }
+
+ return tcpip.ReceiveBufferSizeOption{
+ Min: ss.Min,
+ Default: ss.Default,
+ Max: ss.Max,
+ }
+}