diff options
author | Michael Pratt <mpratt@google.com> | 2018-10-10 14:09:24 -0700 |
---|---|---|
committer | Shentubot <shentubot@google.com> | 2018-10-10 14:10:17 -0700 |
commit | ddb34b3690c07f6c8efe2b96f89166145c4a7d3c (patch) | |
tree | 781361c955c356d26b484f572bc4ad41a250ab72 /pkg/tcpip/transport | |
parent | b78552d30e0af4122710e01bc86cbde6bb412686 (diff) |
Enforce message size limits and avoid host calls with too many iovecs
Currently, in the face of FileMem fragmentation and a large sendmsg or
recvmsg call, host sockets may pass > 1024 iovecs to the host, which
will immediately cause the host to return EMSGSIZE.
When we detect this case, use a single intermediate buffer to pass to
the kernel, copying to/from the src/dst buffer.
To avoid creating unbounded intermediate buffers, enforce message size
checks and truncation w.r.t. the send buffer size. The same
functionality is added to netstack unix sockets for feature parity.
PiperOrigin-RevId: 216590198
Change-Id: I719a32e71c7b1098d5097f35e6daf7dd5190eff7
Diffstat (limited to 'pkg/tcpip/transport')
-rw-r--r-- | pkg/tcpip/transport/queue/queue.go | 69 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint_state.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/transport/udp/endpoint.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/unix/connectionless.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/unix/unix.go | 49 |
5 files changed, 100 insertions, 32 deletions
diff --git a/pkg/tcpip/transport/queue/queue.go b/pkg/tcpip/transport/queue/queue.go index eb9ee8a3f..b3d2ea68b 100644 --- a/pkg/tcpip/transport/queue/queue.go +++ b/pkg/tcpip/transport/queue/queue.go @@ -24,12 +24,23 @@ import ( "gvisor.googlesource.com/gvisor/pkg/waiter" ) -// Entry implements Linker interface and has both Length and Release methods. +// Entry implements Linker interface and has additional required methods. type Entry interface { ilist.Linker + + // Length returns the number of bytes stored in the entry. Length() int64 + + // Release releases any resources held by the entry. Release() + + // Peek returns a copy of the entry. It must be Released separately. Peek() Entry + + // Truncate reduces the number of bytes stored in the entry to n bytes. + // + // Preconditions: n <= Length(). + Truncate(n int64) } // Queue is a buffer queue. @@ -52,7 +63,7 @@ func New(ReaderQueue *waiter.Queue, WriterQueue *waiter.Queue, limit int64) *Que } // Close closes q for reading and writing. It is immediately not writable and -// will become unreadble will no more data is pending. +// will become unreadable when no more data is pending. // // Both the read and write queues must be notified after closing: // q.ReaderQueue.Notify(waiter.EventIn) @@ -86,38 +97,74 @@ func (q *Queue) IsReadable() bool { return q.closed || q.dataList.Front() != nil } +// bufWritable returns true if there is space for writing. +// +// N.B. Linux only considers a unix socket "writable" if >75% of the buffer is +// free. +// +// See net/unix/af_unix.c:unix_writeable. +func (q *Queue) bufWritable() bool { + return 4*q.used < q.limit +} + // IsWritable determines if q is currently writable. func (q *Queue) IsWritable() bool { q.mu.Lock() defer q.mu.Unlock() - return q.closed || q.used < q.limit + return q.closed || q.bufWritable() } // Enqueue adds an entry to the data queue if room is available. // +// If truncate is true, Enqueue may truncate the message beforing enqueuing it. +// Otherwise, the entire message must fit. If n < e.Length(), err indicates why. +// // If notify is true, ReaderQueue.Notify must be called: // q.ReaderQueue.Notify(waiter.EventIn) -func (q *Queue) Enqueue(e Entry) (notify bool, err *tcpip.Error) { +func (q *Queue) Enqueue(e Entry, truncate bool) (l int64, notify bool, err *tcpip.Error) { q.mu.Lock() if q.closed { q.mu.Unlock() - return false, tcpip.ErrClosedForSend + return 0, false, tcpip.ErrClosedForSend + } + + free := q.limit - q.used + + l = e.Length() + + if l > free && truncate { + if free == 0 { + // Message can't fit right now. + q.mu.Unlock() + return 0, false, tcpip.ErrWouldBlock + } + + e.Truncate(free) + l = e.Length() + err = tcpip.ErrWouldBlock + } + + if l > q.limit { + // Message is too big to ever fit. + q.mu.Unlock() + return 0, false, tcpip.ErrMessageTooLong } - if q.used >= q.limit { + if l > free { + // Message can't fit right now. q.mu.Unlock() - return false, tcpip.ErrWouldBlock + return 0, false, tcpip.ErrWouldBlock } notify = q.dataList.Front() == nil - q.used += e.Length() + q.used += l q.dataList.PushBack(e) q.mu.Unlock() - return notify, nil + return l, notify, err } // Dequeue removes the first entry in the data queue, if one exists. @@ -137,13 +184,13 @@ func (q *Queue) Dequeue() (e Entry, notify bool, err *tcpip.Error) { return nil, false, err } - notify = q.used >= q.limit + notify = !q.bufWritable() e = q.dataList.Front().(Entry) q.dataList.Remove(e) q.used -= e.Length() - notify = notify && q.used < q.limit + notify = notify && q.bufWritable() q.mu.Unlock() diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go index 6143390b3..bed7ec6a6 100644 --- a/pkg/tcpip/transport/tcp/endpoint_state.go +++ b/pkg/tcpip/transport/tcp/endpoint_state.go @@ -315,6 +315,8 @@ func loadError(s string) *tcpip.Error { tcpip.ErrNoLinkAddress, tcpip.ErrBadAddress, tcpip.ErrNetworkUnreachable, + tcpip.ErrMessageTooLong, + tcpip.ErrNoBufferSpace, } messageToError = make(map[string]*tcpip.Error) diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index 6ed805357..840e95302 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -15,6 +15,7 @@ package udp import ( + "math" "sync" "gvisor.googlesource.com/gvisor/pkg/sleep" @@ -264,6 +265,11 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, <-c return 0, nil, tcpip.ErrInvalidOptionValue } + if p.Size() > math.MaxUint16 { + // Payload can't possibly fit in a packet. + return 0, nil, tcpip.ErrMessageTooLong + } + to := opts.To e.mu.RLock() diff --git a/pkg/tcpip/transport/unix/connectionless.go b/pkg/tcpip/transport/unix/connectionless.go index ebd4802b0..ae93c61d7 100644 --- a/pkg/tcpip/transport/unix/connectionless.go +++ b/pkg/tcpip/transport/unix/connectionless.go @@ -105,14 +105,12 @@ func (e *connectionlessEndpoint) SendMsg(data [][]byte, c ControlMessages, to Bo e.Lock() n, notify, err := connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)}) e.Unlock() - if err != nil { - return 0, err - } + if notify { connected.SendNotify() } - return n, nil + return n, err } // Type implements Endpoint.Type. diff --git a/pkg/tcpip/transport/unix/unix.go b/pkg/tcpip/transport/unix/unix.go index 0bb00df42..718606cd1 100644 --- a/pkg/tcpip/transport/unix/unix.go +++ b/pkg/tcpip/transport/unix/unix.go @@ -260,20 +260,28 @@ type message struct { Address tcpip.FullAddress } -// Length returns number of bytes stored in the Message. +// Length returns number of bytes stored in the message. func (m *message) Length() int64 { return int64(len(m.Data)) } -// Release releases any resources held by the Message. +// Release releases any resources held by the message. func (m *message) Release() { m.Control.Release() } +// Peek returns a copy of the message. func (m *message) Peek() queue.Entry { return &message{Data: m.Data, Control: m.Control.Clone(), Address: m.Address} } +// Truncate reduces the length of the message payload to n bytes. +// +// Preconditions: n <= m.Length(). +func (m *message) Truncate(n int64) { + m.Data.CapLength(int(n)) +} + // A Receiver can be used to receive Messages. type Receiver interface { // Recv receives a single message. This method does not block. @@ -623,23 +631,33 @@ func (e *connectedEndpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) // Send implements ConnectedEndpoint.Send. func (e *connectedEndpoint) Send(data [][]byte, controlMessages ControlMessages, from tcpip.FullAddress) (uintptr, bool, *tcpip.Error) { - var l int + var l int64 for _, d := range data { - l += len(d) - } - // Discard empty stream packets. Since stream sockets don't preserve - // message boundaries, sending zero bytes is a no-op. In Linux, the - // receiver actually uses a zero-length receive as an indication that the - // stream was closed. - if l == 0 && e.endpoint.Type() == SockStream { - controlMessages.Release() - return 0, false, nil + l += int64(len(d)) + } + + truncate := false + if e.endpoint.Type() == SockStream { + // Since stream sockets don't preserve message boundaries, we + // can write only as much of the message as fits in the queue. + truncate = true + + // Discard empty stream packets. Since stream sockets don't + // preserve message boundaries, sending zero bytes is a no-op. + // In Linux, the receiver actually uses a zero-length receive + // as an indication that the stream was closed. + if l == 0 { + controlMessages.Release() + return 0, false, nil + } } + v := make([]byte, 0, l) for _, d := range data { v = append(v, d...) } - notify, err := e.writeQueue.Enqueue(&message{Data: buffer.View(v), Control: controlMessages, Address: from}) + + l, notify, err := e.writeQueue.Enqueue(&message{Data: buffer.View(v), Control: controlMessages, Address: from}, truncate) return uintptr(l), notify, err } @@ -793,15 +811,12 @@ func (e *baseEndpoint) SendMsg(data [][]byte, c ControlMessages, to BoundEndpoin n, notify, err := e.connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)}) e.Unlock() - if err != nil { - return 0, err - } if notify { e.connected.SendNotify() } - return n, nil + return n, err } // SetSockOpt sets a socket option. Currently not supported. |