summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--device/channels.go22
-rw-r--r--device/peer.go8
-rw-r--r--device/pools_test.go2
-rw-r--r--device/receive.go6
-rw-r--r--device/send.go4
5 files changed, 22 insertions, 20 deletions
diff --git a/device/channels.go b/device/channels.go
index 4bd6090..1e3e206 100644
--- a/device/channels.go
+++ b/device/channels.go
@@ -71,14 +71,15 @@ func newHandshakeQueue() *handshakeQueue {
return q
}
+type autodrainingInboundQueue struct {
+ c chan *QueueInboundElement
+}
+
// newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd.
// It is useful in cases in which is it hard to manage the lifetime of the channel.
// The returned channel must not be closed. Senders should signal shutdown using
// some other means, such as sending a sentinel nil values.
-func newAutodrainingInboundQueue(device *Device) chan *QueueInboundElement {
- type autodrainingInboundQueue struct {
- c chan *QueueInboundElement
- }
+func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
q := &autodrainingInboundQueue{
c: make(chan *QueueInboundElement, QueueInboundSize),
}
@@ -97,7 +98,11 @@ func newAutodrainingInboundQueue(device *Device) chan *QueueInboundElement {
}
}
})
- return q.c
+ return q
+}
+
+type autodrainingOutboundQueue struct {
+ c chan *QueueOutboundElement
}
// newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd.
@@ -105,10 +110,7 @@ func newAutodrainingInboundQueue(device *Device) chan *QueueInboundElement {
// The returned channel must not be closed. Senders should signal shutdown using
// some other means, such as sending a sentinel nil values.
// All sends to the channel must be best-effort, because there may be no receivers.
-func newAutodrainingOutboundQueue(device *Device) chan *QueueOutboundElement {
- type autodrainingOutboundQueue struct {
- c chan *QueueOutboundElement
- }
+func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
q := &autodrainingOutboundQueue{
c: make(chan *QueueOutboundElement, QueueOutboundSize),
}
@@ -127,5 +129,5 @@ func newAutodrainingOutboundQueue(device *Device) chan *QueueOutboundElement {
}
}
})
- return q.c
+ return q
}
diff --git a/device/peer.go b/device/peer.go
index 49b9acb..69238a6 100644
--- a/device/peer.go
+++ b/device/peer.go
@@ -57,8 +57,8 @@ type Peer struct {
queue struct {
staged chan *QueueOutboundElement // staged packets before a handshake is available
- outbound chan *QueueOutboundElement // sequential ordering of udp transmission
- inbound chan *QueueInboundElement // sequential ordering of tun writing
+ outbound *autodrainingOutboundQueue // sequential ordering of udp transmission
+ inbound *autodrainingInboundQueue // sequential ordering of tun writing
}
cookieGenerator CookieGenerator
@@ -253,8 +253,8 @@ func (peer *Peer) Stop() {
peer.timersStop()
// Signal that RoutineSequentialSender and RoutineSequentialReceiver should exit.
- peer.queue.inbound <- nil
- peer.queue.outbound <- nil
+ peer.queue.inbound.c <- nil
+ peer.queue.outbound.c <- nil
peer.stopping.Wait()
peer.device.queue.encryption.wg.Done() // no more writes to encryption queue from us
diff --git a/device/pools_test.go b/device/pools_test.go
index 6caf7e7..a27ccc0 100644
--- a/device/pools_test.go
+++ b/device/pools_test.go
@@ -80,4 +80,4 @@ func BenchmarkWaitPool(b *testing.B) {
}()
}
wg.Wait()
-} \ No newline at end of file
+}
diff --git a/device/receive.go b/device/receive.go
index 7acb7d9..3fc6831 100644
--- a/device/receive.go
+++ b/device/receive.go
@@ -167,7 +167,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) {
// add to decryption queues
if peer.isRunning.Get() {
- peer.queue.inbound <- elem
+ peer.queue.inbound.c <- elem
device.queue.decryption.c <- elem
buffer = device.GetMessageBuffer()
} else {
@@ -402,7 +402,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
}()
device.log.Verbosef("%v - Routine: sequential receiver - started", peer)
- for elem := range peer.queue.inbound {
+ for elem := range peer.queue.inbound.c {
if elem == nil {
return
}
@@ -477,7 +477,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
if err != nil && !device.isClosed() {
device.log.Errorf("Failed to write packet to TUN device: %v", err)
}
- if len(peer.queue.inbound) == 0 {
+ if len(peer.queue.inbound.c) == 0 {
err = device.tun.device.Flush()
if err != nil {
peer.device.log.Errorf("Unable to flush packets: %v", err)
diff --git a/device/send.go b/device/send.go
index 911ee5c..783e5b9 100644
--- a/device/send.go
+++ b/device/send.go
@@ -317,7 +317,7 @@ top:
// add to parallel and sequential queue
if peer.isRunning.Get() {
- peer.queue.outbound <- elem
+ peer.queue.outbound.c <- elem
peer.device.queue.encryption.c <- elem
} else {
peer.device.PutMessageBuffer(elem.buffer)
@@ -410,7 +410,7 @@ func (peer *Peer) RoutineSequentialSender() {
}()
device.log.Verbosef("%v - Routine: sequential sender - started", peer)
- for elem := range peer.queue.outbound {
+ for elem := range peer.queue.outbound.c {
if elem == nil {
return
}