package main

import (
	"encoding/binary"
	"golang.org/x/crypto/chacha20poly1305"
	"golang.org/x/net/ipv4"
	"golang.org/x/net/ipv6"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

/* Outbound flow
 *
 * 1. TUN queue
 * 2. Routing (sequential)
 * 3. Nonce assignment (sequential)
 * 4. Encryption (parallel)
 * 5. Transmission (sequential)
 *
 * The functions in this file occur (roughly) in the order in
 * which the packets are processed.
 *
 * Locking, Producers and Consumers
 *
 * The order of packets (per peer) must be maintained,
 * but encryption of packets happens out-of-order:
 *
 * The sequential consumers will attempt to take the lock,
 * workers release the lock when they have completed work (encryption) on the packet.
 *
 * If the element is inserted into the "encryption queue",
 * the content is preceded by enough "junk" to contain the transport header
 * (to allow the construction of transport messages in-place)
 */

// QueueOutboundElement is one outbound packet travelling through the
// pipeline described above.
//
// The dropped flag is read and written through Drop and IsDropped.
// The mutex is taken when the element is handed to the encryption queue
// and released once the element has been encrypted or evicted, which is
// what lets the in-order consumer wait for out-of-order workers.
type QueueOutboundElement struct {
	dropped int32
	mutex   sync.Mutex
	buffer  *[MaxMessageSize]byte // backing storage holding the packet data
	packet  []byte                // slice of "buffer" (always!)
	nonce   uint64                // nonce for encryption
	keyPair *KeyPair              // key-pair for encryption
	peer    *Peer                 // related peer
}

// FlushNonceQueue discards the elements that are waiting in the peer's
// nonce queue. The backlog size is sampled once up front, so at most that
// many elements are removed; the call never blocks and returns early if
// the queue runs empty first.
func (peer *Peer) FlushNonceQueue() {
	for pending := len(peer.queue.nonce); pending > 0; pending-- {
		select {
		case <-peer.queue.nonce:
			// discarded
		default:
			return
		}
	}
}

// NewOutboundElement returns a fresh, not-dropped element whose buffer is
// taken from the device's message buffer pool.
func (device *Device) NewOutboundElement() *QueueOutboundElement {
	elem := new(QueueOutboundElement)
	elem.dropped = AtomicFalse
	elem.buffer = device.pool.messageBuffers.Get().(*[MaxMessageSize]byte)
	return elem
}

// Drop atomically flags the element as dropped.
func (elem *QueueOutboundElement) Drop() {
	atomic.StoreInt32(&elem.dropped, AtomicTrue)
}

// IsDropped atomically reports whether the element has been flagged as
// dropped.
func (elem *QueueOutboundElement) IsDropped() bool {
	state := atomic.LoadInt32(&elem.dropped)
	return state == AtomicTrue
}

// addToOutboundQueue enqueues element without ever blocking: whenever the
// queue is full, the oldest queued element is removed and flagged as
// dropped, and the insertion is retried.
func addToOutboundQueue(
	queue chan *QueueOutboundElement,
	element *QueueOutboundElement,
) {
	for {
		// try to enqueue
		select {
		case queue <- element:
			return
		default:
		}

		// queue is full: evict the oldest entry (if still present) and retry
		select {
		case evicted := <-queue:
			evicted.Drop()
		default:
		}
	}
}

// addToEncryptionQueue enqueues element without ever blocking: whenever
// the queue is full, the oldest queued element is removed, flagged as
// dropped and has its mutex released, and the insertion is retried.
func addToEncryptionQueue(
	queue chan *QueueOutboundElement,
	element *QueueOutboundElement,
) {
	for {
		// try to enqueue
		select {
		case queue <- element:
			return
		default:
		}

		// queue is full: evict the oldest entry (if still present) and retry
		select {
		case evicted := <-queue:
			// drop & release to potential consumer
			evicted.Drop()
			evicted.mutex.Unlock()
		default:
		}
	}
}

/* Reads packets from the TUN and inserts
 * into nonce queue for peer
 *
 * Obs.
Single instance per TUN device */ func (device *Device) RoutineReadFromTUN() { elem := device.NewOutboundElement() logDebug := device.log.Debug logError := device.log.Error defer func() { logDebug.Println("Routine: TUN reader - stopped") }() logDebug.Println("Routine: TUN reader - started") for { // read packet offset := MessageTransportHeaderSize size, err := device.tun.device.Read(elem.buffer[:], offset) if err != nil { logError.Println("Failed to read packet from TUN device:", err) device.Close() return } if size == 0 || size > MaxContentSize { continue } elem.packet = elem.buffer[offset : offset+size] // lookup peer var peer *Peer switch elem.packet[0] >> 4 { case ipv4.Version: if len(elem.packet) < ipv4.HeaderLen { continue } dst := elem.packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len] peer = device.routing.table.LookupIPv4(dst) case ipv6.Version: if len(elem.packet) < ipv6.HeaderLen { continue } dst := elem.packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len] peer = device.routing.table.LookupIPv6(dst) default: logDebug.Println("Received packet with unknown IP version") } if peer == nil { continue } // insert into nonce/pre-handshake queue if peer.isRunning.Get() { peer.timer.handshakeDeadline.Reset(RekeyAttemptTime) addToOutboundQueue(peer.queue.nonce, elem) elem = device.NewOutboundElement() } } } /* Queues packets when there is no handshake. * Then assigns nonces to packets sequentially * and creates "work" structs for workers * * Obs. 
A single instance per peer */ func (peer *Peer) RoutineNonce() { var keyPair *KeyPair device := peer.device logDebug := device.log.Debug defer func() { peer.routines.stopping.Done() logDebug.Println(peer.String() + ": Routine: nonce worker - stopped") }() peer.routines.starting.Done() logDebug.Println(peer.String() + ": Routine: nonce worker - started") for { NextPacket: select { case <-peer.routines.stop.Wait(): return case elem, ok := <-peer.queue.nonce: if !ok { return } // wait for key pair for { keyPair = peer.keyPairs.Current() if keyPair != nil && keyPair.sendNonce < RejectAfterMessages { if time.Now().Sub(keyPair.created) < RejectAfterTime { break } } peer.signal.handshakeBegin.Send() logDebug.Println(peer.String() + ": Awaiting key-pair") select { case <-peer.signal.newKeyPair.Wait(): logDebug.Println(peer.String() + ": Obtained awaited key-pair") case <-peer.signal.flushNonceQueue.Wait(): logDebug.Println(peer.String() + ": Flushing nonce queue") peer.FlushNonceQueue() goto NextPacket case <-peer.routines.stop.Wait(): return } } // populate work element elem.peer = peer elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1 elem.keyPair = keyPair elem.dropped = AtomicFalse elem.mutex.Lock() // add to parallel and sequential queue addToEncryptionQueue(device.queue.encryption, elem) addToOutboundQueue(peer.queue.outbound, elem) } } } /* Encrypts the elements in the queue * and marks them for sequential consumption (by releasing the mutex) * * Obs. 
One instance per core */ func (device *Device) RoutineEncryption() { var nonce [chacha20poly1305.NonceSize]byte logDebug := device.log.Debug defer func() { for { select { case elem, ok := <-device.queue.encryption: if ok { elem.Drop() } default: break } } logDebug.Println("Routine: encryption worker - stopped") }() logDebug.Println("Routine: encryption worker - started") for { // fetch next element select { case <-device.signal.stop.Wait(): return case elem, ok := <-device.queue.encryption: if !ok { return } // check if dropped if elem.IsDropped() { continue } // populate header fields header := elem.buffer[:MessageTransportHeaderSize] fieldType := header[0:4] fieldReceiver := header[4:8] fieldNonce := header[8:16] binary.LittleEndian.PutUint32(fieldType, MessageTransportType) binary.LittleEndian.PutUint32(fieldReceiver, elem.keyPair.remoteIndex) binary.LittleEndian.PutUint64(fieldNonce, elem.nonce) // pad content to multiple of 16 mtu := int(atomic.LoadInt32(&device.tun.mtu)) rem := len(elem.packet) % PaddingMultiple if rem > 0 { for i := 0; i < PaddingMultiple-rem && len(elem.packet) < mtu; i++ { elem.packet = append(elem.packet, 0) } } // encrypt content and release to consumer binary.LittleEndian.PutUint64(nonce[4:], elem.nonce) elem.packet = elem.keyPair.send.Seal( header, nonce[:], elem.packet, nil, ) elem.mutex.Unlock() } } } /* Sequentially reads packets from queue and sends to endpoint * * Obs. Single instance per peer. * The routine terminates then the outbound queue is closed. 
*/
func (peer *Peer) RoutineSequentialSender() {

	device := peer.device
	logDebug := device.log.Debug

	defer func() {
		peer.routines.stopping.Done()
		logDebug.Println(peer.String() + ": Routine: sequential sender - stopped")
	}()

	logDebug.Println(peer.String() + ": Routine: sequential sender - started")

	peer.routines.starting.Done()

	for {
		select {

		case <-peer.routines.stop.Wait():
			return

		case elem, ok := <-peer.queue.outbound:

			if !ok {
				return
			}

			// block until the element has been released
			// (encrypted by a worker, or evicted from the encryption queue)

			elem.mutex.Lock()
			if elem.IsDropped() {
				continue
			}

			// send message and return buffer to pool

			packetLen := len(elem.packet)
			sendErr := peer.SendBuffer(elem.packet)
			device.PutMessageBuffer(elem.buffer)

			if sendErr != nil {
				logDebug.Println("Failed to send authenticated packet to peer", peer.String())
				continue
			}

			atomic.AddUint64(&peer.stats.txBytes, uint64(packetLen))

			// update timers; keepalive-sized messages do not count as data

			peer.TimerAnyAuthenticatedPacketTraversal()
			if packetLen != MessageKeepaliveSize {
				peer.TimerDataSent()
			}
			peer.KeepKeyFreshSending()
		}
	}
}