From 6922eee6499212a009fdc254224f916bd1c46f29 Mon Sep 17 00:00:00 2001 From: Ian Gudger Date: Wed, 17 Oct 2018 15:09:26 -0700 Subject: Merge queue into Unix transport This queue only has a single user, so there is no need for it to use an interface. Merging it into the same package as its sole user allows us to avoid a circular dependency. This simplifies the code and should slightly improve performance. PiperOrigin-RevId: 217595889 Change-Id: Iabbd5164240b935f79933618c61581bc8dcd2822 --- pkg/sentry/socket/unix/transport/BUILD | 2 +- pkg/sentry/socket/unix/transport/connectioned.go | 9 +- pkg/sentry/socket/unix/transport/connectionless.go | 3 +- pkg/sentry/socket/unix/transport/queue.go | 206 +++++++++++++++++++ pkg/sentry/socket/unix/transport/queue/BUILD | 15 -- pkg/sentry/socket/unix/transport/queue/queue.go | 227 --------------------- pkg/sentry/socket/unix/transport/unix.go | 28 ++- 7 files changed, 224 insertions(+), 266 deletions(-) create mode 100644 pkg/sentry/socket/unix/transport/queue.go delete mode 100644 pkg/sentry/socket/unix/transport/queue/BUILD delete mode 100644 pkg/sentry/socket/unix/transport/queue/queue.go (limited to 'pkg/sentry') diff --git a/pkg/sentry/socket/unix/transport/BUILD b/pkg/sentry/socket/unix/transport/BUILD index 04ef0d438..75b5a2eb6 100644 --- a/pkg/sentry/socket/unix/transport/BUILD +++ b/pkg/sentry/socket/unix/transport/BUILD @@ -8,13 +8,13 @@ go_library( "connectioned.go", "connectioned_state.go", "connectionless.go", + "queue.go", "unix.go", ], importpath = "gvisor.googlesource.com/gvisor/pkg/sentry/socket/unix/transport", visibility = ["//:sandbox"], deps = [ "//pkg/ilist", - "//pkg/sentry/socket/unix/transport/queue", "//pkg/tcpip", "//pkg/tcpip/buffer", "//pkg/waiter", diff --git a/pkg/sentry/socket/unix/transport/connectioned.go b/pkg/sentry/socket/unix/transport/connectioned.go index f09935765..566e3d57b 100644 --- a/pkg/sentry/socket/unix/transport/connectioned.go +++ b/pkg/sentry/socket/unix/transport/connectioned.go @@ -17,7 +17,6 @@ package transport import ( "sync" - "gvisor.googlesource.com/gvisor/pkg/sentry/socket/unix/transport/queue" "gvisor.googlesource.com/gvisor/pkg/tcpip" "gvisor.googlesource.com/gvisor/pkg/waiter" ) @@ -135,8 +134,8 @@ func NewPair(stype SockType, uid UniqueIDProvider) (Endpoint, Endpoint) { stype: stype, } - q1 := queue.New(a.Queue, b.Queue, initialLimit) - q2 := queue.New(b.Queue, a.Queue, initialLimit) + q1 := newQueue(a.Queue, b.Queue, initialLimit) + q2 := newQueue(b.Queue, a.Queue, initialLimit) if stype == SockStream { a.receiver = &streamQueueReceiver{queueReceiver: queueReceiver{q1}} @@ -283,8 +282,8 @@ func (e *connectionedEndpoint) BidirectionalConnect(ce ConnectingEndpoint, retur idGenerator: e.idGenerator, stype: e.stype, } - readQueue := queue.New(ce.WaiterQueue(), ne.Queue, initialLimit) - writeQueue := queue.New(ne.Queue, ce.WaiterQueue(), initialLimit) + readQueue := newQueue(ce.WaiterQueue(), ne.Queue, initialLimit) + writeQueue := newQueue(ne.Queue, ce.WaiterQueue(), initialLimit) ne.connected = &connectedEndpoint{ endpoint: ce, writeQueue: readQueue, diff --git a/pkg/sentry/socket/unix/transport/connectionless.go b/pkg/sentry/socket/unix/transport/connectionless.go index fb2728010..86cd05199 100644 --- a/pkg/sentry/socket/unix/transport/connectionless.go +++ b/pkg/sentry/socket/unix/transport/connectionless.go @@ -15,7 +15,6 @@ package transport import ( - "gvisor.googlesource.com/gvisor/pkg/sentry/socket/unix/transport/queue" "gvisor.googlesource.com/gvisor/pkg/tcpip" "gvisor.googlesource.com/gvisor/pkg/waiter" ) @@ -34,7 +33,7 @@ type connectionlessEndpoint struct { // NewConnectionless creates a new unbound dgram endpoint. func NewConnectionless() Endpoint { ep := &connectionlessEndpoint{baseEndpoint{Queue: &waiter.Queue{}}} - ep.receiver = &queueReceiver{readQueue: queue.New(&waiter.Queue{}, ep.Queue, initialLimit)} + ep.receiver = &queueReceiver{readQueue: newQueue(&waiter.Queue{}, ep.Queue, initialLimit)} return ep } diff --git a/pkg/sentry/socket/unix/transport/queue.go b/pkg/sentry/socket/unix/transport/queue.go new file mode 100644 index 000000000..203e31333 --- /dev/null +++ b/pkg/sentry/socket/unix/transport/queue.go @@ -0,0 +1,206 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport + +import ( + "sync" + + "gvisor.googlesource.com/gvisor/pkg/ilist" + "gvisor.googlesource.com/gvisor/pkg/tcpip" + "gvisor.googlesource.com/gvisor/pkg/waiter" +) + +// queue is a buffer queue. +// +// +stateify savable +type queue struct { + ReaderQueue *waiter.Queue + WriterQueue *waiter.Queue + + mu sync.Mutex `state:"nosave"` + closed bool + used int64 + limit int64 + dataList ilist.List +} + +// newQueue allocates and initializes a new queue. +func newQueue(ReaderQueue *waiter.Queue, WriterQueue *waiter.Queue, limit int64) *queue { + return &queue{ReaderQueue: ReaderQueue, WriterQueue: WriterQueue, limit: limit} +} + +// Close closes q for reading and writing. It is immediately not writable and +// will become unreadable when no more data is pending. +// +// Both the read and write queues must be notified after closing: +// q.ReaderQueue.Notify(waiter.EventIn) +// q.WriterQueue.Notify(waiter.EventOut) +func (q *queue) Close() { + q.mu.Lock() + q.closed = true + q.mu.Unlock() +} + +// Reset empties the queue and Releases all of the Entries. +// +// Both the read and write queues must be notified after resetting: +// q.ReaderQueue.Notify(waiter.EventIn) +// q.WriterQueue.Notify(waiter.EventOut) +func (q *queue) Reset() { + q.mu.Lock() + for cur := q.dataList.Front(); cur != nil; cur = cur.Next() { + cur.(*message).Release() + } + q.dataList.Reset() + q.used = 0 + q.mu.Unlock() +} + +// IsReadable determines if q is currently readable. +func (q *queue) IsReadable() bool { + q.mu.Lock() + defer q.mu.Unlock() + + return q.closed || q.dataList.Front() != nil +} + +// bufWritable returns true if there is space for writing. +// +// N.B. Linux only considers a unix socket "writable" if >75% of the buffer is +// free. +// +// See net/unix/af_unix.c:unix_writeable. +func (q *queue) bufWritable() bool { + return 4*q.used < q.limit +} + +// IsWritable determines if q is currently writable. +func (q *queue) IsWritable() bool { + q.mu.Lock() + defer q.mu.Unlock() + + return q.closed || q.bufWritable() +} + +// Enqueue adds an entry to the data queue if room is available. +// +// If truncate is true, Enqueue may truncate the message beforing enqueuing it. +// Otherwise, the entire message must fit. If n < e.Length(), err indicates why. +// +// If notify is true, ReaderQueue.Notify must be called: +// q.ReaderQueue.Notify(waiter.EventIn) +func (q *queue) Enqueue(e *message, truncate bool) (l int64, notify bool, err *tcpip.Error) { + q.mu.Lock() + + if q.closed { + q.mu.Unlock() + return 0, false, tcpip.ErrClosedForSend + } + + free := q.limit - q.used + + l = e.Length() + + if l > free && truncate { + if free == 0 { + // Message can't fit right now. + q.mu.Unlock() + return 0, false, tcpip.ErrWouldBlock + } + + e.Truncate(free) + l = e.Length() + err = tcpip.ErrWouldBlock + } + + if l > q.limit { + // Message is too big to ever fit. + q.mu.Unlock() + return 0, false, tcpip.ErrMessageTooLong + } + + if l > free { + // Message can't fit right now. + q.mu.Unlock() + return 0, false, tcpip.ErrWouldBlock + } + + notify = q.dataList.Front() == nil + q.used += l + q.dataList.PushBack(e) + + q.mu.Unlock() + + return l, notify, err +} + +// Dequeue removes the first entry in the data queue, if one exists. +// +// If notify is true, WriterQueue.Notify must be called: +// q.WriterQueue.Notify(waiter.EventOut) +func (q *queue) Dequeue() (e *message, notify bool, err *tcpip.Error) { + q.mu.Lock() + + if q.dataList.Front() == nil { + err := tcpip.ErrWouldBlock + if q.closed { + err = tcpip.ErrClosedForReceive + } + q.mu.Unlock() + + return nil, false, err + } + + notify = !q.bufWritable() + + e = q.dataList.Front().(*message) + q.dataList.Remove(e) + q.used -= e.Length() + + notify = notify && q.bufWritable() + + q.mu.Unlock() + + return e, notify, nil +} + +// Peek returns the first entry in the data queue, if one exists. +func (q *queue) Peek() (*message, *tcpip.Error) { + q.mu.Lock() + defer q.mu.Unlock() + + if q.dataList.Front() == nil { + err := tcpip.ErrWouldBlock + if q.closed { + err = tcpip.ErrClosedForReceive + } + return nil, err + } + + return q.dataList.Front().(*message).Peek(), nil +} + +// QueuedSize returns the number of bytes currently in the queue, that is, the +// number of readable bytes. +func (q *queue) QueuedSize() int64 { + q.mu.Lock() + defer q.mu.Unlock() + return q.used +} + +// MaxQueueSize returns the maximum number of bytes storable in the queue. +func (q *queue) MaxQueueSize() int64 { + return q.limit +} diff --git a/pkg/sentry/socket/unix/transport/queue/BUILD b/pkg/sentry/socket/unix/transport/queue/BUILD deleted file mode 100644 index d914ecc23..000000000 --- a/pkg/sentry/socket/unix/transport/queue/BUILD +++ /dev/null @@ -1,15 +0,0 @@ -package(licenses = ["notice"]) # Apache 2.0 - -load("//tools/go_stateify:defs.bzl", "go_library") - -go_library( - name = "queue", - srcs = ["queue.go"], - importpath = "gvisor.googlesource.com/gvisor/pkg/sentry/socket/unix/transport/queue", - visibility = ["//:sandbox"], - deps = [ - "//pkg/ilist", - "//pkg/tcpip", - "//pkg/waiter", - ], -) diff --git a/pkg/sentry/socket/unix/transport/queue/queue.go b/pkg/sentry/socket/unix/transport/queue/queue.go deleted file mode 100644 index b3d2ea68b..000000000 --- a/pkg/sentry/socket/unix/transport/queue/queue.go +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright 2018 Google Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package queue provides the implementation of buffer queue -// and interface of queue entry with Length method. -package queue - -import ( - "sync" - - "gvisor.googlesource.com/gvisor/pkg/ilist" - "gvisor.googlesource.com/gvisor/pkg/tcpip" - "gvisor.googlesource.com/gvisor/pkg/waiter" -) - -// Entry implements Linker interface and has additional required methods. -type Entry interface { - ilist.Linker - - // Length returns the number of bytes stored in the entry. - Length() int64 - - // Release releases any resources held by the entry. - Release() - - // Peek returns a copy of the entry. It must be Released separately. - Peek() Entry - - // Truncate reduces the number of bytes stored in the entry to n bytes. - // - // Preconditions: n <= Length(). - Truncate(n int64) -} - -// Queue is a buffer queue. -// -// +stateify savable -type Queue struct { - ReaderQueue *waiter.Queue - WriterQueue *waiter.Queue - - mu sync.Mutex `state:"nosave"` - closed bool - used int64 - limit int64 - dataList ilist.List -} - -// New allocates and initializes a new queue. -func New(ReaderQueue *waiter.Queue, WriterQueue *waiter.Queue, limit int64) *Queue { - return &Queue{ReaderQueue: ReaderQueue, WriterQueue: WriterQueue, limit: limit} -} - -// Close closes q for reading and writing. It is immediately not writable and -// will become unreadable when no more data is pending. -// -// Both the read and write queues must be notified after closing: -// q.ReaderQueue.Notify(waiter.EventIn) -// q.WriterQueue.Notify(waiter.EventOut) -func (q *Queue) Close() { - q.mu.Lock() - q.closed = true - q.mu.Unlock() -} - -// Reset empties the queue and Releases all of the Entries. -// -// Both the read and write queues must be notified after resetting: -// q.ReaderQueue.Notify(waiter.EventIn) -// q.WriterQueue.Notify(waiter.EventOut) -func (q *Queue) Reset() { - q.mu.Lock() - for cur := q.dataList.Front(); cur != nil; cur = cur.Next() { - cur.(Entry).Release() - } - q.dataList.Reset() - q.used = 0 - q.mu.Unlock() -} - -// IsReadable determines if q is currently readable. -func (q *Queue) IsReadable() bool { - q.mu.Lock() - defer q.mu.Unlock() - - return q.closed || q.dataList.Front() != nil -} - -// bufWritable returns true if there is space for writing. -// -// N.B. Linux only considers a unix socket "writable" if >75% of the buffer is -// free. -// -// See net/unix/af_unix.c:unix_writeable. -func (q *Queue) bufWritable() bool { - return 4*q.used < q.limit -} - -// IsWritable determines if q is currently writable. -func (q *Queue) IsWritable() bool { - q.mu.Lock() - defer q.mu.Unlock() - - return q.closed || q.bufWritable() -} - -// Enqueue adds an entry to the data queue if room is available. -// -// If truncate is true, Enqueue may truncate the message beforing enqueuing it. -// Otherwise, the entire message must fit. If n < e.Length(), err indicates why. -// -// If notify is true, ReaderQueue.Notify must be called: -// q.ReaderQueue.Notify(waiter.EventIn) -func (q *Queue) Enqueue(e Entry, truncate bool) (l int64, notify bool, err *tcpip.Error) { - q.mu.Lock() - - if q.closed { - q.mu.Unlock() - return 0, false, tcpip.ErrClosedForSend - } - - free := q.limit - q.used - - l = e.Length() - - if l > free && truncate { - if free == 0 { - // Message can't fit right now. - q.mu.Unlock() - return 0, false, tcpip.ErrWouldBlock - } - - e.Truncate(free) - l = e.Length() - err = tcpip.ErrWouldBlock - } - - if l > q.limit { - // Message is too big to ever fit. - q.mu.Unlock() - return 0, false, tcpip.ErrMessageTooLong - } - - if l > free { - // Message can't fit right now. - q.mu.Unlock() - return 0, false, tcpip.ErrWouldBlock - } - - notify = q.dataList.Front() == nil - q.used += l - q.dataList.PushBack(e) - - q.mu.Unlock() - - return l, notify, err -} - -// Dequeue removes the first entry in the data queue, if one exists. -// -// If notify is true, WriterQueue.Notify must be called: -// q.WriterQueue.Notify(waiter.EventOut) -func (q *Queue) Dequeue() (e Entry, notify bool, err *tcpip.Error) { - q.mu.Lock() - - if q.dataList.Front() == nil { - err := tcpip.ErrWouldBlock - if q.closed { - err = tcpip.ErrClosedForReceive - } - q.mu.Unlock() - - return nil, false, err - } - - notify = !q.bufWritable() - - e = q.dataList.Front().(Entry) - q.dataList.Remove(e) - q.used -= e.Length() - - notify = notify && q.bufWritable() - - q.mu.Unlock() - - return e, notify, nil -} - -// Peek returns the first entry in the data queue, if one exists. -func (q *Queue) Peek() (Entry, *tcpip.Error) { - q.mu.Lock() - defer q.mu.Unlock() - - if q.dataList.Front() == nil { - err := tcpip.ErrWouldBlock - if q.closed { - err = tcpip.ErrClosedForReceive - } - return nil, err - } - - return q.dataList.Front().(Entry).Peek(), nil -} - -// QueuedSize returns the number of bytes currently in the queue, that is, the -// number of readable bytes. -func (q *Queue) QueuedSize() int64 { - q.mu.Lock() - defer q.mu.Unlock() - return q.used -} - -// MaxQueueSize returns the maximum number of bytes storable in the queue. -func (q *Queue) MaxQueueSize() int64 { - return q.limit -} diff --git a/pkg/sentry/socket/unix/transport/unix.go b/pkg/sentry/socket/unix/transport/unix.go index 577aa87d5..9a0de9a06 100644 --- a/pkg/sentry/socket/unix/transport/unix.go +++ b/pkg/sentry/socket/unix/transport/unix.go @@ -20,7 +20,6 @@ import ( "sync/atomic" "gvisor.googlesource.com/gvisor/pkg/ilist" - "gvisor.googlesource.com/gvisor/pkg/sentry/socket/unix/transport/queue" "gvisor.googlesource.com/gvisor/pkg/tcpip" "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" "gvisor.googlesource.com/gvisor/pkg/waiter" @@ -271,7 +270,7 @@ func (m *message) Release() { } // Peek returns a copy of the message. -func (m *message) Peek() queue.Entry { +func (m *message) Peek() *message { return &message{Data: m.Data, Control: m.Control.Clone(), Address: m.Address} } @@ -325,12 +324,12 @@ type Receiver interface { // // +stateify savable type queueReceiver struct { - readQueue *queue.Queue + readQueue *queue } // Recv implements Receiver.Recv. func (q *queueReceiver) Recv(data [][]byte, creds bool, numRights uintptr, peek bool) (uintptr, uintptr, ControlMessages, tcpip.FullAddress, bool, *tcpip.Error) { - var m queue.Entry + var m *message var notify bool var err *tcpip.Error if peek { @@ -341,15 +340,14 @@ func (q *queueReceiver) Recv(data [][]byte, creds bool, numRights uintptr, peek if err != nil { return 0, 0, ControlMessages{}, tcpip.FullAddress{}, false, err } - msg := m.(*message) - src := []byte(msg.Data) + src := []byte(m.Data) var copied uintptr for i := 0; i < len(data) && len(src) > 0; i++ { n := copy(data[i], src) copied += uintptr(n) src = src[n:] } - return copied, uintptr(len(msg.Data)), msg.Control, msg.Address, notify, nil + return copied, uintptr(len(m.Data)), m.Control, m.Address, notify, nil } // RecvNotify implements Receiver.RecvNotify. @@ -456,10 +454,9 @@ func (q *streamQueueReceiver) Recv(data [][]byte, wantCreds bool, numRights uint return 0, 0, ControlMessages{}, tcpip.FullAddress{}, false, err } notify = n - msg := m.(*message) - q.buffer = []byte(msg.Data) - q.control = msg.Control - q.addr = msg.Address + q.buffer = []byte(m.Data) + q.control = m.Control + q.addr = m.Address } var copied uintptr @@ -506,10 +503,9 @@ func (q *streamQueueReceiver) Recv(data [][]byte, wantCreds bool, numRights uint break } notify = notify || n - msg := m.(*message) - q.buffer = []byte(msg.Data) - q.control = msg.Control - q.addr = msg.Address + q.buffer = []byte(m.Data) + q.control = m.Control + q.addr = m.Address if wantCreds { if (q.control.Credentials == nil) != (c.Credentials == nil) { @@ -619,7 +615,7 @@ type connectedEndpoint struct { Type() SockType } - writeQueue *queue.Queue + writeQueue *queue } // Passcred implements ConnectedEndpoint.Passcred. -- cgit v1.2.3