diff options
author | Tamir Duberstein <tamird@google.com> | 2018-10-23 08:15:15 -0700 |
---|---|---|
committer | Shentubot <shentubot@google.com> | 2018-10-23 08:16:13 -0700 |
commit | 692df85673e2398a1a9393029b2a9b72448619f4 (patch) | |
tree | 9ec805b46b89d175cd3b918ac2d9aac5ccbd24ae /pkg | |
parent | 75cd70ecc9abfd5daaefea04da5070a0e0d620dd (diff) |
Simplify channel management
The channels {cancel,resCh} have roughly the same lifetime and are used for
roughly the same purpose as an entry's waiters; we can unify the state
management of the two mechanisms, while also reducing unncessary mutex locking
and unlocking.
Made some cosmetic changes while I'm here.
PiperOrigin-RevId: 218343915
Change-Id: Ic69546a2b7b390162b2231f07f335dd6199472d7
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/tcpip/stack/linkaddrcache.go | 131 |
1 files changed, 47 insertions, 84 deletions
diff --git a/pkg/tcpip/stack/linkaddrcache.go b/pkg/tcpip/stack/linkaddrcache.go index cb7b7116b..40e4bdb4a 100644 --- a/pkg/tcpip/stack/linkaddrcache.go +++ b/pkg/tcpip/stack/linkaddrcache.go @@ -77,7 +77,7 @@ func (s entryState) String() string { case expired: return "expired" default: - return fmt.Sprintf("invalid entryState: %d", s) + return fmt.Sprintf("unknown(%d)", s) } } @@ -88,14 +88,12 @@ type linkAddrEntry struct { linkAddr tcpip.LinkAddress expiration time.Time s entryState - resDone bool // wakers is a set of waiters for address resolution result. Anytime // state transitions out of 'incomplete' these waiters are notified. wakers map[*sleep.Waker]struct{} - cancel chan struct{} - resCh chan struct{} + done chan struct{} } func (e *linkAddrEntry) state() entryState { @@ -117,13 +115,13 @@ func (e *linkAddrEntry) changeState(ns entryState) { // All transitions are valid. case ready, failed: if ns != expired { - panic(fmt.Sprintf("invalid state transition from %v to %v", e.s, ns)) + panic(fmt.Sprintf("invalid state transition from %s to %s", e.s, ns)) } case expired: // Terminal state. - panic(fmt.Sprintf("invalid state transition from %v to %v", e.s, ns)) + panic(fmt.Sprintf("invalid state transition from %s to %s", e.s, ns)) default: - panic(fmt.Sprintf("invalid state: %v", e.s)) + panic(fmt.Sprintf("invalid state: %s", e.s)) } // Notify whoever is waiting on address resolution when transitioning @@ -133,6 +131,9 @@ func (e *linkAddrEntry) changeState(ns entryState) { w.Assert() } e.wakers = nil + if e.done != nil { + close(e.done) + } } e.s = ns } @@ -150,8 +151,8 @@ func (c *linkAddrCache) add(k tcpip.FullAddress, v tcpip.LinkAddress) { c.mu.Lock() defer c.mu.Unlock() - entry := c.cache[k] - if entry != nil { + entry, ok := c.cache[k] + if ok { s := entry.state() if s != expired && entry.linkAddr == v { // Disregard repeated calls. @@ -183,28 +184,17 @@ func (c *linkAddrCache) makeAndAddEntry(k tcpip.FullAddress, v tcpip.LinkAddress // Mark the soon-to-be-replaced entry as expired, just in case there is // someone waiting for address resolution on it. entry.changeState(expired) - if entry.cancel != nil { - if !entry.resDone { - close(entry.resCh) - } - close(entry.cancel) - } *entry = linkAddrEntry{ addr: k, linkAddr: v, expiration: time.Now().Add(c.ageLimit), - resDone: false, wakers: make(map[*sleep.Waker]struct{}), - cancel: make(chan struct{}, 1), - resCh: make(chan struct{}, 1), + done: make(chan struct{}), } c.cache[k] = entry - c.next++ - if c.next == len(c.entries) { - c.next = 0 - } + c.next = (c.next + 1) % len(c.entries) return entry } @@ -217,34 +207,34 @@ func (c *linkAddrCache) get(k tcpip.FullAddress, linkRes LinkAddressResolver, lo } c.mu.Lock() - entry := c.cache[k] - if entry == nil || entry.state() == expired { - c.mu.Unlock() - if linkRes == nil { + defer c.mu.Unlock() + if entry, ok := c.cache[k]; ok { + switch s := entry.state(); s { + case expired: + case ready: + return entry.linkAddr, nil, nil + case failed: return "", nil, tcpip.ErrNoLinkAddress + case incomplete: + // Address resolution is still in progress. + entry.addWaker(waker) + return "", entry.done, tcpip.ErrWouldBlock + default: + panic(fmt.Sprintf("invalid cache entry state: %s", s)) } - - ch := c.startAddressResolution(k, linkRes, localAddr, linkEP, waker) - return "", ch, tcpip.ErrWouldBlock } - defer c.mu.Unlock() - switch s := entry.state(); s { - case expired: - // It's possible that entry expired between state() call above and here - // in that case it's safe to consider it ready. - fallthrough - case ready: - return entry.linkAddr, nil, nil - case failed: + if linkRes == nil { return "", nil, tcpip.ErrNoLinkAddress - case incomplete: - // Address resolution is still in progress. - entry.addWaker(waker) - return "", entry.resCh, tcpip.ErrWouldBlock - default: - panic(fmt.Sprintf("invalid cache entry state: %d", s)) } + + // Add 'incomplete' entry in the cache to mark that resolution is in progress. + e := c.makeAndAddEntry(k, "") + e.addWaker(waker) + + go c.startAddressResolution(k, linkRes, localAddr, linkEP, e.done) // S/R-SAFE: link non-savable; wakers dropped synchronously. + + return "", e.done, tcpip.ErrWouldBlock } // removeWaker removes a waker previously added through get(). @@ -252,53 +242,26 @@ func (c *linkAddrCache) removeWaker(k tcpip.FullAddress, waker *sleep.Waker) { c.mu.Lock() defer c.mu.Unlock() - if entry := c.cache[k]; entry != nil { + if entry, ok := c.cache[k]; ok { entry.removeWaker(waker) } } -func (c *linkAddrCache) startAddressResolution(k tcpip.FullAddress, linkRes LinkAddressResolver, localAddr tcpip.Address, linkEP LinkEndpoint, waker *sleep.Waker) <-chan struct{} { - c.mu.Lock() - defer c.mu.Unlock() +func (c *linkAddrCache) startAddressResolution(k tcpip.FullAddress, linkRes LinkAddressResolver, localAddr tcpip.Address, linkEP LinkEndpoint, done <-chan struct{}) { + for i := 0; ; i++ { + // Send link request, then wait for the timeout limit and check + // whether the request succeeded. + linkRes.LinkAddressRequest(k.Addr, localAddr, linkEP) - // Look up again with lock held to ensure entry wasn't added by someone else. - if e := c.cache[k]; e != nil && e.state() != expired { - return nil - } - - // Add 'incomplete' entry in the cache to mark that resolution is in progress. - e := c.makeAndAddEntry(k, "") - e.addWaker(waker) - - go func() { // S/R-SAFE: link non-savable; wakers dropped synchronously. - for i := 0; ; i++ { - // Send link request, then wait for the timeout limit and check - // whether the request succeeded. - linkRes.LinkAddressRequest(k.Addr, localAddr, linkEP) - c.mu.Lock() - cancel := e.cancel - c.mu.Unlock() - - select { - case <-time.After(c.resolutionTimeout): - if stop := c.checkLinkRequest(k, i); stop { - // If entry is evicted then resCh is already closed. - c.mu.Lock() - if e, ok := c.cache[k]; ok { - if !e.resDone { - e.resDone = true - close(e.resCh) - } - } - c.mu.Unlock() - return - } - case <-cancel: + select { + case <-time.After(c.resolutionTimeout): + if stop := c.checkLinkRequest(k, i); stop { return } + case <-done: + return } - }() - return e.resCh + } } // checkLinkRequest checks whether previous attempt to resolve address has succeeded @@ -327,7 +290,7 @@ func (c *linkAddrCache) checkLinkRequest(k tcpip.FullAddress, attempt int) bool // No response yet, need to send another ARP request. return false default: - panic(fmt.Sprintf("invalid cache entry state: %d", s)) + panic(fmt.Sprintf("invalid cache entry state: %s", s)) } } |