diff options
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 41 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/rcv.go | 54 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_state_autogen.go | 23 |
4 files changed, 91 insertions, 33 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 189c01c8f..0aaef495d 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1219,12 +1219,6 @@ func (e *endpoint) handleSegment(s *segment) (cont bool, err *tcpip.Error) { return true, nil } - // Increase counter if after processing the segment we would potentially - // advertise a zero window. - if crossed, above := e.windowCrossedACKThresholdLocked(-s.segMemSize()); crossed && !above { - e.stats.ReceiveErrors.ZeroRcvWindowState.Increment() - } - // Now check if the received segment has caused us to transition // to a CLOSED state, if yes then terminate processing and do // not invoke the sender. diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index bc3b409ba..3bcd3923a 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -248,6 +248,11 @@ type ReceiveErrors struct { // ZeroRcvWindowState is the number of times we advertised // a zero receive window when rcvList is full. ZeroRcvWindowState tcpip.StatCounter + + // WantZeroWindow is the number of times we wanted to advertise a + // zero receive window but couldn't because it would have caused + // the receive window's right edge to shrink. + WantZeroRcvWindow tcpip.StatCounter } // SendErrors collect segment send errors within the transport layer. @@ -1162,7 +1167,7 @@ func (e *endpoint) cleanupLocked() { // wndFromSpace returns the window that we can advertise based on the available // receive buffer space. func wndFromSpace(space int) int { - return space / (1 << rcvAdvWndScale) + return space >> rcvAdvWndScale } // initialReceiveWindow returns the initial receive window to advertise in the @@ -1518,6 +1523,38 @@ func (e *endpoint) Peek(vec [][]byte) (int64, tcpip.ControlMessages, *tcpip.Erro return num, tcpip.ControlMessages{}, nil } +// selectWindowLocked returns the new window without checking for shrinking or scaling +// applied. +// Precondition: e.mu and e.rcvListMu must be held. +func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) { + wndFromAvailable := wndFromSpace(e.receiveBufferAvailableLocked()) + maxWindow := wndFromSpace(e.rcvBufSize) + wndFromUsedBytes := maxWindow - e.rcvBufUsed + + // We take the lesser of the wndFromAvailable and wndFromUsedBytes because in + // cases where we receive a lot of small segments the segment overhead is a + // lot higher and we can run out socket buffer space before we can fill the + // previous window we advertised. In cases where we receive MSS sized or close + // MSS sized segments we will probably run out of window space before we + // exhaust receive buffer. + newWnd := wndFromAvailable + if newWnd > wndFromUsedBytes { + newWnd = wndFromUsedBytes + } + if newWnd < 0 { + newWnd = 0 + } + return seqnum.Size(newWnd) +} + +// selectWindow invokes selectWindowLocked after acquiring e.rcvListMu. +func (e *endpoint) selectWindow() (wnd seqnum.Size) { + e.rcvListMu.Lock() + wnd = e.selectWindowLocked() + e.rcvListMu.Unlock() + return wnd +} + // windowCrossedACKThresholdLocked checks if the receive window to be announced // would be under aMSS or under the window derived from half receive buffer, // whichever smaller. This is useful as a receive side silly window syndrome @@ -1534,7 +1571,7 @@ func (e *endpoint) Peek(vec [][]byte) (int64, tcpip.ControlMessages, *tcpip.Erro // // Precondition: e.mu and e.rcvListMu must be held. func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed bool, above bool) { - newAvail := wndFromSpace(e.receiveBufferAvailableLocked()) + newAvail := int(e.selectWindowLocked()) oldAvail := newAvail - deltaBefore if oldAvail < 0 { oldAvail = 0 diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go index 48bf196d8..8e0b7c843 100644 --- a/pkg/tcpip/transport/tcp/rcv.go +++ b/pkg/tcpip/transport/tcp/rcv.go @@ -43,6 +43,9 @@ type receiver struct { // rcvWnd is the non-scaled receive window last advertised to the peer. rcvWnd seqnum.Size + // rcvWUP is the rcvNxt value at the last window update sent. + rcvWUP seqnum.Value + rcvWndScale uint8 closed bool @@ -64,6 +67,7 @@ func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale rcvNxt: irs + 1, rcvAcc: irs.Add(rcvWnd + 1), rcvWnd: rcvWnd, + rcvWUP: irs + 1, rcvWndScale: rcvWndScale, lastRcvdAckTime: time.Now(), } @@ -84,34 +88,54 @@ func (r *receiver) acceptable(segSeq seqnum.Value, segLen seqnum.Size) bool { return header.Acceptable(segSeq, segLen, r.rcvNxt, r.rcvNxt.Add(advertisedWindowSize)) } +// currentWindow returns the available space in the window that was advertised +// last to our peer. +func (r *receiver) currentWindow() (curWnd seqnum.Size) { + endOfWnd := r.rcvWUP.Add(r.rcvWnd) + if endOfWnd.LessThan(r.rcvNxt) { + // return 0 if r.rcvNxt is past the end of the previously advertised window. + // This can happen because we accept a large segment completely even if + // accepting it causes it to partially exceed the advertised window. + return 0 + } + return r.rcvNxt.Size(endOfWnd) +} + // getSendParams returns the parameters needed by the sender when building // segments to send. func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { - avail := wndFromSpace(r.ep.receiveBufferAvailable()) - if avail == 0 { - // We have no space available to accept any data, move to zero window - // state. - r.rcvWnd = 0 - return r.rcvNxt, 0 - } - - acc := r.rcvNxt.Add(seqnum.Size(avail)) - newWnd := r.rcvNxt.Size(acc) - curWnd := r.rcvNxt.Size(r.rcvAcc) - + newWnd := r.ep.selectWindow() + curWnd := r.currentWindow() // Update rcvAcc only if new window is > previously advertised window. We // should never shrink the acceptable sequence space once it has been // advertised the peer. If we shrink the acceptable sequence space then we // would end up dropping bytes that might already be in flight. - if newWnd > curWnd { - r.rcvAcc = r.rcvNxt.Add(newWnd) + // ==================================================== sequence space. + // ^ ^ ^ ^ + // rcvWUP rcvNxt rcvAcc new rcvAcc + // <=====curWnd ===> + // <========= newWnd > curWnd ========= > + if r.rcvNxt.Add(seqnum.Size(curWnd)).LessThan(r.rcvNxt.Add(seqnum.Size(newWnd))) { + // If the new window moves the right edge, then update rcvAcc. + r.rcvAcc = r.rcvNxt.Add(seqnum.Size(newWnd)) } else { + if newWnd == 0 { + // newWnd is zero but we can't advertise a zero as it would cause window + // to shrink so just increment a metric to record this event. + r.ep.stats.ReceiveErrors.WantZeroRcvWindow.Increment() + } newWnd = curWnd } // Stash away the non-scaled receive window as we use it for measuring // receiver's estimated RTT. r.rcvWnd = newWnd - return r.rcvNxt, r.rcvWnd >> r.rcvWndScale + r.rcvWUP = r.rcvNxt + scaledWnd := r.rcvWnd >> r.rcvWndScale + if scaledWnd == 0 { + // Increment a metric if we are advertising an actual zero window. + r.ep.stats.ReceiveErrors.ZeroRcvWindowState.Increment() + } + return r.rcvNxt, scaledWnd } // nonZeroWindow is called when the receive window grows from zero to nonzero; diff --git a/pkg/tcpip/transport/tcp/tcp_state_autogen.go b/pkg/tcpip/transport/tcp/tcp_state_autogen.go index 5466ce660..ee1ac778d 100644 --- a/pkg/tcpip/transport/tcp/tcp_state_autogen.go +++ b/pkg/tcpip/transport/tcp/tcp_state_autogen.go @@ -454,6 +454,7 @@ func (r *receiver) StateFields() []string { "rcvNxt", "rcvAcc", "rcvWnd", + "rcvWUP", "rcvWndScale", "closed", "pendingRcvdSegments", @@ -467,15 +468,16 @@ func (r *receiver) beforeSave() {} func (r *receiver) StateSave(stateSinkObject state.Sink) { r.beforeSave() var lastRcvdAckTimeValue unixTime = r.saveLastRcvdAckTime() - stateSinkObject.SaveValue(8, lastRcvdAckTimeValue) + stateSinkObject.SaveValue(9, lastRcvdAckTimeValue) stateSinkObject.Save(0, &r.ep) stateSinkObject.Save(1, &r.rcvNxt) stateSinkObject.Save(2, &r.rcvAcc) stateSinkObject.Save(3, &r.rcvWnd) - stateSinkObject.Save(4, &r.rcvWndScale) - stateSinkObject.Save(5, &r.closed) - stateSinkObject.Save(6, &r.pendingRcvdSegments) - stateSinkObject.Save(7, &r.pendingBufUsed) + stateSinkObject.Save(4, &r.rcvWUP) + stateSinkObject.Save(5, &r.rcvWndScale) + stateSinkObject.Save(6, &r.closed) + stateSinkObject.Save(7, &r.pendingRcvdSegments) + stateSinkObject.Save(8, &r.pendingBufUsed) } func (r *receiver) afterLoad() {} @@ -485,11 +487,12 @@ func (r *receiver) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(1, &r.rcvNxt) stateSourceObject.Load(2, &r.rcvAcc) stateSourceObject.Load(3, &r.rcvWnd) - stateSourceObject.Load(4, &r.rcvWndScale) - stateSourceObject.Load(5, &r.closed) - stateSourceObject.Load(6, &r.pendingRcvdSegments) - stateSourceObject.Load(7, &r.pendingBufUsed) - stateSourceObject.LoadValue(8, new(unixTime), func(y interface{}) { r.loadLastRcvdAckTime(y.(unixTime)) }) + stateSourceObject.Load(4, &r.rcvWUP) + stateSourceObject.Load(5, &r.rcvWndScale) + stateSourceObject.Load(6, &r.closed) + stateSourceObject.Load(7, &r.pendingRcvdSegments) + stateSourceObject.Load(8, &r.pendingBufUsed) + stateSourceObject.LoadValue(9, new(unixTime), func(y interface{}) { r.loadLastRcvdAckTime(y.(unixTime)) }) } func (r *renoState) StateTypeName() string { |