diff options
author | Ghanan Gowripalan <ghanan@google.com> | 2021-02-06 12:42:25 -0800 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2021-02-06 12:44:15 -0800 |
commit | 4943347137dd09cd47b22b5998f8823e0bb04de6 (patch) | |
tree | dbe8d424c2575ed866b6f30201985a4543eed34a /pkg | |
parent | a83c8585aff55fb5fa2b8b61bf8e1c79d23d59bf (diff) |
Use fine grained locks while sending NUD probes
Previously when sending probe messages, we would hold a shared lock
which lead to deadlocks (due to synchronous packet loooping (e.g. pipe
and loopback link endpoints)) and lock contention.
Writing packets may be an expensive operation which could prevent other
goroutines from doing meaningful work if a shared lock is held while
writing packets.
This change upates the NUD timers to not hold shared locks while
sending packets.
PiperOrigin-RevId: 356048697
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/tcpip/stack/neighbor_entry.go | 252 |
1 files changed, 156 insertions, 96 deletions
diff --git a/pkg/tcpip/stack/neighbor_entry.go b/pkg/tcpip/stack/neighbor_entry.go index 3dba6f7e9..78c6cb681 100644 --- a/pkg/tcpip/stack/neighbor_entry.go +++ b/pkg/tcpip/stack/neighbor_entry.go @@ -70,6 +70,13 @@ const ( Failed ) +type timer struct { + // done indicates to the timer that the timer was stopped. + done *bool + + timer tcpip.Timer +} + // neighborEntry implements a neighbor entry's individual node behavior, as per // RFC 4861 section 7.3.3. Neighbor Unreachability Detection operates in // parallel with the sending of packets to a neighbor, necessitating the @@ -95,7 +102,8 @@ type neighborEntry struct { onResolve []func(LinkResolutionResult) isRouter bool - job *tcpip.Job + + timer timer } } @@ -138,6 +146,20 @@ func newStaticNeighborEntry(cache *neighborCache, addr tcpip.Address, linkAddr t return n } +type remainingCounter struct { + mu struct { + sync.Mutex + + remaining uint32 + } +} + +func (r *remainingCounter) init(max uint32) { + r.mu.Lock() + defer r.mu.Unlock() + r.mu.remaining = max +} + // notifyCompletionLocked notifies those waiting for address resolution, with // the link address if resolution completed successfully. // @@ -192,13 +214,16 @@ func (e *neighborEntry) dispatchRemoveEventLocked() { } } -// cancelJobLocked cancels the currently scheduled action, if there is one. +// cancelTimerLocked cancels the currently scheduled action, if there is one. // Entries in Unknown, Stale, or Static state do not have a scheduled action. // // Precondition: e.mu MUST be locked. -func (e *neighborEntry) cancelJobLocked() { - if job := e.mu.job; job != nil { - job.Cancel() +func (e *neighborEntry) cancelTimerLocked() { + if e.mu.timer.timer != nil { + e.mu.timer.timer.Stop() + *e.mu.timer.done = true + + e.mu.timer = timer{} } } @@ -208,7 +233,7 @@ func (e *neighborEntry) cancelJobLocked() { func (e *neighborEntry) removeLocked() { e.mu.neigh.UpdatedAtNanos = e.cache.nic.stack.clock.NowNanoseconds() e.dispatchRemoveEventLocked() - e.cancelJobLocked() + e.cancelTimerLocked() e.notifyCompletionLocked(false /* succeeded */) } @@ -218,7 +243,7 @@ func (e *neighborEntry) removeLocked() { // // Precondition: e.mu MUST be locked. func (e *neighborEntry) setStateLocked(next NeighborState) { - e.cancelJobLocked() + e.cancelTimerLocked() prev := e.mu.neigh.State e.mu.neigh.State = next @@ -230,47 +255,90 @@ func (e *neighborEntry) setStateLocked(next NeighborState) { panic(fmt.Sprintf("should never transition to Incomplete with setStateLocked; neigh = %#v, prev state = %s", e.mu.neigh, prev)) case Reachable: - e.mu.job = e.cache.nic.stack.newJob(&e.mu, func() { - e.setStateLocked(Stale) - e.dispatchChangeEventLocked() - }) - e.mu.job.Schedule(e.nudState.ReachableTime()) + // Protected by e.mu. + done := false + + e.mu.timer = timer{ + done: &done, + timer: e.cache.nic.stack.Clock().AfterFunc(e.nudState.ReachableTime(), func() { + e.mu.Lock() + defer e.mu.Unlock() + + if done { + // The timer was stopped because the entry changed state. + return + } + + e.setStateLocked(Stale) + e.dispatchChangeEventLocked() + }), + } case Delay: - e.mu.job = e.cache.nic.stack.newJob(&e.mu, func() { - e.setStateLocked(Probe) - e.dispatchChangeEventLocked() - }) - e.mu.job.Schedule(config.DelayFirstProbeTime) + // Protected by e.mu. + done := false + + e.mu.timer = timer{ + done: &done, + timer: e.cache.nic.stack.Clock().AfterFunc(config.DelayFirstProbeTime, func() { + e.mu.Lock() + defer e.mu.Unlock() + + if done { + // The timer was stopped because the entry changed state. + return + } - case Probe: - var retryCounter uint32 - var sendUnicastProbe func() - - sendUnicastProbe = func() { - if retryCounter == config.MaxUnicastProbes { - e.dispatchRemoveEventLocked() - e.setStateLocked(Failed) - return - } + e.setStateLocked(Probe) + e.dispatchChangeEventLocked() + }), + } - if err := e.cache.linkRes.LinkAddressRequest(e.mu.neigh.Addr, "" /* localAddr */, e.mu.neigh.LinkAddr); err != nil { - e.dispatchRemoveEventLocked() - e.setStateLocked(Failed) - return - } + case Probe: + // Protected by e.mu. + done := false - retryCounter++ - e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendUnicastProbe) - e.mu.job.Schedule(config.RetransmitTimer) - } + var remaining remainingCounter + remaining.init(config.MaxUnicastProbes) + addr := e.mu.neigh.Addr + linkAddr := e.mu.neigh.LinkAddr // Send a probe in another gorountine to free this thread of execution - // for finishing the state transition. This is necessary to avoid - // deadlock where sending and processing probes are done synchronously, - // such as loopback and integration tests. - e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendUnicastProbe) - e.mu.job.Schedule(immediateDuration) + // for finishing the state transition. This is necessary to escape the + // currently held lock so we can send the probe message without holding + // a shared lock. + e.mu.timer = timer{ + done: &done, + timer: e.cache.nic.stack.Clock().AfterFunc(0, func() { + // Okay to hold this lock while writing packets since we use a different + // lock per probe timer so there will not be any lock contention. + remaining.mu.Lock() + defer remaining.mu.Unlock() + + var err tcpip.Error + timedoutResolution := remaining.mu.remaining == 0 + if !timedoutResolution { + err = e.cache.linkRes.LinkAddressRequest(addr, "" /* localAddr */, linkAddr) + } + + e.mu.Lock() + defer e.mu.Unlock() + + if done { + // The timer was stopped because the entry changed state. + return + } + + if timedoutResolution || err != nil { + e.dispatchRemoveEventLocked() + e.setStateLocked(Failed) + return + } + + remaining.mu.remaining-- + e.mu.timer.timer.Reset(config.RetransmitTimer) + }), + } case Failed: e.notifyCompletionLocked(false /* succeeded */) @@ -300,66 +368,58 @@ func (e *neighborEntry) handlePacketQueuedLocked(localAddr tcpip.Address) { e.mu.neigh.UpdatedAtNanos = e.cache.nic.stack.clock.NowNanoseconds() e.dispatchAddEventLocked() - config := e.nudState.Config() - var retryCounter uint32 - var sendMulticastProbe func() - - sendMulticastProbe = func() { - if retryCounter == config.MaxMulticastProbes { - // "If no Neighbor Advertisement is received after - // MAX_MULTICAST_SOLICIT solicitations, address resolution has failed. - // The sender MUST return ICMP destination unreachable indications with - // code 3 (Address Unreachable) for each packet queued awaiting address - // resolution." - RFC 4861 section 7.2.2 - // - // There is no need to send an ICMP destination unreachable indication - // since the failure to resolve the address is expected to only occur - // on this node. Thus, redirecting traffic is currently not supported. - // - // "If the error occurs on a node other than the node originating the - // packet, an ICMP error message is generated. If the error occurs on - // the originating node, an implementation is not required to actually - // create and send an ICMP error packet to the source, as long as the - // upper-layer sender is notified through an appropriate mechanism - // (e.g. return value from a procedure call). Note, however, that an - // implementation may find it convenient in some cases to return errors - // to the sender by taking the offending packet, generating an ICMP - // error message, and then delivering it (locally) through the generic - // error-handling routines." - RFC 4861 section 2.1 - e.dispatchRemoveEventLocked() - e.setStateLocked(Failed) - return - } + // Protected by e.mu. + done := false - // As per RFC 4861 section 7.2.2: - // - // If the source address of the packet prompting the solicitation is the - // same as one of the addresses assigned to the outgoing interface, that - // address SHOULD be placed in the IP Source Address of the outgoing - // solicitation. - // - if err := e.cache.linkRes.LinkAddressRequest(e.mu.neigh.Addr, localAddr, ""); err != nil { - // There is no need to log the error here; the NUD implementation may - // assume a working link. A valid link should be the responsibility of - // the NIC/stack.LinkEndpoint. - e.dispatchRemoveEventLocked() - e.setStateLocked(Failed) - return - } - - retryCounter++ - e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendMulticastProbe) - e.mu.job.Schedule(config.RetransmitTimer) - } + var remaining remainingCounter + remaining.init(config.MaxMulticastProbes) + addr := e.mu.neigh.Addr // Send a probe in another gorountine to free this thread of execution - // for finishing the state transition. This is necessary to avoid - // deadlock where sending and processing probes are done synchronously, - // such as loopback and integration tests. - e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendMulticastProbe) - e.mu.job.Schedule(immediateDuration) + // for finishing the state transition. This is necessary to escape the + // currently held lock so we can send the probe message without holding + // a shared lock. + e.mu.timer = timer{ + done: &done, + timer: e.cache.nic.stack.Clock().AfterFunc(0, func() { + // Okay to hold this lock while writing packets since we use a different + // lock per probe timer so there will not be any lock contention. + remaining.mu.Lock() + defer remaining.mu.Unlock() + + var err tcpip.Error + timedoutResolution := remaining.mu.remaining == 0 + if !timedoutResolution { + // As per RFC 4861 section 7.2.2: + // + // If the source address of the packet prompting the solicitation is + // the same as one of the addresses assigned to the outgoing interface, + // that address SHOULD be placed in the IP Source Address of the + // outgoing solicitation. + // + err = e.cache.linkRes.LinkAddressRequest(addr, localAddr, "" /* linkAddr */) + } + + e.mu.Lock() + defer e.mu.Unlock() + + if done { + // The timer was stopped because the entry changed state. + return + } + + if timedoutResolution || err != nil { + e.dispatchRemoveEventLocked() + e.setStateLocked(Failed) + return + } + + remaining.mu.remaining-- + e.mu.timer.timer.Reset(config.RetransmitTimer) + }), + } case Stale: e.setStateLocked(Delay) |