summaryrefslogtreecommitdiffhomepage
path: root/device/receive.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/receive.go')
-rw-r--r--device/receive.go84
1 files changed, 28 insertions, 56 deletions
diff --git a/device/receive.go b/device/receive.go
index 605303e..38d018c 100644
--- a/device/receive.go
+++ b/device/receive.go
@@ -10,7 +10,6 @@ import (
"encoding/binary"
"errors"
"net"
- "strconv"
"sync"
"sync/atomic"
"time"
@@ -79,15 +78,13 @@ func (peer *Peer) keepKeyFreshReceiving() {
* IPv4 and IPv6 (separately)
*/
func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) {
-
- logDebug := device.log.Debug
defer func() {
- logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - stopped")
+ device.debugf("Routine: receive incoming IPv%d - stopped", IP)
device.queue.decryption.wg.Done()
device.net.stopping.Done()
}()
- logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - started")
+ device.debugf("Routine: receive incoming IPv%d - started", IP)
// receive datagrams until conn is closed
@@ -115,7 +112,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) {
if errors.Is(err, conn.NetErrClosed) {
return
}
- device.log.Error.Printf("Failed to receive packet: %v", err)
+ device.errorf("Failed to receive packet: %v", err)
if deathSpiral < 10 {
deathSpiral++
time.Sleep(time.Second / 3)
@@ -202,7 +199,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) {
okay = len(packet) == MessageCookieReplySize
default:
- logDebug.Println("Received message with unknown type")
+ device.debugf("Received message with unknown type")
}
if okay {
@@ -222,15 +219,12 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) {
}
func (device *Device) RoutineDecryption() {
-
var nonce [chacha20poly1305.NonceSize]byte
-
- logDebug := device.log.Debug
defer func() {
- logDebug.Println("Routine: decryption worker - stopped")
+ device.debugf("Routine: decryption worker - stopped")
device.state.stopping.Done()
}()
- logDebug.Println("Routine: decryption worker - started")
+ device.debugf("Routine: decryption worker - started")
for elem := range device.queue.decryption.c {
// split message into fields
@@ -258,23 +252,18 @@ func (device *Device) RoutineDecryption() {
/* Handles incoming packets related to handshake
*/
func (device *Device) RoutineHandshake() {
-
- logInfo := device.log.Info
- logError := device.log.Error
- logDebug := device.log.Debug
-
var elem QueueHandshakeElement
var ok bool
defer func() {
- logDebug.Println("Routine: handshake worker - stopped")
+ device.debugf("Routine: handshake worker - stopped")
device.state.stopping.Done()
if elem.buffer != nil {
device.PutMessageBuffer(elem.buffer)
}
}()
- logDebug.Println("Routine: handshake worker - started")
+ device.debugf("Routine: handshake worker - started")
for {
if elem.buffer != nil {
@@ -304,7 +293,7 @@ func (device *Device) RoutineHandshake() {
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &reply)
if err != nil {
- logDebug.Println("Failed to decode cookie reply")
+ device.debugf("Failed to decode cookie reply")
return
}
@@ -319,9 +308,9 @@ func (device *Device) RoutineHandshake() {
// consume reply
if peer := entry.peer; peer.isRunning.Get() {
- logDebug.Println("Receiving cookie response from ", elem.endpoint.DstToString())
+ device.debugf("Receiving cookie response from %s", elem.endpoint.DstToString())
if !peer.cookieGenerator.ConsumeReply(&reply) {
- logDebug.Println("Could not decrypt invalid cookie response")
+ device.debugf("Could not decrypt invalid cookie response")
}
}
@@ -332,7 +321,7 @@ func (device *Device) RoutineHandshake() {
// check mac fields and maybe ratelimit
if !device.cookieChecker.CheckMAC1(elem.packet) {
- logDebug.Println("Received packet with invalid mac1")
+ device.debugf("Received packet with invalid mac1")
continue
}
@@ -355,7 +344,7 @@ func (device *Device) RoutineHandshake() {
}
default:
- logError.Println("Invalid packet ended up in the handshake queue")
+ device.errorf("Invalid packet ended up in the handshake queue")
continue
}
@@ -370,7 +359,7 @@ func (device *Device) RoutineHandshake() {
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
- logError.Println("Failed to decode initiation message")
+ device.errorf("Failed to decode initiation message")
continue
}
@@ -378,10 +367,7 @@ func (device *Device) RoutineHandshake() {
peer := device.ConsumeMessageInitiation(&msg)
if peer == nil {
- logInfo.Println(
- "Received invalid initiation message from",
- elem.endpoint.DstToString(),
- )
+ device.infof("Received invalid initiation message from %s", elem.endpoint.DstToString())
continue
}
@@ -393,7 +379,7 @@ func (device *Device) RoutineHandshake() {
// update endpoint
peer.SetEndpointFromPacket(elem.endpoint)
- logDebug.Println(peer, "- Received handshake initiation")
+ device.debugf("%v - Received handshake initiation", peer)
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)))
peer.SendHandshakeResponse()
@@ -406,7 +392,7 @@ func (device *Device) RoutineHandshake() {
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
- logError.Println("Failed to decode response message")
+ device.errorf("Failed to decode response message")
continue
}
@@ -414,17 +400,14 @@ func (device *Device) RoutineHandshake() {
peer := device.ConsumeMessageResponse(&msg)
if peer == nil {
- logInfo.Println(
- "Received invalid response message from",
- elem.endpoint.DstToString(),
- )
+ device.infof("Received invalid response message from %s", elem.endpoint.DstToString())
continue
}
// update endpoint
peer.SetEndpointFromPacket(elem.endpoint)
- logDebug.Println(peer, "- Received handshake response")
+ device.debugf("%v - Received handshake response", peer)
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)))
// update timers
@@ -437,7 +420,7 @@ func (device *Device) RoutineHandshake() {
err = peer.BeginSymmetricSession()
if err != nil {
- logError.Println(peer, "- Failed to derive keypair:", err)
+ device.errorf("%v - Failed to derive keypair: %v", peer, err)
continue
}
@@ -453,16 +436,11 @@ func (device *Device) RoutineHandshake() {
}
func (peer *Peer) RoutineSequentialReceiver() {
-
device := peer.device
- logInfo := device.log.Info
- logError := device.log.Error
- logDebug := device.log.Debug
-
var elem *QueueInboundElement
defer func() {
- logDebug.Println(peer, "- Routine: sequential receiver - stopped")
+ device.debugf("%v - Routine: sequential receiver - stopped", peer)
peer.routines.stopping.Done()
if elem != nil {
device.PutMessageBuffer(elem.buffer)
@@ -470,7 +448,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
}
}()
- logDebug.Println(peer, "- Routine: sequential receiver - started")
+ device.debugf("%v - Routine: sequential receiver - started", peer)
for {
if elem != nil {
@@ -521,7 +499,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
// check for keepalive
if len(elem.packet) == 0 {
- logDebug.Println(peer, "- Receiving keepalive packet")
+ device.debugf("%v - Receiving keepalive packet", peer)
continue
}
peer.timersDataReceived()
@@ -549,10 +527,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len]
if device.allowedips.LookupIPv4(src) != peer {
- logInfo.Println(
- "IPv4 packet with disallowed source address from",
- peer,
- )
+ device.infof("IPv4 packet with disallowed source address from %v", peer)
continue
}
@@ -577,15 +552,12 @@ func (peer *Peer) RoutineSequentialReceiver() {
src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len]
if device.allowedips.LookupIPv6(src) != peer {
- logInfo.Println(
- "IPv6 packet with disallowed source address from",
- peer,
- )
+ device.infof("IPv6 packet with disallowed source address from %v", peer)
continue
}
default:
- logInfo.Println("Packet with invalid IP version from", peer)
+ device.infof("Packet with invalid IP version from %v", peer)
continue
}
@@ -594,12 +566,12 @@ func (peer *Peer) RoutineSequentialReceiver() {
offset := MessageTransportOffsetContent
_, err := device.tun.device.Write(elem.buffer[:offset+len(elem.packet)], offset)
if err != nil && !device.isClosed.Get() {
- logError.Println("Failed to write packet to TUN device:", err)
+ device.errorf("Failed to write packet to TUN device: %v", err)
}
if len(peer.queue.inbound) == 0 {
err := device.tun.device.Flush()
if err != nil {
- peer.device.log.Error.Printf("Unable to flush packets: %v", err)
+ peer.device.errorf("Unable to flush packets: %v", err)
}
}
}