From 28212b3f179dc23bb966f72b11f635017cdf8664 Mon Sep 17 00:00:00 2001 From: Bhasker Hariharan Date: Tue, 14 Apr 2020 19:32:32 -0700 Subject: Reduce flakiness in tcp_test. Tests now use a MinRTO of 3s instead of default 200ms. This reduced flakiness in a lot of the congestion control/recovery tests which were flaky due to retransmit timer firing too early in case the test executors were overloaded. This change also bumps some of the timeouts in tests which were too sensitive to timer variations and reduces the number of slow start iterations which can make the tests run for too long and also trigger retansmit timeouts etc if the executor is overloaded. PiperOrigin-RevId: 306562645 --- pkg/tcpip/checker/checker.go | 19 +++++ pkg/tcpip/link/channel/channel.go | 34 ++------- pkg/tcpip/tcpip.go | 4 ++ pkg/tcpip/transport/tcp/BUILD | 5 +- pkg/tcpip/transport/tcp/protocol.go | 17 +++++ pkg/tcpip/transport/tcp/snd.go | 15 +++- pkg/tcpip/transport/tcp/tcp_noracedetector_test.go | 83 ++++++++++++++-------- pkg/tcpip/transport/tcp/tcp_sack_test.go | 2 +- pkg/tcpip/transport/tcp/tcp_test.go | 50 ++++++------- pkg/tcpip/transport/tcp/testing/context/context.go | 17 ++++- 10 files changed, 159 insertions(+), 87 deletions(-) diff --git a/pkg/tcpip/checker/checker.go b/pkg/tcpip/checker/checker.go index 307f1b666..c1745ba6a 100644 --- a/pkg/tcpip/checker/checker.go +++ b/pkg/tcpip/checker/checker.go @@ -107,6 +107,8 @@ func DstAddr(addr tcpip.Address) NetworkChecker { // TTL creates a checker that checks the TTL (ipv4) or HopLimit (ipv6). func TTL(ttl uint8) NetworkChecker { return func(t *testing.T, h []header.Network) { + t.Helper() + var v uint8 switch ip := h[0].(type) { case header.IPv4: @@ -310,6 +312,8 @@ func SrcPort(port uint16) TransportChecker { // DstPort creates a checker that checks the destination port. func DstPort(port uint16) TransportChecker { return func(t *testing.T, h header.Transport) { + t.Helper() + if p := h.DestinationPort(); p != port { t.Errorf("Bad destination port, got %v, want %v", p, port) } @@ -336,6 +340,7 @@ func SeqNum(seq uint32) TransportChecker { func AckNum(seq uint32) TransportChecker { return func(t *testing.T, h header.Transport) { t.Helper() + tcp, ok := h.(header.TCP) if !ok { return @@ -350,6 +355,8 @@ func AckNum(seq uint32) TransportChecker { // Window creates a checker that checks the tcp window. func Window(window uint16) TransportChecker { return func(t *testing.T, h header.Transport) { + t.Helper() + tcp, ok := h.(header.TCP) if !ok { return @@ -381,6 +388,8 @@ func TCPFlags(flags uint8) TransportChecker { // given mask, match the supplied flags. func TCPFlagsMatch(flags, mask uint8) TransportChecker { return func(t *testing.T, h header.Transport) { + t.Helper() + tcp, ok := h.(header.TCP) if !ok { return @@ -398,6 +407,8 @@ func TCPFlagsMatch(flags, mask uint8) TransportChecker { // If wndscale is negative, the window scale option must not be present. func TCPSynOptions(wantOpts header.TCPSynOptions) TransportChecker { return func(t *testing.T, h header.Transport) { + t.Helper() + tcp, ok := h.(header.TCP) if !ok { return @@ -494,6 +505,8 @@ func TCPSynOptions(wantOpts header.TCPSynOptions) TransportChecker { // skipped. func TCPTimestampChecker(wantTS bool, wantTSVal uint32, wantTSEcr uint32) TransportChecker { return func(t *testing.T, h header.Transport) { + t.Helper() + tcp, ok := h.(header.TCP) if !ok { return @@ -612,6 +625,8 @@ func TCPSACKBlockChecker(sackBlocks []header.SACKBlock) TransportChecker { // Payload creates a checker that checks the payload. func Payload(want []byte) TransportChecker { return func(t *testing.T, h header.Transport) { + t.Helper() + if got := h.Payload(); !reflect.DeepEqual(got, want) { t.Errorf("Wrong payload, got %v, want %v", got, want) } @@ -644,6 +659,7 @@ func ICMPv4(checkers ...TransportChecker) NetworkChecker { func ICMPv4Type(want header.ICMPv4Type) TransportChecker { return func(t *testing.T, h header.Transport) { t.Helper() + icmpv4, ok := h.(header.ICMPv4) if !ok { t.Fatalf("unexpected transport header passed to checker got: %+v, want: header.ICMPv4", h) @@ -658,6 +674,7 @@ func ICMPv4Type(want header.ICMPv4Type) TransportChecker { func ICMPv4Code(want byte) TransportChecker { return func(t *testing.T, h header.Transport) { t.Helper() + icmpv4, ok := h.(header.ICMPv4) if !ok { t.Fatalf("unexpected transport header passed to checker got: %+v, want: header.ICMPv4", h) @@ -700,6 +717,7 @@ func ICMPv6(checkers ...TransportChecker) NetworkChecker { func ICMPv6Type(want header.ICMPv6Type) TransportChecker { return func(t *testing.T, h header.Transport) { t.Helper() + icmpv6, ok := h.(header.ICMPv6) if !ok { t.Fatalf("unexpected transport header passed to checker got: %+v, want: header.ICMPv6", h) @@ -714,6 +732,7 @@ func ICMPv6Type(want header.ICMPv6Type) TransportChecker { func ICMPv6Code(want byte) TransportChecker { return func(t *testing.T, h header.Transport) { t.Helper() + icmpv6, ok := h.(header.ICMPv6) if !ok { t.Fatalf("unexpected transport header passed to checker got: %+v, want: header.ICMPv6", h) diff --git a/pkg/tcpip/link/channel/channel.go b/pkg/tcpip/link/channel/channel.go index b4a0ae53d..9bf67686d 100644 --- a/pkg/tcpip/link/channel/channel.go +++ b/pkg/tcpip/link/channel/channel.go @@ -50,13 +50,11 @@ type NotificationHandle struct { } type queue struct { + // c is the outbound packet channel. + c chan PacketInfo // mu protects fields below. - mu sync.RWMutex - // c is the outbound packet channel. Sending to c should hold mu. - c chan PacketInfo - numWrite int - numRead int - notify []*NotificationHandle + mu sync.RWMutex + notify []*NotificationHandle } func (q *queue) Close() { @@ -64,11 +62,8 @@ func (q *queue) Close() { } func (q *queue) Read() (PacketInfo, bool) { - q.mu.Lock() - defer q.mu.Unlock() select { case p := <-q.c: - q.numRead++ return p, true default: return PacketInfo{}, false @@ -76,15 +71,8 @@ func (q *queue) Read() (PacketInfo, bool) { } func (q *queue) ReadContext(ctx context.Context) (PacketInfo, bool) { - // We have to receive from channel without holding the lock, since it can - // block indefinitely. This will cause a window that numWrite - numRead - // produces a larger number, but won't go to negative. numWrite >= numRead - // still holds. select { case pkt := <-q.c: - q.mu.Lock() - defer q.mu.Unlock() - q.numRead++ return pkt, true case <-ctx.Done(): return PacketInfo{}, false @@ -93,16 +81,12 @@ func (q *queue) ReadContext(ctx context.Context) (PacketInfo, bool) { func (q *queue) Write(p PacketInfo) bool { wrote := false - - // It's important to make sure nobody can see numWrite until we increment it, - // so numWrite >= numRead holds. - q.mu.Lock() select { case q.c <- p: wrote = true - q.numWrite++ default: } + q.mu.Lock() notify := q.notify q.mu.Unlock() @@ -116,13 +100,7 @@ func (q *queue) Write(p PacketInfo) bool { } func (q *queue) Num() int { - q.mu.RLock() - defer q.mu.RUnlock() - n := q.numWrite - q.numRead - if n < 0 { - panic("numWrite < numRead") - } - return n + return len(q.c) } func (q *queue) AddNotify(notify Notification) *NotificationHandle { diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go index aec7126ff..109121dbc 100644 --- a/pkg/tcpip/tcpip.go +++ b/pkg/tcpip/tcpip.go @@ -681,6 +681,10 @@ type TCPTimeWaitTimeoutOption time.Duration // for a handshake till the specified timeout until a segment with data arrives. type TCPDeferAcceptOption time.Duration +// TCPMinRTOOption is use by SetSockOpt/GetSockOpt to allow overriding +// default MinRTO used by the Stack. +type TCPMinRTOOption time.Duration + // MulticastInterfaceOption is used by SetSockOpt/GetSockOpt to specify a // default interface for multicast. type MulticastInterfaceOption struct { diff --git a/pkg/tcpip/transport/tcp/BUILD b/pkg/tcpip/transport/tcp/BUILD index 7f94f9646..edb7718a6 100644 --- a/pkg/tcpip/transport/tcp/BUILD +++ b/pkg/tcpip/transport/tcp/BUILD @@ -87,7 +87,9 @@ go_test( "tcp_timestamp_test.go", ], # FIXME(b/68809571) - tags = ["flaky"], + tags = [ + "flaky", + ], deps = [ ":tcp", "//pkg/sync", @@ -104,5 +106,6 @@ go_test( "//pkg/tcpip/stack", "//pkg/tcpip/transport/tcp/testing/context", "//pkg/waiter", + "//runsc/testutil", ], ) diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go index dce9a1652..91f25c132 100644 --- a/pkg/tcpip/transport/tcp/protocol.go +++ b/pkg/tcpip/transport/tcp/protocol.go @@ -105,6 +105,7 @@ type protocol struct { moderateReceiveBuffer bool tcpLingerTimeout time.Duration tcpTimeWaitTimeout time.Duration + minRTO time.Duration dispatcher *dispatcher } @@ -272,6 +273,15 @@ func (p *protocol) SetOption(option interface{}) *tcpip.Error { p.mu.Unlock() return nil + case tcpip.TCPMinRTOOption: + if v < 0 { + v = tcpip.TCPMinRTOOption(MinRTO) + } + p.mu.Lock() + p.minRTO = time.Duration(v) + p.mu.Unlock() + return nil + default: return tcpip.ErrUnknownProtocolOption } @@ -334,6 +344,12 @@ func (p *protocol) Option(option interface{}) *tcpip.Error { p.mu.RUnlock() return nil + case *tcpip.TCPMinRTOOption: + p.mu.RLock() + *v = tcpip.TCPMinRTOOption(p.minRTO) + p.mu.RUnlock() + return nil + default: return tcpip.ErrUnknownProtocolOption } @@ -359,5 +375,6 @@ func NewProtocol() stack.TransportProtocol { tcpLingerTimeout: DefaultTCPLingerTimeout, tcpTimeWaitTimeout: DefaultTCPTimeWaitTimeout, dispatcher: newDispatcher(runtime.GOMAXPROCS(0)), + minRTO: MinRTO, } } diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index 6b7bac37d..d8cfe3115 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -15,6 +15,7 @@ package tcp import ( + "fmt" "math" "sync/atomic" "time" @@ -149,6 +150,9 @@ type sender struct { rtt rtt rto time.Duration + // minRTO is the minimum permitted value for sender.rto. + minRTO time.Duration + // maxPayloadSize is the maximum size of the payload of a given segment. // It is initialized on demand. maxPayloadSize int @@ -260,6 +264,13 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint // etc. s.ep.scoreboard = NewSACKScoreboard(uint16(s.maxPayloadSize), iss) + // Get Stack wide minRTO. + var v tcpip.TCPMinRTOOption + if err := ep.stack.TransportProtocolOption(ProtocolNumber, &v); err != nil { + panic(fmt.Sprintf("unable to get minRTO from stack: %s", err)) + } + s.minRTO = time.Duration(v) + return s } @@ -394,8 +405,8 @@ func (s *sender) updateRTO(rtt time.Duration) { s.rto = s.rtt.srtt + 4*s.rtt.rttvar s.rtt.Unlock() - if s.rto < MinRTO { - s.rto = MinRTO + if s.rto < s.minRTO { + s.rto = s.minRTO } } diff --git a/pkg/tcpip/transport/tcp/tcp_noracedetector_test.go b/pkg/tcpip/transport/tcp/tcp_noracedetector_test.go index 782d7b42c..359a75e73 100644 --- a/pkg/tcpip/transport/tcp/tcp_noracedetector_test.go +++ b/pkg/tcpip/transport/tcp/tcp_noracedetector_test.go @@ -31,6 +31,7 @@ import ( "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" "gvisor.dev/gvisor/pkg/tcpip/transport/tcp/testing/context" + "gvisor.dev/gvisor/runsc/testutil" ) func TestFastRecovery(t *testing.T) { @@ -40,7 +41,7 @@ func TestFastRecovery(t *testing.T) { c.CreateConnected(789, 30000, -1 /* epRcvBuf */) - const iterations = 7 + const iterations = 3 data := buffer.NewView(2 * maxPayload * (tcp.InitialCwnd << (iterations + 1))) for i := range data { data[i] = byte(i) @@ -86,16 +87,23 @@ func TestFastRecovery(t *testing.T) { // Receive the retransmitted packet. c.ReceiveAndCheckPacket(data, rtxOffset, maxPayload) - if got, want := c.Stack().Stats().TCP.FastRetransmit.Value(), uint64(1); got != want { - t.Errorf("got stats.TCP.FastRetransmit.Value = %v, want = %v", got, want) - } + // Wait before checking metrics. + metricPollFn := func() error { + if got, want := c.Stack().Stats().TCP.FastRetransmit.Value(), uint64(1); got != want { + return fmt.Errorf("got stats.TCP.FastRetransmit.Value = %v, want = %v", got, want) + } + if got, want := c.Stack().Stats().TCP.Retransmits.Value(), uint64(1); got != want { + return fmt.Errorf("got stats.TCP.Retransmit.Value = %v, want = %v", got, want) + } - if got, want := c.Stack().Stats().TCP.Retransmits.Value(), uint64(1); got != want { - t.Errorf("got stats.TCP.Retransmit.Value = %v, want = %v", got, want) + if got, want := c.Stack().Stats().TCP.FastRecovery.Value(), uint64(1); got != want { + return fmt.Errorf("got stats.TCP.FastRecovery.Value = %v, want = %v", got, want) + } + return nil } - if got, want := c.Stack().Stats().TCP.FastRecovery.Value(), uint64(1); got != want { - t.Errorf("got stats.TCP.FastRecovery.Value = %v, want = %v", got, want) + if err := testutil.Poll(metricPollFn, 1*time.Second); err != nil { + t.Error(err) } // Now send 7 mode duplicate acks. Each of these should cause a window @@ -117,12 +125,18 @@ func TestFastRecovery(t *testing.T) { // Receive the retransmit due to partial ack. c.ReceiveAndCheckPacket(data, rtxOffset, maxPayload) - if got, want := c.Stack().Stats().TCP.FastRetransmit.Value(), uint64(2); got != want { - t.Errorf("got stats.TCP.FastRetransmit.Value = %v, want = %v", got, want) + // Wait before checking metrics. + metricPollFn = func() error { + if got, want := c.Stack().Stats().TCP.FastRetransmit.Value(), uint64(2); got != want { + return fmt.Errorf("got stats.TCP.FastRetransmit.Value = %v, want = %v", got, want) + } + if got, want := c.Stack().Stats().TCP.Retransmits.Value(), uint64(2); got != want { + return fmt.Errorf("got stats.TCP.Retransmit.Value = %v, want = %v", got, want) + } + return nil } - - if got, want := c.Stack().Stats().TCP.Retransmits.Value(), uint64(2); got != want { - t.Errorf("got stats.TCP.Retransmit.Value = %v, want = %v", got, want) + if err := testutil.Poll(metricPollFn, 1*time.Second); err != nil { + t.Error(err) } // Receive the 10 extra packets that should have been released due to @@ -192,7 +206,7 @@ func TestExponentialIncreaseDuringSlowStart(t *testing.T) { c.CreateConnected(789, 30000, -1 /* epRcvBuf */) - const iterations = 7 + const iterations = 3 data := buffer.NewView(maxPayload * (tcp.InitialCwnd << (iterations + 1))) for i := range data { data[i] = byte(i) @@ -234,7 +248,7 @@ func TestCongestionAvoidance(t *testing.T) { c.CreateConnected(789, 30000, -1 /* epRcvBuf */) - const iterations = 7 + const iterations = 3 data := buffer.NewView(2 * maxPayload * (tcp.InitialCwnd << (iterations + 1))) for i := range data { data[i] = byte(i) @@ -338,7 +352,7 @@ func TestCubicCongestionAvoidance(t *testing.T) { c.CreateConnected(789, 30000, -1 /* epRcvBuf */) - const iterations = 7 + const iterations = 3 data := buffer.NewView(2 * maxPayload * (tcp.InitialCwnd << (iterations + 1))) for i := range data { @@ -447,7 +461,7 @@ func TestRetransmit(t *testing.T) { c.CreateConnected(789, 30000, -1 /* epRcvBuf */) - const iterations = 7 + const iterations = 3 data := buffer.NewView(maxPayload * (tcp.InitialCwnd << (iterations + 1))) for i := range data { data[i] = byte(i) @@ -492,24 +506,33 @@ func TestRetransmit(t *testing.T) { rtxOffset := bytesRead - maxPayload*expected c.ReceiveAndCheckPacket(data, rtxOffset, maxPayload) - if got, want := c.Stack().Stats().TCP.Timeouts.Value(), uint64(1); got != want { - t.Errorf("got stats.TCP.Timeouts.Value = %v, want = %v", got, want) - } + metricPollFn := func() error { + if got, want := c.Stack().Stats().TCP.Timeouts.Value(), uint64(1); got != want { + return fmt.Errorf("got stats.TCP.Timeouts.Value = %v, want = %v", got, want) + } - if got, want := c.Stack().Stats().TCP.Retransmits.Value(), uint64(1); got != want { - t.Errorf("got stats.TCP.Retransmits.Value = %v, want = %v", got, want) - } + if got, want := c.Stack().Stats().TCP.Retransmits.Value(), uint64(1); got != want { + return fmt.Errorf("got stats.TCP.Retransmits.Value = %v, want = %v", got, want) + } - if got, want := c.EP.Stats().(*tcp.Stats).SendErrors.Timeouts.Value(), uint64(1); got != want { - t.Errorf("got EP SendErrors.Timeouts.Value = %v, want = %v", got, want) - } + if got, want := c.EP.Stats().(*tcp.Stats).SendErrors.Timeouts.Value(), uint64(1); got != want { + return fmt.Errorf("got EP SendErrors.Timeouts.Value = %v, want = %v", got, want) + } + + if got, want := c.EP.Stats().(*tcp.Stats).SendErrors.Retransmits.Value(), uint64(1); got != want { + return fmt.Errorf("got EP stats SendErrors.Retransmits.Value = %v, want = %v", got, want) + } + + if got, want := c.Stack().Stats().TCP.SlowStartRetransmits.Value(), uint64(1); got != want { + return fmt.Errorf("got stats.TCP.SlowStartRetransmits.Value = %v, want = %v", got, want) + } - if got, want := c.EP.Stats().(*tcp.Stats).SendErrors.Retransmits.Value(), uint64(1); got != want { - t.Errorf("got EP stats SendErrors.Retransmits.Value = %v, want = %v", got, want) + return nil } - if got, want := c.Stack().Stats().TCP.SlowStartRetransmits.Value(), uint64(1); got != want { - t.Errorf("got stats.TCP.SlowStartRetransmits.Value = %v, want = %v", got, want) + // Poll when checking metrics. + if err := testutil.Poll(metricPollFn, 1*time.Second); err != nil { + t.Error(err) } // Acknowledge half of the pending data. diff --git a/pkg/tcpip/transport/tcp/tcp_sack_test.go b/pkg/tcpip/transport/tcp/tcp_sack_test.go index afea124ec..c439d5281 100644 --- a/pkg/tcpip/transport/tcp/tcp_sack_test.go +++ b/pkg/tcpip/transport/tcp/tcp_sack_test.go @@ -387,7 +387,7 @@ func TestSACKRecovery(t *testing.T) { setStackSACKPermitted(t, c, true) createConnectedWithSACKAndTS(c) - const iterations = 7 + const iterations = 3 data := buffer.NewView(2 * maxPayload * (tcp.InitialCwnd << (iterations + 1))) for i := range data { data[i] = byte(i) diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index 29301a45c..41caa9ed4 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -590,6 +590,10 @@ func TestClosingWithEnqueuedSegments(t *testing.T) { ), ) + // Give the stack a few ms to transition the endpoint out of ESTABLISHED + // state. + time.Sleep(10 * time.Millisecond) + if got, want := tcp.EndpointState(ep.State()), tcp.StateCloseWait; got != want { t.Errorf("Unexpected endpoint state: want %v, got %v", want, got) } @@ -4472,8 +4476,8 @@ func TestKeepalive(t *testing.T) { c.CreateConnected(789, 30000, -1 /* epRcvBuf */) - const keepAliveInterval = 10 * time.Millisecond - c.EP.SetSockOpt(tcpip.KeepaliveIdleOption(10 * time.Millisecond)) + const keepAliveInterval = 3 * time.Second + c.EP.SetSockOpt(tcpip.KeepaliveIdleOption(100 * time.Millisecond)) c.EP.SetSockOpt(tcpip.KeepaliveIntervalOption(keepAliveInterval)) c.EP.SetSockOptInt(tcpip.KeepaliveCountOption, 5) c.EP.SetSockOptBool(tcpip.KeepaliveEnabledOption, true) @@ -4567,7 +4571,7 @@ func TestKeepalive(t *testing.T) { // Sleep for a litte over the KeepAlive interval to make sure // the timer has time to fire after the last ACK and close the // close the socket. - time.Sleep(keepAliveInterval + 5*time.Millisecond) + time.Sleep(keepAliveInterval + keepAliveInterval/2) // The connection should be terminated after 5 unacked keepalives. // Send an ACK to trigger a RST from the stack as the endpoint should @@ -6615,14 +6619,17 @@ func TestKeepaliveWithUserTimeout(t *testing.T) { origEstablishedTimedout := c.Stack().Stats().TCP.EstablishedTimedout.Value() - const keepAliveInterval = 10 * time.Millisecond - c.EP.SetSockOpt(tcpip.KeepaliveIdleOption(10 * time.Millisecond)) + const keepAliveInterval = 3 * time.Second + c.EP.SetSockOpt(tcpip.KeepaliveIdleOption(100 * time.Millisecond)) c.EP.SetSockOpt(tcpip.KeepaliveIntervalOption(keepAliveInterval)) c.EP.SetSockOptInt(tcpip.KeepaliveCountOption, 10) c.EP.SetSockOptBool(tcpip.KeepaliveEnabledOption, true) - // Set userTimeout to be the duration for 3 keepalive probes. - userTimeout := 30 * time.Millisecond + // Set userTimeout to be the duration to be 1 keepalive + // probes. Which means that after the first probe is sent + // the second one should cause the connection to be + // closed due to userTimeout being hit. + userTimeout := 1 * keepAliveInterval c.EP.SetSockOpt(tcpip.TCPUserTimeoutOption(userTimeout)) // Check that the connection is still alive. @@ -6630,28 +6637,23 @@ func TestKeepaliveWithUserTimeout(t *testing.T) { t.Fatalf("got c.EP.Read(nil) = %v, want = %v", err, tcpip.ErrWouldBlock) } - // Now receive 2 keepalives, but don't ACK them. The connection should - // be reset when the 3rd one should be sent due to userTimeout being - // 30ms and each keepalive probe should be sent 10ms apart as set above after - // the connection has been idle for 10ms. - for i := 0; i < 2; i++ { - b := c.GetPacket() - checker.IPv4(t, b, - checker.TCP( - checker.DstPort(context.TestPort), - checker.SeqNum(uint32(c.IRS)), - checker.AckNum(uint32(790)), - checker.TCPFlags(header.TCPFlagAck), - ), - ) - } + // Now receive 1 keepalives, but don't ACK it. + b := c.GetPacket() + checker.IPv4(t, b, + checker.TCP( + checker.DstPort(context.TestPort), + checker.SeqNum(uint32(c.IRS)), + checker.AckNum(uint32(790)), + checker.TCPFlags(header.TCPFlagAck), + ), + ) // Sleep for a litte over the KeepAlive interval to make sure // the timer has time to fire after the last ACK and close the // close the socket. - time.Sleep(keepAliveInterval + 5*time.Millisecond) + time.Sleep(keepAliveInterval + keepAliveInterval/2) - // The connection should be terminated after 30ms. + // The connection should be closed with a timeout. // Send an ACK to trigger a RST from the stack as the endpoint should // be dead. c.SendPacket(nil, &context.Headers{ diff --git a/pkg/tcpip/transport/tcp/testing/context/context.go b/pkg/tcpip/transport/tcp/testing/context/context.go index 431ab4e6b..7b1d72cf4 100644 --- a/pkg/tcpip/transport/tcp/testing/context/context.go +++ b/pkg/tcpip/transport/tcp/testing/context/context.go @@ -152,6 +152,13 @@ func New(t *testing.T, mtu uint32) *Context { t.Fatalf("SetTransportProtocolOption failed: %v", err) } + // Increase minimum RTO in tests to avoid test flakes due to early + // retransmit in case the test executors are overloaded and cause timers + // to fire earlier than expected. + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcpip.TCPMinRTOOption(3*time.Second)); err != nil { + t.Fatalf("failed to set stack-wide minRTO: %s", err) + } + // Some of the congestion control tests send up to 640 packets, we so // set the channel size to 1000. ep := channel.New(1000, mtu, "") @@ -236,7 +243,7 @@ func (c *Context) CheckNoPacket(errMsg string) { func (c *Context) GetPacket() []byte { c.t.Helper() - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() p, ok := c.linkEP.ReadContext(ctx) if !ok { @@ -417,6 +424,8 @@ func (c *Context) SendAckWithSACK(seq seqnum.Value, bytesReceived int, sackBlock // verifies that the packet packet payload of packet matches the slice // of data indicated by offset & size. func (c *Context) ReceiveAndCheckPacket(data []byte, offset, size int) { + c.t.Helper() + c.ReceiveAndCheckPacketWithOptions(data, offset, size, 0) } @@ -425,6 +434,8 @@ func (c *Context) ReceiveAndCheckPacket(data []byte, offset, size int) { // data indicated by offset & size and skips optlen bytes in addition to the IP // TCP headers when comparing the data. func (c *Context) ReceiveAndCheckPacketWithOptions(data []byte, offset, size, optlen int) { + c.t.Helper() + b := c.GetPacket() checker.IPv4(c.t, b, checker.PayloadLen(size+header.TCPMinimumSize+optlen), @@ -447,6 +458,8 @@ func (c *Context) ReceiveAndCheckPacketWithOptions(data []byte, offset, size, op // data indicated by offset & size. It returns true if a packet was received and // processed. func (c *Context) ReceiveNonBlockingAndCheckPacket(data []byte, offset, size int) bool { + c.t.Helper() + b := c.GetPacketNonBlocking() if b == nil { return false @@ -570,6 +583,8 @@ func (c *Context) CreateConnected(iss seqnum.Value, rcvWnd seqnum.Size, epRcvBuf // // PreCondition: c.EP must already be created. func (c *Context) Connect(iss seqnum.Value, rcvWnd seqnum.Size, options []byte) { + c.t.Helper() + // Start connection attempt. waitEntry, notifyCh := waiter.NewChannelEntry(nil) c.WQ.EventRegister(&waitEntry, waiter.EventOut) -- cgit v1.2.3