summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/transport')
-rw-r--r--pkg/tcpip/transport/icmp/endpoint.go17
-rw-r--r--pkg/tcpip/transport/icmp/endpoint_state.go12
-rw-r--r--pkg/tcpip/transport/packet/endpoint.go24
-rw-r--r--pkg/tcpip/transport/packet/endpoint_state.go12
-rw-r--r--pkg/tcpip/transport/raw/endpoint.go22
-rw-r--r--pkg/tcpip/transport/raw/endpoint_state.go12
-rw-r--r--pkg/tcpip/transport/tcp/BUILD3
-rw-r--r--pkg/tcpip/transport/tcp/accept.go14
-rw-r--r--pkg/tcpip/transport/tcp/connect.go66
-rw-r--r--pkg/tcpip/transport/tcp/cubic.go19
-rw-r--r--pkg/tcpip/transport/tcp/dispatcher.go26
-rw-r--r--pkg/tcpip/transport/tcp/dual_stack_test.go2
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go110
-rw-r--r--pkg/tcpip/transport/tcp/endpoint_state.go35
-rw-r--r--pkg/tcpip/transport/tcp/forwarder.go2
-rw-r--r--pkg/tcpip/transport/tcp/protocol.go12
-rw-r--r--pkg/tcpip/transport/tcp/rack.go14
-rw-r--r--pkg/tcpip/transport/tcp/rcv.go37
-rw-r--r--pkg/tcpip/transport/tcp/rcv_state.go29
-rw-r--r--pkg/tcpip/transport/tcp/segment.go31
-rw-r--r--pkg/tcpip/transport/tcp/segment_state.go22
-rw-r--r--pkg/tcpip/transport/tcp/segment_test.go6
-rw-r--r--pkg/tcpip/transport/tcp/snd.go38
-rw-r--r--pkg/tcpip/transport/tcp/snd_state.go42
-rw-r--r--pkg/tcpip/transport/tcp/tcp_rack_test.go6
-rw-r--r--pkg/tcpip/transport/tcp/tcp_test.go54
-rw-r--r--pkg/tcpip/transport/tcp/timer.go51
-rw-r--r--pkg/tcpip/transport/tcp/timer_test.go7
-rw-r--r--pkg/tcpip/transport/udp/BUILD1
-rw-r--r--pkg/tcpip/transport/udp/endpoint.go32
-rw-r--r--pkg/tcpip/transport/udp/endpoint_state.go26
-rw-r--r--pkg/tcpip/transport/udp/forwarder.go2
-rw-r--r--pkg/tcpip/transport/udp/udp_test.go115
33 files changed, 413 insertions, 488 deletions
diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go
index 8afde7fca..39f526023 100644
--- a/pkg/tcpip/transport/icmp/endpoint.go
+++ b/pkg/tcpip/transport/icmp/endpoint.go
@@ -16,6 +16,7 @@ package icmp
import (
"io"
+ "time"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
@@ -33,7 +34,7 @@ type icmpPacket struct {
icmpPacketEntry
senderAddress tcpip.FullAddress
data buffer.VectorisedView `state:".(buffer.VectorisedView)"`
- timestamp int64
+ receivedAt time.Time `state:".(int64)"`
}
type endpointState int
@@ -160,7 +161,7 @@ func (e *endpoint) Close() {
}
// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
-func (e *endpoint) ModerateRecvBuf(copied int) {}
+func (*endpoint) ModerateRecvBuf(int) {}
// SetOwner implements tcpip.Endpoint.SetOwner.
func (e *endpoint) SetOwner(owner tcpip.PacketOwner) {
@@ -193,7 +194,7 @@ func (e *endpoint) Read(dst io.Writer, opts tcpip.ReadOptions) (tcpip.ReadResult
Total: p.data.Size(),
ControlMessages: tcpip.ControlMessages{
HasTimestamp: true,
- Timestamp: p.timestamp,
+ Timestamp: p.receivedAt.UnixNano(),
},
}
if opts.NeedRemoteAddr {
@@ -349,7 +350,7 @@ func (e *endpoint) write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp
}
// SetSockOpt sets a socket option.
-func (e *endpoint) SetSockOpt(opt tcpip.SettableSocketOption) tcpip.Error {
+func (*endpoint) SetSockOpt(tcpip.SettableSocketOption) tcpip.Error {
return nil
}
@@ -390,7 +391,7 @@ func (e *endpoint) GetSockOptInt(opt tcpip.SockOptInt) (int, tcpip.Error) {
}
// GetSockOpt implements tcpip.Endpoint.GetSockOpt.
-func (e *endpoint) GetSockOpt(opt tcpip.GettableSocketOption) tcpip.Error {
+func (*endpoint) GetSockOpt(tcpip.GettableSocketOption) tcpip.Error {
return &tcpip.ErrUnknownProtocolOption{}
}
@@ -606,7 +607,7 @@ func (*endpoint) Accept(*tcpip.FullAddress) (tcpip.Endpoint, *waiter.Queue, tcpi
return nil, nil, &tcpip.ErrNotSupported{}
}
-func (e *endpoint) registerWithStack(nicID tcpip.NICID, netProtos []tcpip.NetworkProtocolNumber, id stack.TransportEndpointID) (stack.TransportEndpointID, tcpip.Error) {
+func (e *endpoint) registerWithStack(_ tcpip.NICID, netProtos []tcpip.NetworkProtocolNumber, id stack.TransportEndpointID) (stack.TransportEndpointID, tcpip.Error) {
if id.LocalPort != 0 {
// The endpoint already has a local port, just attempt to
// register it.
@@ -615,7 +616,7 @@ func (e *endpoint) registerWithStack(nicID tcpip.NICID, netProtos []tcpip.Networ
}
// We need to find a port for the endpoint.
- _, err := e.stack.PickEphemeralPort(func(p uint16) (bool, tcpip.Error) {
+ _, err := e.stack.PickEphemeralPort(e.stack.Rand(), func(p uint16) (bool, tcpip.Error) {
id.LocalPort = p
err := e.stack.RegisterTransportEndpoint(netProtos, e.TransProto, id, e, ports.Flags{}, 0 /* bindtodevice */)
switch err.(type) {
@@ -800,7 +801,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB
e.rcvList.PushBack(packet)
e.rcvBufSize += packet.data.Size()
- packet.timestamp = e.stack.Clock().NowNanoseconds()
+ packet.receivedAt = e.stack.Clock().Now()
e.rcvMu.Unlock()
e.stats.PacketsReceived.Increment()
diff --git a/pkg/tcpip/transport/icmp/endpoint_state.go b/pkg/tcpip/transport/icmp/endpoint_state.go
index 28a56a2d5..b8b839e4a 100644
--- a/pkg/tcpip/transport/icmp/endpoint_state.go
+++ b/pkg/tcpip/transport/icmp/endpoint_state.go
@@ -15,11 +15,23 @@
package icmp
import (
+ "time"
+
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
"gvisor.dev/gvisor/pkg/tcpip/stack"
)
+// saveReceivedAt is invoked by stateify.
+func (p *icmpPacket) saveReceivedAt() int64 {
+ return p.receivedAt.UnixNano()
+}
+
+// loadReceivedAt is invoked by stateify.
+func (p *icmpPacket) loadReceivedAt(nsec int64) {
+ p.receivedAt = time.Unix(0, nsec)
+}
+
// saveData saves icmpPacket.data field.
func (p *icmpPacket) saveData() buffer.VectorisedView {
// We cannot save p.data directly as p.data.views may alias to p.views,
diff --git a/pkg/tcpip/transport/packet/endpoint.go b/pkg/tcpip/transport/packet/endpoint.go
index 496eca581..cd8c99d41 100644
--- a/pkg/tcpip/transport/packet/endpoint.go
+++ b/pkg/tcpip/transport/packet/endpoint.go
@@ -27,6 +27,7 @@ package packet
import (
"fmt"
"io"
+ "time"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
@@ -41,9 +42,8 @@ type packet struct {
packetEntry
// data holds the actual packet data, including any headers and
// payload.
- data buffer.VectorisedView `state:".(buffer.VectorisedView)"`
- // timestampNS is the unix time at which the packet was received.
- timestampNS int64
+ data buffer.VectorisedView `state:".(buffer.VectorisedView)"`
+ receivedAt time.Time `state:".(int64)"`
// senderAddr is the network address of the sender.
senderAddr tcpip.FullAddress
// packetInfo holds additional information like the protocol
@@ -159,7 +159,7 @@ func (ep *endpoint) Close() {
}
// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
-func (ep *endpoint) ModerateRecvBuf(copied int) {}
+func (*endpoint) ModerateRecvBuf(int) {}
// Read implements tcpip.Endpoint.Read.
func (ep *endpoint) Read(dst io.Writer, opts tcpip.ReadOptions) (tcpip.ReadResult, tcpip.Error) {
@@ -189,7 +189,7 @@ func (ep *endpoint) Read(dst io.Writer, opts tcpip.ReadOptions) (tcpip.ReadResul
Total: packet.data.Size(),
ControlMessages: tcpip.ControlMessages{
HasTimestamp: true,
- Timestamp: packet.timestampNS,
+ Timestamp: packet.receivedAt.UnixNano(),
},
}
if opts.NeedRemoteAddr {
@@ -220,19 +220,19 @@ func (*endpoint) Disconnect() tcpip.Error {
// Connect implements tcpip.Endpoint.Connect. Packet sockets cannot be
// connected, and this function always returnes *tcpip.ErrNotSupported.
-func (*endpoint) Connect(addr tcpip.FullAddress) tcpip.Error {
+func (*endpoint) Connect(tcpip.FullAddress) tcpip.Error {
return &tcpip.ErrNotSupported{}
}
// Shutdown implements tcpip.Endpoint.Shutdown. Packet sockets cannot be used
// with Shutdown, and this function always returns *tcpip.ErrNotSupported.
-func (*endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error {
+func (*endpoint) Shutdown(tcpip.ShutdownFlags) tcpip.Error {
return &tcpip.ErrNotSupported{}
}
// Listen implements tcpip.Endpoint.Listen. Packet sockets cannot be used with
// Listen, and this function always returns *tcpip.ErrNotSupported.
-func (*endpoint) Listen(backlog int) tcpip.Error {
+func (*endpoint) Listen(int) tcpip.Error {
return &tcpip.ErrNotSupported{}
}
@@ -318,7 +318,7 @@ func (ep *endpoint) SetSockOpt(opt tcpip.SettableSocketOption) tcpip.Error {
}
// SetSockOptInt implements tcpip.Endpoint.SetSockOptInt.
-func (ep *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error {
+func (*endpoint) SetSockOptInt(tcpip.SockOptInt, int) tcpip.Error {
return &tcpip.ErrUnknownProtocolOption{}
}
@@ -339,7 +339,7 @@ func (ep *endpoint) UpdateLastError(err tcpip.Error) {
}
// GetSockOpt implements tcpip.Endpoint.GetSockOpt.
-func (ep *endpoint) GetSockOpt(opt tcpip.GettableSocketOption) tcpip.Error {
+func (*endpoint) GetSockOpt(tcpip.GettableSocketOption) tcpip.Error {
return &tcpip.ErrNotSupported{}
}
@@ -451,7 +451,7 @@ func (ep *endpoint) HandlePacket(nicID tcpip.NICID, localAddr tcpip.LinkAddress,
packet.data = buffer.NewVectorisedView(pkt.Size(), pkt.Views())
}
}
- packet.timestampNS = ep.stack.Clock().NowNanoseconds()
+ packet.receivedAt = ep.stack.Clock().Now()
ep.rcvList.PushBack(&packet)
ep.rcvBufSize += packet.data.Size()
@@ -484,7 +484,7 @@ func (ep *endpoint) Stats() tcpip.EndpointStats {
}
// SetOwner implements tcpip.Endpoint.SetOwner.
-func (ep *endpoint) SetOwner(owner tcpip.PacketOwner) {}
+func (*endpoint) SetOwner(tcpip.PacketOwner) {}
// SocketOptions implements tcpip.Endpoint.SocketOptions.
func (ep *endpoint) SocketOptions() *tcpip.SocketOptions {
diff --git a/pkg/tcpip/transport/packet/endpoint_state.go b/pkg/tcpip/transport/packet/endpoint_state.go
index 5bd860d20..e729921db 100644
--- a/pkg/tcpip/transport/packet/endpoint_state.go
+++ b/pkg/tcpip/transport/packet/endpoint_state.go
@@ -15,11 +15,23 @@
package packet
import (
+ "time"
+
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
"gvisor.dev/gvisor/pkg/tcpip/stack"
)
+// saveReceivedAt is invoked by stateify.
+func (p *packet) saveReceivedAt() int64 {
+ return p.receivedAt.UnixNano()
+}
+
+// loadReceivedAt is invoked by stateify.
+func (p *packet) loadReceivedAt(nsec int64) {
+ p.receivedAt = time.Unix(0, nsec)
+}
+
// saveData saves packet.data field.
func (p *packet) saveData() buffer.VectorisedView {
// We cannot save p.data directly as p.data.views may alias to p.views,
diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go
index bcec3d2e7..1bce2769a 100644
--- a/pkg/tcpip/transport/raw/endpoint.go
+++ b/pkg/tcpip/transport/raw/endpoint.go
@@ -27,6 +27,7 @@ package raw
import (
"io"
+ "time"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
@@ -41,9 +42,8 @@ type rawPacket struct {
rawPacketEntry
// data holds the actual packet data, including any headers and
// payload.
- data buffer.VectorisedView `state:".(buffer.VectorisedView)"`
- // timestampNS is the unix time at which the packet was received.
- timestampNS int64
+ data buffer.VectorisedView `state:".(buffer.VectorisedView)"`
+ receivedAt time.Time `state:".(int64)"`
// senderAddr is the network address of the sender.
senderAddr tcpip.FullAddress
}
@@ -183,7 +183,7 @@ func (e *endpoint) Close() {
}
// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
-func (e *endpoint) ModerateRecvBuf(copied int) {}
+func (*endpoint) ModerateRecvBuf(int) {}
func (e *endpoint) SetOwner(owner tcpip.PacketOwner) {
e.mu.Lock()
@@ -219,7 +219,7 @@ func (e *endpoint) Read(dst io.Writer, opts tcpip.ReadOptions) (tcpip.ReadResult
Total: pkt.data.Size(),
ControlMessages: tcpip.ControlMessages{
HasTimestamp: true,
- Timestamp: pkt.timestampNS,
+ Timestamp: pkt.receivedAt.UnixNano(),
},
}
if opts.NeedRemoteAddr {
@@ -402,7 +402,7 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) tcpip.Error {
}
// Find a route to the destination.
- route, err := e.stack.FindRoute(nic, tcpip.Address(""), addr.Addr, e.NetProto, false)
+ route, err := e.stack.FindRoute(nic, "", addr.Addr, e.NetProto, false)
if err != nil {
return err
}
@@ -428,7 +428,7 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) tcpip.Error {
}
// Shutdown implements tcpip.Endpoint.Shutdown. It's a noop for raw sockets.
-func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error {
+func (e *endpoint) Shutdown(tcpip.ShutdownFlags) tcpip.Error {
e.mu.Lock()
defer e.mu.Unlock()
@@ -439,7 +439,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error {
}
// Listen implements tcpip.Endpoint.Listen.
-func (*endpoint) Listen(backlog int) tcpip.Error {
+func (*endpoint) Listen(int) tcpip.Error {
return &tcpip.ErrNotSupported{}
}
@@ -513,12 +513,12 @@ func (e *endpoint) SetSockOpt(opt tcpip.SettableSocketOption) tcpip.Error {
}
}
-func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) tcpip.Error {
+func (*endpoint) SetSockOptInt(tcpip.SockOptInt, int) tcpip.Error {
return &tcpip.ErrUnknownProtocolOption{}
}
// GetSockOpt implements tcpip.Endpoint.GetSockOpt.
-func (e *endpoint) GetSockOpt(opt tcpip.GettableSocketOption) tcpip.Error {
+func (*endpoint) GetSockOpt(tcpip.GettableSocketOption) tcpip.Error {
return &tcpip.ErrUnknownProtocolOption{}
}
@@ -621,7 +621,7 @@ func (e *endpoint) HandlePacket(pkt *stack.PacketBuffer) {
}
combinedVV.Append(pkt.Data().ExtractVV())
packet.data = combinedVV
- packet.timestampNS = e.stack.Clock().NowNanoseconds()
+ packet.receivedAt = e.stack.Clock().Now()
e.rcvList.PushBack(packet)
e.rcvBufSize += packet.data.Size()
diff --git a/pkg/tcpip/transport/raw/endpoint_state.go b/pkg/tcpip/transport/raw/endpoint_state.go
index 5d6f2709c..39669b445 100644
--- a/pkg/tcpip/transport/raw/endpoint_state.go
+++ b/pkg/tcpip/transport/raw/endpoint_state.go
@@ -15,11 +15,23 @@
package raw
import (
+ "time"
+
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
"gvisor.dev/gvisor/pkg/tcpip/stack"
)
+// saveReceivedAt is invoked by stateify.
+func (p *rawPacket) saveReceivedAt() int64 {
+ return p.receivedAt.UnixNano()
+}
+
+// loadReceivedAt is invoked by stateify.
+func (p *rawPacket) loadReceivedAt(nsec int64) {
+ p.receivedAt = time.Unix(0, nsec)
+}
+
// saveData saves rawPacket.data field.
func (p *rawPacket) saveData() buffer.VectorisedView {
// We cannot save p.data directly as p.data.views may alias to p.views,
diff --git a/pkg/tcpip/transport/tcp/BUILD b/pkg/tcpip/transport/tcp/BUILD
index 0f20d3856..8436d2cf0 100644
--- a/pkg/tcpip/transport/tcp/BUILD
+++ b/pkg/tcpip/transport/tcp/BUILD
@@ -41,7 +41,6 @@ go_library(
"protocol.go",
"rack.go",
"rcv.go",
- "rcv_state.go",
"reno.go",
"reno_recovery.go",
"sack.go",
@@ -53,7 +52,6 @@ go_library(
"segment_state.go",
"segment_unsafe.go",
"snd.go",
- "snd_state.go",
"tcp_endpoint_list.go",
"tcp_segment_list.go",
"timer.go",
@@ -134,6 +132,7 @@ go_test(
deps = [
"//pkg/sleep",
"//pkg/tcpip/buffer",
+ "//pkg/tcpip/faketime",
"//pkg/tcpip/stack",
"@com_github_google_go_cmp//cmp:go_default_library",
],
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index d4bd4e80e..2c65b737d 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -114,8 +114,8 @@ type listenContext struct {
}
// timeStamp returns an 8-bit timestamp with a granularity of 64 seconds.
-func timeStamp() uint32 {
- return uint32(time.Now().Unix()>>6) & tsMask
+func timeStamp(clock tcpip.Clock) uint32 {
+ return uint32(clock.NowMonotonic().Sub(tcpip.MonotonicTime{}).Seconds()) >> 6 & tsMask
}
// newListenContext creates a new listen context.
@@ -171,7 +171,7 @@ func (l *listenContext) cookieHash(id stack.TransportEndpointID, ts uint32, nonc
// createCookie creates a SYN cookie for the given id and incoming sequence
// number.
func (l *listenContext) createCookie(id stack.TransportEndpointID, seq seqnum.Value, data uint32) seqnum.Value {
- ts := timeStamp()
+ ts := timeStamp(l.stack.Clock())
v := l.cookieHash(id, 0, 0) + uint32(seq) + (ts << tsOffset)
v += (l.cookieHash(id, ts, 1) + data) & hashMask
return seqnum.Value(v)
@@ -181,7 +181,7 @@ func (l *listenContext) createCookie(id stack.TransportEndpointID, seq seqnum.Va
// sequence number. If it is, it also returns the data originally encoded in the
// cookie when createCookie was called.
func (l *listenContext) isCookieValid(id stack.TransportEndpointID, cookie seqnum.Value, seq seqnum.Value) (uint32, bool) {
- ts := timeStamp()
+ ts := timeStamp(l.stack.Clock())
v := uint32(cookie) - l.cookieHash(id, 0, 0) - uint32(seq)
cookieTS := v >> tsOffset
if ((ts - cookieTS) & tsMask) > maxTSDiff {
@@ -247,7 +247,7 @@ func (l *listenContext) createConnectingEndpoint(s *segment, rcvdSynOpts *header
func (l *listenContext) startHandshake(s *segment, opts *header.TCPSynOptions, queue *waiter.Queue, owner tcpip.PacketOwner) (*handshake, tcpip.Error) {
// Create new endpoint.
irs := s.sequenceNumber
- isn := generateSecureISN(s.id, l.stack.Seed())
+ isn := generateSecureISN(s.id, l.stack.Clock(), l.stack.Seed())
ep, err := l.createConnectingEndpoint(s, opts, queue)
if err != nil {
return nil, err
@@ -550,7 +550,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) tcpip.Err
e.rcvQueueInfo.rcvQueueMu.Lock()
rcvClosed := e.rcvQueueInfo.RcvClosed
e.rcvQueueInfo.rcvQueueMu.Unlock()
- if rcvClosed || s.flagsAreSet(header.TCPFlagSyn|header.TCPFlagAck) {
+ if rcvClosed || s.flags.Contains(header.TCPFlagSyn|header.TCPFlagAck) {
// If the endpoint is shutdown, reply with reset.
//
// RFC 793 section 3.4 page 35 (figure 12) outlines that a RST
@@ -591,7 +591,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) tcpip.Err
synOpts := header.TCPSynOptions{
WS: -1,
TS: opts.TS,
- TSVal: tcpTimeStamp(time.Now(), timeStampOffset()),
+ TSVal: tcpTimeStamp(e.stack.Clock().NowMonotonic(), timeStampOffset(e.stack.Rand())),
TSEcr: opts.TSVal,
MSS: calculateAdvertisedMSS(e.userMSS, route),
}
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 5e03e7715..570e5081c 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -19,7 +19,6 @@ import (
"math"
"time"
- "gvisor.dev/gvisor/pkg/rand"
"gvisor.dev/gvisor/pkg/sleep"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
@@ -92,7 +91,7 @@ type handshake struct {
rcvWndScale int
// startTime is the time at which the first SYN/SYN-ACK was sent.
- startTime time.Time
+ startTime tcpip.MonotonicTime
// deferAccept if non-zero will drop the final ACK for a passive
// handshake till an ACK segment with data is received or the timeout is
@@ -147,21 +146,16 @@ func FindWndScale(wnd seqnum.Size) int {
// resetState resets the state of the handshake object such that it becomes
// ready for a new 3-way handshake.
func (h *handshake) resetState() {
- b := make([]byte, 4)
- if _, err := rand.Read(b); err != nil {
- panic(err)
- }
-
h.state = handshakeSynSent
h.flags = header.TCPFlagSyn
h.ackNum = 0
h.mss = 0
- h.iss = generateSecureISN(h.ep.TransportEndpointInfo.ID, h.ep.stack.Seed())
+ h.iss = generateSecureISN(h.ep.TransportEndpointInfo.ID, h.ep.stack.Clock(), h.ep.stack.Seed())
}
// generateSecureISN generates a secure Initial Sequence number based on the
// recommendation here https://tools.ietf.org/html/rfc6528#page-3.
-func generateSecureISN(id stack.TransportEndpointID, seed uint32) seqnum.Value {
+func generateSecureISN(id stack.TransportEndpointID, clock tcpip.Clock, seed uint32) seqnum.Value {
isnHasher := jenkins.Sum32(seed)
isnHasher.Write([]byte(id.LocalAddress))
isnHasher.Write([]byte(id.RemoteAddress))
@@ -180,7 +174,7 @@ func generateSecureISN(id stack.TransportEndpointID, seed uint32) seqnum.Value {
//
// Which sort of guarantees that we won't reuse the ISN for a new
// connection for the same tuple for at least 274s.
- isn := isnHasher.Sum32() + uint32(time.Now().UnixNano()>>6)
+ isn := isnHasher.Sum32() + uint32(clock.NowMonotonic().Sub(tcpip.MonotonicTime{}).Nanoseconds()>>6)
return seqnum.Value(isn)
}
@@ -212,7 +206,7 @@ func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *hea
// a TCP 3-way handshake is valid. If it's not, a RST segment is sent back in
// response.
func (h *handshake) checkAck(s *segment) bool {
- if s.flagIsSet(header.TCPFlagAck) && s.ackNumber != h.iss+1 {
+ if s.flags.Contains(header.TCPFlagAck) && s.ackNumber != h.iss+1 {
// RFC 793, page 36, states that a reset must be generated when
// the connection is in any non-synchronized state and an
// incoming segment acknowledges something not yet sent. The
@@ -230,8 +224,8 @@ func (h *handshake) checkAck(s *segment) bool {
func (h *handshake) synSentState(s *segment) tcpip.Error {
// RFC 793, page 37, states that in the SYN-SENT state, a reset is
// acceptable if the ack field acknowledges the SYN.
- if s.flagIsSet(header.TCPFlagRst) {
- if s.flagIsSet(header.TCPFlagAck) && s.ackNumber == h.iss+1 {
+ if s.flags.Contains(header.TCPFlagRst) {
+ if s.flags.Contains(header.TCPFlagAck) && s.ackNumber == h.iss+1 {
// RFC 793, page 67, states that "If the RST bit is set [and] If the ACK
// was acceptable then signal the user "error: connection reset", drop
// the segment, enter CLOSED state, delete TCB, and return."
@@ -249,7 +243,7 @@ func (h *handshake) synSentState(s *segment) tcpip.Error {
// We are in the SYN-SENT state. We only care about segments that have
// the SYN flag.
- if !s.flagIsSet(header.TCPFlagSyn) {
+ if !s.flags.Contains(header.TCPFlagSyn) {
return nil
}
@@ -270,7 +264,7 @@ func (h *handshake) synSentState(s *segment) tcpip.Error {
// If this is a SYN ACK response, we only need to acknowledge the SYN
// and the handshake is completed.
- if s.flagIsSet(header.TCPFlagAck) {
+ if s.flags.Contains(header.TCPFlagAck) {
h.state = handshakeCompleted
h.ep.transitionToStateEstablishedLocked(h)
@@ -316,7 +310,7 @@ func (h *handshake) synSentState(s *segment) tcpip.Error {
// synRcvdState handles a segment received when the TCP 3-way handshake is in
// the SYN-RCVD state.
func (h *handshake) synRcvdState(s *segment) tcpip.Error {
- if s.flagIsSet(header.TCPFlagRst) {
+ if s.flags.Contains(header.TCPFlagRst) {
// RFC 793, page 37, states that in the SYN-RCVD state, a reset
// is acceptable if the sequence number is in the window.
if s.sequenceNumber.InWindow(h.ackNum, h.rcvWnd) {
@@ -340,13 +334,13 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error {
return nil
}
- if s.flagIsSet(header.TCPFlagSyn) && s.sequenceNumber != h.ackNum-1 {
+ if s.flags.Contains(header.TCPFlagSyn) && s.sequenceNumber != h.ackNum-1 {
// We received two SYN segments with different sequence
// numbers, so we reset this and restart the whole
// process, except that we don't reset the timer.
ack := s.sequenceNumber.Add(s.logicalLen())
seq := seqnum.Value(0)
- if s.flagIsSet(header.TCPFlagAck) {
+ if s.flags.Contains(header.TCPFlagAck) {
seq = s.ackNumber
}
h.ep.sendRaw(buffer.VectorisedView{}, header.TCPFlagRst|header.TCPFlagAck, seq, ack, 0)
@@ -378,10 +372,10 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error {
// We have previously received (and acknowledged) the peer's SYN. If the
// peer acknowledges our SYN, the handshake is completed.
- if s.flagIsSet(header.TCPFlagAck) {
+ if s.flags.Contains(header.TCPFlagAck) {
// If deferAccept is not zero and this is a bare ACK and the
// timeout is not hit then drop the ACK.
- if h.deferAccept != 0 && s.data.Size() == 0 && time.Since(h.startTime) < h.deferAccept {
+ if h.deferAccept != 0 && s.data.Size() == 0 && h.ep.stack.Clock().NowMonotonic().Sub(h.startTime) < h.deferAccept {
h.acked = true
h.ep.stack.Stats().DroppedPackets.Increment()
return nil
@@ -426,7 +420,7 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error {
func (h *handshake) handleSegment(s *segment) tcpip.Error {
h.sndWnd = s.window
- if !s.flagIsSet(header.TCPFlagSyn) && h.sndWndScale > 0 {
+ if !s.flags.Contains(header.TCPFlagSyn) && h.sndWndScale > 0 {
h.sndWnd <<= uint8(h.sndWndScale)
}
@@ -474,7 +468,7 @@ func (h *handshake) processSegments() tcpip.Error {
// start sends the first SYN/SYN-ACK. It does not block, even if link address
// resolution is required.
func (h *handshake) start() {
- h.startTime = time.Now()
+ h.startTime = h.ep.stack.Clock().NowMonotonic()
h.ep.amss = calculateAdvertisedMSS(h.ep.userMSS, h.ep.route)
var sackEnabled tcpip.TCPSACKEnabled
if err := h.ep.stack.TransportProtocolOption(ProtocolNumber, &sackEnabled); err != nil {
@@ -527,7 +521,7 @@ func (h *handshake) complete() tcpip.Error {
defer s.Done()
// Initialize the resend timer.
- timer, err := newBackoffTimer(time.Second, MaxRTO, resendWaker.Assert)
+ timer, err := newBackoffTimer(h.ep.stack.Clock(), time.Second, MaxRTO, resendWaker.Assert)
if err != nil {
return err
}
@@ -552,7 +546,7 @@ func (h *handshake) complete() tcpip.Error {
// The last is required to provide a way for the peer to complete
// the connection with another ACK or data (as ACKs are never
// retransmitted on their own).
- if h.active || !h.acked || h.deferAccept != 0 && time.Since(h.startTime) > h.deferAccept {
+ if h.active || !h.acked || h.deferAccept != 0 && h.ep.stack.Clock().NowMonotonic().Sub(h.startTime) > h.deferAccept {
h.ep.sendSynTCP(h.ep.route, tcpFields{
id: h.ep.TransportEndpointInfo.ID,
ttl: h.ep.ttl,
@@ -608,15 +602,15 @@ func (h *handshake) complete() tcpip.Error {
type backoffTimer struct {
timeout time.Duration
maxTimeout time.Duration
- t *time.Timer
+ t tcpip.Timer
}
-func newBackoffTimer(timeout, maxTimeout time.Duration, f func()) (*backoffTimer, tcpip.Error) {
+func newBackoffTimer(clock tcpip.Clock, timeout, maxTimeout time.Duration, f func()) (*backoffTimer, tcpip.Error) {
if timeout > maxTimeout {
return nil, &tcpip.ErrTimeout{}
}
bt := &backoffTimer{timeout: timeout, maxTimeout: maxTimeout}
- bt.t = time.AfterFunc(timeout, f)
+ bt.t = clock.AfterFunc(timeout, f)
return bt, nil
}
@@ -634,7 +628,7 @@ func (bt *backoffTimer) stop() {
}
func parseSynSegmentOptions(s *segment) header.TCPSynOptions {
- synOpts := header.ParseSynOptions(s.options, s.flagIsSet(header.TCPFlagAck))
+ synOpts := header.ParseSynOptions(s.options, s.flags.Contains(header.TCPFlagAck))
if synOpts.TS {
s.parsedOptions.TSVal = synOpts.TSVal
s.parsedOptions.TSEcr = synOpts.TSEcr
@@ -1188,11 +1182,11 @@ func (e *endpoint) handleSegmentLocked(s *segment) (cont bool, err tcpip.Error)
// the TCPEndpointState after the segment is processed.
defer e.probeSegmentLocked()
- if s.flagIsSet(header.TCPFlagRst) {
+ if s.flags.Contains(header.TCPFlagRst) {
if ok, err := e.handleReset(s); !ok {
return false, err
}
- } else if s.flagIsSet(header.TCPFlagSyn) {
+ } else if s.flags.Contains(header.TCPFlagSyn) {
// See: https://tools.ietf.org/html/rfc5961#section-4.1
// 1) If the SYN bit is set, irrespective of the sequence number, TCP
// MUST send an ACK (also referred to as challenge ACK) to the remote
@@ -1216,7 +1210,7 @@ func (e *endpoint) handleSegmentLocked(s *segment) (cont bool, err tcpip.Error)
// should then rely on SYN retransmission from the remote end to
// re-establish the connection.
e.snd.maybeSendOutOfWindowAck(s)
- } else if s.flagIsSet(header.TCPFlagAck) {
+ } else if s.flags.Contains(header.TCPFlagAck) {
// Patch the window size in the segment according to the
// send window scale.
s.window <<= e.snd.SndWndScale
@@ -1235,7 +1229,7 @@ func (e *endpoint) handleSegmentLocked(s *segment) (cont bool, err tcpip.Error)
// Now check if the received segment has caused us to transition
// to a CLOSED state, if yes then terminate processing and do
// not invoke the sender.
- state := e.state
+ state := e.EndpointState()
if state == StateClose {
// When we get into StateClose while processing from the queue,
// return immediately and let the protocolMainloop handle it.
@@ -1267,7 +1261,7 @@ func (e *endpoint) keepaliveTimerExpired() tcpip.Error {
// If a userTimeout is set then abort the connection if it is
// exceeded.
- if userTimeout != 0 && time.Since(e.rcv.lastRcvdAckTime) >= userTimeout && e.keepalive.unacked > 0 {
+ if userTimeout != 0 && e.stack.Clock().NowMonotonic().Sub(e.rcv.lastRcvdAckTime) >= userTimeout && e.keepalive.unacked > 0 {
e.keepalive.Unlock()
e.stack.Stats().TCP.EstablishedTimedout.Increment()
return &tcpip.ErrTimeout{}
@@ -1322,7 +1316,7 @@ func (e *endpoint) disableKeepaliveTimer() {
// segments.
func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{}) tcpip.Error {
e.mu.Lock()
- var closeTimer *time.Timer
+ var closeTimer tcpip.Timer
var closeWaker sleep.Waker
epilogue := func() {
@@ -1484,7 +1478,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
if e.EndpointState() == StateFinWait2 && e.closed {
// The socket has been closed and we are in FIN_WAIT2
// so start the FIN_WAIT2 timer.
- closeTimer = time.AfterFunc(e.tcpLingerTimeout, closeWaker.Assert)
+ closeTimer = e.stack.Clock().AfterFunc(e.tcpLingerTimeout, closeWaker.Assert)
}
}
@@ -1721,7 +1715,7 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
var timeWaitWaker sleep.Waker
s.AddWaker(&timeWaitWaker, timeWaitDone)
- timeWaitTimer := time.AfterFunc(timeWaitDuration, timeWaitWaker.Assert)
+ timeWaitTimer := e.stack.Clock().AfterFunc(timeWaitDuration, timeWaitWaker.Assert)
defer timeWaitTimer.Stop()
for {
diff --git a/pkg/tcpip/transport/tcp/cubic.go b/pkg/tcpip/transport/tcp/cubic.go
index 962f1d687..6985194bb 100644
--- a/pkg/tcpip/transport/tcp/cubic.go
+++ b/pkg/tcpip/transport/tcp/cubic.go
@@ -41,7 +41,7 @@ type cubicState struct {
func newCubicCC(s *sender) *cubicState {
return &cubicState{
TCPCubicState: stack.TCPCubicState{
- T: time.Now(),
+ T: s.ep.stack.Clock().NowMonotonic(),
Beta: 0.7,
C: 0.4,
},
@@ -60,7 +60,7 @@ func (c *cubicState) enterCongestionAvoidance() {
// https://tools.ietf.org/html/rfc8312#section-4.8
if c.numCongestionEvents == 0 {
c.K = 0
- c.T = time.Now()
+ c.T = c.s.ep.stack.Clock().NowMonotonic()
c.WLastMax = c.WMax
c.WMax = float64(c.s.SndCwnd)
}
@@ -115,14 +115,15 @@ func (c *cubicState) cubicCwnd(t float64) float64 {
// getCwnd returns the current congestion window as computed by CUBIC.
// Refer: https://tools.ietf.org/html/rfc8312#section-4
func (c *cubicState) getCwnd(packetsAcked, sndCwnd int, srtt time.Duration) int {
- elapsed := time.Since(c.T).Seconds()
+ elapsed := c.s.ep.stack.Clock().NowMonotonic().Sub(c.T)
+ elapsedSeconds := elapsed.Seconds()
// Compute the window as per Cubic after 'elapsed' time
// since last congestion event.
- c.WC = c.cubicCwnd(elapsed - c.K)
+ c.WC = c.cubicCwnd(elapsedSeconds - c.K)
// Compute the TCP friendly estimate of the congestion window.
- c.WEst = c.WMax*c.Beta + (3.0*((1.0-c.Beta)/(1.0+c.Beta)))*(elapsed/srtt.Seconds())
+ c.WEst = c.WMax*c.Beta + (3.0*((1.0-c.Beta)/(1.0+c.Beta)))*(elapsedSeconds/srtt.Seconds())
// Make sure in the TCP friendly region CUBIC performs at least
// as well as Reno.
@@ -134,7 +135,7 @@ func (c *cubicState) getCwnd(packetsAcked, sndCwnd int, srtt time.Duration) int
// In Concave/Convex region of CUBIC, calculate what CUBIC window
// will be after 1 RTT and use that to grow congestion window
// for every ack.
- tEst := (time.Since(c.T) + srtt).Seconds()
+ tEst := (elapsed + srtt).Seconds()
wtRtt := c.cubicCwnd(tEst - c.K)
// As per 4.3 for each received ACK cwnd must be incremented
// by (w_cubic(t+RTT) - cwnd/cwnd.
@@ -151,7 +152,7 @@ func (c *cubicState) getCwnd(packetsAcked, sndCwnd int, srtt time.Duration) int
func (c *cubicState) HandleLossDetected() {
// See: https://tools.ietf.org/html/rfc8312#section-4.5
c.numCongestionEvents++
- c.T = time.Now()
+ c.T = c.s.ep.stack.Clock().NowMonotonic()
c.WLastMax = c.WMax
c.WMax = float64(c.s.SndCwnd)
@@ -162,7 +163,7 @@ func (c *cubicState) HandleLossDetected() {
// HandleRTOExpired implements congestionContrl.HandleRTOExpired.
func (c *cubicState) HandleRTOExpired() {
// See: https://tools.ietf.org/html/rfc8312#section-4.6
- c.T = time.Now()
+ c.T = c.s.ep.stack.Clock().NowMonotonic()
c.numCongestionEvents = 0
c.WLastMax = c.WMax
c.WMax = float64(c.s.SndCwnd)
@@ -193,7 +194,7 @@ func (c *cubicState) fastConvergence() {
// PostRecovery implemements congestionControl.PostRecovery.
func (c *cubicState) PostRecovery() {
- c.T = time.Now()
+ c.T = c.s.ep.stack.Clock().NowMonotonic()
}
// reduceSlowStartThreshold returns new SsThresh as described in
diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go
index 512053a04..dff7cb89c 100644
--- a/pkg/tcpip/transport/tcp/dispatcher.go
+++ b/pkg/tcpip/transport/tcp/dispatcher.go
@@ -16,10 +16,11 @@ package tcp
import (
"encoding/binary"
+ "math/rand"
- "gvisor.dev/gvisor/pkg/rand"
"gvisor.dev/gvisor/pkg/sleep"
"gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/hash/jenkins"
"gvisor.dev/gvisor/pkg/tcpip/header"
"gvisor.dev/gvisor/pkg/tcpip/stack"
@@ -141,15 +142,16 @@ func (p *processor) start(wg *sync.WaitGroup) {
// in-order.
type dispatcher struct {
processors []processor
- seed uint32
- wg sync.WaitGroup
+ // seed is a random secret for a jenkins hash.
+ seed uint32
+ wg sync.WaitGroup
}
-func (d *dispatcher) init(nProcessors int) {
+func (d *dispatcher) init(rng *rand.Rand, nProcessors int) {
d.close()
d.wait()
d.processors = make([]processor, nProcessors)
- d.seed = generateRandUint32()
+ d.seed = rng.Uint32()
for i := range d.processors {
p := &d.processors[i]
p.sleeper.AddWaker(&p.newEndpointWaker, newEndpointWaker)
@@ -172,12 +174,11 @@ func (d *dispatcher) wait() {
d.wg.Wait()
}
-func (d *dispatcher) queuePacket(stackEP stack.TransportEndpoint, id stack.TransportEndpointID, pkt *stack.PacketBuffer) {
+func (d *dispatcher) queuePacket(stackEP stack.TransportEndpoint, id stack.TransportEndpointID, clock tcpip.Clock, pkt *stack.PacketBuffer) {
ep := stackEP.(*endpoint)
- s := newIncomingSegment(id, pkt)
+ s := newIncomingSegment(id, clock, pkt)
if !s.parse(pkt.RXTransportChecksumValidated) {
- ep.stack.Stats().MalformedRcvdPackets.Increment()
ep.stack.Stats().TCP.InvalidSegmentsReceived.Increment()
ep.stats.ReceiveErrors.MalformedPacketsReceived.Increment()
s.decRef()
@@ -185,7 +186,6 @@ func (d *dispatcher) queuePacket(stackEP stack.TransportEndpoint, id stack.Trans
}
if !s.csumValid {
- ep.stack.Stats().MalformedRcvdPackets.Increment()
ep.stack.Stats().TCP.ChecksumErrors.Increment()
ep.stats.ReceiveErrors.ChecksumErrors.Increment()
s.decRef()
@@ -213,14 +213,6 @@ func (d *dispatcher) queuePacket(stackEP stack.TransportEndpoint, id stack.Trans
d.selectProcessor(id).queueEndpoint(ep)
}
-func generateRandUint32() uint32 {
- b := make([]byte, 4)
- if _, err := rand.Read(b); err != nil {
- panic(err)
- }
- return binary.LittleEndian.Uint32(b)
-}
-
func (d *dispatcher) selectProcessor(id stack.TransportEndpointID) *processor {
var payload [4]byte
binary.LittleEndian.PutUint16(payload[0:], id.LocalPort)
diff --git a/pkg/tcpip/transport/tcp/dual_stack_test.go b/pkg/tcpip/transport/tcp/dual_stack_test.go
index f148d505d..5342aacfd 100644
--- a/pkg/tcpip/transport/tcp/dual_stack_test.go
+++ b/pkg/tcpip/transport/tcp/dual_stack_test.go
@@ -421,7 +421,7 @@ func testV4Accept(t *testing.T, c *context.Context) {
r.Reset(data)
nep.Write(&r, tcpip.WriteOptions{})
b = c.GetPacket()
- tcp = header.TCP(header.IPv4(b).Payload())
+ tcp = header.IPv4(b).Payload()
if string(tcp.Payload()) != data {
t.Fatalf("Unexpected data: got %v, want %v", string(tcp.Payload()), data)
}
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 50d39cbad..a27e2110b 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -20,12 +20,12 @@ import (
"fmt"
"io"
"math"
+ "math/rand"
"runtime"
"strings"
"sync/atomic"
"time"
- "gvisor.dev/gvisor/pkg/rand"
"gvisor.dev/gvisor/pkg/sleep"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
@@ -38,19 +38,15 @@ import (
)
// EndpointState represents the state of a TCP endpoint.
-type EndpointState uint32
+type EndpointState tcpip.EndpointState
// Endpoint states. Note that are represented in a netstack-specific manner and
// may not be meaningful externally. Specifically, they need to be translated to
// Linux's representation for these states if presented to userspace.
const (
- // Endpoint states internal to netstack. These map to the TCP state CLOSED.
- StateInitial EndpointState = iota
- StateBound
- StateConnecting // Connect() called, but the initial SYN hasn't been sent.
- StateError
-
- // TCP protocol states.
+ _ EndpointState = iota
+ // TCP protocol states in sync with the definitions in
+ // https://github.com/torvalds/linux/blob/7acac4b3196/include/net/tcp_states.h#L13
StateEstablished
StateSynSent
StateSynRecv
@@ -62,6 +58,12 @@ const (
StateLastAck
StateListen
StateClosing
+
+ // Endpoint states internal to netstack.
+ StateInitial
+ StateBound
+ StateConnecting // Connect() called, but the initial SYN hasn't been sent.
+ StateError
)
const (
@@ -97,6 +99,16 @@ func (s EndpointState) connecting() bool {
}
}
+// internal returns true when the state is netstack internal.
+func (s EndpointState) internal() bool {
+ switch s {
+ case StateInitial, StateBound, StateConnecting, StateError:
+ return true
+ default:
+ return false
+ }
+}
+
// handshake returns true when s is one of the states representing an endpoint
// in the middle of a TCP handshake.
func (s EndpointState) handshake() bool {
@@ -422,12 +434,12 @@ type endpoint struct {
// state must be read/set using the EndpointState()/setEndpointState()
// methods.
- state EndpointState `state:".(EndpointState)"`
+ state uint32 `state:".(EndpointState)"`
// origEndpointState is only used during a restore phase to save the
// endpoint state at restore time as the socket is moved to it's correct
// state.
- origEndpointState EndpointState `state:"nosave"`
+ origEndpointState uint32 `state:"nosave"`
isPortReserved bool `state:"manual"`
isRegistered bool `state:"manual"`
@@ -468,7 +480,7 @@ type endpoint struct {
// recentTSTime is the unix time when we last updated
// TCPEndpointStateInner.RecentTS.
- recentTSTime time.Time `state:".(unixTime)"`
+ recentTSTime tcpip.MonotonicTime
// shutdownFlags represent the current shutdown state of the endpoint.
shutdownFlags tcpip.ShutdownFlags
@@ -626,7 +638,7 @@ type endpoint struct {
// lastOutOfWindowAckTime is the time at which the an ACK was sent in response
// to an out of window segment being received by this endpoint.
- lastOutOfWindowAckTime time.Time `state:".(unixTime)"`
+ lastOutOfWindowAckTime tcpip.MonotonicTime
}
// UniqueID implements stack.TransportEndpoint.UniqueID.
@@ -747,7 +759,7 @@ func (e *endpoint) ResumeWork() {
//
// Precondition: e.mu must be held to call this method.
func (e *endpoint) setEndpointState(state EndpointState) {
- oldstate := EndpointState(atomic.LoadUint32((*uint32)(&e.state)))
+ oldstate := EndpointState(atomic.LoadUint32(&e.state))
switch state {
case StateEstablished:
e.stack.Stats().TCP.CurrentEstablished.Increment()
@@ -764,18 +776,18 @@ func (e *endpoint) setEndpointState(state EndpointState) {
e.stack.Stats().TCP.CurrentEstablished.Decrement()
}
}
- atomic.StoreUint32((*uint32)(&e.state), uint32(state))
+ atomic.StoreUint32(&e.state, uint32(state))
}
// EndpointState returns the current state of the endpoint.
func (e *endpoint) EndpointState() EndpointState {
- return EndpointState(atomic.LoadUint32((*uint32)(&e.state)))
+ return EndpointState(atomic.LoadUint32(&e.state))
}
// setRecentTimestamp sets the recentTS field to the provided value.
func (e *endpoint) setRecentTimestamp(recentTS uint32) {
e.RecentTS = recentTS
- e.recentTSTime = time.Now()
+ e.recentTSTime = e.stack.Clock().NowMonotonic()
}
// recentTimestamp returns the value of the recentTS field.
@@ -806,11 +818,11 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
},
sndQueueInfo: sndQueueInfo{
TCPSndBufState: stack.TCPSndBufState{
- SndMTU: int(math.MaxInt32),
+ SndMTU: math.MaxInt32,
},
},
waiterQueue: waiterQueue,
- state: StateInitial,
+ state: uint32(StateInitial),
keepalive: keepalive{
// Linux defaults.
idle: 2 * time.Hour,
@@ -870,9 +882,9 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
}
e.segmentQueue.ep = e
- e.TSOffset = timeStampOffset()
+ e.TSOffset = timeStampOffset(e.stack.Rand())
e.acceptCond = sync.NewCond(&e.acceptMu)
- e.keepalive.timer.init(&e.keepalive.waker)
+ e.keepalive.timer.init(e.stack.Clock(), &e.keepalive.waker)
return e
}
@@ -1189,7 +1201,7 @@ func (e *endpoint) ModerateRecvBuf(copied int) {
e.rcvQueueInfo.rcvQueueMu.Unlock()
return
}
- now := time.Now()
+ now := e.stack.Clock().NowMonotonic()
if rtt := e.rcvQueueInfo.RcvAutoParams.RTT; rtt == 0 || now.Sub(e.rcvQueueInfo.RcvAutoParams.MeasureTime) < rtt {
e.rcvQueueInfo.RcvAutoParams.CopiedBytes += copied
e.rcvQueueInfo.rcvQueueMu.Unlock()
@@ -1544,7 +1556,7 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, tcp
}
// Add data to the send queue.
- s := newOutgoingSegment(e.TransportEndpointInfo.ID, v)
+ s := newOutgoingSegment(e.TransportEndpointInfo.ID, e.stack.Clock(), v)
e.sndQueueInfo.SndBufUsed += len(v)
e.sndQueueInfo.SndBufInQueue += seqnum.Size(len(v))
e.sndQueueInfo.sndQueue.PushBack(s)
@@ -1956,6 +1968,11 @@ func (e *endpoint) GetSockOptInt(opt tcpip.SockOptInt) (int, tcpip.Error) {
func (e *endpoint) getTCPInfo() tcpip.TCPInfoOption {
info := tcpip.TCPInfoOption{}
e.LockUser()
+ if state := e.EndpointState(); state.internal() {
+ info.State = tcpip.EndpointState(StateClose)
+ } else {
+ info.State = tcpip.EndpointState(state)
+ }
snd := e.snd
if snd != nil {
// We do not calculate RTT before sending the data packets. If
@@ -2198,7 +2215,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
BindToDevice: bindToDevice,
Dest: addr,
}
- if _, err := e.stack.ReservePort(portRes, nil /* testPort */); err != nil {
+ if _, err := e.stack.ReservePort(e.stack.Rand(), portRes, nil /* testPort */); err != nil {
if _, ok := err.(*tcpip.ErrPortInUse); !ok || !reuse {
return false, nil
}
@@ -2224,7 +2241,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
// If the endpoint is not in TIME-WAIT or if it is in TIME-WAIT but
// less than 1 second has elapsed since its recentTS was updated then
// we cannot reuse the port.
- if tcpEP.EndpointState() != StateTimeWait || time.Since(tcpEP.recentTSTime) < 1*time.Second {
+ if tcpEP.EndpointState() != StateTimeWait || e.stack.Clock().NowMonotonic().Sub(tcpEP.recentTSTime) < 1*time.Second {
tcpEP.UnlockUser()
return false, nil
}
@@ -2245,7 +2262,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) tcp
BindToDevice: bindToDevice,
Dest: addr,
}
- if _, err := e.stack.ReservePort(portRes, nil /* testPort */); err != nil {
+ if _, err := e.stack.ReservePort(e.stack.Rand(), portRes, nil /* testPort */); err != nil {
return false, nil
}
}
@@ -2370,7 +2387,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error {
}
// Queue fin segment.
- s := newOutgoingSegment(e.TransportEndpointInfo.ID, nil)
+ s := newOutgoingSegment(e.TransportEndpointInfo.ID, e.stack.Clock(), nil)
e.sndQueueInfo.sndQueue.PushBack(s)
e.sndQueueInfo.SndBufInQueue++
// Mark endpoint as closed.
@@ -2581,7 +2598,7 @@ func (e *endpoint) bindLocked(addr tcpip.FullAddress) (err tcpip.Error) {
BindToDevice: bindToDevice,
Dest: tcpip.FullAddress{},
}
- port, err := e.stack.ReservePort(portRes, func(p uint16) (bool, tcpip.Error) {
+ port, err := e.stack.ReservePort(e.stack.Rand(), portRes, func(p uint16) (bool, tcpip.Error) {
id := e.TransportEndpointInfo.ID
id.LocalPort = p
// CheckRegisterTransportEndpoint should only return an error if there is a
@@ -2731,7 +2748,7 @@ func (e *endpoint) updateSndBufferUsage(v int) {
// We only notify when there is half the sendBufferSize available after
// a full buffer event occurs. This ensures that we don't wake up
// writers to queue just 1-2 segments and go back to sleep.
- notify = notify && e.sndQueueInfo.SndBufUsed < int(sendBufferSize)>>1
+ notify = notify && e.sndQueueInfo.SndBufUsed < sendBufferSize>>1
e.sndQueueInfo.sndQueueMu.Unlock()
if notify {
@@ -2848,23 +2865,20 @@ func (e *endpoint) maybeEnableTimestamp(synOpts *header.TCPSynOptions) {
// timestamp returns the timestamp value to be used in the TSVal field of the
// timestamp option for outgoing TCP segments for a given endpoint.
func (e *endpoint) timestamp() uint32 {
- return tcpTimeStamp(time.Now(), e.TSOffset)
+ return tcpTimeStamp(e.stack.Clock().NowMonotonic(), e.TSOffset)
}
// tcpTimeStamp returns a timestamp offset by the provided offset. This is
// not inlined above as it's used when SYN cookies are in use and endpoint
// is not created at the time when the SYN cookie is sent.
-func tcpTimeStamp(curTime time.Time, offset uint32) uint32 {
- return uint32(curTime.Unix()*1000+int64(curTime.Nanosecond()/1e6)) + offset
+func tcpTimeStamp(curTime tcpip.MonotonicTime, offset uint32) uint32 {
+ d := curTime.Sub(tcpip.MonotonicTime{})
+ return uint32(d.Milliseconds()) + offset
}
// timeStampOffset returns a randomized timestamp offset to be used when sending
// timestamp values in a timestamp option for a TCP segment.
-func timeStampOffset() uint32 {
- b := make([]byte, 4)
- if _, err := rand.Read(b); err != nil {
- panic(err)
- }
+func timeStampOffset(rng *rand.Rand) uint32 {
// Initialize a random tsOffset that will be added to the recentTS
// everytime the timestamp is sent when the Timestamp option is enabled.
//
@@ -2874,7 +2888,7 @@ func timeStampOffset() uint32 {
// NOTE: This is not completely to spec as normally this should be
// initialized in a manner analogous to how sequence numbers are
// randomized per connection basis. But for now this is sufficient.
- return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24
+ return rng.Uint32()
}
// maybeEnableSACKPermitted marks the SACKPermitted option enabled for this endpoint
@@ -2909,7 +2923,7 @@ func (e *endpoint) completeStateLocked() stack.TCPEndpointState {
s := stack.TCPEndpointState{
TCPEndpointStateInner: e.TCPEndpointStateInner,
ID: stack.TCPEndpointID(e.TransportEndpointInfo.ID),
- SegTime: time.Now(),
+ SegTime: e.stack.Clock().NowMonotonic(),
Receiver: e.rcv.TCPReceiverState,
Sender: e.snd.TCPSenderState,
}
@@ -2937,7 +2951,7 @@ func (e *endpoint) completeStateLocked() stack.TCPEndpointState {
if cubic, ok := e.snd.cc.(*cubicState); ok {
s.Sender.Cubic = cubic.TCPCubicState
- s.Sender.Cubic.TimeSinceLastCongestion = time.Since(s.Sender.Cubic.T)
+ s.Sender.Cubic.TimeSinceLastCongestion = e.stack.Clock().NowMonotonic().Sub(s.Sender.Cubic.T)
}
s.Sender.RACKState = e.snd.rc.TCPRACKState
@@ -3029,14 +3043,16 @@ func GetTCPSendBufferLimits(s tcpip.StackHandler) tcpip.SendBufferSizeOption {
// allowOutOfWindowAck returns true if an out-of-window ACK can be sent now.
func (e *endpoint) allowOutOfWindowAck() bool {
- var limit stack.TCPInvalidRateLimitOption
- if err := e.stack.Option(&limit); err != nil {
- panic(fmt.Sprintf("e.stack.Option(%+v) failed with error: %s", limit, err))
- }
+ now := e.stack.Clock().NowMonotonic()
- now := time.Now()
- if now.Sub(e.lastOutOfWindowAckTime) < time.Duration(limit) {
- return false
+ if e.lastOutOfWindowAckTime != (tcpip.MonotonicTime{}) {
+ var limit stack.TCPInvalidRateLimitOption
+ if err := e.stack.Option(&limit); err != nil {
+ panic(fmt.Sprintf("e.stack.Option(%+v) failed with error: %s", limit, err))
+ }
+ if now.Sub(e.lastOutOfWindowAckTime) < time.Duration(limit) {
+ return false
+ }
}
e.lastOutOfWindowAckTime = now
diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go
index 6e9777fe4..952ccacdd 100644
--- a/pkg/tcpip/transport/tcp/endpoint_state.go
+++ b/pkg/tcpip/transport/tcp/endpoint_state.go
@@ -154,20 +154,25 @@ func (e *endpoint) afterLoad() {
e.origEndpointState = e.state
// Restore the endpoint to InitialState as it will be moved to
// its origEndpointState during Resume.
- e.state = StateInitial
+ e.state = uint32(StateInitial)
// Condition variables and mutexs are not S/R'ed so reinitialize
// acceptCond with e.acceptMu.
e.acceptCond = sync.NewCond(&e.acceptMu)
- e.keepalive.timer.init(&e.keepalive.waker)
stack.StackFromEnv.RegisterRestoredEndpoint(e)
}
// Resume implements tcpip.ResumableEndpoint.Resume.
func (e *endpoint) Resume(s *stack.Stack) {
+ e.keepalive.timer.init(s.Clock(), &e.keepalive.waker)
+ if snd := e.snd; snd != nil {
+ snd.resendTimer.init(s.Clock(), &snd.resendWaker)
+ snd.reorderTimer.init(s.Clock(), &snd.reorderWaker)
+ snd.probeTimer.init(s.Clock(), &snd.probeWaker)
+ }
e.stack = s
e.ops.InitHandler(e, e.stack, GetTCPSendBufferLimits, GetTCPReceiveBufferLimits)
e.segmentQueue.thaw()
- epState := e.origEndpointState
+ epState := EndpointState(e.origEndpointState)
switch epState {
case StateInitial, StateBound, StateListen, StateConnecting, StateEstablished:
var ss tcpip.TCPSendBufferSizeRangeOption
@@ -281,32 +286,12 @@ func (e *endpoint) Resume(s *stack.Stack) {
}()
case epState == StateClose:
e.isPortReserved = false
- e.state = StateClose
+ e.state = uint32(StateClose)
e.stack.CompleteTransportEndpointCleanup(e)
tcpip.DeleteDanglingEndpoint(e)
case epState == StateError:
- e.state = StateError
+ e.state = uint32(StateError)
e.stack.CompleteTransportEndpointCleanup(e)
tcpip.DeleteDanglingEndpoint(e)
}
}
-
-// saveRecentTSTime is invoked by stateify.
-func (e *endpoint) saveRecentTSTime() unixTime {
- return unixTime{e.recentTSTime.Unix(), e.recentTSTime.UnixNano()}
-}
-
-// loadRecentTSTime is invoked by stateify.
-func (e *endpoint) loadRecentTSTime(unix unixTime) {
- e.recentTSTime = time.Unix(unix.second, unix.nano)
-}
-
-// saveLastOutOfWindowAckTime is invoked by stateify.
-func (e *endpoint) saveLastOutOfWindowAckTime() unixTime {
- return unixTime{e.lastOutOfWindowAckTime.Unix(), e.lastOutOfWindowAckTime.UnixNano()}
-}
-
-// loadLastOutOfWindowAckTime is invoked by stateify.
-func (e *endpoint) loadLastOutOfWindowAckTime(unix unixTime) {
- e.lastOutOfWindowAckTime = time.Unix(unix.second, unix.nano)
-}
diff --git a/pkg/tcpip/transport/tcp/forwarder.go b/pkg/tcpip/transport/tcp/forwarder.go
index 2f9fe7ee0..65c86823a 100644
--- a/pkg/tcpip/transport/tcp/forwarder.go
+++ b/pkg/tcpip/transport/tcp/forwarder.go
@@ -65,7 +65,7 @@ func NewForwarder(s *stack.Stack, rcvWnd, maxInFlight int, handler func(*Forward
// This function is expected to be passed as an argument to the
// stack.SetTransportProtocolHandler function.
func (f *Forwarder) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketBuffer) bool {
- s := newIncomingSegment(id, pkt)
+ s := newIncomingSegment(id, f.stack.Clock(), pkt)
defer s.decRef()
// We only care about well-formed SYN packets.
diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go
index a3d1aa1a3..2fc282e73 100644
--- a/pkg/tcpip/transport/tcp/protocol.go
+++ b/pkg/tcpip/transport/tcp/protocol.go
@@ -131,7 +131,7 @@ func (*protocol) ParsePorts(v buffer.View) (src, dst uint16, err tcpip.Error) {
// goroutine which is responsible for dequeuing and doing full TCP dispatch of
// the packet.
func (p *protocol) QueuePacket(ep stack.TransportEndpoint, id stack.TransportEndpointID, pkt *stack.PacketBuffer) {
- p.dispatcher.queuePacket(ep, id, pkt)
+ p.dispatcher.queuePacket(ep, id, p.stack.Clock(), pkt)
}
// HandleUnknownDestinationPacket handles packets targeted at this protocol but
@@ -142,14 +142,14 @@ func (p *protocol) QueuePacket(ep stack.TransportEndpoint, id stack.TransportEnd
// particular, SYNs addressed to a non-existent connection are rejected by this
// means."
func (p *protocol) HandleUnknownDestinationPacket(id stack.TransportEndpointID, pkt *stack.PacketBuffer) stack.UnknownDestinationPacketDisposition {
- s := newIncomingSegment(id, pkt)
+ s := newIncomingSegment(id, p.stack.Clock(), pkt)
defer s.decRef()
if !s.parse(pkt.RXTransportChecksumValidated) || !s.csumValid {
return stack.UnknownDestinationPacketMalformed
}
- if !s.flagIsSet(header.TCPFlagRst) {
+ if !s.flags.Contains(header.TCPFlagRst) {
replyWithReset(p.stack, s, stack.DefaultTOS, 0)
}
@@ -181,7 +181,7 @@ func replyWithReset(st *stack.Stack, s *segment, tos, ttl uint8) tcpip.Error {
// reset has sequence number zero and the ACK field is set to the sum
// of the sequence number and segment length of the incoming segment.
// The connection remains in the CLOSED state.
- if s.flagIsSet(header.TCPFlagAck) {
+ if s.flags.Contains(header.TCPFlagAck) {
seq = s.ackNumber
} else {
flags |= header.TCPFlagAck
@@ -401,7 +401,7 @@ func (p *protocol) Option(option tcpip.GettableTransportProtocolOption) tcpip.Er
case *tcpip.TCPTimeWaitReuseOption:
p.mu.RLock()
- *v = tcpip.TCPTimeWaitReuseOption(p.timeWaitReuse)
+ *v = p.timeWaitReuse
p.mu.RUnlock()
return nil
@@ -481,6 +481,6 @@ func NewProtocol(s *stack.Stack) stack.TransportProtocol {
// TODO(gvisor.dev/issue/5243): Set recovery to tcpip.TCPRACKLossDetection.
recovery: 0,
}
- p.dispatcher.init(runtime.GOMAXPROCS(0))
+ p.dispatcher.init(s.Rand(), runtime.GOMAXPROCS(0))
return &p
}
diff --git a/pkg/tcpip/transport/tcp/rack.go b/pkg/tcpip/transport/tcp/rack.go
index 9e332dcf7..0da4eafaa 100644
--- a/pkg/tcpip/transport/tcp/rack.go
+++ b/pkg/tcpip/transport/tcp/rack.go
@@ -79,7 +79,7 @@ func (rc *rackControl) init(snd *sender, iss seqnum.Value) {
// update will update the RACK related fields when an ACK has been received.
// See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-09#section-6.2
func (rc *rackControl) update(seg *segment, ackSeg *segment) {
- rtt := time.Now().Sub(seg.xmitTime)
+ rtt := rc.snd.ep.stack.Clock().NowMonotonic().Sub(seg.xmitTime)
tsOffset := rc.snd.ep.TSOffset
// If the ACK is for a retransmitted packet, do not update if it is a
@@ -115,7 +115,7 @@ func (rc *rackControl) update(seg *segment, ackSeg *segment) {
// ending sequence number of the packet which has been acknowledged
// most recently.
endSeq := seg.sequenceNumber.Add(seqnum.Size(seg.data.Size()))
- if rc.XmitTime.Before(seg.xmitTime) || (seg.xmitTime.Equal(rc.XmitTime) && rc.EndSequence.LessThan(endSeq)) {
+ if rc.XmitTime.Before(seg.xmitTime) || (seg.xmitTime == rc.XmitTime && rc.EndSequence.LessThan(endSeq)) {
rc.XmitTime = seg.xmitTime
rc.EndSequence = endSeq
}
@@ -174,7 +174,7 @@ func (s *sender) schedulePTO() {
}
s.rtt.Unlock()
- now := time.Now()
+ now := s.ep.stack.Clock().NowMonotonic()
if s.resendTimer.enabled() {
if now.Add(pto).After(s.resendTimer.target) {
pto = s.resendTimer.target.Sub(now)
@@ -279,7 +279,7 @@ func (s *sender) detectTLPRecovery(ack seqnum.Value, rcvdSeg *segment) {
// been observed RACK uses reo_wnd of zero during loss recovery, in order to
// retransmit quickly, or when the number of DUPACKs exceeds the classic
// DUPACKthreshold.
-func (rc *rackControl) updateRACKReorderWindow(ackSeg *segment) {
+func (rc *rackControl) updateRACKReorderWindow() {
dsackSeen := rc.DSACKSeen
snd := rc.snd
@@ -352,7 +352,7 @@ func (rc *rackControl) exitRecovery() {
// detectLoss marks the segment as lost if the reordering window has elapsed
// and the ACK is not received. It will also arm the reorder timer.
// See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2 Step 5.
-func (rc *rackControl) detectLoss(rcvTime time.Time) int {
+func (rc *rackControl) detectLoss(rcvTime tcpip.MonotonicTime) int {
var timeout time.Duration
numLost := 0
for seg := rc.snd.writeList.Front(); seg != nil && seg.xmitCount != 0; seg = seg.Next() {
@@ -366,7 +366,7 @@ func (rc *rackControl) detectLoss(rcvTime time.Time) int {
}
endSeq := seg.sequenceNumber.Add(seqnum.Size(seg.data.Size()))
- if seg.xmitTime.Before(rc.XmitTime) || (seg.xmitTime.Equal(rc.XmitTime) && rc.EndSequence.LessThan(endSeq)) {
+ if seg.xmitTime.Before(rc.XmitTime) || (seg.xmitTime == rc.XmitTime && rc.EndSequence.LessThan(endSeq)) {
timeRemaining := seg.xmitTime.Sub(rcvTime) + rc.RTT + rc.ReoWnd
if timeRemaining <= 0 {
seg.lost = true
@@ -392,7 +392,7 @@ func (rc *rackControl) reorderTimerExpired() tcpip.Error {
return nil
}
- numLost := rc.detectLoss(time.Now())
+ numLost := rc.detectLoss(rc.snd.ep.stack.Clock().NowMonotonic())
if numLost == 0 {
return nil
}
diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go
index 133371455..661ca604a 100644
--- a/pkg/tcpip/transport/tcp/rcv.go
+++ b/pkg/tcpip/transport/tcp/rcv.go
@@ -17,7 +17,6 @@ package tcp
import (
"container/heap"
"math"
- "time"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/header"
@@ -50,7 +49,7 @@ type receiver struct {
pendingRcvdSegments segmentHeap
// Time when the last ack was received.
- lastRcvdAckTime time.Time `state:".(unixTime)"`
+ lastRcvdAckTime tcpip.MonotonicTime
}
func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8) *receiver {
@@ -63,7 +62,7 @@ func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale
},
rcvWnd: rcvWnd,
rcvWUP: irs + 1,
- lastRcvdAckTime: time.Now(),
+ lastRcvdAckTime: ep.stack.Clock().NowMonotonic(),
}
}
@@ -137,9 +136,9 @@ func (r *receiver) getSendParams() (RcvNxt seqnum.Value, rcvWnd seqnum.Size) {
// rcvWUP RcvNxt RcvAcc new RcvAcc
// <=====curWnd ===>
// <========= newWnd > curWnd ========= >
- if r.RcvNxt.Add(seqnum.Size(curWnd)).LessThan(r.RcvNxt.Add(seqnum.Size(newWnd))) && toGrow {
+ if r.RcvNxt.Add(curWnd).LessThan(r.RcvNxt.Add(newWnd)) && toGrow {
// If the new window moves the right edge, then update RcvAcc.
- r.RcvAcc = r.RcvNxt.Add(seqnum.Size(newWnd))
+ r.RcvAcc = r.RcvNxt.Add(newWnd)
} else {
if newWnd == 0 {
// newWnd is zero but we can't advertise a zero as it would cause window
@@ -245,7 +244,7 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum
TrimSACKBlockList(&r.ep.sack, r.RcvNxt)
// Handle FIN or FIN-ACK.
- if s.flagIsSet(header.TCPFlagFin) {
+ if s.flags.Contains(header.TCPFlagFin) {
r.RcvNxt++
// Send ACK immediately.
@@ -261,7 +260,7 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum
case StateEstablished:
r.ep.setEndpointState(StateCloseWait)
case StateFinWait1:
- if s.flagIsSet(header.TCPFlagAck) && s.ackNumber == r.ep.snd.SndNxt {
+ if s.flags.Contains(header.TCPFlagAck) && s.ackNumber == r.ep.snd.SndNxt {
// FIN-ACK, transition to TIME-WAIT.
r.ep.setEndpointState(StateTimeWait)
} else {
@@ -296,7 +295,7 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum
// Handle ACK (not FIN-ACK, which we handled above) during one of the
// shutdown states.
- if s.flagIsSet(header.TCPFlagAck) && s.ackNumber == r.ep.snd.SndNxt {
+ if s.flags.Contains(header.TCPFlagAck) && s.ackNumber == r.ep.snd.SndNxt {
switch r.ep.EndpointState() {
case StateFinWait1:
r.ep.setEndpointState(StateFinWait2)
@@ -325,9 +324,9 @@ func (r *receiver) updateRTT() {
// is first acknowledged and the receipt of data that is at least one
// window beyond the sequence number that was acknowledged.
r.ep.rcvQueueInfo.rcvQueueMu.Lock()
- if r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime.IsZero() {
+ if r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime == (tcpip.MonotonicTime{}) {
// New measurement.
- r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = time.Now()
+ r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = r.ep.stack.Clock().NowMonotonic()
r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureSeqNumber = r.RcvNxt.Add(r.rcvWnd)
r.ep.rcvQueueInfo.rcvQueueMu.Unlock()
return
@@ -336,14 +335,14 @@ func (r *receiver) updateRTT() {
r.ep.rcvQueueInfo.rcvQueueMu.Unlock()
return
}
- rtt := time.Since(r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime)
+ rtt := r.ep.stack.Clock().NowMonotonic().Sub(r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime)
// We only store the minimum observed RTT here as this is only used in
// absence of a SRTT available from either timestamps or a sender
// measurement of RTT.
if r.ep.rcvQueueInfo.RcvAutoParams.RTT == 0 || rtt < r.ep.rcvQueueInfo.RcvAutoParams.RTT {
r.ep.rcvQueueInfo.RcvAutoParams.RTT = rtt
}
- r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = time.Now()
+ r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureTime = r.ep.stack.Clock().NowMonotonic()
r.ep.rcvQueueInfo.RcvAutoParams.RTTMeasureSeqNumber = r.RcvNxt.Add(r.rcvWnd)
r.ep.rcvQueueInfo.rcvQueueMu.Unlock()
}
@@ -424,7 +423,7 @@ func (r *receiver) handleRcvdSegmentClosing(s *segment, state EndpointState, clo
// while the FIN is considered to occur after
// the last actual data octet in a segment in
// which it occurs.
- if closed && (!s.flagIsSet(header.TCPFlagFin) || s.sequenceNumber.Add(s.logicalLen()) != r.RcvNxt+1) {
+ if closed && (!s.flags.Contains(header.TCPFlagFin) || s.sequenceNumber.Add(s.logicalLen()) != r.RcvNxt+1) {
return true, &tcpip.ErrConnectionAborted{}
}
}
@@ -467,11 +466,11 @@ func (r *receiver) handleRcvdSegment(s *segment) (drop bool, err tcpip.Error) {
}
// Store the time of the last ack.
- r.lastRcvdAckTime = time.Now()
+ r.lastRcvdAckTime = r.ep.stack.Clock().NowMonotonic()
// Defer segment processing if it can't be consumed now.
if !r.consumeSegment(s, segSeq, segLen) {
- if segLen > 0 || s.flagIsSet(header.TCPFlagFin) {
+ if segLen > 0 || s.flags.Contains(header.TCPFlagFin) {
// We only store the segment if it's within our buffer size limit.
//
// Only use 75% of the receive buffer queue for out-of-order
@@ -539,7 +538,7 @@ func (r *receiver) handleTimeWaitSegment(s *segment) (resetTimeWait bool, newSyn
//
// As we do not yet support PAWS, we are being conservative in ignoring
// RSTs by default.
- if s.flagIsSet(header.TCPFlagRst) {
+ if s.flags.Contains(header.TCPFlagRst) {
return false, false
}
@@ -559,13 +558,13 @@ func (r *receiver) handleTimeWaitSegment(s *segment) (resetTimeWait bool, newSyn
// (2) returns to TIME-WAIT state if the SYN turns out
// to be an old duplicate".
- if s.flagIsSet(header.TCPFlagSyn) && r.RcvNxt.LessThan(segSeq) {
+ if s.flags.Contains(header.TCPFlagSyn) && r.RcvNxt.LessThan(segSeq) {
return false, true
}
// Drop the segment if it does not contain an ACK.
- if !s.flagIsSet(header.TCPFlagAck) {
+ if !s.flags.Contains(header.TCPFlagAck) {
return false, false
}
@@ -574,7 +573,7 @@ func (r *receiver) handleTimeWaitSegment(s *segment) (resetTimeWait bool, newSyn
r.ep.updateRecentTimestamp(s.parsedOptions.TSVal, r.ep.snd.MaxSentAck, segSeq)
}
- if segSeq.Add(1) == r.RcvNxt && s.flagIsSet(header.TCPFlagFin) {
+ if segSeq.Add(1) == r.RcvNxt && s.flags.Contains(header.TCPFlagFin) {
// If it's a FIN-ACK then resetTimeWait and send an ACK, as it
// indicates our final ACK could have been lost.
r.ep.snd.sendAck()
diff --git a/pkg/tcpip/transport/tcp/rcv_state.go b/pkg/tcpip/transport/tcp/rcv_state.go
deleted file mode 100644
index 2bf21a2e7..000000000
--- a/pkg/tcpip/transport/tcp/rcv_state.go
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2019 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package tcp
-
-import (
- "time"
-)
-
-// saveLastRcvdAckTime is invoked by stateify.
-func (r *receiver) saveLastRcvdAckTime() unixTime {
- return unixTime{r.lastRcvdAckTime.Unix(), r.lastRcvdAckTime.UnixNano()}
-}
-
-// loadLastRcvdAckTime is invoked by stateify.
-func (r *receiver) loadLastRcvdAckTime(unix unixTime) {
- r.lastRcvdAckTime = time.Unix(unix.second, unix.nano)
-}
diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go
index 7e5ba6ef7..ca78c96f2 100644
--- a/pkg/tcpip/transport/tcp/segment.go
+++ b/pkg/tcpip/transport/tcp/segment.go
@@ -17,7 +17,6 @@ package tcp
import (
"fmt"
"sync/atomic"
- "time"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
@@ -73,9 +72,9 @@ type segment struct {
parsedOptions header.TCPOptions
options []byte `state:".([]byte)"`
hasNewSACKInfo bool
- rcvdTime time.Time `state:".(unixTime)"`
+ rcvdTime tcpip.MonotonicTime
// xmitTime is the last transmit time of this segment.
- xmitTime time.Time `state:".(unixTime)"`
+ xmitTime tcpip.MonotonicTime
xmitCount uint32
// acked indicates if the segment has already been SACKed.
@@ -88,7 +87,7 @@ type segment struct {
lost bool
}
-func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) *segment {
+func newIncomingSegment(id stack.TransportEndpointID, clock tcpip.Clock, pkt *stack.PacketBuffer) *segment {
netHdr := pkt.Network()
s := &segment{
refCnt: 1,
@@ -100,17 +99,17 @@ func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) *
}
s.data = pkt.Data().ExtractVV().Clone(s.views[:])
s.hdr = header.TCP(pkt.TransportHeader().View())
- s.rcvdTime = time.Now()
+ s.rcvdTime = clock.NowMonotonic()
s.dataMemSize = s.data.Size()
return s
}
-func newOutgoingSegment(id stack.TransportEndpointID, v buffer.View) *segment {
+func newOutgoingSegment(id stack.TransportEndpointID, clock tcpip.Clock, v buffer.View) *segment {
s := &segment{
refCnt: 1,
id: id,
}
- s.rcvdTime = time.Now()
+ s.rcvdTime = clock.NowMonotonic()
if len(v) != 0 {
s.views[0] = v
s.data = buffer.NewVectorisedView(len(v), s.views[:1])
@@ -149,16 +148,6 @@ func (s *segment) merge(oth *segment) {
oth.dataMemSize = oth.data.Size()
}
-// flagIsSet checks if at least one flag in flags is set in s.flags.
-func (s *segment) flagIsSet(flags header.TCPFlags) bool {
- return s.flags&flags != 0
-}
-
-// flagsAreSet checks if all flags in flags are set in s.flags.
-func (s *segment) flagsAreSet(flags header.TCPFlags) bool {
- return s.flags&flags == flags
-}
-
// setOwner sets the owning endpoint for this segment. Its required
// to be called to ensure memory accounting for receive/send buffer
// queues is done properly.
@@ -198,10 +187,10 @@ func (s *segment) incRef() {
// as the data length plus one for each of the SYN and FIN bits set.
func (s *segment) logicalLen() seqnum.Size {
l := seqnum.Size(s.data.Size())
- if s.flagIsSet(header.TCPFlagSyn) {
+ if s.flags.Contains(header.TCPFlagSyn) {
l++
}
- if s.flagIsSet(header.TCPFlagFin) {
+ if s.flags.Contains(header.TCPFlagFin) {
l++
}
return l
@@ -243,7 +232,7 @@ func (s *segment) parse(skipChecksumValidation bool) bool {
return false
}
- s.options = []byte(s.hdr[header.TCPMinimumSize:])
+ s.options = s.hdr[header.TCPMinimumSize:]
s.parsedOptions = header.ParseTCPOptions(s.options)
if skipChecksumValidation {
s.csumValid = true
@@ -262,5 +251,5 @@ func (s *segment) parse(skipChecksumValidation bool) bool {
// sackBlock returns a header.SACKBlock that represents this segment.
func (s *segment) sackBlock() header.SACKBlock {
- return header.SACKBlock{s.sequenceNumber, s.sequenceNumber.Add(s.logicalLen())}
+ return header.SACKBlock{Start: s.sequenceNumber, End: s.sequenceNumber.Add(s.logicalLen())}
}
diff --git a/pkg/tcpip/transport/tcp/segment_state.go b/pkg/tcpip/transport/tcp/segment_state.go
index 7422d8c02..dcfa80f95 100644
--- a/pkg/tcpip/transport/tcp/segment_state.go
+++ b/pkg/tcpip/transport/tcp/segment_state.go
@@ -15,8 +15,6 @@
package tcp
import (
- "time"
-
"gvisor.dev/gvisor/pkg/tcpip/buffer"
)
@@ -55,23 +53,3 @@ func (s *segment) loadOptions(options []byte) {
// allocated so there is no cost here.
s.options = options
}
-
-// saveRcvdTime is invoked by stateify.
-func (s *segment) saveRcvdTime() unixTime {
- return unixTime{s.rcvdTime.Unix(), s.rcvdTime.UnixNano()}
-}
-
-// loadRcvdTime is invoked by stateify.
-func (s *segment) loadRcvdTime(unix unixTime) {
- s.rcvdTime = time.Unix(unix.second, unix.nano)
-}
-
-// saveXmitTime is invoked by stateify.
-func (s *segment) saveXmitTime() unixTime {
- return unixTime{s.rcvdTime.Unix(), s.rcvdTime.UnixNano()}
-}
-
-// loadXmitTime is invoked by stateify.
-func (s *segment) loadXmitTime(unix unixTime) {
- s.rcvdTime = time.Unix(unix.second, unix.nano)
-}
diff --git a/pkg/tcpip/transport/tcp/segment_test.go b/pkg/tcpip/transport/tcp/segment_test.go
index 486016fc0..2e6ea06f5 100644
--- a/pkg/tcpip/transport/tcp/segment_test.go
+++ b/pkg/tcpip/transport/tcp/segment_test.go
@@ -19,6 +19,7 @@ import (
"github.com/google/go-cmp/cmp"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/faketime"
"gvisor.dev/gvisor/pkg/tcpip/stack"
)
@@ -39,10 +40,11 @@ func checkSegmentSize(t *testing.T, name string, seg *segment, want segmentSizeW
}
func TestSegmentMerge(t *testing.T) {
+ var clock faketime.NullClock
id := stack.TransportEndpointID{}
- seg1 := newOutgoingSegment(id, buffer.NewView(10))
+ seg1 := newOutgoingSegment(id, &clock, buffer.NewView(10))
defer seg1.decRef()
- seg2 := newOutgoingSegment(id, buffer.NewView(20))
+ seg2 := newOutgoingSegment(id, &clock, buffer.NewView(20))
defer seg2.decRef()
checkSegmentSize(t, "seg1", seg1, segmentSizeWants{
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index f43e86677..72d58dcff 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -94,7 +94,7 @@ type sender struct {
// firstRetransmittedSegXmitTime is the original transmit time of
// the first segment that was retransmitted due to RTO expiration.
- firstRetransmittedSegXmitTime time.Time `state:".(unixTime)"`
+ firstRetransmittedSegXmitTime tcpip.MonotonicTime
// zeroWindowProbing is set if the sender is currently probing
// for zero receive window.
@@ -169,7 +169,7 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint
SndUna: iss + 1,
SndNxt: iss + 1,
RTTMeasureSeqNum: iss + 1,
- LastSendTime: time.Now(),
+ LastSendTime: ep.stack.Clock().NowMonotonic(),
MaxPayloadSize: maxPayloadSize,
MaxSentAck: irs + 1,
FastRecovery: stack.TCPFastRecoveryState{
@@ -197,9 +197,9 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint
s.SndWndScale = uint8(sndWndScale)
}
- s.resendTimer.init(&s.resendWaker)
- s.reorderTimer.init(&s.reorderWaker)
- s.probeTimer.init(&s.probeWaker)
+ s.resendTimer.init(s.ep.stack.Clock(), &s.resendWaker)
+ s.reorderTimer.init(s.ep.stack.Clock(), &s.reorderWaker)
+ s.probeTimer.init(s.ep.stack.Clock(), &s.probeWaker)
s.updateMaxPayloadSize(int(ep.route.MTU()), 0)
@@ -441,7 +441,7 @@ func (s *sender) retransmitTimerExpired() bool {
// timeout since the first retransmission.
uto := s.ep.userTimeout
- if s.firstRetransmittedSegXmitTime.IsZero() {
+ if s.firstRetransmittedSegXmitTime == (tcpip.MonotonicTime{}) {
// We store the original xmitTime of the segment that we are
// about to retransmit as the retransmission time. This is
// required as by the time the retransmitTimer has expired the
@@ -450,7 +450,7 @@ func (s *sender) retransmitTimerExpired() bool {
s.firstRetransmittedSegXmitTime = s.writeList.Front().xmitTime
}
- elapsed := time.Since(s.firstRetransmittedSegXmitTime)
+ elapsed := s.ep.stack.Clock().NowMonotonic().Sub(s.firstRetransmittedSegXmitTime)
remaining := s.maxRTO
if uto != 0 {
// Cap to the user specified timeout if one is specified.
@@ -616,7 +616,7 @@ func (s *sender) NextSeg(nextSegHint *segment) (nextSeg, hint *segment, rescueRt
// 'S2' that meets the following 3 criteria for determinig
// loss, the sequence range of one segment of up to SMSS
// octects starting with S2 MUST be returned.
- if !s.ep.scoreboard.IsSACKED(header.SACKBlock{segSeq, segSeq.Add(1)}) {
+ if !s.ep.scoreboard.IsSACKED(header.SACKBlock{Start: segSeq, End: segSeq.Add(1)}) {
// NextSeg():
//
// (1.a) S2 is greater than HighRxt
@@ -866,8 +866,8 @@ func (s *sender) enableZeroWindowProbing() {
// We piggyback the probing on the retransmit timer with the
// current retranmission interval, as we may start probing while
// segment retransmissions.
- if s.firstRetransmittedSegXmitTime.IsZero() {
- s.firstRetransmittedSegXmitTime = time.Now()
+ if s.firstRetransmittedSegXmitTime == (tcpip.MonotonicTime{}) {
+ s.firstRetransmittedSegXmitTime = s.ep.stack.Clock().NowMonotonic()
}
s.resendTimer.enable(s.RTO)
}
@@ -875,7 +875,7 @@ func (s *sender) enableZeroWindowProbing() {
func (s *sender) disableZeroWindowProbing() {
s.zeroWindowProbing = false
s.unackZeroWindowProbes = 0
- s.firstRetransmittedSegXmitTime = time.Time{}
+ s.firstRetransmittedSegXmitTime = tcpip.MonotonicTime{}
s.resendTimer.disable()
}
@@ -925,7 +925,7 @@ func (s *sender) sendData() {
// "A TCP SHOULD set cwnd to no more than RW before beginning
// transmission if the TCP has not sent data in the interval exceeding
// the retrasmission timeout."
- if !s.FastRecovery.Active && s.state != tcpip.RTORecovery && time.Now().Sub(s.LastSendTime) > s.RTO {
+ if !s.FastRecovery.Active && s.state != tcpip.RTORecovery && s.ep.stack.Clock().NowMonotonic().Sub(s.LastSendTime) > s.RTO {
if s.SndCwnd > InitialCwnd {
s.SndCwnd = InitialCwnd
}
@@ -1024,7 +1024,7 @@ func (s *sender) SetPipe() {
if segEnd.LessThan(endSeq) {
endSeq = segEnd
}
- sb := header.SACKBlock{startSeq, endSeq}
+ sb := header.SACKBlock{Start: startSeq, End: endSeq}
// SetPipe():
//
// After initializing pipe to zero, the following steps are
@@ -1132,7 +1132,7 @@ func (s *sender) isDupAck(seg *segment) bool {
// (b) The incoming acknowledgment carries no data.
seg.logicalLen() == 0 &&
// (c) The SYN and FIN bits are both off.
- !seg.flagIsSet(header.TCPFlagFin) && !seg.flagIsSet(header.TCPFlagSyn) &&
+ !seg.flags.Intersects(header.TCPFlagFin|header.TCPFlagSyn) &&
// (d) the ACK number is equal to the greatest acknowledgment received on
// the given connection (TCP.UNA from RFC793).
seg.ackNumber == s.SndUna &&
@@ -1234,7 +1234,7 @@ func checkDSACK(rcvdSeg *segment) bool {
func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
// Check if we can extract an RTT measurement from this ack.
if !rcvdSeg.parsedOptions.TS && s.RTTMeasureSeqNum.LessThan(rcvdSeg.ackNumber) {
- s.updateRTO(time.Now().Sub(s.RTTMeasureTime))
+ s.updateRTO(s.ep.stack.Clock().NowMonotonic().Sub(s.RTTMeasureTime))
s.RTTMeasureSeqNum = s.SndNxt
}
@@ -1444,7 +1444,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
if s.SndUna == s.SndNxt {
s.Outstanding = 0
// Reset firstRetransmittedSegXmitTime to the zero value.
- s.firstRetransmittedSegXmitTime = time.Time{}
+ s.firstRetransmittedSegXmitTime = tcpip.MonotonicTime{}
s.resendTimer.disable()
s.probeTimer.disable()
}
@@ -1455,7 +1455,7 @@ func (s *sender) handleRcvdSegment(rcvdSeg *segment) {
// See: https://tools.ietf.org/html/draft-ietf-tcpm-rack-08#section-7.2
// * Upon receiving an ACK:
// * Step 4: Update RACK reordering window
- s.rc.updateRACKReorderWindow(rcvdSeg)
+ s.rc.updateRACKReorderWindow()
// After the reorder window is calculated, detect any loss by checking
// if the time elapsed after the segments are sent is greater than the
@@ -1502,7 +1502,7 @@ func (s *sender) sendSegment(seg *segment) tcpip.Error {
s.ep.stack.Stats().TCP.SlowStartRetransmits.Increment()
}
}
- seg.xmitTime = time.Now()
+ seg.xmitTime = s.ep.stack.Clock().NowMonotonic()
seg.xmitCount++
seg.lost = false
err := s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber)
@@ -1527,7 +1527,7 @@ func (s *sender) sendSegment(seg *segment) tcpip.Error {
// sendSegmentFromView sends a new segment containing the given payload, flags
// and sequence number.
func (s *sender) sendSegmentFromView(data buffer.VectorisedView, flags header.TCPFlags, seq seqnum.Value) tcpip.Error {
- s.LastSendTime = time.Now()
+ s.LastSendTime = s.ep.stack.Clock().NowMonotonic()
if seq == s.RTTMeasureSeqNum {
s.RTTMeasureTime = s.LastSendTime
}
diff --git a/pkg/tcpip/transport/tcp/snd_state.go b/pkg/tcpip/transport/tcp/snd_state.go
deleted file mode 100644
index 2f805d8ce..000000000
--- a/pkg/tcpip/transport/tcp/snd_state.go
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package tcp
-
-import (
- "time"
-)
-
-// +stateify savable
-type unixTime struct {
- second int64
- nano int64
-}
-
-// afterLoad is invoked by stateify.
-func (s *sender) afterLoad() {
- s.resendTimer.init(&s.resendWaker)
- s.reorderTimer.init(&s.reorderWaker)
- s.probeTimer.init(&s.probeWaker)
-}
-
-// saveFirstRetransmittedSegXmitTime is invoked by stateify.
-func (s *sender) saveFirstRetransmittedSegXmitTime() unixTime {
- return unixTime{s.firstRetransmittedSegXmitTime.Unix(), s.firstRetransmittedSegXmitTime.UnixNano()}
-}
-
-// loadFirstRetransmittedSegXmitTime is invoked by stateify.
-func (s *sender) loadFirstRetransmittedSegXmitTime(unix unixTime) {
- s.firstRetransmittedSegXmitTime = time.Unix(unix.second, unix.nano)
-}
diff --git a/pkg/tcpip/transport/tcp/tcp_rack_test.go b/pkg/tcpip/transport/tcp/tcp_rack_test.go
index c58361bc1..d6cf786a1 100644
--- a/pkg/tcpip/transport/tcp/tcp_rack_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_rack_test.go
@@ -38,7 +38,7 @@ const (
func setStackRACKPermitted(t *testing.T, c *context.Context) {
t.Helper()
- opt := tcpip.TCPRecovery(tcpip.TCPRACKLossDetection)
+ opt := tcpip.TCPRACKLossDetection
if err := c.Stack().SetTransportProtocolOption(header.TCPProtocolNumber, &opt); err != nil {
t.Fatalf("c.s.SetTransportProtocolOption(%d, &%v(%v)): %s", header.TCPProtocolNumber, opt, opt, err)
}
@@ -50,7 +50,7 @@ func TestRACKUpdate(t *testing.T) {
c := context.New(t, uint32(mtu))
defer c.Cleanup()
- var xmitTime time.Time
+ var xmitTime tcpip.MonotonicTime
probeDone := make(chan struct{})
c.Stack().AddTCPProbe(func(state stack.TCPEndpointState) {
// Validate that the endpoint Sender.RACKState is what we expect.
@@ -79,7 +79,7 @@ func TestRACKUpdate(t *testing.T) {
}
// Write the data.
- xmitTime = time.Now()
+ xmitTime = c.Stack().Clock().NowMonotonic()
var r bytes.Reader
r.Reset(data)
if _, err := c.EP.Write(&r, tcpip.WriteOptions{}); err != nil {
diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go
index 9916182e3..e7ede7662 100644
--- a/pkg/tcpip/transport/tcp/tcp_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_test.go
@@ -4259,7 +4259,7 @@ func TestReceivedInvalidSegmentCountIncrement(t *testing.T) {
SrcPort: context.TestPort,
DstPort: c.Port,
Flags: header.TCPFlagAck,
- SeqNum: seqnum.Value(iss),
+ SeqNum: iss,
AckNum: c.IRS.Add(1),
RcvWnd: 30000,
})
@@ -5335,7 +5335,7 @@ func TestKeepalive(t *testing.T) {
checker.IPv4(t, b,
checker.TCP(
checker.DstPort(context.TestPort),
- checker.TCPSeqNum(uint32(next-1)),
+ checker.TCPSeqNum(next-1),
checker.TCPAckNum(uint32(iss)),
checker.TCPFlags(header.TCPFlagAck),
),
@@ -5360,12 +5360,7 @@ func TestKeepalive(t *testing.T) {
})
checker.IPv4(t, c.GetPacket(),
- checker.TCP(
- checker.DstPort(context.TestPort),
- checker.TCPSeqNum(uint32(next)),
- checker.TCPAckNum(uint32(0)),
- checker.TCPFlags(header.TCPFlagRst),
- ),
+ checker.TCP(checker.DstPort(context.TestPort), checker.TCPSeqNum(next), checker.TCPAckNum(uint32(0)), checker.TCPFlags(header.TCPFlagRst)),
)
if got := c.Stack().Stats().TCP.EstablishedTimedout.Value(); got != 1 {
@@ -5507,7 +5502,7 @@ func TestListenBacklogFull(t *testing.T) {
// Now execute send one more SYN. The stack should not respond as the backlog
// is full at this point.
c.SendPacket(nil, &context.Headers{
- SrcPort: context.TestPort + uint16(lastPortOffset),
+ SrcPort: context.TestPort + lastPortOffset,
DstPort: context.StackPort,
Flags: header.TCPFlagSyn,
SeqNum: seqnum.Value(context.TestInitialSequenceNumber),
@@ -5884,7 +5879,7 @@ func TestListenSynRcvdQueueFull(t *testing.T) {
r.Reset(data)
newEP.Write(&r, tcpip.WriteOptions{})
pkt := c.GetPacket()
- tcp = header.TCP(header.IPv4(pkt).Payload())
+ tcp = header.IPv4(pkt).Payload()
if string(tcp.Payload()) != data {
t.Fatalf("unexpected data: got %s, want %s", string(tcp.Payload()), data)
}
@@ -6118,7 +6113,7 @@ func TestSynRcvdBadSeqNumber(t *testing.T) {
}
pkt := c.GetPacket()
- tcpHdr = header.TCP(header.IPv4(pkt).Payload())
+ tcpHdr = header.IPv4(pkt).Payload()
if string(tcpHdr.Payload()) != data {
t.Fatalf("unexpected data: got %s, want %s", string(tcpHdr.Payload()), data)
}
@@ -6375,7 +6370,7 @@ func TestReceiveBufferAutoTuningApplicationLimited(t *testing.T) {
// Allocate a large enough payload for the test.
payloadSize := receiveBufferSize * 2
- b := make([]byte, int(payloadSize))
+ b := make([]byte, payloadSize)
worker := (c.EP).(interface {
StopWork()
@@ -6429,7 +6424,7 @@ func TestReceiveBufferAutoTuningApplicationLimited(t *testing.T) {
// ack, 1 for the non-zero window
p := c.GetPacket()
checker.IPv4(t, p, checker.TCP(
- checker.TCPAckNum(uint32(wantAckNum)),
+ checker.TCPAckNum(wantAckNum),
func(t *testing.T, h header.Transport) {
tcp, ok := h.(header.TCP)
if !ok {
@@ -6484,14 +6479,14 @@ func TestReceiveBufferAutoTuning(t *testing.T) {
c.WindowScale = uint8(tcp.FindWndScale(maxReceiveBufferSize))
rawEP := c.CreateConnectedWithOptions(header.TCPSynOptions{TS: true, WS: 4})
- tsVal := uint32(rawEP.TSVal)
+ tsVal := rawEP.TSVal
rawEP.NextSeqNum--
rawEP.SendPacketWithTS(nil, tsVal)
rawEP.NextSeqNum++
pkt := rawEP.VerifyAndReturnACKWithTS(tsVal)
curRcvWnd := int(header.TCP(header.IPv4(pkt).Payload()).WindowSize()) << c.WindowScale
scaleRcvWnd := func(rcvWnd int) uint16 {
- return uint16(rcvWnd >> uint16(c.WindowScale))
+ return uint16(rcvWnd >> c.WindowScale)
}
// Allocate a large array to send to the endpoint.
b := make([]byte, receiveBufferSize*48)
@@ -6619,19 +6614,16 @@ func TestDelayEnabled(t *testing.T) {
defer c.Cleanup()
checkDelayOption(t, c, false, false) // Delay is disabled by default.
- for _, v := range []struct {
- delayEnabled tcpip.TCPDelayEnabled
- wantDelayOption bool
- }{
- {delayEnabled: false, wantDelayOption: false},
- {delayEnabled: true, wantDelayOption: true},
- } {
- c := context.New(t, defaultMTU)
- defer c.Cleanup()
- if err := c.Stack().SetTransportProtocolOption(tcp.ProtocolNumber, &v.delayEnabled); err != nil {
- t.Fatalf("SetTransportProtocolOption(%d, &%T(%t)): %s", tcp.ProtocolNumber, v.delayEnabled, v.delayEnabled, err)
- }
- checkDelayOption(t, c, v.delayEnabled, v.wantDelayOption)
+ for _, delayEnabled := range []bool{false, true} {
+ t.Run(fmt.Sprintf("delayEnabled=%t", delayEnabled), func(t *testing.T) {
+ c := context.New(t, defaultMTU)
+ defer c.Cleanup()
+ opt := tcpip.TCPDelayEnabled(delayEnabled)
+ if err := c.Stack().SetTransportProtocolOption(tcp.ProtocolNumber, &opt); err != nil {
+ t.Fatalf("SetTransportProtocolOption(%d, &%T(%t)): %s", tcp.ProtocolNumber, opt, delayEnabled, err)
+ }
+ checkDelayOption(t, c, opt, delayEnabled)
+ })
}
}
@@ -7042,7 +7034,7 @@ func TestTCPTimeWaitNewSyn(t *testing.T) {
// Receive the SYN-ACK reply.
b = c.GetPacket()
- tcpHdr = header.TCP(header.IPv4(b).Payload())
+ tcpHdr = header.IPv4(b).Payload()
c.IRS = seqnum.Value(tcpHdr.SequenceNumber())
ackHeaders = &context.Headers{
@@ -7467,7 +7459,7 @@ func TestTCPUserTimeout(t *testing.T) {
checker.IPv4(t, c.GetPacket(),
checker.TCP(
checker.DstPort(context.TestPort),
- checker.TCPSeqNum(uint32(next)),
+ checker.TCPSeqNum(next),
checker.TCPAckNum(uint32(0)),
checker.TCPFlags(header.TCPFlagRst),
),
@@ -7545,7 +7537,7 @@ func TestKeepaliveWithUserTimeout(t *testing.T) {
DstPort: c.Port,
Flags: header.TCPFlagAck,
SeqNum: iss,
- AckNum: seqnum.Value(c.IRS + 1),
+ AckNum: c.IRS + 1,
RcvWnd: 30000,
})
diff --git a/pkg/tcpip/transport/tcp/timer.go b/pkg/tcpip/transport/tcp/timer.go
index 38a335840..5645c772e 100644
--- a/pkg/tcpip/transport/tcp/timer.go
+++ b/pkg/tcpip/transport/tcp/timer.go
@@ -15,21 +15,29 @@
package tcp
import (
+ "math"
"time"
"gvisor.dev/gvisor/pkg/sleep"
+ "gvisor.dev/gvisor/pkg/tcpip"
)
type timerState int
const (
+ // The timer is disabled.
timerStateDisabled timerState = iota
+ // The timer is enabled, but the clock timer may be set to an earlier
+ // expiration time due to a previous orphaned state.
timerStateEnabled
+ // The timer is disabled, but the clock timer is enabled, which means that
+ // it will cause a spurious wakeup unless the timer is enabled before the
+ // clock timer fires.
timerStateOrphaned
)
// timer is a timer implementation that reduces the interactions with the
-// runtime timer infrastructure by letting timers run (and potentially
+// clock timer infrastructure by letting timers run (and potentially
// eventually expire) even if they are stopped. It makes it cheaper to
// disable/reenable timers at the expense of spurious wakes. This is useful for
// cases when the same timer is disabled/reenabled repeatedly with relatively
@@ -39,44 +47,37 @@ const (
// (currently at least 200ms), and get disabled when acks are received, and
// reenabled when new pending segments are sent.
//
-// It is advantageous to avoid interacting with the runtime because it acquires
+// It is advantageous to avoid interacting with the clock because it acquires
// a global mutex and performs O(log n) operations, where n is the global number
// of timers, whenever a timer is enabled or disabled, and may make a syscall.
//
// This struct is thread-compatible.
type timer struct {
- // state is the current state of the timer, it can be one of the
- // following values:
- // disabled - the timer is disabled.
- // orphaned - the timer is disabled, but the runtime timer is
- // enabled, which means that it will evetually cause a
- // spurious wake (unless it gets enabled again before
- // then).
- // enabled - the timer is enabled, but the runtime timer may be set
- // to an earlier expiration time due to a previous
- // orphaned state.
state timerState
+ clock tcpip.Clock
+
// target is the expiration time of the current timer. It is only
// meaningful in the enabled state.
- target time.Time
+ target tcpip.MonotonicTime
- // runtimeTarget is the expiration time of the runtime timer. It is
+ // clockTarget is the expiration time of the clock timer. It is
// meaningful in the enabled and orphaned states.
- runtimeTarget time.Time
+ clockTarget tcpip.MonotonicTime
- // timer is the runtime timer used to wait on.
- timer *time.Timer
+ // timer is the clock timer used to wait on.
+ timer tcpip.Timer
}
// init initializes the timer. Once it expires, it the given waker will be
// asserted.
-func (t *timer) init(w *sleep.Waker) {
+func (t *timer) init(clock tcpip.Clock, w *sleep.Waker) {
t.state = timerStateDisabled
+ t.clock = clock
- // Initialize a runtime timer that will assert the waker, then
+ // Initialize a clock timer that will assert the waker, then
// immediately stop it.
- t.timer = time.AfterFunc(time.Hour, func() {
+ t.timer = t.clock.AfterFunc(math.MaxInt64, func() {
w.Assert()
})
t.timer.Stop()
@@ -106,9 +107,9 @@ func (t *timer) checkExpiration() bool {
// The timer is enabled, but it may have expired early. Check if that's
// the case, and if so, reset the runtime timer to the correct time.
- now := time.Now()
+ now := t.clock.NowMonotonic()
if now.Before(t.target) {
- t.runtimeTarget = t.target
+ t.clockTarget = t.target
t.timer.Reset(t.target.Sub(now))
return false
}
@@ -134,11 +135,11 @@ func (t *timer) enabled() bool {
// enable enables the timer, programming the runtime timer if necessary.
func (t *timer) enable(d time.Duration) {
- t.target = time.Now().Add(d)
+ t.target = t.clock.NowMonotonic().Add(d)
// Check if we need to set the runtime timer.
- if t.state == timerStateDisabled || t.target.Before(t.runtimeTarget) {
- t.runtimeTarget = t.target
+ if t.state == timerStateDisabled || t.target.Before(t.clockTarget) {
+ t.clockTarget = t.target
t.timer.Reset(d)
}
diff --git a/pkg/tcpip/transport/tcp/timer_test.go b/pkg/tcpip/transport/tcp/timer_test.go
index dbd6dff54..479752de7 100644
--- a/pkg/tcpip/transport/tcp/timer_test.go
+++ b/pkg/tcpip/transport/tcp/timer_test.go
@@ -19,6 +19,7 @@ import (
"time"
"gvisor.dev/gvisor/pkg/sleep"
+ "gvisor.dev/gvisor/pkg/tcpip/faketime"
)
func TestCleanup(t *testing.T) {
@@ -27,9 +28,11 @@ func TestCleanup(t *testing.T) {
isAssertedTimeoutSeconds = timerDurationSeconds + 1
)
+ clock := faketime.NewManualClock()
+
tmr := timer{}
w := sleep.Waker{}
- tmr.init(&w)
+ tmr.init(clock, &w)
tmr.enable(timerDurationSeconds * time.Second)
tmr.cleanup()
@@ -39,7 +42,7 @@ func TestCleanup(t *testing.T) {
// The waker should not be asserted.
for i := 0; i < isAssertedTimeoutSeconds; i++ {
- time.Sleep(time.Second)
+ clock.Advance(time.Second)
if w.IsAsserted() {
t.Fatalf("waker asserted unexpectedly")
}
diff --git a/pkg/tcpip/transport/udp/BUILD b/pkg/tcpip/transport/udp/BUILD
index dd5c910ae..cdc344ab7 100644
--- a/pkg/tcpip/transport/udp/BUILD
+++ b/pkg/tcpip/transport/udp/BUILD
@@ -49,6 +49,7 @@ go_test(
"//pkg/tcpip",
"//pkg/tcpip/buffer",
"//pkg/tcpip/checker",
+ "//pkg/tcpip/faketime",
"//pkg/tcpip/header",
"//pkg/tcpip/link/channel",
"//pkg/tcpip/link/loopback",
diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go
index f7dd50d35..b964e446b 100644
--- a/pkg/tcpip/transport/udp/endpoint.go
+++ b/pkg/tcpip/transport/udp/endpoint.go
@@ -17,6 +17,7 @@ package udp
import (
"io"
"sync/atomic"
+ "time"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
@@ -34,19 +35,20 @@ type udpPacket struct {
destinationAddress tcpip.FullAddress
packetInfo tcpip.IPPacketInfo
data buffer.VectorisedView `state:".(buffer.VectorisedView)"`
- timestamp int64
+ receivedAt time.Time `state:".(int64)"`
// tos stores either the receiveTOS or receiveTClass value.
tos uint8
}
// EndpointState represents the state of a UDP endpoint.
-type EndpointState uint32
+type EndpointState tcpip.EndpointState
// Endpoint states. Note that are represented in a netstack-specific manner and
// may not be meaningful externally. Specifically, they need to be translated to
// Linux's representation for these states if presented to userspace.
const (
- StateInitial EndpointState = iota
+ _ EndpointState = iota
+ StateInitial
StateBound
StateConnected
StateClosed
@@ -98,7 +100,7 @@ type endpoint struct {
mu sync.RWMutex `state:"nosave"`
// state must be read/set using the EndpointState()/setEndpointState()
// methods.
- state EndpointState
+ state uint32
route *stack.Route `state:"manual"`
dstPort uint16
ttl uint8
@@ -176,7 +178,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
// Linux defaults to TTL=1.
multicastTTL: 1,
multicastMemberships: make(map[multicastMembership]struct{}),
- state: StateInitial,
+ state: uint32(StateInitial),
uniqueID: s.UniqueID(),
}
e.ops.InitHandler(e, e.stack, tcpip.GetStackSendBufferLimits, tcpip.GetStackReceiveBufferLimits)
@@ -204,12 +206,12 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue
//
// Precondition: e.mu must be held to call this method.
func (e *endpoint) setEndpointState(state EndpointState) {
- atomic.StoreUint32((*uint32)(&e.state), uint32(state))
+ atomic.StoreUint32(&e.state, uint32(state))
}
// EndpointState() returns the current state of the endpoint.
func (e *endpoint) EndpointState() EndpointState {
- return EndpointState(atomic.LoadUint32((*uint32)(&e.state)))
+ return EndpointState(atomic.LoadUint32(&e.state))
}
// UniqueID implements stack.TransportEndpoint.UniqueID.
@@ -290,7 +292,7 @@ func (e *endpoint) Close() {
}
// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
-func (e *endpoint) ModerateRecvBuf(copied int) {}
+func (*endpoint) ModerateRecvBuf(int) {}
// Read implements tcpip.Endpoint.Read.
func (e *endpoint) Read(dst io.Writer, opts tcpip.ReadOptions) (tcpip.ReadResult, tcpip.Error) {
@@ -320,7 +322,7 @@ func (e *endpoint) Read(dst io.Writer, opts tcpip.ReadOptions) (tcpip.ReadResult
// Control Messages
cm := tcpip.ControlMessages{
HasTimestamp: true,
- Timestamp: p.timestamp,
+ Timestamp: p.receivedAt.UnixNano(),
}
if e.ops.GetReceiveTOS() {
cm.HasTOS = true
@@ -801,8 +803,8 @@ func (e *endpoint) GetSockOpt(opt tcpip.GettableSocketOption) tcpip.Error {
case *tcpip.MulticastInterfaceOption:
e.mu.Lock()
*o = tcpip.MulticastInterfaceOption{
- e.multicastNICID,
- e.multicastAddr,
+ NIC: e.multicastNICID,
+ InterfaceAddr: e.multicastAddr,
}
e.mu.Unlock()
@@ -1075,7 +1077,7 @@ func (e *endpoint) registerWithStack(netProtos []tcpip.NetworkProtocolNumber, id
BindToDevice: bindToDevice,
Dest: tcpip.FullAddress{},
}
- port, err := e.stack.ReservePort(portRes, nil /* testPort */)
+ port, err := e.stack.ReservePort(e.stack.Rand(), portRes, nil /* testPort */)
if err != nil {
return id, bindToDevice, err
}
@@ -1301,12 +1303,12 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB
senderAddress: tcpip.FullAddress{
NIC: pkt.NICID,
Addr: id.RemoteAddress,
- Port: header.UDP(hdr).SourcePort(),
+ Port: hdr.SourcePort(),
},
destinationAddress: tcpip.FullAddress{
NIC: pkt.NICID,
Addr: id.LocalAddress,
- Port: header.UDP(hdr).DestinationPort(),
+ Port: hdr.DestinationPort(),
},
data: pkt.Data().ExtractVV(),
}
@@ -1328,7 +1330,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB
packet.packetInfo.LocalAddr = localAddr
packet.packetInfo.DestinationAddr = localAddr
packet.packetInfo.NIC = pkt.NICID
- packet.timestamp = e.stack.Clock().NowNanoseconds()
+ packet.receivedAt = e.stack.Clock().Now()
e.rcvMu.Unlock()
diff --git a/pkg/tcpip/transport/udp/endpoint_state.go b/pkg/tcpip/transport/udp/endpoint_state.go
index 4aba68b21..1f638c3f6 100644
--- a/pkg/tcpip/transport/udp/endpoint_state.go
+++ b/pkg/tcpip/transport/udp/endpoint_state.go
@@ -15,26 +15,38 @@
package udp
import (
+ "time"
+
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
"gvisor.dev/gvisor/pkg/tcpip/header"
"gvisor.dev/gvisor/pkg/tcpip/stack"
)
+// saveReceivedAt is invoked by stateify.
+func (p *udpPacket) saveReceivedAt() int64 {
+ return p.receivedAt.UnixNano()
+}
+
+// loadReceivedAt is invoked by stateify.
+func (p *udpPacket) loadReceivedAt(nsec int64) {
+ p.receivedAt = time.Unix(0, nsec)
+}
+
// saveData saves udpPacket.data field.
-func (u *udpPacket) saveData() buffer.VectorisedView {
- // We cannot save u.data directly as u.data.views may alias to u.views,
+func (p *udpPacket) saveData() buffer.VectorisedView {
+ // We cannot save p.data directly as p.data.views may alias to p.views,
// which is not allowed by state framework (in-struct pointer).
- return u.data.Clone(nil)
+ return p.data.Clone(nil)
}
// loadData loads udpPacket.data field.
-func (u *udpPacket) loadData(data buffer.VectorisedView) {
- // NOTE: We cannot do the u.data = data.Clone(u.views[:]) optimization
+func (p *udpPacket) loadData(data buffer.VectorisedView) {
+ // NOTE: We cannot do the p.data = data.Clone(p.views[:]) optimization
// here because data.views is not guaranteed to be loaded by now. Plus,
// data.views will be allocated anyway so there really is little point
- // of utilizing u.views for data.views.
- u.data = data
+ // of utilizing p.views for data.views.
+ p.data = data
}
// afterLoad is invoked by stateify.
diff --git a/pkg/tcpip/transport/udp/forwarder.go b/pkg/tcpip/transport/udp/forwarder.go
index 705ad1f64..7c357cb09 100644
--- a/pkg/tcpip/transport/udp/forwarder.go
+++ b/pkg/tcpip/transport/udp/forwarder.go
@@ -90,7 +90,7 @@ func (r *ForwarderRequest) CreateEndpoint(queue *waiter.Queue) (tcpip.Endpoint,
ep.RegisterNICID = r.pkt.NICID
ep.boundPortFlags = ep.portFlags
- ep.state = StateConnected
+ ep.state = uint32(StateConnected)
ep.rcvMu.Lock()
ep.rcvReady = true
diff --git a/pkg/tcpip/transport/udp/udp_test.go b/pkg/tcpip/transport/udp/udp_test.go
index dc2e3f493..4008cacf2 100644
--- a/pkg/tcpip/transport/udp/udp_test.go
+++ b/pkg/tcpip/transport/udp/udp_test.go
@@ -16,17 +16,16 @@ package udp_test
import (
"bytes"
- "context"
"fmt"
"io/ioutil"
"math/rand"
"testing"
- "time"
"github.com/google/go-cmp/cmp"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
"gvisor.dev/gvisor/pkg/tcpip/checker"
+ "gvisor.dev/gvisor/pkg/tcpip/faketime"
"gvisor.dev/gvisor/pkg/tcpip/header"
"gvisor.dev/gvisor/pkg/tcpip/link/channel"
"gvisor.dev/gvisor/pkg/tcpip/link/loopback"
@@ -298,16 +297,18 @@ type testContext struct {
func newDualTestContext(t *testing.T, mtu uint32) *testContext {
t.Helper()
- return newDualTestContextWithOptions(t, mtu, stack.Options{
- NetworkProtocols: []stack.NetworkProtocolFactory{ipv4.NewProtocol, ipv6.NewProtocol},
- TransportProtocols: []stack.TransportProtocolFactory{udp.NewProtocol, icmp.NewProtocol6, icmp.NewProtocol4},
- HandleLocal: true,
- })
+ return newDualTestContextWithHandleLocal(t, mtu, true)
}
-func newDualTestContextWithOptions(t *testing.T, mtu uint32, options stack.Options) *testContext {
+func newDualTestContextWithHandleLocal(t *testing.T, mtu uint32, handleLocal bool) *testContext {
t.Helper()
+ options := stack.Options{
+ NetworkProtocols: []stack.NetworkProtocolFactory{ipv4.NewProtocol, ipv6.NewProtocol},
+ TransportProtocols: []stack.TransportProtocolFactory{udp.NewProtocol, icmp.NewProtocol6, icmp.NewProtocol4},
+ HandleLocal: handleLocal,
+ Clock: &faketime.NullClock{},
+ }
s := stack.New(options)
ep := channel.New(256, mtu, "")
wep := stack.LinkEndpoint(ep)
@@ -378,9 +379,7 @@ func (c *testContext) createEndpointForFlow(flow testFlow) {
func (c *testContext) getPacketAndVerify(flow testFlow, checkers ...checker.NetworkChecker) []byte {
c.t.Helper()
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- defer cancel()
- p, ok := c.linkEP.ReadContext(ctx)
+ p, ok := c.linkEP.Read()
if !ok {
c.t.Fatalf("Packet wasn't written out")
return nil
@@ -534,7 +533,9 @@ func newMinPayload(minSize int) []byte {
func TestBindToDeviceOption(t *testing.T) {
s := stack.New(stack.Options{
NetworkProtocols: []stack.NetworkProtocolFactory{ipv4.NewProtocol},
- TransportProtocols: []stack.TransportProtocolFactory{udp.NewProtocol}})
+ TransportProtocols: []stack.TransportProtocolFactory{udp.NewProtocol},
+ Clock: &faketime.NullClock{},
+ })
ep, err := s.NewEndpoint(udp.ProtocolNumber, ipv4.ProtocolNumber, &waiter.Queue{})
if err != nil {
@@ -606,7 +607,7 @@ func testReadInternal(c *testContext, flow testFlow, packetShouldBeDropped, expe
case <-ch:
res, err = c.ep.Read(&buf, tcpip.ReadOptions{NeedRemoteAddr: true})
- case <-time.After(300 * time.Millisecond):
+ default:
if packetShouldBeDropped {
return // expected to time out
}
@@ -820,11 +821,7 @@ func TestV4ReadSelfSource(t *testing.T) {
{"NoHandleLocal", true, &tcpip.ErrWouldBlock{}, 1},
} {
t.Run(tt.name, func(t *testing.T) {
- c := newDualTestContextWithOptions(t, defaultMTU, stack.Options{
- NetworkProtocols: []stack.NetworkProtocolFactory{ipv4.NewProtocol, ipv6.NewProtocol},
- TransportProtocols: []stack.TransportProtocolFactory{udp.NewProtocol},
- HandleLocal: tt.handleLocal,
- })
+ c := newDualTestContextWithHandleLocal(t, defaultMTU, tt.handleLocal)
defer c.cleanup()
c.createEndpointForFlow(unicastV4)
@@ -1034,17 +1031,17 @@ func testWriteAndVerifyInternal(c *testContext, flow testFlow, setDest bool, che
payload := testWriteNoVerify(c, flow, setDest)
// Received the packet and check the payload.
b := c.getPacketAndVerify(flow, checkers...)
- var udp header.UDP
+ var udpH header.UDP
if flow.isV4() {
- udp = header.UDP(header.IPv4(b).Payload())
+ udpH = header.IPv4(b).Payload()
} else {
- udp = header.UDP(header.IPv6(b).Payload())
+ udpH = header.IPv6(b).Payload()
}
- if !bytes.Equal(payload, udp.Payload()) {
- c.t.Fatalf("Bad payload: got %x, want %x", udp.Payload(), payload)
+ if !bytes.Equal(payload, udpH.Payload()) {
+ c.t.Fatalf("Bad payload: got %x, want %x", udpH.Payload(), payload)
}
- return udp.SourcePort()
+ return udpH.SourcePort()
}
func testDualWrite(c *testContext) uint16 {
@@ -1198,7 +1195,7 @@ func TestWriteOnConnectedInvalidPort(t *testing.T) {
r.Reset(payload)
n, err := c.ep.Write(&r, writeOpts)
if err != nil {
- c.t.Fatalf("c.ep.Write(...) = %+s, want nil", err)
+ c.t.Fatalf("c.ep.Write(...) = %s, want nil", err)
}
if got, want := n, int64(len(payload)); got != want {
c.t.Fatalf("c.ep.Write(...) wrote %d bytes, want %d bytes", got, want)
@@ -1462,7 +1459,7 @@ func TestReadRecvOriginalDstAddr(t *testing.T) {
name: "IPv4 unicast",
proto: header.IPv4ProtocolNumber,
flow: unicastV4,
- expectedOriginalDstAddr: tcpip.FullAddress{1, stackAddr, stackPort},
+ expectedOriginalDstAddr: tcpip.FullAddress{NIC: 1, Addr: stackAddr, Port: stackPort},
},
{
name: "IPv4 multicast",
@@ -1474,7 +1471,7 @@ func TestReadRecvOriginalDstAddr(t *testing.T) {
// behaviour. We still include the test so that once the bug is
// resolved, this test will start to fail and the individual tasked
// with fixing this bug knows to also fix this test :).
- expectedOriginalDstAddr: tcpip.FullAddress{1, multicastAddr, stackPort},
+ expectedOriginalDstAddr: tcpip.FullAddress{NIC: 1, Addr: multicastAddr, Port: stackPort},
},
{
name: "IPv4 broadcast",
@@ -1486,13 +1483,13 @@ func TestReadRecvOriginalDstAddr(t *testing.T) {
// behaviour. We still include the test so that once the bug is
// resolved, this test will start to fail and the individual tasked
// with fixing this bug knows to also fix this test :).
- expectedOriginalDstAddr: tcpip.FullAddress{1, broadcastAddr, stackPort},
+ expectedOriginalDstAddr: tcpip.FullAddress{NIC: 1, Addr: broadcastAddr, Port: stackPort},
},
{
name: "IPv6 unicast",
proto: header.IPv6ProtocolNumber,
flow: unicastV6,
- expectedOriginalDstAddr: tcpip.FullAddress{1, stackV6Addr, stackPort},
+ expectedOriginalDstAddr: tcpip.FullAddress{NIC: 1, Addr: stackV6Addr, Port: stackPort},
},
{
name: "IPv6 multicast",
@@ -1504,7 +1501,7 @@ func TestReadRecvOriginalDstAddr(t *testing.T) {
// behaviour. We still include the test so that once the bug is
// resolved, this test will start to fail and the individual tasked
// with fixing this bug knows to also fix this test :).
- expectedOriginalDstAddr: tcpip.FullAddress{1, multicastV6Addr, stackPort},
+ expectedOriginalDstAddr: tcpip.FullAddress{NIC: 1, Addr: multicastV6Addr, Port: stackPort},
},
}
@@ -1614,6 +1611,7 @@ func TestTTL(t *testing.T) {
}
s := stack.New(stack.Options{
NetworkProtocols: []stack.NetworkProtocolFactory{p},
+ Clock: &faketime.NullClock{},
})
ep := s.NetworkProtocolInstance(n).NewEndpoint(&testInterface{}, nil)
wantTTL = ep.DefaultTTL()
@@ -1759,7 +1757,7 @@ func TestReceiveTosTClass(t *testing.T) {
c.t.Errorf("got GetSockOptBool(%s) = %t, want = %t", name, v, false)
}
- want := true
+ const want = true
optionSetter(want)
got := optionGetter()
@@ -1889,18 +1887,14 @@ func TestV4UnknownDestination(t *testing.T) {
}
}
if !tc.icmpRequired {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
- if p, ok := c.linkEP.ReadContext(ctx); ok {
+ if p, ok := c.linkEP.Read(); ok {
t.Fatalf("unexpected packet received: %+v", p)
}
return
}
// ICMP required.
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
- p, ok := c.linkEP.ReadContext(ctx)
+ p, ok := c.linkEP.Read()
if !ok {
t.Fatalf("packet wasn't written out")
return
@@ -1987,18 +1981,14 @@ func TestV6UnknownDestination(t *testing.T) {
}
}
if !tc.icmpRequired {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
- if p, ok := c.linkEP.ReadContext(ctx); ok {
+ if p, ok := c.linkEP.Read(); ok {
t.Fatalf("unexpected packet received: %+v", p)
}
return
}
// ICMP required.
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
- p, ok := c.linkEP.ReadContext(ctx)
+ p, ok := c.linkEP.Read()
if !ok {
t.Fatalf("packet wasn't written out")
return
@@ -2115,8 +2105,8 @@ func TestShortHeader(t *testing.T) {
Data: buf.ToVectorisedView(),
}))
- if got, want := c.s.Stats().MalformedRcvdPackets.Value(), uint64(1); got != want {
- t.Errorf("got c.s.Stats().MalformedRcvdPackets.Value() = %d, want = %d", got, want)
+ if got, want := c.s.Stats().NICs.MalformedL4RcvdPackets.Value(), uint64(1); got != want {
+ t.Errorf("got c.s.Stats().NIC.MalformedL4RcvdPackets.Value() = %d, want = %d", got, want)
}
}
@@ -2124,25 +2114,27 @@ func TestShortHeader(t *testing.T) {
// global and endpoint stats are incremented.
func TestBadChecksumErrors(t *testing.T) {
for _, flow := range []testFlow{unicastV4, unicastV6} {
- c := newDualTestContext(t, defaultMTU)
- defer c.cleanup()
+ t.Run(flow.String(), func(t *testing.T) {
+ c := newDualTestContext(t, defaultMTU)
+ defer c.cleanup()
- c.createEndpoint(flow.sockProto())
- // Bind to wildcard.
- if err := c.ep.Bind(tcpip.FullAddress{Port: stackPort}); err != nil {
- c.t.Fatalf("Bind failed: %s", err)
- }
+ c.createEndpoint(flow.sockProto())
+ // Bind to wildcard.
+ if err := c.ep.Bind(tcpip.FullAddress{Port: stackPort}); err != nil {
+ c.t.Fatalf("Bind failed: %s", err)
+ }
- payload := newPayload()
- c.injectPacket(flow, payload, true /* badChecksum */)
+ payload := newPayload()
+ c.injectPacket(flow, payload, true /* badChecksum */)
- const want = 1
- if got := c.s.Stats().UDP.ChecksumErrors.Value(); got != want {
- t.Errorf("got stats.UDP.ChecksumErrors.Value() = %d, want = %d", got, want)
- }
- if got := c.ep.Stats().(*tcpip.TransportEndpointStats).ReceiveErrors.ChecksumErrors.Value(); got != want {
- t.Errorf("got EP Stats.ReceiveErrors.ChecksumErrors stats = %d, want = %d", got, want)
- }
+ const want = 1
+ if got := c.s.Stats().UDP.ChecksumErrors.Value(); got != want {
+ t.Errorf("got stats.UDP.ChecksumErrors.Value() = %d, want = %d", got, want)
+ }
+ if got := c.ep.Stats().(*tcpip.TransportEndpointStats).ReceiveErrors.ChecksumErrors.Value(); got != want {
+ t.Errorf("got EP Stats.ReceiveErrors.ChecksumErrors stats = %d, want = %d", got, want)
+ }
+ })
}
}
@@ -2484,6 +2476,7 @@ func TestOutgoingSubnetBroadcast(t *testing.T) {
s := stack.New(stack.Options{
NetworkProtocols: []stack.NetworkProtocolFactory{ipv4.NewProtocol, ipv6.NewProtocol},
TransportProtocols: []stack.TransportProtocolFactory{udp.NewProtocol},
+ Clock: &faketime.NullClock{},
})
e := channel.New(0, defaultMTU, "")
if err := s.CreateNIC(nicID1, e); err != nil {