summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp
diff options
context:
space:
mode:
authorgVisor bot <gvisor-bot@google.com>2021-04-20 16:38:23 +0000
committergVisor bot <gvisor-bot@google.com>2021-04-20 16:38:23 +0000
commit54422516dff22248de676ddca1fb4176e661491c (patch)
tree50fe32b0a3c798ab186f9fab2afcf8e49af92e86 /pkg/tcpip/transport/tcp
parent908e108fd49d40e9a90170156bf10a855b6414c4 (diff)
parent3fff4c4a0fbb1b132348d4b82f61cc38a4cc6cb2 (diff)
Merge release-20210412.0-37-g3fff4c4a0 (automated)
Diffstat (limited to 'pkg/tcpip/transport/tcp')
-rw-r--r--pkg/tcpip/transport/tcp/accept.go2
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go152
-rw-r--r--pkg/tcpip/transport/tcp/endpoint_state.go6
-rw-r--r--pkg/tcpip/transport/tcp/rcv.go2
-rw-r--r--pkg/tcpip/transport/tcp/segment_queue.go4
5 files changed, 74 insertions, 92 deletions
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index 664cb9420..d4bd4e80e 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -219,7 +219,7 @@ func (l *listenContext) createConnectingEndpoint(s *segment, rcvdSynOpts *header
n.boundNICID = s.nicID
n.route = route
n.effectiveNetProtos = []tcpip.NetworkProtocolNumber{s.netProto}
- n.rcvQueueInfo.RcvBufSize = int(l.rcvWnd)
+ n.ops.SetReceiveBufferSize(int64(l.rcvWnd), false /* notify */)
n.amss = calculateAdvertisedMSS(n.userMSS, n.route)
n.setEndpointState(StateConnecting)
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 884332828..f25dc781a 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -822,11 +822,11 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
windowClamp: DefaultReceiveBufferSize,
maxSynRetries: DefaultSynRetries,
}
- e.rcvQueueInfo.RcvBufSize = DefaultReceiveBufferSize
- e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits)
+ e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits, GetTCPReceiveBufferLimits)
e.ops.SetMulticastLoop(true)
e.ops.SetQuickAck(true)
e.ops.SetSendBufferSize(DefaultSendBufferSize, false /* notify */)
+ e.ops.SetReceiveBufferSize(DefaultReceiveBufferSize, false /* notify */)
var ss tcpip.TCPSendBufferSizeRangeOption
if err := s.TransportProtocolOption(ProtocolNumber, &ss); err == nil {
@@ -835,7 +835,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
var rs tcpip.TCPReceiveBufferSizeRangeOption
if err := s.TransportProtocolOption(ProtocolNumber, &rs); err == nil {
- e.rcvQueueInfo.RcvBufSize = rs.Default
+ e.ops.SetReceiveBufferSize(int64(rs.Default), false /* notify */)
}
var cs tcpip.CongestionControlOption
@@ -1228,11 +1228,12 @@ func (e *endpoint) ModerateRecvBuf(copied int) {
// We do not adjust downwards as that can cause the receiver to
// reject valid data that might already be in flight as the
// acceptable window will shrink.
- if rcvWnd > e.rcvQueueInfo.RcvBufSize {
- availBefore := wndFromSpace(e.receiveBufferAvailableLocked())
- e.rcvQueueInfo.RcvBufSize = rcvWnd
- availAfter := wndFromSpace(e.receiveBufferAvailableLocked())
- if crossed, above := e.windowCrossedACKThresholdLocked(availAfter - availBefore); crossed && above {
+ rcvBufSize := int(e.ops.GetReceiveBufferSize())
+ if rcvWnd > rcvBufSize {
+ availBefore := wndFromSpace(e.receiveBufferAvailableLocked(rcvBufSize))
+ e.ops.SetReceiveBufferSize(int64(rcvWnd), false /* notify */)
+ availAfter := wndFromSpace(e.receiveBufferAvailableLocked(rcvWnd))
+ if crossed, above := e.windowCrossedACKThresholdLocked(availAfter-availBefore, rcvBufSize); crossed && above {
e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
}
}
@@ -1424,7 +1425,7 @@ func (e *endpoint) commitRead(done int) *segment {
// enough buffer space, to either fit an aMSS or half a receive buffer
// (whichever smaller), then notify the protocol goroutine to send a
// window update.
- if crossed, above := e.windowCrossedACKThresholdLocked(memDelta); crossed && above {
+ if crossed, above := e.windowCrossedACKThresholdLocked(memDelta, int(e.ops.GetReceiveBufferSize())); crossed && above {
e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
}
}
@@ -1556,9 +1557,9 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp
// selectWindowLocked returns the new window without checking for shrinking or scaling
// applied.
// Precondition: e.mu and e.rcvQueueMu must be held.
-func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) {
- wndFromAvailable := wndFromSpace(e.receiveBufferAvailableLocked())
- maxWindow := wndFromSpace(e.rcvQueueInfo.RcvBufSize)
+func (e *endpoint) selectWindowLocked(rcvBufSize int) (wnd seqnum.Size) {
+ wndFromAvailable := wndFromSpace(e.receiveBufferAvailableLocked(rcvBufSize))
+ maxWindow := wndFromSpace(rcvBufSize)
wndFromUsedBytes := maxWindow - e.rcvQueueInfo.RcvBufUsed
// We take the lesser of the wndFromAvailable and wndFromUsedBytes because in
@@ -1580,7 +1581,7 @@ func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) {
// selectWindow invokes selectWindowLocked after acquiring e.rcvQueueMu.
func (e *endpoint) selectWindow() (wnd seqnum.Size) {
e.rcvQueueInfo.rcvQueueMu.Lock()
- wnd = e.selectWindowLocked()
+ wnd = e.selectWindowLocked(int(e.ops.GetReceiveBufferSize()))
e.rcvQueueInfo.rcvQueueMu.Unlock()
return wnd
}
@@ -1600,8 +1601,8 @@ func (e *endpoint) selectWindow() (wnd seqnum.Size) {
// otherwise.
//
// Precondition: e.mu and e.rcvQueueMu must be held.
-func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed bool, above bool) {
- newAvail := int(e.selectWindowLocked())
+func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int, rcvBufSize int) (crossed bool, above bool) {
+ newAvail := int(e.selectWindowLocked(rcvBufSize))
oldAvail := newAvail - deltaBefore
if oldAvail < 0 {
oldAvail = 0
@@ -1610,7 +1611,7 @@ func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed boo
// rcvBufFraction is the inverse of the fraction of receive buffer size that
// is used to decide if the available buffer space is now above it.
const rcvBufFraction = 2
- if wndThreshold := wndFromSpace(e.rcvQueueInfo.RcvBufSize / rcvBufFraction); threshold > wndThreshold {
+ if wndThreshold := wndFromSpace(rcvBufSize / rcvBufFraction); threshold > wndThreshold {
threshold = wndThreshold
}
switch {
@@ -1661,6 +1662,37 @@ func (e *endpoint) getSendBufferSize() int {
return int(e.ops.GetSendBufferSize())
}
+// OnSetReceiveBufferSize implements tcpip.SocketOptionsHandler.OnSetReceiveBufferSize.
+func (e *endpoint) OnSetReceiveBufferSize(rcvBufSz, oldSz int64) (newSz int64) {
+ e.LockUser()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+
+ // Make sure the receive buffer size allows us to send a
+ // non-zero window size.
+ scale := uint8(0)
+ if e.rcv != nil {
+ scale = e.rcv.RcvWndScale
+ }
+ if rcvBufSz>>scale == 0 {
+ rcvBufSz = 1 << scale
+ }
+
+ availBefore := wndFromSpace(e.receiveBufferAvailableLocked(int(oldSz)))
+ availAfter := wndFromSpace(e.receiveBufferAvailableLocked(int(rcvBufSz)))
+ e.rcvQueueInfo.RcvAutoParams.Disabled = true
+
+ // Immediately send an ACK to uncork the sender silly window
+ // syndrome prevetion, when our available space grows above aMSS
+ // or half receive buffer, whichever smaller.
+ if crossed, above := e.windowCrossedACKThresholdLocked(availAfter-availBefore, int(rcvBufSz)); crossed && above {
+ e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
+ }
+
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
+ e.UnlockUser()
+ return rcvBufSz
+}
+
// SetSockOptInt sets a socket option.
func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error {
// Lower 2 bits represents ECN bits. RFC 3168, section 23.1
@@ -1704,56 +1736,6 @@ func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error {
return &tcpip.ErrNotSupported{}
}
- case tcpip.ReceiveBufferSizeOption:
- // Make sure the receive buffer size is within the min and max
- // allowed.
- var rs tcpip.TCPReceiveBufferSizeRangeOption
- if err := e.stack.TransportProtocolOption(ProtocolNumber, &rs); err != nil {
- panic(fmt.Sprintf("e.stack.TransportProtocolOption(%d, %#v) = %s", ProtocolNumber, &rs, err))
- }
-
- if v > rs.Max {
- v = rs.Max
- }
-
- if v < math.MaxInt32/SegOverheadFactor {
- v *= SegOverheadFactor
- if v < rs.Min {
- v = rs.Min
- }
- } else {
- v = math.MaxInt32
- }
-
- e.LockUser()
- e.rcvQueueInfo.rcvQueueMu.Lock()
-
- // Make sure the receive buffer size allows us to send a
- // non-zero window size.
- scale := uint8(0)
- if e.rcv != nil {
- scale = e.rcv.RcvWndScale
- }
- if v>>scale == 0 {
- v = 1 << scale
- }
-
- availBefore := wndFromSpace(e.receiveBufferAvailableLocked())
- e.rcvQueueInfo.RcvBufSize = v
- availAfter := wndFromSpace(e.receiveBufferAvailableLocked())
-
- e.rcvQueueInfo.RcvAutoParams.Disabled = true
-
- // Immediately send an ACK to uncork the sender silly window
- // syndrome prevetion, when our available space grows above aMSS
- // or half receive buffer, whichever smaller.
- if crossed, above := e.windowCrossedACKThresholdLocked(availAfter - availBefore); crossed && above {
- e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
- }
-
- e.rcvQueueInfo.rcvQueueMu.Unlock()
- e.UnlockUser()
-
case tcpip.TTLOption:
e.LockUser()
e.ttl = uint8(v)
@@ -1939,12 +1921,6 @@ func (e *endpoint) GetSockOptInt(opt tcpip.SockOptInt) (int, tcpip.Error) {
case tcpip.ReceiveQueueSizeOption:
return e.readyReceiveSize()
- case tcpip.ReceiveBufferSizeOption:
- e.rcvQueueInfo.rcvQueueMu.Lock()
- v := e.rcvQueueInfo.RcvBufSize
- e.rcvQueueInfo.rcvQueueMu.Unlock()
- return v, nil
-
case tcpip.TTLOption:
e.LockUser()
v := int(e.ttl)
@@ -2780,15 +2756,15 @@ func (e *endpoint) readyToRead(s *segment) {
// receiveBufferAvailableLocked calculates how many bytes are still available
// in the receive buffer.
// rcvQueueMu must be held when this function is called.
-func (e *endpoint) receiveBufferAvailableLocked() int {
+func (e *endpoint) receiveBufferAvailableLocked(rcvBufSize int) int {
// We may use more bytes than the buffer size when the receive buffer
// shrinks.
memUsed := e.receiveMemUsed()
- if memUsed >= e.rcvQueueInfo.RcvBufSize {
+ if memUsed >= rcvBufSize {
return 0
}
- return e.rcvQueueInfo.RcvBufSize - memUsed
+ return rcvBufSize - memUsed
}
// receiveBufferAvailable calculates how many bytes are still available in the
@@ -2796,7 +2772,7 @@ func (e *endpoint) receiveBufferAvailableLocked() int {
// receive buffer/pending and segment queue.
func (e *endpoint) receiveBufferAvailable() int {
e.rcvQueueInfo.rcvQueueMu.Lock()
- available := e.receiveBufferAvailableLocked()
+ available := e.receiveBufferAvailableLocked(int(e.ops.GetReceiveBufferSize()))
e.rcvQueueInfo.rcvQueueMu.Unlock()
return available
}
@@ -2809,14 +2785,6 @@ func (e *endpoint) receiveBufferUsed() int {
return used
}
-// receiveBufferSize returns the current size of the receive buffer.
-func (e *endpoint) receiveBufferSize() int {
- e.rcvQueueInfo.rcvQueueMu.Lock()
- size := e.rcvQueueInfo.RcvBufSize
- e.rcvQueueInfo.rcvQueueMu.Unlock()
- return size
-}
-
// receiveMemUsed returns the total memory in use by segments held by this
// endpoint.
func (e *endpoint) receiveMemUsed() int {
@@ -2845,7 +2813,7 @@ func (e *endpoint) maxReceiveBufferSize() int {
// receiveBuffer otherwise we use the max permissible receive buffer size to
// compute the scale.
func (e *endpoint) rcvWndScaleForHandshake() int {
- bufSizeForScale := e.receiveBufferSize()
+ bufSizeForScale := e.ops.GetReceiveBufferSize()
e.rcvQueueInfo.rcvQueueMu.Lock()
autoTuningDisabled := e.rcvQueueInfo.RcvAutoParams.Disabled
@@ -3074,3 +3042,17 @@ func (e *endpoint) allowOutOfWindowAck() bool {
e.lastOutOfWindowAckTime = now
return true
}
+
+// GetTCPReceiveBufferLimits is used to get send buffer size limits for TCP.
+func GetTCPReceiveBufferLimits(s tcpip.StackHandler) tcpip.ReceiveBufferSizeOption {
+ var ss tcpip.TCPReceiveBufferSizeRangeOption
+ if err := s.TransportProtocolOption(header.TCPProtocolNumber, &ss); err != nil {
+ panic(fmt.Sprintf("s.TransportProtocolOption(%d, %#v) = %s", header.TCPProtocolNumber, ss, err))
+ }
+
+ return tcpip.ReceiveBufferSizeOption{
+ Min: ss.Min,
+ Default: ss.Default,
+ Max: ss.Max,
+ }
+}
diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go
index 034eacd72..6e9777fe4 100644
--- a/pkg/tcpip/transport/tcp/endpoint_state.go
+++ b/pkg/tcpip/transport/tcp/endpoint_state.go
@@ -165,7 +165,7 @@ func (e *endpoint) afterLoad() {
// Resume implements tcpip.ResumableEndpoint.Resume.
func (e *endpoint) Resume(s *stack.Stack) {
e.stack = s
- e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits)
+ e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits, GetTCPReceiveBufferLimits)
e.segmentQueue.thaw()
epState := e.origEndpointState
switch epState {
@@ -180,8 +180,8 @@ func (e *endpoint) Resume(s *stack.Stack) {
var rs tcpip.TCPReceiveBufferSizeRangeOption
if err := e.stack.TransportProtocolOption(ProtocolNumber, &rs); err == nil {
- if e.rcvQueueInfo.RcvBufSize < rs.Min || e.rcvQueueInfo.RcvBufSize > rs.Max {
- panic(fmt.Sprintf("endpoint.rcvQueueInfo.RcvBufSize %d is outside the min and max allowed [%d, %d]", e.rcvQueueInfo.RcvBufSize, rs.Min, rs.Max))
+ if rcvBufSize := e.ops.GetReceiveBufferSize(); rcvBufSize < int64(rs.Min) || rcvBufSize > int64(rs.Max) {
+ panic(fmt.Sprintf("endpoint rcvBufSize %d is outside the min and max allowed [%d, %d]", rcvBufSize, rs.Min, rs.Max))
}
}
}
diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go
index fc11b4ba9..ee2c08cd6 100644
--- a/pkg/tcpip/transport/tcp/rcv.go
+++ b/pkg/tcpip/transport/tcp/rcv.go
@@ -466,7 +466,7 @@ func (r *receiver) handleRcvdSegment(s *segment) (drop bool, err tcpip.Error) {
// segments. This ensures that we always leave some space for the inorder
// segments to arrive allowing pending segments to be processed and
// delivered to the user.
- if r.ep.receiveBufferAvailable() > 0 && r.PendingBufUsed < r.ep.receiveBufferSize()>>2 {
+ if rcvBufSize := r.ep.ops.GetReceiveBufferSize(); rcvBufSize > 0 && r.PendingBufUsed < int(rcvBufSize)>>2 {
r.ep.rcvQueueInfo.rcvQueueMu.Lock()
r.PendingBufUsed += s.segMemSize()
r.ep.rcvQueueInfo.rcvQueueMu.Unlock()
diff --git a/pkg/tcpip/transport/tcp/segment_queue.go b/pkg/tcpip/transport/tcp/segment_queue.go
index 54545a1b1..d0d1b0b8a 100644
--- a/pkg/tcpip/transport/tcp/segment_queue.go
+++ b/pkg/tcpip/transport/tcp/segment_queue.go
@@ -52,12 +52,12 @@ func (q *segmentQueue) empty() bool {
func (q *segmentQueue) enqueue(s *segment) bool {
// q.ep.receiveBufferParams() must be called without holding q.mu to
// avoid lock order inversion.
- bufSz := q.ep.receiveBufferSize()
+ bufSz := q.ep.ops.GetReceiveBufferSize()
used := q.ep.receiveMemUsed()
q.mu.Lock()
// Allow zero sized segments (ACK/FIN/RSTs etc even if the segment queue
// is currently full).
- allow := (used <= bufSz || s.payloadSize() == 0) && !q.frozen
+ allow := (used <= int(bufSz) || s.payloadSize() == 0) && !q.frozen
if allow {
q.list.PushBack(s)