diff options
Diffstat (limited to 'src/receive.go')
-rw-r--r-- | src/receive.go | 404 |
1 files changed, 404 insertions, 0 deletions
diff --git a/src/receive.go b/src/receive.go new file mode 100644 index 0000000..ab28944 --- /dev/null +++ b/src/receive.go @@ -0,0 +1,404 @@ +package main + +import ( + "bytes" + "encoding/binary" + "golang.org/x/crypto/chacha20poly1305" + "net" + "sync" + "sync/atomic" + "time" +) + +const ( + ElementStateOkay = iota + ElementStateDropped +) + +type QueueHandshakeElement struct { + msgType uint32 + packet []byte + source *net.UDPAddr +} + +type QueueInboundElement struct { + state uint32 + mutex sync.Mutex + packet []byte + counter uint64 + keyPair *KeyPair +} + +func (elem *QueueInboundElement) Drop() { + atomic.StoreUint32(&elem.state, ElementStateDropped) + elem.mutex.Unlock() +} + +func (device *Device) RoutineReceiveIncomming() { + var packet []byte + + debugLog := device.log.Debug + debugLog.Println("Routine, receive incomming, started") + + errorLog := device.log.Error + + for { + + // check if stopped + + select { + case <-device.signal.stop: + return + default: + } + + // read next datagram + + if packet == nil { + packet = make([]byte, 1<<16) + } + + device.net.mutex.RLock() + conn := device.net.conn + device.net.mutex.RUnlock() + + conn.SetReadDeadline(time.Now().Add(time.Second)) + + size, raddr, err := conn.ReadFromUDP(packet) + if err != nil { + continue + } + if size < MinMessageSize { + continue + } + + // handle packet + + packet = packet[:size] + msgType := binary.LittleEndian.Uint32(packet[:4]) + + func() { + switch msgType { + + case MessageInitiationType, MessageResponseType: + + // verify mac1 + + if !device.mac.CheckMAC1(packet) { + debugLog.Println("Received packet with invalid mac1") + return + } + + // check if busy, TODO: refine definition of "busy" + + busy := len(device.queue.handshake) > QueueHandshakeBusySize + if busy && !device.mac.CheckMAC2(packet, raddr) { + sender := binary.LittleEndian.Uint32(packet[4:8]) // "sender" follows "type" + reply, err := device.CreateMessageCookieReply(packet, sender, raddr) + if err != nil { + errorLog.Println("Failed to create cookie reply:", err) + return + } + writer := bytes.NewBuffer(packet[:0]) + binary.Write(writer, binary.LittleEndian, reply) + packet = writer.Bytes() + _, err = device.net.conn.WriteToUDP(packet, raddr) + if err != nil { + debugLog.Println("Failed to send cookie reply:", err) + } + return + } + + // add to handshake queue + + device.queue.handshake <- QueueHandshakeElement{ + msgType: msgType, + packet: packet, + source: raddr, + } + + case MessageCookieReplyType: + + // verify and update peer cookie state + + if len(packet) != MessageCookieReplySize { + return + } + + var reply MessageCookieReply + reader := bytes.NewReader(packet) + err := binary.Read(reader, binary.LittleEndian, &reply) + if err != nil { + debugLog.Println("Failed to decode cookie reply") + return + } + device.ConsumeMessageCookieReply(&reply) + + case MessageTransportType: + + debugLog.Println("DEBUG: Got transport") + + // lookup key pair + + if len(packet) < MessageTransportSize { + return + } + + receiver := binary.LittleEndian.Uint32( + packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter], + ) + value := device.indices.Lookup(receiver) + keyPair := value.keyPair + if keyPair == nil { + return + } + + // check key-pair expiry + + if keyPair.created.Add(RejectAfterTime).Before(time.Now()) { + return + } + + // add to peer queue + + peer := value.peer + work := new(QueueInboundElement) + work.packet = packet + work.keyPair = keyPair + work.state = ElementStateOkay + work.mutex.Lock() + + // add to parallel decryption queue + + func() { + for { + select { + case device.queue.decryption <- work: + return + default: + select { + case elem := <-device.queue.decryption: + elem.Drop() + default: + } + } + } + }() + + // add to sequential inbound queue + + func() { + for { + select { + case peer.queue.inbound <- work: + break + default: + select { + case elem := <-peer.queue.inbound: + elem.Drop() + default: + } + } + } + }() + + default: + // unknown message type + } + }() + } +} + +func (device *Device) RoutineDecryption() { + var elem *QueueInboundElement + var nonce [chacha20poly1305.NonceSize]byte + + for { + select { + case elem = <-device.queue.decryption: + case <-device.signal.stop: + return + } + + // check if dropped + + state := atomic.LoadUint32(&elem.state) + if state != ElementStateOkay { + continue + } + + // split message into fields + + counter := binary.LittleEndian.Uint64( + elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent], + ) + content := elem.packet[MessageTransportOffsetContent:] + + // decrypt with key-pair + + var err error + binary.LittleEndian.PutUint64(nonce[4:], counter) + elem.packet, err = elem.keyPair.recv.Open(elem.packet[:0], nonce[:], content, nil) + if err != nil { + elem.Drop() + continue + } + + // release to consumer + + elem.counter = counter + elem.mutex.Unlock() + } +} + +/* Handles incomming packets related to handshake + * + * + */ +func (device *Device) RoutineHandshake() { + + logInfo := device.log.Info + logError := device.log.Error + logDebug := device.log.Debug + + var elem QueueHandshakeElement + + for { + select { + case elem = <-device.queue.handshake: + case <-device.signal.stop: + return + } + + func() { + + switch elem.msgType { + case MessageInitiationType: + + // unmarshal + + if len(elem.packet) != MessageInitiationSize { + return + } + + var msg MessageInitiation + reader := bytes.NewReader(elem.packet) + err := binary.Read(reader, binary.LittleEndian, &msg) + if err != nil { + logError.Println("Failed to decode initiation message") + return + } + + // consume initiation + + peer := device.ConsumeMessageInitiation(&msg) + if peer == nil { + logInfo.Println( + "Recieved invalid initiation message from", + elem.source.IP.String(), + elem.source.Port, + ) + return + } + logDebug.Println("Recieved valid initiation message for peer", peer.id) + + case MessageResponseType: + + // unmarshal + + if len(elem.packet) != MessageResponseSize { + return + } + + var msg MessageResponse + reader := bytes.NewReader(elem.packet) + err := binary.Read(reader, binary.LittleEndian, &msg) + if err != nil { + logError.Println("Failed to decode response message") + return + } + + // consume response + + peer := device.ConsumeMessageResponse(&msg) + if peer == nil { + logInfo.Println( + "Recieved invalid response message from", + elem.source.IP.String(), + elem.source.Port, + ) + return + } + sendSignal(peer.signal.handshakeCompleted) + logDebug.Println("Recieved valid response message for peer", peer.id) + peer.NewKeyPair() + peer.SendKeepAlive() + + default: + device.log.Error.Println("Invalid message type in handshake queue") + } + + }() + } +} + +func (peer *Peer) RoutineSequentialReceiver() { + var elem *QueueInboundElement + + device := peer.device + logDebug := device.log.Debug + + logDebug.Println("Routine, sequential receiver, started for peer", peer.id) + + for { + // wait for decryption + + select { + case <-peer.signal.stop: + return + case elem = <-peer.queue.inbound: + } + elem.mutex.Lock() + + // check if dropped + + logDebug.Println("MESSSAGE:", elem) + + state := atomic.LoadUint32(&elem.state) + if state != ElementStateOkay { + continue + } + + // check for replay + + // check for keep-alive + + if len(elem.packet) == 0 { + continue + } + + // insert into inbound TUN queue + + device.queue.inbound <- elem.packet + } + +} + +func (device *Device) RoutineWriteToTUN(tun TUNDevice) { + for { + var packet []byte + + select { + case <-device.signal.stop: + case packet = <-device.queue.inbound: + } + + device.log.Debug.Println("GOT:", packet) + + size, err := tun.Write(packet) + device.log.Debug.Println("DEBUG:", size, err) + if err != nil { + + } + } +} |