summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip
diff options
context:
space:
mode:
authorTamir Duberstein <tamird@google.com>2018-10-23 08:15:15 -0700
committerShentubot <shentubot@google.com>2018-10-23 08:16:13 -0700
commit692df85673e2398a1a9393029b2a9b72448619f4 (patch)
tree9ec805b46b89d175cd3b918ac2d9aac5ccbd24ae /pkg/tcpip
parent75cd70ecc9abfd5daaefea04da5070a0e0d620dd (diff)
Simplify channel management
The channels {cancel,resCh} have roughly the same lifetime and are used for roughly the same purpose as an entry's waiters; we can unify the state management of the two mechanisms, while also reducing unncessary mutex locking and unlocking. Made some cosmetic changes while I'm here. PiperOrigin-RevId: 218343915 Change-Id: Ic69546a2b7b390162b2231f07f335dd6199472d7
Diffstat (limited to 'pkg/tcpip')
-rw-r--r--pkg/tcpip/stack/linkaddrcache.go131
1 files changed, 47 insertions, 84 deletions
diff --git a/pkg/tcpip/stack/linkaddrcache.go b/pkg/tcpip/stack/linkaddrcache.go
index cb7b7116b..40e4bdb4a 100644
--- a/pkg/tcpip/stack/linkaddrcache.go
+++ b/pkg/tcpip/stack/linkaddrcache.go
@@ -77,7 +77,7 @@ func (s entryState) String() string {
case expired:
return "expired"
default:
- return fmt.Sprintf("invalid entryState: %d", s)
+ return fmt.Sprintf("unknown(%d)", s)
}
}
@@ -88,14 +88,12 @@ type linkAddrEntry struct {
linkAddr tcpip.LinkAddress
expiration time.Time
s entryState
- resDone bool
// wakers is a set of waiters for address resolution result. Anytime
// state transitions out of 'incomplete' these waiters are notified.
wakers map[*sleep.Waker]struct{}
- cancel chan struct{}
- resCh chan struct{}
+ done chan struct{}
}
func (e *linkAddrEntry) state() entryState {
@@ -117,13 +115,13 @@ func (e *linkAddrEntry) changeState(ns entryState) {
// All transitions are valid.
case ready, failed:
if ns != expired {
- panic(fmt.Sprintf("invalid state transition from %v to %v", e.s, ns))
+ panic(fmt.Sprintf("invalid state transition from %s to %s", e.s, ns))
}
case expired:
// Terminal state.
- panic(fmt.Sprintf("invalid state transition from %v to %v", e.s, ns))
+ panic(fmt.Sprintf("invalid state transition from %s to %s", e.s, ns))
default:
- panic(fmt.Sprintf("invalid state: %v", e.s))
+ panic(fmt.Sprintf("invalid state: %s", e.s))
}
// Notify whoever is waiting on address resolution when transitioning
@@ -133,6 +131,9 @@ func (e *linkAddrEntry) changeState(ns entryState) {
w.Assert()
}
e.wakers = nil
+ if e.done != nil {
+ close(e.done)
+ }
}
e.s = ns
}
@@ -150,8 +151,8 @@ func (c *linkAddrCache) add(k tcpip.FullAddress, v tcpip.LinkAddress) {
c.mu.Lock()
defer c.mu.Unlock()
- entry := c.cache[k]
- if entry != nil {
+ entry, ok := c.cache[k]
+ if ok {
s := entry.state()
if s != expired && entry.linkAddr == v {
// Disregard repeated calls.
@@ -183,28 +184,17 @@ func (c *linkAddrCache) makeAndAddEntry(k tcpip.FullAddress, v tcpip.LinkAddress
// Mark the soon-to-be-replaced entry as expired, just in case there is
// someone waiting for address resolution on it.
entry.changeState(expired)
- if entry.cancel != nil {
- if !entry.resDone {
- close(entry.resCh)
- }
- close(entry.cancel)
- }
*entry = linkAddrEntry{
addr: k,
linkAddr: v,
expiration: time.Now().Add(c.ageLimit),
- resDone: false,
wakers: make(map[*sleep.Waker]struct{}),
- cancel: make(chan struct{}, 1),
- resCh: make(chan struct{}, 1),
+ done: make(chan struct{}),
}
c.cache[k] = entry
- c.next++
- if c.next == len(c.entries) {
- c.next = 0
- }
+ c.next = (c.next + 1) % len(c.entries)
return entry
}
@@ -217,34 +207,34 @@ func (c *linkAddrCache) get(k tcpip.FullAddress, linkRes LinkAddressResolver, lo
}
c.mu.Lock()
- entry := c.cache[k]
- if entry == nil || entry.state() == expired {
- c.mu.Unlock()
- if linkRes == nil {
+ defer c.mu.Unlock()
+ if entry, ok := c.cache[k]; ok {
+ switch s := entry.state(); s {
+ case expired:
+ case ready:
+ return entry.linkAddr, nil, nil
+ case failed:
return "", nil, tcpip.ErrNoLinkAddress
+ case incomplete:
+ // Address resolution is still in progress.
+ entry.addWaker(waker)
+ return "", entry.done, tcpip.ErrWouldBlock
+ default:
+ panic(fmt.Sprintf("invalid cache entry state: %s", s))
}
-
- ch := c.startAddressResolution(k, linkRes, localAddr, linkEP, waker)
- return "", ch, tcpip.ErrWouldBlock
}
- defer c.mu.Unlock()
- switch s := entry.state(); s {
- case expired:
- // It's possible that entry expired between state() call above and here
- // in that case it's safe to consider it ready.
- fallthrough
- case ready:
- return entry.linkAddr, nil, nil
- case failed:
+ if linkRes == nil {
return "", nil, tcpip.ErrNoLinkAddress
- case incomplete:
- // Address resolution is still in progress.
- entry.addWaker(waker)
- return "", entry.resCh, tcpip.ErrWouldBlock
- default:
- panic(fmt.Sprintf("invalid cache entry state: %d", s))
}
+
+ // Add 'incomplete' entry in the cache to mark that resolution is in progress.
+ e := c.makeAndAddEntry(k, "")
+ e.addWaker(waker)
+
+ go c.startAddressResolution(k, linkRes, localAddr, linkEP, e.done) // S/R-SAFE: link non-savable; wakers dropped synchronously.
+
+ return "", e.done, tcpip.ErrWouldBlock
}
// removeWaker removes a waker previously added through get().
@@ -252,53 +242,26 @@ func (c *linkAddrCache) removeWaker(k tcpip.FullAddress, waker *sleep.Waker) {
c.mu.Lock()
defer c.mu.Unlock()
- if entry := c.cache[k]; entry != nil {
+ if entry, ok := c.cache[k]; ok {
entry.removeWaker(waker)
}
}
-func (c *linkAddrCache) startAddressResolution(k tcpip.FullAddress, linkRes LinkAddressResolver, localAddr tcpip.Address, linkEP LinkEndpoint, waker *sleep.Waker) <-chan struct{} {
- c.mu.Lock()
- defer c.mu.Unlock()
+func (c *linkAddrCache) startAddressResolution(k tcpip.FullAddress, linkRes LinkAddressResolver, localAddr tcpip.Address, linkEP LinkEndpoint, done <-chan struct{}) {
+ for i := 0; ; i++ {
+ // Send link request, then wait for the timeout limit and check
+ // whether the request succeeded.
+ linkRes.LinkAddressRequest(k.Addr, localAddr, linkEP)
- // Look up again with lock held to ensure entry wasn't added by someone else.
- if e := c.cache[k]; e != nil && e.state() != expired {
- return nil
- }
-
- // Add 'incomplete' entry in the cache to mark that resolution is in progress.
- e := c.makeAndAddEntry(k, "")
- e.addWaker(waker)
-
- go func() { // S/R-SAFE: link non-savable; wakers dropped synchronously.
- for i := 0; ; i++ {
- // Send link request, then wait for the timeout limit and check
- // whether the request succeeded.
- linkRes.LinkAddressRequest(k.Addr, localAddr, linkEP)
- c.mu.Lock()
- cancel := e.cancel
- c.mu.Unlock()
-
- select {
- case <-time.After(c.resolutionTimeout):
- if stop := c.checkLinkRequest(k, i); stop {
- // If entry is evicted then resCh is already closed.
- c.mu.Lock()
- if e, ok := c.cache[k]; ok {
- if !e.resDone {
- e.resDone = true
- close(e.resCh)
- }
- }
- c.mu.Unlock()
- return
- }
- case <-cancel:
+ select {
+ case <-time.After(c.resolutionTimeout):
+ if stop := c.checkLinkRequest(k, i); stop {
return
}
+ case <-done:
+ return
}
- }()
- return e.resCh
+ }
}
// checkLinkRequest checks whether previous attempt to resolve address has succeeded
@@ -327,7 +290,7 @@ func (c *linkAddrCache) checkLinkRequest(k tcpip.FullAddress, attempt int) bool
// No response yet, need to send another ARP request.
return false
default:
- panic(fmt.Sprintf("invalid cache entry state: %d", s))
+ panic(fmt.Sprintf("invalid cache entry state: %s", s))
}
}