summary | refs | log | tree | commit | diff | homepage
path: root/pkg/tcpip
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip')
-rw-r--r-- pkg/tcpip/link/rawfile/errors.go 2
-rw-r--r-- pkg/tcpip/tcpip.go 2
-rw-r--r-- pkg/tcpip/transport/queue/queue.go 69
-rw-r--r-- pkg/tcpip/transport/tcp/endpoint_state.go 2
-rw-r--r-- pkg/tcpip/transport/udp/endpoint.go 6
-rw-r--r-- pkg/tcpip/transport/unix/connectionless.go 6
-rw-r--r-- pkg/tcpip/transport/unix/unix.go 49
7 files changed, 104 insertions, 32 deletions
diff --git a/pkg/tcpip/link/rawfile/errors.go b/pkg/tcpip/link/rawfile/errors.go
index 7f213793e..de7593d9c 100644
--- a/pkg/tcpip/link/rawfile/errors.go
+++ b/pkg/tcpip/link/rawfile/errors.go
@@ -41,6 +41,8 @@ var translations = map[syscall.Errno]*tcpip.Error{
syscall.ENOTCONN: tcpip.ErrNotConnected,
syscall.ECONNRESET: tcpip.ErrConnectionReset,
syscall.ECONNABORTED: tcpip.ErrConnectionAborted,
+ syscall.EMSGSIZE: tcpip.ErrMessageTooLong,
+ syscall.ENOBUFS: tcpip.ErrNoBufferSpace,
}
// TranslateErrno translate an errno from the syscall package into a
diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go
index f5b5ec86b..cef27948c 100644
--- a/pkg/tcpip/tcpip.go
+++ b/pkg/tcpip/tcpip.go
@@ -98,6 +98,8 @@ var (
ErrNoLinkAddress = &Error{msg: "no remote link address"}
ErrBadAddress = &Error{msg: "bad address"}
ErrNetworkUnreachable = &Error{msg: "network is unreachable"}
+ ErrMessageTooLong = &Error{msg: "message too long"}
+ ErrNoBufferSpace = &Error{msg: "no buffer space available"}
)
// Errors related to Subnet
diff --git a/pkg/tcpip/transport/queue/queue.go b/pkg/tcpip/transport/queue/queue.go
index eb9ee8a3f..b3d2ea68b 100644
--- a/pkg/tcpip/transport/queue/queue.go
+++ b/pkg/tcpip/transport/queue/queue.go
@@ -24,12 +24,23 @@ import (
"gvisor.googlesource.com/gvisor/pkg/waiter"
)
-// Entry implements Linker interface and has both Length and Release methods.
+// Entry implements Linker interface and has additional required methods.
type Entry interface {
ilist.Linker
+
+ // Length returns the number of bytes stored in the entry.
Length() int64
+
+ // Release releases any resources held by the entry.
Release()
+
+ // Peek returns a copy of the entry. It must be Released separately.
Peek() Entry
+
+ // Truncate reduces the number of bytes stored in the entry to n bytes.
+ //
+ // Preconditions: n <= Length().
+ Truncate(n int64)
}
// Queue is a buffer queue.
@@ -52,7 +63,7 @@ func New(ReaderQueue *waiter.Queue, WriterQueue *waiter.Queue, limit int64) *Que
}
// Close closes q for reading and writing. It is immediately not writable and
-// will become unreadble will no more data is pending.
+// will become unreadable when no more data is pending.
//
// Both the read and write queues must be notified after closing:
// q.ReaderQueue.Notify(waiter.EventIn)
@@ -86,38 +97,74 @@ func (q *Queue) IsReadable() bool {
return q.closed || q.dataList.Front() != nil
}
+// bufWritable returns true if there is space for writing.
+//
+// N.B. Linux only considers a unix socket "writable" if >75% of the buffer is
+// free.
+//
+// See net/unix/af_unix.c:unix_writeable.
+func (q *Queue) bufWritable() bool {
+ return 4*q.used < q.limit
+}
+
// IsWritable determines if q is currently writable.
func (q *Queue) IsWritable() bool {
q.mu.Lock()
defer q.mu.Unlock()
- return q.closed || q.used < q.limit
+ return q.closed || q.bufWritable()
}
// Enqueue adds an entry to the data queue if room is available.
//
+// If truncate is true, Enqueue may truncate the message before enqueuing it.
+// Otherwise, the entire message must fit. If l < the original e.Length(), err indicates why.
+//
// If notify is true, ReaderQueue.Notify must be called:
// q.ReaderQueue.Notify(waiter.EventIn)
-func (q *Queue) Enqueue(e Entry) (notify bool, err *tcpip.Error) {
+func (q *Queue) Enqueue(e Entry, truncate bool) (l int64, notify bool, err *tcpip.Error) {
q.mu.Lock()
if q.closed {
q.mu.Unlock()
- return false, tcpip.ErrClosedForSend
+ return 0, false, tcpip.ErrClosedForSend
+ }
+
+ free := q.limit - q.used
+
+ l = e.Length()
+
+ if l > free && truncate {
+ if free == 0 {
+ // Message can't fit right now.
+ q.mu.Unlock()
+ return 0, false, tcpip.ErrWouldBlock
+ }
+
+ e.Truncate(free)
+ l = e.Length()
+ err = tcpip.ErrWouldBlock
+ }
+
+ if l > q.limit {
+ // Message is too big to ever fit.
+ q.mu.Unlock()
+ return 0, false, tcpip.ErrMessageTooLong
}
- if q.used >= q.limit {
+ if l > free {
+ // Message can't fit right now.
q.mu.Unlock()
- return false, tcpip.ErrWouldBlock
+ return 0, false, tcpip.ErrWouldBlock
}
notify = q.dataList.Front() == nil
- q.used += e.Length()
+ q.used += l
q.dataList.PushBack(e)
q.mu.Unlock()
- return notify, nil
+ return l, notify, err
}
// Dequeue removes the first entry in the data queue, if one exists.
@@ -137,13 +184,13 @@ func (q *Queue) Dequeue() (e Entry, notify bool, err *tcpip.Error) {
return nil, false, err
}
- notify = q.used >= q.limit
+ notify = !q.bufWritable()
e = q.dataList.Front().(Entry)
q.dataList.Remove(e)
q.used -= e.Length()
- notify = notify && q.used < q.limit
+ notify = notify && q.bufWritable()
q.mu.Unlock()
diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go
index 6143390b3..bed7ec6a6 100644
--- a/pkg/tcpip/transport/tcp/endpoint_state.go
+++ b/pkg/tcpip/transport/tcp/endpoint_state.go
@@ -315,6 +315,8 @@ func loadError(s string) *tcpip.Error {
tcpip.ErrNoLinkAddress,
tcpip.ErrBadAddress,
tcpip.ErrNetworkUnreachable,
+ tcpip.ErrMessageTooLong,
+ tcpip.ErrNoBufferSpace,
}
messageToError = make(map[string]*tcpip.Error)
diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go
index 6ed805357..840e95302 100644
--- a/pkg/tcpip/transport/udp/endpoint.go
+++ b/pkg/tcpip/transport/udp/endpoint.go
@@ -15,6 +15,7 @@
package udp
import (
+ "math"
"sync"
"gvisor.googlesource.com/gvisor/pkg/sleep"
@@ -264,6 +265,11 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, <-c
return 0, nil, tcpip.ErrInvalidOptionValue
}
+ if p.Size() > math.MaxUint16 {
+ // Payload can't possibly fit in a packet.
+ return 0, nil, tcpip.ErrMessageTooLong
+ }
+
to := opts.To
e.mu.RLock()
diff --git a/pkg/tcpip/transport/unix/connectionless.go b/pkg/tcpip/transport/unix/connectionless.go
index ebd4802b0..ae93c61d7 100644
--- a/pkg/tcpip/transport/unix/connectionless.go
+++ b/pkg/tcpip/transport/unix/connectionless.go
@@ -105,14 +105,12 @@ func (e *connectionlessEndpoint) SendMsg(data [][]byte, c ControlMessages, to Bo
e.Lock()
n, notify, err := connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)})
e.Unlock()
- if err != nil {
- return 0, err
- }
+
if notify {
connected.SendNotify()
}
- return n, nil
+ return n, err
}
// Type implements Endpoint.Type.
diff --git a/pkg/tcpip/transport/unix/unix.go b/pkg/tcpip/transport/unix/unix.go
index 0bb00df42..718606cd1 100644
--- a/pkg/tcpip/transport/unix/unix.go
+++ b/pkg/tcpip/transport/unix/unix.go
@@ -260,20 +260,28 @@ type message struct {
Address tcpip.FullAddress
}
-// Length returns number of bytes stored in the Message.
+// Length returns the number of bytes stored in the message.
func (m *message) Length() int64 {
return int64(len(m.Data))
}
-// Release releases any resources held by the Message.
+// Release releases any resources held by the message.
func (m *message) Release() {
m.Control.Release()
}
+// Peek returns a copy of the message.
func (m *message) Peek() queue.Entry {
return &message{Data: m.Data, Control: m.Control.Clone(), Address: m.Address}
}
+// Truncate reduces the length of the message payload to n bytes.
+//
+// Preconditions: n <= m.Length().
+func (m *message) Truncate(n int64) {
+ m.Data.CapLength(int(n))
+}
+
// A Receiver can be used to receive Messages.
type Receiver interface {
// Recv receives a single message. This method does not block.
@@ -623,23 +631,33 @@ func (e *connectedEndpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error)
// Send implements ConnectedEndpoint.Send.
func (e *connectedEndpoint) Send(data [][]byte, controlMessages ControlMessages, from tcpip.FullAddress) (uintptr, bool, *tcpip.Error) {
- var l int
+ var l int64
for _, d := range data {
- l += len(d)
- }
- // Discard empty stream packets. Since stream sockets don't preserve
- // message boundaries, sending zero bytes is a no-op. In Linux, the
- // receiver actually uses a zero-length receive as an indication that the
- // stream was closed.
- if l == 0 && e.endpoint.Type() == SockStream {
- controlMessages.Release()
- return 0, false, nil
+ l += int64(len(d))
+ }
+
+ truncate := false
+ if e.endpoint.Type() == SockStream {
+ // Since stream sockets don't preserve message boundaries, we
+ // can write only as much of the message as fits in the queue.
+ truncate = true
+
+ // Discard empty stream packets. Since stream sockets don't
+ // preserve message boundaries, sending zero bytes is a no-op.
+ // In Linux, the receiver actually uses a zero-length receive
+ // as an indication that the stream was closed.
+ if l == 0 {
+ controlMessages.Release()
+ return 0, false, nil
+ }
}
+
v := make([]byte, 0, l)
for _, d := range data {
v = append(v, d...)
}
- notify, err := e.writeQueue.Enqueue(&message{Data: buffer.View(v), Control: controlMessages, Address: from})
+
+ l, notify, err := e.writeQueue.Enqueue(&message{Data: buffer.View(v), Control: controlMessages, Address: from}, truncate)
return uintptr(l), notify, err
}
@@ -793,15 +811,12 @@ func (e *baseEndpoint) SendMsg(data [][]byte, c ControlMessages, to BoundEndpoin
n, notify, err := e.connected.Send(data, c, tcpip.FullAddress{Addr: tcpip.Address(e.path)})
e.Unlock()
- if err != nil {
- return 0, err
- }
if notify {
e.connected.SendNotify()
}
- return n, nil
+ return n, err
}
// SetSockOpt sets a socket option. Currently not supported.