summaryrefslogtreecommitdiffhomepage
path: root/receive.go
diff options
context:
space:
mode:
Diffstat (limited to 'receive.go')
-rw-r--r--receive.go40
1 files changed, 23 insertions, 17 deletions
diff --git a/receive.go b/receive.go
index 9bf3af3..ab86913 100644
--- a/receive.go
+++ b/receive.go
@@ -55,6 +55,7 @@ func (device *Device) addToInboundAndDecryptionQueues(inboundQueue chan *QueueIn
return false
}
default:
+ device.PutInboundElement(element)
return false
}
}
@@ -168,15 +169,15 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) {
}
// create work element
-
peer := value.peer
- elem := &QueueInboundElement{
- packet: packet,
- buffer: buffer,
- keypair: keypair,
- dropped: AtomicFalse,
- endpoint: endpoint,
- }
+ elem := device.GetInboundElement()
+ elem.packet = packet
+ elem.buffer = buffer
+ elem.keypair = keypair
+ elem.dropped = AtomicFalse
+ elem.endpoint = endpoint
+ elem.counter = 0
+ elem.mutex = sync.Mutex{}
elem.mutex.Lock()
// add to decryption queues
@@ -246,6 +247,7 @@ func (device *Device) RoutineDecryption() {
// check if dropped
if elem.IsDropped() {
+ device.PutInboundElement(elem)
continue
}
@@ -280,7 +282,6 @@ func (device *Device) RoutineDecryption() {
elem.Drop()
device.PutMessageBuffer(elem.buffer)
elem.buffer = nil
- elem.mutex.Unlock()
}
elem.mutex.Unlock()
}
@@ -487,12 +488,16 @@ func (peer *Peer) RoutineSequentialReceiver() {
logDebug := device.log.Debug
var elem *QueueInboundElement
+ var ok bool
defer func() {
logDebug.Println(peer, "- Routine: sequential receiver - stopped")
peer.routines.stopping.Done()
- if elem != nil && elem.buffer != nil {
- device.PutMessageBuffer(elem.buffer)
+ if elem != nil {
+ if elem.buffer != nil {
+ device.PutMessageBuffer(elem.buffer)
+ }
+ device.PutInboundElement(elem)
}
}()
@@ -501,8 +506,11 @@ func (peer *Peer) RoutineSequentialReceiver() {
peer.routines.starting.Done()
for {
- if elem != nil && elem.buffer != nil {
- device.PutMessageBuffer(elem.buffer)
+ if elem != nil {
+ if elem.buffer != nil {
+ device.PutMessageBuffer(elem.buffer)
+ }
+ device.PutInboundElement(elem)
}
select {
@@ -510,7 +518,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
case <-peer.routines.stop:
return
- case elem, ok := <-peer.queue.inbound:
+ case elem, ok = <-peer.queue.inbound:
if !ok {
return
@@ -621,9 +629,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
offset := MessageTransportOffsetContent
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)))
- _, err := device.tun.device.Write(
- elem.buffer[:offset+len(elem.packet)],
- offset)
+ _, err := device.tun.device.Write(elem.buffer[:offset+len(elem.packet)], offset)
if err != nil {
logError.Println("Failed to write packet to TUN device:", err)
}