diff options
Diffstat (limited to 'device/send.go')
-rw-r--r-- | device/send.go | 48 |
1 files changed, 25 insertions, 23 deletions
diff --git a/device/send.go b/device/send.go index 04d2001..5261c2f 100644 --- a/device/send.go +++ b/device/send.go @@ -74,22 +74,17 @@ func (elem *QueueOutboundElement) clearPointers() { /* Queues a keepalive if no packets are queued for peer */ func (peer *Peer) SendKeepalive() { - var elem *QueueOutboundElement - peer.queue.RLock() - if len(peer.queue.staged) != 0 || !peer.isRunning.Get() { - goto out - } - elem = peer.device.NewOutboundElement() - elem.packet = nil - select { - case peer.queue.staged <- elem: - peer.device.log.Verbosef("%v - Sending keepalive packet", peer) - default: - peer.device.PutMessageBuffer(elem.buffer) - peer.device.PutOutboundElement(elem) + if len(peer.queue.staged) == 0 && peer.isRunning.Get() { + elem := peer.device.NewOutboundElement() + elem.packet = nil + select { + case peer.queue.staged <- elem: + peer.device.log.Verbosef("%v - Sending keepalive packet", peer) + default: + peer.device.PutMessageBuffer(elem.buffer) + peer.device.PutOutboundElement(elem) + } } -out: - peer.queue.RUnlock() peer.SendStagedPackets() } @@ -176,7 +171,6 @@ func (peer *Peer) SendHandshakeResponse() error { } func (device *Device) SendHandshakeCookie(initiatingElem *QueueHandshakeElement) error { - device.log.Verbosef("Sending cookie response for denied handshake message for %v", initiatingElem.endpoint.DstToString()) sender := binary.LittleEndian.Uint32(initiatingElem.packet[4:8]) @@ -297,6 +291,8 @@ func (peer *Peer) StagePacket(elem *QueueOutboundElement) { } func (peer *Peer) SendStagedPackets() { + peer.device.queue.encryption.wg.Add(1) + defer peer.device.queue.encryption.wg.Done() top: if len(peer.queue.staged) == 0 || !peer.device.isUp.Get() { return @@ -307,8 +303,6 @@ top: peer.SendHandshakeInitiation(false) return } - peer.device.queue.encryption.wg.Add(1) - defer peer.device.queue.encryption.wg.Done() for { select { @@ -325,8 +319,15 @@ top: elem.Lock() // add to parallel and sequential queue - peer.queue.outbound <- elem - peer.device.queue.encryption.c <- elem + peer.queue.RLock() + if peer.isRunning.Get() { + peer.queue.outbound <- elem + peer.device.queue.encryption.c <- elem + } else { + peer.device.PutMessageBuffer(elem.buffer) + peer.device.PutOutboundElement(elem) + } + peer.queue.RUnlock() default: return } @@ -410,10 +411,11 @@ func (device *Device) RoutineEncryption() { * The routine terminates then the outbound queue is closed. */ func (peer *Peer) RoutineSequentialSender() { - device := peer.device - - defer device.log.Verbosef("%v - Routine: sequential sender - stopped", peer) + defer func() { + defer device.log.Verbosef("%v - Routine: sequential sender - stopped", peer) + peer.stopping.Done() + }() device.log.Verbosef("%v - Routine: sequential sender - started", peer) for elem := range peer.queue.outbound { |