summaryrefslogtreecommitdiffhomepage
path: root/src/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/send.go')
-rw-r--r--src/send.go215
1 files changed, 140 insertions, 75 deletions
diff --git a/src/send.go b/src/send.go
index f58d311..4ff75db 100644
--- a/src/send.go
+++ b/src/send.go
@@ -5,107 +5,159 @@ import (
"golang.org/x/crypto/chacha20poly1305"
"net"
"sync"
- "time"
)
/* Handles outbound flow
*
* 1. TUN queue
- * 2. Routing
- * 3. Per peer queuing
- * 4. (work queuing)
+ * 2. Routing (sequential)
+ * 3. Nonce assignment (sequential)
+ * 4. Encryption (parallel)
+ * 5. Transmission (sequential)
*
+ * The order of packets (per peer) is maintained.
+ * The functions in this file occure (roughly) in the order packets are processed.
*/
-type OutboundWorkQueueElement struct {
- wg sync.WaitGroup
+/* A work unit
+ *
+ * The sequential consumers will attempt to take the lock,
+ * workers release lock when they have completed work on the packet.
+ */
+type QueueOutboundElement struct {
+ mutex sync.Mutex
packet []byte
nonce uint64
keyPair *KeyPair
}
-func (peer *Peer) HandshakeWorker(handshakeQueue []byte) {
-
+func (peer *Peer) FlushNonceQueue() {
+ elems := len(peer.queue.nonce)
+ for i := 0; i < elems; i += 1 {
+ select {
+ case <-peer.queue.nonce:
+ default:
+ return
+ }
+ }
}
-func (device *Device) SendPacket(packet []byte) {
+func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) {
+ for {
+ select {
+ case peer.queue.outbound <- elem:
+ default:
+ select {
+ case <-peer.queue.outbound:
+ default:
+ }
+ }
+ }
+}
- // lookup peer
+/* Reads packets from the TUN and inserts
+ * into nonce queue for peer
+ *
+ * Obs. Single instance per TUN device
+ */
+func (device *Device) RoutineReadFromTUN(tun TUNDevice) {
+ for {
+ // read packet
- var peer *Peer
- switch packet[0] >> 4 {
- case IPv4version:
- dst := packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len]
- peer = device.routingTable.LookupIPv4(dst)
+ packet := make([]byte, 1<<16) // TODO: Fix & avoid dynamic allocation
+ size, err := tun.Read(packet)
+ if err != nil {
+ device.log.Error.Println("Failed to read packet from TUN device:", err)
+ continue
+ }
+ packet = packet[:size]
+ if len(packet) < IPv4headerSize {
+ device.log.Error.Println("Packet too short, length:", len(packet))
+ continue
+ }
- case IPv6version:
- dst := packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len]
- peer = device.routingTable.LookupIPv6(dst)
+ device.log.Debug.Println("New packet on TUN:", packet) // TODO: Slow debugging, remove.
- default:
- device.log.Debug.Println("receieved packet with unknown IP version")
- return
- }
+ // lookup peer
- if peer == nil {
- return
- }
+ var peer *Peer
+ switch packet[0] >> 4 {
+ case IPv4version:
+ dst := packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len]
+ peer = device.routingTable.LookupIPv4(dst)
- // insert into peer queue
+ case IPv6version:
+ dst := packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len]
+ peer = device.routingTable.LookupIPv6(dst)
- for {
- select {
- case peer.queueOutboundRouting <- packet:
default:
+ device.log.Debug.Println("Receieved packet with unknown IP version")
+ return
+ }
+
+ if peer == nil {
+ device.log.Debug.Println("No peer configured for IP")
+ return
+ }
+
+ // insert into nonce/pre-handshake queue
+
+ for {
select {
- case <-peer.queueOutboundRouting:
+ case peer.queue.nonce <- packet:
default:
+ select {
+ case <-peer.queue.nonce:
+ default:
+ }
+ continue
}
- continue
+ break
}
- break
}
}
-/* Go routine
+/* Queues packets when there is no handshake.
+ * Then assigns nonces to packets sequentially
+ * and creates "work" structs for workers
*
+ * TODO: Avoid dynamic allocation of work queue elements
*
- * 1. waits for handshake.
- * 2. assigns key pair & nonce
- * 3. inserts to working queue
- *
- * TODO: avoid dynamic allocation of work queue elements
+ * Obs. A single instance per peer
*/
-func (peer *Peer) RoutineOutboundNonceWorker() {
+func (peer *Peer) RoutineNonce() {
var packet []byte
var keyPair *KeyPair
- var flushTimer time.Timer
for {
// wait for packet
if packet == nil {
- packet = <-peer.queueOutboundRouting
+ select {
+ case packet = <-peer.queue.nonce:
+ case <-peer.signal.stopSending:
+ close(peer.queue.outbound)
+ return
+ }
}
// wait for key pair
for keyPair == nil {
- flushTimer.Reset(time.Second * 10)
- // TODO: Handshake or NOP
+ peer.signal.newHandshake <- true
select {
case <-peer.keyPairs.newKeyPair:
keyPair = peer.keyPairs.Current()
continue
- case <-flushTimer.C:
- size := len(peer.queueOutboundRouting)
- for i := 0; i < size; i += 1 {
- <-peer.queueOutboundRouting
- }
+ case <-peer.signal.flushNonceQueue:
+ peer.FlushNonceQueue()
packet = nil
+ continue
+ case <-peer.signal.stopSending:
+ close(peer.queue.outbound)
+ return
}
- break
}
// process current packet
@@ -114,14 +166,13 @@ func (peer *Peer) RoutineOutboundNonceWorker() {
// create work element
- work := new(OutboundWorkQueueElement)
- work.wg.Add(1)
+ work := new(QueueOutboundElement) // TODO: profile, maybe use pool
work.keyPair = keyPair
work.packet = packet
work.nonce = keyPair.sendNonce
+ work.mutex.Lock()
packet = nil
- peer.queueOutbound <- work
keyPair.sendNonce += 1
// drop packets until there is space
@@ -129,46 +180,36 @@ func (peer *Peer) RoutineOutboundNonceWorker() {
func() {
for {
select {
- case peer.device.queueWorkOutbound <- work:
+ case peer.device.queue.encryption <- work:
return
default:
- drop := <-peer.device.queueWorkOutbound
+ drop := <-peer.device.queue.encryption
drop.packet = nil
- drop.wg.Done()
+ drop.mutex.Unlock()
}
}
}()
+ peer.queue.outbound <- work
}
}
}
-/* Go routine
- *
- * sequentially reads packets from queue and sends to endpoint
+/* Encrypts the elements in the queue
+ * and marks them for sequential consumption (by releasing the mutex)
*
+ * Obs. One instance per core
*/
-func (peer *Peer) RoutineSequential() {
- for work := range peer.queueOutbound {
- work.wg.Wait()
- if work.packet == nil {
- continue
- }
- if peer.endpoint == nil {
- continue
- }
- peer.device.conn.WriteToUDP(work.packet, peer.endpoint)
- }
-}
-
-func (device *Device) RoutineEncryptionWorker() {
+func (device *Device) RoutineEncryption() {
var nonce [chacha20poly1305.NonceSize]byte
- for work := range device.queueWorkOutbound {
+ for work := range device.queue.encryption {
+
// pad packet
padding := device.mtu - len(work.packet)
if padding < 0 {
+ // drop
work.packet = nil
- work.wg.Done()
+ work.mutex.Unlock()
}
for n := 0; n < padding; n += 1 {
work.packet = append(work.packet, 0)
@@ -183,6 +224,30 @@ func (device *Device) RoutineEncryptionWorker() {
work.packet,
nil,
)
- work.wg.Done()
+ work.mutex.Unlock()
+ }
+}
+
+/* Sequentially reads packets from queue and sends to endpoint
+ *
+ * Obs. Single instance per peer.
+ * The routine terminates then the outbound queue is closed.
+ */
+func (peer *Peer) RoutineSequential() {
+ for work := range peer.queue.outbound {
+ work.mutex.Lock()
+ func() {
+ peer.mutex.RLock()
+ defer peer.mutex.RUnlock()
+ if work.packet == nil {
+ return
+ }
+ if peer.endpoint == nil {
+ return
+ }
+ peer.device.conn.WriteToUDP(work.packet, peer.endpoint)
+ peer.timer.sendKeepalive.Reset(peer.persistentKeepaliveInterval)
+ }()
+ work.mutex.Unlock()
}
}