summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--device.go3
-rw-r--r--receive.go8
-rw-r--r--send.go43
3 files changed, 42 insertions, 12 deletions
diff --git a/device.go b/device.go
index 7cf9ba2..8823404 100644
--- a/device.go
+++ b/device.go
@@ -377,10 +377,11 @@ func (device *Device) Close() {
close(device.signals.stop)
+ device.RemoveAllPeers()
+
device.state.stopping.Wait()
device.FlushPacketQueues()
- device.RemoveAllPeers()
device.rate.limiter.Close()
device.state.changing.Set(false)
diff --git a/receive.go b/receive.go
index ab86913..01151ca 100644
--- a/receive.go
+++ b/receive.go
@@ -247,7 +247,6 @@ func (device *Device) RoutineDecryption() {
// check if dropped
if elem.IsDropped() {
- device.PutInboundElement(elem)
continue
}
@@ -281,7 +280,6 @@ func (device *Device) RoutineDecryption() {
if err != nil {
elem.Drop()
device.PutMessageBuffer(elem.buffer)
- elem.buffer = nil
}
elem.mutex.Unlock()
}
@@ -313,6 +311,7 @@ func (device *Device) RoutineHandshake() {
for {
if elem.buffer != nil {
device.PutMessageBuffer(elem.buffer)
+ elem.buffer = nil
}
select {
@@ -494,7 +493,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
logDebug.Println(peer, "- Routine: sequential receiver - stopped")
peer.routines.stopping.Done()
if elem != nil {
- if elem.buffer != nil {
+ if !elem.IsDropped() {
device.PutMessageBuffer(elem.buffer)
}
device.PutInboundElement(elem)
@@ -507,10 +506,11 @@ func (peer *Peer) RoutineSequentialReceiver() {
for {
if elem != nil {
- if elem.buffer != nil {
+ if !elem.IsDropped() {
device.PutMessageBuffer(elem.buffer)
}
device.PutInboundElement(elem)
+ elem = nil
}
select {
diff --git a/send.go b/send.go
index fa84043..b636a43 100644
--- a/send.go
+++ b/send.go
@@ -341,12 +341,6 @@ func (peer *Peer) RoutineNonce() {
device := peer.device
logDebug := device.log.Debug
- defer func() {
- logDebug.Println(peer, "- Routine: nonce worker - stopped")
- peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
- peer.routines.stopping.Done()
- }()
-
flush := func() {
for {
select {
@@ -359,6 +353,13 @@ func (peer *Peer) RoutineNonce() {
}
}
+ defer func() {
+ flush()
+ logDebug.Println(peer, "- Routine: nonce worker - stopped")
+ peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
+ peer.routines.stopping.Done()
+ }()
+
peer.routines.starting.Done()
logDebug.Println(peer, "- Routine: nonce worker - started")
@@ -461,6 +462,19 @@ func (device *Device) RoutineEncryption() {
logDebug := device.log.Debug
defer func() {
+ for {
+ select {
+ case elem, ok := <-device.queue.encryption:
+ if ok && !elem.IsDropped() {
+ elem.Drop()
+ device.PutMessageBuffer(elem.buffer)
+ elem.mutex.Unlock()
+ }
+ default:
+ goto out
+ }
+ }
+ out:
logDebug.Println("Routine: encryption worker - stopped")
device.state.stopping.Done()
}()
@@ -485,7 +499,6 @@ func (device *Device) RoutineEncryption() {
// check if dropped
if elem.IsDropped() {
- device.PutOutboundElement(elem)
continue
}
@@ -540,6 +553,22 @@ func (peer *Peer) RoutineSequentialSender() {
logError := device.log.Error
defer func() {
+ for {
+ select {
+ case elem, ok := <-peer.queue.outbound:
+ if ok {
+ if !elem.IsDropped() {
+ device.PutMessageBuffer(elem.buffer)
+ elem.Drop()
+ }
+ device.PutOutboundElement(elem)
+ elem.mutex.Unlock()
+ }
+ default:
+ goto out
+ }
+ }
+ out:
logDebug.Println(peer, "- Routine: sequential sender - stopped")
peer.routines.stopping.Done()
}()