diff options
author | Googler <noreply@google.com> | 2018-04-27 10:37:02 -0700 |
---|---|---|
committer | Adin Scannell <ascannell@google.com> | 2018-04-28 01:44:26 -0400 |
commit | d02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch) | |
tree | 54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/tcpip/transport/queue | |
parent | f70210e742919f40aa2f0934a22f1c9ba6dada62 (diff) |
Check in gVisor.
PiperOrigin-RevId: 194583126
Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/tcpip/transport/queue')
-rw-r--r-- | pkg/tcpip/transport/queue/BUILD | 29 | ||||
-rw-r--r-- | pkg/tcpip/transport/queue/queue.go | 166 |
2 files changed, 195 insertions, 0 deletions
diff --git a/pkg/tcpip/transport/queue/BUILD b/pkg/tcpip/transport/queue/BUILD new file mode 100644 index 000000000..162af574c --- /dev/null +++ b/pkg/tcpip/transport/queue/BUILD @@ -0,0 +1,29 @@ +package(licenses = ["notice"]) # BSD + +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("//tools/go_stateify:defs.bzl", "go_stateify") + +go_stateify( + name = "queue_state", + srcs = [ + "queue.go", + ], + out = "queue_state.go", + package = "queue", +) + +go_library( + name = "queue", + srcs = [ + "queue.go", + "queue_state.go", + ], + importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/transport/queue", + visibility = ["//:sandbox"], + deps = [ + "//pkg/ilist", + "//pkg/state", + "//pkg/tcpip", + "//pkg/waiter", + ], +) diff --git a/pkg/tcpip/transport/queue/queue.go b/pkg/tcpip/transport/queue/queue.go new file mode 100644 index 000000000..2d2918504 --- /dev/null +++ b/pkg/tcpip/transport/queue/queue.go @@ -0,0 +1,166 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package queue provides the implementation of buffer queue +// and interface of queue entry with Length method. +package queue + +import ( + "sync" + + "gvisor.googlesource.com/gvisor/pkg/ilist" + "gvisor.googlesource.com/gvisor/pkg/tcpip" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// Entry implements Linker interface and has both Length and Release methods. +type Entry interface { + ilist.Linker + Length() int64 + Release() + Peek() Entry +} + +// Queue is a buffer queue. +type Queue struct { + ReaderQueue *waiter.Queue + WriterQueue *waiter.Queue + + mu sync.Mutex `state:"nosave"` + closed bool + used int64 + limit int64 + dataList ilist.List +} + +// New allocates and initializes a new queue. +func New(ReaderQueue *waiter.Queue, WriterQueue *waiter.Queue, limit int64) *Queue { + return &Queue{ReaderQueue: ReaderQueue, WriterQueue: WriterQueue, limit: limit} +} + +// Close closes q for reading and writing. It is immediately not writable and +// will become unreadble will no more data is pending. +// +// Both the read and write queues must be notified after closing: +// q.ReaderQueue.Notify(waiter.EventIn) +// q.WriterQueue.Notify(waiter.EventOut) +func (q *Queue) Close() { + q.mu.Lock() + q.closed = true + q.mu.Unlock() +} + +// Reset empties the queue and Releases all of the Entries. +// +// Both the read and write queues must be notified after resetting: +// q.ReaderQueue.Notify(waiter.EventIn) +// q.WriterQueue.Notify(waiter.EventOut) +func (q *Queue) Reset() { + q.mu.Lock() + for cur := q.dataList.Front(); cur != nil; cur = cur.Next() { + cur.(Entry).Release() + } + q.dataList.Reset() + q.used = 0 + q.mu.Unlock() +} + +// IsReadable determines if q is currently readable. +func (q *Queue) IsReadable() bool { + q.mu.Lock() + defer q.mu.Unlock() + + return q.closed || q.dataList.Front() != nil +} + +// IsWritable determines if q is currently writable. +func (q *Queue) IsWritable() bool { + q.mu.Lock() + defer q.mu.Unlock() + + return q.closed || q.used < q.limit +} + +// Enqueue adds an entry to the data queue if room is available. +// +// If notify is true, ReaderQueue.Notify must be called: +// q.ReaderQueue.Notify(waiter.EventIn) +func (q *Queue) Enqueue(e Entry) (notify bool, err *tcpip.Error) { + q.mu.Lock() + + if q.closed { + q.mu.Unlock() + return false, tcpip.ErrClosedForSend + } + + if q.used >= q.limit { + q.mu.Unlock() + return false, tcpip.ErrWouldBlock + } + + notify = q.dataList.Front() == nil + q.used += e.Length() + q.dataList.PushBack(e) + + q.mu.Unlock() + + return notify, nil +} + +// Dequeue removes the first entry in the data queue, if one exists. +// +// If notify is true, WriterQueue.Notify must be called: +// q.WriterQueue.Notify(waiter.EventOut) +func (q *Queue) Dequeue() (e Entry, notify bool, err *tcpip.Error) { + q.mu.Lock() + + if q.dataList.Front() == nil { + err := tcpip.ErrWouldBlock + if q.closed { + err = tcpip.ErrClosedForReceive + } + q.mu.Unlock() + + return nil, false, err + } + + notify = q.used >= q.limit + + e = q.dataList.Front().(Entry) + q.dataList.Remove(e) + q.used -= e.Length() + + notify = notify && q.used < q.limit + + q.mu.Unlock() + + return e, notify, nil +} + +// Peek returns the first entry in the data queue, if one exists. +func (q *Queue) Peek() (Entry, *tcpip.Error) { + q.mu.Lock() + defer q.mu.Unlock() + + if q.dataList.Front() == nil { + err := tcpip.ErrWouldBlock + if q.closed { + err = tcpip.ErrClosedForReceive + } + return nil, err + } + + return q.dataList.Front().(Entry).Peek(), nil +} + +// QueuedSize returns the number of bytes currently in the queue, that is, the +// number of readable bytes. +func (q *Queue) QueuedSize() int64 { + return q.used +} + +// MaxQueueSize returns the maximum number of bytes storable in the queue. +func (q *Queue) MaxQueueSize() int64 { + return q.limit +} |