summaryrefslogtreecommitdiffhomepage
path: root/src/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/send.go')
-rw-r--r--src/send.go58
1 files changed, 38 insertions, 20 deletions
diff --git a/src/send.go b/src/send.go
index a02f5cb..5ea9a8f 100644
--- a/src/send.go
+++ b/src/send.go
@@ -31,7 +31,7 @@ import (
* (to allow the construction of transport messages in-place)
*/
type QueueOutboundElement struct {
- state uint32
+ dropped int32
mutex sync.Mutex
data [MaxMessageSize]byte
packet []byte // slice of "data" (always!)
@@ -61,11 +61,11 @@ func (device *Device) NewOutboundElement() *QueueOutboundElement {
}
func (elem *QueueOutboundElement) Drop() {
- atomic.StoreUint32(&elem.state, ElementStateDropped)
+ atomic.StoreInt32(&elem.dropped, AtomicTrue)
}
func (elem *QueueOutboundElement) IsDropped() bool {
- return atomic.LoadUint32(&elem.state) == ElementStateDropped
+ return atomic.LoadInt32(&elem.dropped) == AtomicTrue
}
func addToOutboundQueue(
@@ -86,6 +86,25 @@ func addToOutboundQueue(
}
}
+func addToEncryptionQueue(
+ queue chan *QueueOutboundElement,
+ element *QueueOutboundElement,
+) {
+ for {
+ select {
+ case queue <- element:
+ return
+ default:
+ select {
+ case old := <-queue:
+ old.Drop()
+ old.mutex.Unlock()
+ default:
+ }
+ }
+ }
+}
+
/* Reads packets from the TUN and inserts
* into nonce queue for peer
*
@@ -196,9 +215,7 @@ func (peer *Peer) RoutineNonce() {
break
}
}
- logDebug.Println("Key pair:", keyPair)
-
- sendSignal(peer.signal.handshakeBegin)
+ signalSend(peer.signal.handshakeBegin)
logDebug.Println("Waiting for key-pair, peer", peer.id)
select {
@@ -225,12 +242,13 @@ func (peer *Peer) RoutineNonce() {
elem.keyPair = keyPair
elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1
+ elem.dropped = AtomicFalse
elem.peer = peer
elem.mutex.Lock()
- // add to parallel processing and sequential consuming queue
+ // add to parallel and sequential queue
- addToOutboundQueue(device.queue.encryption, elem)
+ addToEncryptionQueue(device.queue.encryption, elem)
addToOutboundQueue(peer.queue.outbound, elem)
elem = nil
}
@@ -246,6 +264,9 @@ func (peer *Peer) RoutineNonce() {
func (device *Device) RoutineEncryption() {
var nonce [chacha20poly1305.NonceSize]byte
for work := range device.queue.encryption {
+
+ // check if dropped
+
if work.IsDropped() {
continue
}
@@ -289,25 +310,25 @@ func (device *Device) RoutineEncryption() {
* The routine terminates then the outbound queue is closed.
*/
func (peer *Peer) RoutineSequentialSender() {
- logDebug := peer.device.log.Debug
- logDebug.Println("Routine, sequential sender, started for peer", peer.id)
-
device := peer.device
+ logDebug := device.log.Debug
+ logDebug.Println("Routine, sequential sender, started for peer", peer.id)
+
for {
select {
case <-peer.signal.stop:
logDebug.Println("Routine, sequential sender, stopped for peer", peer.id)
return
case work := <-peer.queue.outbound:
+ work.mutex.Lock()
if work.IsDropped() {
continue
}
- work.mutex.Lock()
+
func() {
- if work.packet == nil {
- return
- }
+
+ // send to endpoint
peer.mutex.RLock()
defer peer.mutex.RUnlock()
@@ -331,12 +352,9 @@ func (peer *Peer) RoutineSequentialSender() {
}
atomic.AddUint64(&peer.txBytes, uint64(len(work.packet)))
- // shift keep-alive timer
+ // reset keep-alive (passive keep-alives / acknowledgements)
- if peer.persistentKeepaliveInterval != 0 {
- interval := time.Duration(peer.persistentKeepaliveInterval) * time.Second
- peer.timer.sendKeepalive.Reset(interval)
- }
+ peer.TimerResetKeepalive()
}()
}
}