summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip')
-rw-r--r--pkg/tcpip/link/qdisc/fifo/endpoint.go11
-rw-r--r--pkg/tcpip/transport/tcp/accept.go13
-rw-r--r--pkg/tcpip/transport/tcp/connect.go286
-rw-r--r--pkg/tcpip/transport/tcp/dispatcher.go7
4 files changed, 140 insertions, 177 deletions
diff --git a/pkg/tcpip/link/qdisc/fifo/endpoint.go b/pkg/tcpip/link/qdisc/fifo/endpoint.go
index b41e3e2fa..c15cbf81b 100644
--- a/pkg/tcpip/link/qdisc/fifo/endpoint.go
+++ b/pkg/tcpip/link/qdisc/fifo/endpoint.go
@@ -73,20 +73,19 @@ func New(lower stack.LinkEndpoint, n int, queueLen int) stack.LinkEndpoint {
}
func (q *queueDispatcher) dispatchLoop() {
- const newPacketWakerID = 1
- const closeWakerID = 2
s := sleep.Sleeper{}
- s.AddWaker(&q.newPacketWaker, newPacketWakerID)
- s.AddWaker(&q.closeWaker, closeWakerID)
+ s.AddWaker(&q.newPacketWaker)
+ s.AddWaker(&q.closeWaker)
defer s.Done()
const batchSize = 32
var batch stack.PacketBufferList
for {
- id, ok := s.Fetch(true)
- if ok && id == closeWakerID {
+ w := s.Fetch(true)
+ if w == &q.closeWaker {
return
}
+ // Must otherwise be the newPacketWaker.
for pkt := q.q.dequeue(); pkt != nil; pkt = q.q.dequeue() {
batch.PushBack(pkt)
if batch.Len() < batchSize && !q.q.empty() {
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index caf14b0dc..d0f68b72c 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -762,14 +762,15 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) {
}()
var s sleep.Sleeper
- s.AddWaker(&e.notificationWaker, wakerForNotification)
- s.AddWaker(&e.newSegmentWaker, wakerForNewSegment)
+ s.AddWaker(&e.notificationWaker)
+ s.AddWaker(&e.newSegmentWaker)
+ defer s.Done()
for {
e.mu.Unlock()
- index, _ := s.Fetch(true)
+ w := s.Fetch(true)
e.mu.Lock()
- switch index {
- case wakerForNotification:
+ switch w {
+ case &e.notificationWaker:
n := e.fetchNotifications()
if n&notifyClose != 0 {
return
@@ -788,7 +789,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) {
e.mu.Lock()
}
- case wakerForNewSegment:
+ case &e.newSegmentWaker:
// Process at most maxSegmentsPerWake segments.
mayRequeue := true
for i := 0; i < maxSegmentsPerWake; i++ {
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 80cd07218..12df7a7b4 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -51,13 +51,6 @@ const (
handshakeCompleted
)
-// The following are used to set up sleepers.
-const (
- wakerForNotification = iota
- wakerForNewSegment
- wakerForResend
-)
-
const (
// Maximum space available for options.
maxOptionSize = 40
@@ -530,9 +523,9 @@ func (h *handshake) complete() tcpip.Error {
// Set up the wakers.
var s sleep.Sleeper
resendWaker := sleep.Waker{}
- s.AddWaker(&resendWaker, wakerForResend)
- s.AddWaker(&h.ep.notificationWaker, wakerForNotification)
- s.AddWaker(&h.ep.newSegmentWaker, wakerForNewSegment)
+ s.AddWaker(&resendWaker)
+ s.AddWaker(&h.ep.notificationWaker)
+ s.AddWaker(&h.ep.newSegmentWaker)
defer s.Done()
// Initialize the resend timer.
@@ -545,11 +538,10 @@ func (h *handshake) complete() tcpip.Error {
// Unlock before blocking, and reacquire again afterwards (h.ep.mu is held
// throughout handshake processing).
h.ep.mu.Unlock()
- index, _ := s.Fetch(true /* block */)
+ w := s.Fetch(true /* block */)
h.ep.mu.Lock()
- switch index {
-
- case wakerForResend:
+ switch w {
+ case &resendWaker:
if err := timer.reset(); err != nil {
return err
}
@@ -577,7 +569,7 @@ func (h *handshake) complete() tcpip.Error {
h.sampleRTTWithTSOnly = true
}
- case wakerForNotification:
+ case &h.ep.notificationWaker:
n := h.ep.fetchNotifications()
if (n&notifyClose)|(n&notifyAbort) != 0 {
return &tcpip.ErrAborted{}
@@ -611,7 +603,7 @@ func (h *handshake) complete() tcpip.Error {
// cleared because of a socket layer call.
return &tcpip.ErrConnectionAborted{}
}
- case wakerForNewSegment:
+ case &h.ep.newSegmentWaker:
if err := h.processSegments(); err != nil {
return err
}
@@ -1346,6 +1338,103 @@ func (e *endpoint) protocolMainLoopDone(closeTimer tcpip.Timer) {
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
}
+// handleWakeup handles a wakeup event while connected.
+//
+// +checklocks:e.mu
+func (e *endpoint) handleWakeup(w, closeWaker *sleep.Waker, closeTimer *tcpip.Timer) tcpip.Error {
+ switch w {
+ case &e.sndQueueInfo.sndWaker:
+ e.sendData(nil /* next */)
+ case &e.newSegmentWaker:
+ return e.handleSegmentsLocked(false /* fastPath */)
+ case &e.snd.resendWaker:
+ if !e.snd.retransmitTimerExpired() {
+ e.stack.Stats().TCP.EstablishedTimedout.Increment()
+ return &tcpip.ErrTimeout{}
+ }
+ case closeWaker:
+ // This means the socket is being closed due to the
+ // TCP-FIN-WAIT2 timeout was hit. Just mark the socket as
+ // closed.
+ e.transitionToStateCloseLocked()
+ e.workerCleanup = true
+ case &e.snd.probeWaker:
+ return e.snd.probeTimerExpired()
+ case &e.keepalive.waker:
+ return e.keepaliveTimerExpired()
+ case &e.notificationWaker:
+ n := e.fetchNotifications()
+ if n&notifyNonZeroReceiveWindow != 0 {
+ e.rcv.nonZeroWindow()
+ }
+
+ if n&notifyMTUChanged != 0 {
+ e.sndQueueInfo.sndQueueMu.Lock()
+ count := e.sndQueueInfo.PacketTooBigCount
+ e.sndQueueInfo.PacketTooBigCount = 0
+ mtu := e.sndQueueInfo.SndMTU
+ e.sndQueueInfo.sndQueueMu.Unlock()
+
+ e.snd.updateMaxPayloadSize(mtu, count)
+ }
+
+ if n&notifyReset != 0 || n&notifyAbort != 0 {
+ return &tcpip.ErrConnectionAborted{}
+ }
+
+ if n&notifyResetByPeer != 0 {
+ return &tcpip.ErrConnectionReset{}
+ }
+
+ if n&notifyClose != 0 && e.closed {
+ switch e.EndpointState() {
+ case StateEstablished:
+ // Perform full shutdown if the endpoint is
+ // still established. This can occur when
+ // notifyClose was asserted just before
+ // becoming established.
+ e.shutdownLocked(tcpip.ShutdownWrite | tcpip.ShutdownRead)
+ case StateFinWait2:
+ // The socket has been closed and we are in
+ // FIN_WAIT2 so start the FIN_WAIT2 timer.
+ if *closeTimer == nil {
+ *closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert)
+ }
+ }
+ }
+
+ if n&notifyKeepaliveChanged != 0 {
+ // The timer could fire in background when the endpoint
+ // is drained. That's OK. See above.
+ e.resetKeepaliveTimer(true)
+ }
+
+ if n&notifyDrain != 0 {
+ for !e.segmentQueue.empty() {
+ if err := e.handleSegmentsLocked(false /* fastPath */); err != nil {
+ return err
+ }
+ }
+ if !e.EndpointState().closed() {
+ // Only block the worker if the endpoint
+ // is not in closed state or error state.
+ close(e.drainDone)
+ e.mu.Unlock()
+ <-e.undrain
+ e.mu.Lock()
+ }
+ }
+
+ // N.B. notifyTickleWorker may be set, but there is no action
+ // to take in this case.
+ case &e.snd.reorderWaker:
+ return e.snd.rc.reorderTimerExpired()
+ default:
+ panic("unknown waker") // Shouldn't happen.
+ }
+ return nil
+}
+
// protocolMainLoop is the main loop of the TCP protocol. It runs in its own
// goroutine and is responsible for sending segments and handling received
// segments.
@@ -1403,139 +1492,16 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
e.mu.Lock()
}
- // Set up the functions that will be called when the main protocol loop
- // wakes up.
- funcs := []struct {
- w *sleep.Waker
- f func() tcpip.Error
- }{
- {
- w: &e.sndQueueInfo.sndWaker,
- f: func() tcpip.Error {
- e.sendData(nil /* next */)
- return nil
- },
- },
- {
- w: &closeWaker,
- f: func() tcpip.Error {
- // This means the socket is being closed due
- // to the TCP-FIN-WAIT2 timeout was hit. Just
- // mark the socket as closed.
- e.transitionToStateCloseLocked()
- e.workerCleanup = true
- return nil
- },
- },
- {
- w: &e.snd.resendWaker,
- f: func() tcpip.Error {
- if !e.snd.retransmitTimerExpired() {
- e.stack.Stats().TCP.EstablishedTimedout.Increment()
- return &tcpip.ErrTimeout{}
- }
- return nil
- },
- },
- {
- w: &e.snd.probeWaker,
- f: e.snd.probeTimerExpired,
- },
- {
- w: &e.newSegmentWaker,
- f: func() tcpip.Error {
- return e.handleSegmentsLocked(false /* fastPath */)
- },
- },
- {
- w: &e.keepalive.waker,
- f: e.keepaliveTimerExpired,
- },
- {
- w: &e.notificationWaker,
- f: func() tcpip.Error {
- n := e.fetchNotifications()
- if n&notifyNonZeroReceiveWindow != 0 {
- e.rcv.nonZeroWindow()
- }
-
- if n&notifyMTUChanged != 0 {
- e.sndQueueInfo.sndQueueMu.Lock()
- count := e.sndQueueInfo.PacketTooBigCount
- e.sndQueueInfo.PacketTooBigCount = 0
- mtu := e.sndQueueInfo.SndMTU
- e.sndQueueInfo.sndQueueMu.Unlock()
-
- e.snd.updateMaxPayloadSize(mtu, count)
- }
-
- if n&notifyReset != 0 || n&notifyAbort != 0 {
- return &tcpip.ErrConnectionAborted{}
- }
-
- if n&notifyResetByPeer != 0 {
- return &tcpip.ErrConnectionReset{}
- }
-
- if n&notifyClose != 0 && e.closed {
- switch e.EndpointState() {
- case StateEstablished:
- // Perform full shutdown if the endpoint is still
- // established. This can occur when notifyClose
- // was asserted just before becoming established.
- e.shutdownLocked(tcpip.ShutdownWrite | tcpip.ShutdownRead)
- case StateFinWait2:
- // The socket has been closed and we are in FIN_WAIT2
- // so start the FIN_WAIT2 timer.
- if closeTimer == nil {
- closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert)
- }
- }
- }
-
- if n&notifyKeepaliveChanged != 0 {
- // The timer could fire in background
- // when the endpoint is drained. That's
- // OK. See above.
- e.resetKeepaliveTimer(true)
- }
-
- if n&notifyDrain != 0 {
- for !e.segmentQueue.empty() {
- if err := e.handleSegmentsLocked(false /* fastPath */); err != nil {
- return err
- }
- }
- if !e.EndpointState().closed() {
- // Only block the worker if the endpoint
- // is not in closed state or error state.
- close(e.drainDone)
- e.mu.Unlock() // +checklocksforce
- <-e.undrain
- e.mu.Lock()
- }
- }
-
- if n&notifyTickleWorker != 0 {
- // Just a tickle notification. No need to do
- // anything.
- return nil
- }
-
- return nil
- },
- },
- {
- w: &e.snd.reorderWaker,
- f: e.snd.rc.reorderTimerExpired,
- },
- }
-
- // Initialize the sleeper based on the wakers in funcs.
+ // Add all wakers.
var s sleep.Sleeper
- for i := range funcs {
- s.AddWaker(funcs[i].w, i)
- }
+ s.AddWaker(&e.sndQueueInfo.sndWaker)
+ s.AddWaker(&e.newSegmentWaker)
+ s.AddWaker(&e.snd.resendWaker)
+ s.AddWaker(&e.snd.probeWaker)
+ s.AddWaker(&closeWaker)
+ s.AddWaker(&e.keepalive.waker)
+ s.AddWaker(&e.notificationWaker)
+ s.AddWaker(&e.snd.reorderWaker)
// Notify the caller that the waker initialization is complete and the
// endpoint is ready.
@@ -1581,7 +1547,7 @@ loop:
}
e.mu.Unlock()
- v, _ := s.Fetch(true /* block */)
+ w := s.Fetch(true /* block */)
e.mu.Lock()
// We need to double check here because the notification may be
@@ -1601,7 +1567,7 @@ loop:
case StateClose:
break loop
default:
- if err := funcs[v].f(); err != nil {
+ if err := e.handleWakeup(w, &closeWaker, &closeTimer); err != nil {
cleanupOnError(err)
e.protocolMainLoopDone(closeTimer)
return
@@ -1714,26 +1680,22 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
timeWaitDuration = time.Duration(tcpTW)
}
- const newSegment = 1
- const notification = 2
- const timeWaitDone = 3
-
var s sleep.Sleeper
defer s.Done()
- s.AddWaker(&e.newSegmentWaker, newSegment)
- s.AddWaker(&e.notificationWaker, notification)
+ s.AddWaker(&e.newSegmentWaker)
+ s.AddWaker(&e.notificationWaker)
var timeWaitWaker sleep.Waker
- s.AddWaker(&timeWaitWaker, timeWaitDone)
+ s.AddWaker(&timeWaitWaker)
timeWaitTimer := e.stack.Clock().AfterFunc(timeWaitDuration, timeWaitWaker.Assert)
defer timeWaitTimer.Stop()
for {
e.mu.Unlock()
- v, _ := s.Fetch(true /* block */)
+ w := s.Fetch(true /* block */)
e.mu.Lock()
- switch v {
- case newSegment:
+ switch w {
+ case &e.newSegmentWaker:
extendTimeWait, reuseTW := e.handleTimeWaitSegments()
if reuseTW != nil {
return reuseTW
@@ -1741,7 +1703,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
if extendTimeWait {
timeWaitTimer.Reset(timeWaitDuration)
}
- case notification:
+ case &e.notificationWaker:
n := e.fetchNotifications()
if n&notifyAbort != 0 {
return nil
@@ -1759,7 +1721,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
e.mu.Lock()
return nil
}
- case timeWaitDone:
+ case &timeWaitWaker:
return nil
}
}
diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go
index 7d110516b..2e93d2664 100644
--- a/pkg/tcpip/transport/tcp/dispatcher.go
+++ b/pkg/tcpip/transport/tcp/dispatcher.go
@@ -94,9 +94,10 @@ func (p *processor) start(wg *sync.WaitGroup) {
defer p.sleeper.Done()
for {
- if id, _ := p.sleeper.Fetch(true); id == closeWaker {
+ if w := p.sleeper.Fetch(true); w == &p.closeWaker {
break
}
+ // If not the closeWaker, it must be &p.newEndpointWaker.
for {
ep := p.epQ.dequeue()
if ep == nil {
@@ -154,8 +155,8 @@ func (d *dispatcher) init(rng *rand.Rand, nProcessors int) {
d.seed = rng.Uint32()
for i := range d.processors {
p := &d.processors[i]
- p.sleeper.AddWaker(&p.newEndpointWaker, newEndpointWaker)
- p.sleeper.AddWaker(&p.closeWaker, closeWaker)
+ p.sleeper.AddWaker(&p.newEndpointWaker)
+ p.sleeper.AddWaker(&p.closeWaker)
d.wg.Add(1)
// NB: sleeper-waker registration must happen synchronously to avoid races
// with `close`. It's possible to pull all this logic into `start`, but