diff options
Diffstat (limited to 'pkg/tcpip')
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 50 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/dispatcher.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 7 |
3 files changed, 40 insertions, 23 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index f3896715b..a2f384384 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1019,10 +1019,6 @@ func (e *endpoint) handleSegments(fastPath bool) *tcpip.Error { cont, err := e.handleSegment(s) if err != nil { s.decRef() - e.mu.Lock() - e.setEndpointState(StateError) - e.HardError = err - e.mu.Unlock() return err } if !cont { @@ -1414,30 +1410,50 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ // Main loop. Handle segments until both send and receive ends of the // connection have completed. + cleanupOnError := func(err *tcpip.Error) { + e.mu.Lock() + e.workerCleanup = true + if err != nil { + e.resetConnectionLocked(err) + } + // Lock released below. + epilogue() + } +loop: for e.EndpointState() != StateTimeWait && e.EndpointState() != StateClose && e.EndpointState() != StateError { e.mu.Unlock() e.workMu.Unlock() v, _ := s.Fetch(true) e.workMu.Lock() - // We need to double check here because the notification - // maybe stale by the time we got around to processing it. + + // We need to double check here because the notification maybe + // stale by the time we got around to processing it. + // // NOTE: since we now hold the workMu the processors cannot - // change the state of the endpoint so it' safe to proceed + // change the state of the endpoint so it's safe to proceed // after this check. - if e.EndpointState() == StateTimeWait || e.EndpointState() == StateClose || e.EndpointState() == StateError { + switch e.EndpointState() { + case StateError: + // If the endpoint has already transitioned to an ERROR + // state just pass nil here as any reset that may need + // to be sent etc should already have been done and we + // just want to terminate the loop and cleanup the + // endpoint. + cleanupOnError(nil) + return nil + case StateTimeWait: + fallthrough + case StateClose: e.mu.Lock() - break - } - if err := funcs[v].f(); err != nil { + break loop + default: + if err := funcs[v].f(); err != nil { + cleanupOnError(err) + return nil + } e.mu.Lock() - e.workerCleanup = true - e.resetConnectionLocked(err) - // Lock released below. - epilogue() - return nil } - e.mu.Lock() } state := e.EndpointState() diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go index a72f0c379..e18012ac0 100644 --- a/pkg/tcpip/transport/tcp/dispatcher.go +++ b/pkg/tcpip/transport/tcp/dispatcher.go @@ -119,6 +119,12 @@ func (p *processor) handleSegments() { // direct delivery to ensure low latency and avoid // scheduler interactions. if err := ep.handleSegments(true /* fastPath */); err != nil || ep.EndpointState() == StateClose { + // Send any active resets if required. + if err != nil { + ep.mu.Lock() + ep.resetConnectionLocked(err) + ep.mu.Unlock() + } ep.notifyProtocolGoroutine(notifyTickleWorker) ep.workMu.Unlock() continue diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 1799c6e10..4797f11d1 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -2019,18 +2019,13 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error { // If we're fully closed and we have unread data we need to abort // the connection with a RST. if (e.shutdownFlags&tcpip.ShutdownWrite) != 0 && rcvBufUsed > 0 { - // Move the socket to error state immediately. - // This is done redundantly because in case of - // save/restore on a Shutdown/Close() the socket - // state needs to indicate the error otherwise - // save file will show the socket in established - // state even though snd/rcv are closed. e.mu.Unlock() // Try to send an active reset immediately if the // work mutex is available. if e.workMu.TryLock() { e.mu.Lock() e.resetConnectionLocked(tcpip.ErrConnectionAborted) + e.notifyProtocolGoroutine(notifyTickleWorker) e.mu.Unlock() e.workMu.Unlock() } else { |