diff options
Diffstat (limited to 'pkg/tcpip/transport/tcp/endpoint.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 71 |
1 files changed, 68 insertions, 3 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index e78138415..4f4f4c65e 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -441,6 +441,11 @@ type endpoint struct { v6only bool isConnectNotified bool + // h stores a reference to the current handshake state if the endpoint is in + // the SYN-SENT or SYN-RECV states, in which case endpoint == endpoint.h.ep. + // nil otherwise. + h *handshake `state:"nosave"` + // portFlags stores the current values of port related flags. portFlags ports.Flags @@ -922,6 +927,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue e.segmentQueue.ep = e e.tsOffset = timeStampOffset() e.acceptCond = sync.NewCond(&e.acceptMu) + e.keepalive.timer.init(&e.keepalive.waker) return e } @@ -1146,6 +1152,7 @@ func (e *endpoint) cleanupLocked() { // Close all endpoints that might have been accepted by TCP but not by // the client. e.closePendingAcceptableConnectionsLocked() + e.keepalive.timer.cleanup() e.workerCleanup = false @@ -2175,6 +2182,8 @@ func (*endpoint) Disconnect() *tcpip.Error { func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error { err := e.connect(addr, true, true) if err != nil && !err.IgnoreStats() { + // Connect failed. Let's wake up any waiters. + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) e.stack.Stats().TCP.FailedConnectionAttempts.Increment() e.stats.FailedConnectionAttempts.Increment() } @@ -2387,14 +2396,70 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) *tc } if run { - e.workerRunning = true - e.stack.Stats().TCP.ActiveConnectionOpenings.Increment() - go e.protocolMainLoop(handshake, nil) // S/R-SAFE: will be drained before save. + if err := e.startMainLoop(handshake); err != nil { + return err + } } return tcpip.ErrConnectStarted } +// startMainLoop sends the initial SYN and starts the main loop for the +// endpoint. +func (e *endpoint) startMainLoop(handshake bool) *tcpip.Error { + preloop := func() *tcpip.Error { + if handshake { + h := e.newHandshake() + e.setEndpointState(StateSynSent) + if err := h.start(); err != nil { + e.lastErrorMu.Lock() + e.lastError = err + e.lastErrorMu.Unlock() + + e.setEndpointState(StateError) + e.HardError = err + + // Call cleanupLocked to free up any reservations. + e.cleanupLocked() + return err + } + } + e.stack.Stats().TCP.ActiveConnectionOpenings.Increment() + return nil + } + + if e.route.IsResolutionRequired() { + // If the endpoint is closed between releasing e.mu and the goroutine below + // acquiring it, make sure that cleanup is deferred to the new goroutine. + e.workerRunning = true + + // Sending the initial SYN may block due to route resolution; do it in a + // separate goroutine to avoid blocking the syscall goroutine. + go func() { // S/R-SAFE: will be drained before save. + e.mu.Lock() + if err := preloop(); err != nil { + e.workerRunning = false + e.mu.Unlock() + return + } + e.mu.Unlock() + _ = e.protocolMainLoop(handshake, nil) + }() + return nil + } + + // No route resolution is required, so we can send the initial SYN here without + // blocking. This will hopefully reduce overall latency by overlapping time + // spent waiting for a SYN-ACK and time spent spinning up a new goroutine + // for the main loop. + if err := preloop(); err != nil { + return err + } + e.workerRunning = true + go e.protocolMainLoop(handshake, nil) // S/R-SAFE: will be drained before save. + return nil +} + // ConnectEndpoint is not supported. func (*endpoint) ConnectEndpoint(tcpip.Endpoint) *tcpip.Error { return tcpip.ErrInvalidEndpointState |