summaryrefslogtreecommitdiffhomepage
path: root/device.go
diff options
context:
space:
mode:
Diffstat (limited to 'device.go')
-rw-r--r--device.go31
1 files changed, 30 insertions, 1 deletions
diff --git a/device.go b/device.go
index 8b0d2a5..c714b21 100644
--- a/device.go
+++ b/device.go
@@ -13,6 +13,10 @@ import (
"time"
)
+const (
+ DeviceRoutineNumberPerCPU = 3
+)
+
type Device struct {
isUp AtomicBool // device is (going) up
isClosed AtomicBool // device is closed? (acting as guard)
@@ -21,6 +25,7 @@ type Device struct {
// synchronized resources (locks acquired in order)
state struct {
+ stopping sync.WaitGroup
mutex sync.Mutex
changing AtomicBool
current bool
@@ -306,7 +311,9 @@ func NewDevice(tun TUNDevice, logger *Logger) *Device {
// start workers
- for i := 0; i < runtime.NumCPU(); i += 1 {
+ cpus := runtime.NumCPU()
+ device.state.stopping.Add(DeviceRoutineNumberPerCPU * cpus)
+ for i := 0; i < cpus; i += 1 {
go device.RoutineEncryption()
go device.RoutineDecryption()
go device.RoutineHandshake()
@@ -360,6 +367,25 @@ func (device *Device) RemoveAllPeers() {
device.peers.keyMap = make(map[NoisePublicKey]*Peer)
}
+func (device *Device) FlushPacketQueues() {
+ for {
+ select {
+ case elem, ok := <-device.queue.decryption:
+ if ok {
+ elem.Drop()
+ }
+ case elem, ok := <-device.queue.encryption:
+ if ok {
+ elem.Drop()
+ }
+ case <-device.queue.handshake:
+ default:
+ return
+ }
+ }
+
+}
+
func (device *Device) Close() {
if device.isClosed.Swap(true) {
return
@@ -376,6 +402,9 @@ func (device *Device) Close() {
device.signal.stop.Broadcast()
+ device.state.stopping.Wait()
+ device.FlushPacketQueues()
+
device.RemoveAllPeers()
device.rate.limiter.Close()