diff options
Diffstat (limited to 'send.go')
-rw-r--r-- | send.go | 134 |
1 files changed, 107 insertions, 27 deletions
@@ -6,6 +6,7 @@ package main import ( + "bytes" "encoding/binary" "golang.org/x/crypto/chacha20poly1305" "golang.org/x/net/ipv4" @@ -46,21 +47,10 @@ type QueueOutboundElement struct { buffer *[MaxMessageSize]byte // slice holding the packet data packet []byte // slice of "buffer" (always!) nonce uint64 // nonce for encryption - keyPair *KeyPair // key-pair for encryption + keyPair *Keypair // key-pair for encryption peer *Peer // related peer } -func (peer *Peer) flushNonceQueue() { - elems := len(peer.queue.nonce) - for i := 0; i < elems; i++ { - select { - case <-peer.queue.nonce: - default: - return - } - } -} - func (device *Device) NewOutboundElement() *QueueOutboundElement { return &QueueOutboundElement{ dropped: AtomicFalse, @@ -114,6 +104,73 @@ func addToEncryptionQueue( } } +/* Queues a keepalive if no packets are queued for peer + */ +func (peer *Peer) SendKeepalive() bool { + if len(peer.queue.nonce) != 0 || peer.queue.packetInNonceQueueIsAwaitingKey { + return false + } + elem := peer.device.NewOutboundElement() + elem.packet = nil + select { + case peer.queue.nonce <- elem: + peer.device.log.Debug.Println(peer, ": Sending keepalive packet") + return true + default: + return false + } +} + +/* Sends a new handshake initiation message to the peer (endpoint) + */ +func (peer *Peer) SendHandshakeInitiation(isRetry bool) error { + if !isRetry { + peer.timers.handshakeAttempts = 0 + } + + if time.Now().Sub(peer.timers.lastSentHandshake) < RekeyTimeout { + return nil + } + peer.timers.lastSentHandshake = time.Now() //TODO: locking for this variable? + + // create initiation message + + msg, err := peer.device.CreateMessageInitiation(peer) + if err != nil { + return err + } + + peer.device.log.Debug.Println(peer, ": Sending handshake initiation") + + // marshal handshake message + + var buff [MessageInitiationSize]byte + writer := bytes.NewBuffer(buff[:0]) + binary.Write(writer, binary.LittleEndian, msg) + packet := writer.Bytes() + peer.mac.AddMacs(packet) + + // send to endpoint + + peer.timersAnyAuthenticatedPacketTraversal() + peer.timersHandshakeInitiated() + return peer.SendBuffer(packet) +} + +/* Called when a new authenticated message has been send + * + */ +func (peer *Peer) keepKeyFreshSending() { + kp := peer.keyPairs.Current() + if kp == nil { + return + } + nonce := atomic.LoadUint64(&kp.sendNonce) + if nonce > RekeyAfterMessages || (kp.isInitiator && time.Now().Sub(kp.created) > RekeyAfterTime) { + peer.SendHandshakeInitiation(false) + } +} + /* Reads packets from the TUN and inserts * into nonce queue for peer * @@ -180,13 +237,22 @@ func (device *Device) RoutineReadFromTUN() { // insert into nonce/pre-handshake queue if peer.isRunning.Get() { - peer.event.handshakePushDeadline.Fire() + if peer.queue.packetInNonceQueueIsAwaitingKey { + peer.SendHandshakeInitiation(false) + } addToOutboundQueue(peer.queue.nonce, elem) elem = device.NewOutboundElement() } } } +func (peer *Peer) FlushNonceQueue() { + select { + case peer.signals.flushNonceQueue <- struct{}{}: + default: + } +} + /* Queues packets when there is no handshake. * Then assigns nonces to packets sequentially * and creates "work" structs for workers @@ -194,13 +260,14 @@ func (device *Device) RoutineReadFromTUN() { * Obs. A single instance per peer */ func (peer *Peer) RoutineNonce() { - var keyPair *KeyPair + var keyPair *Keypair device := peer.device logDebug := device.log.Debug defer func() { logDebug.Println(peer, ": Routine: nonce worker - stopped") + peer.queue.packetInNonceQueueIsAwaitingKey = false peer.routines.stopping.Done() }() @@ -209,8 +276,7 @@ func (peer *Peer) RoutineNonce() { for { NextPacket: - - peer.event.flushNonceQueue.Clear() + peer.queue.packetInNonceQueueIsAwaitingKey = false select { case <-peer.routines.stop: @@ -225,34 +291,48 @@ func (peer *Peer) RoutineNonce() { // wait for key pair for { - - peer.event.newKeyPair.Clear() - keyPair = peer.keyPairs.Current() if keyPair != nil && keyPair.sendNonce < RejectAfterMessages { if time.Now().Sub(keyPair.created) < RejectAfterTime { break } } + peer.queue.packetInNonceQueueIsAwaitingKey = true - peer.event.handshakeBegin.Fire() + select { + case <-peer.signals.newKeypairArrived: + default: + } + + peer.SendHandshakeInitiation(false) logDebug.Println(peer, ": Awaiting key-pair") select { - case <-peer.event.newKeyPair.C: + case <-peer.signals.newKeypairArrived: logDebug.Println(peer, ": Obtained awaited key-pair") - case <-peer.event.flushNonceQueue.C: - goto NextPacket + case <-peer.signals.flushNonceQueue: + for { + select { + case <-peer.queue.nonce: + default: + goto NextPacket + } + } case <-peer.routines.stop: return } } + peer.queue.packetInNonceQueueIsAwaitingKey = false // populate work element elem.peer = peer elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1 + // double check in case of race condition added by future code + if elem.nonce >= RejectAfterMessages { + goto NextPacket + } elem.keyPair = keyPair elem.dropped = AtomicFalse elem.mutex.Lock() @@ -288,7 +368,7 @@ func (device *Device) RoutineEncryption() { // fetch next element select { - case <-device.signal.stop.Wait(): + case <-device.signals.stop: return case elem, ok := <-device.queue.encryption: @@ -389,11 +469,11 @@ func (peer *Peer) RoutineSequentialSender() { // update timers - peer.event.anyAuthenticatedPacketTraversal.Fire() + peer.timersAnyAuthenticatedPacketTraversal() if len(elem.packet) != MessageKeepaliveSize { - peer.event.dataSent.Fire() + peer.timersDataSent() } - peer.KeepKeyFreshSending() + peer.keepKeyFreshSending() } } } |