summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/queue/queue.go
diff options
context:
space:
mode:
authorGoogler <noreply@google.com>2018-04-27 10:37:02 -0700
committerAdin Scannell <ascannell@google.com>2018-04-28 01:44:26 -0400
commitd02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch)
tree54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/tcpip/transport/queue/queue.go
parentf70210e742919f40aa2f0934a22f1c9ba6dada62 (diff)
Check in gVisor.
PiperOrigin-RevId: 194583126 Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/tcpip/transport/queue/queue.go')
-rw-r--r--pkg/tcpip/transport/queue/queue.go166
1 files changed, 166 insertions, 0 deletions
diff --git a/pkg/tcpip/transport/queue/queue.go b/pkg/tcpip/transport/queue/queue.go
new file mode 100644
index 000000000..2d2918504
--- /dev/null
+++ b/pkg/tcpip/transport/queue/queue.go
@@ -0,0 +1,166 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package queue provides the implementation of buffer queue
+// and interface of queue entry with Length method.
+package queue
+
+import (
+ "sync"
+
+ "gvisor.googlesource.com/gvisor/pkg/ilist"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip"
+ "gvisor.googlesource.com/gvisor/pkg/waiter"
+)
+
+// Entry implements Linker interface and has both Length and Release methods.
+type Entry interface {
+ ilist.Linker
+ Length() int64
+ Release()
+ Peek() Entry
+}
+
+// Queue is a buffer queue.
+type Queue struct {
+ ReaderQueue *waiter.Queue
+ WriterQueue *waiter.Queue
+
+ mu sync.Mutex `state:"nosave"`
+ closed bool
+ used int64
+ limit int64
+ dataList ilist.List
+}
+
+// New allocates and initializes a new queue.
+func New(ReaderQueue *waiter.Queue, WriterQueue *waiter.Queue, limit int64) *Queue {
+ return &Queue{ReaderQueue: ReaderQueue, WriterQueue: WriterQueue, limit: limit}
+}
+
+// Close closes q for reading and writing. It is immediately not writable and
+// will become unreadble will no more data is pending.
+//
+// Both the read and write queues must be notified after closing:
+// q.ReaderQueue.Notify(waiter.EventIn)
+// q.WriterQueue.Notify(waiter.EventOut)
+func (q *Queue) Close() {
+ q.mu.Lock()
+ q.closed = true
+ q.mu.Unlock()
+}
+
+// Reset empties the queue and Releases all of the Entries.
+//
+// Both the read and write queues must be notified after resetting:
+// q.ReaderQueue.Notify(waiter.EventIn)
+// q.WriterQueue.Notify(waiter.EventOut)
+func (q *Queue) Reset() {
+ q.mu.Lock()
+ for cur := q.dataList.Front(); cur != nil; cur = cur.Next() {
+ cur.(Entry).Release()
+ }
+ q.dataList.Reset()
+ q.used = 0
+ q.mu.Unlock()
+}
+
+// IsReadable determines if q is currently readable.
+func (q *Queue) IsReadable() bool {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ return q.closed || q.dataList.Front() != nil
+}
+
+// IsWritable determines if q is currently writable.
+func (q *Queue) IsWritable() bool {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ return q.closed || q.used < q.limit
+}
+
+// Enqueue adds an entry to the data queue if room is available.
+//
+// If notify is true, ReaderQueue.Notify must be called:
+// q.ReaderQueue.Notify(waiter.EventIn)
+func (q *Queue) Enqueue(e Entry) (notify bool, err *tcpip.Error) {
+ q.mu.Lock()
+
+ if q.closed {
+ q.mu.Unlock()
+ return false, tcpip.ErrClosedForSend
+ }
+
+ if q.used >= q.limit {
+ q.mu.Unlock()
+ return false, tcpip.ErrWouldBlock
+ }
+
+ notify = q.dataList.Front() == nil
+ q.used += e.Length()
+ q.dataList.PushBack(e)
+
+ q.mu.Unlock()
+
+ return notify, nil
+}
+
+// Dequeue removes the first entry in the data queue, if one exists.
+//
+// If notify is true, WriterQueue.Notify must be called:
+// q.WriterQueue.Notify(waiter.EventOut)
+func (q *Queue) Dequeue() (e Entry, notify bool, err *tcpip.Error) {
+ q.mu.Lock()
+
+ if q.dataList.Front() == nil {
+ err := tcpip.ErrWouldBlock
+ if q.closed {
+ err = tcpip.ErrClosedForReceive
+ }
+ q.mu.Unlock()
+
+ return nil, false, err
+ }
+
+ notify = q.used >= q.limit
+
+ e = q.dataList.Front().(Entry)
+ q.dataList.Remove(e)
+ q.used -= e.Length()
+
+ notify = notify && q.used < q.limit
+
+ q.mu.Unlock()
+
+ return e, notify, nil
+}
+
+// Peek returns the first entry in the data queue, if one exists.
+func (q *Queue) Peek() (Entry, *tcpip.Error) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ if q.dataList.Front() == nil {
+ err := tcpip.ErrWouldBlock
+ if q.closed {
+ err = tcpip.ErrClosedForReceive
+ }
+ return nil, err
+ }
+
+ return q.dataList.Front().(Entry).Peek(), nil
+}
+
+// QueuedSize returns the number of bytes currently in the queue, that is, the
+// number of readable bytes.
+func (q *Queue) QueuedSize() int64 {
+ return q.used
+}
+
+// MaxQueueSize returns the maximum number of bytes storable in the queue.
+func (q *Queue) MaxQueueSize() int64 {
+ return q.limit
+}