diff options
Diffstat (limited to 'src/send.go')
-rw-r--r-- | src/send.go | 58 |
1 files changed, 38 insertions, 20 deletions
diff --git a/src/send.go b/src/send.go index a02f5cb..5ea9a8f 100644 --- a/src/send.go +++ b/src/send.go @@ -31,7 +31,7 @@ import ( * (to allow the construction of transport messages in-place) */ type QueueOutboundElement struct { - state uint32 + dropped int32 mutex sync.Mutex data [MaxMessageSize]byte packet []byte // slice of "data" (always!) @@ -61,11 +61,11 @@ func (device *Device) NewOutboundElement() *QueueOutboundElement { } func (elem *QueueOutboundElement) Drop() { - atomic.StoreUint32(&elem.state, ElementStateDropped) + atomic.StoreInt32(&elem.dropped, AtomicTrue) } func (elem *QueueOutboundElement) IsDropped() bool { - return atomic.LoadUint32(&elem.state) == ElementStateDropped + return atomic.LoadInt32(&elem.dropped) == AtomicTrue } func addToOutboundQueue( @@ -86,6 +86,25 @@ func addToOutboundQueue( } } +func addToEncryptionQueue( + queue chan *QueueOutboundElement, + element *QueueOutboundElement, +) { + for { + select { + case queue <- element: + return + default: + select { + case old := <-queue: + old.Drop() + old.mutex.Unlock() + default: + } + } + } +} + /* Reads packets from the TUN and inserts * into nonce queue for peer * @@ -196,9 +215,7 @@ func (peer *Peer) RoutineNonce() { break } } - logDebug.Println("Key pair:", keyPair) - - sendSignal(peer.signal.handshakeBegin) + signalSend(peer.signal.handshakeBegin) logDebug.Println("Waiting for key-pair, peer", peer.id) select { @@ -225,12 +242,13 @@ func (peer *Peer) RoutineNonce() { elem.keyPair = keyPair elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1 + elem.dropped = AtomicFalse elem.peer = peer elem.mutex.Lock() - // add to parallel processing and sequential consuming queue + // add to parallel and sequential queue - addToOutboundQueue(device.queue.encryption, elem) + addToEncryptionQueue(device.queue.encryption, elem) addToOutboundQueue(peer.queue.outbound, elem) elem = nil } @@ -246,6 +264,9 @@ func (peer *Peer) RoutineNonce() { func (device *Device) RoutineEncryption() { var nonce [chacha20poly1305.NonceSize]byte for work := range device.queue.encryption { + + // check if dropped + if work.IsDropped() { continue } @@ -289,25 +310,25 @@ func (device *Device) RoutineEncryption() { * The routine terminates then the outbound queue is closed. */ func (peer *Peer) RoutineSequentialSender() { - logDebug := peer.device.log.Debug - logDebug.Println("Routine, sequential sender, started for peer", peer.id) - device := peer.device + logDebug := device.log.Debug + logDebug.Println("Routine, sequential sender, started for peer", peer.id) + for { select { case <-peer.signal.stop: logDebug.Println("Routine, sequential sender, stopped for peer", peer.id) return case work := <-peer.queue.outbound: + work.mutex.Lock() if work.IsDropped() { continue } - work.mutex.Lock() + func() { - if work.packet == nil { - return - } + + // send to endpoint peer.mutex.RLock() defer peer.mutex.RUnlock() @@ -331,12 +352,9 @@ func (peer *Peer) RoutineSequentialSender() { } atomic.AddUint64(&peer.txBytes, uint64(len(work.packet))) - // shift keep-alive timer + // reset keep-alive (passive keep-alives / acknowledgements) - if peer.persistentKeepaliveInterval != 0 { - interval := time.Duration(peer.persistentKeepaliveInterval) * time.Second - peer.timer.sendKeepalive.Reset(interval) - } + peer.TimerResetKeepalive() }() } } |