From 1b092ce584cbee0f86f3e25b5498870c8ca96652 Mon Sep 17 00:00:00 2001
From: "Jason A. Donenfeld"
Date: Wed, 27 Jan 2021 18:13:53 +0100
Subject: device: get rid of nonce routine

This moves to a simple queue with no routine processing it, to reduce
scheduler pressure.

This splits latency in half!

benchmark                  old ns/op     new ns/op     delta
BenchmarkThroughput-16     2394          2364          -1.25%
BenchmarkLatency-16        259652        120810        -53.47%

Signed-off-by: Jason A. Donenfeld
---
 device/send.go | 188 ++++++++++++++++-----------------------------------------
 1 file changed, 52 insertions(+), 136 deletions(-)

(limited to 'device/send.go')

diff --git a/device/send.go b/device/send.go index 2d9af78..8bec144 100644 --- a/device/send.go +++ b/device/send.go @@ -71,41 +71,26 @@ func (elem *QueueOutboundElement) clearPointers() { elem.peer = nil } -func addToNonceQueue(queue chan *QueueOutboundElement, elem *QueueOutboundElement, device *Device) { - for { - select { - case queue <- elem: - return - default: - select { - case old := <-queue: - device.PutMessageBuffer(old.buffer) - device.PutOutboundElement(old) - default: - } - } - } -} - /* Queues a keepalive if no packets are queued for peer */ -func (peer *Peer) SendKeepalive() bool { +func (peer *Peer) SendKeepalive() { + var elem *QueueOutboundElement peer.queue.RLock() - defer peer.queue.RUnlock() - if len(peer.queue.nonce) != 0 || peer.queue.packetInNonceQueueIsAwaitingKey.Get() || !peer.isRunning.Get() { - return false + if len(peer.queue.staged) != 0 || !peer.isRunning.Get() { + goto out } - elem := peer.device.NewOutboundElement() + elem = peer.device.NewOutboundElement() elem.packet = nil select { - case peer.queue.nonce <- elem: + case peer.queue.staged <- elem: peer.device.log.Verbosef("%v - Sending keepalive packet", peer) - return true default: peer.device.PutMessageBuffer(elem.buffer) peer.device.PutOutboundElement(elem) - return false } +out: + peer.queue.RUnlock() + peer.SendStagedPackets() } func (peer *Peer) 
SendHandshakeInitiation(isRetry bool) error { @@ -220,7 +205,7 @@ func (peer *Peer) keepKeyFreshSending() { } /* Reads packets from the TUN and inserts - * into nonce queue for peer + * into staged queue for peer * * Obs. Single instance per TUN device */ @@ -287,136 +272,53 @@ func (device *Device) RoutineReadFromTUN() { if peer == nil { continue } - - // insert into nonce/pre-handshake queue - - peer.queue.RLock() if peer.isRunning.Get() { - if peer.queue.packetInNonceQueueIsAwaitingKey.Get() { - peer.SendHandshakeInitiation(false) - } - addToNonceQueue(peer.queue.nonce, elem, device) + peer.StagePacket(elem) elem = nil + peer.SendStagedPackets() } - peer.queue.RUnlock() } } -func (peer *Peer) FlushNonceQueue() { - select { - case peer.signals.flushNonceQueue <- struct{}{}: - default: - } -} - -/* Queues packets when there is no handshake. - * Then assigns nonces to packets sequentially - * and creates "work" structs for workers - * - * Obs. A single instance per peer - */ -func (peer *Peer) RoutineNonce() { - var keypair *Keypair - device := peer.device - - flush := func() { - for { +func (peer *Peer) StagePacket(elem *QueueOutboundElement) { + for { + select { + case peer.queue.staged <- elem: + return + default: select { - case elem := <-peer.queue.nonce: - device.PutMessageBuffer(elem.buffer) - device.PutOutboundElement(elem) + case tooOld := <-peer.queue.staged: + peer.device.PutMessageBuffer(tooOld.buffer) + peer.device.PutOutboundElement(tooOld) default: - return } } } +} - defer func() { - flush() - device.log.Verbosef("%v - Routine: nonce worker - stopped", peer) - peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) - device.queue.encryption.wg.Done() // no more writes from us - close(peer.queue.outbound) // no more writes to this channel - peer.routines.stopping.Done() - }() +func (peer *Peer) SendStagedPackets() { +top: + if len(peer.queue.staged) == 0 || !peer.device.isUp.Get() { + return + } - device.log.Verbosef("%v - Routine: nonce worker - 
started", peer) + keypair := peer.keypairs.Current() + if keypair == nil || atomic.LoadUint64(&keypair.sendNonce) >= RejectAfterMessages || time.Since(keypair.created) >= RejectAfterTime { + peer.SendHandshakeInitiation(false) + return + } + peer.device.queue.encryption.wg.Add(1) + defer peer.device.queue.encryption.wg.Done() -NextPacket: for { - peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) - select { - case <-peer.routines.stop: - return - - case <-peer.signals.flushNonceQueue: - flush() - continue NextPacket - - case elem, ok := <-peer.queue.nonce: - - if !ok { - return - } - - // make sure to always pick the newest key - - for { - - // check validity of newest key pair - - keypair = peer.keypairs.Current() - if keypair != nil && atomic.LoadUint64(&keypair.sendNonce) < RejectAfterMessages { - if time.Since(keypair.created) < RejectAfterTime { - break - } - } - peer.queue.packetInNonceQueueIsAwaitingKey.Set(true) - - // no suitable key pair, request for new handshake - - select { - case <-peer.signals.newKeypairArrived: - default: - } - - peer.SendHandshakeInitiation(false) - - // wait for key to be established - - device.log.Verbosef("%v - Awaiting keypair", peer) - - select { - case <-peer.signals.newKeypairArrived: - device.log.Verbosef("%v - Obtained awaited keypair", peer) - - case <-peer.signals.flushNonceQueue: - device.PutMessageBuffer(elem.buffer) - device.PutOutboundElement(elem) - flush() - continue NextPacket - - case <-peer.routines.stop: - device.PutMessageBuffer(elem.buffer) - device.PutOutboundElement(elem) - return - } - } - peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) - - // populate work element - + case elem := <-peer.queue.staged: elem.peer = peer elem.nonce = atomic.AddUint64(&keypair.sendNonce, 1) - 1 - - // double check in case of race condition added by future code - if elem.nonce >= RejectAfterMessages { atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages) - device.PutMessageBuffer(elem.buffer) - 
device.PutOutboundElement(elem) - continue NextPacket + peer.StagePacket(elem) // XXX: Out of order, but we can't front-load go chans + goto top } elem.keypair = keypair @@ -424,7 +326,21 @@ NextPacket: // add to parallel and sequential queue peer.queue.outbound <- elem - device.queue.encryption.c <- elem + peer.device.queue.encryption.c <- elem + default: + return + } + } +} + +func (peer *Peer) FlushStagedPackets() { + for { + select { + case elem := <-peer.queue.staged: + peer.device.PutMessageBuffer(elem.buffer) + peer.device.PutOutboundElement(elem) + default: + return } } } -- cgit v1.2.3