summaryrefslogtreecommitdiffhomepage
path: root/send.go
diff options
context:
space:
mode:
authorJason A. Donenfeld <Jason@zx2c4.com>2018-09-22 06:29:02 +0200
committerJason A. Donenfeld <Jason@zx2c4.com>2018-09-24 00:37:43 +0200
commit833597b585f460aaa17bad93ad59290ec282e77e (patch)
tree209278c3c686fefffc56067b03a4f66017e009ae /send.go
parentcf81a28dd30bd8714432d2ff108d64c7f4b65e50 (diff)
More pooling
Diffstat (limited to 'send.go')
-rw-r--r--send.go34
1 files changed, 27 insertions, 7 deletions
diff --git a/send.go b/send.go
index 24e2f39..fa84043 100644
--- a/send.go
+++ b/send.go
@@ -52,10 +52,14 @@ type QueueOutboundElement struct {
}
func (device *Device) NewOutboundElement() *QueueOutboundElement {
- return &QueueOutboundElement{
- dropped: AtomicFalse,
- buffer: device.GetMessageBuffer(),
- }
+ elem := device.GetOutboundElement()
+ elem.dropped = AtomicFalse
+ elem.buffer = device.GetMessageBuffer()
+ elem.mutex = sync.Mutex{}
+ elem.nonce = 0
+ elem.keypair = nil
+ elem.peer = nil
+ return elem
}
func (elem *QueueOutboundElement) Drop() {
@@ -75,6 +79,7 @@ func addToNonceQueue(queue chan *QueueOutboundElement, element *QueueOutboundEle
select {
case old := <-queue:
device.PutMessageBuffer(old.buffer)
+ device.PutOutboundElement(old)
default:
}
}
@@ -94,6 +99,7 @@ func addToOutboundAndEncryptionQueues(outboundQueue chan *QueueOutboundElement,
}
default:
element.peer.device.PutMessageBuffer(element.buffer)
+ element.peer.device.PutOutboundElement(element)
}
}
@@ -111,6 +117,7 @@ func (peer *Peer) SendKeepalive() bool {
return true
default:
peer.device.PutMessageBuffer(elem.buffer)
+ peer.device.PutOutboundElement(elem)
return false
}
}
@@ -236,8 +243,6 @@ func (peer *Peer) keepKeyFreshSending() {
*/
func (device *Device) RoutineReadFromTUN() {
- elem := device.NewOutboundElement()
-
logDebug := device.log.Debug
logError := device.log.Error
@@ -249,7 +254,14 @@ func (device *Device) RoutineReadFromTUN() {
logDebug.Println("Routine: TUN reader - started")
device.state.starting.Done()
+ var elem *QueueOutboundElement
+
for {
+ if elem != nil {
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ }
+ elem = device.NewOutboundElement()
// read packet
@@ -262,6 +274,7 @@ func (device *Device) RoutineReadFromTUN() {
device.Close()
}
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
return
}
@@ -304,7 +317,7 @@ func (device *Device) RoutineReadFromTUN() {
peer.SendHandshakeInitiation(false)
}
addToNonceQueue(peer.queue.nonce, elem, device)
- elem = device.NewOutboundElement()
+ elem = nil
}
}
}
@@ -339,6 +352,7 @@ func (peer *Peer) RoutineNonce() {
select {
case elem := <-peer.queue.nonce:
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
default:
return
}
@@ -399,11 +413,13 @@ func (peer *Peer) RoutineNonce() {
case <-peer.signals.flushNonceQueue:
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
flush()
goto NextPacket
case <-peer.routines.stop:
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
return
}
}
@@ -419,6 +435,7 @@ func (peer *Peer) RoutineNonce() {
if elem.nonce >= RejectAfterMessages {
atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages)
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
goto NextPacket
}
@@ -468,6 +485,7 @@ func (device *Device) RoutineEncryption() {
// check if dropped
if elem.IsDropped() {
+ device.PutOutboundElement(elem)
continue
}
@@ -544,6 +562,7 @@ func (peer *Peer) RoutineSequentialSender() {
elem.mutex.Lock()
if elem.IsDropped() {
+ device.PutOutboundElement(elem)
continue
}
@@ -555,6 +574,7 @@ func (peer *Peer) RoutineSequentialSender() {
length := uint64(len(elem.packet))
err := peer.SendBuffer(elem.packet)
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
if err != nil {
logError.Println(peer, "- Failed to send data packet", err)
continue