summaryrefslogtreecommitdiffhomepage
path: root/src/send.go
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2017-07-17 16:16:18 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2017-07-17 16:16:18 +0200
commitc5d7efc2467abb6cd8365c83fae68da6924c17f2 (patch)
tree0324219cf4979a87fc45fc575e26f7058b0a196f /src/send.go
parentdd4da93749fd9a8a231942a6b75ad137cc308e02 (diff)
Fixed deadlock in index.go
Diffstat (limited to 'src/send.go')
-rw-r--r--src/send.go81
1 files changed, 45 insertions, 36 deletions
diff --git a/src/send.go b/src/send.go
index 2db74ba..fdbc676 100644
--- a/src/send.go
+++ b/src/send.go
@@ -270,50 +270,65 @@ func (peer *Peer) RoutineNonce() {
* Obs. One instance per core
*/
func (device *Device) RoutineEncryption() {
+
+ var elem *QueueOutboundElement
var nonce [chacha20poly1305.NonceSize]byte
- for work := range device.queue.encryption {
+
+ logDebug := device.log.Debug
+ logDebug.Println("Routine, encryption worker, started")
+
+ for {
+
+ // fetch next element
+
+ select {
+ case elem = <-device.queue.encryption:
+ case <-device.signal.stop:
+ logDebug.Println("Routine, encryption worker, stopped")
+ return
+ }
// check if dropped
- if work.IsDropped() {
+ if elem.IsDropped() {
continue
}
// populate header fields
- header := work.buffer[:MessageTransportHeaderSize]
+ header := elem.buffer[:MessageTransportHeaderSize]
fieldType := header[0:4]
fieldReceiver := header[4:8]
fieldNonce := header[8:16]
binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
- binary.LittleEndian.PutUint32(fieldReceiver, work.keyPair.remoteIndex)
- binary.LittleEndian.PutUint64(fieldNonce, work.nonce)
+ binary.LittleEndian.PutUint32(fieldReceiver, elem.keyPair.remoteIndex)
+ binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
// pad content to MTU size
mtu := int(atomic.LoadInt32(&device.mtu))
- for i := len(work.packet); i < mtu; i++ {
- work.packet = append(work.packet, 0)
+ for i := len(elem.packet); i < mtu; i++ {
+ elem.packet = append(elem.packet, 0)
}
// encrypt content
- binary.LittleEndian.PutUint64(nonce[4:], work.nonce)
- work.packet = work.keyPair.send.Seal(
- work.packet[:0],
+ binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
+ elem.packet = elem.keyPair.send.Seal(
+ elem.packet[:0],
nonce[:],
- work.packet,
+ elem.packet,
nil,
)
- length := MessageTransportHeaderSize + len(work.packet)
- work.packet = work.buffer[:length]
- work.mutex.Unlock()
+ length := MessageTransportHeaderSize + len(elem.packet)
+ elem.packet = elem.buffer[:length]
+ elem.mutex.Unlock()
// refresh key if necessary
- work.peer.KeepKeyFreshSending()
+ elem.peer.KeepKeyFreshSending()
}
}
@@ -334,49 +349,43 @@ func (peer *Peer) RoutineSequentialSender() {
logDebug.Println("Routine, sequential sender, stopped for", peer.String())
return
- case work := <-peer.queue.outbound:
- work.mutex.Lock()
+ case elem := <-peer.queue.outbound:
+ elem.mutex.Lock()
func() {
-
- // return buffer to pool after processing
-
- defer device.PutMessageBuffer(work.buffer)
- if work.IsDropped() {
+ if elem.IsDropped() {
return
}
- // send to endpoint
+ // get endpoint and connection
peer.mutex.RLock()
- defer peer.mutex.RUnlock()
-
- if peer.endpoint == nil {
+ endpoint := peer.endpoint
+ peer.mutex.RUnlock()
+ if endpoint == nil {
logDebug.Println("No endpoint for", peer.String())
return
}
device.net.mutex.RLock()
- defer device.net.mutex.RUnlock()
-
- if device.net.conn == nil {
+ conn := device.net.conn
+ device.net.mutex.RUnlock()
+ if conn == nil {
logDebug.Println("No source for device")
return
}
- // send message and return buffer to pool
+ // send message and refresh keys
- _, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint)
+ _, err := conn.WriteToUDP(elem.packet, endpoint)
if err != nil {
return
}
-
- atomic.AddUint64(&peer.txBytes, uint64(len(work.packet)))
-
- // reset keep-alive
-
+ atomic.AddUint64(&peer.txBytes, uint64(len(elem.packet)))
peer.TimerResetKeepalive()
}()
+
+ device.PutMessageBuffer(elem.buffer)
}
}
}