From 275ac8ce1debf89a22eb1150df3bf9ba7a0bc9ba Mon Sep 17 00:00:00 2001 From: Bhasker Hariharan Date: Wed, 15 Jan 2020 13:20:14 -0800 Subject: Bugfix to terminate the protocol loop on StateError. The change to introduce worker goroutines can cause the endpoint to transition to StateError and we should terminate the loop rather than let the endpoint transition to a CLOSED state as we do in case the endpoint enters TIME-WAIT/CLOSED. Moving to a closed state would cause the actual error to not be propagated to any read() calls etc. PiperOrigin-RevId: 289923568 --- pkg/tcpip/transport/tcp/connect.go | 50 +++++++++++++++++++++++------------ pkg/tcpip/transport/tcp/dispatcher.go | 6 +++++ pkg/tcpip/transport/tcp/endpoint.go | 7 +---- 3 files changed, 40 insertions(+), 23 deletions(-) (limited to 'pkg/tcpip/transport/tcp') diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index f3896715b..a2f384384 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1019,10 +1019,6 @@ func (e *endpoint) handleSegments(fastPath bool) *tcpip.Error { cont, err := e.handleSegment(s) if err != nil { s.decRef() - e.mu.Lock() - e.setEndpointState(StateError) - e.HardError = err - e.mu.Unlock() return err } if !cont { @@ -1414,30 +1410,50 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ // Main loop. Handle segments until both send and receive ends of the // connection have completed. + cleanupOnError := func(err *tcpip.Error) { + e.mu.Lock() + e.workerCleanup = true + if err != nil { + e.resetConnectionLocked(err) + } + // Lock released below. + epilogue() + } +loop: for e.EndpointState() != StateTimeWait && e.EndpointState() != StateClose && e.EndpointState() != StateError { e.mu.Unlock() e.workMu.Unlock() v, _ := s.Fetch(true) e.workMu.Lock() - // We need to double check here because the notification - // maybe stale by the time we got around to processing it. + + // We need to double check here because the notification maybe + // stale by the time we got around to processing it. + // // NOTE: since we now hold the workMu the processors cannot - // change the state of the endpoint so it' safe to proceed + // change the state of the endpoint so it's safe to proceed // after this check. - if e.EndpointState() == StateTimeWait || e.EndpointState() == StateClose || e.EndpointState() == StateError { + switch e.EndpointState() { + case StateError: + // If the endpoint has already transitioned to an ERROR + // state just pass nil here as any reset that may need + // to be sent etc should already have been done and we + // just want to terminate the loop and cleanup the + // endpoint. + cleanupOnError(nil) + return nil + case StateTimeWait: + fallthrough + case StateClose: e.mu.Lock() - break - } - if err := funcs[v].f(); err != nil { + break loop + default: + if err := funcs[v].f(); err != nil { + cleanupOnError(err) + return nil + } e.mu.Lock() - e.workerCleanup = true - e.resetConnectionLocked(err) - // Lock released below. - epilogue() - return nil } - e.mu.Lock() } state := e.EndpointState() diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go index a72f0c379..e18012ac0 100644 --- a/pkg/tcpip/transport/tcp/dispatcher.go +++ b/pkg/tcpip/transport/tcp/dispatcher.go @@ -119,6 +119,12 @@ func (p *processor) handleSegments() { // direct delivery to ensure low latency and avoid // scheduler interactions. if err := ep.handleSegments(true /* fastPath */); err != nil || ep.EndpointState() == StateClose { + // Send any active resets if required. + if err != nil { + ep.mu.Lock() + ep.resetConnectionLocked(err) + ep.mu.Unlock() + } ep.notifyProtocolGoroutine(notifyTickleWorker) ep.workMu.Unlock() continue diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 1799c6e10..4797f11d1 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -2019,18 +2019,13 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error { // If we're fully closed and we have unread data we need to abort // the connection with a RST. if (e.shutdownFlags&tcpip.ShutdownWrite) != 0 && rcvBufUsed > 0 { - // Move the socket to error state immediately. - // This is done redundantly because in case of - // save/restore on a Shutdown/Close() the socket - // state needs to indicate the error otherwise - // save file will show the socket in established - // state even though snd/rcv are closed. e.mu.Unlock() // Try to send an active reset immediately if the // work mutex is available. if e.workMu.TryLock() { e.mu.Lock() e.resetConnectionLocked(tcpip.ErrConnectionAborted) + e.notifyProtocolGoroutine(notifyTickleWorker) e.mu.Unlock() e.workMu.Unlock() } else { -- cgit v1.2.3