diff options
Diffstat (limited to 'src/send.go')
-rw-r--r-- | src/send.go | 167 |
1 files changed, 63 insertions, 104 deletions
diff --git a/src/send.go b/src/send.go index c598ad4..e9dfb54 100644 --- a/src/send.go +++ b/src/send.go @@ -35,7 +35,7 @@ type QueueOutboundElement struct { dropped int32 mutex sync.Mutex buffer *[MaxMessageSize]byte // slice holding the packet data - packet []byte // slice of "data" (always!) + packet []byte // slice of "buffer" (always!) nonce uint64 // nonce for encryption keyPair *KeyPair // key-pair for encryption peer *Peer // related peer @@ -52,11 +52,6 @@ func (peer *Peer) FlushNonceQueue() { } } -var ( - ErrorNoEndpoint = errors.New("No known endpoint for peer") - ErrorNoConnection = errors.New("No UDP socket for device") -) - func (device *Device) NewOutboundElement() *QueueOutboundElement { return &QueueOutboundElement{ dropped: AtomicFalse, @@ -118,14 +113,13 @@ func (peer *Peer) SendBuffer(buffer []byte) (int, error) { defer peer.mutex.RUnlock() endpoint := peer.endpoint - conn := peer.device.net.conn - if endpoint == nil { - return 0, ErrorNoEndpoint + return 0, errors.New("No known endpoint for peer") } + conn := peer.device.net.conn if conn == nil { - return 0, ErrorNoConnection + return 0, errors.New("No UDP socket for device") } return conn.WriteToUDP(buffer, endpoint) @@ -189,16 +183,6 @@ func (device *Device) RoutineReadFromTUN() { continue } - // check if known endpoint (drop early) - - peer.mutex.RLock() - if peer.endpoint == nil { - peer.mutex.RUnlock() - logDebug.Println("No known endpoint for peer", peer.String()) - continue - } - peer.mutex.RUnlock() - // insert into nonce/pre-handshake queue signalSend(peer.signal.handshakeReset) @@ -211,86 +195,61 @@ func (device *Device) RoutineReadFromTUN() { * Then assigns nonces to packets sequentially * and creates "work" structs for workers * - * TODO: Avoid dynamic allocation of work queue elements - * * Obs. A single instance per peer */ func (peer *Peer) RoutineNonce() { var keyPair *KeyPair - var elem *QueueOutboundElement device := peer.device logDebug := device.log.Debug logDebug.Println("Routine, nonce worker, started for peer", peer.String()) - func() { - - for { - NextPacket: - - // wait for packet + for { + NextPacket: + select { + case <-peer.signal.stop: + return - if elem == nil { - select { - case elem = <-peer.queue.nonce: - case <-peer.signal.stop: - return - } - } + case elem := <-peer.queue.nonce: // wait for key pair for { - select { - case <-peer.signal.newKeyPair: - default: - } - keyPair = peer.keyPairs.Current() if keyPair != nil && keyPair.sendNonce < RejectAfterMessages { if time.Now().Sub(keyPair.created) < RejectAfterTime { break } } + signalSend(peer.signal.handshakeBegin) logDebug.Println("Awaiting key-pair for", peer.String()) select { case <-peer.signal.newKeyPair: - logDebug.Println("Key-pair negotiated for", peer.String()) - goto NextPacket - case <-peer.signal.flushNonceQueue: logDebug.Println("Clearing queue for", peer.String()) peer.FlushNonceQueue() - elem = nil goto NextPacket - case <-peer.signal.stop: return } } - // process current packet + // populate work element - if elem != nil { - - // create work element - - elem.keyPair = keyPair - elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1 - elem.dropped = AtomicFalse - elem.peer = peer - elem.mutex.Lock() + elem.peer = peer + elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1 + elem.keyPair = keyPair + elem.dropped = AtomicFalse + elem.mutex.Lock() - // add to parallel and sequential queue + // add to parallel and sequential queue - addToEncryptionQueue(device.queue.encryption, elem) - addToOutboundQueue(peer.queue.outbound, elem) - elem = nil - } + addToEncryptionQueue(device.queue.encryption, elem) + addToOutboundQueue(peer.queue.outbound, elem) } - }() + } } /* Encrypts the elements in the queue @@ -300,7 +259,6 @@ func (peer *Peer) RoutineNonce() { */ func (device *Device) RoutineEncryption() { - var elem *QueueOutboundElement var nonce [chacha20poly1305.NonceSize]byte logDebug := device.log.Debug @@ -311,62 +269,62 @@ func (device *Device) RoutineEncryption() { // fetch next element select { - case elem = <-device.queue.encryption: case <-device.signal.stop: logDebug.Println("Routine, encryption worker, stopped") return - } - // check if dropped + case elem := <-device.queue.encryption: - if elem.IsDropped() { - continue - } + // check if dropped + + if elem.IsDropped() { + continue + } - // populate header fields + // populate header fields - header := elem.buffer[:MessageTransportHeaderSize] + header := elem.buffer[:MessageTransportHeaderSize] - fieldType := header[0:4] - fieldReceiver := header[4:8] - fieldNonce := header[8:16] + fieldType := header[0:4] + fieldReceiver := header[4:8] + fieldNonce := header[8:16] - binary.LittleEndian.PutUint32(fieldType, MessageTransportType) - binary.LittleEndian.PutUint32(fieldReceiver, elem.keyPair.remoteIndex) - binary.LittleEndian.PutUint64(fieldNonce, elem.nonce) + binary.LittleEndian.PutUint32(fieldType, MessageTransportType) + binary.LittleEndian.PutUint32(fieldReceiver, elem.keyPair.remoteIndex) + binary.LittleEndian.PutUint64(fieldNonce, elem.nonce) - // pad content to MTU size + // pad content to multiple of 16 - mtu := int(atomic.LoadInt32(&device.tun.mtu)) - pad := len(elem.packet) % PaddingMultiple - if pad > 0 { - for i := 0; i < PaddingMultiple-pad && len(elem.packet) < mtu; i++ { - elem.packet = append(elem.packet, 0) + mtu := int(atomic.LoadInt32(&device.tun.mtu)) + rem := len(elem.packet) % PaddingMultiple + if rem > 0 { + for i := 0; i < PaddingMultiple-rem && len(elem.packet) < mtu; i++ { + elem.packet = append(elem.packet, 0) + } } - // TODO: How good is this code - } - // encrypt content (append to header) - - binary.LittleEndian.PutUint64(nonce[4:], elem.nonce) - elem.keyPair.send.mutex.RLock() - if elem.keyPair.send.aead == nil { - // very unlikely (the key was deleted during queuing) - elem.Drop() - } else { - elem.packet = elem.keyPair.send.aead.Seal( - header, - nonce[:], - elem.packet, - nil, - ) - } - elem.keyPair.send.mutex.RUnlock() - elem.mutex.Unlock() + // encrypt content (append to header) + + binary.LittleEndian.PutUint64(nonce[4:], elem.nonce) + elem.keyPair.send.mutex.RLock() + if elem.keyPair.send.aead == nil { + // very unlikely (the key was deleted during queuing) + elem.Drop() + } else { + elem.packet = elem.keyPair.send.aead.Seal( + header, + nonce[:], + elem.packet, + nil, + ) + } + elem.mutex.Unlock() + elem.keyPair.send.mutex.RUnlock() - // refresh key if necessary + // refresh key if necessary - elem.peer.KeepKeyFreshSending() + elem.peer.KeepKeyFreshSending() + } } } @@ -399,6 +357,7 @@ func (peer *Peer) RoutineSequentialSender() { _, err := peer.SendBuffer(elem.packet) device.PutMessageBuffer(elem.buffer) if err != nil { + logDebug.Println("Failed to send authenticated packet to peer", peer.String()) continue } atomic.AddUint64(&peer.stats.txBytes, length) |