From a4eff12d7f749c992247579161c4ce9e60e2df47 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Fri, 11 Aug 2017 16:18:20 +0200 Subject: Improved receive.go - Fixed configuration listen-port semantics - Improved receive.go code for updating listen port - Updated under load detection, how follows the kernel space implementation - Fixed trie bug accidentally introduced in last commit - Added interface name to log (format still subject to change) - Can now configure the logging level using the LOG_LEVEL variable - Begin porting netsh.sh tests - A number of smaller changes --- src/receive.go | 205 ++++++++++++++++++++++++--------------------------------- 1 file changed, 87 insertions(+), 118 deletions(-) (limited to 'src/receive.go') diff --git a/src/receive.go b/src/receive.go index 5f46925..c47d93c 100644 --- a/src/receive.go +++ b/src/receive.go @@ -72,43 +72,6 @@ func (device *Device) addToHandshakeQueue( } } -/* Routine determining the busy state of the interface - * - * TODO: Under load for some time - */ -func (device *Device) RoutineBusyMonitor() { - samples := 0 - interval := time.Second - for timer := time.NewTimer(interval); ; { - - select { - case <-device.signal.stop: - return - case <-timer.C: - } - - // compute busy heuristic - - if len(device.queue.handshake) > QueueHandshakeBusySize { - samples += 1 - } else if samples > 0 { - samples -= 1 - } - samples %= 30 - busy := samples > 5 - - // update busy state - - if busy { - atomic.StoreInt32(&device.underLoad, AtomicTrue) - } else { - atomic.StoreInt32(&device.underLoad, AtomicFalse) - } - - timer.Reset(interval) - } -} - func (device *Device) RoutineReceiveIncomming() { logDebug := device.log.Debug @@ -118,117 +81,121 @@ func (device *Device) RoutineReceiveIncomming() { // wait for new conn - var conn *net.UDPConn + logDebug.Println("Waiting for udp socket") select { + case <-device.signal.stop: + return + case <-device.signal.newUDPConn: + + // fetch connection + device.net.mutex.RLock() - conn = device.net.conn + conn := device.net.conn device.net.mutex.RUnlock() + if conn == nil { + continue + } - case <-device.signal.stop: - return - } - - if conn == nil { - continue - } + logDebug.Println("Listening for inbound packets") - // receive datagrams until closed + // receive datagrams until conn is closed - buffer := device.GetMessageBuffer() + buffer := device.GetMessageBuffer() - for { + for { - // read next datagram + // read next datagram - size, raddr, err := conn.ReadFromUDP(buffer[:]) // TODO: This is broken + size, raddr, err := conn.ReadFromUDP(buffer[:]) // Blocks sometimes - if err != nil { - break - } + if err != nil { + break + } - if size < MinMessageSize { - continue - } + if size < MinMessageSize { + continue + } - // check size of packet + // check size of packet - packet := buffer[:size] - msgType := binary.LittleEndian.Uint32(packet[:4]) + packet := buffer[:size] + msgType := binary.LittleEndian.Uint32(packet[:4]) - var okay bool + var okay bool - switch msgType { + switch msgType { - // check if transport + // check if transport - case MessageTransportType: + case MessageTransportType: - // check size + // check size - if len(packet) < MessageTransportType { - continue - } + if len(packet) < MessageTransportType { + continue + } - // lookup key pair + // lookup key pair - receiver := binary.LittleEndian.Uint32( - packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter], - ) - value := device.indices.Lookup(receiver) - keyPair := value.keyPair - if keyPair == nil { - continue - } + receiver := binary.LittleEndian.Uint32( + packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter], + ) + value := device.indices.Lookup(receiver) + keyPair := value.keyPair + if keyPair == nil { + continue + } - // check key-pair expiry + // check key-pair expiry - if keyPair.created.Add(RejectAfterTime).Before(time.Now()) { - continue - } + if keyPair.created.Add(RejectAfterTime).Before(time.Now()) { + continue + } - // create work element + // create work element - peer := value.peer - elem := &QueueInboundElement{ - packet: packet, - buffer: buffer, - keyPair: keyPair, - dropped: AtomicFalse, - } - elem.mutex.Lock() + peer := value.peer + elem := &QueueInboundElement{ + packet: packet, + buffer: buffer, + keyPair: keyPair, + dropped: AtomicFalse, + } + elem.mutex.Lock() - // add to decryption queues + // add to decryption queues - device.addToInboundQueue(device.queue.decryption, elem) - device.addToInboundQueue(peer.queue.inbound, elem) - buffer = nil - continue + device.addToInboundQueue(device.queue.decryption, elem) + device.addToInboundQueue(peer.queue.inbound, elem) + buffer = device.GetMessageBuffer() + continue - // otherwise it is a handshake related packet + // otherwise it is a handshake related packet - case MessageInitiationType: - okay = len(packet) == MessageInitiationSize + case MessageInitiationType: + okay = len(packet) == MessageInitiationSize - case MessageResponseType: - okay = len(packet) == MessageResponseSize + case MessageResponseType: + okay = len(packet) == MessageResponseSize - case MessageCookieReplyType: - okay = len(packet) == MessageCookieReplySize - } + case MessageCookieReplyType: + okay = len(packet) == MessageCookieReplySize + } - if okay { - device.addToHandshakeQueue( - device.queue.handshake, - QueueHandshakeElement{ - msgType: msgType, - buffer: buffer, - packet: packet, - source: raddr, - }, - ) - buffer = device.GetMessageBuffer() + if okay { + device.addToHandshakeQueue( + device.queue.handshake, + QueueHandshakeElement{ + msgType: msgType, + buffer: buffer, + packet: packet, + source: raddr, + }, + ) + buffer = device.GetMessageBuffer() + } } } } @@ -326,10 +293,11 @@ func (device *Device) RoutineHandshake() { return } - busy := atomic.LoadInt32(&device.underLoad) == AtomicTrue - - if busy { + if device.IsUnderLoad() { if !device.mac.CheckMAC2(elem.packet, elem.source) { + + // construct cookie reply + sender := binary.LittleEndian.Uint32(elem.packet[4:8]) // "sender" always follows "type" reply, err := device.CreateMessageCookieReply(elem.packet, sender, elem.source) if err != nil { @@ -347,6 +315,7 @@ func (device *Device) RoutineHandshake() { } continue } + if !device.ratelimiter.Allow(elem.source.IP) { continue } @@ -577,7 +546,7 @@ func (peer *Peer) RoutineSequentialReceiver() { // write to tun atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet))) - _, err := device.tun.Write(elem.packet) + _, err := device.tun.device.Write(elem.packet) device.PutMessageBuffer(elem.buffer) if err != nil { logError.Println("Failed to write packet to TUN device:", err) -- cgit v1.2.3