summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip')
-rw-r--r--pkg/tcpip/tcpip.go8
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go47
-rw-r--r--pkg/tcpip/transport/tcp/snd.go40
-rw-r--r--pkg/tcpip/transport/tcp/tcp_test.go137
4 files changed, 185 insertions, 47 deletions
diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go
index 413aee6c6..8e2fe70ee 100644
--- a/pkg/tcpip/tcpip.go
+++ b/pkg/tcpip/tcpip.go
@@ -420,10 +420,14 @@ type ReceiveQueueSizeOption int
// socket is to be restricted to sending and receiving IPv6 packets only.
type V6OnlyOption int
-// NoDelayOption is used by SetSockOpt/GetSockOpt to specify if data should be
+// DelayOption is used by SetSockOpt/GetSockOpt to specify if data should be
// sent out immediately by the transport protocol. For TCP, it determines if the
// Nagle algorithm is on or off.
-type NoDelayOption int
+type DelayOption int
+
+// CorkOption is used by SetSockOpt/GetSockOpt to specify if data should be
+// held until segments are full by the TCP transport protocol.
+type CorkOption int
// ReuseAddressOption is used by SetSockOpt/GetSockOpt to specify whether Bind()
// should allow reuse of local address.
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 0b395b5b0..96a546aa7 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -162,10 +162,19 @@ type endpoint struct {
// sack holds TCP SACK related information for this endpoint.
sack SACKInfo
+ // delay enables Nagle's algorithm.
+ //
+ // delay is a boolean (0 is false) and must be accessed atomically.
+ delay uint32
+
+ // cork holds back segments until full.
+ //
+ // cork is a boolean (0 is false) and must be accessed atomically.
+ cork uint32
+
// The options below aren't implemented, but we remember the user
// settings because applications expect to be able to set/query these
// options.
- noDelay bool
reuseAddr bool
// segmentQueue is used to hand received segments to the protocol
@@ -276,7 +285,6 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite
rcvBufSize: DefaultBufferSize,
sndBufSize: DefaultBufferSize,
sndMTU: int(math.MaxInt32),
- noDelay: false,
reuseAddr: true,
keepalive: keepalive{
// Linux defaults.
@@ -643,10 +651,24 @@ func (e *endpoint) zeroReceiveWindow(scale uint8) bool {
// SetSockOpt sets a socket option.
func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
switch v := opt.(type) {
- case tcpip.NoDelayOption:
- e.mu.Lock()
- e.noDelay = v != 0
- e.mu.Unlock()
+ case tcpip.DelayOption:
+ if v == 0 {
+ atomic.StoreUint32(&e.delay, 0)
+ } else {
+ atomic.StoreUint32(&e.delay, 1)
+ }
+ return nil
+
+ case tcpip.CorkOption:
+ if v == 0 {
+ atomic.StoreUint32(&e.cork, 0)
+ } else {
+ atomic.StoreUint32(&e.cork, 1)
+ }
+
+ // Handle the corked data.
+ e.sndWaker.Assert()
+
return nil
case tcpip.ReuseAddressOption:
@@ -812,13 +834,16 @@ func (e *endpoint) GetSockOpt(opt interface{}) *tcpip.Error {
*o = tcpip.ReceiveQueueSizeOption(v)
return nil
- case *tcpip.NoDelayOption:
- e.mu.RLock()
- v := e.noDelay
- e.mu.RUnlock()
+ case *tcpip.DelayOption:
+ *o = 0
+ if v := atomic.LoadUint32(&e.delay); v != 0 {
+ *o = 1
+ }
+ return nil
+ case *tcpip.CorkOption:
*o = 0
- if v {
+ if v := atomic.LoadUint32(&e.cork); v != 0 {
*o = 1
}
return nil
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index 4482d8d07..f6dc7520b 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -17,6 +17,7 @@ package tcp
import (
"math"
"sync"
+ "sync/atomic"
"time"
"gvisor.googlesource.com/gvisor/pkg/sleep"
@@ -409,8 +410,6 @@ func (s *sender) sendData() {
// We abuse the flags field to determine if we have already
// assigned a sequence number to this segment.
if seg.flags == 0 {
- seg.sequenceNumber = s.sndNxt
- seg.flags = flagAck | flagPsh
// Merge segments if allowed.
if seg.data.Size() != 0 {
available := int(seg.sequenceNumber.Size(end))
@@ -418,8 +417,20 @@ func (s *sender) sendData() {
available = limit
}
+ // nextTooBig indicates that the next segment was too
+ // large to entirely fit in the current segment. It would
+ // be possible to split the next segment and merge the
+ // portion that fits, but unexpectedly splitting segments
+ // can have user visible side-effects which can break
+ // applications. For example, RFC 7766 section 8 says
+ // that the length and data of a DNS response should be
+ // sent in the same TCP segment to avoid triggering bugs
+ // in poorly written DNS implementations.
+ var nextTooBig bool
+
for next != nil && next.data.Size() != 0 {
if seg.data.Size()+next.data.Size() > available {
+ nextTooBig = true
break
}
@@ -429,7 +440,32 @@ func (s *sender) sendData() {
s.writeList.Remove(next)
next = next.Next()
}
+
+ if !nextTooBig && seg.data.Size() < available {
+ // Segment is not full.
+ if s.outstanding > 0 && atomic.LoadUint32(&s.ep.delay) != 0 {
+ // Nagle's algorithm. From Wikipedia:
+ // Nagle's algorithm works by combining a number of
+ // small outgoing messages and sending them all at
+ // once. Specifically, as long as there is a sent
+ // packet for which the sender has received no
+ // acknowledgment, the sender should keep buffering
+ // its output until it has a full packet's worth of
+ // output, thus allowing output to be sent all at
+ // once.
+ break
+ }
+ if atomic.LoadUint32(&s.ep.cork) != 0 {
+ // Hold back the segment until full.
+ break
+ }
+ }
}
+
+ // Assign flags. We don't do it above so that we can merge
+ // additional data if Nagle holds the segment.
+ seg.sequenceNumber = s.sndNxt
+ seg.flags = flagAck | flagPsh
}
var segEnd seqnum.Value
diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go
index 75868c4a2..8155e4ed8 100644
--- a/pkg/tcpip/transport/tcp/tcp_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_test.go
@@ -1255,20 +1255,92 @@ func TestZeroScaledWindowReceive(t *testing.T) {
}
func TestSegmentMerging(t *testing.T) {
+ tests := []struct {
+ name string
+ stop func(tcpip.Endpoint)
+ resume func(tcpip.Endpoint)
+ }{
+ {
+ "stop work",
+ func(ep tcpip.Endpoint) {
+ ep.(interface{ StopWork() }).StopWork()
+ },
+ func(ep tcpip.Endpoint) {
+ ep.(interface{ ResumeWork() }).ResumeWork()
+ },
+ },
+ {
+ "cork",
+ func(ep tcpip.Endpoint) {
+ ep.SetSockOpt(tcpip.CorkOption(1))
+ },
+ func(ep tcpip.Endpoint) {
+ ep.SetSockOpt(tcpip.CorkOption(0))
+ },
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ c := context.New(t, defaultMTU)
+ defer c.Cleanup()
+
+ c.CreateConnected(789, 30000, nil)
+
+ // Prevent the endpoint from processing packets.
+ test.stop(c.EP)
+
+ var allData []byte
+ for i, data := range [][]byte{{1, 2, 3, 4}, {5, 6, 7}, {8, 9}, {10}, {11}} {
+ allData = append(allData, data...)
+ view := buffer.NewViewFromBytes(data)
+ if _, _, err := c.EP.Write(tcpip.SlicePayload(view), tcpip.WriteOptions{}); err != nil {
+ t.Fatalf("Write #%d failed: %v", i+1, err)
+ }
+ }
+
+ // Let the endpoint process the segments that we just sent.
+ test.resume(c.EP)
+
+ // Check that data is received.
+ b := c.GetPacket()
+ checker.IPv4(t, b,
+ checker.PayloadLen(len(allData)+header.TCPMinimumSize),
+ checker.TCP(
+ checker.DstPort(context.TestPort),
+ checker.SeqNum(uint32(c.IRS)+1),
+ checker.AckNum(790),
+ checker.TCPFlagsMatch(header.TCPFlagAck, ^uint8(header.TCPFlagPsh)),
+ ),
+ )
+
+ if got := b[header.IPv4MinimumSize+header.TCPMinimumSize:]; !bytes.Equal(got, allData) {
+ t.Fatalf("got data = %v, want = %v", got, allData)
+ }
+
+ // Acknowledge the data.
+ c.SendPacket(nil, &context.Headers{
+ SrcPort: context.TestPort,
+ DstPort: c.Port,
+ Flags: header.TCPFlagAck,
+ SeqNum: 790,
+ AckNum: c.IRS.Add(1 + seqnum.Size(len(allData))),
+ RcvWnd: 30000,
+ })
+ })
+ }
+}
+
+func TestDelay(t *testing.T) {
c := context.New(t, defaultMTU)
defer c.Cleanup()
c.CreateConnected(789, 30000, nil)
- // Prevent the endpoint from processing packets.
- worker := c.EP.(interface {
- StopWork()
- ResumeWork()
- })
- worker.StopWork()
+ c.EP.SetSockOpt(tcpip.DelayOption(1))
var allData []byte
- for i, data := range [][]byte{{1, 2, 3, 4}, {5, 6, 7}, {8, 9}, {10}, {11}} {
+ for i, data := range [][]byte{{0}, {1, 2, 3, 4}, {5, 6, 7}, {8, 9}, {10}, {11}} {
allData = append(allData, data...)
view := buffer.NewViewFromBytes(data)
if _, _, err := c.EP.Write(tcpip.SlicePayload(view), tcpip.WriteOptions{}); err != nil {
@@ -1276,34 +1348,35 @@ func TestSegmentMerging(t *testing.T) {
}
}
- // Let the endpoint process the segments that we just sent.
- worker.ResumeWork()
+ seq := c.IRS.Add(1)
+ for _, want := range [][]byte{allData[:1], allData[1:]} {
+ // Check that data is received.
+ b := c.GetPacket()
+ checker.IPv4(t, b,
+ checker.PayloadLen(len(want)+header.TCPMinimumSize),
+ checker.TCP(
+ checker.DstPort(context.TestPort),
+ checker.SeqNum(uint32(seq)),
+ checker.AckNum(790),
+ checker.TCPFlagsMatch(header.TCPFlagAck, ^uint8(header.TCPFlagPsh)),
+ ),
+ )
- // Check that data is received.
- b := c.GetPacket()
- checker.IPv4(t, b,
- checker.PayloadLen(len(allData)+header.TCPMinimumSize),
- checker.TCP(
- checker.DstPort(context.TestPort),
- checker.SeqNum(uint32(c.IRS)+1),
- checker.AckNum(790),
- checker.TCPFlagsMatch(header.TCPFlagAck, ^uint8(header.TCPFlagPsh)),
- ),
- )
+ if got := b[header.IPv4MinimumSize+header.TCPMinimumSize:]; !bytes.Equal(got, want) {
+ t.Fatalf("got data = %v, want = %v", got, want)
+ }
- if got := b[header.IPv4MinimumSize+header.TCPMinimumSize:]; !bytes.Equal(got, allData) {
- t.Fatalf("got data = %v, want = %v", got, allData)
+ seq = seq.Add(seqnum.Size(len(want)))
+ // Acknowledge the data.
+ c.SendPacket(nil, &context.Headers{
+ SrcPort: context.TestPort,
+ DstPort: c.Port,
+ Flags: header.TCPFlagAck,
+ SeqNum: 790,
+ AckNum: seq,
+ RcvWnd: 30000,
+ })
}
-
- // Acknowledge the data.
- c.SendPacket(nil, &context.Headers{
- SrcPort: context.TestPort,
- DstPort: c.Port,
- Flags: header.TCPFlagAck,
- SeqNum: 790,
- AckNum: c.IRS.Add(1 + seqnum.Size(len(allData))),
- RcvWnd: 30000,
- })
}
func testBrokenUpWrite(t *testing.T, c *context.Context, maxPayload int) {