summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--device.go31
-rw-r--r--receive.go21
-rw-r--r--send.go15
3 files changed, 36 insertions, 31 deletions
diff --git a/device.go b/device.go
index 8b0d2a5..c714b21 100644
--- a/device.go
+++ b/device.go
@@ -13,6 +13,10 @@ import (
"time"
)
+const (
+ DeviceRoutineNumberPerCPU = 3
+)
+
type Device struct {
isUp AtomicBool // device is (going) up
isClosed AtomicBool // device is closed? (acting as guard)
@@ -21,6 +25,7 @@ type Device struct {
// synchronized resources (locks acquired in order)
state struct {
+ stopping sync.WaitGroup
mutex sync.Mutex
changing AtomicBool
current bool
@@ -306,7 +311,9 @@ func NewDevice(tun TUNDevice, logger *Logger) *Device {
// start workers
- for i := 0; i < runtime.NumCPU(); i += 1 {
+ cpus := runtime.NumCPU()
+ device.state.stopping.Add(DeviceRoutineNumberPerCPU * cpus)
+ for i := 0; i < cpus; i += 1 {
go device.RoutineEncryption()
go device.RoutineDecryption()
go device.RoutineHandshake()
@@ -360,6 +367,25 @@ func (device *Device) RemoveAllPeers() {
device.peers.keyMap = make(map[NoisePublicKey]*Peer)
}
+func (device *Device) FlushPacketQueues() {
+ for {
+ select {
+ case elem, ok := <-device.queue.decryption:
+ if ok {
+ elem.Drop()
+ }
+ case elem, ok := <-device.queue.encryption:
+ if ok {
+ elem.Drop()
+ }
+ case <-device.queue.handshake:
+ default:
+ return
+ }
+ }
+
+}
+
func (device *Device) Close() {
if device.isClosed.Swap(true) {
return
@@ -376,6 +402,9 @@ func (device *Device) Close() {
device.signal.stop.Broadcast()
+ device.state.stopping.Wait()
+ device.FlushPacketQueues()
+
device.RemoveAllPeers()
device.rate.limiter.Close()
diff --git a/receive.go b/receive.go
index b6261b2..1cf77b2 100644
--- a/receive.go
+++ b/receive.go
@@ -238,17 +238,8 @@ func (device *Device) RoutineDecryption() {
logDebug := device.log.Debug
defer func() {
- for {
- select {
- case elem, ok := <-device.queue.decryption:
- if ok {
- elem.Drop()
- }
- default:
- break
- }
- }
logDebug.Println("Routine: decryption worker - stopped")
+ device.state.stopping.Done()
}()
logDebug.Println("Routine: decryption worker - started")
@@ -313,14 +304,8 @@ func (device *Device) RoutineHandshake() {
logDebug := device.log.Debug
defer func() {
- for {
- select {
- case <-device.queue.handshake:
- default:
- return
- }
- }
logDebug.Println("Routine: handshake worker - stopped")
+ device.state.stopping.Done()
}()
logDebug.Println("Routine: handshake worker - started")
@@ -549,8 +534,8 @@ func (peer *Peer) RoutineSequentialReceiver() {
logDebug := device.log.Debug
defer func() {
- peer.routines.stopping.Done()
logDebug.Println(peer, ": Routine: sequential receiver - stopped")
+ peer.routines.stopping.Done()
}()
logDebug.Println(peer, ": Routine: sequential receiver - started")
diff --git a/send.go b/send.go
index a7c68be..ddebb99 100644
--- a/send.go
+++ b/send.go
@@ -200,8 +200,8 @@ func (peer *Peer) RoutineNonce() {
logDebug := device.log.Debug
defer func() {
- peer.routines.stopping.Done()
logDebug.Println(peer, ": Routine: nonce worker - stopped")
+ peer.routines.stopping.Done()
}()
peer.routines.starting.Done()
@@ -277,17 +277,8 @@ func (device *Device) RoutineEncryption() {
logDebug := device.log.Debug
defer func() {
- for {
- select {
- case elem, ok := <-device.queue.encryption:
- if ok {
- elem.Drop()
- }
- default:
- break
- }
- }
logDebug.Println("Routine: encryption worker - stopped")
+ device.state.stopping.Done()
}()
logDebug.Println("Routine: encryption worker - started")
@@ -360,8 +351,8 @@ func (peer *Peer) RoutineSequentialSender() {
logDebug := device.log.Debug
defer func() {
- peer.routines.stopping.Done()
logDebug.Println(peer, ": Routine: sequential sender - stopped")
+ peer.routines.stopping.Done()
}()
logDebug.Println(peer, ": Routine: sequential sender - started")