summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport
diff options
context:
space:
mode:
authorIan Gudger <igudger@google.com>2018-05-01 22:11:07 -0700
committerShentubot <shentubot@google.com>2018-05-01 22:11:49 -0700
commit3d3deef573a54e031cb98038b9f617f5fac31044 (patch)
tree9ed71c29dbabc80845a6b7e5510717cad354c309 /pkg/tcpip/transport
parent185233427b3834086a9050336113f9e22176fa3b (diff)
Implement SO_TIMESTAMP
PiperOrigin-RevId: 195047018 Change-Id: I6d99528a00a2125f414e1e51e067205289ec9d3d
Diffstat (limited to 'pkg/tcpip/transport')
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go22
-rw-r--r--pkg/tcpip/transport/tcp/tcp_test.go46
-rw-r--r--pkg/tcpip/transport/tcp/tcp_timestamp_test.go4
-rw-r--r--pkg/tcpip/transport/tcp/testing/context/context.go2
-rw-r--r--pkg/tcpip/transport/udp/endpoint.go37
-rw-r--r--pkg/tcpip/transport/udp/udp_test.go10
6 files changed, 74 insertions, 47 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 5d62589d8..d84171b0c 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -374,7 +374,7 @@ func (e *endpoint) cleanup() {
}
// Read reads data from the endpoint.
-func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, *tcpip.Error) {
+func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) {
e.mu.RLock()
// The endpoint can be read if it's connected, or if it's already closed
// but has some pending unread data. Also note that a RST being received
@@ -383,9 +383,9 @@ func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, *tcpip.Error) {
if s := e.state; s != stateConnected && s != stateClosed && e.rcvBufUsed == 0 {
e.mu.RUnlock()
if s == stateError {
- return buffer.View{}, e.hardError
+ return buffer.View{}, tcpip.ControlMessages{}, e.hardError
}
- return buffer.View{}, tcpip.ErrInvalidEndpointState
+ return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrInvalidEndpointState
}
e.rcvListMu.Lock()
@@ -394,7 +394,7 @@ func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, *tcpip.Error) {
e.mu.RUnlock()
- return v, err
+ return v, tcpip.ControlMessages{}, err
}
func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) {
@@ -498,7 +498,7 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, *tc
// Peek reads data without consuming it from the endpoint.
//
// This method does not block if there is no data pending.
-func (e *endpoint) Peek(vec [][]byte) (uintptr, *tcpip.Error) {
+func (e *endpoint) Peek(vec [][]byte) (uintptr, tcpip.ControlMessages, *tcpip.Error) {
e.mu.RLock()
defer e.mu.RUnlock()
@@ -506,9 +506,9 @@ func (e *endpoint) Peek(vec [][]byte) (uintptr, *tcpip.Error) {
// but has some pending unread data.
if s := e.state; s != stateConnected && s != stateClosed {
if s == stateError {
- return 0, e.hardError
+ return 0, tcpip.ControlMessages{}, e.hardError
}
- return 0, tcpip.ErrInvalidEndpointState
+ return 0, tcpip.ControlMessages{}, tcpip.ErrInvalidEndpointState
}
e.rcvListMu.Lock()
@@ -516,9 +516,9 @@ func (e *endpoint) Peek(vec [][]byte) (uintptr, *tcpip.Error) {
if e.rcvBufUsed == 0 {
if e.rcvClosed || e.state != stateConnected {
- return 0, tcpip.ErrClosedForReceive
+ return 0, tcpip.ControlMessages{}, tcpip.ErrClosedForReceive
}
- return 0, tcpip.ErrWouldBlock
+ return 0, tcpip.ControlMessages{}, tcpip.ErrWouldBlock
}
// Make a copy of vec so we can modify the slide headers.
@@ -534,7 +534,7 @@ func (e *endpoint) Peek(vec [][]byte) (uintptr, *tcpip.Error) {
for len(v) > 0 {
if len(vec) == 0 {
- return num, nil
+ return num, tcpip.ControlMessages{}, nil
}
if len(vec[0]) == 0 {
vec = vec[1:]
@@ -549,7 +549,7 @@ func (e *endpoint) Peek(vec [][]byte) (uintptr, *tcpip.Error) {
}
}
- return num, nil
+ return num, tcpip.ControlMessages{}, nil
}
// zeroReceiveWindow checks if the receive window to be announced now would be
diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go
index 118d861ba..3c21a1ec3 100644
--- a/pkg/tcpip/transport/tcp/tcp_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_test.go
@@ -147,7 +147,7 @@ func TestSimpleReceive(t *testing.T) {
c.WQ.EventRegister(&we, waiter.EventIn)
defer c.WQ.EventUnregister(&we)
- if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
+ if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -169,7 +169,7 @@ func TestSimpleReceive(t *testing.T) {
}
// Receive data.
- v, err := c.EP.Read(nil)
+ v, _, err := c.EP.Read(nil)
if err != nil {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -199,7 +199,7 @@ func TestOutOfOrderReceive(t *testing.T) {
c.WQ.EventRegister(&we, waiter.EventIn)
defer c.WQ.EventUnregister(&we)
- if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
+ if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -226,7 +226,7 @@ func TestOutOfOrderReceive(t *testing.T) {
// Wait 200ms and check that no data has been received.
time.Sleep(200 * time.Millisecond)
- if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
+ if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -243,7 +243,7 @@ func TestOutOfOrderReceive(t *testing.T) {
// Receive data.
read := make([]byte, 0, 6)
for len(read) < len(data) {
- v, err := c.EP.Read(nil)
+ v, _, err := c.EP.Read(nil)
if err != nil {
if err == tcpip.ErrWouldBlock {
// Wait for receive to be notified.
@@ -284,7 +284,7 @@ func TestOutOfOrderFlood(t *testing.T) {
opt := tcpip.ReceiveBufferSizeOption(10)
c.CreateConnected(789, 30000, &opt)
- if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
+ if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -361,7 +361,7 @@ func TestRstOnCloseWithUnreadData(t *testing.T) {
c.WQ.EventRegister(&we, waiter.EventIn)
defer c.WQ.EventUnregister(&we)
- if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
+ if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -414,7 +414,7 @@ func TestFullWindowReceive(t *testing.T) {
c.WQ.EventRegister(&we, waiter.EventIn)
defer c.WQ.EventUnregister(&we)
- _, err := c.EP.Read(nil)
+ _, _, err := c.EP.Read(nil)
if err != tcpip.ErrWouldBlock {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -449,7 +449,7 @@ func TestFullWindowReceive(t *testing.T) {
)
// Receive data and check it.
- v, err := c.EP.Read(nil)
+ v, _, err := c.EP.Read(nil)
if err != nil {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -487,7 +487,7 @@ func TestNoWindowShrinking(t *testing.T) {
c.WQ.EventRegister(&we, waiter.EventIn)
defer c.WQ.EventUnregister(&we)
- _, err := c.EP.Read(nil)
+ _, _, err := c.EP.Read(nil)
if err != tcpip.ErrWouldBlock {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -551,7 +551,7 @@ func TestNoWindowShrinking(t *testing.T) {
// Receive data and check it.
read := make([]byte, 0, 10)
for len(read) < len(data) {
- v, err := c.EP.Read(nil)
+ v, _, err := c.EP.Read(nil)
if err != nil {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -954,7 +954,7 @@ func TestZeroScaledWindowReceive(t *testing.T) {
}
// Read some data. An ack should be sent in response to that.
- v, err := c.EP.Read(nil)
+ v, _, err := c.EP.Read(nil)
if err != nil {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -1337,7 +1337,7 @@ func TestReceiveOnResetConnection(t *testing.T) {
loop:
for {
- switch _, err := c.EP.Read(nil); err {
+ switch _, _, err := c.EP.Read(nil); err {
case nil:
t.Fatalf("Unexpected success.")
case tcpip.ErrWouldBlock:
@@ -2293,7 +2293,7 @@ func TestReadAfterClosedState(t *testing.T) {
c.WQ.EventRegister(&we, waiter.EventIn)
defer c.WQ.EventUnregister(&we)
- if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
+ if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -2345,7 +2345,7 @@ func TestReadAfterClosedState(t *testing.T) {
// Check that peek works.
peekBuf := make([]byte, 10)
- n, err := c.EP.Peek([][]byte{peekBuf})
+ n, _, err := c.EP.Peek([][]byte{peekBuf})
if err != nil {
t.Fatalf("Unexpected error from Peek: %v", err)
}
@@ -2356,7 +2356,7 @@ func TestReadAfterClosedState(t *testing.T) {
}
// Receive data.
- v, err := c.EP.Read(nil)
+ v, _, err := c.EP.Read(nil)
if err != nil {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -2367,11 +2367,11 @@ func TestReadAfterClosedState(t *testing.T) {
// Now that we drained the queue, check that functions fail with the
// right error code.
- if _, err := c.EP.Read(nil); err != tcpip.ErrClosedForReceive {
+ if _, _, err := c.EP.Read(nil); err != tcpip.ErrClosedForReceive {
t.Fatalf("Unexpected return from Read: got %v, want %v", err, tcpip.ErrClosedForReceive)
}
- if _, err := c.EP.Peek([][]byte{peekBuf}); err != tcpip.ErrClosedForReceive {
+ if _, _, err := c.EP.Peek([][]byte{peekBuf}); err != tcpip.ErrClosedForReceive {
t.Fatalf("Unexpected return from Peek: got %v, want %v", err, tcpip.ErrClosedForReceive)
}
}
@@ -2479,7 +2479,7 @@ func checkSendBufferSize(t *testing.T, ep tcpip.Endpoint, v int) {
}
func TestDefaultBufferSizes(t *testing.T) {
- s := stack.New([]string{ipv4.ProtocolName}, []string{tcp.ProtocolName})
+ s := stack.New(&tcpip.StdClock{}, []string{ipv4.ProtocolName}, []string{tcp.ProtocolName})
// Check the default values.
ep, err := s.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &waiter.Queue{})
@@ -2525,7 +2525,7 @@ func TestDefaultBufferSizes(t *testing.T) {
}
func TestMinMaxBufferSizes(t *testing.T) {
- s := stack.New([]string{ipv4.ProtocolName}, []string{tcp.ProtocolName})
+ s := stack.New(&tcpip.StdClock{}, []string{ipv4.ProtocolName}, []string{tcp.ProtocolName})
// Check the default values.
ep, err := s.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &waiter.Queue{})
@@ -2575,7 +2575,7 @@ func TestSelfConnect(t *testing.T) {
// it checks that if an endpoint binds to say 127.0.0.1:1000 then
// connects to 127.0.0.1:1000, then it will be connected to itself, and
// is able to send and receive data through the same endpoint.
- s := stack.New([]string{ipv4.ProtocolName}, []string{tcp.ProtocolName})
+ s := stack.New(&tcpip.StdClock{}, []string{ipv4.ProtocolName}, []string{tcp.ProtocolName})
id := loopback.New()
if testing.Verbose() {
@@ -2637,13 +2637,13 @@ func TestSelfConnect(t *testing.T) {
// Read back what was written.
wq.EventUnregister(&waitEntry)
wq.EventRegister(&waitEntry, waiter.EventIn)
- rd, err := ep.Read(nil)
+ rd, _, err := ep.Read(nil)
if err != nil {
if err != tcpip.ErrWouldBlock {
t.Fatalf("Read failed: %v", err)
}
<-notifyCh
- rd, err = ep.Read(nil)
+ rd, _, err = ep.Read(nil)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
diff --git a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go
index d12081bb7..335262e43 100644
--- a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go
@@ -95,7 +95,7 @@ func TestTimeStampEnabledConnect(t *testing.T) {
// There should be 5 views to read and each of them should
// contain the same data.
for i := 0; i < 5; i++ {
- got, err := c.EP.Read(nil)
+ got, _, err := c.EP.Read(nil)
if err != nil {
t.Fatalf("Unexpected error from Read: %v", err)
}
@@ -296,7 +296,7 @@ func TestSegmentDropWhenTimestampMissing(t *testing.T) {
}
// Issue a read and we should data.
- got, err := c.EP.Read(nil)
+ got, _, err := c.EP.Read(nil)
if err != nil {
t.Fatalf("Unexpected error from Read: %v", err)
}
diff --git a/pkg/tcpip/transport/tcp/testing/context/context.go b/pkg/tcpip/transport/tcp/testing/context/context.go
index 6a402d150..eb928553f 100644
--- a/pkg/tcpip/transport/tcp/testing/context/context.go
+++ b/pkg/tcpip/transport/tcp/testing/context/context.go
@@ -129,7 +129,7 @@ type Context struct {
// New allocates and initializes a test context containing a new
// stack and a link-layer endpoint.
func New(t *testing.T, mtu uint32) *Context {
- s := stack.New([]string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{tcp.ProtocolName})
+ s := stack.New(&tcpip.StdClock{}, []string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{tcp.ProtocolName})
// Allow minimum send/receive buffer sizes to be 1 during tests.
if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultBufferSize, tcp.DefaultBufferSize * 10}); err != nil {
diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go
index 80fa88c4c..f86fc6d5a 100644
--- a/pkg/tcpip/transport/udp/endpoint.go
+++ b/pkg/tcpip/transport/udp/endpoint.go
@@ -19,6 +19,8 @@ type udpPacket struct {
udpPacketEntry
senderAddress tcpip.FullAddress
data buffer.VectorisedView `state:".(buffer.VectorisedView)"`
+ timestamp int64
+ hasTimestamp bool
// views is used as buffer for data when its length is large
// enough to store a VectorisedView.
views [8]buffer.View `state:"nosave"`
@@ -52,6 +54,7 @@ type endpoint struct {
rcvBufSizeMax int `state:".(int)"`
rcvBufSize int
rcvClosed bool
+ rcvTimestamp bool
// The following fields are protected by the mu mutex.
mu sync.RWMutex `state:"nosave"`
@@ -134,7 +137,7 @@ func (e *endpoint) Close() {
// Read reads data from the endpoint. This method does not block if
// there is no data pending.
-func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, *tcpip.Error) {
+func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) {
e.rcvMu.Lock()
if e.rcvList.Empty() {
@@ -143,12 +146,13 @@ func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, *tcpip.Error) {
err = tcpip.ErrClosedForReceive
}
e.rcvMu.Unlock()
- return buffer.View{}, err
+ return buffer.View{}, tcpip.ControlMessages{}, err
}
p := e.rcvList.Front()
e.rcvList.Remove(p)
e.rcvBufSize -= p.data.Size()
+ ts := e.rcvTimestamp
e.rcvMu.Unlock()
@@ -156,7 +160,12 @@ func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, *tcpip.Error) {
*addr = p.senderAddress
}
- return p.data.ToView(), nil
+ if ts && !p.hasTimestamp {
+ // Linux uses the current time.
+ p.timestamp = e.stack.NowNanoseconds()
+ }
+
+ return p.data.ToView(), tcpip.ControlMessages{HasTimestamp: ts, Timestamp: p.timestamp}, nil
}
// prepareForWrite prepares the endpoint for sending data. In particular, it
@@ -299,8 +308,8 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, *tc
}
// Peek only returns data from a single datagram, so do nothing here.
-func (e *endpoint) Peek([][]byte) (uintptr, *tcpip.Error) {
- return 0, nil
+func (e *endpoint) Peek([][]byte) (uintptr, tcpip.ControlMessages, *tcpip.Error) {
+ return 0, tcpip.ControlMessages{}, nil
}
// SetSockOpt sets a socket option. Currently not supported.
@@ -322,6 +331,11 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
}
e.v6only = v != 0
+
+ case tcpip.TimestampOption:
+ e.rcvMu.Lock()
+ e.rcvTimestamp = v != 0
+ e.rcvMu.Unlock()
}
return nil
}
@@ -370,6 +384,14 @@ func (e *endpoint) GetSockOpt(opt interface{}) *tcpip.Error {
}
e.rcvMu.Unlock()
return nil
+
+ case *tcpip.TimestampOption:
+ e.rcvMu.Lock()
+ *o = 0
+ if e.rcvTimestamp {
+ *o = 1
+ }
+ e.rcvMu.Unlock()
}
return tcpip.ErrUnknownProtocolOption
@@ -733,6 +755,11 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv
e.rcvList.PushBack(pkt)
e.rcvBufSize += vv.Size()
+ if e.rcvTimestamp {
+ pkt.timestamp = e.stack.NowNanoseconds()
+ pkt.hasTimestamp = true
+ }
+
e.rcvMu.Unlock()
// Notify any waiters that there's data to be read now.
diff --git a/pkg/tcpip/transport/udp/udp_test.go b/pkg/tcpip/transport/udp/udp_test.go
index 65c567952..1eb9ecb80 100644
--- a/pkg/tcpip/transport/udp/udp_test.go
+++ b/pkg/tcpip/transport/udp/udp_test.go
@@ -56,7 +56,7 @@ type headers struct {
}
func newDualTestContext(t *testing.T, mtu uint32) *testContext {
- s := stack.New([]string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{udp.ProtocolName})
+ s := stack.New(&tcpip.StdClock{}, []string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{udp.ProtocolName})
id, linkEP := channel.New(256, mtu, "")
if testing.Verbose() {
@@ -260,12 +260,12 @@ func testV4Read(c *testContext) {
defer c.wq.EventUnregister(&we)
var addr tcpip.FullAddress
- v, err := c.ep.Read(&addr)
+ v, _, err := c.ep.Read(&addr)
if err == tcpip.ErrWouldBlock {
// Wait for data to become available.
select {
case <-ch:
- v, err = c.ep.Read(&addr)
+ v, _, err = c.ep.Read(&addr)
if err != nil {
c.t.Fatalf("Read failed: %v", err)
}
@@ -355,12 +355,12 @@ func TestV6ReadOnV6(t *testing.T) {
defer c.wq.EventUnregister(&we)
var addr tcpip.FullAddress
- v, err := c.ep.Read(&addr)
+ v, _, err := c.ep.Read(&addr)
if err == tcpip.ErrWouldBlock {
// Wait for data to become available.
select {
case <-ch:
- v, err = c.ep.Read(&addr)
+ v, _, err = c.ep.Read(&addr)
if err != nil {
c.t.Fatalf("Read failed: %v", err)
}