diff options
Diffstat (limited to 'receive.go')
-rw-r--r-- | receive.go | 40 |
1 files changed, 23 insertions, 17 deletions
@@ -55,6 +55,7 @@ func (device *Device) addToInboundAndDecryptionQueues(inboundQueue chan *QueueIn return false } default: + device.PutInboundElement(element) return false } } @@ -168,15 +169,15 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { } // create work element - peer := value.peer - elem := &QueueInboundElement{ - packet: packet, - buffer: buffer, - keypair: keypair, - dropped: AtomicFalse, - endpoint: endpoint, - } + elem := device.GetInboundElement() + elem.packet = packet + elem.buffer = buffer + elem.keypair = keypair + elem.dropped = AtomicFalse + elem.endpoint = endpoint + elem.counter = 0 + elem.mutex = sync.Mutex{} elem.mutex.Lock() // add to decryption queues @@ -246,6 +247,7 @@ func (device *Device) RoutineDecryption() { // check if dropped if elem.IsDropped() { + device.PutInboundElement(elem) continue } @@ -280,7 +282,6 @@ func (device *Device) RoutineDecryption() { elem.Drop() device.PutMessageBuffer(elem.buffer) elem.buffer = nil - elem.mutex.Unlock() } elem.mutex.Unlock() } @@ -487,12 +488,16 @@ func (peer *Peer) RoutineSequentialReceiver() { logDebug := device.log.Debug var elem *QueueInboundElement + var ok bool defer func() { logDebug.Println(peer, "- Routine: sequential receiver - stopped") peer.routines.stopping.Done() - if elem != nil && elem.buffer != nil { - device.PutMessageBuffer(elem.buffer) + if elem != nil { + if elem.buffer != nil { + device.PutMessageBuffer(elem.buffer) + } + device.PutInboundElement(elem) } }() @@ -501,8 +506,11 @@ func (peer *Peer) RoutineSequentialReceiver() { peer.routines.starting.Done() for { - if elem != nil && elem.buffer != nil { - device.PutMessageBuffer(elem.buffer) + if elem != nil { + if elem.buffer != nil { + device.PutMessageBuffer(elem.buffer) + } + device.PutInboundElement(elem) } select { @@ -510,7 +518,7 @@ func (peer *Peer) RoutineSequentialReceiver() { case <-peer.routines.stop: return - case elem, ok := <-peer.queue.inbound: + case elem, ok = <-peer.queue.inbound: if !ok { return @@ -621,9 +629,7 @@ func (peer *Peer) RoutineSequentialReceiver() { offset := MessageTransportOffsetContent atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet))) - _, err := device.tun.device.Write( - elem.buffer[:offset+len(elem.packet)], - offset) + _, err := device.tun.device.Write(elem.buffer[:offset+len(elem.packet)], offset) if err != nil { logError.Println("Failed to write packet to TUN device:", err) } |