diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2017-07-14 14:25:18 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2017-07-14 14:25:18 +0200 |
commit | 8993b3927cf66517e2884b181d6b71d4c6599b7a (patch) | |
tree | a19a98791a7b4e800ad5cf815a7e008d317949f9 /src/receive.go | |
parent | 0043008ad06bbeda4b037f2959f2c9cf1b55d65b (diff) |
Improved throughput
- Improved performance by adding the message buffers to a sync.Pool.
- Fixed issue with computing "next" key-pair upon
receiving a response message.
Diffstat (limited to 'src/receive.go')
-rw-r--r-- | src/receive.go | 57 |
1 files changed, 35 insertions, 22 deletions
diff --git a/src/receive.go b/src/receive.go index f2bf70b..31f74e2 100644 --- a/src/receive.go +++ b/src/receive.go @@ -15,12 +15,14 @@ import ( type QueueHandshakeElement struct { msgType uint32 packet []byte + buffer *[MaxMessageSize]byte source *net.UDPAddr } type QueueInboundElement struct { dropped int32 mutex sync.Mutex + buffer *[MaxMessageSize]byte packet []byte counter uint64 keyPair *KeyPair @@ -34,7 +36,7 @@ func (elem *QueueInboundElement) IsDropped() bool { return atomic.LoadInt32(&elem.dropped) == AtomicTrue } -func addToInboundQueue( +func (device *Device) addToInboundQueue( queue chan *QueueInboundElement, element *QueueInboundElement, ) { @@ -52,7 +54,7 @@ func addToInboundQueue( } } -func addToHandshakeQueue( +func (device *Device) addToHandshakeQueue( queue chan QueueHandshakeElement, element QueueHandshakeElement, ) { @@ -62,7 +64,8 @@ func addToHandshakeQueue( return default: select { - case <-queue: + case elem := <-queue: + device.PutMessageBuffer(elem.buffer) default: } } @@ -70,9 +73,6 @@ func addToHandshakeQueue( } /* Routine determining the busy state of the interface - * - * TODO: prehaps nicer to do this in response to events - * TODO: more well reasoned definition of "busy" */ func (device *Device) RoutineBusyMonitor() { samples := 0 @@ -109,10 +109,11 @@ func (device *Device) RoutineBusyMonitor() { func (device *Device) RoutineReceiveIncomming() { + logInfo := device.log.Info logDebug := device.log.Debug logDebug.Println("Routine, receive incomming, started") - var buffer []byte + var buffer *[MaxMessageSize]byte for { @@ -127,7 +128,7 @@ func (device *Device) RoutineReceiveIncomming() { // read next datagram if buffer == nil { - buffer = make([]byte, MaxMessageSize) + buffer = device.GetMessageBuffer() } device.net.mutex.RLock() @@ -140,7 +141,7 @@ func (device *Device) RoutineReceiveIncomming() { conn.SetReadDeadline(time.Now().Add(time.Second)) - size, raddr, err := conn.ReadFromUDP(buffer) + size, raddr, err := conn.ReadFromUDP(buffer[:]) if err != nil || size < MinMessageSize { continue } @@ -157,10 +158,11 @@ func (device *Device) RoutineReceiveIncomming() { // add to handshake queue - addToHandshakeQueue( + device.addToHandshakeQueue( device.queue.handshake, QueueHandshakeElement{ msgType: msgType, + buffer: buffer, packet: packet, source: raddr, }, @@ -210,21 +212,22 @@ func (device *Device) RoutineReceiveIncomming() { // add to peer queue peer := value.peer - work := new(QueueInboundElement) - work.packet = packet - work.keyPair = keyPair - work.dropped = AtomicFalse + work := &QueueInboundElement{ + packet: packet, + buffer: buffer, + keyPair: keyPair, + dropped: AtomicFalse, + } work.mutex.Lock() // add to decryption queues - addToInboundQueue(device.queue.decryption, work) - addToInboundQueue(peer.queue.inbound, work) + device.addToInboundQueue(device.queue.decryption, work) + device.addToInboundQueue(peer.queue.inbound, work) buffer = nil default: - // unknown message type - logDebug.Println("Got unknown message from:", raddr) + logInfo.Println("Got unknown message from:", raddr) } }() } @@ -261,7 +264,12 @@ func (device *Device) RoutineDecryption() { var err error copy(nonce[4:], counter) elem.counter = binary.LittleEndian.Uint64(counter) - elem.packet, err = elem.keyPair.receive.Open(elem.packet[:0], nonce[:], content, nil) + elem.packet, err = elem.keyPair.receive.Open( + elem.buffer[:0], + nonce[:], + content, + nil, + ) if err != nil { elem.Drop() } @@ -373,12 +381,16 @@ func (device *Device) RoutineHandshake() { logDebug.Println("Creating response message for", peer.String()) outElem := device.NewOutboundElement() - writer := bytes.NewBuffer(outElem.data[:0]) + writer := bytes.NewBuffer(outElem.buffer[:0]) binary.Write(writer, binary.LittleEndian, response) outElem.packet = writer.Bytes() peer.mac.AddMacs(outElem.packet) addToOutboundQueue(peer.queue.outbound, outElem) + // create new keypair + + peer.NewKeyPair() + case MessageResponseType: // unmarshal @@ -414,7 +426,7 @@ func (device *Device) RoutineHandshake() { peer.EventHandshakeComplete() default: - device.log.Error.Println("Invalid message type in handshake queue") + logError.Println("Invalid message type in handshake queue") } }() } @@ -529,7 +541,7 @@ func (peer *Peer) RoutineSequentialReceiver() { } atomic.AddUint64(&peer.rxBytes, uint64(len(elem.packet))) - addToInboundQueue(device.queue.inbound, elem) + device.addToInboundQueue(device.queue.inbound, elem) }() } } @@ -546,6 +558,7 @@ func (device *Device) RoutineWriteToTUN(tun TUNDevice) { return case elem := <-device.queue.inbound: _, err := tun.Write(elem.packet) + device.PutMessageBuffer(elem.buffer) if err != nil { logError.Println("Failed to write packet to TUN device:", err) } |