summaryrefslogtreecommitdiffhomepage
path: root/src/receive.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/receive.go')
-rw-r--r--src/receive.go44
1 files changed, 27 insertions, 17 deletions
diff --git a/src/receive.go b/src/receive.go
index c788dcf..e780c66 100644
--- a/src/receive.go
+++ b/src/receive.go
@@ -10,11 +10,6 @@ import (
"time"
)
-const (
- ElementStateOkay = iota
- ElementStateDropped
-)
-
type QueueHandshakeElement struct {
msgType uint32
packet []byte
@@ -22,7 +17,7 @@ type QueueHandshakeElement struct {
}
type QueueInboundElement struct {
- state uint32
+ dropped int32
mutex sync.Mutex
packet []byte
counter uint64
@@ -30,11 +25,11 @@ type QueueInboundElement struct {
}
func (elem *QueueInboundElement) Drop() {
- atomic.StoreUint32(&elem.state, ElementStateDropped)
+ atomic.StoreInt32(&elem.dropped, AtomicTrue)
}
func (elem *QueueInboundElement) IsDropped() bool {
- return atomic.LoadUint32(&elem.state) == ElementStateDropped
+ return atomic.LoadInt32(&elem.dropped) == AtomicTrue
}
func addToInboundQueue(
@@ -101,9 +96,9 @@ func (device *Device) RoutineBusyMonitor() {
// update busy state
if busy {
- atomic.StoreInt32(&device.congestionState, CongestionStateUnderLoad)
+ atomic.StoreInt32(&device.underLoad, AtomicTrue)
} else {
- atomic.StoreInt32(&device.congestionState, CongestionStateOkay)
+ atomic.StoreInt32(&device.underLoad, AtomicFalse)
}
timer.Reset(interval)
@@ -216,7 +211,7 @@ func (device *Device) RoutineReceiveIncomming() {
work := new(QueueInboundElement)
work.packet = packet
work.keyPair = keyPair
- work.state = ElementStateOkay
+ work.dropped = AtomicFalse
work.mutex.Lock()
// add to decryption queues
@@ -303,7 +298,7 @@ func (device *Device) RoutineHandshake() {
// verify mac2
- busy := atomic.LoadInt32(&device.congestionState) == CongestionStateUnderLoad
+ busy := atomic.LoadInt32(&device.underLoad) == AtomicTrue
if busy && !device.mac.CheckMAC2(elem.packet, elem.source) {
sender := binary.LittleEndian.Uint32(elem.packet[4:8]) // "sender" always follows "type"
@@ -397,13 +392,12 @@ func (device *Device) RoutineHandshake() {
)
return
}
- sendSignal(peer.signal.handshakeCompleted)
- logDebug.Println("Recieved valid response message for peer", peer.id)
kp := peer.NewKeyPair()
if kp == nil {
logDebug.Println("Failed to derieve key-pair")
}
peer.SendKeepAlive()
+ peer.EventHandshakeComplete()
default:
device.log.Error.Println("Invalid message type in handshake queue")
@@ -438,9 +432,25 @@ func (peer *Peer) RoutineSequentialReceiver() {
// check for replay
- // update timers
+ // time (passive) keep-alive
+
+ peer.TimerStartKeepalive()
+
+ // refresh key material (rekey)
- // refresh key material
+ peer.KeepKeyFreshReceiving()
+
+ // check if confirming handshake
+
+ kp := &peer.keyPairs
+ kp.mutex.Lock()
+ if kp.next == elem.keyPair {
+ peer.EventHandshakeComplete()
+ kp.previous = kp.current
+ kp.current = kp.next
+ kp.next = nil
+ }
+ kp.mutex.Unlock()
// check for keep-alive
@@ -491,7 +501,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
}
default:
- device.log.Debug.Println("Receieved packet with unknown IP version")
+ logDebug.Println("Receieved packet with unknown IP version")
return
}