diff options
Diffstat (limited to 'device')
-rw-r--r-- | device/peer.go | 29 | ||||
-rw-r--r-- | device/queueconstants_android.go | 1 | ||||
-rw-r--r-- | device/queueconstants_default.go | 1 | ||||
-rw-r--r-- | device/queueconstants_ios.go | 1 | ||||
-rw-r--r-- | device/receive.go | 9 | ||||
-rw-r--r-- | device/send.go | 188 | ||||
-rw-r--r-- | device/timers.go | 2 | ||||
-rw-r--r-- | device/uapi.go | 8 |
8 files changed, 72 insertions, 167 deletions
diff --git a/device/peer.go b/device/peer.go index a103b5d..af2f57f 100644 --- a/device/peer.go +++ b/device/peer.go @@ -16,10 +16,6 @@ import ( "golang.zx2c4.com/wireguard/conn" ) -const ( - PeerRoutineNumber = 2 -) - type Peer struct { isRunning AtomicBool sync.RWMutex // Mostly protects endpoint, but is generally taken whenever we modify peer @@ -54,17 +50,11 @@ type Peer struct { sentLastMinuteHandshake AtomicBool } - signals struct { - newKeypairArrived chan struct{} - flushNonceQueue chan struct{} - } - queue struct { sync.RWMutex - nonce chan *QueueOutboundElement // nonce / pre-handshake queue - outbound chan *QueueOutboundElement // sequential ordering of work - inbound chan *QueueInboundElement // sequential ordering of work - packetInNonceQueueIsAwaitingKey AtomicBool + staged chan *QueueOutboundElement // staged packets before a handshake is available + outbound chan *QueueOutboundElement // sequential ordering of work + inbound chan *QueueInboundElement // sequential ordering of work } routines struct { @@ -197,25 +187,20 @@ func (peer *Peer) Start() { peer.routines.stopping.Wait() peer.routines.stop = make(chan struct{}) - peer.routines.stopping.Add(PeerRoutineNumber) + peer.routines.stopping.Add(1) // prepare queues peer.queue.Lock() - peer.queue.nonce = make(chan *QueueOutboundElement, QueueOutboundSize) + peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize) peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize) peer.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize) peer.queue.Unlock() peer.timersInit() peer.handshake.lastSentHandshake = time.Now().Add(-(RekeyTimeout + time.Second)) - peer.signals.newKeypairArrived = make(chan struct{}, 1) - peer.signals.flushNonceQueue = make(chan struct{}, 1) // wait for routines to start - // RoutineNonce writes to the encryption queue; keep it alive until we are done. - device.queue.encryption.wg.Add(1) - go peer.RoutineNonce() go peer.RoutineSequentialSender() go peer.RoutineSequentialReceiver() @@ -245,7 +230,7 @@ func (peer *Peer) ZeroAndFlushAll() { handshake.Clear() handshake.mutex.Unlock() - peer.FlushNonceQueue() + peer.FlushStagedPackets() } func (peer *Peer) ExpireCurrentKeypairs() { @@ -291,8 +276,8 @@ func (peer *Peer) Stop() { // close queues peer.queue.Lock() - close(peer.queue.nonce) close(peer.queue.inbound) + close(peer.queue.outbound) peer.queue.Unlock() peer.ZeroAndFlushAll() diff --git a/device/queueconstants_android.go b/device/queueconstants_android.go index f19c7be..f4de5c9 100644 --- a/device/queueconstants_android.go +++ b/device/queueconstants_android.go @@ -8,6 +8,7 @@ package device /* Reduce memory consumption for Android */ const ( + QueueStagedSize = 128 QueueOutboundSize = 1024 QueueInboundSize = 1024 QueueHandshakeSize = 1024 diff --git a/device/queueconstants_default.go b/device/queueconstants_default.go index 18f0bea..52a199d 100644 --- a/device/queueconstants_default.go +++ b/device/queueconstants_default.go @@ -8,6 +8,7 @@ package device const ( + QueueStagedSize = 128 QueueOutboundSize = 1024 QueueInboundSize = 1024 QueueHandshakeSize = 1024 diff --git a/device/queueconstants_ios.go b/device/queueconstants_ios.go index 4c83015..c6c0b16 100644 --- a/device/queueconstants_ios.go +++ b/device/queueconstants_ios.go @@ -10,6 +10,7 @@ package device /* Fit within memory limits for iOS's Network Extension API, which has stricter requirements */ const ( + QueueStagedSize = 128 QueueOutboundSize = 1024 QueueInboundSize = 1024 QueueHandshakeSize = 1024 diff --git a/device/receive.go b/device/receive.go index a8e55cc..e891fd0 100644 --- a/device/receive.go +++ b/device/receive.go @@ -427,10 +427,6 @@ func (device *Device) RoutineHandshake() { peer.timersSessionDerived() peer.timersHandshakeComplete() peer.SendKeepalive() - select { - case peer.signals.newKeypairArrived <- struct{}{}: - default: - } } } } @@ -485,10 +481,7 @@ func (peer *Peer) RoutineSequentialReceiver() { // check if using new keypair if peer.ReceivedWithKeypair(elem.keypair) { peer.timersHandshakeComplete() - select { - case peer.signals.newKeypairArrived <- struct{}{}: - default: - } + peer.SendStagedPackets() } peer.keepKeyFreshReceiving() diff --git a/device/send.go b/device/send.go index 2d9af78..8bec144 100644 --- a/device/send.go +++ b/device/send.go @@ -71,41 +71,26 @@ func (elem *QueueOutboundElement) clearPointers() { elem.peer = nil } -func addToNonceQueue(queue chan *QueueOutboundElement, elem *QueueOutboundElement, device *Device) { - for { - select { - case queue <- elem: - return - default: - select { - case old := <-queue: - device.PutMessageBuffer(old.buffer) - device.PutOutboundElement(old) - default: - } - } - } -} - /* Queues a keepalive if no packets are queued for peer */ -func (peer *Peer) SendKeepalive() bool { +func (peer *Peer) SendKeepalive() { + var elem *QueueOutboundElement peer.queue.RLock() - defer peer.queue.RUnlock() - if len(peer.queue.nonce) != 0 || peer.queue.packetInNonceQueueIsAwaitingKey.Get() || !peer.isRunning.Get() { - return false + if len(peer.queue.staged) != 0 || !peer.isRunning.Get() { + goto out } - elem := peer.device.NewOutboundElement() + elem = peer.device.NewOutboundElement() elem.packet = nil select { - case peer.queue.nonce <- elem: + case peer.queue.staged <- elem: peer.device.log.Verbosef("%v - Sending keepalive packet", peer) - return true default: peer.device.PutMessageBuffer(elem.buffer) peer.device.PutOutboundElement(elem) - return false } +out: + peer.queue.RUnlock() + peer.SendStagedPackets() } func (peer *Peer) SendHandshakeInitiation(isRetry bool) error { @@ -220,7 +205,7 @@ func (peer *Peer) keepKeyFreshSending() { } /* Reads packets from the TUN and inserts - * into nonce queue for peer + * into staged queue for peer * * Obs. Single instance per TUN device */ @@ -287,136 +272,53 @@ func (device *Device) RoutineReadFromTUN() { if peer == nil { continue } - - // insert into nonce/pre-handshake queue - - peer.queue.RLock() if peer.isRunning.Get() { - if peer.queue.packetInNonceQueueIsAwaitingKey.Get() { - peer.SendHandshakeInitiation(false) - } - addToNonceQueue(peer.queue.nonce, elem, device) + peer.StagePacket(elem) elem = nil + peer.SendStagedPackets() } - peer.queue.RUnlock() } } -func (peer *Peer) FlushNonceQueue() { - select { - case peer.signals.flushNonceQueue <- struct{}{}: - default: - } -} - -/* Queues packets when there is no handshake. - * Then assigns nonces to packets sequentially - * and creates "work" structs for workers - * - * Obs. A single instance per peer - */ -func (peer *Peer) RoutineNonce() { - var keypair *Keypair - device := peer.device - - flush := func() { - for { +func (peer *Peer) StagePacket(elem *QueueOutboundElement) { + for { + select { + case peer.queue.staged <- elem: + return + default: select { - case elem := <-peer.queue.nonce: - device.PutMessageBuffer(elem.buffer) - device.PutOutboundElement(elem) + case tooOld := <-peer.queue.staged: + peer.device.PutMessageBuffer(tooOld.buffer) + peer.device.PutOutboundElement(tooOld) default: - return } } } +} - defer func() { - flush() - device.log.Verbosef("%v - Routine: nonce worker - stopped", peer) - peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) - device.queue.encryption.wg.Done() // no more writes from us - close(peer.queue.outbound) // no more writes to this channel - peer.routines.stopping.Done() - }() +func (peer *Peer) SendStagedPackets() { +top: + if len(peer.queue.staged) == 0 || !peer.device.isUp.Get() { + return + } - device.log.Verbosef("%v - Routine: nonce worker - started", peer) + keypair := peer.keypairs.Current() + if keypair == nil || atomic.LoadUint64(&keypair.sendNonce) >= RejectAfterMessages || time.Since(keypair.created) >= RejectAfterTime { + peer.SendHandshakeInitiation(false) + return + } + peer.device.queue.encryption.wg.Add(1) + defer peer.device.queue.encryption.wg.Done() -NextPacket: for { - peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) - select { - case <-peer.routines.stop: - return - - case <-peer.signals.flushNonceQueue: - flush() - continue NextPacket - - case elem, ok := <-peer.queue.nonce: - - if !ok { - return - } - - // make sure to always pick the newest key - - for { - - // check validity of newest key pair - - keypair = peer.keypairs.Current() - if keypair != nil && atomic.LoadUint64(&keypair.sendNonce) < RejectAfterMessages { - if time.Since(keypair.created) < RejectAfterTime { - break - } - } - peer.queue.packetInNonceQueueIsAwaitingKey.Set(true) - - // no suitable key pair, request for new handshake - - select { - case <-peer.signals.newKeypairArrived: - default: - } - - peer.SendHandshakeInitiation(false) - - // wait for key to be established - - device.log.Verbosef("%v - Awaiting keypair", peer) - - select { - case <-peer.signals.newKeypairArrived: - device.log.Verbosef("%v - Obtained awaited keypair", peer) - - case <-peer.signals.flushNonceQueue: - device.PutMessageBuffer(elem.buffer) - device.PutOutboundElement(elem) - flush() - continue NextPacket - - case <-peer.routines.stop: - device.PutMessageBuffer(elem.buffer) - device.PutOutboundElement(elem) - return - } - } - peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) - - // populate work element - + case elem := <-peer.queue.staged: elem.peer = peer elem.nonce = atomic.AddUint64(&keypair.sendNonce, 1) - 1 - - // double check in case of race condition added by future code - if elem.nonce >= RejectAfterMessages { atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages) - device.PutMessageBuffer(elem.buffer) - device.PutOutboundElement(elem) - continue NextPacket + peer.StagePacket(elem) // XXX: Out of order, but we can't front-load go chans + goto top } elem.keypair = keypair @@ -424,7 +326,21 @@ NextPacket: // add to parallel and sequential queue peer.queue.outbound <- elem - device.queue.encryption.c <- elem + peer.device.queue.encryption.c <- elem + default: + return + } + } +} + +func (peer *Peer) FlushStagedPackets() { + for { + select { + case elem := <-peer.queue.staged: + peer.device.PutMessageBuffer(elem.buffer) + peer.device.PutOutboundElement(elem) + default: + return } } } diff --git a/device/timers.go b/device/timers.go index 25bef8c..0678f1e 100644 --- a/device/timers.go +++ b/device/timers.go @@ -87,7 +87,7 @@ func expiredRetransmitHandshake(peer *Peer) { /* We drop all packets without a keypair and don't try again, * if we try unsuccessfully for too long to make a handshake. */ - peer.FlushNonceQueue() + peer.FlushStagedPackets() /* We set a timer for destroying any residue that might be left * of a partial exchange. diff --git a/device/uapi.go b/device/uapi.go index cbfe25e..bfef877 100644 --- a/device/uapi.go +++ b/device/uapi.go @@ -156,6 +156,7 @@ func (device *Device) IpcSetOperation(r io.Reader) (err error) { if deviceConfig { deviceConfig = false } + peer.handlePostConfig() // Load/create the peer we are now configuring. err := device.handlePublicKeyLine(peer, value) if err != nil { @@ -174,6 +175,7 @@ func (device *Device) IpcSetOperation(r io.Reader) (err error) { return err } } + peer.handlePostConfig() if err := scanner.Err(); err != nil { return ipcErrorf(ipc.IpcErrorIO, "failed to read input: %w", err) @@ -241,6 +243,12 @@ type ipcSetPeer struct { created bool // new reports whether this is a newly created peer } +func (peer *ipcSetPeer) handlePostConfig() { + if peer.Peer != nil && !peer.dummy && peer.Peer.device.isUp.Get() { + peer.SendStagedPackets() + } +} + func (device *Device) handlePublicKeyLine(peer *ipcSetPeer, value string) error { // Load/create the peer we are configuring. var publicKey NoisePublicKey |