summaryrefslogtreecommitdiffhomepage
path: root/server/monitor.go
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2016-05-25 04:39:07 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-06-06 12:43:20 +0900
commitb63e1c1fc3c40b58ba798bbae4e122f0eedaf55d (patch)
tree48356ad79cdc1d2a9deede9650a37556d4a86adf /server/monitor.go
parentaca6fd6ad4409b4cb63682bff3c79fca8ca2800d (diff)
server: refactor monitor/watcher infra
have watcherManager to manage all watchers also merge grpc neighbor state monitoring handling to grpcWatcher Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/monitor.go')
-rw-r--r--server/monitor.go71
1 files changed, 56 insertions, 15 deletions
diff --git a/server/monitor.go b/server/monitor.go
index c7236fed..e8e7df4d 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -30,7 +30,7 @@ type grpcWatcher struct {
}
func (w *grpcWatcher) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG {
+ if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE {
return w.ch
}
return nil
@@ -41,8 +41,8 @@ func (w *grpcWatcher) stop() {
}
func (w *grpcWatcher) watchingEventTypes() []watcherEventType {
- types := make([]watcherEventType, 0, 3)
- for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE} {
+ types := make([]watcherEventType, 0, 4)
+ for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE, WATCHER_EVENT_STATE_CHANGE} {
if len(w.reqs[t]) > 0 {
types = append(types, t)
}
@@ -61,19 +61,22 @@ func (w *grpcWatcher) loop() error {
}
return nil
case req := <-w.ctlCh:
- tbl := req.Data.(*api.Table)
var reqType watcherEventType
- switch tbl.Type {
- case api.Resource_GLOBAL:
- reqType = WATCHER_EVENT_BESTPATH_CHANGE
- case api.Resource_ADJ_IN:
- if tbl.PostPolicy {
- reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG
- } else {
- reqType = WATCHER_EVENT_UPDATE_MSG
+ switch req.RequestType {
+ case REQ_MONITOR_RIB:
+ tbl := req.Data.(*api.Table)
+ switch tbl.Type {
+ case api.Resource_GLOBAL:
+ reqType = WATCHER_EVENT_BESTPATH_CHANGE
+ case api.Resource_ADJ_IN:
+ if tbl.PostPolicy {
+ reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG
+ } else {
+ reqType = WATCHER_EVENT_UPDATE_MSG
+ }
}
- default:
- continue
+ case REQ_MONITOR_NEIGHBOR_PEER_STATE:
+ reqType = WATCHER_EVENT_STATE_CHANGE
}
reqs := w.reqs[reqType]
if reqs == nil {
@@ -121,8 +124,46 @@ func (w *grpcWatcher) loop() error {
} else {
sendPaths(WATCHER_EVENT_UPDATE_MSG, msg.pathList)
}
+ case *watcherEventStateChangedMsg:
+ peer := &api.Peer{
+ Conf: &api.PeerConf{
+ PeerAs: msg.peerAS,
+ LocalAs: msg.localAS,
+ NeighborAddress: msg.peerAddress.String(),
+ Id: msg.peerID.String(),
+ },
+ Info: &api.PeerState{
+ PeerAs: msg.peerAS,
+ LocalAs: msg.localAS,
+ NeighborAddress: msg.peerAddress.String(),
+ BgpState: msg.state.String(),
+ AdminState: msg.adminState.String(),
+ },
+ Transport: &api.Transport{
+ LocalAddress: msg.localAddress.String(),
+ LocalPort: uint32(msg.localPort),
+ RemotePort: uint32(msg.peerPort),
+ },
+ }
+ reqType := WATCHER_EVENT_STATE_CHANGE
+ remains := make([]*GrpcRequest, 0, len(w.reqs[reqType]))
+ result := &GrpcResponse{
+ Data: peer,
+ }
+ for _, req := range w.reqs[reqType] {
+ select {
+ case <-req.EndCh:
+ continue
+ default:
+ }
+ remains = append(remains, req)
+ if req.Name != "" && req.Name != peer.Conf.NeighborAddress {
+ continue
+ }
+ req.ResponseCh <- result
+ }
+ w.reqs[reqType] = remains
}
-
}
}
}