diff options
Diffstat (limited to 'device/device.go')
-rw-r--r-- | device/device.go | 39 |
1 files changed, 32 insertions, 7 deletions
diff --git a/device/device.go b/device/device.go index 9e2d001..d9367e5 100644 --- a/device/device.go +++ b/device/device.go @@ -74,7 +74,7 @@ type Device struct { } queue struct { - encryption chan *QueueOutboundElement + encryption *encryptionQueue decryption chan *QueueInboundElement handshake chan QueueHandshakeElement } @@ -89,6 +89,31 @@ type Device struct { } } +// An encryptionQueue is a channel of QueueOutboundElements awaiting encryption. +// An encryptionQueue is ref-counted using its wg field. +// An encryptionQueue created with newEncryptionQueue has one reference. +// Every additional writer must call wg.Add(1). +// Every completed writer must call wg.Done(). +// When no further writers will be added, +// call wg.Done to remove the initial reference. +// When the refcount hits 0, the queue's channel is closed. +type encryptionQueue struct { + c chan *QueueOutboundElement + wg sync.WaitGroup +} + +func newEncryptionQueue() *encryptionQueue { + q := &encryptionQueue{ + c: make(chan *QueueOutboundElement, QueueOutboundSize), + } + q.wg.Add(1) + go func() { + q.wg.Wait() + close(q.c) + }() + return q +} + /* Converts the peer into a "zombie", which remains in the peer map, * but processes no packets and does not exists in the routing table. * @@ -280,7 +305,7 @@ func NewDevice(tunDevice tun.Device, logger *Logger) *Device { // create queues device.queue.handshake = make(chan QueueHandshakeElement, QueueHandshakeSize) - device.queue.encryption = make(chan *QueueOutboundElement, QueueOutboundSize) + device.queue.encryption = newEncryptionQueue() device.queue.decryption = make(chan *QueueInboundElement, QueueInboundSize) // prepare signals @@ -297,7 +322,7 @@ func NewDevice(tunDevice tun.Device, logger *Logger) *Device { cpus := runtime.NumCPU() device.state.stopping.Wait() for i := 0; i < cpus; i += 1 { - device.state.stopping.Add(3) + device.state.stopping.Add(2) // decryption and handshake go device.RoutineEncryption() go device.RoutineDecryption() go device.RoutineHandshake() @@ -346,10 +371,6 @@ func (device *Device) FlushPacketQueues() { if ok { elem.Drop() } - case elem, ok := <-device.queue.encryption: - if ok { - elem.Drop() - } case <-device.queue.handshake: default: return @@ -373,6 +394,10 @@ func (device *Device) Close() { device.isUp.Set(false) + // We kept a reference to the encryption queue, + // in case we started any new peers that might write to it. + // No new peers are coming; we are done with the encryption queue. + device.queue.encryption.wg.Done() close(device.signals.stop) device.state.stopping.Wait() |