diff options
Diffstat (limited to 'pkg/tcpip')
-rw-r--r-- | pkg/tcpip/link/tun/BUILD | 1 | ||||
-rw-r--r-- | pkg/tcpip/link/tun/device.go | 5 | ||||
-rw-r--r-- | pkg/tcpip/socketops.go | 59 | ||||
-rw-r--r-- | pkg/tcpip/stack/packet_buffer.go | 14 | ||||
-rw-r--r-- | pkg/tcpip/stack/packet_buffer_test.go | 26 | ||||
-rw-r--r-- | pkg/tcpip/stack/tcp.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 66 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_test.go | 92 | ||||
-rw-r--r-- | pkg/tcpip/transport/udp/endpoint.go | 2 |
10 files changed, 229 insertions, 48 deletions
diff --git a/pkg/tcpip/link/tun/BUILD b/pkg/tcpip/link/tun/BUILD index 4758a99ad..c3e4c3455 100644 --- a/pkg/tcpip/link/tun/BUILD +++ b/pkg/tcpip/link/tun/BUILD @@ -31,7 +31,6 @@ go_library( "//pkg/refs", "//pkg/refsvfs2", "//pkg/sync", - "//pkg/syserror", "//pkg/tcpip", "//pkg/tcpip/buffer", "//pkg/tcpip/header", diff --git a/pkg/tcpip/link/tun/device.go b/pkg/tcpip/link/tun/device.go index d23210503..fa2131c28 100644 --- a/pkg/tcpip/link/tun/device.go +++ b/pkg/tcpip/link/tun/device.go @@ -20,7 +20,6 @@ import ( "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/errors/linuxerr" "gvisor.dev/gvisor/pkg/sync" - "gvisor.dev/gvisor/pkg/syserror" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/buffer" "gvisor.dev/gvisor/pkg/tcpip/header" @@ -174,7 +173,7 @@ func (d *Device) Write(data []byte) (int64, error) { return 0, linuxerr.EBADFD } if !endpoint.IsAttached() { - return 0, syserror.EIO + return 0, linuxerr.EIO } dataLen := int64(len(data)) @@ -249,7 +248,7 @@ func (d *Device) Read() ([]byte, error) { for { info, ok := endpoint.Read() if !ok { - return nil, syserror.ErrWouldBlock + return nil, linuxerr.ErrWouldBlock } v, ok := d.encodePkt(&info) diff --git a/pkg/tcpip/socketops.go b/pkg/tcpip/socketops.go index 5642c86f8..34ac62444 100644 --- a/pkg/tcpip/socketops.go +++ b/pkg/tcpip/socketops.go @@ -17,6 +17,7 @@ package tcpip import ( "sync/atomic" + "gvisor.dev/gvisor/pkg/atomicbitops" "gvisor.dev/gvisor/pkg/sync" ) @@ -56,6 +57,11 @@ type SocketOptionsHandler interface { // OnSetReceiveBufferSize is invoked by SO_RCVBUF and SO_RCVBUFFORCE. OnSetReceiveBufferSize(v, oldSz int64) (newSz int64) + + // WakeupWriters is invoked when the send buffer size for an endpoint is + // changed. The handler notifies the writers if the send buffer size is + // increased with setsockopt(2) for TCP endpoints. + WakeupWriters() } // DefaultSocketOptionsHandler is an embeddable type that implements no-op @@ -97,6 +103,9 @@ func (*DefaultSocketOptionsHandler) OnSetSendBufferSize(v int64) (newSz int64) { return v } +// WakeupWriters implements SocketOptionsHandler.WakeupWriters. +func (*DefaultSocketOptionsHandler) WakeupWriters() {} + // OnSetReceiveBufferSize implements SocketOptionsHandler.OnSetReceiveBufferSize. func (*DefaultSocketOptionsHandler) OnSetReceiveBufferSize(v, oldSz int64) (newSz int64) { return v @@ -207,24 +216,16 @@ type SocketOptions struct { // will not change. getSendBufferLimits GetSendBufferLimits `state:"manual"` - // sendBufSizeMu protects sendBufferSize and calls to - // handler.OnSetSendBufferSize. - sendBufSizeMu sync.Mutex `state:"nosave"` - // sendBufferSize determines the send buffer size for this socket. - sendBufferSize int64 + sendBufferSize atomicbitops.AlignedAtomicInt64 // getReceiveBufferLimits provides the handler to get the min, default and // max size for receive buffer. It is initialized at the creation time and // will not change. getReceiveBufferLimits GetReceiveBufferLimits `state:"manual"` - // receiveBufSizeMu protects receiveBufferSize and calls to - // handler.OnSetReceiveBufferSize. - receiveBufSizeMu sync.Mutex `state:"nosave"` - // receiveBufferSize determines the receive buffer size for this socket. - receiveBufferSize int64 + receiveBufferSize atomicbitops.AlignedAtomicInt64 // mu protects the access to the below fields. mu sync.Mutex `state:"nosave"` @@ -614,6 +615,11 @@ func (so *SocketOptions) SetBindToDevice(bindToDevice int32) Error { return nil } +// GetSendBufferSize gets value for SO_SNDBUF option. +func (so *SocketOptions) GetSendBufferSize() int64 { + return so.sendBufferSize.Load() +} + // SendBufferLimits returns the [min, max) range of allowable send buffer // sizes. func (so *SocketOptions) SendBufferLimits() (min, max int64) { @@ -621,22 +627,21 @@ func (so *SocketOptions) SendBufferLimits() (min, max int64) { return int64(limits.Min), int64(limits.Max) } -// GetSendBufferSize gets value for SO_SNDBUF option. -func (so *SocketOptions) GetSendBufferSize() int64 { - so.sendBufSizeMu.Lock() - defer so.sendBufSizeMu.Unlock() - return so.sendBufferSize -} - // SetSendBufferSize sets value for SO_SNDBUF option. notify indicates if the // stack handler should be invoked to set the send buffer size. func (so *SocketOptions) SetSendBufferSize(sendBufferSize int64, notify bool) { - so.sendBufSizeMu.Lock() - defer so.sendBufSizeMu.Unlock() if notify { sendBufferSize = so.handler.OnSetSendBufferSize(sendBufferSize) } - so.sendBufferSize = sendBufferSize + so.sendBufferSize.Store(sendBufferSize) + if notify { + so.handler.WakeupWriters() + } +} + +// GetReceiveBufferSize gets value for SO_RCVBUF option. +func (so *SocketOptions) GetReceiveBufferSize() int64 { + return so.receiveBufferSize.Load() } // ReceiveBufferLimits returns the [min, max) range of allowable receive buffer @@ -646,20 +651,12 @@ func (so *SocketOptions) ReceiveBufferLimits() (min, max int64) { return int64(limits.Min), int64(limits.Max) } -// GetReceiveBufferSize gets value for SO_RCVBUF option. -func (so *SocketOptions) GetReceiveBufferSize() int64 { - so.receiveBufSizeMu.Lock() - defer so.receiveBufSizeMu.Unlock() - return so.receiveBufferSize -} - // SetReceiveBufferSize sets the value of the SO_RCVBUF option, optionally // notifying the owning endpoint. func (so *SocketOptions) SetReceiveBufferSize(receiveBufferSize int64, notify bool) { - so.receiveBufSizeMu.Lock() - defer so.receiveBufSizeMu.Unlock() if notify { - receiveBufferSize = so.handler.OnSetReceiveBufferSize(receiveBufferSize, so.receiveBufferSize) + oldSz := so.receiveBufferSize.Load() + receiveBufferSize = so.handler.OnSetReceiveBufferSize(receiveBufferSize, oldSz) } - so.receiveBufferSize = receiveBufferSize + so.receiveBufferSize.Store(receiveBufferSize) } diff --git a/pkg/tcpip/stack/packet_buffer.go b/pkg/tcpip/stack/packet_buffer.go index 9192d8433..29c22bfd4 100644 --- a/pkg/tcpip/stack/packet_buffer.go +++ b/pkg/tcpip/stack/packet_buffer.go @@ -282,14 +282,12 @@ func (pk *PacketBuffer) headerView(typ headerType) tcpipbuffer.View { return v } -// Clone makes a shallow copy of pk. -// -// Clone should be called in such cases so that no modifications is done to -// underlying packet payload. +// Clone makes a semi-deep copy of pk. The underlying packet payload is +// shared. Hence, no modifications is done to underlying packet payload. func (pk *PacketBuffer) Clone() *PacketBuffer { return &PacketBuffer{ PacketBufferEntry: pk.PacketBufferEntry, - buf: pk.buf, + buf: pk.buf.Clone(), reserved: pk.reserved, pushed: pk.pushed, consumed: pk.consumed, @@ -321,14 +319,14 @@ func (pk *PacketBuffer) Network() header.Network { } } -// CloneToInbound makes a shallow copy of the packet buffer to be used as an -// inbound packet. +// CloneToInbound makes a semi-deep copy of the packet buffer (similar to +// Clone) to be used as an inbound packet. // // See PacketBuffer.Data for details about how a packet buffer holds an inbound // packet. func (pk *PacketBuffer) CloneToInbound() *PacketBuffer { newPk := &PacketBuffer{ - buf: pk.buf, + buf: pk.buf.Clone(), // Treat unfilled header portion as reserved. reserved: pk.AvailableHeaderBytes(), } diff --git a/pkg/tcpip/stack/packet_buffer_test.go b/pkg/tcpip/stack/packet_buffer_test.go index a8da34992..87b023445 100644 --- a/pkg/tcpip/stack/packet_buffer_test.go +++ b/pkg/tcpip/stack/packet_buffer_test.go @@ -123,6 +123,32 @@ func TestPacketHeaderPush(t *testing.T) { } } +func TestPacketBufferClone(t *testing.T) { + data := concatViews(makeView(20), makeView(30), makeView(40)) + pk := NewPacketBuffer(PacketBufferOptions{ + // Make a copy of data to make sure our truth data won't be taint by + // PacketBuffer. + Data: buffer.NewViewFromBytes(data).ToVectorisedView(), + }) + + bytesToDelete := 30 + originalSize := data.Size() + + clonedPks := []*PacketBuffer{ + pk.Clone(), + pk.CloneToInbound(), + } + pk.Data().DeleteFront(bytesToDelete) + if got, want := pk.Data().Size(), originalSize-bytesToDelete; got != want { + t.Errorf("original packet was not changed: size expected = %d, got = %d", want, got) + } + for _, clonedPk := range clonedPks { + if got := clonedPk.Data().Size(); got != originalSize { + t.Errorf("cloned packet should not be modified: expected size = %d, got = %d", originalSize, got) + } + } +} + func TestPacketHeaderConsume(t *testing.T) { for _, test := range []struct { name string diff --git a/pkg/tcpip/stack/tcp.go b/pkg/tcpip/stack/tcp.go index 90a8ba6cf..93ea83cdc 100644 --- a/pkg/tcpip/stack/tcp.go +++ b/pkg/tcpip/stack/tcp.go @@ -386,6 +386,12 @@ type TCPSndBufState struct { // SndMTU is the smallest MTU seen in the control packets received. SndMTU int + + // AutoTuneSndBufDisabled indicates that the auto tuning of send buffer + // is disabled. + // + // Must be accessed using atomic operations. + AutoTuneSndBufDisabled uint32 } // TCPEndpointStateInner contains the members of TCPEndpointState used directly diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 044123185..355719beb 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -1717,6 +1717,27 @@ func (e *endpoint) OnSetReceiveBufferSize(rcvBufSz, oldSz int64) (newSz int64) { return rcvBufSz } +// OnSetSendBufferSize implements tcpip.SocketOptionsHandler.OnSetSendBufferSize. +func (e *endpoint) OnSetSendBufferSize(sz int64) int64 { + atomic.StoreUint32(&e.sndQueueInfo.TCPSndBufState.AutoTuneSndBufDisabled, 1) + return sz +} + +// WakeupWriters implements tcpip.SocketOptionsHandler.WakeupWriters. +func (e *endpoint) WakeupWriters() { + e.LockUser() + defer e.UnlockUser() + + sendBufferSize := e.getSendBufferSize() + e.sndQueueInfo.sndQueueMu.Lock() + notify := (sendBufferSize - e.sndQueueInfo.SndBufUsed) >= e.sndQueueInfo.SndBufUsed>>1 + e.sndQueueInfo.sndQueueMu.Unlock() + + if notify { + e.waiterQueue.Notify(waiter.WritableEvents) + } +} + // SetSockOptInt sets a socket option. func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error { // Lower 2 bits represents ECN bits. RFC 3168, section 23.1 @@ -2329,6 +2350,9 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp e.segmentQueue.mu.Unlock() e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0) e.setEndpointState(StateEstablished) + // Set the new auto tuned send buffer size after entering + // established state. + e.ops.SetSendBufferSize(e.computeTCPSendBufferSize(), false /* notify */) } if run { @@ -2763,13 +2787,20 @@ func (e *endpoint) updateSndBufferUsage(v int) { e.sndQueueInfo.sndQueueMu.Lock() notify := e.sndQueueInfo.SndBufUsed >= sendBufferSize>>1 e.sndQueueInfo.SndBufUsed -= v + + // Get the new send buffer size with auto tuning, but do not set it + // unless we decide to notify the writers. + newSndBufSz := e.computeTCPSendBufferSize() + // We only notify when there is half the sendBufferSize available after // a full buffer event occurs. This ensures that we don't wake up // writers to queue just 1-2 segments and go back to sleep. - notify = notify && e.sndQueueInfo.SndBufUsed < sendBufferSize>>1 + notify = notify && e.sndQueueInfo.SndBufUsed < int(newSndBufSz)>>1 e.sndQueueInfo.sndQueueMu.Unlock() if notify { + // Set the new send buffer size calculated from auto tuning. + e.ops.SetSendBufferSize(newSndBufSz, false /* notify */) e.waiterQueue.Notify(waiter.WritableEvents) } } @@ -3091,3 +3122,36 @@ func GetTCPReceiveBufferLimits(s tcpip.StackHandler) tcpip.ReceiveBufferSizeOpti Max: ss.Max, } } + +// computeTCPSendBufferSize implements auto tuning of send buffer size and +// returns the new send buffer size. +func (e *endpoint) computeTCPSendBufferSize() int64 { + curSndBufSz := int64(e.getSendBufferSize()) + + // Auto tuning is disabled when the user explicitly sets the send + // buffer size with SO_SNDBUF option. + if disabled := atomic.LoadUint32(&e.sndQueueInfo.TCPSndBufState.AutoTuneSndBufDisabled); disabled == 1 { + return curSndBufSz + } + + const packetOverheadFactor = 2 + curMSS := e.snd.MaxPayloadSize + numSeg := InitialCwnd + if numSeg < e.snd.SndCwnd { + numSeg = e.snd.SndCwnd + } + + // SndCwnd indicates the number of segments that can be sent. This means + // that the sender can send upto #SndCwnd segments and the send buffer + // size should be set to SndCwnd*MSS to accommodate sending of all the + // segments. + newSndBufSz := int64(numSeg * curMSS * packetOverheadFactor) + if newSndBufSz < curSndBufSz { + return curSndBufSz + } + if ss := GetTCPSendBufferLimits(e.stack); int64(ss.Max) < newSndBufSz { + newSndBufSz = int64(ss.Max) + } + + return newSndBufSz +} diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 92a66f17e..64302f576 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -1415,9 +1415,6 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { ackLeft -= datalen } - // Update the send buffer usage and notify potential waiters. - s.ep.updateSndBufferUsage(int(acked)) - // Clear SACK information for all acked data. s.ep.scoreboard.Delete(s.SndUna) @@ -1437,6 +1434,9 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) { } } + // Update the send buffer usage and notify potential waiters. + s.ep.updateSndBufferUsage(int(acked)) + // It is possible for s.outstanding to drop below zero if we get // a retransmit timeout, reset outstanding to zero but later // get an ack that cover previously sent data. diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index 031f01357..db6b0955a 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -7964,3 +7964,95 @@ func generateRandomPayload(t *testing.T, n int) []byte { } return buf } + +func TestSendBufferTuning(t *testing.T) { + const maxPayload = 536 + const mtu = header.TCPMinimumSize + header.IPv4MinimumSize + maxTCPOptionSize + maxPayload + const packetOverheadFactor = 2 + + testCases := []struct { + name string + autoTuningDisabled bool + }{ + {"autoTuningDisabled", true}, + {"autoTuningEnabled", false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c := context.New(t, mtu) + defer c.Cleanup() + + // Set the stack option for send buffer size. + const defaultSndBufSz = maxPayload * tcp.InitialCwnd + const maxSndBufSz = defaultSndBufSz * 10 + { + opt := tcpip.TCPSendBufferSizeRangeOption{Min: 1, Default: defaultSndBufSz, Max: maxSndBufSz} + if err := c.Stack().SetTransportProtocolOption(tcp.ProtocolNumber, &opt); err != nil { + t.Fatalf("SetTransportProtocolOption(%d, &%#v): %s", tcp.ProtocolNumber, opt, err) + } + } + + c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) + + oldSz := c.EP.SocketOptions().GetSendBufferSize() + if oldSz != defaultSndBufSz { + t.Fatalf("Wrong send buffer size got %d want %d", oldSz, defaultSndBufSz) + } + + if tc.autoTuningDisabled { + c.EP.SocketOptions().SetSendBufferSize(defaultSndBufSz, true /* notify */) + } + + data := make([]byte, maxPayload) + for i := range data { + data[i] = byte(i) + } + + w, ch := waiter.NewChannelEntry(nil) + c.WQ.EventRegister(&w, waiter.WritableEvents) + defer c.WQ.EventUnregister(&w) + + bytesRead := 0 + for { + // Packets will be sent till the send buffer + // size is reached. + var r bytes.Reader + r.Reset(data[bytesRead : bytesRead+maxPayload]) + _, err := c.EP.Write(&r, tcpip.WriteOptions{}) + if cmp.Equal(&tcpip.ErrWouldBlock{}, err) { + break + } + + c.ReceiveAndCheckPacketWithOptions(data, bytesRead, maxPayload, 0) + bytesRead += maxPayload + data = append(data, data...) + } + + // Send an ACK and wait for connection to become writable again. + c.SendAck(seqnum.Value(context.TestInitialSequenceNumber).Add(1), bytesRead) + select { + case <-ch: + if err := c.EP.LastError(); err != nil { + t.Fatalf("Write failed: %s", err) + } + case <-time.After(1 * time.Second): + t.Fatalf("Timed out waiting for connection") + } + + outSz := int64(defaultSndBufSz) + if !tc.autoTuningDisabled { + // Calculate the new auto tuned send buffer. + var info tcpip.TCPInfoOption + if err := c.EP.GetSockOpt(&info); err != nil { + t.Fatalf("GetSockOpt failed: %v", err) + } + outSz = (int64(info.SndCwnd) * packetOverheadFactor * (maxPayload)) + } + + if newSz := c.EP.SocketOptions().GetSendBufferSize(); newSz != outSz { + t.Fatalf("Wrong send buffer size, got %d want %d", newSz, outSz) + } + }) + } +} diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index 82a3f2287..108580508 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -266,7 +266,7 @@ func (e *endpoint) Close() { for mem := range e.multicastMemberships { e.stack.LeaveGroup(e.NetProto, mem.nicID, mem.multicastAddr) } - e.multicastMemberships = make(map[multicastMembership]struct{}) + e.multicastMemberships = nil // Close the receive list and drain it. e.rcvMu.Lock() |