diff options
-rw-r--r-- | pkg/tcpip/stack/stack_state_autogen.go | 11 | ||||
-rw-r--r-- | pkg/tcpip/stack/tcp.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 43 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 29 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_state_autogen.go | 3 |
5 files changed, 21 insertions, 68 deletions
diff --git a/pkg/tcpip/stack/stack_state_autogen.go b/pkg/tcpip/stack/stack_state_autogen.go index b287f64c3..47c7e386e 100644 --- a/pkg/tcpip/stack/stack_state_autogen.go +++ b/pkg/tcpip/stack/stack_state_autogen.go @@ -1045,7 +1045,6 @@ func (t *TCPSndBufState) StateFields() []string { "SndBufSize", "SndBufUsed", "SndClosed", - "SndBufInQueue", "PacketTooBigCount", "SndMTU", } @@ -1059,9 +1058,8 @@ func (t *TCPSndBufState) StateSave(stateSinkObject state.Sink) { stateSinkObject.Save(0, &t.SndBufSize) stateSinkObject.Save(1, &t.SndBufUsed) stateSinkObject.Save(2, &t.SndClosed) - stateSinkObject.Save(3, &t.SndBufInQueue) - stateSinkObject.Save(4, &t.PacketTooBigCount) - stateSinkObject.Save(5, &t.SndMTU) + stateSinkObject.Save(3, &t.PacketTooBigCount) + stateSinkObject.Save(4, &t.SndMTU) } func (t *TCPSndBufState) afterLoad() {} @@ -1071,9 +1069,8 @@ func (t *TCPSndBufState) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(0, &t.SndBufSize) stateSourceObject.Load(1, &t.SndBufUsed) stateSourceObject.Load(2, &t.SndClosed) - stateSourceObject.Load(3, &t.SndBufInQueue) - stateSourceObject.Load(4, &t.PacketTooBigCount) - stateSourceObject.Load(5, &t.SndMTU) + stateSourceObject.Load(3, &t.PacketTooBigCount) + stateSourceObject.Load(4, &t.SndMTU) } func (t *TCPEndpointStateInner) StateTypeName() string { diff --git a/pkg/tcpip/stack/tcp.go b/pkg/tcpip/stack/tcp.go index e90c1a770..90a8ba6cf 100644 --- a/pkg/tcpip/stack/tcp.go +++ b/pkg/tcpip/stack/tcp.go @@ -380,9 +380,6 @@ type TCPSndBufState struct { // SndClosed indicates that the endpoint has been closed for sends. SndClosed bool - // SndBufInQueue is the number of bytes in the send queue. - SndBufInQueue seqnum.Size - // PacketTooBigCount is used to notify the main protocol routine how // many times a "packet too big" control packet is received. PacketTooBigCount int diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index f86450298..e39d1623d 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -909,30 +909,13 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags header.TCPFlags, se return err } -func (e *endpoint) handleWrite() { - e.sndQueueInfo.sndQueueMu.Lock() - next := e.drainSendQueueLocked() - e.sndQueueInfo.sndQueueMu.Unlock() - - e.sendData(next) -} - -// Move packets from send queue to send list. -// -// Precondition: e.sndBufMu must be locked. -func (e *endpoint) drainSendQueueLocked() *segment { - first := e.sndQueueInfo.sndQueue.Front() - if first != nil { - e.snd.writeList.PushBackList(&e.sndQueueInfo.sndQueue) - e.sndQueueInfo.SndBufInQueue = 0 - } - return first -} - // Precondition: e.mu must be locked. func (e *endpoint) sendData(next *segment) { // Initialize the next segment to write if it's currently nil. if e.snd.writeNext == nil { + if next == nil { + return + } e.snd.writeNext = next } @@ -940,17 +923,6 @@ func (e *endpoint) sendData(next *segment) { e.snd.sendData() } -func (e *endpoint) handleClose() { - if !e.EndpointState().connected() { - return - } - // Drain the send queue. - e.handleWrite() - - // Mark send side as closed. - e.snd.Closed = true -} - // resetConnectionLocked puts the endpoint in an error state with the given // error code and sends a RST if and only if the error is not ErrConnectionReset // indicating that the connection is being reset due to receiving a RST. This @@ -1402,14 +1374,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ { w: &e.sndQueueInfo.sndWaker, f: func() tcpip.Error { - e.handleWrite() - return nil - }, - }, - { - w: &e.sndQueueInfo.sndCloseWaker, - f: func() tcpip.Error { - e.handleClose() + e.sendData(nil /* next */) return nil }, }, diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 242e6b7f8..4acddc959 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -293,16 +293,9 @@ type sndQueueInfo struct { sndQueueMu sync.Mutex `state:"nosave"` stack.TCPSndBufState - // sndQueue holds segments that are ready to be sent. - sndQueue segmentList `state:"wait"` - - // sndWaker is used to signal the protocol goroutine when segments are - // added to the `sndQueue`. + // sndWaker is used to signal the protocol goroutine when there may be + // segments that need to be sent. sndWaker sleep.Waker `state:"manual"` - - // sndCloseWaker is used to notify the protocol goroutine when the send - // side is closed. - sndCloseWaker sleep.Waker `state:"manual"` } // rcvQueueInfo contains the endpoint's rcvQueue and associated metadata. @@ -1558,10 +1551,9 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp // Add data to the send queue. s := newOutgoingSegment(e.TransportEndpointInfo.ID, e.stack.Clock(), v) e.sndQueueInfo.SndBufUsed += len(v) - e.sndQueueInfo.SndBufInQueue += seqnum.Size(len(v)) - e.sndQueueInfo.sndQueue.PushBack(s) + e.snd.writeList.PushBack(s) - return e.drainSendQueueLocked(), len(v), nil + return s, len(v), nil }() // Return if either we didn't queue anything or if an error occurred while // attempting to queue data. @@ -2314,7 +2306,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp // connection setting here. if !handshake { e.segmentQueue.mu.Lock() - for _, l := range []segmentList{e.segmentQueue.list, e.sndQueueInfo.sndQueue, e.snd.writeList} { + for _, l := range []segmentList{e.segmentQueue.list, e.snd.writeList} { for s := l.Front(); s != nil; s = s.Next() { s.id = e.TransportEndpointInfo.ID e.sndQueueInfo.sndWaker.Assert() @@ -2391,12 +2383,17 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error { // Queue fin segment. s := newOutgoingSegment(e.TransportEndpointInfo.ID, e.stack.Clock(), nil) - e.sndQueueInfo.sndQueue.PushBack(s) - e.sndQueueInfo.SndBufInQueue++ + e.snd.writeList.PushBack(s) // Mark endpoint as closed. e.sndQueueInfo.SndClosed = true e.sndQueueInfo.sndQueueMu.Unlock() - e.handleClose() + + // Drain the send queue. + e.sendData(s) + + // Mark send side as closed. + e.snd.Closed = true + // Wake up any writers that maybe waiting for the stream to become // writable. e.waiterQueue.Notify(waiter.WritableEvents) diff --git a/pkg/tcpip/transport/tcp/tcp_state_autogen.go b/pkg/tcpip/transport/tcp/tcp_state_autogen.go index 18efbcc3a..9c46d31d7 100644 --- a/pkg/tcpip/transport/tcp/tcp_state_autogen.go +++ b/pkg/tcpip/transport/tcp/tcp_state_autogen.go @@ -73,7 +73,6 @@ func (s *sndQueueInfo) StateTypeName() string { func (s *sndQueueInfo) StateFields() []string { return []string{ "TCPSndBufState", - "sndQueue", } } @@ -83,7 +82,6 @@ func (s *sndQueueInfo) beforeSave() {} func (s *sndQueueInfo) StateSave(stateSinkObject state.Sink) { s.beforeSave() stateSinkObject.Save(0, &s.TCPSndBufState) - stateSinkObject.Save(1, &s.sndQueue) } func (s *sndQueueInfo) afterLoad() {} @@ -91,7 +89,6 @@ func (s *sndQueueInfo) afterLoad() {} // +checklocksignore func (s *sndQueueInfo) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(0, &s.TCPSndBufState) - stateSourceObject.LoadWait(1, &s.sndQueue) } func (r *rcvQueueInfo) StateTypeName() string { |