diff options
author | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-09-24 01:52:02 +0200 |
---|---|---|
committer | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-09-24 01:52:02 +0200 |
commit | ebc7541953269b39cd73d703166b9b8ee7b34e37 (patch) | |
tree | 9f436df296e7fba4a29b4a084d637c304844b7de | |
parent | 833597b585f460aaa17bad93ad59290ec282e77e (diff) |
Fix shutdown races
-rw-r--r-- | device.go | 3 | ||||
-rw-r--r-- | receive.go | 8 | ||||
-rw-r--r-- | send.go | 43 |
3 files changed, 42 insertions, 12 deletions
@@ -377,10 +377,11 @@ func (device *Device) Close() { close(device.signals.stop) + device.RemoveAllPeers() + device.state.stopping.Wait() device.FlushPacketQueues() - device.RemoveAllPeers() device.rate.limiter.Close() device.state.changing.Set(false) @@ -247,7 +247,6 @@ func (device *Device) RoutineDecryption() { // check if dropped if elem.IsDropped() { - device.PutInboundElement(elem) continue } @@ -281,7 +280,6 @@ func (device *Device) RoutineDecryption() { if err != nil { elem.Drop() device.PutMessageBuffer(elem.buffer) - elem.buffer = nil } elem.mutex.Unlock() } @@ -313,6 +311,7 @@ func (device *Device) RoutineHandshake() { for { if elem.buffer != nil { device.PutMessageBuffer(elem.buffer) + elem.buffer = nil } select { @@ -494,7 +493,7 @@ func (peer *Peer) RoutineSequentialReceiver() { logDebug.Println(peer, "- Routine: sequential receiver - stopped") peer.routines.stopping.Done() if elem != nil { - if elem.buffer != nil { + if !elem.IsDropped() { device.PutMessageBuffer(elem.buffer) } device.PutInboundElement(elem) @@ -507,10 +506,11 @@ func (peer *Peer) RoutineSequentialReceiver() { for { if elem != nil { - if elem.buffer != nil { + if !elem.IsDropped() { device.PutMessageBuffer(elem.buffer) } device.PutInboundElement(elem) + elem = nil } select { @@ -341,12 +341,6 @@ func (peer *Peer) RoutineNonce() { device := peer.device logDebug := device.log.Debug - defer func() { - logDebug.Println(peer, "- Routine: nonce worker - stopped") - peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) - peer.routines.stopping.Done() - }() - flush := func() { for { select { @@ -359,6 +353,13 @@ func (peer *Peer) RoutineNonce() { } } + defer func() { + flush() + logDebug.Println(peer, "- Routine: nonce worker - stopped") + peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) + peer.routines.stopping.Done() + }() + peer.routines.starting.Done() logDebug.Println(peer, "- Routine: nonce worker - started") @@ -461,6 +462,19 @@ func (device *Device) RoutineEncryption() { logDebug := device.log.Debug defer func() { + for { + select { + case elem, ok := <-device.queue.encryption: + if ok && !elem.IsDropped() { + elem.Drop() + device.PutMessageBuffer(elem.buffer) + elem.mutex.Unlock() + } + default: + goto out + } + } + out: logDebug.Println("Routine: encryption worker - stopped") device.state.stopping.Done() }() @@ -485,7 +499,6 @@ func (device *Device) RoutineEncryption() { // check if dropped if elem.IsDropped() { - device.PutOutboundElement(elem) continue } @@ -540,6 +553,22 @@ func (peer *Peer) RoutineSequentialSender() { logError := device.log.Error defer func() { + for { + select { + case elem, ok := <-peer.queue.outbound: + if ok { + if !elem.IsDropped() { + device.PutMessageBuffer(elem.buffer) + elem.Drop() + } + device.PutOutboundElement(elem) + elem.mutex.Unlock() + } + default: + goto out + } + } + out: logDebug.Println(peer, "- Routine: sequential sender - stopped") peer.routines.stopping.Done() }() |