diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-11-11 13:56:15 -0800 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-11-12 15:01:12 +0900 |
commit | 180d68f88f457096ec92bcb6eabd915a64bb8858 (patch) | |
tree | 86116964678ddd40637a29206ab0c1254284e21c /server/server.go | |
parent | d5d99dad68ce2b935e1171e64aca8d4340ffae10 (diff) |
server: introduce watcher framework
rewrite mrt to use it.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r-- | server/server.go | 27 |
1 files changed, 20 insertions, 7 deletions
diff --git a/server/server.go b/server/server.go index 1e54da39..8a5cabdf 100644 --- a/server/server.go +++ b/server/server.go @@ -87,7 +87,6 @@ type BgpServer struct { fsmincomingCh chan *FsmMsg rpkiConfigCh chan config.RpkiServers bmpConfigCh chan config.BmpServers - dumper *dumper GrpcReqCh chan *GrpcRequest listenPort int @@ -103,6 +102,7 @@ type BgpServer struct { bmpClient *bmpClient bmpConnCh chan *bmpConn shutdown bool + watchers map[watcherType]watcher } func NewBgpServer(port int) *BgpServer { @@ -118,6 +118,7 @@ func NewBgpServer(port int) *BgpServer { b.policyUpdateCh = make(chan config.RoutingPolicy) b.neighborMap = make(map[string]*Peer) b.listenPort = port + b.watchers = make(map[watcherType]watcher) return &b } @@ -163,11 +164,11 @@ func (server *BgpServer) Serve() { server.roaClient, _ = newROAClient(g.GlobalConfig.As, config.RpkiServers{}) if g.Mrt.FileName != "" { - d, err := newDumper(g.Mrt.FileName) + w, err := newMrtWatcher(g.Mrt.FileName) if err != nil { log.Warn(err) } else { - server.dumper = d + server.watchers[WATCHER_MRT] = w } } @@ -799,20 +800,32 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg, incoming chan * } } if m.Header.Type == bgp.BGP_MSG_UPDATE { - if server.dumper != nil { + listener := make(map[watcher]chan watcherEvent) + for _, watcher := range server.watchers { + if ch := watcher.notify(WATCHER_EVENT_UPDATE_MSG); ch != nil { + listener[watcher] = ch + } + } + if len(listener) > 0 { _, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - bm := &broadcastBGPMsg{ + ev := &watcherEventUpdateMsg{ message: m, peerAS: peer.fsm.peerInfo.AS, localAS: peer.fsm.peerInfo.LocalAS, peerAddress: peer.fsm.peerInfo.Address, localAddress: net.ParseIP(l), fourBytesAs: y, - ch: server.dumper.sendCh(), } - server.broadcastMsgs = append(server.broadcastMsgs, bm) + for _, ch := range listener { + bm := &broadcastWatcherMsg{ + ch: ch, + event: ev, + } + server.broadcastMsgs = append(server.broadcastMsgs, bm) + } } + if ch := server.bmpClient.send(); ch != nil { bm := &broadcastBMPMsg{ ch: ch, |