diff options
Diffstat (limited to 'src/send.go')
-rw-r--r-- | src/send.go | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/src/send.go b/src/send.go new file mode 100644 index 0000000..9790320 --- /dev/null +++ b/src/send.go @@ -0,0 +1,154 @@ +package main + +import ( + "net" + "sync" + "sync/atomic" +) + +/* Handles outbound flow + * + * 1. TUN queue + * 2. Routing + * 3. Per peer queuing + * 4. (work queuing) + * + */ + +type OutboundWorkQueueElement struct { + wg sync.WaitGroup + packet []byte + nonce uint64 + keyPair *KeyPair +} + +func (device *Device) SendPacket(packet []byte) { + + // lookup peer + + var peer *Peer + switch packet[0] >> 4 { + case IPv4version: + dst := packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len] + peer = device.routingTable.LookupIPv4(dst) + + case IPv6version: + dst := packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len] + peer = device.routingTable.LookupIPv6(dst) + + default: + device.logger.Println("unknown IP version") + return + } + + if peer == nil { + return + } + + // insert into peer queue + + for { + select { + case peer.queueOutboundRouting <- packet: + default: + select { + case <-peer.queueOutboundRouting: + default: + } + continue + } + break + } +} + +/* Go routine + * + * + * 1. waits for handshake. + * 2. assigns key pair & nonce + * 3. inserts to working queue + * + * TODO: avoid dynamic allocation of work queue elements + */ +func (peer *Peer) ConsumeOutboundPackets() { + for { + // wait for key pair + keyPair := func() *KeyPair { + peer.keyPairs.mutex.RLock() + defer peer.keyPairs.mutex.RUnlock() + return peer.keyPairs.current + }() + if keyPair == nil { + if len(peer.queueOutboundRouting) > 0 { + // TODO: start handshake + <-peer.keyPairs.newKeyPair + } + continue + } + + // assign packets key pair + for { + select { + case <-peer.keyPairs.newKeyPair: + default: + case <-peer.keyPairs.newKeyPair: + case packet := <-peer.queueOutboundRouting: + + // create new work element + + work := new(OutboundWorkQueueElement) + work.wg.Add(1) + work.keyPair = keyPair + work.packet = packet + work.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1 + + peer.queueOutbound <- work + + // drop packets until there is room + + for { + select { + case peer.device.queueWorkOutbound <- work: + break + default: + drop := <-peer.device.queueWorkOutbound + drop.packet = nil + drop.wg.Done() + } + } + } + } + } +} + +func (peer *Peer) RoutineSequential() { + for work := range peer.queueOutbound { + work.wg.Wait() + if work.packet == nil { + continue + } + } +} + +func (device *Device) EncryptionWorker() { + for { + work := <-device.queueWorkOutbound + + func() { + defer work.wg.Done() + + // pad packet + padding := device.mtu - len(work.packet) + if padding < 0 { + work.packet = nil + return + } + for n := 0; n < padding; n += 1 { + work.packet = append(work.packet, 0) // TODO: gotta be a faster way + } + + // + + }() + } +} |