summaryrefslogtreecommitdiffhomepage
path: root/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'send.go')
-rw-r--r--send.go134
1 files changed, 107 insertions, 27 deletions
diff --git a/send.go b/send.go
index ddebb99..1b35e27 100644
--- a/send.go
+++ b/send.go
@@ -6,6 +6,7 @@
package main
import (
+ "bytes"
"encoding/binary"
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/net/ipv4"
@@ -46,21 +47,10 @@ type QueueOutboundElement struct {
buffer *[MaxMessageSize]byte // slice holding the packet data
packet []byte // slice of "buffer" (always!)
nonce uint64 // nonce for encryption
- keyPair *KeyPair // key-pair for encryption
+ keyPair *Keypair // key-pair for encryption
peer *Peer // related peer
}
-func (peer *Peer) flushNonceQueue() {
- elems := len(peer.queue.nonce)
- for i := 0; i < elems; i++ {
- select {
- case <-peer.queue.nonce:
- default:
- return
- }
- }
-}
-
func (device *Device) NewOutboundElement() *QueueOutboundElement {
return &QueueOutboundElement{
dropped: AtomicFalse,
@@ -114,6 +104,73 @@ func addToEncryptionQueue(
}
}
+/* Queues a keepalive if no packets are queued for peer
+ */
+func (peer *Peer) SendKeepalive() bool {
+ if len(peer.queue.nonce) != 0 || peer.queue.packetInNonceQueueIsAwaitingKey {
+ return false
+ }
+ elem := peer.device.NewOutboundElement()
+ elem.packet = nil
+ select {
+ case peer.queue.nonce <- elem:
+ peer.device.log.Debug.Println(peer, ": Sending keepalive packet")
+ return true
+ default:
+ return false
+ }
+}
+
+/* Sends a new handshake initiation message to the peer (endpoint)
+ */
+func (peer *Peer) SendHandshakeInitiation(isRetry bool) error {
+ if !isRetry {
+ peer.timers.handshakeAttempts = 0
+ }
+
+ if time.Now().Sub(peer.timers.lastSentHandshake) < RekeyTimeout {
+ return nil
+ }
+ peer.timers.lastSentHandshake = time.Now() //TODO: locking for this variable?
+
+ // create initiation message
+
+ msg, err := peer.device.CreateMessageInitiation(peer)
+ if err != nil {
+ return err
+ }
+
+ peer.device.log.Debug.Println(peer, ": Sending handshake initiation")
+
+ // marshal handshake message
+
+ var buff [MessageInitiationSize]byte
+ writer := bytes.NewBuffer(buff[:0])
+ binary.Write(writer, binary.LittleEndian, msg)
+ packet := writer.Bytes()
+ peer.mac.AddMacs(packet)
+
+ // send to endpoint
+
+ peer.timersAnyAuthenticatedPacketTraversal()
+ peer.timersHandshakeInitiated()
+ return peer.SendBuffer(packet)
+}
+
+/* Called when a new authenticated message has been send
+ *
+ */
+func (peer *Peer) keepKeyFreshSending() {
+ kp := peer.keyPairs.Current()
+ if kp == nil {
+ return
+ }
+ nonce := atomic.LoadUint64(&kp.sendNonce)
+ if nonce > RekeyAfterMessages || (kp.isInitiator && time.Now().Sub(kp.created) > RekeyAfterTime) {
+ peer.SendHandshakeInitiation(false)
+ }
+}
+
/* Reads packets from the TUN and inserts
* into nonce queue for peer
*
@@ -180,13 +237,22 @@ func (device *Device) RoutineReadFromTUN() {
// insert into nonce/pre-handshake queue
if peer.isRunning.Get() {
- peer.event.handshakePushDeadline.Fire()
+ if peer.queue.packetInNonceQueueIsAwaitingKey {
+ peer.SendHandshakeInitiation(false)
+ }
addToOutboundQueue(peer.queue.nonce, elem)
elem = device.NewOutboundElement()
}
}
}
+func (peer *Peer) FlushNonceQueue() {
+ select {
+ case peer.signals.flushNonceQueue <- struct{}{}:
+ default:
+ }
+}
+
/* Queues packets when there is no handshake.
* Then assigns nonces to packets sequentially
* and creates "work" structs for workers
@@ -194,13 +260,14 @@ func (device *Device) RoutineReadFromTUN() {
* Obs. A single instance per peer
*/
func (peer *Peer) RoutineNonce() {
- var keyPair *KeyPair
+ var keyPair *Keypair
device := peer.device
logDebug := device.log.Debug
defer func() {
logDebug.Println(peer, ": Routine: nonce worker - stopped")
+ peer.queue.packetInNonceQueueIsAwaitingKey = false
peer.routines.stopping.Done()
}()
@@ -209,8 +276,7 @@ func (peer *Peer) RoutineNonce() {
for {
NextPacket:
-
- peer.event.flushNonceQueue.Clear()
+ peer.queue.packetInNonceQueueIsAwaitingKey = false
select {
case <-peer.routines.stop:
@@ -225,34 +291,48 @@ func (peer *Peer) RoutineNonce() {
// wait for key pair
for {
-
- peer.event.newKeyPair.Clear()
-
keyPair = peer.keyPairs.Current()
if keyPair != nil && keyPair.sendNonce < RejectAfterMessages {
if time.Now().Sub(keyPair.created) < RejectAfterTime {
break
}
}
+ peer.queue.packetInNonceQueueIsAwaitingKey = true
- peer.event.handshakeBegin.Fire()
+ select {
+ case <-peer.signals.newKeypairArrived:
+ default:
+ }
+
+ peer.SendHandshakeInitiation(false)
logDebug.Println(peer, ": Awaiting key-pair")
select {
- case <-peer.event.newKeyPair.C:
+ case <-peer.signals.newKeypairArrived:
logDebug.Println(peer, ": Obtained awaited key-pair")
- case <-peer.event.flushNonceQueue.C:
- goto NextPacket
+ case <-peer.signals.flushNonceQueue:
+ for {
+ select {
+ case <-peer.queue.nonce:
+ default:
+ goto NextPacket
+ }
+ }
case <-peer.routines.stop:
return
}
}
+ peer.queue.packetInNonceQueueIsAwaitingKey = false
// populate work element
elem.peer = peer
elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1
+ // double check in case of race condition added by future code
+ if elem.nonce >= RejectAfterMessages {
+ goto NextPacket
+ }
elem.keyPair = keyPair
elem.dropped = AtomicFalse
elem.mutex.Lock()
@@ -288,7 +368,7 @@ func (device *Device) RoutineEncryption() {
// fetch next element
select {
- case <-device.signal.stop.Wait():
+ case <-device.signals.stop:
return
case elem, ok := <-device.queue.encryption:
@@ -389,11 +469,11 @@ func (peer *Peer) RoutineSequentialSender() {
// update timers
- peer.event.anyAuthenticatedPacketTraversal.Fire()
+ peer.timersAnyAuthenticatedPacketTraversal()
if len(elem.packet) != MessageKeepaliveSize {
- peer.event.dataSent.Fire()
+ peer.timersDataSent()
}
- peer.KeepKeyFreshSending()
+ peer.keepKeyFreshSending()
}
}
}