diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2015-06-23 17:15:56 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-06-25 22:26:06 +0900 |
commit | 40f82a62ed9fe381fdcca71fbd77283a8077c1c5 (patch) | |
tree | 17c2332d025044fffe54361c0838460d097f42e2 /server | |
parent | 854cf36c7460f3240a0126322e5d03b929a1c3d0 (diff) |
server/api: add api to monitor changes of neighbor state
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r-- | server/grpc_server.go | 28 | ||||
-rw-r--r-- | server/server.go | 33 |
2 files changed, 55 insertions, 6 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index aa36a264..12a3a87a 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -48,7 +48,6 @@ const ( REQ_GLOBAL_RIB REQ_GLOBAL_ADD REQ_GLOBAL_DELETE - REQ_GLOBAL_MONITOR_BEST_CHANGED REQ_POLICY_PREFIX REQ_POLICY_PREFIXES REQ_POLICY_PREFIX_ADD @@ -74,6 +73,8 @@ const ( REQ_POLICY_COMMUNITY_ADD REQ_POLICY_COMMUNITY_DELETE REQ_POLICY_COMMUNITIES_DELETE + REQ_MONITOR_GLOBAL_BEST_CHANGED + REQ_MONITOR_NEIGHBOR_PEER_STATE ) const GRPC_PORT = 8080 @@ -210,7 +211,7 @@ func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorB var reqType int switch arg.Resource { case api.Resource_GLOBAL: - reqType = REQ_GLOBAL_MONITOR_BEST_CHANGED + reqType = REQ_MONITOR_GLOBAL_BEST_CHANGED default: return fmt.Errorf("unsupported resource type: %v", arg.Resource) } @@ -237,6 +238,27 @@ END: return err } +func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPeerStateServer) error { + var rf bgp.RouteFamily + req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.RouterId, rf, nil) + s.bgpServerCh <- req + + var err error + + for res := range req.ResponseCh { + if err = res.Err(); err != nil { + log.Debug(err.Error()) + goto END + } + if err = stream.Send(res.Data.(*api.Peer)); err != nil { + goto END + } + } +END: + req.EndCh <- struct{}{} + return err +} + func (s *Server) neighbor(reqType int, arg *api.Arguments) (*api.Error, error) { rf, err := convertAf2Rf(arg.Af) if err != nil { @@ -555,7 +577,7 @@ func NewGrpcRequest(reqType int, remoteAddr string, rf bgp.RouteFamily, d interf RouteFamily: rf, RemoteAddr: remoteAddr, ResponseCh: make(chan *GrpcResponse), - EndCh: make(chan struct{}), + EndCh: make(chan struct{}, 1), Data: d, } return r diff --git a/server/server.go b/server/server.go index cc19ecc7..7bfedc49 100644 --- a/server/server.go +++ b/server/server.go @@ -219,7 +219,7 @@ func (server *BgpServer) Serve() { server.neighborMap[name] = peer peer.outgoing = make(chan *bgp.BGPMessage, 128) peer.startFSMHandler(incoming) - + server.broadcastPeerState(peer) case config := <-server.deletedPeerCh: addr := config.NeighborAddress.String() SetTcpMD5SigSockopts(listener(config.NeighborAddress), addr, "") @@ -415,6 +415,10 @@ func (server *BgpServer) broadcastBests(bests []table.Path) { } remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs)) for _, req := range server.broadcastReqs { + if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED { + remainReqs = append(remainReqs, req) + continue + } select { case <-req.EndCh: continue @@ -426,6 +430,28 @@ func (server *BgpServer) broadcastBests(bests []table.Path) { } } +func (server *BgpServer) broadcastPeerState(peer *Peer) { + result := &GrpcResponse{ + Data: peer.ToApiStruct(), + } + remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs)) + for _, req := range server.broadcastReqs { + ignore := req.RequestType != REQ_MONITOR_NEIGHBOR_PEER_STATE + ignore = ignore || (req.RemoteAddr != "" && req.RemoteAddr != peer.config.NeighborAddress.String()) + if ignore { + remainReqs = append(remainReqs, req) + continue + } + select { + case <-req.EndCh: + continue + case req.ResponseCh <- result: + } + remainReqs = append(remainReqs, req) + } + server.broadcastReqs = remainReqs +} + func (server *BgpServer) propagateUpdate(neighborAddress string, RouteServerClient bool, pathList []table.Path) []*SenderMsg { msgs := make([]*SenderMsg, 0) @@ -528,6 +554,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan * peer.config.BgpNeighborCommonState = config.BgpNeighborCommonState{} } peer.startFSMHandler(incoming) + server.broadcastPeerState(peer) case FSM_MSG_BGP_MESSAGE: switch m := e.MsgData.(type) { @@ -814,8 +841,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { close(grpcReq.ResponseCh) } - case REQ_GLOBAL_MONITOR_BEST_CHANGED: - server.broadcastReqs = append(server.broadcastReqs, grpcReq) case REQ_NEIGHBORS: for _, peer := range server.neighborMap { result := &GrpcResponse{ @@ -1089,6 +1114,8 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { case REQ_POLICY_PREFIXES_DELETE, REQ_POLICY_NEIGHBORS_DELETE, REQ_POLICY_ASPATHS_DELETE, REQ_POLICY_COMMUNITIES_DELETE, REQ_POLICY_ROUTEPOLICIES_DELETE: server.handleGrpcDelPolicies(grpcReq) + case REQ_MONITOR_GLOBAL_BEST_CHANGED, REQ_MONITOR_NEIGHBOR_PEER_STATE: + server.broadcastReqs = append(server.broadcastReqs, grpcReq) default: errmsg := fmt.Errorf("Unknown request type: %v", grpcReq.RequestType) result := &GrpcResponse{ |