diff options
Diffstat (limited to 'pkg/tcpip')
-rw-r--r-- | pkg/tcpip/link/qdisc/fifo/endpoint.go | 11 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/accept.go | 13 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 286 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/dispatcher.go | 7 |
4 files changed, 140 insertions, 177 deletions
diff --git a/pkg/tcpip/link/qdisc/fifo/endpoint.go b/pkg/tcpip/link/qdisc/fifo/endpoint.go index b41e3e2fa..c15cbf81b 100644 --- a/pkg/tcpip/link/qdisc/fifo/endpoint.go +++ b/pkg/tcpip/link/qdisc/fifo/endpoint.go @@ -73,20 +73,19 @@ func New(lower stack.LinkEndpoint, n int, queueLen int) stack.LinkEndpoint { } func (q *queueDispatcher) dispatchLoop() { - const newPacketWakerID = 1 - const closeWakerID = 2 s := sleep.Sleeper{} - s.AddWaker(&q.newPacketWaker, newPacketWakerID) - s.AddWaker(&q.closeWaker, closeWakerID) + s.AddWaker(&q.newPacketWaker) + s.AddWaker(&q.closeWaker) defer s.Done() const batchSize = 32 var batch stack.PacketBufferList for { - id, ok := s.Fetch(true) - if ok && id == closeWakerID { + w := s.Fetch(true) + if w == &q.closeWaker { return } + // Must otherwise be the newPacketWaker. for pkt := q.q.dequeue(); pkt != nil; pkt = q.q.dequeue() { batch.PushBack(pkt) if batch.Len() < batchSize && !q.q.empty() { diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index caf14b0dc..d0f68b72c 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -762,14 +762,15 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) { }() var s sleep.Sleeper - s.AddWaker(&e.notificationWaker, wakerForNotification) - s.AddWaker(&e.newSegmentWaker, wakerForNewSegment) + s.AddWaker(&e.notificationWaker) + s.AddWaker(&e.newSegmentWaker) + defer s.Done() for { e.mu.Unlock() - index, _ := s.Fetch(true) + w := s.Fetch(true) e.mu.Lock() - switch index { - case wakerForNotification: + switch w { + case &e.notificationWaker: n := e.fetchNotifications() if n¬ifyClose != 0 { return @@ -788,7 +789,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) { e.mu.Lock() } - case wakerForNewSegment: + case &e.newSegmentWaker: // Process at most maxSegmentsPerWake segments. mayRequeue := true for i := 0; i < maxSegmentsPerWake; i++ { diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 80cd07218..12df7a7b4 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -51,13 +51,6 @@ const ( handshakeCompleted ) -// The following are used to set up sleepers. -const ( - wakerForNotification = iota - wakerForNewSegment - wakerForResend -) - const ( // Maximum space available for options. maxOptionSize = 40 @@ -530,9 +523,9 @@ func (h *handshake) complete() tcpip.Error { // Set up the wakers. var s sleep.Sleeper resendWaker := sleep.Waker{} - s.AddWaker(&resendWaker, wakerForResend) - s.AddWaker(&h.ep.notificationWaker, wakerForNotification) - s.AddWaker(&h.ep.newSegmentWaker, wakerForNewSegment) + s.AddWaker(&resendWaker) + s.AddWaker(&h.ep.notificationWaker) + s.AddWaker(&h.ep.newSegmentWaker) defer s.Done() // Initialize the resend timer. @@ -545,11 +538,10 @@ func (h *handshake) complete() tcpip.Error { // Unlock before blocking, and reacquire again afterwards (h.ep.mu is held // throughout handshake processing). h.ep.mu.Unlock() - index, _ := s.Fetch(true /* block */) + w := s.Fetch(true /* block */) h.ep.mu.Lock() - switch index { - - case wakerForResend: + switch w { + case &resendWaker: if err := timer.reset(); err != nil { return err } @@ -577,7 +569,7 @@ func (h *handshake) complete() tcpip.Error { h.sampleRTTWithTSOnly = true } - case wakerForNotification: + case &h.ep.notificationWaker: n := h.ep.fetchNotifications() if (n¬ifyClose)|(n¬ifyAbort) != 0 { return &tcpip.ErrAborted{} @@ -611,7 +603,7 @@ func (h *handshake) complete() tcpip.Error { // cleared because of a socket layer call. return &tcpip.ErrConnectionAborted{} } - case wakerForNewSegment: + case &h.ep.newSegmentWaker: if err := h.processSegments(); err != nil { return err } @@ -1346,6 +1338,103 @@ func (e *endpoint) protocolMainLoopDone(closeTimer tcpip.Timer) { e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } +// handleWakeup handles a wakeup event while connected. +// +// +checklocks:e.mu +func (e *endpoint) handleWakeup(w, closeWaker *sleep.Waker, closeTimer *tcpip.Timer) tcpip.Error { + switch w { + case &e.sndQueueInfo.sndWaker: + e.sendData(nil /* next */) + case &e.newSegmentWaker: + return e.handleSegmentsLocked(false /* fastPath */) + case &e.snd.resendWaker: + if !e.snd.retransmitTimerExpired() { + e.stack.Stats().TCP.EstablishedTimedout.Increment() + return &tcpip.ErrTimeout{} + } + case closeWaker: + // This means the socket is being closed due to the + // TCP-FIN-WAIT2 timeout was hit. Just mark the socket as + // closed. + e.transitionToStateCloseLocked() + e.workerCleanup = true + case &e.snd.probeWaker: + return e.snd.probeTimerExpired() + case &e.keepalive.waker: + return e.keepaliveTimerExpired() + case &e.notificationWaker: + n := e.fetchNotifications() + if n¬ifyNonZeroReceiveWindow != 0 { + e.rcv.nonZeroWindow() + } + + if n¬ifyMTUChanged != 0 { + e.sndQueueInfo.sndQueueMu.Lock() + count := e.sndQueueInfo.PacketTooBigCount + e.sndQueueInfo.PacketTooBigCount = 0 + mtu := e.sndQueueInfo.SndMTU + e.sndQueueInfo.sndQueueMu.Unlock() + + e.snd.updateMaxPayloadSize(mtu, count) + } + + if n¬ifyReset != 0 || n¬ifyAbort != 0 { + return &tcpip.ErrConnectionAborted{} + } + + if n¬ifyResetByPeer != 0 { + return &tcpip.ErrConnectionReset{} + } + + if n¬ifyClose != 0 && e.closed { + switch e.EndpointState() { + case StateEstablished: + // Perform full shutdown if the endpoint is + // still established. This can occur when + // notifyClose was asserted just before + // becoming established. + e.shutdownLocked(tcpip.ShutdownWrite | tcpip.ShutdownRead) + case StateFinWait2: + // The socket has been closed and we are in + // FIN_WAIT2 so start the FIN_WAIT2 timer. + if *closeTimer == nil { + *closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert) + } + } + } + + if n¬ifyKeepaliveChanged != 0 { + // The timer could fire in background when the endpoint + // is drained. That's OK. See above. + e.resetKeepaliveTimer(true) + } + + if n¬ifyDrain != 0 { + for !e.segmentQueue.empty() { + if err := e.handleSegmentsLocked(false /* fastPath */); err != nil { + return err + } + } + if !e.EndpointState().closed() { + // Only block the worker if the endpoint + // is not in closed state or error state. + close(e.drainDone) + e.mu.Unlock() + <-e.undrain + e.mu.Lock() + } + } + + // N.B. notifyTickleWorker may be set, but there is no action + // to take in this case. + case &e.snd.reorderWaker: + return e.snd.rc.reorderTimerExpired() + default: + panic("unknown waker") // Shouldn't happen. + } + return nil +} + // protocolMainLoop is the main loop of the TCP protocol. It runs in its own // goroutine and is responsible for sending segments and handling received // segments. @@ -1403,139 +1492,16 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.mu.Lock() } - // Set up the functions that will be called when the main protocol loop - // wakes up. - funcs := []struct { - w *sleep.Waker - f func() tcpip.Error - }{ - { - w: &e.sndQueueInfo.sndWaker, - f: func() tcpip.Error { - e.sendData(nil /* next */) - return nil - }, - }, - { - w: &closeWaker, - f: func() tcpip.Error { - // This means the socket is being closed due - // to the TCP-FIN-WAIT2 timeout was hit. Just - // mark the socket as closed. - e.transitionToStateCloseLocked() - e.workerCleanup = true - return nil - }, - }, - { - w: &e.snd.resendWaker, - f: func() tcpip.Error { - if !e.snd.retransmitTimerExpired() { - e.stack.Stats().TCP.EstablishedTimedout.Increment() - return &tcpip.ErrTimeout{} - } - return nil - }, - }, - { - w: &e.snd.probeWaker, - f: e.snd.probeTimerExpired, - }, - { - w: &e.newSegmentWaker, - f: func() tcpip.Error { - return e.handleSegmentsLocked(false /* fastPath */) - }, - }, - { - w: &e.keepalive.waker, - f: e.keepaliveTimerExpired, - }, - { - w: &e.notificationWaker, - f: func() tcpip.Error { - n := e.fetchNotifications() - if n¬ifyNonZeroReceiveWindow != 0 { - e.rcv.nonZeroWindow() - } - - if n¬ifyMTUChanged != 0 { - e.sndQueueInfo.sndQueueMu.Lock() - count := e.sndQueueInfo.PacketTooBigCount - e.sndQueueInfo.PacketTooBigCount = 0 - mtu := e.sndQueueInfo.SndMTU - e.sndQueueInfo.sndQueueMu.Unlock() - - e.snd.updateMaxPayloadSize(mtu, count) - } - - if n¬ifyReset != 0 || n¬ifyAbort != 0 { - return &tcpip.ErrConnectionAborted{} - } - - if n¬ifyResetByPeer != 0 { - return &tcpip.ErrConnectionReset{} - } - - if n¬ifyClose != 0 && e.closed { - switch e.EndpointState() { - case StateEstablished: - // Perform full shutdown if the endpoint is still - // established. This can occur when notifyClose - // was asserted just before becoming established. - e.shutdownLocked(tcpip.ShutdownWrite | tcpip.ShutdownRead) - case StateFinWait2: - // The socket has been closed and we are in FIN_WAIT2 - // so start the FIN_WAIT2 timer. - if closeTimer == nil { - closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert) - } - } - } - - if n¬ifyKeepaliveChanged != 0 { - // The timer could fire in background - // when the endpoint is drained. That's - // OK. See above. - e.resetKeepaliveTimer(true) - } - - if n¬ifyDrain != 0 { - for !e.segmentQueue.empty() { - if err := e.handleSegmentsLocked(false /* fastPath */); err != nil { - return err - } - } - if !e.EndpointState().closed() { - // Only block the worker if the endpoint - // is not in closed state or error state. - close(e.drainDone) - e.mu.Unlock() // +checklocksforce - <-e.undrain - e.mu.Lock() - } - } - - if n¬ifyTickleWorker != 0 { - // Just a tickle notification. No need to do - // anything. - return nil - } - - return nil - }, - }, - { - w: &e.snd.reorderWaker, - f: e.snd.rc.reorderTimerExpired, - }, - } - - // Initialize the sleeper based on the wakers in funcs. + // Add all wakers. var s sleep.Sleeper - for i := range funcs { - s.AddWaker(funcs[i].w, i) - } + s.AddWaker(&e.sndQueueInfo.sndWaker) + s.AddWaker(&e.newSegmentWaker) + s.AddWaker(&e.snd.resendWaker) + s.AddWaker(&e.snd.probeWaker) + s.AddWaker(&closeWaker) + s.AddWaker(&e.keepalive.waker) + s.AddWaker(&e.notificationWaker) + s.AddWaker(&e.snd.reorderWaker) // Notify the caller that the waker initialization is complete and the // endpoint is ready. @@ -1581,7 +1547,7 @@ loop: } e.mu.Unlock() - v, _ := s.Fetch(true /* block */) + w := s.Fetch(true /* block */) e.mu.Lock() // We need to double check here because the notification may be @@ -1601,7 +1567,7 @@ loop: case StateClose: break loop default: - if err := funcs[v].f(); err != nil { + if err := e.handleWakeup(w, &closeWaker, &closeTimer); err != nil { cleanupOnError(err) e.protocolMainLoopDone(closeTimer) return @@ -1714,26 +1680,22 @@ func (e *endpoint) doTimeWait() (twReuse func()) { timeWaitDuration = time.Duration(tcpTW) } - const newSegment = 1 - const notification = 2 - const timeWaitDone = 3 - var s sleep.Sleeper defer s.Done() - s.AddWaker(&e.newSegmentWaker, newSegment) - s.AddWaker(&e.notificationWaker, notification) + s.AddWaker(&e.newSegmentWaker) + s.AddWaker(&e.notificationWaker) var timeWaitWaker sleep.Waker - s.AddWaker(&timeWaitWaker, timeWaitDone) + s.AddWaker(&timeWaitWaker) timeWaitTimer := e.stack.Clock().AfterFunc(timeWaitDuration, timeWaitWaker.Assert) defer timeWaitTimer.Stop() for { e.mu.Unlock() - v, _ := s.Fetch(true /* block */) + w := s.Fetch(true /* block */) e.mu.Lock() - switch v { - case newSegment: + switch w { + case &e.newSegmentWaker: extendTimeWait, reuseTW := e.handleTimeWaitSegments() if reuseTW != nil { return reuseTW @@ -1741,7 +1703,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) { if extendTimeWait { timeWaitTimer.Reset(timeWaitDuration) } - case notification: + case &e.notificationWaker: n := e.fetchNotifications() if n¬ifyAbort != 0 { return nil @@ -1759,7 +1721,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) { e.mu.Lock() return nil } - case timeWaitDone: + case &timeWaitWaker: return nil } } diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go index 7d110516b..2e93d2664 100644 --- a/pkg/tcpip/transport/tcp/dispatcher.go +++ b/pkg/tcpip/transport/tcp/dispatcher.go @@ -94,9 +94,10 @@ func (p *processor) start(wg *sync.WaitGroup) { defer p.sleeper.Done() for { - if id, _ := p.sleeper.Fetch(true); id == closeWaker { + if w := p.sleeper.Fetch(true); w == &p.closeWaker { break } + // If not the closeWaker, it must be &p.newEndpointWaker. for { ep := p.epQ.dequeue() if ep == nil { @@ -154,8 +155,8 @@ func (d *dispatcher) init(rng *rand.Rand, nProcessors int) { d.seed = rng.Uint32() for i := range d.processors { p := &d.processors[i] - p.sleeper.AddWaker(&p.newEndpointWaker, newEndpointWaker) - p.sleeper.AddWaker(&p.closeWaker, closeWaker) + p.sleeper.AddWaker(&p.newEndpointWaker) + p.sleeper.AddWaker(&p.closeWaker) d.wg.Add(1) // NB: sleeper-waker registration must happen synchronously to avoid races // with `close`. It's possible to pull all this logic into `start`, but |