diff options
Diffstat (limited to 'device.go')
-rw-r--r-- | device.go | 31 |
1 files changed, 30 insertions, 1 deletions
@@ -13,6 +13,10 @@ import ( "time" ) +const ( + DeviceRoutineNumberPerCPU = 3 +) + type Device struct { isUp AtomicBool // device is (going) up isClosed AtomicBool // device is closed? (acting as guard) @@ -21,6 +25,7 @@ type Device struct { // synchronized resources (locks acquired in order) state struct { + stopping sync.WaitGroup mutex sync.Mutex changing AtomicBool current bool @@ -306,7 +311,9 @@ func NewDevice(tun TUNDevice, logger *Logger) *Device { // start workers - for i := 0; i < runtime.NumCPU(); i += 1 { + cpus := runtime.NumCPU() + device.state.stopping.Add(DeviceRoutineNumberPerCPU * cpus) + for i := 0; i < cpus; i += 1 { go device.RoutineEncryption() go device.RoutineDecryption() go device.RoutineHandshake() @@ -360,6 +367,25 @@ func (device *Device) RemoveAllPeers() { device.peers.keyMap = make(map[NoisePublicKey]*Peer) } +func (device *Device) FlushPacketQueues() { + for { + select { + case elem, ok := <-device.queue.decryption: + if ok { + elem.Drop() + } + case elem, ok := <-device.queue.encryption: + if ok { + elem.Drop() + } + case <-device.queue.handshake: + default: + return + } + } + +} + func (device *Device) Close() { if device.isClosed.Swap(true) { return @@ -376,6 +402,9 @@ func (device *Device) Close() { device.signal.stop.Broadcast() + device.state.stopping.Wait() + device.FlushPacketQueues() + device.RemoveAllPeers() device.rate.limiter.Close() |