diff options
Diffstat (limited to 'device/send.go')
-rw-r--r-- | device/send.go | 86 |
1 files changed, 33 insertions, 53 deletions
diff --git a/device/send.go b/device/send.go index 1b16edd..1f71f79 100644 --- a/device/send.go +++ b/device/send.go @@ -372,6 +372,7 @@ func (peer *Peer) RoutineNonce() { logDebug.Println(peer, "- Routine: nonce worker - stopped") peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) device.queue.encryption.wg.Done() // no more writes from us + close(peer.queue.outbound) // no more writes to this channel peer.routines.stopping.Done() }() @@ -545,64 +546,43 @@ func (peer *Peer) RoutineSequentialSender() { logDebug := device.log.Debug logError := device.log.Error - defer func() { - for { - select { - case elem, ok := <-peer.queue.outbound: - if ok { - elem.Lock() - if !elem.IsDropped() { - device.PutMessageBuffer(elem.buffer) - elem.Drop() - } - device.PutOutboundElement(elem) - } - default: - goto out - } - } - out: - logDebug.Println(peer, "- Routine: sequential sender - stopped") - peer.routines.stopping.Done() - }() - + defer logDebug.Println(peer, "- Routine: sequential sender - stopped") logDebug.Println(peer, "- Routine: sequential sender - started") - for { - select { - - case <-peer.routines.stop: - return - - case elem, ok := <-peer.queue.outbound: - - if !ok { - return - } - - elem.Lock() - if elem.IsDropped() { - device.PutOutboundElement(elem) - continue - } - - peer.timersAnyAuthenticatedPacketTraversal() - peer.timersAnyAuthenticatedPacketSent() - - // send message and return buffer to pool - - err := peer.SendBuffer(elem.packet) - if len(elem.packet) != MessageKeepaliveSize { - peer.timersDataSent() - } + for elem := range peer.queue.outbound { + elem.Lock() + if elem.IsDropped() { + device.PutOutboundElement(elem) + continue + } + if !peer.isRunning.Get() { + // peer has been stopped; return re-usable elems to the shared pool. + // This is an optimization only. It is possible for the peer to be stopped + // immediately after this check, in which case, elem will get processed. + // The timers and SendBuffer code are resilient to a few stragglers. + // TODO(josharian): rework peer shutdown order to ensure + // that we never accidentally keep timers alive longer than necessary. device.PutMessageBuffer(elem.buffer) device.PutOutboundElement(elem) - if err != nil { - logError.Println(peer, "- Failed to send data packet", err) - continue - } + continue + } + + peer.timersAnyAuthenticatedPacketTraversal() + peer.timersAnyAuthenticatedPacketSent() - peer.keepKeyFreshSending() + // send message and return buffer to pool + + err := peer.SendBuffer(elem.packet) + if len(elem.packet) != MessageKeepaliveSize { + peer.timersDataSent() } + device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) + if err != nil { + logError.Println(peer, "- Failed to send data packet", err) + continue + } + + peer.keepKeyFreshSending() } } |