summaryrefslogtreecommitdiffhomepage
path: root/server/server.go
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-11-11 13:56:15 -0800
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-11-12 15:01:12 +0900
commit180d68f88f457096ec92bcb6eabd915a64bb8858 (patch)
tree86116964678ddd40637a29206ab0c1254284e21c /server/server.go
parentd5d99dad68ce2b935e1171e64aca8d4340ffae10 (diff)
server: introduce watcher framework
rewrite mrt to use it. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r--server/server.go27
1 files changed, 20 insertions, 7 deletions
diff --git a/server/server.go b/server/server.go
index 1e54da39..8a5cabdf 100644
--- a/server/server.go
+++ b/server/server.go
@@ -87,7 +87,6 @@ type BgpServer struct {
fsmincomingCh chan *FsmMsg
rpkiConfigCh chan config.RpkiServers
bmpConfigCh chan config.BmpServers
- dumper *dumper
GrpcReqCh chan *GrpcRequest
listenPort int
@@ -103,6 +102,7 @@ type BgpServer struct {
bmpClient *bmpClient
bmpConnCh chan *bmpConn
shutdown bool
+ watchers map[watcherType]watcher
}
func NewBgpServer(port int) *BgpServer {
@@ -118,6 +118,7 @@ func NewBgpServer(port int) *BgpServer {
b.policyUpdateCh = make(chan config.RoutingPolicy)
b.neighborMap = make(map[string]*Peer)
b.listenPort = port
+ b.watchers = make(map[watcherType]watcher)
return &b
}
@@ -163,11 +164,11 @@ func (server *BgpServer) Serve() {
server.roaClient, _ = newROAClient(g.GlobalConfig.As, config.RpkiServers{})
if g.Mrt.FileName != "" {
- d, err := newDumper(g.Mrt.FileName)
+ w, err := newMrtWatcher(g.Mrt.FileName)
if err != nil {
log.Warn(err)
} else {
- server.dumper = d
+ server.watchers[WATCHER_MRT] = w
}
}
@@ -799,20 +800,32 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg, incoming chan *
}
}
if m.Header.Type == bgp.BGP_MSG_UPDATE {
- if server.dumper != nil {
+ listener := make(map[watcher]chan watcherEvent)
+ for _, watcher := range server.watchers {
+ if ch := watcher.notify(WATCHER_EVENT_UPDATE_MSG); ch != nil {
+ listener[watcher] = ch
+ }
+ }
+ if len(listener) > 0 {
_, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- bm := &broadcastBGPMsg{
+ ev := &watcherEventUpdateMsg{
message: m,
peerAS: peer.fsm.peerInfo.AS,
localAS: peer.fsm.peerInfo.LocalAS,
peerAddress: peer.fsm.peerInfo.Address,
localAddress: net.ParseIP(l),
fourBytesAs: y,
- ch: server.dumper.sendCh(),
}
- server.broadcastMsgs = append(server.broadcastMsgs, bm)
+ for _, ch := range listener {
+ bm := &broadcastWatcherMsg{
+ ch: ch,
+ event: ev,
+ }
+ server.broadcastMsgs = append(server.broadcastMsgs, bm)
+ }
}
+
if ch := server.bmpClient.send(); ch != nil {
bm := &broadcastBMPMsg{
ch: ch,