summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip
diff options
context:
space:
mode:
author: Adin Scannell <ascannell@google.com> 2021-11-04 18:50:20 -0700
committer: gVisor bot <gvisor-bot@google.com> 2021-11-04 18:53:15 -0700
commit: d80af5f8b58d2bfe23d57e133a8d35eaed59fa13 (patch)
tree: 31d00204bdd4be118ddc5409ad1155b78c196938 /pkg/tcpip
parent: fe8e48fc6d5094fe34783b1040b2ae4ba05349b5 (diff)
Remove id from sleep.Sleeper API.
In a subsequent change, the Sleeper API will be plumbed through and used for arbitrary task wakeups. This requires a non-static association of Wakers and Sleepers, which means that a fixed ID no longer works. This is a relatively simple change that removes the ID from the Waker association and uses the Waker pointer itself instead. This change also makes minor improvements to the tests so that the benchmarks are more representative (goroutine start is removed from the hot path, and Wakers are used for the required synchronization), adds assertion checks to AddWaker, and clears relevant fields during Done (to allow the assertions to pass). PiperOrigin-RevId: 407719630
Diffstat (limited to 'pkg/tcpip')
-rw-r--r-- pkg/tcpip/link/qdisc/fifo/endpoint.go 11
-rw-r--r-- pkg/tcpip/transport/tcp/accept.go 13
-rw-r--r-- pkg/tcpip/transport/tcp/connect.go 286
-rw-r--r-- pkg/tcpip/transport/tcp/dispatcher.go 7
4 files changed, 140 insertions, 177 deletions
diff --git a/pkg/tcpip/link/qdisc/fifo/endpoint.go b/pkg/tcpip/link/qdisc/fifo/endpoint.go
index b41e3e2fa..c15cbf81b 100644
--- a/pkg/tcpip/link/qdisc/fifo/endpoint.go
+++ b/pkg/tcpip/link/qdisc/fifo/endpoint.go
@@ -73,20 +73,19 @@ func New(lower stack.LinkEndpoint, n int, queueLen int) stack.LinkEndpoint {
}
func (q *queueDispatcher) dispatchLoop() {
- const newPacketWakerID = 1
- const closeWakerID = 2
s := sleep.Sleeper{}
- s.AddWaker(&q.newPacketWaker, newPacketWakerID)
- s.AddWaker(&q.closeWaker, closeWakerID)
+ s.AddWaker(&q.newPacketWaker)
+ s.AddWaker(&q.closeWaker)
defer s.Done()
const batchSize = 32
var batch stack.PacketBufferList
for {
- id, ok := s.Fetch(true)
- if ok && id == closeWakerID {
+ w := s.Fetch(true)
+ if w == &q.closeWaker {
return
}
+ // Must otherwise be the newPacketWaker.
for pkt := q.q.dequeue(); pkt != nil; pkt = q.q.dequeue() {
batch.PushBack(pkt)
if batch.Len() < batchSize && !q.q.empty() {
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index caf14b0dc..d0f68b72c 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -762,14 +762,15 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) {
}()
var s sleep.Sleeper
- s.AddWaker(&e.notificationWaker, wakerForNotification)
- s.AddWaker(&e.newSegmentWaker, wakerForNewSegment)
+ s.AddWaker(&e.notificationWaker)
+ s.AddWaker(&e.newSegmentWaker)
+ defer s.Done()
for {
e.mu.Unlock()
- index, _ := s.Fetch(true)
+ w := s.Fetch(true)
e.mu.Lock()
- switch index {
- case wakerForNotification:
+ switch w {
+ case &e.notificationWaker:
n := e.fetchNotifications()
if n&notifyClose != 0 {
return
@@ -788,7 +789,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) {
e.mu.Lock()
}
- case wakerForNewSegment:
+ case &e.newSegmentWaker:
// Process at most maxSegmentsPerWake segments.
mayRequeue := true
for i := 0; i < maxSegmentsPerWake; i++ {
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 80cd07218..12df7a7b4 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -51,13 +51,6 @@ const (
handshakeCompleted
)
-// The following are used to set up sleepers.
-const (
- wakerForNotification = iota
- wakerForNewSegment
- wakerForResend
-)
-
const (
// Maximum space available for options.
maxOptionSize = 40
@@ -530,9 +523,9 @@ func (h *handshake) complete() tcpip.Error {
// Set up the wakers.
var s sleep.Sleeper
resendWaker := sleep.Waker{}
- s.AddWaker(&resendWaker, wakerForResend)
- s.AddWaker(&h.ep.notificationWaker, wakerForNotification)
- s.AddWaker(&h.ep.newSegmentWaker, wakerForNewSegment)
+ s.AddWaker(&resendWaker)
+ s.AddWaker(&h.ep.notificationWaker)
+ s.AddWaker(&h.ep.newSegmentWaker)
defer s.Done()
// Initialize the resend timer.
@@ -545,11 +538,10 @@ func (h *handshake) complete() tcpip.Error {
// Unlock before blocking, and reacquire again afterwards (h.ep.mu is held
// throughout handshake processing).
h.ep.mu.Unlock()
- index, _ := s.Fetch(true /* block */)
+ w := s.Fetch(true /* block */)
h.ep.mu.Lock()
- switch index {
-
- case wakerForResend:
+ switch w {
+ case &resendWaker:
if err := timer.reset(); err != nil {
return err
}
@@ -577,7 +569,7 @@ func (h *handshake) complete() tcpip.Error {
h.sampleRTTWithTSOnly = true
}
- case wakerForNotification:
+ case &h.ep.notificationWaker:
n := h.ep.fetchNotifications()
if (n&notifyClose)|(n&notifyAbort) != 0 {
return &tcpip.ErrAborted{}
@@ -611,7 +603,7 @@ func (h *handshake) complete() tcpip.Error {
// cleared because of a socket layer call.
return &tcpip.ErrConnectionAborted{}
}
- case wakerForNewSegment:
+ case &h.ep.newSegmentWaker:
if err := h.processSegments(); err != nil {
return err
}
@@ -1346,6 +1338,103 @@ func (e *endpoint) protocolMainLoopDone(closeTimer tcpip.Timer) {
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
}
+// handleWakeup handles a wakeup event while connected.
+//
+// +checklocks:e.mu
+func (e *endpoint) handleWakeup(w, closeWaker *sleep.Waker, closeTimer *tcpip.Timer) tcpip.Error {
+ switch w {
+ case &e.sndQueueInfo.sndWaker:
+ e.sendData(nil /* next */)
+ case &e.newSegmentWaker:
+ return e.handleSegmentsLocked(false /* fastPath */)
+ case &e.snd.resendWaker:
+ if !e.snd.retransmitTimerExpired() {
+ e.stack.Stats().TCP.EstablishedTimedout.Increment()
+ return &tcpip.ErrTimeout{}
+ }
+ case closeWaker:
+ // This means the socket is being closed because the
+ // TCP-FIN-WAIT2 timeout was hit. Just mark the socket as
+ // closed.
+ e.transitionToStateCloseLocked()
+ e.workerCleanup = true
+ case &e.snd.probeWaker:
+ return e.snd.probeTimerExpired()
+ case &e.keepalive.waker:
+ return e.keepaliveTimerExpired()
+ case &e.notificationWaker:
+ n := e.fetchNotifications()
+ if n&notifyNonZeroReceiveWindow != 0 {
+ e.rcv.nonZeroWindow()
+ }
+
+ if n&notifyMTUChanged != 0 {
+ e.sndQueueInfo.sndQueueMu.Lock()
+ count := e.sndQueueInfo.PacketTooBigCount
+ e.sndQueueInfo.PacketTooBigCount = 0
+ mtu := e.sndQueueInfo.SndMTU
+ e.sndQueueInfo.sndQueueMu.Unlock()
+
+ e.snd.updateMaxPayloadSize(mtu, count)
+ }
+
+ if n&notifyReset != 0 || n&notifyAbort != 0 {
+ return &tcpip.ErrConnectionAborted{}
+ }
+
+ if n&notifyResetByPeer != 0 {
+ return &tcpip.ErrConnectionReset{}
+ }
+
+ if n&notifyClose != 0 && e.closed {
+ switch e.EndpointState() {
+ case StateEstablished:
+ // Perform full shutdown if the endpoint is
+ // still established. This can occur when
+ // notifyClose was asserted just before
+ // becoming established.
+ e.shutdownLocked(tcpip.ShutdownWrite | tcpip.ShutdownRead)
+ case StateFinWait2:
+ // The socket has been closed and we are in
+ // FIN_WAIT2 so start the FIN_WAIT2 timer.
+ if *closeTimer == nil {
+ *closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert)
+ }
+ }
+ }
+
+ if n&notifyKeepaliveChanged != 0 {
+ // The timer could fire in background when the endpoint
+ // is drained. That's OK. See above.
+ e.resetKeepaliveTimer(true)
+ }
+
+ if n&notifyDrain != 0 {
+ for !e.segmentQueue.empty() {
+ if err := e.handleSegmentsLocked(false /* fastPath */); err != nil {
+ return err
+ }
+ }
+ if !e.EndpointState().closed() {
+ // Only block the worker if the endpoint
+ // is not in closed state or error state.
+ close(e.drainDone)
+ e.mu.Unlock()
+ <-e.undrain
+ e.mu.Lock()
+ }
+ }
+
+ // N.B. notifyTickleWorker may be set, but there is no action
+ // to take in this case.
+ case &e.snd.reorderWaker:
+ return e.snd.rc.reorderTimerExpired()
+ default:
+ panic("unknown waker") // Shouldn't happen.
+ }
+ return nil
+}
+
// protocolMainLoop is the main loop of the TCP protocol. It runs in its own
// goroutine and is responsible for sending segments and handling received
// segments.
@@ -1403,139 +1492,16 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
e.mu.Lock()
}
- // Set up the functions that will be called when the main protocol loop
- // wakes up.
- funcs := []struct {
- w *sleep.Waker
- f func() tcpip.Error
- }{
- {
- w: &e.sndQueueInfo.sndWaker,
- f: func() tcpip.Error {
- e.sendData(nil /* next */)
- return nil
- },
- },
- {
- w: &closeWaker,
- f: func() tcpip.Error {
- // This means the socket is being closed due
- // to the TCP-FIN-WAIT2 timeout was hit. Just
- // mark the socket as closed.
- e.transitionToStateCloseLocked()
- e.workerCleanup = true
- return nil
- },
- },
- {
- w: &e.snd.resendWaker,
- f: func() tcpip.Error {
- if !e.snd.retransmitTimerExpired() {
- e.stack.Stats().TCP.EstablishedTimedout.Increment()
- return &tcpip.ErrTimeout{}
- }
- return nil
- },
- },
- {
- w: &e.snd.probeWaker,
- f: e.snd.probeTimerExpired,
- },
- {
- w: &e.newSegmentWaker,
- f: func() tcpip.Error {
- return e.handleSegmentsLocked(false /* fastPath */)
- },
- },
- {
- w: &e.keepalive.waker,
- f: e.keepaliveTimerExpired,
- },
- {
- w: &e.notificationWaker,
- f: func() tcpip.Error {
- n := e.fetchNotifications()
- if n&notifyNonZeroReceiveWindow != 0 {
- e.rcv.nonZeroWindow()
- }
-
- if n&notifyMTUChanged != 0 {
- e.sndQueueInfo.sndQueueMu.Lock()
- count := e.sndQueueInfo.PacketTooBigCount
- e.sndQueueInfo.PacketTooBigCount = 0
- mtu := e.sndQueueInfo.SndMTU
- e.sndQueueInfo.sndQueueMu.Unlock()
-
- e.snd.updateMaxPayloadSize(mtu, count)
- }
-
- if n&notifyReset != 0 || n&notifyAbort != 0 {
- return &tcpip.ErrConnectionAborted{}
- }
-
- if n&notifyResetByPeer != 0 {
- return &tcpip.ErrConnectionReset{}
- }
-
- if n&notifyClose != 0 && e.closed {
- switch e.EndpointState() {
- case StateEstablished:
- // Perform full shutdown if the endpoint is still
- // established. This can occur when notifyClose
- // was asserted just before becoming established.
- e.shutdownLocked(tcpip.ShutdownWrite | tcpip.ShutdownRead)
- case StateFinWait2:
- // The socket has been closed and we are in FIN_WAIT2
- // so start the FIN_WAIT2 timer.
- if closeTimer == nil {
- closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert)
- }
- }
- }
-
- if n&notifyKeepaliveChanged != 0 {
- // The timer could fire in background
- // when the endpoint is drained. That's
- // OK. See above.
- e.resetKeepaliveTimer(true)
- }
-
- if n&notifyDrain != 0 {
- for !e.segmentQueue.empty() {
- if err := e.handleSegmentsLocked(false /* fastPath */); err != nil {
- return err
- }
- }
- if !e.EndpointState().closed() {
- // Only block the worker if the endpoint
- // is not in closed state or error state.
- close(e.drainDone)
- e.mu.Unlock() // +checklocksforce
- <-e.undrain
- e.mu.Lock()
- }
- }
-
- if n&notifyTickleWorker != 0 {
- // Just a tickle notification. No need to do
- // anything.
- return nil
- }
-
- return nil
- },
- },
- {
- w: &e.snd.reorderWaker,
- f: e.snd.rc.reorderTimerExpired,
- },
- }
-
- // Initialize the sleeper based on the wakers in funcs.
+ // Add all wakers.
var s sleep.Sleeper
- for i := range funcs {
- s.AddWaker(funcs[i].w, i)
- }
+ s.AddWaker(&e.sndQueueInfo.sndWaker)
+ s.AddWaker(&e.newSegmentWaker)
+ s.AddWaker(&e.snd.resendWaker)
+ s.AddWaker(&e.snd.probeWaker)
+ s.AddWaker(&closeWaker)
+ s.AddWaker(&e.keepalive.waker)
+ s.AddWaker(&e.notificationWaker)
+ s.AddWaker(&e.snd.reorderWaker)
// Notify the caller that the waker initialization is complete and the
// endpoint is ready.
@@ -1581,7 +1547,7 @@ loop:
}
e.mu.Unlock()
- v, _ := s.Fetch(true /* block */)
+ w := s.Fetch(true /* block */)
e.mu.Lock()
// We need to double check here because the notification may be
@@ -1601,7 +1567,7 @@ loop:
case StateClose:
break loop
default:
- if err := funcs[v].f(); err != nil {
+ if err := e.handleWakeup(w, &closeWaker, &closeTimer); err != nil {
cleanupOnError(err)
e.protocolMainLoopDone(closeTimer)
return
@@ -1714,26 +1680,22 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
timeWaitDuration = time.Duration(tcpTW)
}
- const newSegment = 1
- const notification = 2
- const timeWaitDone = 3
-
var s sleep.Sleeper
defer s.Done()
- s.AddWaker(&e.newSegmentWaker, newSegment)
- s.AddWaker(&e.notificationWaker, notification)
+ s.AddWaker(&e.newSegmentWaker)
+ s.AddWaker(&e.notificationWaker)
var timeWaitWaker sleep.Waker
- s.AddWaker(&timeWaitWaker, timeWaitDone)
+ s.AddWaker(&timeWaitWaker)
timeWaitTimer := e.stack.Clock().AfterFunc(timeWaitDuration, timeWaitWaker.Assert)
defer timeWaitTimer.Stop()
for {
e.mu.Unlock()
- v, _ := s.Fetch(true /* block */)
+ w := s.Fetch(true /* block */)
e.mu.Lock()
- switch v {
- case newSegment:
+ switch w {
+ case &e.newSegmentWaker:
extendTimeWait, reuseTW := e.handleTimeWaitSegments()
if reuseTW != nil {
return reuseTW
@@ -1741,7 +1703,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
if extendTimeWait {
timeWaitTimer.Reset(timeWaitDuration)
}
- case notification:
+ case &e.notificationWaker:
n := e.fetchNotifications()
if n&notifyAbort != 0 {
return nil
@@ -1759,7 +1721,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
e.mu.Lock()
return nil
}
- case timeWaitDone:
+ case &timeWaitWaker:
return nil
}
}
diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go
index 7d110516b..2e93d2664 100644
--- a/pkg/tcpip/transport/tcp/dispatcher.go
+++ b/pkg/tcpip/transport/tcp/dispatcher.go
@@ -94,9 +94,10 @@ func (p *processor) start(wg *sync.WaitGroup) {
defer p.sleeper.Done()
for {
- if id, _ := p.sleeper.Fetch(true); id == closeWaker {
+ if w := p.sleeper.Fetch(true); w == &p.closeWaker {
break
}
+ // If not the closeWaker, it must be &p.newEndpointWaker.
for {
ep := p.epQ.dequeue()
if ep == nil {
@@ -154,8 +155,8 @@ func (d *dispatcher) init(rng *rand.Rand, nProcessors int) {
d.seed = rng.Uint32()
for i := range d.processors {
p := &d.processors[i]
- p.sleeper.AddWaker(&p.newEndpointWaker, newEndpointWaker)
- p.sleeper.AddWaker(&p.closeWaker, closeWaker)
+ p.sleeper.AddWaker(&p.newEndpointWaker)
+ p.sleeper.AddWaker(&p.closeWaker)
d.wg.Add(1)
// NB: sleeper-waker registration must happen synchronously to avoid races
// with `close`. It's possible to pull all this logic into `start`, but