diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2016-05-25 04:39:07 +0000 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-06-06 12:43:20 +0900 |
commit | b63e1c1fc3c40b58ba798bbae4e122f0eedaf55d (patch) | |
tree | 48356ad79cdc1d2a9deede9650a37556d4a86adf /server/server.go | |
parent | aca6fd6ad4409b4cb63682bff3c79fca8ca2800d (diff) |
server: refactor monitor/watcher infra
have watcherManager to manage all watchers
also merge grpc neighbor state monitoring handling to grpcWatcher
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r-- | server/server.go | 162 |
1 files changed, 31 insertions, 131 deletions
diff --git a/server/server.go b/server/server.go index 7bd29048..4d5ebf43 100644 --- a/server/server.go +++ b/server/server.go @@ -42,50 +42,6 @@ type SenderMsg struct { msg *FsmOutgoingMsg } -type broadcastMsg interface { - send() -} - -type broadcastGrpcMsg struct { - req *GrpcRequest - result *GrpcResponse - done bool -} - -func (m *broadcastGrpcMsg) send() { - m.req.ResponseCh <- m.result - if m.done == true { - close(m.req.ResponseCh) - } -} - -type broadcastBGPMsg struct { - message *bgp.BGPMessage - peerAS uint32 - localAS uint32 - peerAddress net.IP - localAddress net.IP - fourBytesAs bool - ch chan *broadcastBGPMsg -} - -func (m *broadcastBGPMsg) send() { - m.ch <- m -} - -type Watchers map[watcherType]watcher - -func (ws Watchers) watching(typ watcherEventType) bool { - for _, w := range ws { - for _, ev := range w.watchingEventTypes() { - if ev == typ { - return true - } - } - } - return false -} - type TCPListener struct { l *net.TCPListener ch chan struct{} @@ -146,16 +102,14 @@ type BgpServer struct { acceptCh chan *net.TCPConn collector *Collector - GrpcReqCh chan *GrpcRequest - policy *table.RoutingPolicy - broadcastReqs []*GrpcRequest - broadcastMsgs []broadcastMsg - listeners []*TCPListener - neighborMap map[string]*Peer - globalRib *table.TableManager - roaManager *roaManager - shutdown bool - watchers Watchers + GrpcReqCh chan *GrpcRequest + policy *table.RoutingPolicy + listeners []*TCPListener + neighborMap map[string]*Peer + globalRib *table.TableManager + roaManager *roaManager + shutdown bool + watchers *watcherManager } func NewBgpServer() *BgpServer { @@ -163,24 +117,12 @@ func NewBgpServer() *BgpServer { return &BgpServer{ GrpcReqCh: make(chan *GrpcRequest, 1), neighborMap: make(map[string]*Peer), - watchers: Watchers(make(map[watcherType]watcher)), policy: table.NewRoutingPolicy(), roaManager: roaManager, + watchers: newWatcherManager(), } } -func (server *BgpServer) notify2watchers(typ watcherEventType, ev watcherEvent) error { - for _, watcher := range server.watchers { - if ch := watcher.notify(typ); ch != nil { - server.broadcastMsgs = append(server.broadcastMsgs, &broadcastWatcherMsg{ - ch: ch, - event: ev, - }) - } - } - return nil -} - func (server *BgpServer) Listeners(addr string) []*net.TCPListener { list := make([]*net.TCPListener, 0, len(server.listeners)) rhs := net.ParseIP(addr).To4() != nil @@ -196,7 +138,7 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener { func (server *BgpServer) Serve() { w, _ := newGrpcWatcher() - server.watchers[WATCHER_GRPC_MONITOR] = w + server.watchers.addWatcher(WATCHER_GRPC_MONITOR, w) senderCh := make(chan *SenderMsg, 1<<16) go func(ch chan *SenderMsg) { @@ -213,14 +155,6 @@ func (server *BgpServer) Serve() { }(senderCh) - broadcastCh := make(chan broadcastMsg, 8) - go func(ch chan broadcastMsg) { - for { - m := <-ch - m.send() - } - }(broadcastCh) - server.listeners = make([]*TCPListener, 0, 2) server.fsmincomingCh = channels.NewInfiniteChannel() server.fsmStateCh = make(chan *FsmMsg, 4096) @@ -249,12 +183,6 @@ func (server *BgpServer) Serve() { sCh = senderCh firstMsg = senderMsgs[0] } - var firstBroadcastMsg broadcastMsg - var bCh chan broadcastMsg - if len(server.broadcastMsgs) > 0 { - bCh = broadcastCh - firstBroadcastMsg = server.broadcastMsgs[0] - } passConn := func(conn *net.TCPConn) { host, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) @@ -336,8 +264,6 @@ func (server *BgpServer) Serve() { handleFsmMsg(e) case sCh <- firstMsg: senderMsgs = senderMsgs[1:] - case bCh <- firstBroadcastMsg: - server.broadcastMsgs = server.broadcastMsgs[1:] case grpcReq := <-server.GrpcReqCh: m := server.handleGrpc(grpcReq) if len(m) > 0 { @@ -478,7 +404,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil best, _ := server.globalRib.DeletePathsByPeer(ids, peer.fsm.peerInfo, rf) if !peer.isRouteServerClient() { - server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]}) + server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]}) } for _, targetPeer := range server.neighborMap { @@ -494,30 +420,6 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil } func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { - result := &GrpcResponse{ - Data: peer.ToApiStruct(), - } - remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs)) - for _, req := range server.broadcastReqs { - select { - case <-req.EndCh: - continue - default: - } - ignore := req.RequestType != REQ_MONITOR_NEIGHBOR_PEER_STATE - ignore = ignore || (req.Name != "" && req.Name != peer.fsm.pConf.Config.NeighborAddress) - if ignore { - remainReqs = append(remainReqs, req) - continue - } - m := &broadcastGrpcMsg{ - req: req, - result: result, - } - server.broadcastMsgs = append(server.broadcastMsgs, m) - remainReqs = append(remainReqs, req) - } - server.broadcastReqs = remainReqs newState := peer.fsm.state if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) { @@ -536,9 +438,10 @@ func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { sentOpen: sentOpen, recvOpen: recvOpen, state: newState, + adminState: peer.fsm.adminState, timestamp: time.Now(), } - server.notify2watchers(WATCHER_EVENT_STATE_CHANGE, ev) + server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev) } } } @@ -650,7 +553,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([] if len(best[table.GLOBAL_RIB_NAME]) == 0 { return nil, alteredPathList } - server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]}) + server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]}) } for _, targetPeer := range server.neighborMap { @@ -807,7 +710,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { postPolicy: false, pathList: pathList, } - server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev) + server.watchers.notify(WATCHER_EVENT_UPDATE_MSG, ev) } if len(pathList) > 0 { @@ -830,7 +733,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { for _, u := range table.CreateUpdateMsgFromPaths(altered) { payload, _ := u.Serialize() ev.payload = payload - server.notify2watchers(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev) + server.watchers.notify(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev) } } } @@ -2253,16 +2156,16 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { Data: data, } close(grpcReq.ResponseCh) - case REQ_MONITOR_NEIGHBOR_PEER_STATE: - server.broadcastReqs = append(server.broadcastReqs, grpcReq) - case REQ_MONITOR_RIB: + case REQ_MONITOR_RIB, REQ_MONITOR_NEIGHBOR_PEER_STATE: if grpcReq.Name != "" { if _, err = server.checkNeighborRequest(grpcReq); err != nil { break } } - w := server.watchers[WATCHER_GRPC_MONITOR] - go w.(*grpcWatcher).addRequest(grpcReq) + w, y := server.watchers.watcher(WATCHER_GRPC_MONITOR) + if y { + go w.(*grpcWatcher).addRequest(grpcReq) + } case REQ_ENABLE_MRT: server.handleEnableMrtRequest(grpcReq) case REQ_DISABLE_MRT: @@ -2308,7 +2211,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { } z, err := newZebraWatcher(server.GrpcReqCh, c.Url, protos) if err == nil { - server.watchers[WATCHER_ZEBRA] = z + server.watchers.addWatcher(WATCHER_ZEBRA, z) } grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, @@ -2318,8 +2221,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { c := grpcReq.Data.(*config.CollectorConfig) collector, err := NewCollector(server.GrpcReqCh, c.Url, c.DbName, c.TableDumpInterval) if err == nil { - server.collector = collector - server.watchers[WATCHER_COLLECTOR] = collector + server.watchers.addWatcher(WATCHER_COLLECTOR, collector) } grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, @@ -2333,7 +2235,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { grpcReq.ResponseCh <- &GrpcResponse{} close(grpcReq.ResponseCh) - server.notify2watchers(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList}) + server.watchers.notify(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList}) default: err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType) goto ERROR @@ -3121,7 +3023,7 @@ func grpcDone(grpcReq *GrpcRequest, e error) { func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) { arg := grpcReq.Data.(*api.EnableMrtRequest) - if _, y := server.watchers[WATCHER_MRT]; y { + if _, y := server.watchers.watcher(WATCHER_MRT); y { grpcDone(grpcReq, fmt.Errorf("already enabled")) return } @@ -3131,7 +3033,7 @@ func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) { } w, err := newMrtWatcher(arg.DumpType, arg.Filename, arg.Interval) if err == nil { - server.watchers[WATCHER_MRT] = w + server.watchers.addWatcher(WATCHER_MRT, w) } grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, @@ -3141,14 +3043,12 @@ func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) { } func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) { - w, y := server.watchers[WATCHER_MRT] + _, y := server.watchers.watcher(WATCHER_MRT) if !y { grpcDone(grpcReq, fmt.Errorf("not enabled yet")) return } - - delete(server.watchers, WATCHER_MRT) - w.stop() + server.watchers.delWatcher(WATCHER_MRT) grpcReq.ResponseCh <- &GrpcResponse{ Data: &api.DisableMrtResponse{}, } @@ -3168,10 +3068,10 @@ func (server *BgpServer) handleAddBmp(grpcReq *GrpcRequest) { c = arg } - w, y := server.watchers[WATCHER_BMP] + w, y := server.watchers.watcher(WATCHER_BMP) if !y { w, _ = newBmpWatcher(server.GrpcReqCh) - server.watchers[WATCHER_BMP] = w + server.watchers.addWatcher(WATCHER_BMP, w) } err := w.(*bmpWatcher).addServer(*c) @@ -3194,7 +3094,7 @@ func (server *BgpServer) handleDeleteBmp(grpcReq *GrpcRequest) { c = arg } - if w, y := server.watchers[WATCHER_BMP]; y { + if w, y := server.watchers.watcher(WATCHER_BMP); y { err := w.(*bmpWatcher).deleteServer(*c) grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, |