diff options
author | gVisor bot <gvisor-bot@google.com> | 2021-04-19 23:52:48 +0000 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2021-04-19 23:52:48 +0000 |
commit | 971b7fe85715e357907cdb576af52c48a22eaa99 (patch) | |
tree | a0fe1d30b267f863a44078adfcd3084d460f67d7 /pkg/tcpip/transport/tcp/endpoint.go | |
parent | ae3938ad39944b0d67ba0a6aba8efad9307b9868 (diff) | |
parent | 20b1c3c632277bd64eac4d0442bda9695f184fc9 (diff) |
Merge release-20210412.0-34-g20b1c3c63 (automated)
Diffstat (limited to 'pkg/tcpip/transport/tcp/endpoint.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 648 |
1 files changed, 255 insertions, 393 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index bc88e48e9..884332828 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -191,42 +191,6 @@ type SACKInfo struct { NumBlocks int } -// rcvBufAutoTuneParams are used to hold state variables to compute -// the auto tuned recv buffer size. -// -// +stateify savable -type rcvBufAutoTuneParams struct { - // measureTime is the time at which the current measurement - // was started. - measureTime time.Time `state:".(unixTime)"` - - // copied is the number of bytes copied out of the receive - // buffers since this measure began. - copied int - - // prevCopied is the number of bytes copied out of the receive - // buffers in the previous RTT period. - prevCopied int - - // rtt is the non-smoothed minimum RTT as measured by observing the time - // between when a byte is first acknowledged and the receipt of data - // that is at least one window beyond the sequence number that was - // acknowledged. - rtt time.Duration - - // rttMeasureSeqNumber is the highest acceptable sequence number at the - // time this RTT measurement period began. - rttMeasureSeqNumber seqnum.Value - - // rttMeasureTime is the absolute time at which the current rtt - // measurement period began. - rttMeasureTime time.Time `state:".(unixTime)"` - - // disabled is true if an explicit receive buffer is set for the - // endpoint. - disabled bool -} - // ReceiveErrors collect segment receive errors within transport layer. type ReceiveErrors struct { tcpip.ReceiveErrors @@ -247,7 +211,7 @@ type ReceiveErrors struct { ListenOverflowAckDrop tcpip.StatCounter // ZeroRcvWindowState is the number of times we advertised - // a zero receive window when rcvList is full. + // a zero receive window when rcvQueue is full. ZeroRcvWindowState tcpip.StatCounter // WantZeroWindow is the number of times we wanted to advertise a @@ -310,18 +274,36 @@ type Stats struct { // marker interface. func (*Stats) IsEndpointStats() {} -// EndpointInfo holds useful information about a transport endpoint which -// can be queried by monitoring tools. This exists to allow tcp-only state to -// be exposed. +// sndQueueInfo implements a send queue. // // +stateify savable -type EndpointInfo struct { - stack.TransportEndpointInfo +type sndQueueInfo struct { + sndQueueMu sync.Mutex `state:"nosave"` + stack.TCPSndBufState + + // sndQueue holds segments that are ready to be sent. + sndQueue segmentList `state:"wait"` + + // sndWaker is used to signal the protocol goroutine when segments are + // added to the `sndQueue`. + sndWaker sleep.Waker `state:"manual"` + + // sndCloseWaker is used to notify the protocol goroutine when the send + // side is closed. + sndCloseWaker sleep.Waker `state:"manual"` } -// IsEndpointInfo is an empty method to implement the tcpip.EndpointInfo -// marker interface. -func (*EndpointInfo) IsEndpointInfo() {} +// rcvQueueInfo contains the endpoint's rcvQueue and associated metadata. +// +// +stateify savable +type rcvQueueInfo struct { + rcvQueueMu sync.Mutex `state:"nosave"` + stack.TCPRcvBufState + + // rcvQueue is the queue for ready-for-delivery segments. This struct's + // mutex must be held in order append segments to list. + rcvQueue segmentList `state:"wait"` +} // +stateify savable type accepted struct { @@ -348,8 +330,8 @@ type accepted struct { // acquired with e.mu then e.mu must be acquired first. // // e.acceptMu -> protects accepted. -// e.rcvListMu -> Protects the rcvList and associated fields. -// e.sndBufMu -> Protects the sndQueue and associated fields. +// e.rcvQueueMu -> Protects e.rcvQueue and associated fields. +// e.sndQueueMu -> Protects the e.sndQueue and associated fields. // e.lastErrorMu -> Protects the lastError field. // // LOCKING/UNLOCKING of the endpoint. The locking of an endpoint is different @@ -372,7 +354,8 @@ type accepted struct { // // +stateify savable type endpoint struct { - EndpointInfo + stack.TCPEndpointStateInner + stack.TransportEndpointInfo tcpip.DefaultSocketOptionsHandler // endpointEntry is used to queue endpoints for processing to the @@ -405,38 +388,23 @@ type endpoint struct { // rcvReadMu synchronizes calls to Read. // - // mu and rcvListMu are temporarily released during data copying. rcvReadMu + // mu and rcvQueueMu are temporarily released during data copying. rcvReadMu // must be held during each read to ensure atomicity, so that multiple reads // do not interleave. // // rcvReadMu should be held before holding mu. rcvReadMu sync.Mutex `state:"nosave"` - // rcvListMu synchronizes access to rcvList. - // - // rcvListMu can be taken after the endpoint mu below. - rcvListMu sync.Mutex `state:"nosave"` - - // rcvList is the queue for ready-for-delivery segments. - // - // rcvReadMu, mu and rcvListMu must be held, in the stated order, to read data - // and removing segments from list. A range of segment can be determined, then - // temporarily release mu and rcvListMu while processing the segment range. - // This allows new segments to be appended to the list while processing. - // - // rcvListMu must be held to append segments to list. - rcvList segmentList `state:"wait"` - rcvClosed bool - // rcvBufSize is the total size of the receive buffer. - rcvBufSize int - // rcvBufUsed is the actual number of payload bytes held in the receive buffer - // not counting any overheads of the segments itself. NOTE: This will always - // be strictly <= rcvMemUsed below. - rcvBufUsed int - rcvAutoParams rcvBufAutoTuneParams + // rcvQueueInfo holds the implementation of the endpoint's receive buffer. + // The data within rcvQueueInfo should only be accessed while rcvReadMu, mu, + // and rcvQueueMu are held, in that stated order. While processing the segment + // range, you can determine a range and then temporarily release mu and + // rcvQueueMu, which allows new segments to be appended to the queue while + // processing. + rcvQueueInfo rcvQueueInfo // rcvMemUsed tracks the total amount of memory in use by received segments - // held in rcvList, pendingRcvdSegments and the segment queue. This is used to + // held in rcvQueue, pendingRcvdSegments and the segment queue. This is used to // compute the window and the actual available buffer space. This is distinct // from rcvBufUsed above which is the actual number of payload bytes held in // the buffer not including any segment overheads. @@ -498,33 +466,16 @@ type endpoint struct { // also true, and they're both protected by the mutex. workerCleanup bool - // sendTSOk is used to indicate when the TS Option has been negotiated. - // When sendTSOk is true every non-RST segment should carry a TS as per - // RFC7323#section-1.1 - sendTSOk bool - - // recentTS is the timestamp that should be sent in the TSEcr field of - // the timestamp for future segments sent by the endpoint. This field is - // updated if required when a new segment is received by this endpoint. - recentTS uint32 - - // recentTSTime is the unix time when we updated recentTS last. + // recentTSTime is the unix time when we last updated + // TCPEndpointStateInner.RecentTS. recentTSTime time.Time `state:".(unixTime)"` - // tsOffset is a randomized offset added to the value of the - // TSVal field in the timestamp option. - tsOffset uint32 - // shutdownFlags represent the current shutdown state of the endpoint. shutdownFlags tcpip.ShutdownFlags // tcpRecovery is the loss deteoction algorithm used by TCP. tcpRecovery tcpip.TCPRecovery - // sackPermitted is set to true if the peer sends the TCPSACKPermitted - // option in the SYN/SYN-ACK. - sackPermitted bool - // sack holds TCP SACK related information for this endpoint. sack SACKInfo @@ -560,32 +511,13 @@ type endpoint struct { // this value. windowClamp uint32 - // The following fields are used to manage the send buffer. When - // segments are ready to be sent, they are added to sndQueue and the - // protocol goroutine is signaled via sndWaker. - // - // When the send side is closed, the protocol goroutine is notified via - // sndCloseWaker, and sndClosed is set to true. - sndBufMu sync.Mutex `state:"nosave"` - sndBufUsed int - sndClosed bool - sndBufInQueue seqnum.Size - sndQueue segmentList `state:"wait"` - sndWaker sleep.Waker `state:"manual"` - sndCloseWaker sleep.Waker `state:"manual"` + // sndQueueInfo contains the implementation of the endpoint's send queue. + sndQueueInfo sndQueueInfo // cc stores the name of the Congestion Control algorithm to use for // this endpoint. cc tcpip.CongestionControlOption - // The following are used when a "packet too big" control packet is - // received. They are protected by sndBufMu. They are used to - // communicate to the main protocol goroutine how many such control - // messages have been received since the last notification was processed - // and what was the smallest MTU seen. - packetTooBigCount int - sndMTU int - // newSegmentWaker is used to indicate to the protocol goroutine that // it needs to wake up and handle new segments queued to it. newSegmentWaker sleep.Waker `state:"manual"` @@ -782,7 +714,7 @@ func (e *endpoint) UnlockUser() { switch e.EndpointState() { case StateEstablished: - if err := e.handleSegments(true /* fastPath */); err != nil { + if err := e.handleSegmentsLocked(true /* fastPath */); err != nil { e.notifyProtocolGoroutine(notifyTickleWorker) } default: @@ -842,13 +774,13 @@ func (e *endpoint) EndpointState() EndpointState { // setRecentTimestamp sets the recentTS field to the provided value. func (e *endpoint) setRecentTimestamp(recentTS uint32) { - e.recentTS = recentTS + e.RecentTS = recentTS e.recentTSTime = time.Now() } // recentTimestamp returns the value of the recentTS field. func (e *endpoint) recentTimestamp() uint32 { - return e.recentTS + return e.RecentTS } // keepalive is a synchronization wrapper used to appease stateify. See the @@ -868,16 +800,17 @@ type keepalive struct { func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQueue *waiter.Queue) *endpoint { e := &endpoint{ stack: s, - EndpointInfo: EndpointInfo{ - TransportEndpointInfo: stack.TransportEndpointInfo{ - NetProto: netProto, - TransProto: header.TCPProtocolNumber, + TransportEndpointInfo: stack.TransportEndpointInfo{ + NetProto: netProto, + TransProto: header.TCPProtocolNumber, + }, + sndQueueInfo: sndQueueInfo{ + TCPSndBufState: stack.TCPSndBufState{ + SndMTU: int(math.MaxInt32), }, }, waiterQueue: waiterQueue, state: StateInitial, - rcvBufSize: DefaultReceiveBufferSize, - sndMTU: math.MaxInt32, keepalive: keepalive{ // Linux defaults. idle: 2 * time.Hour, @@ -889,6 +822,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue windowClamp: DefaultReceiveBufferSize, maxSynRetries: DefaultSynRetries, } + e.rcvQueueInfo.RcvBufSize = DefaultReceiveBufferSize e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits) e.ops.SetMulticastLoop(true) e.ops.SetQuickAck(true) @@ -901,7 +835,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue var rs tcpip.TCPReceiveBufferSizeRangeOption if err := s.TransportProtocolOption(ProtocolNumber, &rs); err == nil { - e.rcvBufSize = rs.Default + e.rcvQueueInfo.RcvBufSize = rs.Default } var cs tcpip.CongestionControlOption @@ -911,7 +845,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue var mrb tcpip.TCPModerateReceiveBufferOption if err := s.TransportProtocolOption(ProtocolNumber, &mrb); err == nil { - e.rcvAutoParams.disabled = !bool(mrb) + e.rcvQueueInfo.RcvAutoParams.Disabled = !bool(mrb) } var de tcpip.TCPDelayEnabled @@ -936,7 +870,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue } e.segmentQueue.ep = e - e.tsOffset = timeStampOffset() + e.TSOffset = timeStampOffset() e.acceptCond = sync.NewCond(&e.acceptMu) e.keepalive.timer.init(&e.keepalive.waker) @@ -974,21 +908,21 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { if e.EndpointState().connected() { // Determine if the endpoint is writable if requested. if (mask & waiter.WritableEvents) != 0 { - e.sndBufMu.Lock() + e.sndQueueInfo.sndQueueMu.Lock() sndBufSize := e.getSendBufferSize() - if e.sndClosed || e.sndBufUsed < sndBufSize { + if e.sndQueueInfo.SndClosed || e.sndQueueInfo.SndBufUsed < sndBufSize { result |= waiter.WritableEvents } - e.sndBufMu.Unlock() + e.sndQueueInfo.sndQueueMu.Unlock() } // Determine if the endpoint is readable if requested. if (mask & waiter.ReadableEvents) != 0 { - e.rcvListMu.Lock() - if e.rcvBufUsed > 0 || e.rcvClosed { + e.rcvQueueInfo.rcvQueueMu.Lock() + if e.rcvQueueInfo.RcvBufUsed > 0 || e.rcvQueueInfo.RcvClosed { result |= waiter.ReadableEvents } - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Unlock() } } @@ -1096,15 +1030,15 @@ func (e *endpoint) closeNoShutdownLocked() { // in Listen() when trying to register. if e.EndpointState() == StateListen && e.isPortReserved { if e.isRegistered { - e.stack.StartTransportEndpointCleanup(e.effectiveNetProtos, ProtocolNumber, e.ID, e, e.boundPortFlags, e.boundBindToDevice) + e.stack.StartTransportEndpointCleanup(e.effectiveNetProtos, ProtocolNumber, e.TransportEndpointInfo.ID, e, e.boundPortFlags, e.boundBindToDevice) e.isRegistered = false } portRes := ports.Reservation{ Networks: e.effectiveNetProtos, Transport: ProtocolNumber, - Addr: e.ID.LocalAddress, - Port: e.ID.LocalPort, + Addr: e.TransportEndpointInfo.ID.LocalAddress, + Port: e.TransportEndpointInfo.ID.LocalPort, Flags: e.boundPortFlags, BindToDevice: e.boundBindToDevice, Dest: e.boundDest, @@ -1179,7 +1113,7 @@ func (e *endpoint) cleanupLocked() { e.workerCleanup = false if e.isRegistered { - e.stack.StartTransportEndpointCleanup(e.effectiveNetProtos, ProtocolNumber, e.ID, e, e.boundPortFlags, e.boundBindToDevice) + e.stack.StartTransportEndpointCleanup(e.effectiveNetProtos, ProtocolNumber, e.TransportEndpointInfo.ID, e, e.boundPortFlags, e.boundBindToDevice) e.isRegistered = false } @@ -1187,8 +1121,8 @@ func (e *endpoint) cleanupLocked() { portRes := ports.Reservation{ Networks: e.effectiveNetProtos, Transport: ProtocolNumber, - Addr: e.ID.LocalAddress, - Port: e.ID.LocalPort, + Addr: e.TransportEndpointInfo.ID.LocalAddress, + Port: e.TransportEndpointInfo.ID.LocalPort, Flags: e.boundPortFlags, BindToDevice: e.boundBindToDevice, Dest: e.boundDest, @@ -1250,19 +1184,19 @@ func (e *endpoint) ModerateRecvBuf(copied int) { e.LockUser() defer e.UnlockUser() - e.rcvListMu.Lock() - if e.rcvAutoParams.disabled { - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + if e.rcvQueueInfo.RcvAutoParams.Disabled { + e.rcvQueueInfo.rcvQueueMu.Unlock() return } now := time.Now() - if rtt := e.rcvAutoParams.rtt; rtt == 0 || now.Sub(e.rcvAutoParams.measureTime) < rtt { - e.rcvAutoParams.copied += copied - e.rcvListMu.Unlock() + if rtt := e.rcvQueueInfo.RcvAutoParams.RTT; rtt == 0 || now.Sub(e.rcvQueueInfo.RcvAutoParams.MeasureTime) < rtt { + e.rcvQueueInfo.RcvAutoParams.CopiedBytes += copied + e.rcvQueueInfo.rcvQueueMu.Unlock() return } - prevRTTCopied := e.rcvAutoParams.copied + copied - prevCopied := e.rcvAutoParams.prevCopied + prevRTTCopied := e.rcvQueueInfo.RcvAutoParams.CopiedBytes + copied + prevCopied := e.rcvQueueInfo.RcvAutoParams.PrevCopiedBytes rcvWnd := 0 if prevRTTCopied > prevCopied { // The minimal receive window based on what was copied by the app @@ -1294,24 +1228,24 @@ func (e *endpoint) ModerateRecvBuf(copied int) { // We do not adjust downwards as that can cause the receiver to // reject valid data that might already be in flight as the // acceptable window will shrink. - if rcvWnd > e.rcvBufSize { + if rcvWnd > e.rcvQueueInfo.RcvBufSize { availBefore := wndFromSpace(e.receiveBufferAvailableLocked()) - e.rcvBufSize = rcvWnd + e.rcvQueueInfo.RcvBufSize = rcvWnd availAfter := wndFromSpace(e.receiveBufferAvailableLocked()) if crossed, above := e.windowCrossedACKThresholdLocked(availAfter - availBefore); crossed && above { e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow) } } - // We only update prevCopied when we grow the buffer because in cases - // where prevCopied > prevRTTCopied the existing buffer is already big + // We only update PrevCopiedBytes when we grow the buffer because in cases + // where PrevCopiedBytes > prevRTTCopied the existing buffer is already big // enough to handle the current rate and we don't need to do any // adjustments. - e.rcvAutoParams.prevCopied = prevRTTCopied + e.rcvQueueInfo.RcvAutoParams.PrevCopiedBytes = prevRTTCopied } - e.rcvAutoParams.measureTime = now - e.rcvAutoParams.copied = 0 - e.rcvListMu.Unlock() + e.rcvQueueInfo.RcvAutoParams.MeasureTime = now + e.rcvQueueInfo.RcvAutoParams.CopiedBytes = 0 + e.rcvQueueInfo.rcvQueueMu.Unlock() } // SetOwner implements tcpip.Endpoint.SetOwner. @@ -1360,7 +1294,7 @@ func (e *endpoint) Read(dst io.Writer, opts tcpip.ReadOptions) (tcpip.ReadResult defer e.rcvReadMu.Unlock() // N.B. Here we get a range of segments to be processed. It is safe to not - // hold rcvListMu when processing, since we hold rcvReadMu to ensure only we + // hold rcvQueueMu when processing, since we hold rcvReadMu to ensure only we // can remove segments from the list through commitRead(). first, last, serr := e.startRead() if serr != nil { @@ -1432,10 +1366,10 @@ func (e *endpoint) startRead() (first, last *segment, err tcpip.Error) { // but has some pending unread data. Also note that a RST being received // would cause the state to become StateError so we should allow the // reads to proceed before returning a ECONNRESET. - e.rcvListMu.Lock() - defer e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + defer e.rcvQueueInfo.rcvQueueMu.Unlock() - bufUsed := e.rcvBufUsed + bufUsed := e.rcvQueueInfo.RcvBufUsed if s := e.EndpointState(); !s.connected() && s != StateClose && bufUsed == 0 { if s == StateError { if err := e.hardErrorLocked(); err != nil { @@ -1447,14 +1381,14 @@ func (e *endpoint) startRead() (first, last *segment, err tcpip.Error) { return nil, nil, &tcpip.ErrNotConnected{} } - if e.rcvBufUsed == 0 { - if e.rcvClosed || !e.EndpointState().connected() { + if e.rcvQueueInfo.RcvBufUsed == 0 { + if e.rcvQueueInfo.RcvClosed || !e.EndpointState().connected() { return nil, nil, &tcpip.ErrClosedForReceive{} } return nil, nil, &tcpip.ErrWouldBlock{} } - return e.rcvList.Front(), e.rcvList.Back(), nil + return e.rcvQueueInfo.rcvQueue.Front(), e.rcvQueueInfo.rcvQueue.Back(), nil } // commitRead commits a read of done bytes and returns the next non-empty @@ -1470,20 +1404,20 @@ func (e *endpoint) startRead() (first, last *segment, err tcpip.Error) { func (e *endpoint) commitRead(done int) *segment { e.LockUser() defer e.UnlockUser() - e.rcvListMu.Lock() - defer e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + defer e.rcvQueueInfo.rcvQueueMu.Unlock() memDelta := 0 - s := e.rcvList.Front() + s := e.rcvQueueInfo.rcvQueue.Front() for s != nil && s.data.Size() == 0 { - e.rcvList.Remove(s) + e.rcvQueueInfo.rcvQueue.Remove(s) // Memory is only considered released when the whole segment has been // read. memDelta += s.segMemSize() s.decRef() - s = e.rcvList.Front() + s = e.rcvQueueInfo.rcvQueue.Front() } - e.rcvBufUsed -= done + e.rcvQueueInfo.RcvBufUsed -= done if memDelta > 0 { // If the window was small before this read and if the read freed up @@ -1495,14 +1429,14 @@ func (e *endpoint) commitRead(done int) *segment { } } - return e.rcvList.Front() + return e.rcvQueueInfo.rcvQueue.Front() } // isEndpointWritableLocked checks if a given endpoint is writable // and also returns the number of bytes that can be written at this // moment. If the endpoint is not writable then it returns an error // indicating the reason why it's not writable. -// Caller must hold e.mu and e.sndBufMu +// Caller must hold e.mu and e.sndQueueMu func (e *endpoint) isEndpointWritableLocked() (int, tcpip.Error) { // The endpoint cannot be written to if it's not connected. switch s := e.EndpointState(); { @@ -1522,12 +1456,12 @@ func (e *endpoint) isEndpointWritableLocked() (int, tcpip.Error) { } // Check if the connection has already been closed for sends. - if e.sndClosed { + if e.sndQueueInfo.SndClosed { return 0, &tcpip.ErrClosedForSend{} } sndBufSize := e.getSendBufferSize() - avail := sndBufSize - e.sndBufUsed + avail := sndBufSize - e.sndQueueInfo.SndBufUsed if avail <= 0 { return 0, &tcpip.ErrWouldBlock{} } @@ -1544,8 +1478,8 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp defer e.UnlockUser() nextSeg, n, err := func() (*segment, int, tcpip.Error) { - e.sndBufMu.Lock() - defer e.sndBufMu.Unlock() + e.sndQueueInfo.sndQueueMu.Lock() + defer e.sndQueueInfo.sndQueueMu.Unlock() avail, err := e.isEndpointWritableLocked() if err != nil { @@ -1560,8 +1494,8 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp // available buffer space to be consumed by some other caller while we // are copying data in. if !opts.Atomic { - e.sndBufMu.Unlock() - defer e.sndBufMu.Lock() + e.sndQueueInfo.sndQueueMu.Unlock() + defer e.sndQueueInfo.sndQueueMu.Lock() e.UnlockUser() defer e.LockUser() @@ -1603,10 +1537,10 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp } // Add data to the send queue. - s := newOutgoingSegment(e.ID, v) - e.sndBufUsed += len(v) - e.sndBufInQueue += seqnum.Size(len(v)) - e.sndQueue.PushBack(s) + s := newOutgoingSegment(e.TransportEndpointInfo.ID, v) + e.sndQueueInfo.SndBufUsed += len(v) + e.sndQueueInfo.SndBufInQueue += seqnum.Size(len(v)) + e.sndQueueInfo.sndQueue.PushBack(s) return e.drainSendQueueLocked(), len(v), nil }() @@ -1621,11 +1555,11 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp // selectWindowLocked returns the new window without checking for shrinking or scaling // applied. -// Precondition: e.mu and e.rcvListMu must be held. +// Precondition: e.mu and e.rcvQueueMu must be held. func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) { wndFromAvailable := wndFromSpace(e.receiveBufferAvailableLocked()) - maxWindow := wndFromSpace(e.rcvBufSize) - wndFromUsedBytes := maxWindow - e.rcvBufUsed + maxWindow := wndFromSpace(e.rcvQueueInfo.RcvBufSize) + wndFromUsedBytes := maxWindow - e.rcvQueueInfo.RcvBufUsed // We take the lesser of the wndFromAvailable and wndFromUsedBytes because in // cases where we receive a lot of small segments the segment overhead is a @@ -1643,11 +1577,11 @@ func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) { return seqnum.Size(newWnd) } -// selectWindow invokes selectWindowLocked after acquiring e.rcvListMu. +// selectWindow invokes selectWindowLocked after acquiring e.rcvQueueMu. func (e *endpoint) selectWindow() (wnd seqnum.Size) { - e.rcvListMu.Lock() + e.rcvQueueInfo.rcvQueueMu.Lock() wnd = e.selectWindowLocked() - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Unlock() return wnd } @@ -1665,7 +1599,7 @@ func (e *endpoint) selectWindow() (wnd seqnum.Size) { // above will be true if the new window is >= ACK threshold and false // otherwise. // -// Precondition: e.mu and e.rcvListMu must be held. +// Precondition: e.mu and e.rcvQueueMu must be held. func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed bool, above bool) { newAvail := int(e.selectWindowLocked()) oldAvail := newAvail - deltaBefore @@ -1676,7 +1610,7 @@ func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed boo // rcvBufFraction is the inverse of the fraction of receive buffer size that // is used to decide if the available buffer space is now above it. const rcvBufFraction = 2 - if wndThreshold := wndFromSpace(e.rcvBufSize / rcvBufFraction); threshold > wndThreshold { + if wndThreshold := wndFromSpace(e.rcvQueueInfo.RcvBufSize / rcvBufFraction); threshold > wndThreshold { threshold = wndThreshold } switch { @@ -1711,7 +1645,7 @@ func (e *endpoint) OnKeepAliveSet(bool) { func (e *endpoint) OnDelayOptionSet(v bool) { if !v { // Handle delayed data. - e.sndWaker.Assert() + e.sndQueueInfo.sndWaker.Assert() } } @@ -1719,7 +1653,7 @@ func (e *endpoint) OnDelayOptionSet(v bool) { func (e *endpoint) OnCorkOptionSet(v bool) { if !v { // Handle the corked data. - e.sndWaker.Assert() + e.sndQueueInfo.sndWaker.Assert() } } @@ -1792,23 +1726,23 @@ func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error { } e.LockUser() - e.rcvListMu.Lock() + e.rcvQueueInfo.rcvQueueMu.Lock() // Make sure the receive buffer size allows us to send a // non-zero window size. scale := uint8(0) if e.rcv != nil { - scale = e.rcv.rcvWndScale + scale = e.rcv.RcvWndScale } if v>>scale == 0 { v = 1 << scale } availBefore := wndFromSpace(e.receiveBufferAvailableLocked()) - e.rcvBufSize = v + e.rcvQueueInfo.RcvBufSize = v availAfter := wndFromSpace(e.receiveBufferAvailableLocked()) - e.rcvAutoParams.disabled = true + e.rcvQueueInfo.RcvAutoParams.Disabled = true // Immediately send an ACK to uncork the sender silly window // syndrome prevetion, when our available space grows above aMSS @@ -1817,7 +1751,7 @@ func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error { e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow) } - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Unlock() e.UnlockUser() case tcpip.TTLOption: @@ -1962,10 +1896,10 @@ func (e *endpoint) readyReceiveSize() (int, tcpip.Error) { return 0, &tcpip.ErrInvalidEndpointState{} } - e.rcvListMu.Lock() - defer e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + defer e.rcvQueueInfo.rcvQueueMu.Unlock() - return e.rcvBufUsed, nil + return e.rcvQueueInfo.RcvBufUsed, nil } // GetSockOptInt implements tcpip.Endpoint.GetSockOptInt. @@ -2006,9 +1940,9 @@ func (e *endpoint) GetSockOptInt(opt tcpip.SockOptInt) (int, tcpip.Error) { return e.readyReceiveSize() case tcpip.ReceiveBufferSizeOption: - e.rcvListMu.Lock() - v := e.rcvBufSize - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + v := e.rcvQueueInfo.RcvBufSize + e.rcvQueueInfo.rcvQueueMu.Unlock() return v, nil case tcpip.TTLOption: @@ -2046,15 +1980,15 @@ func (e *endpoint) getTCPInfo() tcpip.TCPInfoOption { // the connection did not send and receive data, then RTT will // be zero. snd.rtt.Lock() - info.RTT = snd.rtt.srtt - info.RTTVar = snd.rtt.rttvar + info.RTT = snd.rtt.TCPRTTState.SRTT + info.RTTVar = snd.rtt.TCPRTTState.RTTVar snd.rtt.Unlock() - info.RTO = snd.rto + info.RTO = snd.RTO info.CcState = snd.state - info.SndSsthresh = uint32(snd.sndSsthresh) - info.SndCwnd = uint32(snd.sndCwnd) - info.ReorderSeen = snd.rc.reorderSeen + info.SndSsthresh = uint32(snd.Ssthresh) + info.SndCwnd = uint32(snd.SndCwnd) + info.ReorderSeen = snd.rc.Reord } e.UnlockUser() return info @@ -2099,7 +2033,7 @@ func (e *endpoint) GetSockOpt(opt tcpip.GettableSocketOption) tcpip.Error { case *tcpip.OriginalDestinationOption: e.LockUser() ipt := e.stack.IPTables() - addr, port, err := ipt.OriginalDst(e.ID, e.NetProto) + addr, port, err := ipt.OriginalDst(e.TransportEndpointInfo.ID, e.NetProto) e.UnlockUser() if err != nil { return err @@ -2207,20 +2141,20 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp } // Find a route to the desired destination. - r, err := e.stack.FindRoute(nicID, e.ID.LocalAddress, addr.Addr, netProto, false /* multicastLoop */) + r, err := e.stack.FindRoute(nicID, e.TransportEndpointInfo.ID.LocalAddress, addr.Addr, netProto, false /* multicastLoop */) if err != nil { return err } defer r.Release() netProtos := []tcpip.NetworkProtocolNumber{netProto} - e.ID.LocalAddress = r.LocalAddress() - e.ID.RemoteAddress = r.RemoteAddress() - e.ID.RemotePort = addr.Port + e.TransportEndpointInfo.ID.LocalAddress = r.LocalAddress() + e.TransportEndpointInfo.ID.RemoteAddress = r.RemoteAddress() + e.TransportEndpointInfo.ID.RemotePort = addr.Port - if e.ID.LocalPort != 0 { + if e.TransportEndpointInfo.ID.LocalPort != 0 { // The endpoint is bound to a port, attempt to register it. - err := e.stack.RegisterTransportEndpoint(netProtos, ProtocolNumber, e.ID, e, e.boundPortFlags, e.boundBindToDevice) + err := e.stack.RegisterTransportEndpoint(netProtos, ProtocolNumber, e.TransportEndpointInfo.ID, e, e.boundPortFlags, e.boundBindToDevice) if err != nil { return err } @@ -2229,7 +2163,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp // one. Make sure that it isn't one that will result in the same // address/port for both local and remote (otherwise this // endpoint would be trying to connect to itself). - sameAddr := e.ID.LocalAddress == e.ID.RemoteAddress + sameAddr := e.TransportEndpointInfo.ID.LocalAddress == e.TransportEndpointInfo.ID.RemoteAddress // Calculate a port offset based on the destination IP/port and // src IP to ensure that for a given tuple (srcIP, destIP, @@ -2262,21 +2196,21 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp if twReuse == tcpip.TCPTimeWaitReuseLoopbackOnly { switch netProto { case header.IPv4ProtocolNumber: - reuse = header.IsV4LoopbackAddress(e.ID.LocalAddress) && header.IsV4LoopbackAddress(e.ID.RemoteAddress) + reuse = header.IsV4LoopbackAddress(e.TransportEndpointInfo.ID.LocalAddress) && header.IsV4LoopbackAddress(e.TransportEndpointInfo.ID.RemoteAddress) case header.IPv6ProtocolNumber: - reuse = e.ID.LocalAddress == header.IPv6Loopback && e.ID.RemoteAddress == header.IPv6Loopback + reuse = e.TransportEndpointInfo.ID.LocalAddress == header.IPv6Loopback && e.TransportEndpointInfo.ID.RemoteAddress == header.IPv6Loopback } } bindToDevice := tcpip.NICID(e.ops.GetBindToDevice()) if _, err := e.stack.PickEphemeralPortStable(portOffset, func(p uint16) (bool, tcpip.Error) { - if sameAddr && p == e.ID.RemotePort { + if sameAddr && p == e.TransportEndpointInfo.ID.RemotePort { return false, nil } portRes := ports.Reservation{ Networks: netProtos, Transport: ProtocolNumber, - Addr: e.ID.LocalAddress, + Addr: e.TransportEndpointInfo.ID.LocalAddress, Port: p, Flags: e.portFlags, BindToDevice: bindToDevice, @@ -2286,7 +2220,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp if _, ok := err.(*tcpip.ErrPortInUse); !ok || !reuse { return false, nil } - transEPID := e.ID + transEPID := e.TransportEndpointInfo.ID transEPID.LocalPort = p // Check if an endpoint is registered with demuxer in TIME-WAIT and if // we can reuse it. If we can't find a transport endpoint then we just @@ -2323,7 +2257,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp portRes := ports.Reservation{ Networks: netProtos, Transport: ProtocolNumber, - Addr: e.ID.LocalAddress, + Addr: e.TransportEndpointInfo.ID.LocalAddress, Port: p, Flags: e.portFlags, BindToDevice: bindToDevice, @@ -2334,13 +2268,13 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp } } - id := e.ID + id := e.TransportEndpointInfo.ID id.LocalPort = p if err := e.stack.RegisterTransportEndpoint(netProtos, ProtocolNumber, id, e, e.portFlags, bindToDevice); err != nil { portRes := ports.Reservation{ Networks: netProtos, Transport: ProtocolNumber, - Addr: e.ID.LocalAddress, + Addr: e.TransportEndpointInfo.ID.LocalAddress, Port: p, Flags: e.portFlags, BindToDevice: bindToDevice, @@ -2355,7 +2289,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp // Port picking successful. Save the details of // the selected port. - e.ID = id + e.TransportEndpointInfo.ID = id e.isPortReserved = true e.boundBindToDevice = bindToDevice e.boundPortFlags = e.portFlags @@ -2381,10 +2315,10 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp // connection setting here. if !handshake { e.segmentQueue.mu.Lock() - for _, l := range []segmentList{e.segmentQueue.list, e.sndQueue, e.snd.writeList} { + for _, l := range []segmentList{e.segmentQueue.list, e.sndQueueInfo.sndQueue, e.snd.writeList} { for s := l.Front(); s != nil; s = s.Next() { - s.id = e.ID - e.sndWaker.Assert() + s.id = e.TransportEndpointInfo.ID + e.sndQueueInfo.sndWaker.Assert() } } e.segmentQueue.mu.Unlock() @@ -2426,10 +2360,10 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error { // Close for read. if e.shutdownFlags&tcpip.ShutdownRead != 0 { // Mark read side as closed. - e.rcvListMu.Lock() - e.rcvClosed = true - rcvBufUsed := e.rcvBufUsed - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + e.rcvQueueInfo.RcvClosed = true + rcvBufUsed := e.rcvQueueInfo.RcvBufUsed + e.rcvQueueInfo.rcvQueueMu.Unlock() // If we're fully closed and we have unread data we need to abort // the connection with a RST. @@ -2443,10 +2377,10 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error { // Close for write. if e.shutdownFlags&tcpip.ShutdownWrite != 0 { - e.sndBufMu.Lock() - if e.sndClosed { + e.sndQueueInfo.sndQueueMu.Lock() + if e.sndQueueInfo.SndClosed { // Already closed. - e.sndBufMu.Unlock() + e.sndQueueInfo.sndQueueMu.Unlock() if e.EndpointState() == StateTimeWait { return &tcpip.ErrNotConnected{} } @@ -2454,12 +2388,12 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error { } // Queue fin segment. - s := newOutgoingSegment(e.ID, nil) - e.sndQueue.PushBack(s) - e.sndBufInQueue++ + s := newOutgoingSegment(e.TransportEndpointInfo.ID, nil) + e.sndQueueInfo.sndQueue.PushBack(s) + e.sndQueueInfo.SndBufInQueue++ // Mark endpoint as closed. - e.sndClosed = true - e.sndBufMu.Unlock() + e.sndQueueInfo.SndClosed = true + e.sndQueueInfo.sndQueueMu.Unlock() e.handleClose() } @@ -2472,9 +2406,9 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error { // // By not removing this endpoint from the demuxer mapping, we // ensure that any other bind to the same port fails, as on Linux. - e.rcvListMu.Lock() - e.rcvClosed = true - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + e.rcvQueueInfo.RcvClosed = true + e.rcvQueueInfo.rcvQueueMu.Unlock() e.closePendingAcceptableConnectionsLocked() // Notify waiters that the endpoint is shutdown. e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr) @@ -2513,9 +2447,9 @@ func (e *endpoint) listen(backlog int) tcpip.Error { // listen is called after shutdown. e.accepted.cap = backlog e.shutdownFlags = 0 - e.rcvListMu.Lock() - e.rcvClosed = false - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + e.rcvQueueInfo.RcvClosed = false + e.rcvQueueInfo.rcvQueueMu.Unlock() } else { // Adjust the size of the backlog iff we can fit // existing pending connections into the new one. @@ -2548,7 +2482,7 @@ func (e *endpoint) listen(backlog int) tcpip.Error { } // Register the endpoint. - if err := e.stack.RegisterTransportEndpoint(e.effectiveNetProtos, ProtocolNumber, e.ID, e, e.boundPortFlags, e.boundBindToDevice); err != nil { + if err := e.stack.RegisterTransportEndpoint(e.effectiveNetProtos, ProtocolNumber, e.TransportEndpointInfo.ID, e, e.boundPortFlags, e.boundBindToDevice); err != nil { return err } @@ -2588,9 +2522,9 @@ func (e *endpoint) Accept(peerAddr *tcpip.FullAddress) (tcpip.Endpoint, *waiter. e.LockUser() defer e.UnlockUser() - e.rcvListMu.Lock() - rcvClosed := e.rcvClosed - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + rcvClosed := e.rcvQueueInfo.RcvClosed + e.rcvQueueInfo.rcvQueueMu.Unlock() // Endpoint must be in listen state before it can accept connections. if rcvClosed || e.EndpointState() != StateListen { return nil, nil, &tcpip.ErrInvalidEndpointState{} @@ -2656,7 +2590,7 @@ func (e *endpoint) bindLocked(addr tcpip.FullAddress) (err tcpip.Error) { if nic == 0 { return &tcpip.ErrBadLocalAddress{} } - e.ID.LocalAddress = addr.Addr + e.TransportEndpointInfo.ID.LocalAddress = addr.Addr } bindToDevice := tcpip.NICID(e.ops.GetBindToDevice()) @@ -2670,7 +2604,7 @@ func (e *endpoint) bindLocked(addr tcpip.FullAddress) (err tcpip.Error) { Dest: tcpip.FullAddress{}, } port, err := e.stack.ReservePort(portRes, func(p uint16) (bool, tcpip.Error) { - id := e.ID + id := e.TransportEndpointInfo.ID id.LocalPort = p // CheckRegisterTransportEndpoint should only return an error if there is a // listening endpoint bound with the same id and portFlags and bindToDevice @@ -2696,7 +2630,7 @@ func (e *endpoint) bindLocked(addr tcpip.FullAddress) (err tcpip.Error) { e.boundNICID = nic e.isPortReserved = true e.effectiveNetProtos = netProtos - e.ID.LocalPort = port + e.TransportEndpointInfo.ID.LocalPort = port // Mark endpoint as bound. e.setEndpointState(StateBound) @@ -2710,8 +2644,8 @@ func (e *endpoint) GetLocalAddress() (tcpip.FullAddress, tcpip.Error) { defer e.UnlockUser() return tcpip.FullAddress{ - Addr: e.ID.LocalAddress, - Port: e.ID.LocalPort, + Addr: e.TransportEndpointInfo.ID.LocalAddress, + Port: e.TransportEndpointInfo.ID.LocalPort, NIC: e.boundNICID, }, nil } @@ -2730,8 +2664,8 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { func (e *endpoint) getRemoteAddress() tcpip.FullAddress { return tcpip.FullAddress{ - Addr: e.ID.RemoteAddress, - Port: e.ID.RemotePort, + Addr: e.TransportEndpointInfo.ID.RemoteAddress, + Port: e.TransportEndpointInfo.ID.RemotePort, NIC: e.boundNICID, } } @@ -2770,13 +2704,13 @@ func (e *endpoint) onICMPError(err tcpip.Error, transErr stack.TransportError, p Payload: pkt.Data().AsRange().ToOwnedView(), Dst: tcpip.FullAddress{ NIC: pkt.NICID, - Addr: e.ID.RemoteAddress, - Port: e.ID.RemotePort, + Addr: e.TransportEndpointInfo.ID.RemoteAddress, + Port: e.TransportEndpointInfo.ID.RemotePort, }, Offender: tcpip.FullAddress{ NIC: pkt.NICID, - Addr: e.ID.LocalAddress, - Port: e.ID.LocalPort, + Addr: e.TransportEndpointInfo.ID.LocalAddress, + Port: e.TransportEndpointInfo.ID.LocalPort, }, NetProto: pkt.NetworkProtocolNumber, }) @@ -2789,12 +2723,12 @@ func (e *endpoint) onICMPError(err tcpip.Error, transErr stack.TransportError, p // HandleError implements stack.TransportEndpoint. func (e *endpoint) HandleError(transErr stack.TransportError, pkt *stack.PacketBuffer) { handlePacketTooBig := func(mtu uint32) { - e.sndBufMu.Lock() - e.packetTooBigCount++ - if v := int(mtu); v < e.sndMTU { - e.sndMTU = v + e.sndQueueInfo.sndQueueMu.Lock() + e.sndQueueInfo.PacketTooBigCount++ + if v := int(mtu); v < e.sndQueueInfo.SndMTU { + e.sndQueueInfo.SndMTU = v } - e.sndBufMu.Unlock() + e.sndQueueInfo.sndQueueMu.Unlock() e.notifyProtocolGoroutine(notifyMTUChanged) } @@ -2813,14 +2747,14 @@ func (e *endpoint) HandleError(transErr stack.TransportError, pkt *stack.PacketB // in the send buffer. The number of newly available bytes is v. func (e *endpoint) updateSndBufferUsage(v int) { sendBufferSize := e.getSendBufferSize() - e.sndBufMu.Lock() - notify := e.sndBufUsed >= sendBufferSize>>1 - e.sndBufUsed -= v + e.sndQueueInfo.sndQueueMu.Lock() + notify := e.sndQueueInfo.SndBufUsed >= sendBufferSize>>1 + e.sndQueueInfo.SndBufUsed -= v // We only notify when there is half the sendBufferSize available after // a full buffer event occurs. This ensures that we don't wake up // writers to queue just 1-2 segments and go back to sleep. - notify = notify && e.sndBufUsed < sendBufferSize>>1 - e.sndBufMu.Unlock() + notify = notify && e.sndQueueInfo.SndBufUsed < int(sendBufferSize)>>1 + e.sndQueueInfo.sndQueueMu.Unlock() if notify { e.waiterQueue.Notify(waiter.WritableEvents) @@ -2831,55 +2765,55 @@ func (e *endpoint) updateSndBufferUsage(v int) { // to be read, or when the connection is closed for receiving (in which case // s will be nil). func (e *endpoint) readyToRead(s *segment) { - e.rcvListMu.Lock() + e.rcvQueueInfo.rcvQueueMu.Lock() if s != nil { - e.rcvBufUsed += s.payloadSize() + e.rcvQueueInfo.RcvBufUsed += s.payloadSize() s.incRef() - e.rcvList.PushBack(s) + e.rcvQueueInfo.rcvQueue.PushBack(s) } else { - e.rcvClosed = true + e.rcvQueueInfo.RcvClosed = true } - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Unlock() e.waiterQueue.Notify(waiter.ReadableEvents) } // receiveBufferAvailableLocked calculates how many bytes are still available // in the receive buffer. -// rcvListMu must be held when this function is called. +// rcvQueueMu must be held when this function is called. func (e *endpoint) receiveBufferAvailableLocked() int { // We may use more bytes than the buffer size when the receive buffer // shrinks. memUsed := e.receiveMemUsed() - if memUsed >= e.rcvBufSize { + if memUsed >= e.rcvQueueInfo.RcvBufSize { return 0 } - return e.rcvBufSize - memUsed + return e.rcvQueueInfo.RcvBufSize - memUsed } // receiveBufferAvailable calculates how many bytes are still available in the // receive buffer based on the actual memory used by all segments held in // receive buffer/pending and segment queue. func (e *endpoint) receiveBufferAvailable() int { - e.rcvListMu.Lock() + e.rcvQueueInfo.rcvQueueMu.Lock() available := e.receiveBufferAvailableLocked() - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Unlock() return available } // receiveBufferUsed returns the amount of in-use receive buffer. func (e *endpoint) receiveBufferUsed() int { - e.rcvListMu.Lock() - used := e.rcvBufUsed - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + used := e.rcvQueueInfo.RcvBufUsed + e.rcvQueueInfo.rcvQueueMu.Unlock() return used } // receiveBufferSize returns the current size of the receive buffer. func (e *endpoint) receiveBufferSize() int { - e.rcvListMu.Lock() - size := e.rcvBufSize - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + size := e.rcvQueueInfo.RcvBufSize + e.rcvQueueInfo.rcvQueueMu.Unlock() return size } @@ -2913,9 +2847,9 @@ func (e *endpoint) maxReceiveBufferSize() int { func (e *endpoint) rcvWndScaleForHandshake() int { bufSizeForScale := e.receiveBufferSize() - e.rcvListMu.Lock() - autoTuningDisabled := e.rcvAutoParams.disabled - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Lock() + autoTuningDisabled := e.rcvQueueInfo.RcvAutoParams.Disabled + e.rcvQueueInfo.rcvQueueMu.Unlock() if autoTuningDisabled { return FindWndScale(seqnum.Size(bufSizeForScale)) } @@ -2926,7 +2860,7 @@ func (e *endpoint) rcvWndScaleForHandshake() int { // updateRecentTimestamp updates the recent timestamp using the algorithm // described in https://tools.ietf.org/html/rfc7323#section-4.3 func (e *endpoint) updateRecentTimestamp(tsVal uint32, maxSentAck seqnum.Value, segSeq seqnum.Value) { - if e.sendTSOk && seqnum.Value(e.recentTimestamp()).LessThan(seqnum.Value(tsVal)) && segSeq.LessThanEq(maxSentAck) { + if e.SendTSOk && seqnum.Value(e.recentTimestamp()).LessThan(seqnum.Value(tsVal)) && segSeq.LessThanEq(maxSentAck) { e.setRecentTimestamp(tsVal) } } @@ -2936,7 +2870,7 @@ func (e *endpoint) updateRecentTimestamp(tsVal uint32, maxSentAck seqnum.Value, // initializes the recentTS with the value provided in synOpts.TSval. func (e *endpoint) maybeEnableTimestamp(synOpts *header.TCPSynOptions) { if synOpts.TS { - e.sendTSOk = true + e.SendTSOk = true e.setRecentTimestamp(synOpts.TSVal) } } @@ -2944,7 +2878,7 @@ func (e *endpoint) maybeEnableTimestamp(synOpts *header.TCPSynOptions) { // timestamp returns the timestamp value to be used in the TSVal field of the // timestamp option for outgoing TCP segments for a given endpoint. func (e *endpoint) timestamp() uint32 { - return tcpTimeStamp(time.Now(), e.tsOffset) + return tcpTimeStamp(time.Now(), e.TSOffset) } // tcpTimeStamp returns a timestamp offset by the provided offset. This is @@ -2983,7 +2917,7 @@ func (e *endpoint) maybeEnableSACKPermitted(synOpts *header.TCPSynOptions) { return } if bool(v) && synOpts.SACKPermitted { - e.sackPermitted = true + e.SACKPermitted = true } } @@ -2997,118 +2931,46 @@ func (e *endpoint) maxOptionSize() (size int) { return size } -// completeState makes a full copy of the endpoint and returns it. This is used -// before invoking the probe. The state returned may not be fully consistent if -// there are intervening syscalls when the state is being copied. -func (e *endpoint) completeState() stack.TCPEndpointState { - var s stack.TCPEndpointState - s.SegTime = time.Now() - - // Copy EndpointID. - s.ID = stack.TCPEndpointID(e.ID) - - // Copy endpoint rcv state. - e.rcvListMu.Lock() - s.RcvBufSize = e.rcvBufSize - s.RcvBufUsed = e.rcvBufUsed - s.RcvClosed = e.rcvClosed - s.RcvAutoParams.MeasureTime = e.rcvAutoParams.measureTime - s.RcvAutoParams.CopiedBytes = e.rcvAutoParams.copied - s.RcvAutoParams.PrevCopiedBytes = e.rcvAutoParams.prevCopied - s.RcvAutoParams.RTT = e.rcvAutoParams.rtt - s.RcvAutoParams.RTTMeasureSeqNumber = e.rcvAutoParams.rttMeasureSeqNumber - s.RcvAutoParams.RTTMeasureTime = e.rcvAutoParams.rttMeasureTime - s.RcvAutoParams.Disabled = e.rcvAutoParams.disabled - e.rcvListMu.Unlock() - - // Endpoint TCP Option state. - s.SendTSOk = e.sendTSOk - s.RecentTS = e.recentTimestamp() - s.TSOffset = e.tsOffset - s.SACKPermitted = e.sackPermitted +// completeStateLocked makes a full copy of the endpoint and returns it. This is +// used before invoking the probe. +// +// Precondition: e.mu must be held. +func (e *endpoint) completeStateLocked() stack.TCPEndpointState { + s := stack.TCPEndpointState{ + TCPEndpointStateInner: e.TCPEndpointStateInner, + ID: stack.TCPEndpointID(e.TransportEndpointInfo.ID), + SegTime: time.Now(), + Receiver: e.rcv.TCPReceiverState, + Sender: e.snd.TCPSenderState, + } + + sndBufSize := e.getSendBufferSize() + // Copy the send buffer atomically. + e.sndQueueInfo.sndQueueMu.Lock() + s.SndBufState = e.sndQueueInfo.TCPSndBufState + s.SndBufState.SndBufSize = sndBufSize + e.sndQueueInfo.sndQueueMu.Unlock() + + // Copy the receive buffer atomically. + e.rcvQueueInfo.rcvQueueMu.Lock() + s.RcvBufState = e.rcvQueueInfo.TCPRcvBufState + e.rcvQueueInfo.rcvQueueMu.Unlock() + + // Copy the endpoint TCP Option state. s.SACK.Blocks = make([]header.SACKBlock, e.sack.NumBlocks) copy(s.SACK.Blocks, e.sack.Blocks[:e.sack.NumBlocks]) s.SACK.ReceivedBlocks, s.SACK.MaxSACKED = e.scoreboard.Copy() - // Copy endpoint send state. - sndBufSize := e.getSendBufferSize() - e.sndBufMu.Lock() - s.SndBufSize = sndBufSize - s.SndBufUsed = e.sndBufUsed - s.SndClosed = e.sndClosed - s.SndBufInQueue = e.sndBufInQueue - s.PacketTooBigCount = e.packetTooBigCount - s.SndMTU = e.sndMTU - e.sndBufMu.Unlock() - - // Copy receiver state. - s.Receiver = stack.TCPReceiverState{ - RcvNxt: e.rcv.rcvNxt, - RcvAcc: e.rcv.rcvAcc, - RcvWndScale: e.rcv.rcvWndScale, - PendingBufUsed: e.rcv.pendingBufUsed, - } - - // Copy sender state. - s.Sender = stack.TCPSenderState{ - LastSendTime: e.snd.lastSendTime, - DupAckCount: e.snd.dupAckCount, - FastRecovery: stack.TCPFastRecoveryState{ - Active: e.snd.fr.active, - First: e.snd.fr.first, - Last: e.snd.fr.last, - MaxCwnd: e.snd.fr.maxCwnd, - HighRxt: e.snd.fr.highRxt, - RescueRxt: e.snd.fr.rescueRxt, - }, - SndCwnd: e.snd.sndCwnd, - Ssthresh: e.snd.sndSsthresh, - SndCAAckCount: e.snd.sndCAAckCount, - Outstanding: e.snd.outstanding, - SackedOut: e.snd.sackedOut, - SndWnd: e.snd.sndWnd, - SndUna: e.snd.sndUna, - SndNxt: e.snd.sndNxt, - RTTMeasureSeqNum: e.snd.rttMeasureSeqNum, - RTTMeasureTime: e.snd.rttMeasureTime, - Closed: e.snd.closed, - RTO: e.snd.rto, - MaxPayloadSize: e.snd.maxPayloadSize, - SndWndScale: e.snd.sndWndScale, - MaxSentAck: e.snd.maxSentAck, - } e.snd.rtt.Lock() - s.Sender.SRTT = e.snd.rtt.srtt - s.Sender.SRTTInited = e.snd.rtt.srttInited + s.Sender.RTTState = e.snd.rtt.TCPRTTState e.snd.rtt.Unlock() if cubic, ok := e.snd.cc.(*cubicState); ok { - s.Sender.Cubic = stack.TCPCubicState{ - WMax: cubic.wMax, - WLastMax: cubic.wLastMax, - T: cubic.t, - TimeSinceLastCongestion: time.Since(cubic.t), - C: cubic.c, - K: cubic.k, - Beta: cubic.beta, - WC: cubic.wC, - WEst: cubic.wEst, - } - } - - rc := &e.snd.rc - s.Sender.RACKState = stack.TCPRACKState{ - XmitTime: rc.xmitTime, - EndSequence: rc.endSequence, - FACK: rc.fack, - RTT: rc.rtt, - Reord: rc.reorderSeen, - DSACKSeen: rc.dsackSeen, - ReoWnd: rc.reoWnd, - ReoWndIncr: rc.reoWndIncr, - ReoWndPersist: rc.reoWndPersist, - RTTSeq: rc.rttSeq, + s.Sender.Cubic = cubic.TCPCubicState + s.Sender.Cubic.TimeSinceLastCongestion = time.Since(s.Sender.Cubic.T) } + + s.Sender.RACKState = e.snd.rc.TCPRACKState return s } |