diff options
Diffstat (limited to 'device/send.go')
-rw-r--r-- | device/send.go | 93 |
1 files changed, 34 insertions, 59 deletions
diff --git a/device/send.go b/device/send.go index 0801b71..1b16edd 100644 --- a/device/send.go +++ b/device/send.go @@ -352,6 +352,9 @@ func (peer *Peer) RoutineNonce() { device := peer.device logDebug := device.log.Debug + // We write to the encryption queue; keep it alive until we are done. + device.queue.encryption.wg.Add(1) + flush := func() { for { select { @@ -368,6 +371,7 @@ func (peer *Peer) RoutineNonce() { flush() logDebug.Println(peer, "- Routine: nonce worker - stopped") peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) + device.queue.encryption.wg.Done() // no more writes from us peer.routines.stopping.Done() }() @@ -455,7 +459,7 @@ NextPacket: elem.Lock() // add to parallel and sequential queue - addToOutboundAndEncryptionQueues(peer.queue.outbound, device.queue.encryption, elem) + addToOutboundAndEncryptionQueues(peer.queue.outbound, device.queue.encryption.c, elem) } } } @@ -486,76 +490,46 @@ func (device *Device) RoutineEncryption() { logDebug := device.log.Debug - defer func() { - for { - select { - case elem, ok := <-device.queue.encryption: - if ok && !elem.IsDropped() { - elem.Drop() - device.PutMessageBuffer(elem.buffer) - elem.Unlock() - } - default: - goto out - } - } - out: - logDebug.Println("Routine: encryption worker - stopped") - device.state.stopping.Done() - }() - + defer logDebug.Println("Routine: encryption worker - stopped") logDebug.Println("Routine: encryption worker - started") - for { - - // fetch next element + for elem := range device.queue.encryption.c { - select { - case <-device.signals.stop: - return - - case elem, ok := <-device.queue.encryption: - - if !ok { - return - } - - // check if dropped + // check if dropped - if elem.IsDropped() { - continue - } + if elem.IsDropped() { + continue + } - // populate header fields + // populate header fields - header := elem.buffer[:MessageTransportHeaderSize] + header := elem.buffer[:MessageTransportHeaderSize] - fieldType := header[0:4] - fieldReceiver := header[4:8] - fieldNonce := header[8:16] + fieldType := header[0:4] + fieldReceiver := header[4:8] + fieldNonce := header[8:16] - binary.LittleEndian.PutUint32(fieldType, MessageTransportType) - binary.LittleEndian.PutUint32(fieldReceiver, elem.keypair.remoteIndex) - binary.LittleEndian.PutUint64(fieldNonce, elem.nonce) + binary.LittleEndian.PutUint32(fieldType, MessageTransportType) + binary.LittleEndian.PutUint32(fieldReceiver, elem.keypair.remoteIndex) + binary.LittleEndian.PutUint64(fieldNonce, elem.nonce) - // pad content to multiple of 16 + // pad content to multiple of 16 - paddingSize := calculatePaddingSize(len(elem.packet), int(atomic.LoadInt32(&device.tun.mtu))) - for i := 0; i < paddingSize; i++ { - elem.packet = append(elem.packet, 0) - } + paddingSize := calculatePaddingSize(len(elem.packet), int(atomic.LoadInt32(&device.tun.mtu))) + for i := 0; i < paddingSize; i++ { + elem.packet = append(elem.packet, 0) + } - // encrypt content and release to consumer + // encrypt content and release to consumer - binary.LittleEndian.PutUint64(nonce[4:], elem.nonce) - elem.packet = elem.keypair.send.Seal( - header, - nonce[:], - elem.packet, - nil, - ) - elem.Unlock() - } + binary.LittleEndian.PutUint64(nonce[4:], elem.nonce) + elem.packet = elem.keypair.send.Seal( + header, + nonce[:], + elem.packet, + nil, + ) + elem.Unlock() } } @@ -576,6 +550,7 @@ func (peer *Peer) RoutineSequentialSender() { select { case elem, ok := <-peer.queue.outbound: if ok { + elem.Lock() if !elem.IsDropped() { device.PutMessageBuffer(elem.buffer) elem.Drop() |