summaryrefslogtreecommitdiffhomepage
path: root/device/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/send.go')
-rw-r--r--device/send.go95
1 files changed, 49 insertions, 46 deletions
diff --git a/device/send.go b/device/send.go
index e838c4e..769720a 100644
--- a/device/send.go
+++ b/device/send.go
@@ -46,7 +46,6 @@ import (
*/
type QueueOutboundElement struct {
- sync.Mutex
buffer *[MaxMessageSize]byte // slice holding the packet data
packet []byte // slice of "buffer" (always!)
nonce uint64 // nonce for encryption
@@ -54,10 +53,14 @@ type QueueOutboundElement struct {
peer *Peer // related peer
}
+type QueueOutboundElementsContainer struct {
+ sync.Mutex
+ elems []*QueueOutboundElement
+}
+
func (device *Device) NewOutboundElement() *QueueOutboundElement {
elem := device.GetOutboundElement()
elem.buffer = device.GetMessageBuffer()
- elem.Mutex = sync.Mutex{}
elem.nonce = 0
// keypair and peer were cleared (if necessary) by clearPointers.
return elem
@@ -79,15 +82,15 @@ func (elem *QueueOutboundElement) clearPointers() {
func (peer *Peer) SendKeepalive() {
if len(peer.queue.staged) == 0 && peer.isRunning.Load() {
elem := peer.device.NewOutboundElement()
- elems := peer.device.GetOutboundElementsSlice()
- *elems = append(*elems, elem)
+ elemsContainer := peer.device.GetOutboundElementsContainer()
+ elemsContainer.elems = append(elemsContainer.elems, elem)
select {
- case peer.queue.staged <- elems:
+ case peer.queue.staged <- elemsContainer:
peer.device.log.Verbosef("%v - Sending keepalive packet", peer)
default:
peer.device.PutMessageBuffer(elem.buffer)
peer.device.PutOutboundElement(elem)
- peer.device.PutOutboundElementsSlice(elems)
+ peer.device.PutOutboundElementsContainer(elemsContainer)
}
}
peer.SendStagedPackets()
@@ -219,7 +222,7 @@ func (device *Device) RoutineReadFromTUN() {
readErr error
elems = make([]*QueueOutboundElement, batchSize)
bufs = make([][]byte, batchSize)
- elemsByPeer = make(map[*Peer]*[]*QueueOutboundElement, batchSize)
+ elemsByPeer = make(map[*Peer]*QueueOutboundElementsContainer, batchSize)
count = 0
sizes = make([]int, batchSize)
offset = MessageTransportHeaderSize
@@ -276,10 +279,10 @@ func (device *Device) RoutineReadFromTUN() {
}
elemsForPeer, ok := elemsByPeer[peer]
if !ok {
- elemsForPeer = device.GetOutboundElementsSlice()
+ elemsForPeer = device.GetOutboundElementsContainer()
elemsByPeer[peer] = elemsForPeer
}
- *elemsForPeer = append(*elemsForPeer, elem)
+ elemsForPeer.elems = append(elemsForPeer.elems, elem)
elems[i] = device.NewOutboundElement()
bufs[i] = elems[i].buffer[:]
}
@@ -289,11 +292,11 @@ func (device *Device) RoutineReadFromTUN() {
peer.StagePackets(elemsForPeer)
peer.SendStagedPackets()
} else {
- for _, elem := range *elemsForPeer {
+ for _, elem := range elemsForPeer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
}
- device.PutOutboundElementsSlice(elemsForPeer)
+ device.PutOutboundElementsContainer(elemsForPeer)
}
delete(elemsByPeer, peer)
}
@@ -317,7 +320,7 @@ func (device *Device) RoutineReadFromTUN() {
}
}
-func (peer *Peer) StagePackets(elems *[]*QueueOutboundElement) {
+func (peer *Peer) StagePackets(elems *QueueOutboundElementsContainer) {
for {
select {
case peer.queue.staged <- elems:
@@ -326,11 +329,11 @@ func (peer *Peer) StagePackets(elems *[]*QueueOutboundElement) {
}
select {
case tooOld := <-peer.queue.staged:
- for _, elem := range *tooOld {
+ for _, elem := range tooOld.elems {
peer.device.PutMessageBuffer(elem.buffer)
peer.device.PutOutboundElement(elem)
}
- peer.device.PutOutboundElementsSlice(tooOld)
+ peer.device.PutOutboundElementsContainer(tooOld)
default:
}
}
@@ -349,52 +352,52 @@ top:
}
for {
- var elemsOOO *[]*QueueOutboundElement
+ var elemsContainerOOO *QueueOutboundElementsContainer
select {
- case elems := <-peer.queue.staged:
+ case elemsContainer := <-peer.queue.staged:
i := 0
- for _, elem := range *elems {
+ for _, elem := range elemsContainer.elems {
elem.peer = peer
elem.nonce = keypair.sendNonce.Add(1) - 1
if elem.nonce >= RejectAfterMessages {
keypair.sendNonce.Store(RejectAfterMessages)
- if elemsOOO == nil {
- elemsOOO = peer.device.GetOutboundElementsSlice()
+ if elemsContainerOOO == nil {
+ elemsContainerOOO = peer.device.GetOutboundElementsContainer()
}
- *elemsOOO = append(*elemsOOO, elem)
+ elemsContainerOOO.elems = append(elemsContainerOOO.elems, elem)
continue
} else {
- (*elems)[i] = elem
+ elemsContainer.elems[i] = elem
i++
}
elem.keypair = keypair
- elem.Lock()
}
- *elems = (*elems)[:i]
+ elemsContainer.Lock()
+ elemsContainer.elems = elemsContainer.elems[:i]
- if elemsOOO != nil {
- peer.StagePackets(elemsOOO) // XXX: Out of order, but we can't front-load go chans
+ if elemsContainerOOO != nil {
+ peer.StagePackets(elemsContainerOOO) // XXX: Out of order, but we can't front-load go chans
}
- if len(*elems) == 0 {
- peer.device.PutOutboundElementsSlice(elems)
+ if len(elemsContainer.elems) == 0 {
+ peer.device.PutOutboundElementsContainer(elemsContainer)
goto top
}
// add to parallel and sequential queue
if peer.isRunning.Load() {
- peer.queue.outbound.c <- elems
- peer.device.queue.encryption.c <- elems
+ peer.queue.outbound.c <- elemsContainer
+ peer.device.queue.encryption.c <- elemsContainer
} else {
- for _, elem := range *elems {
+ for _, elem := range elemsContainer.elems {
peer.device.PutMessageBuffer(elem.buffer)
peer.device.PutOutboundElement(elem)
}
- peer.device.PutOutboundElementsSlice(elems)
+ peer.device.PutOutboundElementsContainer(elemsContainer)
}
- if elemsOOO != nil {
+ if elemsContainerOOO != nil {
goto top
}
default:
@@ -406,12 +409,12 @@ top:
func (peer *Peer) FlushStagedPackets() {
for {
select {
- case elems := <-peer.queue.staged:
- for _, elem := range *elems {
+ case elemsContainer := <-peer.queue.staged:
+ for _, elem := range elemsContainer.elems {
peer.device.PutMessageBuffer(elem.buffer)
peer.device.PutOutboundElement(elem)
}
- peer.device.PutOutboundElementsSlice(elems)
+ peer.device.PutOutboundElementsContainer(elemsContainer)
default:
return
}
@@ -445,8 +448,8 @@ func (device *Device) RoutineEncryption(id int) {
defer device.log.Verbosef("Routine: encryption worker %d - stopped", id)
device.log.Verbosef("Routine: encryption worker %d - started", id)
- for elems := range device.queue.encryption.c {
- for _, elem := range *elems {
+ for elemsContainer := range device.queue.encryption.c {
+ for _, elem := range elemsContainer.elems {
// populate header fields
header := elem.buffer[:MessageTransportHeaderSize]
@@ -471,8 +474,8 @@ func (device *Device) RoutineEncryption(id int) {
elem.packet,
nil,
)
- elem.Unlock()
}
+ elemsContainer.Unlock()
}
}
@@ -486,9 +489,9 @@ func (peer *Peer) RoutineSequentialSender(maxBatchSize int) {
bufs := make([][]byte, 0, maxBatchSize)
- for elems := range peer.queue.outbound.c {
+ for elemsContainer := range peer.queue.outbound.c {
bufs = bufs[:0]
- if elems == nil {
+ if elemsContainer == nil {
return
}
if !peer.isRunning.Load() {
@@ -498,16 +501,16 @@ func (peer *Peer) RoutineSequentialSender(maxBatchSize int) {
// The timers and SendBuffers code are resilient to a few stragglers.
// TODO: rework peer shutdown order to ensure
// that we never accidentally keep timers alive longer than necessary.
- for _, elem := range *elems {
- elem.Lock()
+ elemsContainer.Lock()
+ for _, elem := range elemsContainer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
}
continue
}
dataSent := false
- for _, elem := range *elems {
- elem.Lock()
+ elemsContainer.Lock()
+ for _, elem := range elemsContainer.elems {
if len(elem.packet) != MessageKeepaliveSize {
dataSent = true
}
@@ -521,11 +524,11 @@ func (peer *Peer) RoutineSequentialSender(maxBatchSize int) {
if dataSent {
peer.timersDataSent()
}
- for _, elem := range *elems {
+ for _, elem := range elemsContainer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
}
- device.PutOutboundElementsSlice(elems)
+ device.PutOutboundElementsContainer(elemsContainer)
if err != nil {
var errGSO conn.ErrUDPGSODisabled
if errors.As(err, &errGSO) {