summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/queue/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/transport/queue/queue.go')
-rw-r--r--pkg/tcpip/transport/queue/queue.go69
1 files changed, 58 insertions, 11 deletions
diff --git a/pkg/tcpip/transport/queue/queue.go b/pkg/tcpip/transport/queue/queue.go
index eb9ee8a3f..b3d2ea68b 100644
--- a/pkg/tcpip/transport/queue/queue.go
+++ b/pkg/tcpip/transport/queue/queue.go
@@ -24,12 +24,23 @@ import (
"gvisor.googlesource.com/gvisor/pkg/waiter"
)
-// Entry implements Linker interface and has both Length and Release methods.
+// Entry implements Linker interface and has additional required methods.
type Entry interface {
ilist.Linker
+
+ // Length returns the number of bytes stored in the entry.
Length() int64
+
+ // Release releases any resources held by the entry.
Release()
+
+ // Peek returns a copy of the entry. It must be Released separately.
Peek() Entry
+
+ // Truncate reduces the number of bytes stored in the entry to n bytes.
+ //
+ // Preconditions: n <= Length().
+ Truncate(n int64)
}
// Queue is a buffer queue.
@@ -52,7 +63,7 @@ func New(ReaderQueue *waiter.Queue, WriterQueue *waiter.Queue, limit int64) *Que
}
// Close closes q for reading and writing. It is immediately not writable and
-// will become unreadble will no more data is pending.
+// will become unreadable when no more data is pending.
//
// Both the read and write queues must be notified after closing:
// q.ReaderQueue.Notify(waiter.EventIn)
@@ -86,38 +97,74 @@ func (q *Queue) IsReadable() bool {
return q.closed || q.dataList.Front() != nil
}
+// bufWritable returns true if there is space for writing.
+//
+// N.B. Linux only considers a unix socket "writable" if >75% of the buffer is
+// free.
+//
+// See net/unix/af_unix.c:unix_writeable.
+func (q *Queue) bufWritable() bool {
+ return 4*q.used < q.limit
+}
+
// IsWritable determines if q is currently writable.
func (q *Queue) IsWritable() bool {
q.mu.Lock()
defer q.mu.Unlock()
- return q.closed || q.used < q.limit
+ return q.closed || q.bufWritable()
}
// Enqueue adds an entry to the data queue if room is available.
//
+// If truncate is true, Enqueue may truncate the message beforing enqueuing it.
+// Otherwise, the entire message must fit. If n < e.Length(), err indicates why.
+//
// If notify is true, ReaderQueue.Notify must be called:
// q.ReaderQueue.Notify(waiter.EventIn)
-func (q *Queue) Enqueue(e Entry) (notify bool, err *tcpip.Error) {
+func (q *Queue) Enqueue(e Entry, truncate bool) (l int64, notify bool, err *tcpip.Error) {
q.mu.Lock()
if q.closed {
q.mu.Unlock()
- return false, tcpip.ErrClosedForSend
+ return 0, false, tcpip.ErrClosedForSend
+ }
+
+ free := q.limit - q.used
+
+ l = e.Length()
+
+ if l > free && truncate {
+ if free == 0 {
+ // Message can't fit right now.
+ q.mu.Unlock()
+ return 0, false, tcpip.ErrWouldBlock
+ }
+
+ e.Truncate(free)
+ l = e.Length()
+ err = tcpip.ErrWouldBlock
+ }
+
+ if l > q.limit {
+ // Message is too big to ever fit.
+ q.mu.Unlock()
+ return 0, false, tcpip.ErrMessageTooLong
}
- if q.used >= q.limit {
+ if l > free {
+ // Message can't fit right now.
q.mu.Unlock()
- return false, tcpip.ErrWouldBlock
+ return 0, false, tcpip.ErrWouldBlock
}
notify = q.dataList.Front() == nil
- q.used += e.Length()
+ q.used += l
q.dataList.PushBack(e)
q.mu.Unlock()
- return notify, nil
+ return l, notify, err
}
// Dequeue removes the first entry in the data queue, if one exists.
@@ -137,13 +184,13 @@ func (q *Queue) Dequeue() (e Entry, notify bool, err *tcpip.Error) {
return nil, false, err
}
- notify = q.used >= q.limit
+ notify = !q.bufWritable()
e = q.dataList.Front().(Entry)
q.dataList.Remove(e)
q.used -= e.Length()
- notify = notify && q.used < q.limit
+ notify = notify && q.bufWritable()
q.mu.Unlock()