diff options
Diffstat (limited to 'pkg/tcpip')
-rw-r--r-- | pkg/tcpip/link/rawfile/errors.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/tcpip.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/transport/queue/queue.go | 69 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint_state.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/transport/udp/endpoint.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/unix/connectionless.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/unix/unix.go | 49 |
7 files changed, 104 insertions, 32 deletions
diff --git a/pkg/tcpip/link/rawfile/errors.go b/pkg/tcpip/link/rawfile/errors.go index 7f213793e..de7593d9c 100644 --- a/pkg/tcpip/link/rawfile/errors.go +++ b/pkg/tcpip/link/rawfile/errors.go @@ -41,6 +41,8 @@ var translations = map[syscall.Errno]*tcpip.Error{ syscall.ENOTCONN: tcpip.ErrNotConnected, syscall.ECONNRESET: tcpip.ErrConnectionReset, syscall.ECONNABORTED: tcpip.ErrConnectionAborted, + syscall.EMSGSIZE: tcpip.ErrMessageTooLong, + syscall.ENOBUFS: tcpip.ErrNoBufferSpace, } // TranslateErrno translate an errno from the syscall package into a diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go index f5b5ec86b..cef27948c 100644 --- a/pkg/tcpip/tcpip.go +++ b/pkg/tcpip/tcpip.go @@ -98,6 +98,8 @@ var ( ErrNoLinkAddress = &Error{msg: "no remote link address"} ErrBadAddress = &Error{msg: "bad address"} ErrNetworkUnreachable = &Error{msg: "network is unreachable"} + ErrMessageTooLong = &Error{msg: "message too long"} + ErrNoBufferSpace = &Error{msg: "no buffer space available"} ) // Errors related to Subnet diff --git a/pkg/tcpip/transport/queue/queue.go b/pkg/tcpip/transport/queue/queue.go index eb9ee8a3f..b3d2ea68b 100644 --- a/pkg/tcpip/transport/queue/queue.go +++ b/pkg/tcpip/transport/queue/queue.go @@ -24,12 +24,23 @@ import ( "gvisor.googlesource.com/gvisor/pkg/waiter" ) -// Entry implements Linker interface and has both Length and Release methods. +// Entry implements Linker interface and has additional required methods. type Entry interface { ilist.Linker + + // Length returns the number of bytes stored in the entry. Length() int64 + + // Release releases any resources held by the entry. Release() + + // Peek returns a copy of the entry. It must be Released separately. Peek() Entry + + // Truncate reduces the number of bytes stored in the entry to n bytes. + // + // Preconditions: n <= Length(). + Truncate(n int64) } // Queue is a buffer queue. 
@@ -52,7 +63,7 @@ func New(ReaderQueue *waiter.Queue, WriterQueue *waiter.Queue, limit int64) *Que } // Close closes q for reading and writing. It is immediately not writable and -// will become unreadble will no more data is pending. +// will become unreadable when no more data is pending. // // Both the read and write queues must be notified after closing: // q.ReaderQueue.Notify(waiter.EventIn) @@ -86,38 +97,74 @@ func (q *Queue) IsReadable() bool { return q.closed || q.dataList.Front() != nil } +// bufWritable returns true if there is space for writing. +// +// N.B. Linux only considers a unix socket "writable" if >75% of the buffer is +// free. +// +// See net/unix/af_unix.c:unix_writeable. +func (q *Queue) bufWritable() bool { + return 4*q.used < q.limit +} + // IsWritable determines if q is currently writable. func (q *Queue) IsWritable() bool { q.mu.Lock() defer q.mu.Unlock() - return q.closed || q.used < q.limit + return q.closed || q.bufWritable() } // Enqueue adds an entry to the data queue if room is available. // +// If truncate is true, Enqueue may truncate the message before enqueuing it. +// Otherwise, the entire message must fit. If n < e.Length(), err indicates why. +// // If notify is true, ReaderQueue.Notify must be called: // q.ReaderQueue.Notify(waiter.EventIn) -func (q *Queue) Enqueue(e Entry) (notify bool, err *tcpip.Error) { +func (q *Queue) Enqueue(e Entry, truncate bool) (l int64, notify bool, err *tcpip.Error) { q.mu.Lock() if q.closed { q.mu.Unlock() - return false, tcpip.ErrClosedForSend + return 0, false, tcpip.ErrClosedForSend + } + + free := q.limit - q.used + + l = e.Length() + + if l > free && truncate { + if free == 0 { + // Message can't fit right now. + q.mu.Unlock() + return 0, false, tcpip.ErrWouldBlock + } + + e.Truncate(free) + l = e.Length() + err = tcpip.ErrWouldBlock + } + + if l > q.limit { + // Message is too big to ever fit. 
+ q.mu.Unlock() + return 0, false, tcpip.ErrMessageTooLong } - if q.used >= q.limit { + if l > free { + // Message can't fit right now. q.mu.Unlock() - return false, tcpip.ErrWouldBlock + return 0, false, tcpip.ErrWouldBlock } notify = q.dataList.Front() == nil - q.used += e.Length() + q.used += l q.dataList.PushBack(e) q.mu.Unlock() - return notify, nil + return l, notify, err } // Dequeue removes the first entry in the data queue, if one exists. @@ -137,13 +184,13 @@ func (q *Queue) Dequeue() (e Entry, notify bool, err *tcpip.Error) { return nil, false, err } - notify = q.used >= q.limit + notify = !q.bufWritable() e = q.dataList.Front().(Entry) q.dataList.Remove(e) q.used -= e.Length() - notify = notify && q.used < q.limit + notify = notify && q.bufWritable() q.mu.Unlock() diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go index 6143390b3..bed7ec6a6 100644 --- a/pkg/tcpip/transport/tcp/endpoint_state.go +++ b/pkg/tcpip/transport/tcp/endpoint_state.go @@ -315,6 +315,8 @@ func loadError(s string) *tcpip.Error { tcpip.ErrNoLinkAddress, tcpip.ErrBadAddress, tcpip.ErrNetworkUnreachable, + tcpip.ErrMessageTooLong, + tcpip.ErrNoBufferSpace, } messageToError = make(map[string]*tcpip.Error) diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index 6ed805357..840e95302 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -15,6 +15,7 @@ package udp import ( + "math" "sync" "gvisor.googlesource.com/gvisor/pkg/sleep" @@ -264,6 +265,11 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, <-c return 0, nil, tcpip.ErrInvalidOptionValue } + if p.Size() > math.MaxUint16 { + // Payload can't possibly fit in a packet. 
+ return 0, nil, tcpip.ErrMessageTooLong + } + to := opts.To e.mu.RLock() diff --git a/pkg/tcpip/transport/unix/connectionless.go b/pkg/tcpip/transport/unix/connectionless.go index ebd4802b0..ae93c61d7 100644 --- a/pkg/tcpip/transport/unix/connectionless.go +++ b/pkg/tcpip/transport/unix/connectionless.go @@ -105,14 +105,12 @@ func (e *connectionlessEndpoint) SendMsg(data [][]byte, c ControlMessages, to Bo e.Lock() n, notify, err := connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)}) e.Unlock() - if err != nil { - return 0, err - } + if notify { connected.SendNotify() } - return n, nil + return n, err } // Type implements Endpoint.Type. diff --git a/pkg/tcpip/transport/unix/unix.go b/pkg/tcpip/transport/unix/unix.go index 0bb00df42..718606cd1 100644 --- a/pkg/tcpip/transport/unix/unix.go +++ b/pkg/tcpip/transport/unix/unix.go @@ -260,20 +260,28 @@ type message struct { Address tcpip.FullAddress } -// Length returns number of bytes stored in the Message. +// Length returns number of bytes stored in the message. func (m *message) Length() int64 { return int64(len(m.Data)) } -// Release releases any resources held by the Message. +// Release releases any resources held by the message. func (m *message) Release() { m.Control.Release() } +// Peek returns a copy of the message. func (m *message) Peek() queue.Entry { return &message{Data: m.Data, Control: m.Control.Clone(), Address: m.Address} } +// Truncate reduces the length of the message payload to n bytes. +// +// Preconditions: n <= m.Length(). +func (m *message) Truncate(n int64) { + m.Data.CapLength(int(n)) +} + // A Receiver can be used to receive Messages. type Receiver interface { // Recv receives a single message. This method does not block. @@ -623,23 +631,33 @@ func (e *connectedEndpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) // Send implements ConnectedEndpoint.Send. 
func (e *connectedEndpoint) Send(data [][]byte, controlMessages ControlMessages, from tcpip.FullAddress) (uintptr, bool, *tcpip.Error) { - var l int + var l int64 for _, d := range data { - l += len(d) - } - // Discard empty stream packets. Since stream sockets don't preserve - // message boundaries, sending zero bytes is a no-op. In Linux, the - // receiver actually uses a zero-length receive as an indication that the - // stream was closed. - if l == 0 && e.endpoint.Type() == SockStream { - controlMessages.Release() - return 0, false, nil + l += int64(len(d)) + } + + truncate := false + if e.endpoint.Type() == SockStream { + // Since stream sockets don't preserve message boundaries, we + // can write only as much of the message as fits in the queue. + truncate = true + + // Discard empty stream packets. Since stream sockets don't + // preserve message boundaries, sending zero bytes is a no-op. + // In Linux, the receiver actually uses a zero-length receive + // as an indication that the stream was closed. + if l == 0 { + controlMessages.Release() + return 0, false, nil + } } + v := make([]byte, 0, l) for _, d := range data { v = append(v, d...) } - notify, err := e.writeQueue.Enqueue(&message{Data: buffer.View(v), Control: controlMessages, Address: from}) + + l, notify, err := e.writeQueue.Enqueue(&message{Data: buffer.View(v), Control: controlMessages, Address: from}, truncate) return uintptr(l), notify, err } @@ -793,15 +811,12 @@ func (e *baseEndpoint) SendMsg(data [][]byte, c ControlMessages, to BoundEndpoin n, notify, err := e.connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)}) e.Unlock() - if err != nil { - return 0, err - } if notify { e.connected.SendNotify() } - return n, nil + return n, err } // SetSockOpt sets a socket option. Currently not supported. |