diff options
-rw-r--r-- | pkg/tcpip/stack/neighbor_entry.go | 252 |
1 files changed, 156 insertions, 96 deletions
diff --git a/pkg/tcpip/stack/neighbor_entry.go b/pkg/tcpip/stack/neighbor_entry.go index 3dba6f7e9..78c6cb681 100644 --- a/pkg/tcpip/stack/neighbor_entry.go +++ b/pkg/tcpip/stack/neighbor_entry.go @@ -70,6 +70,13 @@ const ( Failed ) +type timer struct { + // done indicates to the timer that the timer was stopped. + done *bool + + timer tcpip.Timer +} + // neighborEntry implements a neighbor entry's individual node behavior, as per // RFC 4861 section 7.3.3. Neighbor Unreachability Detection operates in // parallel with the sending of packets to a neighbor, necessitating the @@ -95,7 +102,8 @@ type neighborEntry struct { onResolve []func(LinkResolutionResult) isRouter bool - job *tcpip.Job + + timer timer } } @@ -138,6 +146,20 @@ func newStaticNeighborEntry(cache *neighborCache, addr tcpip.Address, linkAddr t return n } +type remainingCounter struct { + mu struct { + sync.Mutex + + remaining uint32 + } +} + +func (r *remainingCounter) init(max uint32) { + r.mu.Lock() + defer r.mu.Unlock() + r.mu.remaining = max +} + // notifyCompletionLocked notifies those waiting for address resolution, with // the link address if resolution completed successfully. // @@ -192,13 +214,16 @@ func (e *neighborEntry) dispatchRemoveEventLocked() { } } -// cancelJobLocked cancels the currently scheduled action, if there is one. +// cancelTimerLocked cancels the currently scheduled action, if there is one. // Entries in Unknown, Stale, or Static state do not have a scheduled action. // // Precondition: e.mu MUST be locked. -func (e *neighborEntry) cancelJobLocked() { - if job := e.mu.job; job != nil { - job.Cancel() +func (e *neighborEntry) cancelTimerLocked() { + if e.mu.timer.timer != nil { + e.mu.timer.timer.Stop() + *e.mu.timer.done = true + + e.mu.timer = timer{} } } @@ -208,7 +233,7 @@ func (e *neighborEntry) cancelJobLocked() { func (e *neighborEntry) removeLocked() { e.mu.neigh.UpdatedAtNanos = e.cache.nic.stack.clock.NowNanoseconds() e.dispatchRemoveEventLocked() - e.cancelJobLocked() + e.cancelTimerLocked() e.notifyCompletionLocked(false /* succeeded */) } @@ -218,7 +243,7 @@ func (e *neighborEntry) removeLocked() { // // Precondition: e.mu MUST be locked. func (e *neighborEntry) setStateLocked(next NeighborState) { - e.cancelJobLocked() + e.cancelTimerLocked() prev := e.mu.neigh.State e.mu.neigh.State = next @@ -230,47 +255,90 @@ func (e *neighborEntry) setStateLocked(next NeighborState) { panic(fmt.Sprintf("should never transition to Incomplete with setStateLocked; neigh = %#v, prev state = %s", e.mu.neigh, prev)) case Reachable: - e.mu.job = e.cache.nic.stack.newJob(&e.mu, func() { - e.setStateLocked(Stale) - e.dispatchChangeEventLocked() - }) - e.mu.job.Schedule(e.nudState.ReachableTime()) + // Protected by e.mu. + done := false + + e.mu.timer = timer{ + done: &done, + timer: e.cache.nic.stack.Clock().AfterFunc(e.nudState.ReachableTime(), func() { + e.mu.Lock() + defer e.mu.Unlock() + + if done { + // The timer was stopped because the entry changed state. + return + } + + e.setStateLocked(Stale) + e.dispatchChangeEventLocked() + }), + } case Delay: - e.mu.job = e.cache.nic.stack.newJob(&e.mu, func() { - e.setStateLocked(Probe) - e.dispatchChangeEventLocked() - }) - e.mu.job.Schedule(config.DelayFirstProbeTime) + // Protected by e.mu. + done := false + + e.mu.timer = timer{ + done: &done, + timer: e.cache.nic.stack.Clock().AfterFunc(config.DelayFirstProbeTime, func() { + e.mu.Lock() + defer e.mu.Unlock() + + if done { + // The timer was stopped because the entry changed state. + return + } - case Probe: - var retryCounter uint32 - var sendUnicastProbe func() - - sendUnicastProbe = func() { - if retryCounter == config.MaxUnicastProbes { - e.dispatchRemoveEventLocked() - e.setStateLocked(Failed) - return - } + e.setStateLocked(Probe) + e.dispatchChangeEventLocked() + }), + } - if err := e.cache.linkRes.LinkAddressRequest(e.mu.neigh.Addr, "" /* localAddr */, e.mu.neigh.LinkAddr); err != nil { - e.dispatchRemoveEventLocked() - e.setStateLocked(Failed) - return - } + case Probe: + // Protected by e.mu. + done := false - retryCounter++ - e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendUnicastProbe) - e.mu.job.Schedule(config.RetransmitTimer) - } + var remaining remainingCounter + remaining.init(config.MaxUnicastProbes) + addr := e.mu.neigh.Addr + linkAddr := e.mu.neigh.LinkAddr // Send a probe in another gorountine to free this thread of execution - // for finishing the state transition. This is necessary to avoid - // deadlock where sending and processing probes are done synchronously, - // such as loopback and integration tests. - e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendUnicastProbe) - e.mu.job.Schedule(immediateDuration) + // for finishing the state transition. This is necessary to escape the + // currently held lock so we can send the probe message without holding + // a shared lock. + e.mu.timer = timer{ + done: &done, + timer: e.cache.nic.stack.Clock().AfterFunc(0, func() { + // Okay to hold this lock while writing packets since we use a different + // lock per probe timer so there will not be any lock contention. + remaining.mu.Lock() + defer remaining.mu.Unlock() + + var err tcpip.Error + timedoutResolution := remaining.mu.remaining == 0 + if !timedoutResolution { + err = e.cache.linkRes.LinkAddressRequest(addr, "" /* localAddr */, linkAddr) + } + + e.mu.Lock() + defer e.mu.Unlock() + + if done { + // The timer was stopped because the entry changed state. + return + } + + if timedoutResolution || err != nil { + e.dispatchRemoveEventLocked() + e.setStateLocked(Failed) + return + } + + remaining.mu.remaining-- + e.mu.timer.timer.Reset(config.RetransmitTimer) + }), + } case Failed: e.notifyCompletionLocked(false /* succeeded */) @@ -300,66 +368,58 @@ func (e *neighborEntry) handlePacketQueuedLocked(localAddr tcpip.Address) { e.mu.neigh.UpdatedAtNanos = e.cache.nic.stack.clock.NowNanoseconds() e.dispatchAddEventLocked() - config := e.nudState.Config() - var retryCounter uint32 - var sendMulticastProbe func() - - sendMulticastProbe = func() { - if retryCounter == config.MaxMulticastProbes { - // "If no Neighbor Advertisement is received after - // MAX_MULTICAST_SOLICIT solicitations, address resolution has failed. - // The sender MUST return ICMP destination unreachable indications with - // code 3 (Address Unreachable) for each packet queued awaiting address - // resolution." - RFC 4861 section 7.2.2 - // - // There is no need to send an ICMP destination unreachable indication - // since the failure to resolve the address is expected to only occur - // on this node. Thus, redirecting traffic is currently not supported. - // - // "If the error occurs on a node other than the node originating the - // packet, an ICMP error message is generated. If the error occurs on - // the originating node, an implementation is not required to actually - // create and send an ICMP error packet to the source, as long as the - // upper-layer sender is notified through an appropriate mechanism - // (e.g. return value from a procedure call). Note, however, that an - // implementation may find it convenient in some cases to return errors - // to the sender by taking the offending packet, generating an ICMP - // error message, and then delivering it (locally) through the generic - // error-handling routines." - RFC 4861 section 2.1 - e.dispatchRemoveEventLocked() - e.setStateLocked(Failed) - return - } + // Protected by e.mu. + done := false - // As per RFC 4861 section 7.2.2: - // - // If the source address of the packet prompting the solicitation is the - // same as one of the addresses assigned to the outgoing interface, that - // address SHOULD be placed in the IP Source Address of the outgoing - // solicitation. - // - if err := e.cache.linkRes.LinkAddressRequest(e.mu.neigh.Addr, localAddr, ""); err != nil { - // There is no need to log the error here; the NUD implementation may - // assume a working link. A valid link should be the responsibility of - // the NIC/stack.LinkEndpoint. - e.dispatchRemoveEventLocked() - e.setStateLocked(Failed) - return - } - - retryCounter++ - e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendMulticastProbe) - e.mu.job.Schedule(config.RetransmitTimer) - } + var remaining remainingCounter + remaining.init(config.MaxMulticastProbes) + addr := e.mu.neigh.Addr // Send a probe in another gorountine to free this thread of execution - // for finishing the state transition. This is necessary to avoid - // deadlock where sending and processing probes are done synchronously, - // such as loopback and integration tests. - e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendMulticastProbe) - e.mu.job.Schedule(immediateDuration) + // for finishing the state transition. This is necessary to escape the + // currently held lock so we can send the probe message without holding + // a shared lock. + e.mu.timer = timer{ + done: &done, + timer: e.cache.nic.stack.Clock().AfterFunc(0, func() { + // Okay to hold this lock while writing packets since we use a different + // lock per probe timer so there will not be any lock contention. + remaining.mu.Lock() + defer remaining.mu.Unlock() + + var err tcpip.Error + timedoutResolution := remaining.mu.remaining == 0 + if !timedoutResolution { + // As per RFC 4861 section 7.2.2: + // + // If the source address of the packet prompting the solicitation is + // the same as one of the addresses assigned to the outgoing interface, + // that address SHOULD be placed in the IP Source Address of the + // outgoing solicitation. + // + err = e.cache.linkRes.LinkAddressRequest(addr, localAddr, "" /* linkAddr */) + } + + e.mu.Lock() + defer e.mu.Unlock() + + if done { + // The timer was stopped because the entry changed state. + return + } + + if timedoutResolution || err != nil { + e.dispatchRemoveEventLocked() + e.setStateLocked(Failed) + return + } + + remaining.mu.remaining-- + e.mu.timer.timer.Reset(config.RetransmitTimer) + }), + } case Stale: e.setStateLocked(Delay) |