diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2016-05-24 05:47:52 +0000 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-06-06 12:43:20 +0900 |
commit | aca6fd6ad4409b4cb63682bff3c79fca8ca2800d (patch) | |
tree | eb91718c87ddcdaa0d2133f3aaccfee6dbe7f7a8 /server/monitor.go | |
parent | 10746e5f4b303aba553c2bb759afe3a8d4ffe3aa (diff) |
server: refactoring for monitorbestchanged api. use watcher infra
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/monitor.go')
-rw-r--r-- | server/monitor.go | 124 |
1 files changed, 75 insertions, 49 deletions
diff --git a/server/monitor.go b/server/monitor.go index ac9c775b..c7236fed 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -22,99 +22,125 @@ import ( "gopkg.in/tomb.v2" ) -type grpcIncomingWatcher struct { +type grpcWatcher struct { t tomb.Tomb ch chan watcherEvent ctlCh chan *GrpcRequest - reqs []*GrpcRequest + reqs map[watcherEventType][]*GrpcRequest } -func (w *grpcIncomingWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG { +func (w *grpcWatcher) notify(t watcherEventType) chan watcherEvent { + if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG { return w.ch } return nil } -func (w *grpcIncomingWatcher) stop() { +func (w *grpcWatcher) stop() { w.t.Kill(nil) } -func (w *grpcIncomingWatcher) watchingEventTypes() []watcherEventType { - pre := false - post := false - for _, req := range w.reqs { - if req.Data.(*api.Table).PostPolicy { - post = true - } else { - pre = true +func (w *grpcWatcher) watchingEventTypes() []watcherEventType { + types := make([]watcherEventType, 0, 3) + for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE} { + if len(w.reqs[t]) > 0 { + types = append(types, t) } } - types := make([]watcherEventType, 0, 2) - if pre { - types = append(types, WATCHER_EVENT_UPDATE_MSG) - } - if post { - types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG) - } return types } -func (w *grpcIncomingWatcher) loop() error { +func (w *grpcWatcher) loop() error { for { select { case <-w.t.Dying(): - for _, req := range w.reqs { - close(req.ResponseCh) + for _, rs := range w.reqs { + for _, req := range rs { + close(req.ResponseCh) + } } return nil case req := <-w.ctlCh: - w.reqs = append(w.reqs, req) - case ev := <-w.ch: - msg := ev.(*watcherEventUpdateMsg) - for _, path := range msg.pathList { - remains := make([]*GrpcRequest, 0, len(w.reqs)) - result := &GrpcResponse{ - Data: &api.Destination{ - Prefix: path.GetNlri().String(), - Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)}, - }, + tbl := req.Data.(*api.Table) + var reqType watcherEventType + switch tbl.Type { + case api.Resource_GLOBAL: + reqType = WATCHER_EVENT_BESTPATH_CHANGE + case api.Resource_ADJ_IN: + if tbl.PostPolicy { + reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG + } else { + reqType = WATCHER_EVENT_UPDATE_MSG } - for _, req := range w.reqs { - select { - case <-req.EndCh: + default: + continue + } + reqs := w.reqs[reqType] + if reqs == nil { + reqs = make([]*GrpcRequest, 0, 16) + } + reqs = append(reqs, req) + w.reqs[reqType] = reqs + case ev := <-w.ch: + sendPaths := func(reqType watcherEventType, paths []*table.Path) { + for _, path := range paths { + if path == nil { continue - default: } - remains = append(remains, req) - if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != path.GetRouteFamily() { - continue + remains := make([]*GrpcRequest, 0, len(w.reqs[reqType])) + result := &GrpcResponse{ + Data: &api.Destination{ + Prefix: path.GetNlri().String(), + Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)}, + }, } - if req.Name != "" && req.Name != path.GetSource().Address.String() { - continue + for _, req := range w.reqs[reqType] { + select { + case <-req.EndCh: + continue + default: + } + remains = append(remains, req) + if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != path.GetRouteFamily() { + continue + } + if req.Name != "" && req.Name != path.GetSource().Address.String() { + continue + } + req.ResponseCh <- result } - req.ResponseCh <- result + w.reqs[reqType] = remains } - w.reqs = remains } + switch msg := ev.(type) { + case *watcherEventBestPathMsg: + sendPaths(WATCHER_EVENT_BESTPATH_CHANGE, msg.pathList) + case *watcherEventUpdateMsg: + if msg.postPolicy { + sendPaths(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, msg.pathList) + } else { + sendPaths(WATCHER_EVENT_UPDATE_MSG, msg.pathList) + } + } + } } } -func (w *grpcIncomingWatcher) restart(string) error { +func (w *grpcWatcher) restart(string) error { return nil } -func (w *grpcIncomingWatcher) addRequest(req *GrpcRequest) error { +func (w *grpcWatcher) addRequest(req *GrpcRequest) error { w.ctlCh <- req return nil } -func newGrpcIncomingWatcher() (*grpcIncomingWatcher, error) { - w := &grpcIncomingWatcher{ +func newGrpcWatcher() (*grpcWatcher, error) { + w := &grpcWatcher{ ch: make(chan watcherEvent), ctlCh: make(chan *GrpcRequest), - reqs: make([]*GrpcRequest, 0, 16), + reqs: make(map[watcherEventType][]*GrpcRequest), } w.t.Go(w.loop) return w, nil |