diff options
author | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-09-22 06:29:02 +0200 |
---|---|---|
committer | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-09-24 00:37:43 +0200 |
commit | 833597b585f460aaa17bad93ad59290ec282e77e (patch) | |
tree | 209278c3c686fefffc56067b03a4f66017e009ae /send.go | |
parent | cf81a28dd30bd8714432d2ff108d64c7f4b65e50 (diff) |
More pooling
Diffstat (limited to 'send.go')
-rw-r--r-- | send.go | 34 |
1 files changed, 27 insertions, 7 deletions
@@ -52,10 +52,14 @@ type QueueOutboundElement struct { } func (device *Device) NewOutboundElement() *QueueOutboundElement { - return &QueueOutboundElement{ - dropped: AtomicFalse, - buffer: device.GetMessageBuffer(), - } + elem := device.GetOutboundElement() + elem.dropped = AtomicFalse + elem.buffer = device.GetMessageBuffer() + elem.mutex = sync.Mutex{} + elem.nonce = 0 + elem.keypair = nil + elem.peer = nil + return elem } func (elem *QueueOutboundElement) Drop() { @@ -75,6 +79,7 @@ func addToNonceQueue(queue chan *QueueOutboundElement, element *QueueOutboundEle select { case old := <-queue: device.PutMessageBuffer(old.buffer) + device.PutOutboundElement(old) default: } } @@ -94,6 +99,7 @@ func addToOutboundAndEncryptionQueues(outboundQueue chan *QueueOutboundElement, } default: element.peer.device.PutMessageBuffer(element.buffer) + element.peer.device.PutOutboundElement(element) } } @@ -111,6 +117,7 @@ func (peer *Peer) SendKeepalive() bool { return true default: peer.device.PutMessageBuffer(elem.buffer) + peer.device.PutOutboundElement(elem) return false } } @@ -236,8 +243,6 @@ func (peer *Peer) keepKeyFreshSending() { */ func (device *Device) RoutineReadFromTUN() { - elem := device.NewOutboundElement() - logDebug := device.log.Debug logError := device.log.Error @@ -249,7 +254,14 @@ func (device *Device) RoutineReadFromTUN() { logDebug.Println("Routine: TUN reader - started") device.state.starting.Done() + var elem *QueueOutboundElement + for { + if elem != nil { + device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) + } + elem = device.NewOutboundElement() // read packet @@ -262,6 +274,7 @@ func (device *Device) RoutineReadFromTUN() { device.Close() } device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) return } @@ -304,7 +317,7 @@ func (device *Device) RoutineReadFromTUN() { peer.SendHandshakeInitiation(false) } addToNonceQueue(peer.queue.nonce, elem, device) - elem = device.NewOutboundElement() + elem = nil } } } @@ -339,6 +352,7 @@ func (peer *Peer) RoutineNonce() { select { case elem := <-peer.queue.nonce: device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) default: return } @@ -399,11 +413,13 @@ func (peer *Peer) RoutineNonce() { case <-peer.signals.flushNonceQueue: device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) flush() goto NextPacket case <-peer.routines.stop: device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) return } } @@ -419,6 +435,7 @@ func (peer *Peer) RoutineNonce() { if elem.nonce >= RejectAfterMessages { atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages) device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) goto NextPacket } @@ -468,6 +485,7 @@ func (device *Device) RoutineEncryption() { // check if dropped if elem.IsDropped() { + device.PutOutboundElement(elem) continue } @@ -544,6 +562,7 @@ func (peer *Peer) RoutineSequentialSender() { elem.mutex.Lock() if elem.IsDropped() { + device.PutOutboundElement(elem) continue } @@ -555,6 +574,7 @@ func (peer *Peer) RoutineSequentialSender() { length := uint64(len(elem.packet)) err := peer.SendBuffer(elem.packet) device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) if err != nil { logError.Println(peer, "- Failed to send data packet", err) continue |