From 2d856045a0dbfc15d38d738e2a9d159ba2a49a47 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Sat, 7 Oct 2017 22:35:23 +0200 Subject: Begin incorporating new src cache into receive --- src/receive.go | 53 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 17 deletions(-) (limited to 'src/receive.go') diff --git a/src/receive.go b/src/receive.go index 52c2718..60c0f2c 100644 --- a/src/receive.go +++ b/src/receive.go @@ -13,10 +13,10 @@ import ( ) type QueueHandshakeElement struct { - msgType uint32 - packet []byte - buffer *[MaxMessageSize]byte - source *net.UDPAddr + msgType uint32 + packet []byte + endpoint Endpoint + buffer *[MaxMessageSize]byte } type QueueInboundElement struct { @@ -92,11 +92,22 @@ func (device *Device) addToHandshakeQueue( } } -func (device *Device) RoutineReceiveIncomming() { +func (device *Device) RoutineReceiveIncomming(IPVersion int) { logDebug := device.log.Debug logDebug.Println("Routine, receive incomming, started") + var listener *Listener + + switch IPVersion { + case ipv4.Version: + listener = &device.net.ipv4 + case ipv6.Version: + listener = &device.net.ipv6 + default: + return + } + for { // wait for new conn @@ -107,14 +118,15 @@ func (device *Device) RoutineReceiveIncomming() { case <-device.signal.stop: return - case <-device.signal.newUDPConn: + case <-listener.update: - // fetch connection + // fetch new socket device.net.mutex.RLock() - conn := device.net.conn + sock := listener.sock + okay := listener.active device.net.mutex.RUnlock() - if conn == nil { + if !okay { continue } @@ -124,11 +136,20 @@ func (device *Device) RoutineReceiveIncomming() { buffer := device.GetMessageBuffer() + var size int + var err error + for { // read next datagram - size, raddr, err := conn.ReadFromUDP(buffer[:]) + var endpoint Endpoint + + if IPVersion == ipv6.Version { + size, err = endpoint.ReceiveIPv4(sock, buffer[:]) + } else { + size, err = endpoint.ReceiveIPv6(sock, buffer[:]) + } if err != nil { break @@ -192,7 +213,7 @@ func (device *Device) RoutineReceiveIncomming() { buffer = device.GetMessageBuffer() continue - // otherwise it is a handshake related packet + // otherwise it is a fixed size & handshake related packet case MessageInitiationType: okay = len(packet) == MessageInitiationSize @@ -208,10 +229,10 @@ func (device *Device) RoutineReceiveIncomming() { device.addToHandshakeQueue( device.queue.handshake, QueueHandshakeElement{ - msgType: msgType, - buffer: buffer, - packet: packet, - source: raddr, + msgType: msgType, + buffer: buffer, + packet: packet, + endpoint: endpoint, }, ) buffer = device.GetMessageBuffer() @@ -293,8 +314,6 @@ func (device *Device) RoutineHandshake() { // unmarshal packet - logDebug.Println("Process cookie reply from:", elem.source.String()) - var reply MessageCookieReply reader := bytes.NewReader(elem.packet) err := binary.Read(reader, binary.LittleEndian, &reply) -- cgit v1.2.3