diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2016-05-25 04:39:07 +0000 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-06-06 12:43:20 +0900 |
commit | b63e1c1fc3c40b58ba798bbae4e122f0eedaf55d (patch) | |
tree | 48356ad79cdc1d2a9deede9650a37556d4a86adf /server/monitor.go | |
parent | aca6fd6ad4409b4cb63682bff3c79fca8ca2800d (diff) |
server: refactor monitor/watcher infra
have watcherManager to manage all watchers
also merge grpc neighbor state monitoring handling to grpcWatcher
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/monitor.go')
-rw-r--r-- | server/monitor.go | 71 |
1 files changed, 56 insertions, 15 deletions
diff --git a/server/monitor.go b/server/monitor.go index c7236fed..e8e7df4d 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -30,7 +30,7 @@ type grpcWatcher struct { } func (w *grpcWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG { + if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE { return w.ch } return nil @@ -41,8 +41,8 @@ func (w *grpcWatcher) stop() { } func (w *grpcWatcher) watchingEventTypes() []watcherEventType { - types := make([]watcherEventType, 0, 3) - for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE} { + types := make([]watcherEventType, 0, 4) + for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE, WATCHER_EVENT_STATE_CHANGE} { if len(w.reqs[t]) > 0 { types = append(types, t) } @@ -61,19 +61,22 @@ func (w *grpcWatcher) loop() error { } return nil case req := <-w.ctlCh: - tbl := req.Data.(*api.Table) var reqType watcherEventType - switch tbl.Type { - case api.Resource_GLOBAL: - reqType = WATCHER_EVENT_BESTPATH_CHANGE - case api.Resource_ADJ_IN: - if tbl.PostPolicy { - reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG - } else { - reqType = WATCHER_EVENT_UPDATE_MSG + switch req.RequestType { + case REQ_MONITOR_RIB: + tbl := req.Data.(*api.Table) + switch tbl.Type { + case api.Resource_GLOBAL: + reqType = WATCHER_EVENT_BESTPATH_CHANGE + case api.Resource_ADJ_IN: + if tbl.PostPolicy { + reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG + } else { + reqType = WATCHER_EVENT_UPDATE_MSG + } } - default: - continue + case REQ_MONITOR_NEIGHBOR_PEER_STATE: + reqType = WATCHER_EVENT_STATE_CHANGE } reqs := w.reqs[reqType] if reqs == nil { @@ -121,8 +124,46 @@ func (w *grpcWatcher) loop() error { } else { sendPaths(WATCHER_EVENT_UPDATE_MSG, msg.pathList) } + case *watcherEventStateChangedMsg: + peer := &api.Peer{ + Conf: &api.PeerConf{ + PeerAs: msg.peerAS, + LocalAs: msg.localAS, + NeighborAddress: msg.peerAddress.String(), + Id: msg.peerID.String(), + }, + Info: &api.PeerState{ + PeerAs: msg.peerAS, + LocalAs: msg.localAS, + NeighborAddress: msg.peerAddress.String(), + BgpState: msg.state.String(), + AdminState: msg.adminState.String(), + }, + Transport: &api.Transport{ + LocalAddress: msg.localAddress.String(), + LocalPort: uint32(msg.localPort), + RemotePort: uint32(msg.peerPort), + }, + } + reqType := WATCHER_EVENT_STATE_CHANGE + remains := make([]*GrpcRequest, 0, len(w.reqs[reqType])) + result := &GrpcResponse{ + Data: peer, + } + for _, req := range w.reqs[reqType] { + select { + case <-req.EndCh: + continue + default: + } + remains = append(remains, req) + if req.Name != "" && req.Name != peer.Conf.NeighborAddress { + continue + } + req.ResponseCh <- result + } + w.reqs[reqType] = remains } - } } } |