diff options
-rw-r--r-- | server/server.go | 2 | ||||
-rw-r--r-- | server/zclient.go | 193 |
2 files changed, 164 insertions, 31 deletions
diff --git a/server/server.go b/server/server.go index 5b644dfe..31ebd16c 100644 --- a/server/server.go +++ b/server/server.go @@ -981,7 +981,7 @@ func (s *BgpServer) StartZebraClient(c *config.ZebraConfig) error { protos = append(protos, string(p)) } var err error - s.zclient, err = newZebraClient(s, c.Url, protos, c.Version) + s.zclient, err = newZebraClient(s, c.Url, protos, c.Version, c.NexthopTriggerEnable, c.NexthopTriggerDelay) return err }, false) } diff --git a/server/zclient.go b/server/zclient.go index 537da7b6..4bba83ec 100644 --- a/server/zclient.go +++ b/server/zclient.go @@ -30,15 +30,37 @@ import ( type pathList []*table.Path -type registeredNexthopCache []*net.IP +type nexthopTrackingManager struct { + dead chan struct{} + nexthopCache []*net.IP + server *BgpServer + delay int + isScheduled bool + scheduledPathList map[string]pathList + trigger chan struct{} + pathListCh chan pathList +} + +func newNexthopTrackingManager(server *BgpServer, delay int) *nexthopTrackingManager { + return &nexthopTrackingManager{ + dead: make(chan struct{}), + nexthopCache: make([]*net.IP, 0), + server: server, + delay: delay, + scheduledPathList: make(map[string]pathList, 0), + trigger: make(chan struct{}), + pathListCh: make(chan pathList), + } +} -func newRegisteredNexthopCache() *registeredNexthopCache { - cache := make(registeredNexthopCache, 0) - return &cache +func (s *nexthopTrackingManager) stop() { + close(s.pathListCh) + close(s.trigger) + close(s.dead) } -func (c *registeredNexthopCache) isRegistered(nexthop net.IP) bool { - for _, cached := range *c { +func (m *nexthopTrackingManager) isRegisteredNexthop(nexthop net.IP) bool { + for _, cached := range m.nexthopCache { if cached.Equal(nexthop) { return true } @@ -46,15 +68,115 @@ func (c *registeredNexthopCache) isRegistered(nexthop net.IP) bool { return false } -func (c *registeredNexthopCache) register(nexthop net.IP) bool { - cache := *c - if c.isRegistered(nexthop) { +func (m *nexthopTrackingManager) registerNexthop(nexthop net.IP) bool { + if m.isRegisteredNexthop(nexthop) { return false } - *c = append(cache, &nexthop) + m.nexthopCache = append(m.nexthopCache, &nexthop) return true } +func (m *nexthopTrackingManager) appendPathList(paths pathList) { + if len(paths) == 0 { + return + } + path := paths[0] + + m.scheduledPathList[path.GetNexthop().String()] = paths +} + +func (m *nexthopTrackingManager) calculateDelay(penalty int) int { + if penalty <= 950 { + return m.delay + } + + delay := 8 + for penalty > 950 { + delay += 8 + penalty /= 2 + } + return delay +} + +func (m *nexthopTrackingManager) triggerUpdatePathAfter(delay int) { + time.Sleep(time.Duration(delay) * time.Second) + + m.trigger <- struct{}{} +} + +func (m *nexthopTrackingManager) loop() { + t := time.NewTicker(8 * time.Second) + defer t.Stop() + + penalty := 0 + + for { + select { + case <-m.dead: + return + + case <-t.C: + penalty /= 2 + + case paths := <-m.pathListCh: + penalty += 500 + log.WithFields(log.Fields{ + "Topic": "Zebra", + "Event": "Nexthop Tracking", + }).Debug("penalty 500 chrged: penalty: %d", penalty) + + m.appendPathList(paths) + + isScheduled := m.isScheduled + if isScheduled { + log.WithFields(log.Fields{ + "Topic": "Zebra", + "Event": "Nexthop Tracking", + }).Debug("nexthop tracking event already scheduled") + continue + } else { + m.isScheduled = true + } + + delay := m.calculateDelay(penalty) + go m.triggerUpdatePathAfter(delay) + log.WithFields(log.Fields{ + "Topic": "Zebra", + "Event": "Nexthop Tracking", + }).Debug("nexthop tracking event scheduled in %d secs", delay) + + case <-m.trigger: + paths := make(pathList, 0) + for _, pList := range m.scheduledPathList { + for _, p := range pList { + paths = append(paths, p) + } + } + log.WithFields(log.Fields{ + "Topic": "Zebra", + "Event": "Nexthop Tracking", + }).Debug("update nexthop reachability: %s", paths) + + if err := m.server.UpdatePath("", paths); err != nil { + log.WithFields(log.Fields{ + "Topic": "Zebra", + "Event": "Nexthop Tracking", + }).Error("failed to update nexthop reachability") + } + + m.isScheduled = false + m.scheduledPathList = make(map[string]pathList, 0) + } + } +} + +func (m *nexthopTrackingManager) scheduleUpdate(paths pathList) { + if len(paths) == 0 { + return + } + m.pathListCh <- paths +} + func filterOutNilPath(paths pathList) pathList { filteredPaths := make(pathList, 0, len(paths)) for _, path := range paths { @@ -154,10 +276,10 @@ func newIPRouteMessage(dst pathList, version uint8, vrfId uint16) *zebra.Message } } -func newNexthopRegisterMessage(dst pathList, version uint8, vrfId uint16, nexthopCache *registeredNexthopCache) *zebra.Message { +func newNexthopRegisterMessage(dst pathList, version uint8, vrfId uint16, nhtManager *nexthopTrackingManager) *zebra.Message { // Note: NEXTHOP_REGISTER and NEXTHOP_UNREGISTER messages are not // supported in Zebra protocol version<3. - if version < 3 { + if version < 3 || nhtManager == nil { return nil } @@ -185,7 +307,7 @@ func newNexthopRegisterMessage(dst pathList, version uint8, vrfId uint16, nextho // - already registered // - already invalidated // - an unspecified address - if nexthopCache.isRegistered(nexthop) || p.IsNexthopInvalid || nexthop.IsUnspecified() { + if nhtManager.isRegisteredNexthop(nexthop) || p.IsNexthopInvalid || nexthop.IsUnspecified() { continue } @@ -205,7 +327,7 @@ func newNexthopRegisterMessage(dst pathList, version uint8, vrfId uint16, nextho return nil } nexthops = append(nexthops, nh) - nexthopCache.register(nexthop) + nhtManager.registerNexthop(nexthop) } // If no nexthop needs to be registered or unregistered, @@ -319,9 +441,10 @@ func createPathListFromNexthopUpdateMessage(m *zebra.Message, manager *table.Tab } type zebraClient struct { - client *zebra.Client - server *BgpServer - dead chan struct{} + client *zebra.Client + server *BgpServer + dead chan struct{} + nhtManager *nexthopTrackingManager } func (z *zebraClient) stop() { @@ -330,9 +453,12 @@ func (z *zebraClient) stop() { func (z *zebraClient) loop() { w := z.server.Watch(WatchBestPath(true)) - defer func() { w.Stop() }() + defer w.Stop() - nexthopCache := newRegisteredNexthopCache() + if z.nhtManager != nil { + go z.nhtManager.loop() + defer z.nhtManager.stop() + } for { select { @@ -347,11 +473,13 @@ func (z *zebraClient) loop() { } } case *zebra.NexthopUpdateBody: - body := msg.Body.(*zebra.NexthopUpdateBody) - if paths, err := createPathListFromNexthopUpdateMessage(msg, z.server.globalRib); err != nil { - log.Errorf("failed to create updated path list related to nexthop %s", body.Prefix.String()) - } else if err = z.server.UpdatePath("", paths); err != nil { - log.Errorf("failed to update path related to nexthop %s", body.Prefix.String()) + if z.nhtManager != nil { + body := msg.Body.(*zebra.NexthopUpdateBody) + if paths, err := createPathListFromNexthopUpdateMessage(msg, z.server.globalRib); err != nil { + log.Errorf("failed to create updated path list related to nexthop %s", body.Prefix.String()) + } else { + z.nhtManager.scheduleUpdate(paths) + } } } case ev := <-w.Event(): @@ -361,7 +489,7 @@ func (z *zebraClient) loop() { if m := newIPRouteMessage(dst, z.client.Version, 0); m != nil { z.client.Send(m) } - if m := newNexthopRegisterMessage(dst, z.client.Version, 0, nexthopCache); m != nil { + if m := newNexthopRegisterMessage(dst, z.client.Version, 0, z.nhtManager); m != nil { z.client.Send(m) } } @@ -374,7 +502,7 @@ func (z *zebraClient) loop() { if m := newIPRouteMessage(pathList{path}, z.client.Version, i); m != nil { z.client.Send(m) } - if m := newNexthopRegisterMessage(pathList{path}, z.client.Version, i, nexthopCache); m != nil { + if m := newNexthopRegisterMessage(pathList{path}, z.client.Version, i, z.nhtManager); m != nil { z.client.Send(m) } } @@ -384,7 +512,7 @@ func (z *zebraClient) loop() { } } -func newZebraClient(s *BgpServer, url string, protos []string, version uint8) (*zebraClient, error) { +func newZebraClient(s *BgpServer, url string, protos []string, version uint8, nhtEnable bool, nhtDelay uint8) (*zebraClient, error) { l := strings.SplitN(url, ":", 2) if len(l) != 2 { return nil, fmt.Errorf("unsupported url: %s", url) @@ -403,10 +531,15 @@ func newZebraClient(s *BgpServer, url string, protos []string, version uint8) (* } cli.SendRedistribute(t, zebra.VRF_DEFAULT) } + var nhtManager *nexthopTrackingManager = nil + if nhtEnable { + nhtManager = newNexthopTrackingManager(s, int(nhtDelay)) + } w := &zebraClient{ - dead: make(chan struct{}), - client: cli, - server: s, + dead: make(chan struct{}), + client: cli, + server: s, + nhtManager: nhtManager, } go w.loop() return w, nil |