diff options
author | gVisor bot <gvisor-bot@google.com> | 2019-06-02 06:44:55 +0000 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2019-06-02 06:44:55 +0000 |
commit | ceb0d792f328d1fc0692197d8856a43c3936a571 (patch) | |
tree | 83155f302eff44a78bcc30a3a08f4efe59a79379 /pkg/sentry/socket/unix/transport/queue.go | |
parent | deb7ecf1e46862d54f4b102f2d163cfbcfc37f3b (diff) | |
parent | 216da0b733dbed9aad9b2ab92ac75bcb906fd7ee (diff) |
Merge 216da0b7 (automated)
Diffstat (limited to 'pkg/sentry/socket/unix/transport/queue.go')
-rw-r--r-- | pkg/sentry/socket/unix/transport/queue.go | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/pkg/sentry/socket/unix/transport/queue.go b/pkg/sentry/socket/unix/transport/queue.go new file mode 100644 index 000000000..b650caae7 --- /dev/null +++ b/pkg/sentry/socket/unix/transport/queue.go @@ -0,0 +1,210 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport + +import ( + "sync" + + "gvisor.googlesource.com/gvisor/pkg/refs" + "gvisor.googlesource.com/gvisor/pkg/syserr" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// queue is a buffer queue. +// +// +stateify savable +type queue struct { + refs.AtomicRefCount + + ReaderQueue *waiter.Queue + WriterQueue *waiter.Queue + + mu sync.Mutex `state:"nosave"` + closed bool + used int64 + limit int64 + dataList messageList +} + +// Close closes q for reading and writing. It is immediately not writable and +// will become unreadable when no more data is pending. +// +// Both the read and write queues must be notified after closing: +// q.ReaderQueue.Notify(waiter.EventIn) +// q.WriterQueue.Notify(waiter.EventOut) +func (q *queue) Close() { + q.mu.Lock() + q.closed = true + q.mu.Unlock() +} + +// Reset empties the queue and Releases all of the Entries. +// +// Both the read and write queues must be notified after resetting: +// q.ReaderQueue.Notify(waiter.EventIn) +// q.WriterQueue.Notify(waiter.EventOut) +func (q *queue) Reset() { + q.mu.Lock() + for cur := q.dataList.Front(); cur != nil; cur = cur.Next() { + cur.Release() + } + q.dataList.Reset() + q.used = 0 + q.mu.Unlock() +} + +// DecRef implements RefCounter.DecRef with destructor q.Reset. +func (q *queue) DecRef() { + q.DecRefWithDestructor(q.Reset) + // We don't need to notify after resetting because no one cares about + // this queue after all references have been dropped. +} + +// IsReadable determines if q is currently readable. +func (q *queue) IsReadable() bool { + q.mu.Lock() + defer q.mu.Unlock() + + return q.closed || q.dataList.Front() != nil +} + +// bufWritable returns true if there is space for writing. +// +// N.B. Linux only considers a unix socket "writable" if >75% of the buffer is +// free. +// +// See net/unix/af_unix.c:unix_writeable. +func (q *queue) bufWritable() bool { + return 4*q.used < q.limit +} + +// IsWritable determines if q is currently writable. +func (q *queue) IsWritable() bool { + q.mu.Lock() + defer q.mu.Unlock() + + return q.closed || q.bufWritable() +} + +// Enqueue adds an entry to the data queue if room is available. +// +// If truncate is true, Enqueue may truncate the message beforing enqueuing it. +// Otherwise, the entire message must fit. If n < e.Length(), err indicates why. +// +// If notify is true, ReaderQueue.Notify must be called: +// q.ReaderQueue.Notify(waiter.EventIn) +func (q *queue) Enqueue(e *message, truncate bool) (l int64, notify bool, err *syserr.Error) { + q.mu.Lock() + + if q.closed { + q.mu.Unlock() + return 0, false, syserr.ErrClosedForSend + } + + free := q.limit - q.used + + l = e.Length() + + if l > free && truncate { + if free == 0 { + // Message can't fit right now. + q.mu.Unlock() + return 0, false, syserr.ErrWouldBlock + } + + e.Truncate(free) + l = e.Length() + err = syserr.ErrWouldBlock + } + + if l > q.limit { + // Message is too big to ever fit. + q.mu.Unlock() + return 0, false, syserr.ErrMessageTooLong + } + + if l > free { + // Message can't fit right now. + q.mu.Unlock() + return 0, false, syserr.ErrWouldBlock + } + + notify = q.dataList.Front() == nil + q.used += l + q.dataList.PushBack(e) + + q.mu.Unlock() + + return l, notify, err +} + +// Dequeue removes the first entry in the data queue, if one exists. +// +// If notify is true, WriterQueue.Notify must be called: +// q.WriterQueue.Notify(waiter.EventOut) +func (q *queue) Dequeue() (e *message, notify bool, err *syserr.Error) { + q.mu.Lock() + + if q.dataList.Front() == nil { + err := syserr.ErrWouldBlock + if q.closed { + err = syserr.ErrClosedForReceive + } + q.mu.Unlock() + + return nil, false, err + } + + notify = !q.bufWritable() + + e = q.dataList.Front() + q.dataList.Remove(e) + q.used -= e.Length() + + notify = notify && q.bufWritable() + + q.mu.Unlock() + + return e, notify, nil +} + +// Peek returns the first entry in the data queue, if one exists. +func (q *queue) Peek() (*message, *syserr.Error) { + q.mu.Lock() + defer q.mu.Unlock() + + if q.dataList.Front() == nil { + err := syserr.ErrWouldBlock + if q.closed { + err = syserr.ErrClosedForReceive + } + return nil, err + } + + return q.dataList.Front().Peek(), nil +} + +// QueuedSize returns the number of bytes currently in the queue, that is, the +// number of readable bytes. +func (q *queue) QueuedSize() int64 { + q.mu.Lock() + defer q.mu.Unlock() + return q.used +} + +// MaxQueueSize returns the maximum number of bytes storable in the queue. +func (q *queue) MaxQueueSize() int64 { + return q.limit +} |