summaryrefslogtreecommitdiffhomepage
path: root/src/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/send.go')
-rw-r--r--src/send.go39
1 files changed, 22 insertions, 17 deletions
diff --git a/src/send.go b/src/send.go
index d8ddc82..7a2fe44 100644
--- a/src/send.go
+++ b/src/send.go
@@ -33,11 +33,11 @@ import (
type QueueOutboundElement struct {
dropped int32
mutex sync.Mutex
- data [MaxMessageSize]byte // slice holding the packet data
- packet []byte // slice of "data" (always!)
- nonce uint64 // nonce for encryption
- keyPair *KeyPair // key-pair for encryption
- peer *Peer // related peer
+ buffer *[MaxMessageSize]byte // slice holding the packet data
+ packet []byte // slice of "data" (always!)
+ nonce uint64 // nonce for encryption
+ keyPair *KeyPair // key-pair for encryption
+ peer *Peer // related peer
}
func (peer *Peer) FlushNonceQueue() {
@@ -51,13 +51,11 @@ func (peer *Peer) FlushNonceQueue() {
}
}
-/*
- * Assumption: The mutex of the returned element is released
- */
func (device *Device) NewOutboundElement() *QueueOutboundElement {
- // TODO: profile, consider sync.Pool
- elem := new(QueueOutboundElement)
- return elem
+ return &QueueOutboundElement{
+ dropped: AtomicFalse,
+ buffer: device.pool.messageBuffers.Get().(*[MaxMessageSize]byte),
+ }
}
func (elem *QueueOutboundElement) Drop() {
@@ -130,7 +128,7 @@ func (device *Device) RoutineReadFromTUN(tun TUNDevice) {
elem = device.NewOutboundElement()
}
- elem.packet = elem.data[MessageTransportHeaderSize:]
+ elem.packet = elem.buffer[MessageTransportHeaderSize:]
size, err := tun.Read(elem.packet)
if err != nil {
@@ -284,7 +282,7 @@ func (device *Device) RoutineEncryption() {
// populate header fields
func() {
- header := work.data[:MessageTransportHeaderSize]
+ header := work.buffer[:MessageTransportHeaderSize]
fieldType := header[0:4]
fieldReceiver := header[4:8]
@@ -305,7 +303,7 @@ func (device *Device) RoutineEncryption() {
nil,
)
length := MessageTransportHeaderSize + len(work.packet)
- work.packet = work.data[:length]
+ work.packet = work.buffer[:length]
work.mutex.Unlock()
// refresh key if necessary
@@ -333,12 +331,16 @@ func (peer *Peer) RoutineSequentialSender() {
case work := <-peer.queue.outbound:
work.mutex.Lock()
- if work.IsDropped() {
- continue
- }
func() {
+ // return buffer to pool after processing
+
+ defer device.PutMessageBuffer(work.buffer)
+ if work.IsDropped() {
+ return
+ }
+
// send to endpoint
peer.mutex.RLock()
@@ -357,10 +359,13 @@ func (peer *Peer) RoutineSequentialSender() {
return
}
+ // send message and return buffer to pool
+
_, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint)
if err != nil {
return
}
+
atomic.AddUint64(&peer.txBytes, uint64(len(work.packet)))
// reset keep-alive