summaryrefslogtreecommitdiffhomepage
path: root/server/server.go
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2016-05-25 04:39:07 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-06-06 12:43:20 +0900
commitb63e1c1fc3c40b58ba798bbae4e122f0eedaf55d (patch)
tree48356ad79cdc1d2a9deede9650a37556d4a86adf /server/server.go
parentaca6fd6ad4409b4cb63682bff3c79fca8ca2800d (diff)
server: refactor monitor/watcher infra
have watcherManager to manage all watchers also merge grpc neighbor state monitoring handling to grpcWatcher Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r--server/server.go162
1 files changed, 31 insertions, 131 deletions
diff --git a/server/server.go b/server/server.go
index 7bd29048..4d5ebf43 100644
--- a/server/server.go
+++ b/server/server.go
@@ -42,50 +42,6 @@ type SenderMsg struct {
msg *FsmOutgoingMsg
}
-type broadcastMsg interface {
- send()
-}
-
-type broadcastGrpcMsg struct {
- req *GrpcRequest
- result *GrpcResponse
- done bool
-}
-
-func (m *broadcastGrpcMsg) send() {
- m.req.ResponseCh <- m.result
- if m.done == true {
- close(m.req.ResponseCh)
- }
-}
-
-type broadcastBGPMsg struct {
- message *bgp.BGPMessage
- peerAS uint32
- localAS uint32
- peerAddress net.IP
- localAddress net.IP
- fourBytesAs bool
- ch chan *broadcastBGPMsg
-}
-
-func (m *broadcastBGPMsg) send() {
- m.ch <- m
-}
-
-type Watchers map[watcherType]watcher
-
-func (ws Watchers) watching(typ watcherEventType) bool {
- for _, w := range ws {
- for _, ev := range w.watchingEventTypes() {
- if ev == typ {
- return true
- }
- }
- }
- return false
-}
-
type TCPListener struct {
l *net.TCPListener
ch chan struct{}
@@ -146,16 +102,14 @@ type BgpServer struct {
acceptCh chan *net.TCPConn
collector *Collector
- GrpcReqCh chan *GrpcRequest
- policy *table.RoutingPolicy
- broadcastReqs []*GrpcRequest
- broadcastMsgs []broadcastMsg
- listeners []*TCPListener
- neighborMap map[string]*Peer
- globalRib *table.TableManager
- roaManager *roaManager
- shutdown bool
- watchers Watchers
+ GrpcReqCh chan *GrpcRequest
+ policy *table.RoutingPolicy
+ listeners []*TCPListener
+ neighborMap map[string]*Peer
+ globalRib *table.TableManager
+ roaManager *roaManager
+ shutdown bool
+ watchers *watcherManager
}
func NewBgpServer() *BgpServer {
@@ -163,24 +117,12 @@ func NewBgpServer() *BgpServer {
return &BgpServer{
GrpcReqCh: make(chan *GrpcRequest, 1),
neighborMap: make(map[string]*Peer),
- watchers: Watchers(make(map[watcherType]watcher)),
policy: table.NewRoutingPolicy(),
roaManager: roaManager,
+ watchers: newWatcherManager(),
}
}
-func (server *BgpServer) notify2watchers(typ watcherEventType, ev watcherEvent) error {
- for _, watcher := range server.watchers {
- if ch := watcher.notify(typ); ch != nil {
- server.broadcastMsgs = append(server.broadcastMsgs, &broadcastWatcherMsg{
- ch: ch,
- event: ev,
- })
- }
- }
- return nil
-}
-
func (server *BgpServer) Listeners(addr string) []*net.TCPListener {
list := make([]*net.TCPListener, 0, len(server.listeners))
rhs := net.ParseIP(addr).To4() != nil
@@ -196,7 +138,7 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener {
func (server *BgpServer) Serve() {
w, _ := newGrpcWatcher()
- server.watchers[WATCHER_GRPC_MONITOR] = w
+ server.watchers.addWatcher(WATCHER_GRPC_MONITOR, w)
senderCh := make(chan *SenderMsg, 1<<16)
go func(ch chan *SenderMsg) {
@@ -213,14 +155,6 @@ func (server *BgpServer) Serve() {
}(senderCh)
- broadcastCh := make(chan broadcastMsg, 8)
- go func(ch chan broadcastMsg) {
- for {
- m := <-ch
- m.send()
- }
- }(broadcastCh)
-
server.listeners = make([]*TCPListener, 0, 2)
server.fsmincomingCh = channels.NewInfiniteChannel()
server.fsmStateCh = make(chan *FsmMsg, 4096)
@@ -249,12 +183,6 @@ func (server *BgpServer) Serve() {
sCh = senderCh
firstMsg = senderMsgs[0]
}
- var firstBroadcastMsg broadcastMsg
- var bCh chan broadcastMsg
- if len(server.broadcastMsgs) > 0 {
- bCh = broadcastCh
- firstBroadcastMsg = server.broadcastMsgs[0]
- }
passConn := func(conn *net.TCPConn) {
host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
@@ -336,8 +264,6 @@ func (server *BgpServer) Serve() {
handleFsmMsg(e)
case sCh <- firstMsg:
senderMsgs = senderMsgs[1:]
- case bCh <- firstBroadcastMsg:
- server.broadcastMsgs = server.broadcastMsgs[1:]
case grpcReq := <-server.GrpcReqCh:
m := server.handleGrpc(grpcReq)
if len(m) > 0 {
@@ -478,7 +404,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
best, _ := server.globalRib.DeletePathsByPeer(ids, peer.fsm.peerInfo, rf)
if !peer.isRouteServerClient() {
- server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]})
+ server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]})
}
for _, targetPeer := range server.neighborMap {
@@ -494,30 +420,6 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
}
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
- result := &GrpcResponse{
- Data: peer.ToApiStruct(),
- }
- remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs))
- for _, req := range server.broadcastReqs {
- select {
- case <-req.EndCh:
- continue
- default:
- }
- ignore := req.RequestType != REQ_MONITOR_NEIGHBOR_PEER_STATE
- ignore = ignore || (req.Name != "" && req.Name != peer.fsm.pConf.Config.NeighborAddress)
- if ignore {
- remainReqs = append(remainReqs, req)
- continue
- }
- m := &broadcastGrpcMsg{
- req: req,
- result: result,
- }
- server.broadcastMsgs = append(server.broadcastMsgs, m)
- remainReqs = append(remainReqs, req)
- }
- server.broadcastReqs = remainReqs
newState := peer.fsm.state
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) {
@@ -536,9 +438,10 @@ func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
sentOpen: sentOpen,
recvOpen: recvOpen,
state: newState,
+ adminState: peer.fsm.adminState,
timestamp: time.Now(),
}
- server.notify2watchers(WATCHER_EVENT_STATE_CHANGE, ev)
+ server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev)
}
}
}
@@ -650,7 +553,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([]
if len(best[table.GLOBAL_RIB_NAME]) == 0 {
return nil, alteredPathList
}
- server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]})
+ server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]})
}
for _, targetPeer := range server.neighborMap {
@@ -807,7 +710,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
postPolicy: false,
pathList: pathList,
}
- server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev)
+ server.watchers.notify(WATCHER_EVENT_UPDATE_MSG, ev)
}
if len(pathList) > 0 {
@@ -830,7 +733,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
for _, u := range table.CreateUpdateMsgFromPaths(altered) {
payload, _ := u.Serialize()
ev.payload = payload
- server.notify2watchers(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev)
+ server.watchers.notify(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev)
}
}
}
@@ -2253,16 +2156,16 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
Data: data,
}
close(grpcReq.ResponseCh)
- case REQ_MONITOR_NEIGHBOR_PEER_STATE:
- server.broadcastReqs = append(server.broadcastReqs, grpcReq)
- case REQ_MONITOR_RIB:
+ case REQ_MONITOR_RIB, REQ_MONITOR_NEIGHBOR_PEER_STATE:
if grpcReq.Name != "" {
if _, err = server.checkNeighborRequest(grpcReq); err != nil {
break
}
}
- w := server.watchers[WATCHER_GRPC_MONITOR]
- go w.(*grpcWatcher).addRequest(grpcReq)
+ w, y := server.watchers.watcher(WATCHER_GRPC_MONITOR)
+ if y {
+ go w.(*grpcWatcher).addRequest(grpcReq)
+ }
case REQ_ENABLE_MRT:
server.handleEnableMrtRequest(grpcReq)
case REQ_DISABLE_MRT:
@@ -2308,7 +2211,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
}
z, err := newZebraWatcher(server.GrpcReqCh, c.Url, protos)
if err == nil {
- server.watchers[WATCHER_ZEBRA] = z
+ server.watchers.addWatcher(WATCHER_ZEBRA, z)
}
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,
@@ -2318,8 +2221,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
c := grpcReq.Data.(*config.CollectorConfig)
collector, err := NewCollector(server.GrpcReqCh, c.Url, c.DbName, c.TableDumpInterval)
if err == nil {
- server.collector = collector
- server.watchers[WATCHER_COLLECTOR] = collector
+ server.watchers.addWatcher(WATCHER_COLLECTOR, collector)
}
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,
@@ -2333,7 +2235,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
grpcReq.ResponseCh <- &GrpcResponse{}
close(grpcReq.ResponseCh)
- server.notify2watchers(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList})
+ server.watchers.notify(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList})
default:
err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType)
goto ERROR
@@ -3121,7 +3023,7 @@ func grpcDone(grpcReq *GrpcRequest, e error) {
func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) {
arg := grpcReq.Data.(*api.EnableMrtRequest)
- if _, y := server.watchers[WATCHER_MRT]; y {
+ if _, y := server.watchers.watcher(WATCHER_MRT); y {
grpcDone(grpcReq, fmt.Errorf("already enabled"))
return
}
@@ -3131,7 +3033,7 @@ func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) {
}
w, err := newMrtWatcher(arg.DumpType, arg.Filename, arg.Interval)
if err == nil {
- server.watchers[WATCHER_MRT] = w
+ server.watchers.addWatcher(WATCHER_MRT, w)
}
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,
@@ -3141,14 +3043,12 @@ func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) {
}
func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) {
- w, y := server.watchers[WATCHER_MRT]
+ _, y := server.watchers.watcher(WATCHER_MRT)
if !y {
grpcDone(grpcReq, fmt.Errorf("not enabled yet"))
return
}
-
- delete(server.watchers, WATCHER_MRT)
- w.stop()
+ server.watchers.delWatcher(WATCHER_MRT)
grpcReq.ResponseCh <- &GrpcResponse{
Data: &api.DisableMrtResponse{},
}
@@ -3168,10 +3068,10 @@ func (server *BgpServer) handleAddBmp(grpcReq *GrpcRequest) {
c = arg
}
- w, y := server.watchers[WATCHER_BMP]
+ w, y := server.watchers.watcher(WATCHER_BMP)
if !y {
w, _ = newBmpWatcher(server.GrpcReqCh)
- server.watchers[WATCHER_BMP] = w
+ server.watchers.addWatcher(WATCHER_BMP, w)
}
err := w.(*bmpWatcher).addServer(*c)
@@ -3194,7 +3094,7 @@ func (server *BgpServer) handleDeleteBmp(grpcReq *GrpcRequest) {
c = arg
}
- if w, y := server.watchers[WATCHER_BMP]; y {
+ if w, y := server.watchers.watcher(WATCHER_BMP); y {
err := w.(*bmpWatcher).deleteServer(*c)
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,