diff options
Diffstat (limited to 'pkg/tcpip/transport/tcp')
-rw-r--r-- | pkg/tcpip/transport/tcp/accept.go | 20 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 7 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 30 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/forwarder.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/protocol.go | 8 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_test.go | 16 |
6 files changed, 50 insertions, 33 deletions
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index b61c2a8c3..5bb243e3b 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -26,7 +26,6 @@ import ( "gvisor.dev/gvisor/pkg/sleep" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/tcpip" - "gvisor.dev/gvisor/pkg/tcpip/buffer" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/seqnum" "gvisor.dev/gvisor/pkg/tcpip/stack" @@ -433,19 +432,16 @@ func (e *endpoint) acceptQueueIsFull() bool { // handleListenSegment is called when a listening endpoint receives a segment // and needs to handle it. func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { - if s.flagsAreSet(header.TCPFlagSyn | header.TCPFlagAck) { + e.rcvListMu.Lock() + rcvClosed := e.rcvClosed + e.rcvListMu.Unlock() + if rcvClosed || s.flagsAreSet(header.TCPFlagSyn|header.TCPFlagAck) { + // If the endpoint is shutdown, reply with reset. + // // RFC 793 section 3.4 page 35 (figure 12) outlines that a RST // must be sent in response to a SYN-ACK while in the listen // state to prevent completing a handshake from an old SYN. - e.sendTCP(&s.route, tcpFields{ - id: s.id, - ttl: e.ttl, - tos: e.sendTOS, - flags: header.TCPFlagRst, - seq: s.ackNumber, - ack: 0, - rcvWnd: 0, - }, buffer.VectorisedView{}, nil) + replyWithReset(s, e.sendTOS, e.ttl) return } @@ -534,7 +530,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { // The only time we should reach here when a connection // was opened and closed really quickly and a delayed // ACK was received from the sender. - replyWithReset(s) + replyWithReset(s, e.sendTOS, e.ttl) return } diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 994ac52a3..368865911 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1053,10 +1053,15 @@ func (e *endpoint) tryDeliverSegmentFromClosedEndpoint(s *segment) { ep = e.stack.FindTransportEndpoint(header.IPv4ProtocolNumber, e.TransProto, e.ID, &s.route) } if ep == nil { - replyWithReset(s) + replyWithReset(s, stack.DefaultTOS, s.route.DefaultTTL()) s.decRef() return } + + if e == ep { + panic("current endpoint not removed from demuxer, enqueing segments to itself") + } + if ep.(*endpoint).enqueueSegment(s) { ep.(*endpoint).newSegmentWaker.Assert() } diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index bffc59e9f..5d0ea9e93 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -2101,7 +2101,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) *tcpip.Error { switch { case e.EndpointState().connected(): // Close for read. - if (e.shutdownFlags & tcpip.ShutdownRead) != 0 { + if e.shutdownFlags&tcpip.ShutdownRead != 0 { // Mark read side as closed. e.rcvListMu.Lock() e.rcvClosed = true @@ -2110,7 +2110,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) *tcpip.Error { // If we're fully closed and we have unread data we need to abort // the connection with a RST. - if (e.shutdownFlags&tcpip.ShutdownWrite) != 0 && rcvBufUsed > 0 { + if e.shutdownFlags&tcpip.ShutdownWrite != 0 && rcvBufUsed > 0 { e.resetConnectionLocked(tcpip.ErrConnectionAborted) // Wake up worker to terminate loop. e.notifyProtocolGoroutine(notifyTickleWorker) @@ -2119,7 +2119,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) *tcpip.Error { } // Close for write. - if (e.shutdownFlags & tcpip.ShutdownWrite) != 0 { + if e.shutdownFlags&tcpip.ShutdownWrite != 0 { e.sndBufMu.Lock() if e.sndClosed { // Already closed. @@ -2142,12 +2142,23 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) *tcpip.Error { return nil case e.EndpointState() == StateListen: - // Tell protocolListenLoop to stop. - if flags&tcpip.ShutdownRead != 0 { - e.notifyProtocolGoroutine(notifyClose) + if e.shutdownFlags&tcpip.ShutdownRead != 0 { + // Reset all connections from the accept queue and keep the + // worker running so that it can continue handling incoming + // segments by replying with RST. + // + // By not removing this endpoint from the demuxer mapping, we + // ensure that any other bind to the same port fails, as on Linux. + // TODO(gvisor.dev/issue/2468): We need to enable applications to + // start listening on this endpoint again similar to Linux. + e.rcvListMu.Lock() + e.rcvClosed = true + e.rcvListMu.Unlock() + e.closePendingAcceptableConnectionsLocked() + // Notify waiters that the endpoint is shutdown. + e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr) } return nil - default: return tcpip.ErrNotConnected } @@ -2251,8 +2262,11 @@ func (e *endpoint) Accept() (tcpip.Endpoint, *waiter.Queue, *tcpip.Error) { e.LockUser() defer e.UnlockUser() + e.rcvListMu.Lock() + rcvClosed := e.rcvClosed + e.rcvListMu.Unlock() // Endpoint must be in listen state before it can accept connections. - if e.EndpointState() != StateListen { + if rcvClosed || e.EndpointState() != StateListen { return nil, nil, tcpip.ErrInvalidEndpointState } diff --git a/pkg/tcpip/transport/tcp/forwarder.go b/pkg/tcpip/transport/tcp/forwarder.go index 808410c92..704d01c64 100644 --- a/pkg/tcpip/transport/tcp/forwarder.go +++ b/pkg/tcpip/transport/tcp/forwarder.go @@ -130,7 +130,7 @@ func (r *ForwarderRequest) Complete(sendReset bool) { // If the caller requested, send a reset. if sendReset { - replyWithReset(r.segment) + replyWithReset(r.segment, stack.DefaultTOS, r.segment.route.DefaultTTL()) } // Release all resources. diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go index effbf203f..cfd9a4e8e 100644 --- a/pkg/tcpip/transport/tcp/protocol.go +++ b/pkg/tcpip/transport/tcp/protocol.go @@ -223,12 +223,12 @@ func (*protocol) HandleUnknownDestinationPacket(r *stack.Route, id stack.Transpo return true } - replyWithReset(s) + replyWithReset(s, stack.DefaultTOS, s.route.DefaultTTL()) return true } // replyWithReset replies to the given segment with a reset segment. -func replyWithReset(s *segment) { +func replyWithReset(s *segment, tos, ttl uint8) { // Get the seqnum from the packet if the ack flag is set. seq := seqnum.Value(0) ack := seqnum.Value(0) @@ -252,8 +252,8 @@ func replyWithReset(s *segment) { } sendTCP(&s.route, tcpFields{ id: s.id, - ttl: s.route.DefaultTTL(), - tos: stack.DefaultTOS, + ttl: ttl, + tos: tos, flags: flags, seq: seq, ack: ack, diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index 74fb6e064..ab1014c7f 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -1034,8 +1034,8 @@ func TestSendRstOnListenerRxAckV6(t *testing.T) { checker.SeqNum(200))) } -// TestListenShutdown tests for the listening endpoint not processing -// any receive when it is on read shutdown. +// TestListenShutdown tests for the listening endpoint replying with RST +// on read shutdown. func TestListenShutdown(t *testing.T) { c := context.New(t, defaultMTU) defer c.Cleanup() @@ -1046,7 +1046,7 @@ func TestListenShutdown(t *testing.T) { t.Fatal("Bind failed:", err) } - if err := c.EP.Listen(10 /* backlog */); err != nil { + if err := c.EP.Listen(1 /* backlog */); err != nil { t.Fatal("Listen failed:", err) } @@ -1054,9 +1054,6 @@ func TestListenShutdown(t *testing.T) { t.Fatal("Shutdown failed:", err) } - // Wait for the endpoint state to be propagated. - time.Sleep(10 * time.Millisecond) - c.SendPacket(nil, &context.Headers{ SrcPort: context.TestPort, DstPort: context.StackPort, @@ -1065,7 +1062,12 @@ func TestListenShutdown(t *testing.T) { AckNum: 200, }) - c.CheckNoPacket("Packet received when listening socket was shutdown") + // Expect the listening endpoint to reset the connection. + checker.IPv4(t, c.GetPacket(), + checker.TCP( + checker.DstPort(context.TestPort), + checker.TCPFlags(header.TCPFlagAck|header.TCPFlagRst), + )) } // TestListenCloseWhileConnect tests for the listening endpoint to |