diff options
-rw-r--r-- | pkg/sleep/sleep_unsafe.go | 108 | ||||
-rw-r--r-- | pkg/tcpip/link/qdisc/fifo/endpoint.go | 11 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/accept.go | 13 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 286 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/dispatcher.go | 7 |
5 files changed, 185 insertions, 240 deletions
diff --git a/pkg/sleep/sleep_unsafe.go b/pkg/sleep/sleep_unsafe.go index c44206b1e..86c7cc983 100644 --- a/pkg/sleep/sleep_unsafe.go +++ b/pkg/sleep/sleep_unsafe.go @@ -37,15 +37,15 @@ // // // One time set-up. // s := sleep.Sleeper{} -// s.AddWaker(&w1, constant1) -// s.AddWaker(&w2, constant2) +// s.AddWaker(&w1) +// s.AddWaker(&w2) // // // Called repeatedly. // for { -// switch id, _ := s.Fetch(true); id { -// case constant1: +// switch s.Fetch(true) { +// case &w1: // // Do work triggered by w1 being asserted. -// case constant2: +// case &w2: // // Do work triggered by w2 being asserted. // } // } @@ -119,13 +119,18 @@ type Sleeper struct { waitingG uintptr } -// AddWaker associates the given waker to the sleeper. id is the value to be -// returned when the sleeper is woken by the given waker. -func (s *Sleeper) AddWaker(w *Waker, id int) { +// AddWaker associates the given waker to the sleeper. +func (s *Sleeper) AddWaker(w *Waker) { + if w.allWakersNext != nil { + panic("waker has non-nil allWakersNext; owned by another sleeper?") + } + if w.next != nil { + panic("waker has non-nil next; queued in another sleeper?") + } + // Add the waker to the list of all wakers. w.allWakersNext = s.allWakers s.allWakers = w - w.id = id // Try to associate the waker with the sleeper. If it's already // asserted, we simply enqueue it in the "ready" list. @@ -213,28 +218,26 @@ func commitSleep(g uintptr, waitingG unsafe.Pointer) bool { return sync.RaceUncheckedAtomicCompareAndSwapUintptr((*uintptr)(waitingG), preparingG, g) } -// Fetch fetches the next wake-up notification. If a notification is immediately -// available, it is returned right away. Otherwise, the behavior depends on the -// value of 'block': if true, the current goroutine blocks until a notification -// arrives, then returns it; if false, returns 'ok' as false. -// -// When 'ok' is true, the value of 'id' corresponds to the id associated with -// the waker; when 'ok' is false, 'id' is undefined. +// Fetch fetches the next wake-up notification. If a notification is +// immediately available, the asserted waker is returned immediately. +// Otherwise, the behavior depends on the value of 'block': if true, the +// current goroutine blocks until a notification arrives and returns the +// asserted waker; if false, nil will be returned. // // N.B. This method is *not* thread-safe. Only one goroutine at a time is // allowed to call this method. -func (s *Sleeper) Fetch(block bool) (id int, ok bool) { +func (s *Sleeper) Fetch(block bool) *Waker { for { w := s.nextWaker(block) if w == nil { - return -1, false + return nil } // Reassociate the waker with the sleeper. If the waker was // still asserted we can return it, otherwise try the next one. old := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(s))) if old == &assertedSleeper { - return w.id, true + return w } } } @@ -243,51 +246,34 @@ func (s *Sleeper) Fetch(block bool) (id int, ok bool) { // removes the association with all wakers so that they can be safely reused // by another sleeper after Done() returns. func (s *Sleeper) Done() { - // Remove all associations that we can, and build a list of the ones - // we could not. An association can be removed right away from waker w - // if w.s has a pointer to the sleeper, that is, the waker is not - // asserted yet. By atomically switching w.s to nil, we guarantee that - // subsequent calls to Assert() on the waker will not result in it being - // queued to this sleeper. - var pending *Waker - w := s.allWakers - for w != nil { - next := w.allWakersNext - for { - t := atomic.LoadPointer(&w.s) - if t != usleeper(s) { - w.allWakersNext = pending - pending = w - break - } - - if atomic.CompareAndSwapPointer(&w.s, t, nil) { - break - } + // Remove all associations that we can, and build a list of the ones we + // could not. An association can be removed right away from waker w if + // w.s has a pointer to the sleeper, that is, the waker is not asserted + // yet. By atomically switching w.s to nil, we guarantee that + // subsequent calls to Assert() on the waker will not result in it + // being queued. + for w := s.allWakers; w != nil; w = s.allWakers { + next := w.allWakersNext // Before zapping. + if atomic.CompareAndSwapPointer(&w.s, usleeper(s), nil) { + w.allWakersNext = nil + w.next = nil + s.allWakers = next // Move ahead. + continue } - w = next - } - // The associations that we could not remove are either asserted, or in - // the process of being asserted, or have been asserted and cleared - // before being pulled from the sleeper lists. We must wait for them all - // to make it to the sleeper lists, so that we know that the wakers - // won't do any more work towards waking this sleeper up. - for pending != nil { - pulled := s.nextWaker(true) - - // Remove the waker we just pulled from the list of associated - // wakers. - prev := &pending - for w := *prev; w != nil; w = *prev { - if pulled == w { - *prev = w.allWakersNext - break + // Dequeue exactly one waiter from the list, it may not be + // this one but we know this one is in the process. We must + // leave it in the asserted state but drop it from our lists. + if w := s.nextWaker(true); w != nil { + prev := &s.allWakers + for *prev != w { + prev = &((*prev).allWakersNext) } - prev = &w.allWakersNext + *prev = (*prev).allWakersNext + w.allWakersNext = nil + w.next = nil } } - s.allWakers = nil } // enqueueAssertedWaker enqueues an asserted waker to the "ready" circular list @@ -349,10 +335,6 @@ type Waker struct { // allWakersNext is used to form a linked list of all wakers associated // to a given sleeper. allWakersNext *Waker - - // id is the value to be returned to sleepers when they wake up due to - // this waker being asserted. - id int } // Assert moves the waker to an asserted state, if it isn't asserted yet. When diff --git a/pkg/tcpip/link/qdisc/fifo/endpoint.go b/pkg/tcpip/link/qdisc/fifo/endpoint.go index b41e3e2fa..c15cbf81b 100644 --- a/pkg/tcpip/link/qdisc/fifo/endpoint.go +++ b/pkg/tcpip/link/qdisc/fifo/endpoint.go @@ -73,20 +73,19 @@ func New(lower stack.LinkEndpoint, n int, queueLen int) stack.LinkEndpoint { } func (q *queueDispatcher) dispatchLoop() { - const newPacketWakerID = 1 - const closeWakerID = 2 s := sleep.Sleeper{} - s.AddWaker(&q.newPacketWaker, newPacketWakerID) - s.AddWaker(&q.closeWaker, closeWakerID) + s.AddWaker(&q.newPacketWaker) + s.AddWaker(&q.closeWaker) defer s.Done() const batchSize = 32 var batch stack.PacketBufferList for { - id, ok := s.Fetch(true) - if ok && id == closeWakerID { + w := s.Fetch(true) + if w == &q.closeWaker { return } + // Must otherwise be the newPacketWaker. for pkt := q.q.dequeue(); pkt != nil; pkt = q.q.dequeue() { batch.PushBack(pkt) if batch.Len() < batchSize && !q.q.empty() { diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index caf14b0dc..d0f68b72c 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -762,14 +762,15 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) { }() var s sleep.Sleeper - s.AddWaker(&e.notificationWaker, wakerForNotification) - s.AddWaker(&e.newSegmentWaker, wakerForNewSegment) + s.AddWaker(&e.notificationWaker) + s.AddWaker(&e.newSegmentWaker) + defer s.Done() for { e.mu.Unlock() - index, _ := s.Fetch(true) + w := s.Fetch(true) e.mu.Lock() - switch index { - case wakerForNotification: + switch w { + case &e.notificationWaker: n := e.fetchNotifications() if n¬ifyClose != 0 { return @@ -788,7 +789,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) { e.mu.Lock() } - case wakerForNewSegment: + case &e.newSegmentWaker: // Process at most maxSegmentsPerWake segments. mayRequeue := true for i := 0; i < maxSegmentsPerWake; i++ { diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 80cd07218..12df7a7b4 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -51,13 +51,6 @@ const ( handshakeCompleted ) -// The following are used to set up sleepers. -const ( - wakerForNotification = iota - wakerForNewSegment - wakerForResend -) - const ( // Maximum space available for options. maxOptionSize = 40 @@ -530,9 +523,9 @@ func (h *handshake) complete() tcpip.Error { // Set up the wakers. var s sleep.Sleeper resendWaker := sleep.Waker{} - s.AddWaker(&resendWaker, wakerForResend) - s.AddWaker(&h.ep.notificationWaker, wakerForNotification) - s.AddWaker(&h.ep.newSegmentWaker, wakerForNewSegment) + s.AddWaker(&resendWaker) + s.AddWaker(&h.ep.notificationWaker) + s.AddWaker(&h.ep.newSegmentWaker) defer s.Done() // Initialize the resend timer. @@ -545,11 +538,10 @@ func (h *handshake) complete() tcpip.Error { // Unlock before blocking, and reacquire again afterwards (h.ep.mu is held // throughout handshake processing). h.ep.mu.Unlock() - index, _ := s.Fetch(true /* block */) + w := s.Fetch(true /* block */) h.ep.mu.Lock() - switch index { - - case wakerForResend: + switch w { + case &resendWaker: if err := timer.reset(); err != nil { return err } @@ -577,7 +569,7 @@ func (h *handshake) complete() tcpip.Error { h.sampleRTTWithTSOnly = true } - case wakerForNotification: + case &h.ep.notificationWaker: n := h.ep.fetchNotifications() if (n¬ifyClose)|(n¬ifyAbort) != 0 { return &tcpip.ErrAborted{} @@ -611,7 +603,7 @@ func (h *handshake) complete() tcpip.Error { // cleared because of a socket layer call. return &tcpip.ErrConnectionAborted{} } - case wakerForNewSegment: + case &h.ep.newSegmentWaker: if err := h.processSegments(); err != nil { return err } @@ -1346,6 +1338,103 @@ func (e *endpoint) protocolMainLoopDone(closeTimer tcpip.Timer) { e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } +// handleWakeup handles a wakeup event while connected. +// +// +checklocks:e.mu +func (e *endpoint) handleWakeup(w, closeWaker *sleep.Waker, closeTimer *tcpip.Timer) tcpip.Error { + switch w { + case &e.sndQueueInfo.sndWaker: + e.sendData(nil /* next */) + case &e.newSegmentWaker: + return e.handleSegmentsLocked(false /* fastPath */) + case &e.snd.resendWaker: + if !e.snd.retransmitTimerExpired() { + e.stack.Stats().TCP.EstablishedTimedout.Increment() + return &tcpip.ErrTimeout{} + } + case closeWaker: + // This means the socket is being closed due to the + // TCP-FIN-WAIT2 timeout was hit. Just mark the socket as + // closed. + e.transitionToStateCloseLocked() + e.workerCleanup = true + case &e.snd.probeWaker: + return e.snd.probeTimerExpired() + case &e.keepalive.waker: + return e.keepaliveTimerExpired() + case &e.notificationWaker: + n := e.fetchNotifications() + if n¬ifyNonZeroReceiveWindow != 0 { + e.rcv.nonZeroWindow() + } + + if n¬ifyMTUChanged != 0 { + e.sndQueueInfo.sndQueueMu.Lock() + count := e.sndQueueInfo.PacketTooBigCount + e.sndQueueInfo.PacketTooBigCount = 0 + mtu := e.sndQueueInfo.SndMTU + e.sndQueueInfo.sndQueueMu.Unlock() + + e.snd.updateMaxPayloadSize(mtu, count) + } + + if n¬ifyReset != 0 || n¬ifyAbort != 0 { + return &tcpip.ErrConnectionAborted{} + } + + if n¬ifyResetByPeer != 0 { + return &tcpip.ErrConnectionReset{} + } + + if n¬ifyClose != 0 && e.closed { + switch e.EndpointState() { + case StateEstablished: + // Perform full shutdown if the endpoint is + // still established. This can occur when + // notifyClose was asserted just before + // becoming established. + e.shutdownLocked(tcpip.ShutdownWrite | tcpip.ShutdownRead) + case StateFinWait2: + // The socket has been closed and we are in + // FIN_WAIT2 so start the FIN_WAIT2 timer. + if *closeTimer == nil { + *closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert) + } + } + } + + if n¬ifyKeepaliveChanged != 0 { + // The timer could fire in background when the endpoint + // is drained. That's OK. See above. + e.resetKeepaliveTimer(true) + } + + if n¬ifyDrain != 0 { + for !e.segmentQueue.empty() { + if err := e.handleSegmentsLocked(false /* fastPath */); err != nil { + return err + } + } + if !e.EndpointState().closed() { + // Only block the worker if the endpoint + // is not in closed state or error state. + close(e.drainDone) + e.mu.Unlock() + <-e.undrain + e.mu.Lock() + } + } + + // N.B. notifyTickleWorker may be set, but there is no action + // to take in this case. + case &e.snd.reorderWaker: + return e.snd.rc.reorderTimerExpired() + default: + panic("unknown waker") // Shouldn't happen. + } + return nil +} + // protocolMainLoop is the main loop of the TCP protocol. It runs in its own // goroutine and is responsible for sending segments and handling received // segments. @@ -1403,139 +1492,16 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.mu.Lock() } - // Set up the functions that will be called when the main protocol loop - // wakes up. - funcs := []struct { - w *sleep.Waker - f func() tcpip.Error - }{ - { - w: &e.sndQueueInfo.sndWaker, - f: func() tcpip.Error { - e.sendData(nil /* next */) - return nil - }, - }, - { - w: &closeWaker, - f: func() tcpip.Error { - // This means the socket is being closed due - // to the TCP-FIN-WAIT2 timeout was hit. Just - // mark the socket as closed. - e.transitionToStateCloseLocked() - e.workerCleanup = true - return nil - }, - }, - { - w: &e.snd.resendWaker, - f: func() tcpip.Error { - if !e.snd.retransmitTimerExpired() { - e.stack.Stats().TCP.EstablishedTimedout.Increment() - return &tcpip.ErrTimeout{} - } - return nil - }, - }, - { - w: &e.snd.probeWaker, - f: e.snd.probeTimerExpired, - }, - { - w: &e.newSegmentWaker, - f: func() tcpip.Error { - return e.handleSegmentsLocked(false /* fastPath */) - }, - }, - { - w: &e.keepalive.waker, - f: e.keepaliveTimerExpired, - }, - { - w: &e.notificationWaker, - f: func() tcpip.Error { - n := e.fetchNotifications() - if n¬ifyNonZeroReceiveWindow != 0 { - e.rcv.nonZeroWindow() - } - - if n¬ifyMTUChanged != 0 { - e.sndQueueInfo.sndQueueMu.Lock() - count := e.sndQueueInfo.PacketTooBigCount - e.sndQueueInfo.PacketTooBigCount = 0 - mtu := e.sndQueueInfo.SndMTU - e.sndQueueInfo.sndQueueMu.Unlock() - - e.snd.updateMaxPayloadSize(mtu, count) - } - - if n¬ifyReset != 0 || n¬ifyAbort != 0 { - return &tcpip.ErrConnectionAborted{} - } - - if n¬ifyResetByPeer != 0 { - return &tcpip.ErrConnectionReset{} - } - - if n¬ifyClose != 0 && e.closed { - switch e.EndpointState() { - case StateEstablished: - // Perform full shutdown if the endpoint is still - // established. This can occur when notifyClose - // was asserted just before becoming established. - e.shutdownLocked(tcpip.ShutdownWrite | tcpip.ShutdownRead) - case StateFinWait2: - // The socket has been closed and we are in FIN_WAIT2 - // so start the FIN_WAIT2 timer. - if closeTimer == nil { - closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert) - } - } - } - - if n¬ifyKeepaliveChanged != 0 { - // The timer could fire in background - // when the endpoint is drained. That's - // OK. See above. - e.resetKeepaliveTimer(true) - } - - if n¬ifyDrain != 0 { - for !e.segmentQueue.empty() { - if err := e.handleSegmentsLocked(false /* fastPath */); err != nil { - return err - } - } - if !e.EndpointState().closed() { - // Only block the worker if the endpoint - // is not in closed state or error state. - close(e.drainDone) - e.mu.Unlock() // +checklocksforce - <-e.undrain - e.mu.Lock() - } - } - - if n¬ifyTickleWorker != 0 { - // Just a tickle notification. No need to do - // anything. - return nil - } - - return nil - }, - }, - { - w: &e.snd.reorderWaker, - f: e.snd.rc.reorderTimerExpired, - }, - } - - // Initialize the sleeper based on the wakers in funcs. + // Add all wakers. var s sleep.Sleeper - for i := range funcs { - s.AddWaker(funcs[i].w, i) - } + s.AddWaker(&e.sndQueueInfo.sndWaker) + s.AddWaker(&e.newSegmentWaker) + s.AddWaker(&e.snd.resendWaker) + s.AddWaker(&e.snd.probeWaker) + s.AddWaker(&closeWaker) + s.AddWaker(&e.keepalive.waker) + s.AddWaker(&e.notificationWaker) + s.AddWaker(&e.snd.reorderWaker) // Notify the caller that the waker initialization is complete and the // endpoint is ready. @@ -1581,7 +1547,7 @@ loop: } e.mu.Unlock() - v, _ := s.Fetch(true /* block */) + w := s.Fetch(true /* block */) e.mu.Lock() // We need to double check here because the notification may be @@ -1601,7 +1567,7 @@ loop: case StateClose: break loop default: - if err := funcs[v].f(); err != nil { + if err := e.handleWakeup(w, &closeWaker, &closeTimer); err != nil { cleanupOnError(err) e.protocolMainLoopDone(closeTimer) return @@ -1714,26 +1680,22 @@ func (e *endpoint) doTimeWait() (twReuse func()) { timeWaitDuration = time.Duration(tcpTW) } - const newSegment = 1 - const notification = 2 - const timeWaitDone = 3 - var s sleep.Sleeper defer s.Done() - s.AddWaker(&e.newSegmentWaker, newSegment) - s.AddWaker(&e.notificationWaker, notification) + s.AddWaker(&e.newSegmentWaker) + s.AddWaker(&e.notificationWaker) var timeWaitWaker sleep.Waker - s.AddWaker(&timeWaitWaker, timeWaitDone) + s.AddWaker(&timeWaitWaker) timeWaitTimer := e.stack.Clock().AfterFunc(timeWaitDuration, timeWaitWaker.Assert) defer timeWaitTimer.Stop() for { e.mu.Unlock() - v, _ := s.Fetch(true /* block */) + w := s.Fetch(true /* block */) e.mu.Lock() - switch v { - case newSegment: + switch w { + case &e.newSegmentWaker: extendTimeWait, reuseTW := e.handleTimeWaitSegments() if reuseTW != nil { return reuseTW @@ -1741,7 +1703,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) { if extendTimeWait { timeWaitTimer.Reset(timeWaitDuration) } - case notification: + case &e.notificationWaker: n := e.fetchNotifications() if n¬ifyAbort != 0 { return nil @@ -1759,7 +1721,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) { e.mu.Lock() return nil } - case timeWaitDone: + case &timeWaitWaker: return nil } } diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go index 7d110516b..2e93d2664 100644 --- a/pkg/tcpip/transport/tcp/dispatcher.go +++ b/pkg/tcpip/transport/tcp/dispatcher.go @@ -94,9 +94,10 @@ func (p *processor) start(wg *sync.WaitGroup) { defer p.sleeper.Done() for { - if id, _ := p.sleeper.Fetch(true); id == closeWaker { + if w := p.sleeper.Fetch(true); w == &p.closeWaker { break } + // If not the closeWaker, it must be &p.newEndpointWaker. for { ep := p.epQ.dequeue() if ep == nil { @@ -154,8 +155,8 @@ func (d *dispatcher) init(rng *rand.Rand, nProcessors int) { d.seed = rng.Uint32() for i := range d.processors { p := &d.processors[i] - p.sleeper.AddWaker(&p.newEndpointWaker, newEndpointWaker) - p.sleeper.AddWaker(&p.closeWaker, closeWaker) + p.sleeper.AddWaker(&p.newEndpointWaker) + p.sleeper.AddWaker(&p.closeWaker) d.wg.Add(1) // NB: sleeper-waker registration must happen synchronously to avoid races // with `close`. It's possible to pull all this logic into `start`, but |