diff options
Diffstat (limited to 'device/pools.go')
-rw-r--r-- | device/pools.go | 113 |
1 files changed, 53 insertions, 60 deletions
diff --git a/device/pools.go b/device/pools.go index eb6d6be..f1d1fa0 100644 --- a/device/pools.go +++ b/device/pools.go @@ -5,87 +5,80 @@ package device -import "sync" +import ( + "sync" + "sync/atomic" +) -func (device *Device) PopulatePools() { - if PreallocatedBuffersPerPool == 0 { - device.pool.messageBufferPool = &sync.Pool{ - New: func() interface{} { - return new([MaxMessageSize]byte) - }, - } - device.pool.inboundElementPool = &sync.Pool{ - New: func() interface{} { - return new(QueueInboundElement) - }, - } - device.pool.outboundElementPool = &sync.Pool{ - New: func() interface{} { - return new(QueueOutboundElement) - }, - } - } else { - device.pool.messageBufferReuseChan = make(chan *[MaxMessageSize]byte, PreallocatedBuffersPerPool) - for i := 0; i < PreallocatedBuffersPerPool; i++ { - device.pool.messageBufferReuseChan <- new([MaxMessageSize]byte) - } - device.pool.inboundElementReuseChan = make(chan *QueueInboundElement, PreallocatedBuffersPerPool) - for i := 0; i < PreallocatedBuffersPerPool; i++ { - device.pool.inboundElementReuseChan <- new(QueueInboundElement) - } - device.pool.outboundElementReuseChan = make(chan *QueueOutboundElement, PreallocatedBuffersPerPool) - for i := 0; i < PreallocatedBuffersPerPool; i++ { - device.pool.outboundElementReuseChan <- new(QueueOutboundElement) +type WaitPool struct { + pool sync.Pool + cond sync.Cond + lock sync.Mutex + count uint32 + max uint32 +} + +func NewWaitPool(max uint32, new func() interface{}) *WaitPool { + p := &WaitPool{pool: sync.Pool{New: new}, max: max} + p.cond = sync.Cond{L: &p.lock} + return p +} + +func (p *WaitPool) Get() interface{} { + if p.max != 0 { + p.lock.Lock() + for atomic.LoadUint32(&p.count) >= p.max { + p.cond.Wait() } + atomic.AddUint32(&p.count, 1) + p.lock.Unlock() } + return p.pool.Get() } -func (device *Device) GetMessageBuffer() *[MaxMessageSize]byte { - if PreallocatedBuffersPerPool == 0 { - return device.pool.messageBufferPool.Get().(*[MaxMessageSize]byte) - } else { - return <-device.pool.messageBufferReuseChan +func (p *WaitPool) Put(x interface{}) { + p.pool.Put(x) + if p.max == 0 { + return } + atomic.AddUint32(&p.count, ^uint32(0)) + p.cond.Signal() +} + +func (device *Device) PopulatePools() { + device.pool.messageBuffers = NewWaitPool(PreallocatedBuffersPerPool, func() interface{} { + return new([MaxMessageSize]byte) + }) + device.pool.inboundElements = NewWaitPool(PreallocatedBuffersPerPool, func() interface{} { + return new(QueueInboundElement) + }) + device.pool.outboundElements = NewWaitPool(PreallocatedBuffersPerPool, func() interface{} { + return new(QueueOutboundElement) + }) +} + +func (device *Device) GetMessageBuffer() *[MaxMessageSize]byte { + return device.pool.messageBuffers.Get().(*[MaxMessageSize]byte) } func (device *Device) PutMessageBuffer(msg *[MaxMessageSize]byte) { - if PreallocatedBuffersPerPool == 0 { - device.pool.messageBufferPool.Put(msg) - } else { - device.pool.messageBufferReuseChan <- msg - } + device.pool.messageBuffers.Put(msg) } func (device *Device) GetInboundElement() *QueueInboundElement { - if PreallocatedBuffersPerPool == 0 { - return device.pool.inboundElementPool.Get().(*QueueInboundElement) - } else { - return <-device.pool.inboundElementReuseChan - } + return device.pool.inboundElements.Get().(*QueueInboundElement) } func (device *Device) PutInboundElement(elem *QueueInboundElement) { elem.clearPointers() - if PreallocatedBuffersPerPool == 0 { - device.pool.inboundElementPool.Put(elem) - } else { - device.pool.inboundElementReuseChan <- elem - } + device.pool.inboundElements.Put(elem) } func (device *Device) GetOutboundElement() *QueueOutboundElement { - if PreallocatedBuffersPerPool == 0 { - return device.pool.outboundElementPool.Get().(*QueueOutboundElement) - } else { - return <-device.pool.outboundElementReuseChan - } + return device.pool.outboundElements.Get().(*QueueOutboundElement) } func (device *Device) PutOutboundElement(elem *QueueOutboundElement) { elem.clearPointers() - if PreallocatedBuffersPerPool == 0 { - device.pool.outboundElementPool.Put(elem) - } else { - device.pool.outboundElementReuseChan <- elem - } + device.pool.outboundElements.Put(elem) } |