summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp/endpoint.go
diff options
context:
space:
mode:
authorgVisor bot <gvisor-bot@google.com>2021-04-19 23:52:48 +0000
committergVisor bot <gvisor-bot@google.com>2021-04-19 23:52:48 +0000
commit971b7fe85715e357907cdb576af52c48a22eaa99 (patch)
treea0fe1d30b267f863a44078adfcd3084d460f67d7 /pkg/tcpip/transport/tcp/endpoint.go
parentae3938ad39944b0d67ba0a6aba8efad9307b9868 (diff)
parent20b1c3c632277bd64eac4d0442bda9695f184fc9 (diff)
Merge release-20210412.0-34-g20b1c3c63 (automated)
Diffstat (limited to 'pkg/tcpip/transport/tcp/endpoint.go')
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go648
1 files changed, 255 insertions, 393 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index bc88e48e9..884332828 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -191,42 +191,6 @@ type SACKInfo struct {
NumBlocks int
}
-// rcvBufAutoTuneParams are used to hold state variables to compute
-// the auto tuned recv buffer size.
-//
-// +stateify savable
-type rcvBufAutoTuneParams struct {
- // measureTime is the time at which the current measurement
- // was started.
- measureTime time.Time `state:".(unixTime)"`
-
- // copied is the number of bytes copied out of the receive
- // buffers since this measure began.
- copied int
-
- // prevCopied is the number of bytes copied out of the receive
- // buffers in the previous RTT period.
- prevCopied int
-
- // rtt is the non-smoothed minimum RTT as measured by observing the time
- // between when a byte is first acknowledged and the receipt of data
- // that is at least one window beyond the sequence number that was
- // acknowledged.
- rtt time.Duration
-
- // rttMeasureSeqNumber is the highest acceptable sequence number at the
- // time this RTT measurement period began.
- rttMeasureSeqNumber seqnum.Value
-
- // rttMeasureTime is the absolute time at which the current rtt
- // measurement period began.
- rttMeasureTime time.Time `state:".(unixTime)"`
-
- // disabled is true if an explicit receive buffer is set for the
- // endpoint.
- disabled bool
-}
-
// ReceiveErrors collect segment receive errors within transport layer.
type ReceiveErrors struct {
tcpip.ReceiveErrors
@@ -247,7 +211,7 @@ type ReceiveErrors struct {
ListenOverflowAckDrop tcpip.StatCounter
// ZeroRcvWindowState is the number of times we advertised
- // a zero receive window when rcvList is full.
+ // a zero receive window when rcvQueue is full.
ZeroRcvWindowState tcpip.StatCounter
// WantZeroWindow is the number of times we wanted to advertise a
@@ -310,18 +274,36 @@ type Stats struct {
// marker interface.
func (*Stats) IsEndpointStats() {}
-// EndpointInfo holds useful information about a transport endpoint which
-// can be queried by monitoring tools. This exists to allow tcp-only state to
-// be exposed.
+// sndQueueInfo implements a send queue.
//
// +stateify savable
-type EndpointInfo struct {
- stack.TransportEndpointInfo
+type sndQueueInfo struct {
+ sndQueueMu sync.Mutex `state:"nosave"`
+ stack.TCPSndBufState
+
+ // sndQueue holds segments that are ready to be sent.
+ sndQueue segmentList `state:"wait"`
+
+ // sndWaker is used to signal the protocol goroutine when segments are
+ // added to the `sndQueue`.
+ sndWaker sleep.Waker `state:"manual"`
+
+ // sndCloseWaker is used to notify the protocol goroutine when the send
+ // side is closed.
+ sndCloseWaker sleep.Waker `state:"manual"`
}
-// IsEndpointInfo is an empty method to implement the tcpip.EndpointInfo
-// marker interface.
-func (*EndpointInfo) IsEndpointInfo() {}
+// rcvQueueInfo contains the endpoint's rcvQueue and associated metadata.
+//
+// +stateify savable
+type rcvQueueInfo struct {
+ rcvQueueMu sync.Mutex `state:"nosave"`
+ stack.TCPRcvBufState
+
+ // rcvQueue is the queue for ready-for-delivery segments. This struct's
+ // mutex must be held in order append segments to list.
+ rcvQueue segmentList `state:"wait"`
+}
// +stateify savable
type accepted struct {
@@ -348,8 +330,8 @@ type accepted struct {
// acquired with e.mu then e.mu must be acquired first.
//
// e.acceptMu -> protects accepted.
-// e.rcvListMu -> Protects the rcvList and associated fields.
-// e.sndBufMu -> Protects the sndQueue and associated fields.
+// e.rcvQueueMu -> Protects e.rcvQueue and associated fields.
+// e.sndQueueMu -> Protects the e.sndQueue and associated fields.
// e.lastErrorMu -> Protects the lastError field.
//
// LOCKING/UNLOCKING of the endpoint. The locking of an endpoint is different
@@ -372,7 +354,8 @@ type accepted struct {
//
// +stateify savable
type endpoint struct {
- EndpointInfo
+ stack.TCPEndpointStateInner
+ stack.TransportEndpointInfo
tcpip.DefaultSocketOptionsHandler
// endpointEntry is used to queue endpoints for processing to the
@@ -405,38 +388,23 @@ type endpoint struct {
// rcvReadMu synchronizes calls to Read.
//
- // mu and rcvListMu are temporarily released during data copying. rcvReadMu
+ // mu and rcvQueueMu are temporarily released during data copying. rcvReadMu
// must be held during each read to ensure atomicity, so that multiple reads
// do not interleave.
//
// rcvReadMu should be held before holding mu.
rcvReadMu sync.Mutex `state:"nosave"`
- // rcvListMu synchronizes access to rcvList.
- //
- // rcvListMu can be taken after the endpoint mu below.
- rcvListMu sync.Mutex `state:"nosave"`
-
- // rcvList is the queue for ready-for-delivery segments.
- //
- // rcvReadMu, mu and rcvListMu must be held, in the stated order, to read data
- // and removing segments from list. A range of segment can be determined, then
- // temporarily release mu and rcvListMu while processing the segment range.
- // This allows new segments to be appended to the list while processing.
- //
- // rcvListMu must be held to append segments to list.
- rcvList segmentList `state:"wait"`
- rcvClosed bool
- // rcvBufSize is the total size of the receive buffer.
- rcvBufSize int
- // rcvBufUsed is the actual number of payload bytes held in the receive buffer
- // not counting any overheads of the segments itself. NOTE: This will always
- // be strictly <= rcvMemUsed below.
- rcvBufUsed int
- rcvAutoParams rcvBufAutoTuneParams
+ // rcvQueueInfo holds the implementation of the endpoint's receive buffer.
+ // The data within rcvQueueInfo should only be accessed while rcvReadMu, mu,
+ // and rcvQueueMu are held, in that stated order. While processing the segment
+ // range, you can determine a range and then temporarily release mu and
+ // rcvQueueMu, which allows new segments to be appended to the queue while
+ // processing.
+ rcvQueueInfo rcvQueueInfo
// rcvMemUsed tracks the total amount of memory in use by received segments
- // held in rcvList, pendingRcvdSegments and the segment queue. This is used to
+ // held in rcvQueue, pendingRcvdSegments and the segment queue. This is used to
// compute the window and the actual available buffer space. This is distinct
// from rcvBufUsed above which is the actual number of payload bytes held in
// the buffer not including any segment overheads.
@@ -498,33 +466,16 @@ type endpoint struct {
// also true, and they're both protected by the mutex.
workerCleanup bool
- // sendTSOk is used to indicate when the TS Option has been negotiated.
- // When sendTSOk is true every non-RST segment should carry a TS as per
- // RFC7323#section-1.1
- sendTSOk bool
-
- // recentTS is the timestamp that should be sent in the TSEcr field of
- // the timestamp for future segments sent by the endpoint. This field is
- // updated if required when a new segment is received by this endpoint.
- recentTS uint32
-
- // recentTSTime is the unix time when we updated recentTS last.
+ // recentTSTime is the unix time when we last updated
+ // TCPEndpointStateInner.RecentTS.
recentTSTime time.Time `state:".(unixTime)"`
- // tsOffset is a randomized offset added to the value of the
- // TSVal field in the timestamp option.
- tsOffset uint32
-
// shutdownFlags represent the current shutdown state of the endpoint.
shutdownFlags tcpip.ShutdownFlags
// tcpRecovery is the loss deteoction algorithm used by TCP.
tcpRecovery tcpip.TCPRecovery
- // sackPermitted is set to true if the peer sends the TCPSACKPermitted
- // option in the SYN/SYN-ACK.
- sackPermitted bool
-
// sack holds TCP SACK related information for this endpoint.
sack SACKInfo
@@ -560,32 +511,13 @@ type endpoint struct {
// this value.
windowClamp uint32
- // The following fields are used to manage the send buffer. When
- // segments are ready to be sent, they are added to sndQueue and the
- // protocol goroutine is signaled via sndWaker.
- //
- // When the send side is closed, the protocol goroutine is notified via
- // sndCloseWaker, and sndClosed is set to true.
- sndBufMu sync.Mutex `state:"nosave"`
- sndBufUsed int
- sndClosed bool
- sndBufInQueue seqnum.Size
- sndQueue segmentList `state:"wait"`
- sndWaker sleep.Waker `state:"manual"`
- sndCloseWaker sleep.Waker `state:"manual"`
+ // sndQueueInfo contains the implementation of the endpoint's send queue.
+ sndQueueInfo sndQueueInfo
// cc stores the name of the Congestion Control algorithm to use for
// this endpoint.
cc tcpip.CongestionControlOption
- // The following are used when a "packet too big" control packet is
- // received. They are protected by sndBufMu. They are used to
- // communicate to the main protocol goroutine how many such control
- // messages have been received since the last notification was processed
- // and what was the smallest MTU seen.
- packetTooBigCount int
- sndMTU int
-
// newSegmentWaker is used to indicate to the protocol goroutine that
// it needs to wake up and handle new segments queued to it.
newSegmentWaker sleep.Waker `state:"manual"`
@@ -782,7 +714,7 @@ func (e *endpoint) UnlockUser() {
switch e.EndpointState() {
case StateEstablished:
- if err := e.handleSegments(true /* fastPath */); err != nil {
+ if err := e.handleSegmentsLocked(true /* fastPath */); err != nil {
e.notifyProtocolGoroutine(notifyTickleWorker)
}
default:
@@ -842,13 +774,13 @@ func (e *endpoint) EndpointState() EndpointState {
// setRecentTimestamp sets the recentTS field to the provided value.
func (e *endpoint) setRecentTimestamp(recentTS uint32) {
- e.recentTS = recentTS
+ e.RecentTS = recentTS
e.recentTSTime = time.Now()
}
// recentTimestamp returns the value of the recentTS field.
func (e *endpoint) recentTimestamp() uint32 {
- return e.recentTS
+ return e.RecentTS
}
// keepalive is a synchronization wrapper used to appease stateify. See the
@@ -868,16 +800,17 @@ type keepalive struct {
func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQueue *waiter.Queue) *endpoint {
e := &endpoint{
stack: s,
- EndpointInfo: EndpointInfo{
- TransportEndpointInfo: stack.TransportEndpointInfo{
- NetProto: netProto,
- TransProto: header.TCPProtocolNumber,
+ TransportEndpointInfo: stack.TransportEndpointInfo{
+ NetProto: netProto,
+ TransProto: header.TCPProtocolNumber,
+ },
+ sndQueueInfo: sndQueueInfo{
+ TCPSndBufState: stack.TCPSndBufState{
+ SndMTU: int(math.MaxInt32),
},
},
waiterQueue: waiterQueue,
state: StateInitial,
- rcvBufSize: DefaultReceiveBufferSize,
- sndMTU: math.MaxInt32,
keepalive: keepalive{
// Linux defaults.
idle: 2 * time.Hour,
@@ -889,6 +822,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
windowClamp: DefaultReceiveBufferSize,
maxSynRetries: DefaultSynRetries,
}
+ e.rcvQueueInfo.RcvBufSize = DefaultReceiveBufferSize
e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits)
e.ops.SetMulticastLoop(true)
e.ops.SetQuickAck(true)
@@ -901,7 +835,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
var rs tcpip.TCPReceiveBufferSizeRangeOption
if err := s.TransportProtocolOption(ProtocolNumber, &rs); err == nil {
- e.rcvBufSize = rs.Default
+ e.rcvQueueInfo.RcvBufSize = rs.Default
}
var cs tcpip.CongestionControlOption
@@ -911,7 +845,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
var mrb tcpip.TCPModerateReceiveBufferOption
if err := s.TransportProtocolOption(ProtocolNumber, &mrb); err == nil {
- e.rcvAutoParams.disabled = !bool(mrb)
+ e.rcvQueueInfo.RcvAutoParams.Disabled = !bool(mrb)
}
var de tcpip.TCPDelayEnabled
@@ -936,7 +870,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
}
e.segmentQueue.ep = e
- e.tsOffset = timeStampOffset()
+ e.TSOffset = timeStampOffset()
e.acceptCond = sync.NewCond(&e.acceptMu)
e.keepalive.timer.init(&e.keepalive.waker)
@@ -974,21 +908,21 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
if e.EndpointState().connected() {
// Determine if the endpoint is writable if requested.
if (mask & waiter.WritableEvents) != 0 {
- e.sndBufMu.Lock()
+ e.sndQueueInfo.sndQueueMu.Lock()
sndBufSize := e.getSendBufferSize()
- if e.sndClosed || e.sndBufUsed < sndBufSize {
+ if e.sndQueueInfo.SndClosed || e.sndQueueInfo.SndBufUsed < sndBufSize {
result |= waiter.WritableEvents
}
- e.sndBufMu.Unlock()
+ e.sndQueueInfo.sndQueueMu.Unlock()
}
// Determine if the endpoint is readable if requested.
if (mask & waiter.ReadableEvents) != 0 {
- e.rcvListMu.Lock()
- if e.rcvBufUsed > 0 || e.rcvClosed {
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ if e.rcvQueueInfo.RcvBufUsed > 0 || e.rcvQueueInfo.RcvClosed {
result |= waiter.ReadableEvents
}
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
}
}
@@ -1096,15 +1030,15 @@ func (e *endpoint) closeNoShutdownLocked() {
// in Listen() when trying to register.
if e.EndpointState() == StateListen && e.isPortReserved {
if e.isRegistered {
- e.stack.StartTransportEndpointCleanup(e.effectiveNetProtos, ProtocolNumber, e.ID, e, e.boundPortFlags, e.boundBindToDevice)
+ e.stack.StartTransportEndpointCleanup(e.effectiveNetProtos, ProtocolNumber, e.TransportEndpointInfo.ID, e, e.boundPortFlags, e.boundBindToDevice)
e.isRegistered = false
}
portRes := ports.Reservation{
Networks: e.effectiveNetProtos,
Transport: ProtocolNumber,
- Addr: e.ID.LocalAddress,
- Port: e.ID.LocalPort,
+ Addr: e.TransportEndpointInfo.ID.LocalAddress,
+ Port: e.TransportEndpointInfo.ID.LocalPort,
Flags: e.boundPortFlags,
BindToDevice: e.boundBindToDevice,
Dest: e.boundDest,
@@ -1179,7 +1113,7 @@ func (e *endpoint) cleanupLocked() {
e.workerCleanup = false
if e.isRegistered {
- e.stack.StartTransportEndpointCleanup(e.effectiveNetProtos, ProtocolNumber, e.ID, e, e.boundPortFlags, e.boundBindToDevice)
+ e.stack.StartTransportEndpointCleanup(e.effectiveNetProtos, ProtocolNumber, e.TransportEndpointInfo.ID, e, e.boundPortFlags, e.boundBindToDevice)
e.isRegistered = false
}
@@ -1187,8 +1121,8 @@ func (e *endpoint) cleanupLocked() {
portRes := ports.Reservation{
Networks: e.effectiveNetProtos,
Transport: ProtocolNumber,
- Addr: e.ID.LocalAddress,
- Port: e.ID.LocalPort,
+ Addr: e.TransportEndpointInfo.ID.LocalAddress,
+ Port: e.TransportEndpointInfo.ID.LocalPort,
Flags: e.boundPortFlags,
BindToDevice: e.boundBindToDevice,
Dest: e.boundDest,
@@ -1250,19 +1184,19 @@ func (e *endpoint) ModerateRecvBuf(copied int) {
e.LockUser()
defer e.UnlockUser()
- e.rcvListMu.Lock()
- if e.rcvAutoParams.disabled {
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ if e.rcvQueueInfo.RcvAutoParams.Disabled {
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
return
}
now := time.Now()
- if rtt := e.rcvAutoParams.rtt; rtt == 0 || now.Sub(e.rcvAutoParams.measureTime) < rtt {
- e.rcvAutoParams.copied += copied
- e.rcvListMu.Unlock()
+ if rtt := e.rcvQueueInfo.RcvAutoParams.RTT; rtt == 0 || now.Sub(e.rcvQueueInfo.RcvAutoParams.MeasureTime) < rtt {
+ e.rcvQueueInfo.RcvAutoParams.CopiedBytes += copied
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
return
}
- prevRTTCopied := e.rcvAutoParams.copied + copied
- prevCopied := e.rcvAutoParams.prevCopied
+ prevRTTCopied := e.rcvQueueInfo.RcvAutoParams.CopiedBytes + copied
+ prevCopied := e.rcvQueueInfo.RcvAutoParams.PrevCopiedBytes
rcvWnd := 0
if prevRTTCopied > prevCopied {
// The minimal receive window based on what was copied by the app
@@ -1294,24 +1228,24 @@ func (e *endpoint) ModerateRecvBuf(copied int) {
// We do not adjust downwards as that can cause the receiver to
// reject valid data that might already be in flight as the
// acceptable window will shrink.
- if rcvWnd > e.rcvBufSize {
+ if rcvWnd > e.rcvQueueInfo.RcvBufSize {
availBefore := wndFromSpace(e.receiveBufferAvailableLocked())
- e.rcvBufSize = rcvWnd
+ e.rcvQueueInfo.RcvBufSize = rcvWnd
availAfter := wndFromSpace(e.receiveBufferAvailableLocked())
if crossed, above := e.windowCrossedACKThresholdLocked(availAfter - availBefore); crossed && above {
e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
}
}
- // We only update prevCopied when we grow the buffer because in cases
- // where prevCopied > prevRTTCopied the existing buffer is already big
+ // We only update PrevCopiedBytes when we grow the buffer because in cases
+ // where PrevCopiedBytes > prevRTTCopied the existing buffer is already big
// enough to handle the current rate and we don't need to do any
// adjustments.
- e.rcvAutoParams.prevCopied = prevRTTCopied
+ e.rcvQueueInfo.RcvAutoParams.PrevCopiedBytes = prevRTTCopied
}
- e.rcvAutoParams.measureTime = now
- e.rcvAutoParams.copied = 0
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.RcvAutoParams.MeasureTime = now
+ e.rcvQueueInfo.RcvAutoParams.CopiedBytes = 0
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
}
// SetOwner implements tcpip.Endpoint.SetOwner.
@@ -1360,7 +1294,7 @@ func (e *endpoint) Read(dst io.Writer, opts tcpip.ReadOptions) (tcpip.ReadResult
defer e.rcvReadMu.Unlock()
// N.B. Here we get a range of segments to be processed. It is safe to not
- // hold rcvListMu when processing, since we hold rcvReadMu to ensure only we
+ // hold rcvQueueMu when processing, since we hold rcvReadMu to ensure only we
// can remove segments from the list through commitRead().
first, last, serr := e.startRead()
if serr != nil {
@@ -1432,10 +1366,10 @@ func (e *endpoint) startRead() (first, last *segment, err tcpip.Error) {
// but has some pending unread data. Also note that a RST being received
// would cause the state to become StateError so we should allow the
// reads to proceed before returning a ECONNRESET.
- e.rcvListMu.Lock()
- defer e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ defer e.rcvQueueInfo.rcvQueueMu.Unlock()
- bufUsed := e.rcvBufUsed
+ bufUsed := e.rcvQueueInfo.RcvBufUsed
if s := e.EndpointState(); !s.connected() && s != StateClose && bufUsed == 0 {
if s == StateError {
if err := e.hardErrorLocked(); err != nil {
@@ -1447,14 +1381,14 @@ func (e *endpoint) startRead() (first, last *segment, err tcpip.Error) {
return nil, nil, &tcpip.ErrNotConnected{}
}
- if e.rcvBufUsed == 0 {
- if e.rcvClosed || !e.EndpointState().connected() {
+ if e.rcvQueueInfo.RcvBufUsed == 0 {
+ if e.rcvQueueInfo.RcvClosed || !e.EndpointState().connected() {
return nil, nil, &tcpip.ErrClosedForReceive{}
}
return nil, nil, &tcpip.ErrWouldBlock{}
}
- return e.rcvList.Front(), e.rcvList.Back(), nil
+ return e.rcvQueueInfo.rcvQueue.Front(), e.rcvQueueInfo.rcvQueue.Back(), nil
}
// commitRead commits a read of done bytes and returns the next non-empty
@@ -1470,20 +1404,20 @@ func (e *endpoint) startRead() (first, last *segment, err tcpip.Error) {
func (e *endpoint) commitRead(done int) *segment {
e.LockUser()
defer e.UnlockUser()
- e.rcvListMu.Lock()
- defer e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ defer e.rcvQueueInfo.rcvQueueMu.Unlock()
memDelta := 0
- s := e.rcvList.Front()
+ s := e.rcvQueueInfo.rcvQueue.Front()
for s != nil && s.data.Size() == 0 {
- e.rcvList.Remove(s)
+ e.rcvQueueInfo.rcvQueue.Remove(s)
// Memory is only considered released when the whole segment has been
// read.
memDelta += s.segMemSize()
s.decRef()
- s = e.rcvList.Front()
+ s = e.rcvQueueInfo.rcvQueue.Front()
}
- e.rcvBufUsed -= done
+ e.rcvQueueInfo.RcvBufUsed -= done
if memDelta > 0 {
// If the window was small before this read and if the read freed up
@@ -1495,14 +1429,14 @@ func (e *endpoint) commitRead(done int) *segment {
}
}
- return e.rcvList.Front()
+ return e.rcvQueueInfo.rcvQueue.Front()
}
// isEndpointWritableLocked checks if a given endpoint is writable
// and also returns the number of bytes that can be written at this
// moment. If the endpoint is not writable then it returns an error
// indicating the reason why it's not writable.
-// Caller must hold e.mu and e.sndBufMu
+// Caller must hold e.mu and e.sndQueueMu
func (e *endpoint) isEndpointWritableLocked() (int, tcpip.Error) {
// The endpoint cannot be written to if it's not connected.
switch s := e.EndpointState(); {
@@ -1522,12 +1456,12 @@ func (e *endpoint) isEndpointWritableLocked() (int, tcpip.Error) {
}
// Check if the connection has already been closed for sends.
- if e.sndClosed {
+ if e.sndQueueInfo.SndClosed {
return 0, &tcpip.ErrClosedForSend{}
}
sndBufSize := e.getSendBufferSize()
- avail := sndBufSize - e.sndBufUsed
+ avail := sndBufSize - e.sndQueueInfo.SndBufUsed
if avail <= 0 {
return 0, &tcpip.ErrWouldBlock{}
}
@@ -1544,8 +1478,8 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp
defer e.UnlockUser()
nextSeg, n, err := func() (*segment, int, tcpip.Error) {
- e.sndBufMu.Lock()
- defer e.sndBufMu.Unlock()
+ e.sndQueueInfo.sndQueueMu.Lock()
+ defer e.sndQueueInfo.sndQueueMu.Unlock()
avail, err := e.isEndpointWritableLocked()
if err != nil {
@@ -1560,8 +1494,8 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp
// available buffer space to be consumed by some other caller while we
// are copying data in.
if !opts.Atomic {
- e.sndBufMu.Unlock()
- defer e.sndBufMu.Lock()
+ e.sndQueueInfo.sndQueueMu.Unlock()
+ defer e.sndQueueInfo.sndQueueMu.Lock()
e.UnlockUser()
defer e.LockUser()
@@ -1603,10 +1537,10 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp
}
// Add data to the send queue.
- s := newOutgoingSegment(e.ID, v)
- e.sndBufUsed += len(v)
- e.sndBufInQueue += seqnum.Size(len(v))
- e.sndQueue.PushBack(s)
+ s := newOutgoingSegment(e.TransportEndpointInfo.ID, v)
+ e.sndQueueInfo.SndBufUsed += len(v)
+ e.sndQueueInfo.SndBufInQueue += seqnum.Size(len(v))
+ e.sndQueueInfo.sndQueue.PushBack(s)
return e.drainSendQueueLocked(), len(v), nil
}()
@@ -1621,11 +1555,11 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp
// selectWindowLocked returns the new window without checking for shrinking or scaling
// applied.
-// Precondition: e.mu and e.rcvListMu must be held.
+// Precondition: e.mu and e.rcvQueueMu must be held.
func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) {
wndFromAvailable := wndFromSpace(e.receiveBufferAvailableLocked())
- maxWindow := wndFromSpace(e.rcvBufSize)
- wndFromUsedBytes := maxWindow - e.rcvBufUsed
+ maxWindow := wndFromSpace(e.rcvQueueInfo.RcvBufSize)
+ wndFromUsedBytes := maxWindow - e.rcvQueueInfo.RcvBufUsed
// We take the lesser of the wndFromAvailable and wndFromUsedBytes because in
// cases where we receive a lot of small segments the segment overhead is a
@@ -1643,11 +1577,11 @@ func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) {
return seqnum.Size(newWnd)
}
-// selectWindow invokes selectWindowLocked after acquiring e.rcvListMu.
+// selectWindow invokes selectWindowLocked after acquiring e.rcvQueueMu.
func (e *endpoint) selectWindow() (wnd seqnum.Size) {
- e.rcvListMu.Lock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
wnd = e.selectWindowLocked()
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
return wnd
}
@@ -1665,7 +1599,7 @@ func (e *endpoint) selectWindow() (wnd seqnum.Size) {
// above will be true if the new window is >= ACK threshold and false
// otherwise.
//
-// Precondition: e.mu and e.rcvListMu must be held.
+// Precondition: e.mu and e.rcvQueueMu must be held.
func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed bool, above bool) {
newAvail := int(e.selectWindowLocked())
oldAvail := newAvail - deltaBefore
@@ -1676,7 +1610,7 @@ func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed boo
// rcvBufFraction is the inverse of the fraction of receive buffer size that
// is used to decide if the available buffer space is now above it.
const rcvBufFraction = 2
- if wndThreshold := wndFromSpace(e.rcvBufSize / rcvBufFraction); threshold > wndThreshold {
+ if wndThreshold := wndFromSpace(e.rcvQueueInfo.RcvBufSize / rcvBufFraction); threshold > wndThreshold {
threshold = wndThreshold
}
switch {
@@ -1711,7 +1645,7 @@ func (e *endpoint) OnKeepAliveSet(bool) {
func (e *endpoint) OnDelayOptionSet(v bool) {
if !v {
// Handle delayed data.
- e.sndWaker.Assert()
+ e.sndQueueInfo.sndWaker.Assert()
}
}
@@ -1719,7 +1653,7 @@ func (e *endpoint) OnDelayOptionSet(v bool) {
func (e *endpoint) OnCorkOptionSet(v bool) {
if !v {
// Handle the corked data.
- e.sndWaker.Assert()
+ e.sndQueueInfo.sndWaker.Assert()
}
}
@@ -1792,23 +1726,23 @@ func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error {
}
e.LockUser()
- e.rcvListMu.Lock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
// Make sure the receive buffer size allows us to send a
// non-zero window size.
scale := uint8(0)
if e.rcv != nil {
- scale = e.rcv.rcvWndScale
+ scale = e.rcv.RcvWndScale
}
if v>>scale == 0 {
v = 1 << scale
}
availBefore := wndFromSpace(e.receiveBufferAvailableLocked())
- e.rcvBufSize = v
+ e.rcvQueueInfo.RcvBufSize = v
availAfter := wndFromSpace(e.receiveBufferAvailableLocked())
- e.rcvAutoParams.disabled = true
+ e.rcvQueueInfo.RcvAutoParams.Disabled = true
// Immediately send an ACK to uncork the sender silly window
// syndrome prevetion, when our available space grows above aMSS
@@ -1817,7 +1751,7 @@ func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error {
e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
}
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
e.UnlockUser()
case tcpip.TTLOption:
@@ -1962,10 +1896,10 @@ func (e *endpoint) readyReceiveSize() (int, tcpip.Error) {
return 0, &tcpip.ErrInvalidEndpointState{}
}
- e.rcvListMu.Lock()
- defer e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ defer e.rcvQueueInfo.rcvQueueMu.Unlock()
- return e.rcvBufUsed, nil
+ return e.rcvQueueInfo.RcvBufUsed, nil
}
// GetSockOptInt implements tcpip.Endpoint.GetSockOptInt.
@@ -2006,9 +1940,9 @@ func (e *endpoint) GetSockOptInt(opt tcpip.SockOptInt) (int, tcpip.Error) {
return e.readyReceiveSize()
case tcpip.ReceiveBufferSizeOption:
- e.rcvListMu.Lock()
- v := e.rcvBufSize
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ v := e.rcvQueueInfo.RcvBufSize
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
return v, nil
case tcpip.TTLOption:
@@ -2046,15 +1980,15 @@ func (e *endpoint) getTCPInfo() tcpip.TCPInfoOption {
// the connection did not send and receive data, then RTT will
// be zero.
snd.rtt.Lock()
- info.RTT = snd.rtt.srtt
- info.RTTVar = snd.rtt.rttvar
+ info.RTT = snd.rtt.TCPRTTState.SRTT
+ info.RTTVar = snd.rtt.TCPRTTState.RTTVar
snd.rtt.Unlock()
- info.RTO = snd.rto
+ info.RTO = snd.RTO
info.CcState = snd.state
- info.SndSsthresh = uint32(snd.sndSsthresh)
- info.SndCwnd = uint32(snd.sndCwnd)
- info.ReorderSeen = snd.rc.reorderSeen
+ info.SndSsthresh = uint32(snd.Ssthresh)
+ info.SndCwnd = uint32(snd.SndCwnd)
+ info.ReorderSeen = snd.rc.Reord
}
e.UnlockUser()
return info
@@ -2099,7 +2033,7 @@ func (e *endpoint) GetSockOpt(opt tcpip.GettableSocketOption) tcpip.Error {
case *tcpip.OriginalDestinationOption:
e.LockUser()
ipt := e.stack.IPTables()
- addr, port, err := ipt.OriginalDst(e.ID, e.NetProto)
+ addr, port, err := ipt.OriginalDst(e.TransportEndpointInfo.ID, e.NetProto)
e.UnlockUser()
if err != nil {
return err
@@ -2207,20 +2141,20 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
}
// Find a route to the desired destination.
- r, err := e.stack.FindRoute(nicID, e.ID.LocalAddress, addr.Addr, netProto, false /* multicastLoop */)
+ r, err := e.stack.FindRoute(nicID, e.TransportEndpointInfo.ID.LocalAddress, addr.Addr, netProto, false /* multicastLoop */)
if err != nil {
return err
}
defer r.Release()
netProtos := []tcpip.NetworkProtocolNumber{netProto}
- e.ID.LocalAddress = r.LocalAddress()
- e.ID.RemoteAddress = r.RemoteAddress()
- e.ID.RemotePort = addr.Port
+ e.TransportEndpointInfo.ID.LocalAddress = r.LocalAddress()
+ e.TransportEndpointInfo.ID.RemoteAddress = r.RemoteAddress()
+ e.TransportEndpointInfo.ID.RemotePort = addr.Port
- if e.ID.LocalPort != 0 {
+ if e.TransportEndpointInfo.ID.LocalPort != 0 {
// The endpoint is bound to a port, attempt to register it.
- err := e.stack.RegisterTransportEndpoint(netProtos, ProtocolNumber, e.ID, e, e.boundPortFlags, e.boundBindToDevice)
+ err := e.stack.RegisterTransportEndpoint(netProtos, ProtocolNumber, e.TransportEndpointInfo.ID, e, e.boundPortFlags, e.boundBindToDevice)
if err != nil {
return err
}
@@ -2229,7 +2163,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
// one. Make sure that it isn't one that will result in the same
// address/port for both local and remote (otherwise this
// endpoint would be trying to connect to itself).
- sameAddr := e.ID.LocalAddress == e.ID.RemoteAddress
+ sameAddr := e.TransportEndpointInfo.ID.LocalAddress == e.TransportEndpointInfo.ID.RemoteAddress
// Calculate a port offset based on the destination IP/port and
// src IP to ensure that for a given tuple (srcIP, destIP,
@@ -2262,21 +2196,21 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
if twReuse == tcpip.TCPTimeWaitReuseLoopbackOnly {
switch netProto {
case header.IPv4ProtocolNumber:
- reuse = header.IsV4LoopbackAddress(e.ID.LocalAddress) && header.IsV4LoopbackAddress(e.ID.RemoteAddress)
+ reuse = header.IsV4LoopbackAddress(e.TransportEndpointInfo.ID.LocalAddress) && header.IsV4LoopbackAddress(e.TransportEndpointInfo.ID.RemoteAddress)
case header.IPv6ProtocolNumber:
- reuse = e.ID.LocalAddress == header.IPv6Loopback && e.ID.RemoteAddress == header.IPv6Loopback
+ reuse = e.TransportEndpointInfo.ID.LocalAddress == header.IPv6Loopback && e.TransportEndpointInfo.ID.RemoteAddress == header.IPv6Loopback
}
}
bindToDevice := tcpip.NICID(e.ops.GetBindToDevice())
if _, err := e.stack.PickEphemeralPortStable(portOffset, func(p uint16) (bool, tcpip.Error) {
- if sameAddr && p == e.ID.RemotePort {
+ if sameAddr && p == e.TransportEndpointInfo.ID.RemotePort {
return false, nil
}
portRes := ports.Reservation{
Networks: netProtos,
Transport: ProtocolNumber,
- Addr: e.ID.LocalAddress,
+ Addr: e.TransportEndpointInfo.ID.LocalAddress,
Port: p,
Flags: e.portFlags,
BindToDevice: bindToDevice,
@@ -2286,7 +2220,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
if _, ok := err.(*tcpip.ErrPortInUse); !ok || !reuse {
return false, nil
}
- transEPID := e.ID
+ transEPID := e.TransportEndpointInfo.ID
transEPID.LocalPort = p
// Check if an endpoint is registered with demuxer in TIME-WAIT and if
// we can reuse it. If we can't find a transport endpoint then we just
@@ -2323,7 +2257,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
portRes := ports.Reservation{
Networks: netProtos,
Transport: ProtocolNumber,
- Addr: e.ID.LocalAddress,
+ Addr: e.TransportEndpointInfo.ID.LocalAddress,
Port: p,
Flags: e.portFlags,
BindToDevice: bindToDevice,
@@ -2334,13 +2268,13 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
}
}
- id := e.ID
+ id := e.TransportEndpointInfo.ID
id.LocalPort = p
if err := e.stack.RegisterTransportEndpoint(netProtos, ProtocolNumber, id, e, e.portFlags, bindToDevice); err != nil {
portRes := ports.Reservation{
Networks: netProtos,
Transport: ProtocolNumber,
- Addr: e.ID.LocalAddress,
+ Addr: e.TransportEndpointInfo.ID.LocalAddress,
Port: p,
Flags: e.portFlags,
BindToDevice: bindToDevice,
@@ -2355,7 +2289,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
// Port picking successful. Save the details of
// the selected port.
- e.ID = id
+ e.TransportEndpointInfo.ID = id
e.isPortReserved = true
e.boundBindToDevice = bindToDevice
e.boundPortFlags = e.portFlags
@@ -2381,10 +2315,10 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
// connection setting here.
if !handshake {
e.segmentQueue.mu.Lock()
- for _, l := range []segmentList{e.segmentQueue.list, e.sndQueue, e.snd.writeList} {
+ for _, l := range []segmentList{e.segmentQueue.list, e.sndQueueInfo.sndQueue, e.snd.writeList} {
for s := l.Front(); s != nil; s = s.Next() {
- s.id = e.ID
- e.sndWaker.Assert()
+ s.id = e.TransportEndpointInfo.ID
+ e.sndQueueInfo.sndWaker.Assert()
}
}
e.segmentQueue.mu.Unlock()
@@ -2426,10 +2360,10 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error {
// Close for read.
if e.shutdownFlags&tcpip.ShutdownRead != 0 {
// Mark read side as closed.
- e.rcvListMu.Lock()
- e.rcvClosed = true
- rcvBufUsed := e.rcvBufUsed
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ e.rcvQueueInfo.RcvClosed = true
+ rcvBufUsed := e.rcvQueueInfo.RcvBufUsed
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
// If we're fully closed and we have unread data we need to abort
// the connection with a RST.
@@ -2443,10 +2377,10 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error {
// Close for write.
if e.shutdownFlags&tcpip.ShutdownWrite != 0 {
- e.sndBufMu.Lock()
- if e.sndClosed {
+ e.sndQueueInfo.sndQueueMu.Lock()
+ if e.sndQueueInfo.SndClosed {
// Already closed.
- e.sndBufMu.Unlock()
+ e.sndQueueInfo.sndQueueMu.Unlock()
if e.EndpointState() == StateTimeWait {
return &tcpip.ErrNotConnected{}
}
@@ -2454,12 +2388,12 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error {
}
// Queue fin segment.
- s := newOutgoingSegment(e.ID, nil)
- e.sndQueue.PushBack(s)
- e.sndBufInQueue++
+ s := newOutgoingSegment(e.TransportEndpointInfo.ID, nil)
+ e.sndQueueInfo.sndQueue.PushBack(s)
+ e.sndQueueInfo.SndBufInQueue++
// Mark endpoint as closed.
- e.sndClosed = true
- e.sndBufMu.Unlock()
+ e.sndQueueInfo.SndClosed = true
+ e.sndQueueInfo.sndQueueMu.Unlock()
e.handleClose()
}
@@ -2472,9 +2406,9 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error {
//
// By not removing this endpoint from the demuxer mapping, we
// ensure that any other bind to the same port fails, as on Linux.
- e.rcvListMu.Lock()
- e.rcvClosed = true
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ e.rcvQueueInfo.RcvClosed = true
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
e.closePendingAcceptableConnectionsLocked()
// Notify waiters that the endpoint is shutdown.
e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr)
@@ -2513,9 +2447,9 @@ func (e *endpoint) listen(backlog int) tcpip.Error {
// listen is called after shutdown.
e.accepted.cap = backlog
e.shutdownFlags = 0
- e.rcvListMu.Lock()
- e.rcvClosed = false
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ e.rcvQueueInfo.RcvClosed = false
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
} else {
// Adjust the size of the backlog iff we can fit
// existing pending connections into the new one.
@@ -2548,7 +2482,7 @@ func (e *endpoint) listen(backlog int) tcpip.Error {
}
// Register the endpoint.
- if err := e.stack.RegisterTransportEndpoint(e.effectiveNetProtos, ProtocolNumber, e.ID, e, e.boundPortFlags, e.boundBindToDevice); err != nil {
+ if err := e.stack.RegisterTransportEndpoint(e.effectiveNetProtos, ProtocolNumber, e.TransportEndpointInfo.ID, e, e.boundPortFlags, e.boundBindToDevice); err != nil {
return err
}
@@ -2588,9 +2522,9 @@ func (e *endpoint) Accept(peerAddr *tcpip.FullAddress) (tcpip.Endpoint, *waiter.
e.LockUser()
defer e.UnlockUser()
- e.rcvListMu.Lock()
- rcvClosed := e.rcvClosed
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ rcvClosed := e.rcvQueueInfo.RcvClosed
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
// Endpoint must be in listen state before it can accept connections.
if rcvClosed || e.EndpointState() != StateListen {
return nil, nil, &tcpip.ErrInvalidEndpointState{}
@@ -2656,7 +2590,7 @@ func (e *endpoint) bindLocked(addr tcpip.FullAddress) (err tcpip.Error) {
if nic == 0 {
return &tcpip.ErrBadLocalAddress{}
}
- e.ID.LocalAddress = addr.Addr
+ e.TransportEndpointInfo.ID.LocalAddress = addr.Addr
}
bindToDevice := tcpip.NICID(e.ops.GetBindToDevice())
@@ -2670,7 +2604,7 @@ func (e *endpoint) bindLocked(addr tcpip.FullAddress) (err tcpip.Error) {
Dest: tcpip.FullAddress{},
}
port, err := e.stack.ReservePort(portRes, func(p uint16) (bool, tcpip.Error) {
- id := e.ID
+ id := e.TransportEndpointInfo.ID
id.LocalPort = p
// CheckRegisterTransportEndpoint should only return an error if there is a
// listening endpoint bound with the same id and portFlags and bindToDevice
@@ -2696,7 +2630,7 @@ func (e *endpoint) bindLocked(addr tcpip.FullAddress) (err tcpip.Error) {
e.boundNICID = nic
e.isPortReserved = true
e.effectiveNetProtos = netProtos
- e.ID.LocalPort = port
+ e.TransportEndpointInfo.ID.LocalPort = port
// Mark endpoint as bound.
e.setEndpointState(StateBound)
@@ -2710,8 +2644,8 @@ func (e *endpoint) GetLocalAddress() (tcpip.FullAddress, tcpip.Error) {
defer e.UnlockUser()
return tcpip.FullAddress{
- Addr: e.ID.LocalAddress,
- Port: e.ID.LocalPort,
+ Addr: e.TransportEndpointInfo.ID.LocalAddress,
+ Port: e.TransportEndpointInfo.ID.LocalPort,
NIC: e.boundNICID,
}, nil
}
@@ -2730,8 +2664,8 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) {
func (e *endpoint) getRemoteAddress() tcpip.FullAddress {
return tcpip.FullAddress{
- Addr: e.ID.RemoteAddress,
- Port: e.ID.RemotePort,
+ Addr: e.TransportEndpointInfo.ID.RemoteAddress,
+ Port: e.TransportEndpointInfo.ID.RemotePort,
NIC: e.boundNICID,
}
}
@@ -2770,13 +2704,13 @@ func (e *endpoint) onICMPError(err tcpip.Error, transErr stack.TransportError, p
Payload: pkt.Data().AsRange().ToOwnedView(),
Dst: tcpip.FullAddress{
NIC: pkt.NICID,
- Addr: e.ID.RemoteAddress,
- Port: e.ID.RemotePort,
+ Addr: e.TransportEndpointInfo.ID.RemoteAddress,
+ Port: e.TransportEndpointInfo.ID.RemotePort,
},
Offender: tcpip.FullAddress{
NIC: pkt.NICID,
- Addr: e.ID.LocalAddress,
- Port: e.ID.LocalPort,
+ Addr: e.TransportEndpointInfo.ID.LocalAddress,
+ Port: e.TransportEndpointInfo.ID.LocalPort,
},
NetProto: pkt.NetworkProtocolNumber,
})
@@ -2789,12 +2723,12 @@ func (e *endpoint) onICMPError(err tcpip.Error, transErr stack.TransportError, p
// HandleError implements stack.TransportEndpoint.
func (e *endpoint) HandleError(transErr stack.TransportError, pkt *stack.PacketBuffer) {
handlePacketTooBig := func(mtu uint32) {
- e.sndBufMu.Lock()
- e.packetTooBigCount++
- if v := int(mtu); v < e.sndMTU {
- e.sndMTU = v
+ e.sndQueueInfo.sndQueueMu.Lock()
+ e.sndQueueInfo.PacketTooBigCount++
+ if v := int(mtu); v < e.sndQueueInfo.SndMTU {
+ e.sndQueueInfo.SndMTU = v
}
- e.sndBufMu.Unlock()
+ e.sndQueueInfo.sndQueueMu.Unlock()
e.notifyProtocolGoroutine(notifyMTUChanged)
}
@@ -2813,14 +2747,14 @@ func (e *endpoint) HandleError(transErr stack.TransportError, pkt *stack.PacketB
// in the send buffer. The number of newly available bytes is v.
func (e *endpoint) updateSndBufferUsage(v int) {
sendBufferSize := e.getSendBufferSize()
- e.sndBufMu.Lock()
- notify := e.sndBufUsed >= sendBufferSize>>1
- e.sndBufUsed -= v
+ e.sndQueueInfo.sndQueueMu.Lock()
+ notify := e.sndQueueInfo.SndBufUsed >= sendBufferSize>>1
+ e.sndQueueInfo.SndBufUsed -= v
// We only notify when there is half the sendBufferSize available after
// a full buffer event occurs. This ensures that we don't wake up
// writers to queue just 1-2 segments and go back to sleep.
- notify = notify && e.sndBufUsed < sendBufferSize>>1
- e.sndBufMu.Unlock()
+ notify = notify && e.sndQueueInfo.SndBufUsed < int(sendBufferSize)>>1
+ e.sndQueueInfo.sndQueueMu.Unlock()
if notify {
e.waiterQueue.Notify(waiter.WritableEvents)
@@ -2831,55 +2765,55 @@ func (e *endpoint) updateSndBufferUsage(v int) {
// to be read, or when the connection is closed for receiving (in which case
// s will be nil).
func (e *endpoint) readyToRead(s *segment) {
- e.rcvListMu.Lock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
if s != nil {
- e.rcvBufUsed += s.payloadSize()
+ e.rcvQueueInfo.RcvBufUsed += s.payloadSize()
s.incRef()
- e.rcvList.PushBack(s)
+ e.rcvQueueInfo.rcvQueue.PushBack(s)
} else {
- e.rcvClosed = true
+ e.rcvQueueInfo.RcvClosed = true
}
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
e.waiterQueue.Notify(waiter.ReadableEvents)
}
// receiveBufferAvailableLocked calculates how many bytes are still available
// in the receive buffer.
-// rcvListMu must be held when this function is called.
+// rcvQueueMu must be held when this function is called.
func (e *endpoint) receiveBufferAvailableLocked() int {
// We may use more bytes than the buffer size when the receive buffer
// shrinks.
memUsed := e.receiveMemUsed()
- if memUsed >= e.rcvBufSize {
+ if memUsed >= e.rcvQueueInfo.RcvBufSize {
return 0
}
- return e.rcvBufSize - memUsed
+ return e.rcvQueueInfo.RcvBufSize - memUsed
}
// receiveBufferAvailable calculates how many bytes are still available in the
// receive buffer based on the actual memory used by all segments held in
// receive buffer/pending and segment queue.
func (e *endpoint) receiveBufferAvailable() int {
- e.rcvListMu.Lock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
available := e.receiveBufferAvailableLocked()
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
return available
}
// receiveBufferUsed returns the amount of in-use receive buffer.
func (e *endpoint) receiveBufferUsed() int {
- e.rcvListMu.Lock()
- used := e.rcvBufUsed
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ used := e.rcvQueueInfo.RcvBufUsed
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
return used
}
// receiveBufferSize returns the current size of the receive buffer.
func (e *endpoint) receiveBufferSize() int {
- e.rcvListMu.Lock()
- size := e.rcvBufSize
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ size := e.rcvQueueInfo.RcvBufSize
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
return size
}
@@ -2913,9 +2847,9 @@ func (e *endpoint) maxReceiveBufferSize() int {
func (e *endpoint) rcvWndScaleForHandshake() int {
bufSizeForScale := e.receiveBufferSize()
- e.rcvListMu.Lock()
- autoTuningDisabled := e.rcvAutoParams.disabled
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ autoTuningDisabled := e.rcvQueueInfo.RcvAutoParams.Disabled
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
if autoTuningDisabled {
return FindWndScale(seqnum.Size(bufSizeForScale))
}
@@ -2926,7 +2860,7 @@ func (e *endpoint) rcvWndScaleForHandshake() int {
// updateRecentTimestamp updates the recent timestamp using the algorithm
// described in https://tools.ietf.org/html/rfc7323#section-4.3
func (e *endpoint) updateRecentTimestamp(tsVal uint32, maxSentAck seqnum.Value, segSeq seqnum.Value) {
- if e.sendTSOk && seqnum.Value(e.recentTimestamp()).LessThan(seqnum.Value(tsVal)) && segSeq.LessThanEq(maxSentAck) {
+ if e.SendTSOk && seqnum.Value(e.recentTimestamp()).LessThan(seqnum.Value(tsVal)) && segSeq.LessThanEq(maxSentAck) {
e.setRecentTimestamp(tsVal)
}
}
@@ -2936,7 +2870,7 @@ func (e *endpoint) updateRecentTimestamp(tsVal uint32, maxSentAck seqnum.Value,
// initializes the recentTS with the value provided in synOpts.TSval.
func (e *endpoint) maybeEnableTimestamp(synOpts *header.TCPSynOptions) {
if synOpts.TS {
- e.sendTSOk = true
+ e.SendTSOk = true
e.setRecentTimestamp(synOpts.TSVal)
}
}
@@ -2944,7 +2878,7 @@ func (e *endpoint) maybeEnableTimestamp(synOpts *header.TCPSynOptions) {
// timestamp returns the timestamp value to be used in the TSVal field of the
// timestamp option for outgoing TCP segments for a given endpoint.
func (e *endpoint) timestamp() uint32 {
- return tcpTimeStamp(time.Now(), e.tsOffset)
+ return tcpTimeStamp(time.Now(), e.TSOffset)
}
// tcpTimeStamp returns a timestamp offset by the provided offset. This is
@@ -2983,7 +2917,7 @@ func (e *endpoint) maybeEnableSACKPermitted(synOpts *header.TCPSynOptions) {
return
}
if bool(v) && synOpts.SACKPermitted {
- e.sackPermitted = true
+ e.SACKPermitted = true
}
}
@@ -2997,118 +2931,46 @@ func (e *endpoint) maxOptionSize() (size int) {
return size
}
-// completeState makes a full copy of the endpoint and returns it. This is used
-// before invoking the probe. The state returned may not be fully consistent if
-// there are intervening syscalls when the state is being copied.
-func (e *endpoint) completeState() stack.TCPEndpointState {
- var s stack.TCPEndpointState
- s.SegTime = time.Now()
-
- // Copy EndpointID.
- s.ID = stack.TCPEndpointID(e.ID)
-
- // Copy endpoint rcv state.
- e.rcvListMu.Lock()
- s.RcvBufSize = e.rcvBufSize
- s.RcvBufUsed = e.rcvBufUsed
- s.RcvClosed = e.rcvClosed
- s.RcvAutoParams.MeasureTime = e.rcvAutoParams.measureTime
- s.RcvAutoParams.CopiedBytes = e.rcvAutoParams.copied
- s.RcvAutoParams.PrevCopiedBytes = e.rcvAutoParams.prevCopied
- s.RcvAutoParams.RTT = e.rcvAutoParams.rtt
- s.RcvAutoParams.RTTMeasureSeqNumber = e.rcvAutoParams.rttMeasureSeqNumber
- s.RcvAutoParams.RTTMeasureTime = e.rcvAutoParams.rttMeasureTime
- s.RcvAutoParams.Disabled = e.rcvAutoParams.disabled
- e.rcvListMu.Unlock()
-
- // Endpoint TCP Option state.
- s.SendTSOk = e.sendTSOk
- s.RecentTS = e.recentTimestamp()
- s.TSOffset = e.tsOffset
- s.SACKPermitted = e.sackPermitted
+// completeStateLocked makes a full copy of the endpoint and returns it. This is
+// used before invoking the probe.
+//
+// Precondition: e.mu must be held.
+func (e *endpoint) completeStateLocked() stack.TCPEndpointState {
+ s := stack.TCPEndpointState{
+ TCPEndpointStateInner: e.TCPEndpointStateInner,
+ ID: stack.TCPEndpointID(e.TransportEndpointInfo.ID),
+ SegTime: time.Now(),
+ Receiver: e.rcv.TCPReceiverState,
+ Sender: e.snd.TCPSenderState,
+ }
+
+ sndBufSize := e.getSendBufferSize()
+ // Copy the send buffer atomically.
+ e.sndQueueInfo.sndQueueMu.Lock()
+ s.SndBufState = e.sndQueueInfo.TCPSndBufState
+ s.SndBufState.SndBufSize = sndBufSize
+ e.sndQueueInfo.sndQueueMu.Unlock()
+
+ // Copy the receive buffer atomically.
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ s.RcvBufState = e.rcvQueueInfo.TCPRcvBufState
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
+
+ // Copy the endpoint TCP Option state.
s.SACK.Blocks = make([]header.SACKBlock, e.sack.NumBlocks)
copy(s.SACK.Blocks, e.sack.Blocks[:e.sack.NumBlocks])
s.SACK.ReceivedBlocks, s.SACK.MaxSACKED = e.scoreboard.Copy()
- // Copy endpoint send state.
- sndBufSize := e.getSendBufferSize()
- e.sndBufMu.Lock()
- s.SndBufSize = sndBufSize
- s.SndBufUsed = e.sndBufUsed
- s.SndClosed = e.sndClosed
- s.SndBufInQueue = e.sndBufInQueue
- s.PacketTooBigCount = e.packetTooBigCount
- s.SndMTU = e.sndMTU
- e.sndBufMu.Unlock()
-
- // Copy receiver state.
- s.Receiver = stack.TCPReceiverState{
- RcvNxt: e.rcv.rcvNxt,
- RcvAcc: e.rcv.rcvAcc,
- RcvWndScale: e.rcv.rcvWndScale,
- PendingBufUsed: e.rcv.pendingBufUsed,
- }
-
- // Copy sender state.
- s.Sender = stack.TCPSenderState{
- LastSendTime: e.snd.lastSendTime,
- DupAckCount: e.snd.dupAckCount,
- FastRecovery: stack.TCPFastRecoveryState{
- Active: e.snd.fr.active,
- First: e.snd.fr.first,
- Last: e.snd.fr.last,
- MaxCwnd: e.snd.fr.maxCwnd,
- HighRxt: e.snd.fr.highRxt,
- RescueRxt: e.snd.fr.rescueRxt,
- },
- SndCwnd: e.snd.sndCwnd,
- Ssthresh: e.snd.sndSsthresh,
- SndCAAckCount: e.snd.sndCAAckCount,
- Outstanding: e.snd.outstanding,
- SackedOut: e.snd.sackedOut,
- SndWnd: e.snd.sndWnd,
- SndUna: e.snd.sndUna,
- SndNxt: e.snd.sndNxt,
- RTTMeasureSeqNum: e.snd.rttMeasureSeqNum,
- RTTMeasureTime: e.snd.rttMeasureTime,
- Closed: e.snd.closed,
- RTO: e.snd.rto,
- MaxPayloadSize: e.snd.maxPayloadSize,
- SndWndScale: e.snd.sndWndScale,
- MaxSentAck: e.snd.maxSentAck,
- }
e.snd.rtt.Lock()
- s.Sender.SRTT = e.snd.rtt.srtt
- s.Sender.SRTTInited = e.snd.rtt.srttInited
+ s.Sender.RTTState = e.snd.rtt.TCPRTTState
e.snd.rtt.Unlock()
if cubic, ok := e.snd.cc.(*cubicState); ok {
- s.Sender.Cubic = stack.TCPCubicState{
- WMax: cubic.wMax,
- WLastMax: cubic.wLastMax,
- T: cubic.t,
- TimeSinceLastCongestion: time.Since(cubic.t),
- C: cubic.c,
- K: cubic.k,
- Beta: cubic.beta,
- WC: cubic.wC,
- WEst: cubic.wEst,
- }
- }
-
- rc := &e.snd.rc
- s.Sender.RACKState = stack.TCPRACKState{
- XmitTime: rc.xmitTime,
- EndSequence: rc.endSequence,
- FACK: rc.fack,
- RTT: rc.rtt,
- Reord: rc.reorderSeen,
- DSACKSeen: rc.dsackSeen,
- ReoWnd: rc.reoWnd,
- ReoWndIncr: rc.reoWndIncr,
- ReoWndPersist: rc.reoWndPersist,
- RTTSeq: rc.rttSeq,
+ s.Sender.Cubic = cubic.TCPCubicState
+ s.Sender.Cubic.TimeSinceLastCongestion = time.Since(s.Sender.Cubic.T)
}
+
+ s.Sender.RACKState = e.snd.rc.TCPRACKState
return s
}