diff options
Diffstat (limited to 'src/send.go')
-rw-r--r-- | src/send.go | 215 |
1 files changed, 140 insertions, 75 deletions
diff --git a/src/send.go b/src/send.go index f58d311..4ff75db 100644 --- a/src/send.go +++ b/src/send.go @@ -5,107 +5,159 @@ import ( "golang.org/x/crypto/chacha20poly1305" "net" "sync" - "time" ) /* Handles outbound flow * * 1. TUN queue - * 2. Routing - * 3. Per peer queuing - * 4. (work queuing) + * 2. Routing (sequential) + * 3. Nonce assignment (sequential) + * 4. Encryption (parallel) + * 5. Transmission (sequential) * + * The order of packets (per peer) is maintained. + * The functions in this file occure (roughly) in the order packets are processed. */ -type OutboundWorkQueueElement struct { - wg sync.WaitGroup +/* A work unit + * + * The sequential consumers will attempt to take the lock, + * workers release lock when they have completed work on the packet. + */ +type QueueOutboundElement struct { + mutex sync.Mutex packet []byte nonce uint64 keyPair *KeyPair } -func (peer *Peer) HandshakeWorker(handshakeQueue []byte) { - +func (peer *Peer) FlushNonceQueue() { + elems := len(peer.queue.nonce) + for i := 0; i < elems; i += 1 { + select { + case <-peer.queue.nonce: + default: + return + } + } } -func (device *Device) SendPacket(packet []byte) { +func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) { + for { + select { + case peer.queue.outbound <- elem: + default: + select { + case <-peer.queue.outbound: + default: + } + } + } +} - // lookup peer +/* Reads packets from the TUN and inserts + * into nonce queue for peer + * + * Obs. Single instance per TUN device + */ +func (device *Device) RoutineReadFromTUN(tun TUNDevice) { + for { + // read packet - var peer *Peer - switch packet[0] >> 4 { - case IPv4version: - dst := packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len] - peer = device.routingTable.LookupIPv4(dst) + packet := make([]byte, 1<<16) // TODO: Fix & avoid dynamic allocation + size, err := tun.Read(packet) + if err != nil { + device.log.Error.Println("Failed to read packet from TUN device:", err) + continue + } + packet = packet[:size] + if len(packet) < IPv4headerSize { + device.log.Error.Println("Packet too short, length:", len(packet)) + continue + } - case IPv6version: - dst := packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len] - peer = device.routingTable.LookupIPv6(dst) + device.log.Debug.Println("New packet on TUN:", packet) // TODO: Slow debugging, remove. - default: - device.log.Debug.Println("receieved packet with unknown IP version") - return - } + // lookup peer - if peer == nil { - return - } + var peer *Peer + switch packet[0] >> 4 { + case IPv4version: + dst := packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len] + peer = device.routingTable.LookupIPv4(dst) - // insert into peer queue + case IPv6version: + dst := packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len] + peer = device.routingTable.LookupIPv6(dst) - for { - select { - case peer.queueOutboundRouting <- packet: default: + device.log.Debug.Println("Receieved packet with unknown IP version") + return + } + + if peer == nil { + device.log.Debug.Println("No peer configured for IP") + return + } + + // insert into nonce/pre-handshake queue + + for { select { - case <-peer.queueOutboundRouting: + case peer.queue.nonce <- packet: default: + select { + case <-peer.queue.nonce: + default: + } + continue } - continue + break } - break } } -/* Go routine +/* Queues packets when there is no handshake. + * Then assigns nonces to packets sequentially + * and creates "work" structs for workers * + * TODO: Avoid dynamic allocation of work queue elements * - * 1. waits for handshake. - * 2. assigns key pair & nonce - * 3. inserts to working queue - * - * TODO: avoid dynamic allocation of work queue elements + * Obs. A single instance per peer */ -func (peer *Peer) RoutineOutboundNonceWorker() { +func (peer *Peer) RoutineNonce() { var packet []byte var keyPair *KeyPair - var flushTimer time.Timer for { // wait for packet if packet == nil { - packet = <-peer.queueOutboundRouting + select { + case packet = <-peer.queue.nonce: + case <-peer.signal.stopSending: + close(peer.queue.outbound) + return + } } // wait for key pair for keyPair == nil { - flushTimer.Reset(time.Second * 10) - // TODO: Handshake or NOP + peer.signal.newHandshake <- true select { case <-peer.keyPairs.newKeyPair: keyPair = peer.keyPairs.Current() continue - case <-flushTimer.C: - size := len(peer.queueOutboundRouting) - for i := 0; i < size; i += 1 { - <-peer.queueOutboundRouting - } + case <-peer.signal.flushNonceQueue: + peer.FlushNonceQueue() packet = nil + continue + case <-peer.signal.stopSending: + close(peer.queue.outbound) + return } - break } // process current packet @@ -114,14 +166,13 @@ func (peer *Peer) RoutineOutboundNonceWorker() { // create work element - work := new(OutboundWorkQueueElement) - work.wg.Add(1) + work := new(QueueOutboundElement) // TODO: profile, maybe use pool work.keyPair = keyPair work.packet = packet work.nonce = keyPair.sendNonce + work.mutex.Lock() packet = nil - peer.queueOutbound <- work keyPair.sendNonce += 1 // drop packets until there is space @@ -129,46 +180,36 @@ func (peer *Peer) RoutineOutboundNonceWorker() { func() { for { select { - case peer.device.queueWorkOutbound <- work: + case peer.device.queue.encryption <- work: return default: - drop := <-peer.device.queueWorkOutbound + drop := <-peer.device.queue.encryption drop.packet = nil - drop.wg.Done() + drop.mutex.Unlock() } } }() + peer.queue.outbound <- work } } } -/* Go routine - * - * sequentially reads packets from queue and sends to endpoint +/* Encrypts the elements in the queue + * and marks them for sequential consumption (by releasing the mutex) * + * Obs. One instance per core */ -func (peer *Peer) RoutineSequential() { - for work := range peer.queueOutbound { - work.wg.Wait() - if work.packet == nil { - continue - } - if peer.endpoint == nil { - continue - } - peer.device.conn.WriteToUDP(work.packet, peer.endpoint) - } -} - -func (device *Device) RoutineEncryptionWorker() { +func (device *Device) RoutineEncryption() { var nonce [chacha20poly1305.NonceSize]byte - for work := range device.queueWorkOutbound { + for work := range device.queue.encryption { + // pad packet padding := device.mtu - len(work.packet) if padding < 0 { + // drop work.packet = nil - work.wg.Done() + work.mutex.Unlock() } for n := 0; n < padding; n += 1 { work.packet = append(work.packet, 0) @@ -183,6 +224,30 @@ func (device *Device) RoutineEncryptionWorker() { work.packet, nil, ) - work.wg.Done() + work.mutex.Unlock() + } +} + +/* Sequentially reads packets from queue and sends to endpoint + * + * Obs. Single instance per peer. + * The routine terminates then the outbound queue is closed. + */ +func (peer *Peer) RoutineSequential() { + for work := range peer.queue.outbound { + work.mutex.Lock() + func() { + peer.mutex.RLock() + defer peer.mutex.RUnlock() + if work.packet == nil { + return + } + if peer.endpoint == nil { + return + } + peer.device.conn.WriteToUDP(work.packet, peer.endpoint) + peer.timer.sendKeepalive.Reset(peer.persistentKeepaliveInterval) + }() + work.mutex.Unlock() } } |