summaryrefslogtreecommitdiffhomepage
path: root/device/device.go
diff options
context:
space:
mode:
authorJason A. Donenfeld <Jason@zx2c4.com>2021-01-29 18:24:45 +0100
committerJason A. Donenfeld <Jason@zx2c4.com>2021-01-29 18:24:45 +0100
commitbeb25cc4fd31da09590fed3200628baf4c701f8b (patch)
tree15cefc4f60d4fc8085903f115b232245b92e2bc0 /device/device.go
parent9263014ed3f0a97800c893cb7346cc5109fc9e27 (diff)
device: use new model queues for handshakes
Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
Diffstat (limited to 'device/device.go')
-rw-r--r--device/device.go55
1 files changed, 28 insertions, 27 deletions
diff --git a/device/device.go b/device/device.go
index 08db244..fd88855 100644
--- a/device/device.go
+++ b/device/device.go
@@ -13,6 +13,7 @@ import (
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
+
"golang.zx2c4.com/wireguard/conn"
"golang.zx2c4.com/wireguard/ratelimiter"
"golang.zx2c4.com/wireguard/rwcancel"
@@ -77,11 +78,7 @@ type Device struct {
queue struct {
encryption *outboundQueue
decryption *inboundQueue
- handshake chan QueueHandshakeElement
- }
-
- signals struct {
- stop chan struct{}
+ handshake *handshakeQueue
}
tun struct {
@@ -90,6 +87,7 @@ type Device struct {
}
ipcMutex sync.RWMutex
+ closed chan struct{}
}
// An outboundQueue is a channel of QueueOutboundElements awaiting encryption.
@@ -135,6 +133,24 @@ func newInboundQueue() *inboundQueue {
return q
}
+// A handshakeQueue is similar to an outboundQueue; see those docs.
+type handshakeQueue struct {
+ c chan QueueHandshakeElement
+ wg sync.WaitGroup
+}
+
+func newHandshakeQueue() *handshakeQueue {
+ q := &handshakeQueue{
+ c: make(chan QueueHandshakeElement, QueueHandshakeSize),
+ }
+ q.wg.Add(1)
+ go func() {
+ q.wg.Wait()
+ close(q.c)
+ }()
+ return q
+}
+
/* Converts the peer into a "zombie", which remains in the peer map,
* but processes no packets and does not exists in the routing table.
*
@@ -233,7 +249,7 @@ func (device *Device) IsUnderLoad() bool {
// check if currently under load
now := time.Now()
- underLoad := len(device.queue.handshake) >= UnderLoadQueueSize
+ underLoad := len(device.queue.handshake.c) >= UnderLoadQueueSize
if underLoad {
device.rate.underLoadUntil.Store(now.Add(UnderLoadAfterTime))
return true
@@ -302,6 +318,7 @@ func (device *Device) SetPrivateKey(sk NoisePrivateKey) error {
func NewDevice(tunDevice tun.Device, logger *Logger) *Device {
device := new(Device)
+ device.closed = make(chan struct{})
device.log = logger
device.tun.device = tunDevice
mtu, err := device.tun.device.MTU()
@@ -322,14 +339,10 @@ func NewDevice(tunDevice tun.Device, logger *Logger) *Device {
// create queues
- device.queue.handshake = make(chan QueueHandshakeElement, QueueHandshakeSize)
+ device.queue.handshake = newHandshakeQueue()
device.queue.encryption = newOutboundQueue()
device.queue.decryption = newInboundQueue()
- // prepare signals
-
- device.signals.stop = make(chan struct{})
-
// prepare net
device.net.port = 0
@@ -382,18 +395,6 @@ func (device *Device) RemoveAllPeers() {
device.peers.keyMap = make(map[NoisePublicKey]*Peer)
}
-func (device *Device) FlushPacketQueues() {
- for {
- select {
- case elem := <-device.queue.handshake:
- device.PutMessageBuffer(elem.buffer)
- default:
- return
- }
- }
-
-}
-
func (device *Device) Close() {
if device.isClosed.Swap(true) {
return
@@ -414,21 +415,20 @@ func (device *Device) Close() {
// No new peers are coming; we are done with these queues.
device.queue.encryption.wg.Done()
device.queue.decryption.wg.Done()
- close(device.signals.stop)
+ device.queue.handshake.wg.Done()
device.state.stopping.Wait()
device.RemoveAllPeers()
- device.FlushPacketQueues()
-
device.rate.limiter.Close()
device.state.changing.Set(false)
device.log.Verbosef("Interface closed")
+ close(device.closed)
}
func (device *Device) Wait() chan struct{} {
- return device.signals.stop
+ return device.closed
}
func (device *Device) SendKeepalivesToPeersWithCurrentKeypair() {
@@ -561,6 +561,7 @@ func (device *Device) BindUpdate() error {
device.net.stopping.Add(2)
device.queue.decryption.wg.Add(2) // each RoutineReceiveIncoming goroutine writes to device.queue.decryption
+ device.queue.handshake.wg.Add(2) // each RoutineReceiveIncoming goroutine writes to device.queue.handshake
go device.RoutineReceiveIncoming(ipv4.Version, netc.bind)
go device.RoutineReceiveIncoming(ipv6.Version, netc.bind)