diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2017-07-06 15:43:55 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2017-07-06 15:43:55 +0200 |
commit | 59f9316f51ce3cb470200b0cfe847116a0583d25 (patch) | |
tree | e9cfb69aa8b58d8b009167730713c4bde67d7cd4 /src/send.go | |
parent | 2aa0daf4d58ffc930fde611e7efe6ae3c9515130 (diff) |
Initial working full exchange
The implementation is now capable of connecting to another
wireguard instance, complete a handshake and exchange transport
messages.
Diffstat (limited to 'src/send.go')
-rw-r--r-- | src/send.go | 188 |
1 files changed, 87 insertions, 101 deletions
diff --git a/src/send.go b/src/send.go index 3fe4733..4053669 100644 --- a/src/send.go +++ b/src/send.go @@ -25,14 +25,19 @@ import ( * * The sequential consumers will attempt to take the lock, * workers release lock when they have completed work on the packet. + * + * If the element is inserted into the "encryption queue", + * the content is preceeded by enough "junk" to contain the header + * (to allow the constuction of transport messages in-place) */ type QueueOutboundElement struct { state uint32 mutex sync.Mutex - packet []byte - nonce uint64 - keyPair *KeyPair - peer *Peer + data [MaxMessageSize]byte + packet []byte // slice of packet (sending) + nonce uint64 // nonce for encryption + keyPair *KeyPair // key-pair for encryption + peer *Peer // related peer } func (peer *Peer) FlushNonceQueue() { @@ -46,75 +51,87 @@ func (peer *Peer) FlushNonceQueue() { } } -func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) { +func (device *Device) NewOutboundElement() *QueueOutboundElement { + elem := new(QueueOutboundElement) // TODO: profile, consider sync.Pool + return elem +} + +func (elem *QueueOutboundElement) Drop() { + atomic.StoreUint32(&elem.state, ElementStateDropped) +} + +func (elem *QueueOutboundElement) IsDropped() bool { + return atomic.LoadUint32(&elem.state) == ElementStateDropped +} + +func addToOutboundQueue( + queue chan *QueueOutboundElement, + element *QueueOutboundElement, +) { for { select { - case peer.queue.outbound <- elem: + case queue <- element: return default: select { - case <-peer.queue.outbound: + case old := <-queue: + old.Drop() default: } } } } -func (elem *QueueOutboundElement) Drop() { - atomic.StoreUint32(&elem.state, ElementStateDropped) -} - -func (elem *QueueOutboundElement) IsDropped() bool { - return atomic.LoadUint32(&elem.state) == ElementStateDropped -} - /* Reads packets from the TUN and inserts * into nonce queue for peer * * Obs. Single instance per TUN device */ func (device *Device) RoutineReadFromTUN(tun TUNDevice) { - if tun.MTU() == 0 { - // Dummy + if tun == nil { + // dummy return } + elem := device.NewOutboundElement() + device.log.Debug.Println("Routine, TUN Reader: started") for { // read packet - packet := make([]byte, 1<<16) // TODO: Fix & avoid dynamic allocation - size, err := tun.Read(packet) + if elem == nil { + elem = device.NewOutboundElement() + } + + elem.packet = elem.data[MessageTransportHeaderSize:] + size, err := tun.Read(elem.packet) if err != nil { device.log.Error.Println("Failed to read packet from TUN device:", err) continue } - packet = packet[:size] - if len(packet) < IPv4headerSize { - device.log.Error.Println("Packet too short, length:", len(packet)) + elem.packet = elem.packet[:size] + if len(elem.packet) < IPv4headerSize { + device.log.Error.Println("Packet too short, length:", size) continue } // lookup peer var peer *Peer - switch packet[0] >> 4 { + switch elem.packet[0] >> 4 { case IPv4version: - dst := packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len] + dst := elem.packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len] peer = device.routingTable.LookupIPv4(dst) - device.log.Debug.Println("New IPv4 packet:", packet, dst) case IPv6version: - dst := packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len] + dst := elem.packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len] peer = device.routingTable.LookupIPv6(dst) - device.log.Debug.Println("New IPv6 packet:", packet, dst) default: device.log.Debug.Println("Receieved packet with unknown IP version") } if peer == nil { - device.log.Debug.Println("No peer configured for IP") continue } if peer.endpoint == nil { @@ -124,18 +141,9 @@ func (device *Device) RoutineReadFromTUN(tun TUNDevice) { // insert into nonce/pre-handshake queue - for { - select { - case peer.queue.nonce <- packet: - default: - select { - case <-peer.queue.nonce: - default: - } - continue - } - break - } + addToOutboundQueue(peer.queue.nonce, elem) + elem = nil + } } @@ -148,8 +156,8 @@ func (device *Device) RoutineReadFromTUN(tun TUNDevice) { * Obs. A single instance per peer */ func (peer *Peer) RoutineNonce() { - var packet []byte var keyPair *KeyPair + var elem *QueueOutboundElement device := peer.device logger := device.log.Debug @@ -163,9 +171,9 @@ func (peer *Peer) RoutineNonce() { // wait for packet - if packet == nil { + if elem == nil { select { - case packet = <-peer.queue.nonce: + case elem = <-peer.queue.nonce: case <-peer.signal.stop: return } @@ -198,7 +206,7 @@ func (peer *Peer) RoutineNonce() { case <-peer.signal.flushNonceQueue: logger.Println("Clearing queue for peer", peer.id) peer.FlushNonceQueue() - packet = nil + elem = nil goto NextPacket case <-peer.signal.stop: @@ -208,36 +216,20 @@ func (peer *Peer) RoutineNonce() { // process current packet - if packet != nil { + if elem != nil { // create work element - work := new(QueueOutboundElement) // TODO: profile, maybe use pool - work.keyPair = keyPair - work.packet = packet - work.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1 - work.peer = peer - work.mutex.Lock() - - packet = nil - - // drop packets until there is space - - func() { - for { - select { - case peer.device.queue.encryption <- work: - return - default: - select { - case elem := <-peer.device.queue.encryption: - elem.Drop() - default: - } - } - } - }() - peer.queue.outbound <- work + elem.keyPair = keyPair + elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1 + elem.peer = peer + elem.mutex.Lock() + + // add to parallel processing and sequential consuming queue + + addToOutboundQueue(device.queue.encryption, elem) + addToOutboundQueue(peer.queue.outbound, elem) + elem = nil } } }() @@ -257,42 +249,38 @@ func (device *Device) RoutineEncryption() { continue } - // pad packet - - padding := device.mtu - len(work.packet) - MessageTransportSize - if padding < 0 { - work.Drop() - continue - } - - for n := 0; n < padding; n += 1 { - work.packet = append(work.packet, 0) - } - content := work.packet[MessageTransportHeaderSize:] - copy(content, work.packet) + // populate header fields - // prepare header + func() { + header := work.data[:MessageTransportHeaderSize] - binary.LittleEndian.PutUint32(work.packet[:4], MessageTransportType) - binary.LittleEndian.PutUint32(work.packet[4:8], work.keyPair.remoteIndex) - binary.LittleEndian.PutUint64(work.packet[8:16], work.nonce) + fieldType := header[0:4] + fieldReceiver := header[4:8] + fieldNonce := header[8:16] - device.log.Debug.Println(work.packet, work.nonce) + binary.LittleEndian.PutUint32(fieldType, MessageTransportType) + binary.LittleEndian.PutUint32(fieldReceiver, work.keyPair.remoteIndex) + binary.LittleEndian.PutUint64(fieldNonce, work.nonce) + }() // encrypt content - binary.LittleEndian.PutUint64(nonce[4:], work.nonce) - work.keyPair.send.Seal( - content[:0], - nonce[:], - content, - nil, - ) - work.mutex.Unlock() + func() { + binary.LittleEndian.PutUint64(nonce[4:], work.nonce) + work.packet = work.keyPair.send.Seal( + work.packet[:0], + nonce[:], + work.packet, + nil, + ) + work.mutex.Unlock() + }() - device.log.Debug.Println(work.packet, work.nonce) + // reslice to include header - // initiate new handshake + work.packet = work.data[:MessageTransportHeaderSize+len(work.packet)] + + // refresh key if necessary work.peer.KeepKeyFreshSending() } @@ -340,8 +328,6 @@ func (peer *Peer) RoutineSequentialSender() { return } - logger.Println(work.packet) - _, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint) if err != nil { return |