 pkg/sleep/sleep_unsafe.go             | 108
 pkg/tcpip/link/qdisc/fifo/endpoint.go |  11
 pkg/tcpip/transport/tcp/accept.go     |  13
 pkg/tcpip/transport/tcp/connect.go    | 286
 pkg/tcpip/transport/tcp/dispatcher.go |   7
 5 files changed, 185 insertions(+), 240 deletions(-)
diff --git a/pkg/sleep/sleep_unsafe.go b/pkg/sleep/sleep_unsafe.go
index c44206b1e..86c7cc983 100644
--- a/pkg/sleep/sleep_unsafe.go
+++ b/pkg/sleep/sleep_unsafe.go
@@ -37,15 +37,15 @@
//
// // One time set-up.
// s := sleep.Sleeper{}
-// s.AddWaker(&w1, constant1)
-// s.AddWaker(&w2, constant2)
+// s.AddWaker(&w1)
+// s.AddWaker(&w2)
//
// // Called repeatedly.
// for {
-// switch id, _ := s.Fetch(true); id {
-// case constant1:
+// switch s.Fetch(true) {
+// case &w1:
// // Do work triggered by w1 being asserted.
-// case constant2:
+// case &w2:
// // Do work triggered by w2 being asserted.
// }
// }
@@ -119,13 +119,18 @@ type Sleeper struct {
waitingG uintptr
}
-// AddWaker associates the given waker to the sleeper. id is the value to be
-// returned when the sleeper is woken by the given waker.
-func (s *Sleeper) AddWaker(w *Waker, id int) {
+// AddWaker associates the given waker with the sleeper.
+func (s *Sleeper) AddWaker(w *Waker) {
+ if w.allWakersNext != nil {
+ panic("waker has non-nil allWakersNext; owned by another sleeper?")
+ }
+ if w.next != nil {
+ panic("waker has non-nil next; queued in another sleeper?")
+ }
+
// Add the waker to the list of all wakers.
w.allWakersNext = s.allWakers
s.allWakers = w
- w.id = id
// Try to associate the waker with the sleeper. If it's already
// asserted, we simply enqueue it in the "ready" list.
@@ -213,28 +218,26 @@ func commitSleep(g uintptr, waitingG unsafe.Pointer) bool {
return sync.RaceUncheckedAtomicCompareAndSwapUintptr((*uintptr)(waitingG), preparingG, g)
}
-// Fetch fetches the next wake-up notification. If a notification is immediately
-// available, it is returned right away. Otherwise, the behavior depends on the
-// value of 'block': if true, the current goroutine blocks until a notification
-// arrives, then returns it; if false, returns 'ok' as false.
-//
-// When 'ok' is true, the value of 'id' corresponds to the id associated with
-// the waker; when 'ok' is false, 'id' is undefined.
+// Fetch fetches the next wake-up notification. If a notification is
+// immediately available, the asserted waker is returned right away.
+// Otherwise, the behavior depends on the value of 'block': if true, the
+// current goroutine blocks until a notification arrives, then the
+// asserted waker is returned; if false, nil is returned.
//
// N.B. This method is *not* thread-safe. Only one goroutine at a time is
// allowed to call this method.
-func (s *Sleeper) Fetch(block bool) (id int, ok bool) {
+func (s *Sleeper) Fetch(block bool) *Waker {
for {
w := s.nextWaker(block)
if w == nil {
- return -1, false
+ return nil
}
// Reassociate the waker with the sleeper. If the waker was
// still asserted we can return it, otherwise try the next one.
old := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(s)))
if old == &assertedSleeper {
- return w.id, true
+ return w
}
}
}
@@ -243,51 +246,34 @@ func (s *Sleeper) Fetch(block bool) (id int, ok bool) {
// removes the association with all wakers so that they can be safely reused
// by another sleeper after Done() returns.
func (s *Sleeper) Done() {
- // Remove all associations that we can, and build a list of the ones
- // we could not. An association can be removed right away from waker w
- // if w.s has a pointer to the sleeper, that is, the waker is not
- // asserted yet. By atomically switching w.s to nil, we guarantee that
- // subsequent calls to Assert() on the waker will not result in it being
- // queued to this sleeper.
- var pending *Waker
- w := s.allWakers
- for w != nil {
- next := w.allWakersNext
- for {
- t := atomic.LoadPointer(&w.s)
- if t != usleeper(s) {
- w.allWakersNext = pending
- pending = w
- break
- }
-
- if atomic.CompareAndSwapPointer(&w.s, t, nil) {
- break
- }
+ // Remove all associations that we can, and build a list of the ones we
+ // could not. An association can be removed right away from waker w if
+ // w.s has a pointer to the sleeper, that is, the waker is not asserted
+ // yet. By atomically switching w.s to nil, we guarantee that
+ // subsequent calls to Assert() on the waker will not result in it
+ // being queued.
+ for w := s.allWakers; w != nil; w = s.allWakers {
+ next := w.allWakersNext // Before zapping.
+ if atomic.CompareAndSwapPointer(&w.s, usleeper(s), nil) {
+ w.allWakersNext = nil
+ w.next = nil
+ s.allWakers = next // Move ahead.
+ continue
}
- w = next
- }
- // The associations that we could not remove are either asserted, or in
- // the process of being asserted, or have been asserted and cleared
- // before being pulled from the sleeper lists. We must wait for them all
- // to make it to the sleeper lists, so that we know that the wakers
- // won't do any more work towards waking this sleeper up.
- for pending != nil {
- pulled := s.nextWaker(true)
-
- // Remove the waker we just pulled from the list of associated
- // wakers.
- prev := &pending
- for w := *prev; w != nil; w = *prev {
- if pulled == w {
- *prev = w.allWakersNext
- break
+ // Dequeue exactly one waiter from the list; it may not be this
+ // one, but we know this one is in the process of being asserted.
+ // We must leave it in the asserted state but drop it from our lists.
+ if w := s.nextWaker(true); w != nil {
+ prev := &s.allWakers
+ for *prev != w {
+ prev = &((*prev).allWakersNext)
}
- prev = &w.allWakersNext
+ *prev = (*prev).allWakersNext
+ w.allWakersNext = nil
+ w.next = nil
}
}
- s.allWakers = nil
}
// enqueueAssertedWaker enqueues an asserted waker to the "ready" circular list
@@ -349,10 +335,6 @@ type Waker struct {
// allWakersNext is used to form a linked list of all wakers associated
// to a given sleeper.
allWakersNext *Waker
-
- // id is the value to be returned to sleepers when they wake up due to
- // this waker being asserted.
- id int
}
// Assert moves the waker to an asserted state, if it isn't asserted yet. When
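
Expanding the updated doc-comment example above into a complete program makes the API change concrete: wakers no longer carry IDs, and Fetch returns the asserted *Waker directly. A minimal, self-contained sketch of the new usage (the waker names are illustrative only):

package main

import (
	"fmt"

	"gvisor.dev/gvisor/pkg/sleep"
)

func main() {
	var w1, w2 sleep.Waker

	// One-time setup: associate both wakers with the sleeper.
	var s sleep.Sleeper
	s.AddWaker(&w1)
	s.AddWaker(&w2)
	defer s.Done()

	// Assert one waker; Fetch now returns the asserted waker itself
	// instead of an (id, ok) pair.
	w1.Assert()
	switch s.Fetch(true /* block */) {
	case &w1:
		fmt.Println("woken by w1")
	case &w2:
		fmt.Println("woken by w2")
	}
}
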
diff --git a/pkg/tcpip/link/qdisc/fifo/endpoint.go b/pkg/tcpip/link/qdisc/fifo/endpoint.go
index b41e3e2fa..c15cbf81b 100644
--- a/pkg/tcpip/link/qdisc/fifo/endpoint.go
+++ b/pkg/tcpip/link/qdisc/fifo/endpoint.go
@@ -73,20 +73,19 @@ func New(lower stack.LinkEndpoint, n int, queueLen int) stack.LinkEndpoint {
}
func (q *queueDispatcher) dispatchLoop() {
- const newPacketWakerID = 1
- const closeWakerID = 2
s := sleep.Sleeper{}
- s.AddWaker(&q.newPacketWaker, newPacketWakerID)
- s.AddWaker(&q.closeWaker, closeWakerID)
+ s.AddWaker(&q.newPacketWaker)
+ s.AddWaker(&q.closeWaker)
defer s.Done()
const batchSize = 32
var batch stack.PacketBufferList
for {
- id, ok := s.Fetch(true)
- if ok && id == closeWakerID {
+ w := s.Fetch(true)
+ if w == &q.closeWaker {
return
}
+ // Otherwise this must be the newPacketWaker.
for pkt := q.q.dequeue(); pkt != nil; pkt = q.q.dequeue() {
batch.PushBack(pkt)
if batch.Len() < batchSize && !q.q.empty() {
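
The shutdown handling here relies on a property this change leaves intact: Waker.Assert may be called from any goroutine, so a dedicated close waker remains the cheap way to stop a dispatch loop. A hedged sketch of that pattern with a hypothetical dispatcher type (a buffered channel stands in for the packet queue):

package main

import (
	"fmt"
	"time"

	"gvisor.dev/gvisor/pkg/sleep"
)

// dispatcher is a hypothetical stand-in for queueDispatcher: one waker for
// new work, one for shutdown, and a buffered channel playing the queue.
type dispatcher struct {
	newWorkWaker sleep.Waker
	closeWaker   sleep.Waker
	work         chan string
}

func (d *dispatcher) loop(done chan<- struct{}) {
	defer close(done)

	var s sleep.Sleeper
	s.AddWaker(&d.newWorkWaker)
	s.AddWaker(&d.closeWaker)
	defer s.Done()

	for {
		if s.Fetch(true /* block */) == &d.closeWaker {
			return
		}
		// Otherwise this must be newWorkWaker: drain everything queued.
		// We are the only consumer, so len > 0 means a receive won't block.
		for len(d.work) > 0 {
			fmt.Println("processed", <-d.work)
		}
	}
}

func main() {
	d := &dispatcher{work: make(chan string, 8)}
	done := make(chan struct{})
	go d.loop(done)

	d.work <- "pkt"
	d.newWorkWaker.Assert()

	// Stop the loop from this goroutine once it has had a chance to drain.
	time.Sleep(10 * time.Millisecond)
	d.closeWaker.Assert()
	<-done
}
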
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index caf14b0dc..d0f68b72c 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -762,14 +762,15 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) {
}()
var s sleep.Sleeper
- s.AddWaker(&e.notificationWaker, wakerForNotification)
- s.AddWaker(&e.newSegmentWaker, wakerForNewSegment)
+ s.AddWaker(&e.notificationWaker)
+ s.AddWaker(&e.newSegmentWaker)
+ defer s.Done()
for {
e.mu.Unlock()
- index, _ := s.Fetch(true)
+ w := s.Fetch(true)
e.mu.Lock()
- switch index {
- case wakerForNotification:
+ switch w {
+ case &e.notificationWaker:
n := e.fetchNotifications()
if n&notifyClose != 0 {
return
@@ -788,7 +789,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) {
e.mu.Lock()
}
- case wakerForNewSegment:
+ case &e.newSegmentWaker:
// Process at most maxSegmentsPerWake segments.
mayRequeue := true
for i := 0; i < maxSegmentsPerWake; i++ {
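
This loop, like the connect and main loops changed below, follows one shape around the blocking Fetch: release the endpoint mutex before blocking, re-acquire it afterwards, then dispatch on the returned waker pointer. A sketch of that shape using a hypothetical endpoint struct (not the real tcp.endpoint):

package tcpsketch

import (
	"sync"

	"gvisor.dev/gvisor/pkg/sleep"
)

// ep is a hypothetical stand-in for the TCP endpoint: a mutex plus the two
// wakers that the listen loop above waits on.
type ep struct {
	mu                sync.Mutex
	notificationWaker sleep.Waker
	newSegmentWaker   sleep.Waker
	closed            bool
}

func (e *ep) listenLoop() {
	var s sleep.Sleeper
	s.AddWaker(&e.notificationWaker)
	s.AddWaker(&e.newSegmentWaker)
	defer s.Done()

	e.mu.Lock()
	defer e.mu.Unlock()
	for {
		e.mu.Unlock()
		w := s.Fetch(true /* block */) // Never block while holding e.mu.
		e.mu.Lock()

		switch w {
		case &e.notificationWaker:
			if e.closed {
				return
			}
		case &e.newSegmentWaker:
			// Process queued segments here.
		}
	}
}
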
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 80cd07218..12df7a7b4 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -51,13 +51,6 @@ const (
handshakeCompleted
)
-// The following are used to set up sleepers.
-const (
- wakerForNotification = iota
- wakerForNewSegment
- wakerForResend
-)
-
const (
// Maximum space available for options.
maxOptionSize = 40
@@ -530,9 +523,9 @@ func (h *handshake) complete() tcpip.Error {
// Set up the wakers.
var s sleep.Sleeper
resendWaker := sleep.Waker{}
- s.AddWaker(&resendWaker, wakerForResend)
- s.AddWaker(&h.ep.notificationWaker, wakerForNotification)
- s.AddWaker(&h.ep.newSegmentWaker, wakerForNewSegment)
+ s.AddWaker(&resendWaker)
+ s.AddWaker(&h.ep.notificationWaker)
+ s.AddWaker(&h.ep.newSegmentWaker)
defer s.Done()
// Initialize the resend timer.
@@ -545,11 +538,10 @@ func (h *handshake) complete() tcpip.Error {
// Unlock before blocking, and reacquire again afterwards (h.ep.mu is held
// throughout handshake processing).
h.ep.mu.Unlock()
- index, _ := s.Fetch(true /* block */)
+ w := s.Fetch(true /* block */)
h.ep.mu.Lock()
- switch index {
-
- case wakerForResend:
+ switch w {
+ case &resendWaker:
if err := timer.reset(); err != nil {
return err
}
@@ -577,7 +569,7 @@ func (h *handshake) complete() tcpip.Error {
h.sampleRTTWithTSOnly = true
}
- case wakerForNotification:
+ case &h.ep.notificationWaker:
n := h.ep.fetchNotifications()
if (n&notifyClose)|(n&notifyAbort) != 0 {
return &tcpip.ErrAborted{}
@@ -611,7 +603,7 @@ func (h *handshake) complete() tcpip.Error {
// cleared because of a socket layer call.
return &tcpip.ErrConnectionAborted{}
}
- case wakerForNewSegment:
+ case &h.ep.newSegmentWaker:
if err := h.processSegments(); err != nil {
return err
}
@@ -1346,6 +1338,103 @@ func (e *endpoint) protocolMainLoopDone(closeTimer tcpip.Timer) {
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
}
+// handleWakeup handles a wakeup event while connected.
+//
+// +checklocks:e.mu
+func (e *endpoint) handleWakeup(w, closeWaker *sleep.Waker, closeTimer *tcpip.Timer) tcpip.Error {
+ switch w {
+ case &e.sndQueueInfo.sndWaker:
+ e.sendData(nil /* next */)
+ case &e.newSegmentWaker:
+ return e.handleSegmentsLocked(false /* fastPath */)
+ case &e.snd.resendWaker:
+ if !e.snd.retransmitTimerExpired() {
+ e.stack.Stats().TCP.EstablishedTimedout.Increment()
+ return &tcpip.ErrTimeout{}
+ }
+ case closeWaker:
+ // The socket is being closed because the TCP FIN-WAIT-2
+ // timeout was hit. Just mark the socket as closed.
+ e.transitionToStateCloseLocked()
+ e.workerCleanup = true
+ case &e.snd.probeWaker:
+ return e.snd.probeTimerExpired()
+ case &e.keepalive.waker:
+ return e.keepaliveTimerExpired()
+ case &e.notificationWaker:
+ n := e.fetchNotifications()
+ if n&notifyNonZeroReceiveWindow != 0 {
+ e.rcv.nonZeroWindow()
+ }
+
+ if n&notifyMTUChanged != 0 {
+ e.sndQueueInfo.sndQueueMu.Lock()
+ count := e.sndQueueInfo.PacketTooBigCount
+ e.sndQueueInfo.PacketTooBigCount = 0
+ mtu := e.sndQueueInfo.SndMTU
+ e.sndQueueInfo.sndQueueMu.Unlock()
+
+ e.snd.updateMaxPayloadSize(mtu, count)
+ }
+
+ if n&notifyReset != 0 || n&notifyAbort != 0 {
+ return &tcpip.ErrConnectionAborted{}
+ }
+
+ if n&notifyResetByPeer != 0 {
+ return &tcpip.ErrConnectionReset{}
+ }
+
+ if n&notifyClose != 0 && e.closed {
+ switch e.EndpointState() {
+ case StateEstablished:
+ // Perform full shutdown if the endpoint is
+ // still established. This can occur when
+ // notifyClose was asserted just before
+ // becoming established.
+ e.shutdownLocked(tcpip.ShutdownWrite | tcpip.ShutdownRead)
+ case StateFinWait2:
+ // The socket has been closed and we are in
+ // FIN_WAIT2 so start the FIN_WAIT2 timer.
+ if *closeTimer == nil {
+ *closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert)
+ }
+ }
+ }
+
+ if n&notifyKeepaliveChanged != 0 {
+ // The timer could fire in the background when the endpoint
+ // is drained. That's OK. See above.
+ e.resetKeepaliveTimer(true)
+ }
+
+ if n&notifyDrain != 0 {
+ for !e.segmentQueue.empty() {
+ if err := e.handleSegmentsLocked(false /* fastPath */); err != nil {
+ return err
+ }
+ }
+ if !e.EndpointState().closed() {
+ // Only block the worker if the endpoint
+ // is not in closed state or error state.
+ close(e.drainDone)
+ e.mu.Unlock()
+ <-e.undrain
+ e.mu.Lock()
+ }
+ }
+
+ // N.B. notifyTickleWorker may be set, but there is no action
+ // to take in this case.
+ case &e.snd.reorderWaker:
+ return e.snd.rc.reorderTimerExpired()
+ default:
+ panic("unknown waker") // Shouldn't happen.
+ }
+ return nil
+}
+
// protocolMainLoop is the main loop of the TCP protocol. It runs in its own
// goroutine and is responsible for sending segments and handling received
// segments.
@@ -1403,139 +1492,16 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
e.mu.Lock()
}
- // Set up the functions that will be called when the main protocol loop
- // wakes up.
- funcs := []struct {
- w *sleep.Waker
- f func() tcpip.Error
- }{
- {
- w: &e.sndQueueInfo.sndWaker,
- f: func() tcpip.Error {
- e.sendData(nil /* next */)
- return nil
- },
- },
- {
- w: &closeWaker,
- f: func() tcpip.Error {
- // This means the socket is being closed due
- // to the TCP-FIN-WAIT2 timeout was hit. Just
- // mark the socket as closed.
- e.transitionToStateCloseLocked()
- e.workerCleanup = true
- return nil
- },
- },
- {
- w: &e.snd.resendWaker,
- f: func() tcpip.Error {
- if !e.snd.retransmitTimerExpired() {
- e.stack.Stats().TCP.EstablishedTimedout.Increment()
- return &tcpip.ErrTimeout{}
- }
- return nil
- },
- },
- {
- w: &e.snd.probeWaker,
- f: e.snd.probeTimerExpired,
- },
- {
- w: &e.newSegmentWaker,
- f: func() tcpip.Error {
- return e.handleSegmentsLocked(false /* fastPath */)
- },
- },
- {
- w: &e.keepalive.waker,
- f: e.keepaliveTimerExpired,
- },
- {
- w: &e.notificationWaker,
- f: func() tcpip.Error {
- n := e.fetchNotifications()
- if n&notifyNonZeroReceiveWindow != 0 {
- e.rcv.nonZeroWindow()
- }
-
- if n&notifyMTUChanged != 0 {
- e.sndQueueInfo.sndQueueMu.Lock()
- count := e.sndQueueInfo.PacketTooBigCount
- e.sndQueueInfo.PacketTooBigCount = 0
- mtu := e.sndQueueInfo.SndMTU
- e.sndQueueInfo.sndQueueMu.Unlock()
-
- e.snd.updateMaxPayloadSize(mtu, count)
- }
-
- if n&notifyReset != 0 || n&notifyAbort != 0 {
- return &tcpip.ErrConnectionAborted{}
- }
-
- if n&notifyResetByPeer != 0 {
- return &tcpip.ErrConnectionReset{}
- }
-
- if n&notifyClose != 0 && e.closed {
- switch e.EndpointState() {
- case StateEstablished:
- // Perform full shutdown if the endpoint is still
- // established. This can occur when notifyClose
- // was asserted just before becoming established.
- e.shutdownLocked(tcpip.ShutdownWrite | tcpip.ShutdownRead)
- case StateFinWait2:
- // The socket has been closed and we are in FIN_WAIT2
- // so start the FIN_WAIT2 timer.
- if closeTimer == nil {
- closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert)
- }
- }
- }
-
- if n&notifyKeepaliveChanged != 0 {
- // The timer could fire in background
- // when the endpoint is drained. That's
- // OK. See above.
- e.resetKeepaliveTimer(true)
- }
-
- if n&notifyDrain != 0 {
- for !e.segmentQueue.empty() {
- if err := e.handleSegmentsLocked(false /* fastPath */); err != nil {
- return err
- }
- }
- if !e.EndpointState().closed() {
- // Only block the worker if the endpoint
- // is not in closed state or error state.
- close(e.drainDone)
- e.mu.Unlock() // +checklocksforce
- <-e.undrain
- e.mu.Lock()
- }
- }
-
- if n&notifyTickleWorker != 0 {
- // Just a tickle notification. No need to do
- // anything.
- return nil
- }
-
- return nil
- },
- },
- {
- w: &e.snd.reorderWaker,
- f: e.snd.rc.reorderTimerExpired,
- },
- }
-
- // Initialize the sleeper based on the wakers in funcs.
+ // Add all wakers.
var s sleep.Sleeper
- for i := range funcs {
- s.AddWaker(funcs[i].w, i)
- }
+ s.AddWaker(&e.sndQueueInfo.sndWaker)
+ s.AddWaker(&e.newSegmentWaker)
+ s.AddWaker(&e.snd.resendWaker)
+ s.AddWaker(&e.snd.probeWaker)
+ s.AddWaker(&closeWaker)
+ s.AddWaker(&e.keepalive.waker)
+ s.AddWaker(&e.notificationWaker)
+ s.AddWaker(&e.snd.reorderWaker)
// Notify the caller that the waker initialization is complete and the
// endpoint is ready.
@@ -1581,7 +1547,7 @@ loop:
}
e.mu.Unlock()
- v, _ := s.Fetch(true /* block */)
+ w := s.Fetch(true /* block */)
e.mu.Lock()
// We need to double check here because the notification may be
@@ -1601,7 +1567,7 @@ loop:
case StateClose:
break loop
default:
- if err := funcs[v].f(); err != nil {
+ if err := e.handleWakeup(w, &closeWaker, &closeTimer); err != nil {
cleanupOnError(err)
e.protocolMainLoopDone(closeTimer)
return
@@ -1714,26 +1680,22 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
timeWaitDuration = time.Duration(tcpTW)
}
- const newSegment = 1
- const notification = 2
- const timeWaitDone = 3
-
var s sleep.Sleeper
defer s.Done()
- s.AddWaker(&e.newSegmentWaker, newSegment)
- s.AddWaker(&e.notificationWaker, notification)
+ s.AddWaker(&e.newSegmentWaker)
+ s.AddWaker(&e.notificationWaker)
var timeWaitWaker sleep.Waker
- s.AddWaker(&timeWaitWaker, timeWaitDone)
+ s.AddWaker(&timeWaitWaker)
timeWaitTimer := e.stack.Clock().AfterFunc(timeWaitDuration, timeWaitWaker.Assert)
defer timeWaitTimer.Stop()
for {
e.mu.Unlock()
- v, _ := s.Fetch(true /* block */)
+ w := s.Fetch(true /* block */)
e.mu.Lock()
- switch v {
- case newSegment:
+ switch w {
+ case &e.newSegmentWaker:
extendTimeWait, reuseTW := e.handleTimeWaitSegments()
if reuseTW != nil {
return reuseTW
@@ -1741,7 +1703,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
if extendTimeWait {
timeWaitTimer.Reset(timeWaitDuration)
}
- case notification:
+ case &e.notificationWaker:
n := e.fetchNotifications()
if n&notifyAbort != 0 {
return nil
@@ -1759,7 +1721,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
e.mu.Lock()
return nil
}
- case timeWaitDone:
+ case &timeWaitWaker:
return nil
}
}
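
Both the FIN-WAIT-2 and TIME-WAIT paths above arm a timer whose callback is simply the waker's Assert method, which is how plain timers feed the same Fetch loop as segments and notifications. A self-contained sketch of the idea, substituting the standard library's time.AfterFunc for the stack's Clock:

package main

import (
	"fmt"
	"time"

	"gvisor.dev/gvisor/pkg/sleep"
)

func main() {
	var timeoutWaker sleep.Waker

	var s sleep.Sleeper
	s.AddWaker(&timeoutWaker)
	defer s.Done()

	// The waker's Assert method is the timer callback, mirroring
	// e.stack.Clock().AfterFunc(timeWaitDuration, timeWaitWaker.Assert).
	timer := time.AfterFunc(10*time.Millisecond, timeoutWaker.Assert)
	defer timer.Stop()

	if s.Fetch(true /* block */) == &timeoutWaker {
		fmt.Println("time-wait expired")
	}
}
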
diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go
index 7d110516b..2e93d2664 100644
--- a/pkg/tcpip/transport/tcp/dispatcher.go
+++ b/pkg/tcpip/transport/tcp/dispatcher.go
@@ -94,9 +94,10 @@ func (p *processor) start(wg *sync.WaitGroup) {
defer p.sleeper.Done()
for {
- if id, _ := p.sleeper.Fetch(true); id == closeWaker {
+ if w := p.sleeper.Fetch(true); w == &p.closeWaker {
break
}
+ // If not the closeWaker, it must be &p.newEndpointWaker.
for {
ep := p.epQ.dequeue()
if ep == nil {
@@ -154,8 +155,8 @@ func (d *dispatcher) init(rng *rand.Rand, nProcessors int) {
d.seed = rng.Uint32()
for i := range d.processors {
p := &d.processors[i]
- p.sleeper.AddWaker(&p.newEndpointWaker, newEndpointWaker)
- p.sleeper.AddWaker(&p.closeWaker, closeWaker)
+ p.sleeper.AddWaker(&p.newEndpointWaker)
+ p.sleeper.AddWaker(&p.closeWaker)
d.wg.Add(1)
// NB: sleeper-waker registration must happen synchronously to avoid races
// with `close`. It's possible to pull all this logic into `start`, but