diff options
author | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-05-01 16:59:13 +0200 |
---|---|---|
committer | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-05-01 17:46:28 +0200 |
commit | 168ef61a638e4875b260edbc51551bae0dc34ac3 (patch) | |
tree | 579a18ee07b9cf5427c9bab187707917215b7e5f /receive.go | |
parent | b34604245ec4dfb50846d0ba28d022be5b756c25 (diff) |
Add missing locks and fix debug output, and try to flush queues
Flushing queues on exit is sort of a partial solution, but this could be
better. Really what we want is for no more packets to be enqueued after
isUp is set to false.
Diffstat (limited to 'receive.go')
-rw-r--r-- | receive.go | 58 |
1 files changed, 48 insertions, 10 deletions
@@ -7,6 +7,7 @@ import ( "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" "net" + "strconv" "sync" "sync/atomic" "time" @@ -101,7 +102,11 @@ func (device *Device) addToHandshakeQueue( func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { logDebug := device.log.Debug - logDebug.Println("Routine, receive incoming, IP version:", IP) + defer func() { + logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - stopped") + }() + + logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - starting") // receive datagrams until conn is closed @@ -224,15 +229,31 @@ func (device *Device) RoutineDecryption() { var nonce [chacha20poly1305.NonceSize]byte logDebug := device.log.Debug - logDebug.Println("Routine, decryption, started for device") + defer func() { + for { + select { + case elem, ok := <-device.queue.decryption: + if ok { + elem.Drop() + } + default: + break + } + } + logDebug.Println("Routine: decryption worker - stopped") + }() + logDebug.Println("Routine: decryption worker - started") for { select { case <-device.signal.stop.Wait(): - logDebug.Println("Routine, decryption worker, stopped") return - case elem := <-device.queue.decryption: + case elem, ok := <-device.queue.decryption: + + if !ok { + return + } // check if dropped @@ -282,18 +303,35 @@ func (device *Device) RoutineHandshake() { logInfo := device.log.Info logError := device.log.Error logDebug := device.log.Debug - logDebug.Println("Routine, handshake routine, started for device") + + defer func() { + for { + select { + case <-device.queue.handshake: + default: + return + } + } + logDebug.Println("Routine: handshake worker - stopped") + }() + + logDebug.Println("Routine: handshake worker - started") var temp [MessageHandshakeSize]byte var elem QueueHandshakeElement + var ok bool for { select { - case elem = <-device.queue.handshake: + case elem, ok = <-device.queue.handshake: case <-device.signal.stop.Wait(): return } + if !ok { + return + } + // handle cookie fields and ratelimiting switch elem.msgType { @@ -419,7 +457,7 @@ func (device *Device) RoutineHandshake() { peer.endpoint = elem.endpoint peer.mutex.Unlock() - logDebug.Println(peer, ": Received handshake initiation") + logDebug.Println(peer.String() + ": Received handshake initiation") // create response @@ -477,7 +515,7 @@ func (device *Device) RoutineHandshake() { peer.endpoint = elem.endpoint peer.mutex.Unlock() - logDebug.Println(peer, ": Received handshake response") + logDebug.Println(peer.String() + ": Received handshake response") peer.TimerEphemeralKeyCreated() @@ -504,10 +542,10 @@ func (peer *Peer) RoutineSequentialReceiver() { defer func() { peer.routines.stopping.Done() - logDebug.Println(peer.String(), ": Routine, Sequential Receiver, Stopped") + logDebug.Println(peer.String() + ": Routine: sequential receiver - stopped") }() - logDebug.Println(peer.String(), ": Routine, Sequential Receiver, Started") + logDebug.Println(peer.String() + ": Routine: sequential receiver - started") peer.routines.starting.Done() |