diff options
Diffstat (limited to 'src/send.go')
-rw-r--r-- | src/send.go | 81 |
1 files changed, 45 insertions, 36 deletions
diff --git a/src/send.go b/src/send.go index 2db74ba..fdbc676 100644 --- a/src/send.go +++ b/src/send.go @@ -270,50 +270,65 @@ func (peer *Peer) RoutineNonce() { * Obs. One instance per core */ func (device *Device) RoutineEncryption() { + + var elem *QueueOutboundElement var nonce [chacha20poly1305.NonceSize]byte - for work := range device.queue.encryption { + + logDebug := device.log.Debug + logDebug.Println("Routine, encryption worker, started") + + for { + + // fetch next element + + select { + case elem = <-device.queue.encryption: + case <-device.signal.stop: + logDebug.Println("Routine, encryption worker, stopped") + return + } // check if dropped - if work.IsDropped() { + if elem.IsDropped() { continue } // populate header fields - header := work.buffer[:MessageTransportHeaderSize] + header := elem.buffer[:MessageTransportHeaderSize] fieldType := header[0:4] fieldReceiver := header[4:8] fieldNonce := header[8:16] binary.LittleEndian.PutUint32(fieldType, MessageTransportType) - binary.LittleEndian.PutUint32(fieldReceiver, work.keyPair.remoteIndex) - binary.LittleEndian.PutUint64(fieldNonce, work.nonce) + binary.LittleEndian.PutUint32(fieldReceiver, elem.keyPair.remoteIndex) + binary.LittleEndian.PutUint64(fieldNonce, elem.nonce) // pad content to MTU size mtu := int(atomic.LoadInt32(&device.mtu)) - for i := len(work.packet); i < mtu; i++ { - work.packet = append(work.packet, 0) + for i := len(elem.packet); i < mtu; i++ { + elem.packet = append(elem.packet, 0) } // encrypt content - binary.LittleEndian.PutUint64(nonce[4:], work.nonce) - work.packet = work.keyPair.send.Seal( - work.packet[:0], + binary.LittleEndian.PutUint64(nonce[4:], elem.nonce) + elem.packet = elem.keyPair.send.Seal( + elem.packet[:0], nonce[:], - work.packet, + elem.packet, nil, ) - length := MessageTransportHeaderSize + len(work.packet) - work.packet = work.buffer[:length] - work.mutex.Unlock() + length := MessageTransportHeaderSize + len(elem.packet) + elem.packet = elem.buffer[:length] + elem.mutex.Unlock() // refresh key if necessary - work.peer.KeepKeyFreshSending() + elem.peer.KeepKeyFreshSending() } } @@ -334,49 +349,43 @@ func (peer *Peer) RoutineSequentialSender() { logDebug.Println("Routine, sequential sender, stopped for", peer.String()) return - case work := <-peer.queue.outbound: - work.mutex.Lock() + case elem := <-peer.queue.outbound: + elem.mutex.Lock() func() { - - // return buffer to pool after processing - - defer device.PutMessageBuffer(work.buffer) - if work.IsDropped() { + if elem.IsDropped() { return } - // send to endpoint + // get endpoint and connection peer.mutex.RLock() - defer peer.mutex.RUnlock() - - if peer.endpoint == nil { + endpoint := peer.endpoint + peer.mutex.RUnlock() + if endpoint == nil { logDebug.Println("No endpoint for", peer.String()) return } device.net.mutex.RLock() - defer device.net.mutex.RUnlock() - - if device.net.conn == nil { + conn := device.net.conn + device.net.mutex.RUnlock() + if conn == nil { logDebug.Println("No source for device") return } - // send message and return buffer to pool + // send message and refresh keys - _, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint) + _, err := conn.WriteToUDP(elem.packet, endpoint) if err != nil { return } - - atomic.AddUint64(&peer.txBytes, uint64(len(work.packet))) - - // reset keep-alive - + atomic.AddUint64(&peer.txBytes, uint64(len(elem.packet))) peer.TimerResetKeepalive() }() + + device.PutMessageBuffer(elem.buffer) } } } |