summaryrefslogtreecommitdiffhomepage
path: root/server/monitor.go
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2016-05-24 05:47:52 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-06-06 12:43:20 +0900
commitaca6fd6ad4409b4cb63682bff3c79fca8ca2800d (patch)
treeeb91718c87ddcdaa0d2133f3aaccfee6dbe7f7a8 /server/monitor.go
parent10746e5f4b303aba553c2bb759afe3a8d4ffe3aa (diff)
server: refactoring for monitorbestchanged api. use watcher infra
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/monitor.go')
-rw-r--r--server/monitor.go124
1 files changed, 75 insertions, 49 deletions
diff --git a/server/monitor.go b/server/monitor.go
index ac9c775b..c7236fed 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -22,99 +22,125 @@ import (
"gopkg.in/tomb.v2"
)
-type grpcIncomingWatcher struct {
+type grpcWatcher struct {
t tomb.Tomb
ch chan watcherEvent
ctlCh chan *GrpcRequest
- reqs []*GrpcRequest
+ reqs map[watcherEventType][]*GrpcRequest
}
-func (w *grpcIncomingWatcher) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG {
+func (w *grpcWatcher) notify(t watcherEventType) chan watcherEvent {
+ if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG {
return w.ch
}
return nil
}
-func (w *grpcIncomingWatcher) stop() {
+func (w *grpcWatcher) stop() {
w.t.Kill(nil)
}
-func (w *grpcIncomingWatcher) watchingEventTypes() []watcherEventType {
- pre := false
- post := false
- for _, req := range w.reqs {
- if req.Data.(*api.Table).PostPolicy {
- post = true
- } else {
- pre = true
+func (w *grpcWatcher) watchingEventTypes() []watcherEventType {
+ types := make([]watcherEventType, 0, 3)
+ for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE} {
+ if len(w.reqs[t]) > 0 {
+ types = append(types, t)
}
}
- types := make([]watcherEventType, 0, 2)
- if pre {
- types = append(types, WATCHER_EVENT_UPDATE_MSG)
- }
- if post {
- types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG)
- }
return types
}
-func (w *grpcIncomingWatcher) loop() error {
+func (w *grpcWatcher) loop() error {
for {
select {
case <-w.t.Dying():
- for _, req := range w.reqs {
- close(req.ResponseCh)
+ for _, rs := range w.reqs {
+ for _, req := range rs {
+ close(req.ResponseCh)
+ }
}
return nil
case req := <-w.ctlCh:
- w.reqs = append(w.reqs, req)
- case ev := <-w.ch:
- msg := ev.(*watcherEventUpdateMsg)
- for _, path := range msg.pathList {
- remains := make([]*GrpcRequest, 0, len(w.reqs))
- result := &GrpcResponse{
- Data: &api.Destination{
- Prefix: path.GetNlri().String(),
- Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)},
- },
+ tbl := req.Data.(*api.Table)
+ var reqType watcherEventType
+ switch tbl.Type {
+ case api.Resource_GLOBAL:
+ reqType = WATCHER_EVENT_BESTPATH_CHANGE
+ case api.Resource_ADJ_IN:
+ if tbl.PostPolicy {
+ reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG
+ } else {
+ reqType = WATCHER_EVENT_UPDATE_MSG
}
- for _, req := range w.reqs {
- select {
- case <-req.EndCh:
+ default:
+ continue
+ }
+ reqs := w.reqs[reqType]
+ if reqs == nil {
+ reqs = make([]*GrpcRequest, 0, 16)
+ }
+ reqs = append(reqs, req)
+ w.reqs[reqType] = reqs
+ case ev := <-w.ch:
+ sendPaths := func(reqType watcherEventType, paths []*table.Path) {
+ for _, path := range paths {
+ if path == nil {
continue
- default:
}
- remains = append(remains, req)
- if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != path.GetRouteFamily() {
- continue
+ remains := make([]*GrpcRequest, 0, len(w.reqs[reqType]))
+ result := &GrpcResponse{
+ Data: &api.Destination{
+ Prefix: path.GetNlri().String(),
+ Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)},
+ },
}
- if req.Name != "" && req.Name != path.GetSource().Address.String() {
- continue
+ for _, req := range w.reqs[reqType] {
+ select {
+ case <-req.EndCh:
+ continue
+ default:
+ }
+ remains = append(remains, req)
+ if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != path.GetRouteFamily() {
+ continue
+ }
+ if req.Name != "" && req.Name != path.GetSource().Address.String() {
+ continue
+ }
+ req.ResponseCh <- result
}
- req.ResponseCh <- result
+ w.reqs[reqType] = remains
}
- w.reqs = remains
}
+ switch msg := ev.(type) {
+ case *watcherEventBestPathMsg:
+ sendPaths(WATCHER_EVENT_BESTPATH_CHANGE, msg.pathList)
+ case *watcherEventUpdateMsg:
+ if msg.postPolicy {
+ sendPaths(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, msg.pathList)
+ } else {
+ sendPaths(WATCHER_EVENT_UPDATE_MSG, msg.pathList)
+ }
+ }
+
}
}
}
-func (w *grpcIncomingWatcher) restart(string) error {
+func (w *grpcWatcher) restart(string) error {
return nil
}
-func (w *grpcIncomingWatcher) addRequest(req *GrpcRequest) error {
+func (w *grpcWatcher) addRequest(req *GrpcRequest) error {
w.ctlCh <- req
return nil
}
-func newGrpcIncomingWatcher() (*grpcIncomingWatcher, error) {
- w := &grpcIncomingWatcher{
+func newGrpcWatcher() (*grpcWatcher, error) {
+ w := &grpcWatcher{
ch: make(chan watcherEvent),
ctlCh: make(chan *GrpcRequest),
- reqs: make([]*GrpcRequest, 0, 16),
+ reqs: make(map[watcherEventType][]*GrpcRequest),
}
w.t.Go(w.loop)
return w, nil