diff options
author | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-09-16 21:50:58 +0200 |
---|---|---|
committer | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-09-16 21:50:58 +0200 |
commit | 39d6e4f2f18265c9cee1a2b1b456f6468950b932 (patch) | |
tree | 2d6c604dd6b5c16ca3a1cbf96d94351bc9c7c864 /receive.go | |
parent | 1c025570139f614f2083b935e2c58d5dbf199c2f (diff) |
Change queueing drop order and fix memory leaks
If the queues are full, we drop the present packet, which is better for
network traffic flow. Also, we try to fix up the memory leaks with not
putting buffers from our shared pool.
Diffstat (limited to 'receive.go')
-rw-r--r-- | receive.go | 79 |
1 files changed, 26 insertions, 53 deletions
@@ -43,59 +43,28 @@ func (elem *QueueInboundElement) IsDropped() bool { return atomic.LoadInt32(&elem.dropped) == AtomicTrue } -func (device *Device) addToInboundQueue( - queue chan *QueueInboundElement, - element *QueueInboundElement, -) { - for { +func (device *Device) addToInboundAndDecryptionQueues(inboundQueue chan *QueueInboundElement, decryptionQueue chan *QueueInboundElement, element *QueueInboundElement) bool { + select { + case inboundQueue <- element: select { - case queue <- element: - return + case decryptionQueue <- element: + return true default: - select { - case old := <-queue: - old.Drop() - default: - } + element.Drop() + element.mutex.Unlock() + return false } + default: + return false } } -func (device *Device) addToDecryptionQueue( - queue chan *QueueInboundElement, - element *QueueInboundElement, -) { - for { - select { - case queue <- element: - return - default: - select { - case old := <-queue: - // drop & release to potential consumer - old.Drop() - old.mutex.Unlock() - default: - } - } - } -} - -func (device *Device) addToHandshakeQueue( - queue chan QueueHandshakeElement, - element QueueHandshakeElement, -) { - for { - select { - case queue <- element: - return - default: - select { - case elem := <-queue: - device.PutMessageBuffer(elem.buffer) - default: - } - } +func (device *Device) addToHandshakeQueue(queue chan QueueHandshakeElement, element QueueHandshakeElement) bool { + select { + case queue <- element: + return true + default: + return false } } @@ -154,6 +123,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { } if err != nil { + device.PutMessageBuffer(buffer) return } @@ -212,9 +182,9 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { // add to decryption queues if peer.isRunning.Get() { - device.addToDecryptionQueue(device.queue.decryption, elem) - device.addToInboundQueue(peer.queue.inbound, elem) - buffer = device.GetMessageBuffer() + if device.addToInboundAndDecryptionQueues(peer.queue.inbound, device.queue.decryption, elem) { + buffer = device.GetMessageBuffer() + } } continue @@ -235,7 +205,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { } if okay { - device.addToHandshakeQueue( + if (device.addToHandshakeQueue( device.queue.handshake, QueueHandshakeElement{ msgType: msgType, @@ -243,8 +213,9 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { packet: packet, endpoint: endpoint, }, - ) - buffer = device.GetMessageBuffer() + )) { + buffer = device.GetMessageBuffer() + } } } } @@ -307,6 +278,8 @@ func (device *Device) RoutineDecryption() { ) if err != nil { elem.Drop() + device.PutMessageBuffer(elem.buffer) + elem.mutex.Unlock() } elem.mutex.Unlock() } |