summaryrefslogtreecommitdiffhomepage
path: root/src/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/send.go')
-rw-r--r--src/send.go74
1 files changed, 42 insertions, 32 deletions
diff --git a/src/send.go b/src/send.go
index 7cdb806..37078b9 100644
--- a/src/send.go
+++ b/src/send.go
@@ -2,6 +2,7 @@ package main
import (
"encoding/binary"
+ "errors"
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
@@ -51,6 +52,11 @@ func (peer *Peer) FlushNonceQueue() {
}
}
+var (
+ ErrorNoEndpoint = errors.New("No known endpoint for peer")
+ ErrorNoConnection = errors.New("No UDP socket for device")
+)
+
func (device *Device) NewOutboundElement() *QueueOutboundElement {
return &QueueOutboundElement{
dropped: AtomicFalse,
@@ -103,6 +109,25 @@ func addToEncryptionQueue(
}
}
+func (peer *Peer) SendBuffer(buffer []byte) (int, error) {
+
+ peer.mutex.RLock()
+ endpoint := peer.endpoint
+ peer.mutex.RUnlock()
+ if endpoint == nil {
+ return 0, ErrorNoEndpoint
+ }
+
+ peer.device.net.mutex.RLock()
+ conn := peer.device.net.conn
+ peer.device.net.mutex.RUnlock()
+ if conn == nil {
+ return 0, ErrorNoConnection
+ }
+
+ return conn.WriteToUDP(buffer, endpoint)
+}
+
/* Reads packets from the TUN and inserts
* into nonce queue for peer
*
@@ -349,42 +374,27 @@ func (peer *Peer) RoutineSequentialSender() {
case elem := <-peer.queue.outbound:
elem.mutex.Lock()
+ if elem.IsDropped() {
+ continue
+ }
- func() {
- if elem.IsDropped() {
- return
- }
-
- // get endpoint and connection
-
- peer.mutex.RLock()
- endpoint := peer.endpoint
- peer.mutex.RUnlock()
- if endpoint == nil {
- logDebug.Println("No endpoint for", peer.String())
- return
- }
-
- device.net.mutex.RLock()
- conn := device.net.conn
- device.net.mutex.RUnlock()
- if conn == nil {
- logDebug.Println("No source for device")
- return
- }
-
- // send message and refresh keys
+ // send message and return buffer to pool
- _, err := conn.WriteToUDP(elem.packet, endpoint)
- if err != nil {
- return
- }
+ length := uint64(len(elem.packet))
+ _, err := peer.SendBuffer(elem.packet)
+ device.PutMessageBuffer(elem.buffer)
+ if err != nil {
+ continue
+ }
+ atomic.AddUint64(&peer.stats.txBytes, length)
- atomic.AddUint64(&peer.stats.txBytes, uint64(len(elem.packet)))
- peer.TimerResetKeepalive()
- }()
+ // update timers
- device.PutMessageBuffer(elem.buffer)
+ peer.TimerPacketSent()
+ if len(elem.packet) != MessageKeepaliveSize {
+ peer.TimerDataSent()
+ }
+ peer.KeepKeyFreshSending()
}
}
}