diff options
author | Josh Bleecher Snyder <josh@tailscale.com> | 2020-12-15 15:54:48 -0800 |
---|---|---|
committer | Jason A. Donenfeld <Jason@zx2c4.com> | 2021-01-07 14:49:44 +0100 |
commit | 2832e96339b4b847172741e9252020fc7bfa59af (patch) | |
tree | 782633f9de39040f88930c905bca6d803a870f9e | |
parent | 63066ce4062a85224821ce302e3eb8c34e95a658 (diff) |
device: use channel close to shut down and drain outbound channel
This is a similar treatment to the handling of the encryption
channel found a few commits ago: Use the closing of the channel
to manage goroutine lifetime and shutdown.
It is considerably simpler because there is only a single writer.
Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
-rw-r--r-- | device/peer.go | 3 | ||||
-rw-r--r-- | device/send.go | 86 |
2 files changed, 34 insertions, 55 deletions
diff --git a/device/peer.go b/device/peer.go index 31b75c7..c094160 100644 --- a/device/peer.go +++ b/device/peer.go @@ -17,7 +17,7 @@ import ( ) const ( - PeerRoutineNumber = 3 + PeerRoutineNumber = 2 ) type Peer struct { @@ -287,7 +287,6 @@ func (peer *Peer) Stop() { peer.queue.Lock() close(peer.queue.nonce) - close(peer.queue.outbound) close(peer.queue.inbound) peer.queue.Unlock() diff --git a/device/send.go b/device/send.go index 1b16edd..1f71f79 100644 --- a/device/send.go +++ b/device/send.go @@ -372,6 +372,7 @@ func (peer *Peer) RoutineNonce() { logDebug.Println(peer, "- Routine: nonce worker - stopped") peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) device.queue.encryption.wg.Done() // no more writes from us + close(peer.queue.outbound) // no more writes to this channel peer.routines.stopping.Done() }() @@ -545,64 +546,43 @@ func (peer *Peer) RoutineSequentialSender() { logDebug := device.log.Debug logError := device.log.Error - defer func() { - for { - select { - case elem, ok := <-peer.queue.outbound: - if ok { - elem.Lock() - if !elem.IsDropped() { - device.PutMessageBuffer(elem.buffer) - elem.Drop() - } - device.PutOutboundElement(elem) - } - default: - goto out - } - } - out: - logDebug.Println(peer, "- Routine: sequential sender - stopped") - peer.routines.stopping.Done() - }() - + defer logDebug.Println(peer, "- Routine: sequential sender - stopped") logDebug.Println(peer, "- Routine: sequential sender - started") - for { - select { - - case <-peer.routines.stop: - return - - case elem, ok := <-peer.queue.outbound: - - if !ok { - return - } - - elem.Lock() - if elem.IsDropped() { - device.PutOutboundElement(elem) - continue - } - - peer.timersAnyAuthenticatedPacketTraversal() - peer.timersAnyAuthenticatedPacketSent() - - // send message and return buffer to pool - - err := peer.SendBuffer(elem.packet) - if len(elem.packet) != MessageKeepaliveSize { - peer.timersDataSent() - } + for elem := range peer.queue.outbound { + elem.Lock() + if elem.IsDropped() { + device.PutOutboundElement(elem) + continue + } + if !peer.isRunning.Get() { + // peer has been stopped; return re-usable elems to the shared pool. + // This is an optimization only. It is possible for the peer to be stopped + // immediately after this check, in which case, elem will get processed. + // The timers and SendBuffer code are resilient to a few stragglers. + // TODO(josharian): rework peer shutdown order to ensure + // that we never accidentally keep timers alive longer than necessary. device.PutMessageBuffer(elem.buffer) device.PutOutboundElement(elem) - if err != nil { - logError.Println(peer, "- Failed to send data packet", err) - continue - } + continue + } + + peer.timersAnyAuthenticatedPacketTraversal() + peer.timersAnyAuthenticatedPacketSent() - peer.keepKeyFreshSending() + // send message and return buffer to pool + + err := peer.SendBuffer(elem.packet) + if len(elem.packet) != MessageKeepaliveSize { + peer.timersDataSent() } + device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) + if err != nil { + logError.Println(peer, "- Failed to send data packet", err) + continue + } + + peer.keepKeyFreshSending() } } |