summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--device/channels.go52
-rw-r--r--device/peer.go2
2 files changed, 30 insertions, 24 deletions
diff --git a/device/channels.go b/device/channels.go
index bf78868..6f4370a 100644
--- a/device/channels.go
+++ b/device/channels.go
@@ -83,21 +83,23 @@ func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
q := &autodrainingInboundQueue{
c: make(chan *QueueInboundElement, QueueInboundSize),
}
- runtime.SetFinalizer(q, func(q *autodrainingInboundQueue) {
- for {
- select {
- case elem := <-q.c:
- elem.Lock()
- device.PutMessageBuffer(elem.buffer)
- device.PutInboundElement(elem)
- default:
- return
- }
- }
- })
+ runtime.SetFinalizer(q, device.flushInboundQueue)
return q
}
+func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
+ for {
+ select {
+ case elem := <-q.c:
+ elem.Lock()
+ device.PutMessageBuffer(elem.buffer)
+ device.PutInboundElement(elem)
+ default:
+ return
+ }
+ }
+}
+
type autodrainingOutboundQueue struct {
c chan *QueueOutboundElement
}
@@ -111,17 +113,19 @@ func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
q := &autodrainingOutboundQueue{
c: make(chan *QueueOutboundElement, QueueOutboundSize),
}
- runtime.SetFinalizer(q, func(q *autodrainingOutboundQueue) {
- for {
- select {
- case elem := <-q.c:
- elem.Lock()
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
- default:
- return
- }
- }
- })
+ runtime.SetFinalizer(q, device.flushOutboundQueue)
return q
}
+
+func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
+ for {
+ select {
+ case elem := <-q.c:
+ elem.Lock()
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ default:
+ return
+ }
+ }
+}
diff --git a/device/peer.go b/device/peer.go
index 40de59b..a3b428a 100644
--- a/device/peer.go
+++ b/device/peer.go
@@ -186,6 +186,8 @@ func (peer *Peer) Start() {
peer.timersStart()
+ device.flushInboundQueue(peer.queue.inbound)
+ device.flushOutboundQueue(peer.queue.outbound)
go peer.RoutineSequentialSender()
go peer.RoutineSequentialReceiver()