diff options
author | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2017-07-14 14:25:18 +0200 |
---|---|---|
committer | Mathias Hall-Andersen <mathias@hall-andersen.dk> | 2017-07-14 14:25:18 +0200 |
commit | 8993b3927cf66517e2884b181d6b71d4c6599b7a (patch) | |
tree | a19a98791a7b4e800ad5cf815a7e008d317949f9 /src/send.go | |
parent | 0043008ad06bbeda4b037f2959f2c9cf1b55d65b (diff) |
Improved throughput
- Improved performance by adding the message buffers to a sync.Pool.
- Fixed issue with computing "next" key-pair upon
receiving a response message.
Diffstat (limited to 'src/send.go')
-rw-r--r-- | src/send.go | 39 |
1 files changed, 22 insertions, 17 deletions
diff --git a/src/send.go b/src/send.go index d8ddc82..7a2fe44 100644 --- a/src/send.go +++ b/src/send.go @@ -33,11 +33,11 @@ import ( type QueueOutboundElement struct { dropped int32 mutex sync.Mutex - data [MaxMessageSize]byte // slice holding the packet data - packet []byte // slice of "data" (always!) - nonce uint64 // nonce for encryption - keyPair *KeyPair // key-pair for encryption - peer *Peer // related peer + buffer *[MaxMessageSize]byte // slice holding the packet data + packet []byte // slice of "data" (always!) + nonce uint64 // nonce for encryption + keyPair *KeyPair // key-pair for encryption + peer *Peer // related peer } func (peer *Peer) FlushNonceQueue() { @@ -51,13 +51,11 @@ func (peer *Peer) FlushNonceQueue() { } } -/* - * Assumption: The mutex of the returned element is released - */ func (device *Device) NewOutboundElement() *QueueOutboundElement { - // TODO: profile, consider sync.Pool - elem := new(QueueOutboundElement) - return elem + return &QueueOutboundElement{ + dropped: AtomicFalse, + buffer: device.pool.messageBuffers.Get().(*[MaxMessageSize]byte), + } } func (elem *QueueOutboundElement) Drop() { @@ -130,7 +128,7 @@ func (device *Device) RoutineReadFromTUN(tun TUNDevice) { elem = device.NewOutboundElement() } - elem.packet = elem.data[MessageTransportHeaderSize:] + elem.packet = elem.buffer[MessageTransportHeaderSize:] size, err := tun.Read(elem.packet) if err != nil { @@ -284,7 +282,7 @@ func (device *Device) RoutineEncryption() { // populate header fields func() { - header := work.data[:MessageTransportHeaderSize] + header := work.buffer[:MessageTransportHeaderSize] fieldType := header[0:4] fieldReceiver := header[4:8] @@ -305,7 +303,7 @@ func (device *Device) RoutineEncryption() { nil, ) length := MessageTransportHeaderSize + len(work.packet) - work.packet = work.data[:length] + work.packet = work.buffer[:length] work.mutex.Unlock() // refresh key if necessary @@ -333,12 +331,16 @@ func (peer *Peer) RoutineSequentialSender() { case work := <-peer.queue.outbound: work.mutex.Lock() - if work.IsDropped() { - continue - } func() { + // return buffer to pool after processing + + defer device.PutMessageBuffer(work.buffer) + if work.IsDropped() { + return + } + // send to endpoint peer.mutex.RLock() @@ -357,10 +359,13 @@ func (peer *Peer) RoutineSequentialSender() { return } + // send message and return buffer to pool + _, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint) if err != nil { return } + atomic.AddUint64(&peer.txBytes, uint64(len(work.packet))) // reset keep-alive |