diff options
author | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-09-22 06:29:02 +0200 |
---|---|---|
committer | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-09-24 00:37:43 +0200 |
commit | 833597b585f460aaa17bad93ad59290ec282e77e (patch) | |
tree | 209278c3c686fefffc56067b03a4f66017e009ae /receive.go | |
parent | cf81a28dd30bd8714432d2ff108d64c7f4b65e50 (diff) |
More pooling
Diffstat (limited to 'receive.go')
-rw-r--r-- | receive.go | 40 |
1 files changed, 23 insertions, 17 deletions
@@ -55,6 +55,7 @@ func (device *Device) addToInboundAndDecryptionQueues(inboundQueue chan *QueueIn return false } default: + device.PutInboundElement(element) return false } } @@ -168,15 +169,15 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { } // create work element - peer := value.peer - elem := &QueueInboundElement{ - packet: packet, - buffer: buffer, - keypair: keypair, - dropped: AtomicFalse, - endpoint: endpoint, - } + elem := device.GetInboundElement() + elem.packet = packet + elem.buffer = buffer + elem.keypair = keypair + elem.dropped = AtomicFalse + elem.endpoint = endpoint + elem.counter = 0 + elem.mutex = sync.Mutex{} elem.mutex.Lock() // add to decryption queues @@ -246,6 +247,7 @@ func (device *Device) RoutineDecryption() { // check if dropped if elem.IsDropped() { + device.PutInboundElement(elem) continue } @@ -280,7 +282,6 @@ func (device *Device) RoutineDecryption() { elem.Drop() device.PutMessageBuffer(elem.buffer) elem.buffer = nil - elem.mutex.Unlock() } elem.mutex.Unlock() } @@ -487,12 +488,16 @@ func (peer *Peer) RoutineSequentialReceiver() { logDebug := device.log.Debug var elem *QueueInboundElement + var ok bool defer func() { logDebug.Println(peer, "- Routine: sequential receiver - stopped") peer.routines.stopping.Done() - if elem != nil && elem.buffer != nil { - device.PutMessageBuffer(elem.buffer) + if elem != nil { + if elem.buffer != nil { + device.PutMessageBuffer(elem.buffer) + } + device.PutInboundElement(elem) } }() @@ -501,8 +506,11 @@ func (peer *Peer) RoutineSequentialReceiver() { peer.routines.starting.Done() for { - if elem != nil && elem.buffer != nil { - device.PutMessageBuffer(elem.buffer) + if elem != nil { + if elem.buffer != nil { + device.PutMessageBuffer(elem.buffer) + } + device.PutInboundElement(elem) } select { @@ -510,7 +518,7 @@ func (peer *Peer) RoutineSequentialReceiver() { case <-peer.routines.stop: return - case elem, ok := <-peer.queue.inbound: + case elem, ok = <-peer.queue.inbound: if !ok { return @@ -621,9 +629,7 @@ func (peer *Peer) RoutineSequentialReceiver() { offset := MessageTransportOffsetContent atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet))) - _, err := device.tun.device.Write( - elem.buffer[:offset+len(elem.packet)], - offset) + _, err := device.tun.device.Write(elem.buffer[:offset+len(elem.packet)], offset) if err != nil { logError.Println("Failed to write packet to TUN device:", err) } |