diff options
author | Ian Gudger <igudger@google.com> | 2018-05-01 22:11:07 -0700 |
---|---|---|
committer | Shentubot <shentubot@google.com> | 2018-05-01 22:11:49 -0700 |
commit | 3d3deef573a54e031cb98038b9f617f5fac31044 (patch) | |
tree | 9ed71c29dbabc80845a6b7e5510717cad354c309 /pkg/tcpip/transport | |
parent | 185233427b3834086a9050336113f9e22176fa3b (diff) |
Implement SO_TIMESTAMP
PiperOrigin-RevId: 195047018
Change-Id: I6d99528a00a2125f414e1e51e067205289ec9d3d
Diffstat (limited to 'pkg/tcpip/transport')
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 22 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_test.go | 46 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_timestamp_test.go | 4 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/testing/context/context.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/transport/udp/endpoint.go | 37 | ||||
-rw-r--r-- | pkg/tcpip/transport/udp/udp_test.go | 10 |
6 files changed, 74 insertions, 47 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 5d62589d8..d84171b0c 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -374,7 +374,7 @@ func (e *endpoint) cleanup() { } // Read reads data from the endpoint. -func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, *tcpip.Error) { +func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { e.mu.RLock() // The endpoint can be read if it's connected, or if it's already closed // but has some pending unread data. Also note that a RST being received @@ -383,9 +383,9 @@ func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, *tcpip.Error) { if s := e.state; s != stateConnected && s != stateClosed && e.rcvBufUsed == 0 { e.mu.RUnlock() if s == stateError { - return buffer.View{}, e.hardError + return buffer.View{}, tcpip.ControlMessages{}, e.hardError } - return buffer.View{}, tcpip.ErrInvalidEndpointState + return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrInvalidEndpointState } e.rcvListMu.Lock() @@ -394,7 +394,7 @@ func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, *tcpip.Error) { e.mu.RUnlock() - return v, err + return v, tcpip.ControlMessages{}, err } func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) { @@ -498,7 +498,7 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, *tc // Peek reads data without consuming it from the endpoint. // // This method does not block if there is no data pending. -func (e *endpoint) Peek(vec [][]byte) (uintptr, *tcpip.Error) { +func (e *endpoint) Peek(vec [][]byte) (uintptr, tcpip.ControlMessages, *tcpip.Error) { e.mu.RLock() defer e.mu.RUnlock() @@ -506,9 +506,9 @@ func (e *endpoint) Peek(vec [][]byte) (uintptr, *tcpip.Error) { // but has some pending unread data. if s := e.state; s != stateConnected && s != stateClosed { if s == stateError { - return 0, e.hardError + return 0, tcpip.ControlMessages{}, e.hardError } - return 0, tcpip.ErrInvalidEndpointState + return 0, tcpip.ControlMessages{}, tcpip.ErrInvalidEndpointState } e.rcvListMu.Lock() @@ -516,9 +516,9 @@ func (e *endpoint) Peek(vec [][]byte) (uintptr, *tcpip.Error) { if e.rcvBufUsed == 0 { if e.rcvClosed || e.state != stateConnected { - return 0, tcpip.ErrClosedForReceive + return 0, tcpip.ControlMessages{}, tcpip.ErrClosedForReceive } - return 0, tcpip.ErrWouldBlock + return 0, tcpip.ControlMessages{}, tcpip.ErrWouldBlock } // Make a copy of vec so we can modify the slide headers. @@ -534,7 +534,7 @@ func (e *endpoint) Peek(vec [][]byte) (uintptr, *tcpip.Error) { for len(v) > 0 { if len(vec) == 0 { - return num, nil + return num, tcpip.ControlMessages{}, nil } if len(vec[0]) == 0 { vec = vec[1:] @@ -549,7 +549,7 @@ func (e *endpoint) Peek(vec [][]byte) (uintptr, *tcpip.Error) { } } - return num, nil + return num, tcpip.ControlMessages{}, nil } // zeroReceiveWindow checks if the receive window to be announced now would be diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index 118d861ba..3c21a1ec3 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -147,7 +147,7 @@ func TestSimpleReceive(t *testing.T) { c.WQ.EventRegister(&we, waiter.EventIn) defer c.WQ.EventUnregister(&we) - if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { + if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { t.Fatalf("Unexpected error from Read: %v", err) } @@ -169,7 +169,7 @@ func TestSimpleReceive(t *testing.T) { } // Receive data. - v, err := c.EP.Read(nil) + v, _, err := c.EP.Read(nil) if err != nil { t.Fatalf("Unexpected error from Read: %v", err) } @@ -199,7 +199,7 @@ func TestOutOfOrderReceive(t *testing.T) { c.WQ.EventRegister(&we, waiter.EventIn) defer c.WQ.EventUnregister(&we) - if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { + if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { t.Fatalf("Unexpected error from Read: %v", err) } @@ -226,7 +226,7 @@ func TestOutOfOrderReceive(t *testing.T) { // Wait 200ms and check that no data has been received. time.Sleep(200 * time.Millisecond) - if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { + if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { t.Fatalf("Unexpected error from Read: %v", err) } @@ -243,7 +243,7 @@ func TestOutOfOrderReceive(t *testing.T) { // Receive data. read := make([]byte, 0, 6) for len(read) < len(data) { - v, err := c.EP.Read(nil) + v, _, err := c.EP.Read(nil) if err != nil { if err == tcpip.ErrWouldBlock { // Wait for receive to be notified. @@ -284,7 +284,7 @@ func TestOutOfOrderFlood(t *testing.T) { opt := tcpip.ReceiveBufferSizeOption(10) c.CreateConnected(789, 30000, &opt) - if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { + if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { t.Fatalf("Unexpected error from Read: %v", err) } @@ -361,7 +361,7 @@ func TestRstOnCloseWithUnreadData(t *testing.T) { c.WQ.EventRegister(&we, waiter.EventIn) defer c.WQ.EventUnregister(&we) - if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { + if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { t.Fatalf("Unexpected error from Read: %v", err) } @@ -414,7 +414,7 @@ func TestFullWindowReceive(t *testing.T) { c.WQ.EventRegister(&we, waiter.EventIn) defer c.WQ.EventUnregister(&we) - _, err := c.EP.Read(nil) + _, _, err := c.EP.Read(nil) if err != tcpip.ErrWouldBlock { t.Fatalf("Unexpected error from Read: %v", err) } @@ -449,7 +449,7 @@ func TestFullWindowReceive(t *testing.T) { ) // Receive data and check it. - v, err := c.EP.Read(nil) + v, _, err := c.EP.Read(nil) if err != nil { t.Fatalf("Unexpected error from Read: %v", err) } @@ -487,7 +487,7 @@ func TestNoWindowShrinking(t *testing.T) { c.WQ.EventRegister(&we, waiter.EventIn) defer c.WQ.EventUnregister(&we) - _, err := c.EP.Read(nil) + _, _, err := c.EP.Read(nil) if err != tcpip.ErrWouldBlock { t.Fatalf("Unexpected error from Read: %v", err) } @@ -551,7 +551,7 @@ func TestNoWindowShrinking(t *testing.T) { // Receive data and check it. read := make([]byte, 0, 10) for len(read) < len(data) { - v, err := c.EP.Read(nil) + v, _, err := c.EP.Read(nil) if err != nil { t.Fatalf("Unexpected error from Read: %v", err) } @@ -954,7 +954,7 @@ func TestZeroScaledWindowReceive(t *testing.T) { } // Read some data. An ack should be sent in response to that. - v, err := c.EP.Read(nil) + v, _, err := c.EP.Read(nil) if err != nil { t.Fatalf("Unexpected error from Read: %v", err) } @@ -1337,7 +1337,7 @@ func TestReceiveOnResetConnection(t *testing.T) { loop: for { - switch _, err := c.EP.Read(nil); err { + switch _, _, err := c.EP.Read(nil); err { case nil: t.Fatalf("Unexpected success.") case tcpip.ErrWouldBlock: @@ -2293,7 +2293,7 @@ func TestReadAfterClosedState(t *testing.T) { c.WQ.EventRegister(&we, waiter.EventIn) defer c.WQ.EventUnregister(&we) - if _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { + if _, _, err := c.EP.Read(nil); err != tcpip.ErrWouldBlock { t.Fatalf("Unexpected error from Read: %v", err) } @@ -2345,7 +2345,7 @@ func TestReadAfterClosedState(t *testing.T) { // Check that peek works. peekBuf := make([]byte, 10) - n, err := c.EP.Peek([][]byte{peekBuf}) + n, _, err := c.EP.Peek([][]byte{peekBuf}) if err != nil { t.Fatalf("Unexpected error from Peek: %v", err) } @@ -2356,7 +2356,7 @@ func TestReadAfterClosedState(t *testing.T) { } // Receive data. - v, err := c.EP.Read(nil) + v, _, err := c.EP.Read(nil) if err != nil { t.Fatalf("Unexpected error from Read: %v", err) } @@ -2367,11 +2367,11 @@ func TestReadAfterClosedState(t *testing.T) { // Now that we drained the queue, check that functions fail with the // right error code. - if _, err := c.EP.Read(nil); err != tcpip.ErrClosedForReceive { + if _, _, err := c.EP.Read(nil); err != tcpip.ErrClosedForReceive { t.Fatalf("Unexpected return from Read: got %v, want %v", err, tcpip.ErrClosedForReceive) } - if _, err := c.EP.Peek([][]byte{peekBuf}); err != tcpip.ErrClosedForReceive { + if _, _, err := c.EP.Peek([][]byte{peekBuf}); err != tcpip.ErrClosedForReceive { t.Fatalf("Unexpected return from Peek: got %v, want %v", err, tcpip.ErrClosedForReceive) } } @@ -2479,7 +2479,7 @@ func checkSendBufferSize(t *testing.T, ep tcpip.Endpoint, v int) { } func TestDefaultBufferSizes(t *testing.T) { - s := stack.New([]string{ipv4.ProtocolName}, []string{tcp.ProtocolName}) + s := stack.New(&tcpip.StdClock{}, []string{ipv4.ProtocolName}, []string{tcp.ProtocolName}) // Check the default values. ep, err := s.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &waiter.Queue{}) @@ -2525,7 +2525,7 @@ func TestDefaultBufferSizes(t *testing.T) { } func TestMinMaxBufferSizes(t *testing.T) { - s := stack.New([]string{ipv4.ProtocolName}, []string{tcp.ProtocolName}) + s := stack.New(&tcpip.StdClock{}, []string{ipv4.ProtocolName}, []string{tcp.ProtocolName}) // Check the default values. ep, err := s.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &waiter.Queue{}) @@ -2575,7 +2575,7 @@ func TestSelfConnect(t *testing.T) { // it checks that if an endpoint binds to say 127.0.0.1:1000 then // connects to 127.0.0.1:1000, then it will be connected to itself, and // is able to send and receive data through the same endpoint. - s := stack.New([]string{ipv4.ProtocolName}, []string{tcp.ProtocolName}) + s := stack.New(&tcpip.StdClock{}, []string{ipv4.ProtocolName}, []string{tcp.ProtocolName}) id := loopback.New() if testing.Verbose() { @@ -2637,13 +2637,13 @@ func TestSelfConnect(t *testing.T) { // Read back what was written. wq.EventUnregister(&waitEntry) wq.EventRegister(&waitEntry, waiter.EventIn) - rd, err := ep.Read(nil) + rd, _, err := ep.Read(nil) if err != nil { if err != tcpip.ErrWouldBlock { t.Fatalf("Read failed: %v", err) } <-notifyCh - rd, err = ep.Read(nil) + rd, _, err = ep.Read(nil) if err != nil { t.Fatalf("Read failed: %v", err) } diff --git a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go index d12081bb7..335262e43 100644 --- a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go @@ -95,7 +95,7 @@ func TestTimeStampEnabledConnect(t *testing.T) { // There should be 5 views to read and each of them should // contain the same data. for i := 0; i < 5; i++ { - got, err := c.EP.Read(nil) + got, _, err := c.EP.Read(nil) if err != nil { t.Fatalf("Unexpected error from Read: %v", err) } @@ -296,7 +296,7 @@ func TestSegmentDropWhenTimestampMissing(t *testing.T) { } // Issue a read and we should data. - got, err := c.EP.Read(nil) + got, _, err := c.EP.Read(nil) if err != nil { t.Fatalf("Unexpected error from Read: %v", err) } diff --git a/pkg/tcpip/transport/tcp/testing/context/context.go b/pkg/tcpip/transport/tcp/testing/context/context.go index 6a402d150..eb928553f 100644 --- a/pkg/tcpip/transport/tcp/testing/context/context.go +++ b/pkg/tcpip/transport/tcp/testing/context/context.go @@ -129,7 +129,7 @@ type Context struct { // New allocates and initializes a test context containing a new // stack and a link-layer endpoint. func New(t *testing.T, mtu uint32) *Context { - s := stack.New([]string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{tcp.ProtocolName}) + s := stack.New(&tcpip.StdClock{}, []string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{tcp.ProtocolName}) // Allow minimum send/receive buffer sizes to be 1 during tests. if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, tcp.SendBufferSizeOption{1, tcp.DefaultBufferSize, tcp.DefaultBufferSize * 10}); err != nil { diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index 80fa88c4c..f86fc6d5a 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -19,6 +19,8 @@ type udpPacket struct { udpPacketEntry senderAddress tcpip.FullAddress data buffer.VectorisedView `state:".(buffer.VectorisedView)"` + timestamp int64 + hasTimestamp bool // views is used as buffer for data when its length is large // enough to store a VectorisedView. views [8]buffer.View `state:"nosave"` @@ -52,6 +54,7 @@ type endpoint struct { rcvBufSizeMax int `state:".(int)"` rcvBufSize int rcvClosed bool + rcvTimestamp bool // The following fields are protected by the mu mutex. mu sync.RWMutex `state:"nosave"` @@ -134,7 +137,7 @@ func (e *endpoint) Close() { // Read reads data from the endpoint. This method does not block if // there is no data pending. -func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, *tcpip.Error) { +func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { e.rcvMu.Lock() if e.rcvList.Empty() { @@ -143,12 +146,13 @@ func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, *tcpip.Error) { err = tcpip.ErrClosedForReceive } e.rcvMu.Unlock() - return buffer.View{}, err + return buffer.View{}, tcpip.ControlMessages{}, err } p := e.rcvList.Front() e.rcvList.Remove(p) e.rcvBufSize -= p.data.Size() + ts := e.rcvTimestamp e.rcvMu.Unlock() @@ -156,7 +160,12 @@ func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, *tcpip.Error) { *addr = p.senderAddress } - return p.data.ToView(), nil + if ts && !p.hasTimestamp { + // Linux uses the current time. + p.timestamp = e.stack.NowNanoseconds() + } + + return p.data.ToView(), tcpip.ControlMessages{HasTimestamp: ts, Timestamp: p.timestamp}, nil } // prepareForWrite prepares the endpoint for sending data. In particular, it @@ -299,8 +308,8 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, *tc } // Peek only returns data from a single datagram, so do nothing here. -func (e *endpoint) Peek([][]byte) (uintptr, *tcpip.Error) { - return 0, nil +func (e *endpoint) Peek([][]byte) (uintptr, tcpip.ControlMessages, *tcpip.Error) { + return 0, tcpip.ControlMessages{}, nil } // SetSockOpt sets a socket option. Currently not supported. @@ -322,6 +331,11 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { } e.v6only = v != 0 + + case tcpip.TimestampOption: + e.rcvMu.Lock() + e.rcvTimestamp = v != 0 + e.rcvMu.Unlock() } return nil } @@ -370,6 +384,14 @@ func (e *endpoint) GetSockOpt(opt interface{}) *tcpip.Error { } e.rcvMu.Unlock() return nil + + case *tcpip.TimestampOption: + e.rcvMu.Lock() + *o = 0 + if e.rcvTimestamp { + *o = 1 + } + e.rcvMu.Unlock() } return tcpip.ErrUnknownProtocolOption @@ -733,6 +755,11 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv e.rcvList.PushBack(pkt) e.rcvBufSize += vv.Size() + if e.rcvTimestamp { + pkt.timestamp = e.stack.NowNanoseconds() + pkt.hasTimestamp = true + } + e.rcvMu.Unlock() // Notify any waiters that there's data to be read now. diff --git a/pkg/tcpip/transport/udp/udp_test.go b/pkg/tcpip/transport/udp/udp_test.go index 65c567952..1eb9ecb80 100644 --- a/pkg/tcpip/transport/udp/udp_test.go +++ b/pkg/tcpip/transport/udp/udp_test.go @@ -56,7 +56,7 @@ type headers struct { } func newDualTestContext(t *testing.T, mtu uint32) *testContext { - s := stack.New([]string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{udp.ProtocolName}) + s := stack.New(&tcpip.StdClock{}, []string{ipv4.ProtocolName, ipv6.ProtocolName}, []string{udp.ProtocolName}) id, linkEP := channel.New(256, mtu, "") if testing.Verbose() { @@ -260,12 +260,12 @@ func testV4Read(c *testContext) { defer c.wq.EventUnregister(&we) var addr tcpip.FullAddress - v, err := c.ep.Read(&addr) + v, _, err := c.ep.Read(&addr) if err == tcpip.ErrWouldBlock { // Wait for data to become available. select { case <-ch: - v, err = c.ep.Read(&addr) + v, _, err = c.ep.Read(&addr) if err != nil { c.t.Fatalf("Read failed: %v", err) } @@ -355,12 +355,12 @@ func TestV6ReadOnV6(t *testing.T) { defer c.wq.EventUnregister(&we) var addr tcpip.FullAddress - v, err := c.ep.Read(&addr) + v, _, err := c.ep.Read(&addr) if err == tcpip.ErrWouldBlock { // Wait for data to become available. select { case <-ch: - v, err = c.ep.Read(&addr) + v, _, err = c.ep.Read(&addr) if err != nil { c.t.Fatalf("Read failed: %v", err) } |