diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/collector.go | 36 | ||||
-rw-r--r-- | server/grpc_server.go | 1 | ||||
-rw-r--r-- | server/server.go | 53 |
3 files changed, 38 insertions, 52 deletions
diff --git a/server/collector.go b/server/collector.go index 92ec8fba..b4fdb061 100644 --- a/server/collector.go +++ b/server/collector.go @@ -25,11 +25,10 @@ import ( ) type Collector struct { - grpcCh chan *GrpcRequest + s *BgpServer url string dbName string interval uint64 - ch chan watcherEvent client client.Client } @@ -39,20 +38,6 @@ const ( MEATUREMENT_TABLE = "table" ) -func (c *Collector) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE || t == WATCHER_EVENT_ADJ_IN { - return c.ch - } - return nil -} - -func (c *Collector) stop() { -} - -func (c *Collector) watchingEventTypes() []watcherEventType { - return []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_STATE_CHANGE, WATCHER_EVENT_ADJ_IN} -} - func (c *Collector) writePoints(points []*client.Point) error { bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ Database: c.dbName, @@ -170,6 +155,9 @@ func (c *Collector) writeTable(msg *watcherEventAdjInMsg) error { } func (c *Collector) loop() { + w := c.s.Watch(WatchPeerState(true), WatchUpdate(false)) + defer w.Stop() + ticker := func() *time.Ticker { if c.interval == 0 { return &time.Ticker{} @@ -180,15 +168,8 @@ func (c *Collector) loop() { for { select { case <-ticker.C: - go func() { - ch := make(chan *GrpcResponse) - c.grpcCh <- &GrpcRequest{ - RequestType: REQ_WATCHER_ADJ_RIB_IN, - ResponseCh: ch, - } - (<-ch).Err() - }() - case ev := <-c.ch: + w.Generate(WATCH_TYPE_PRE_UPDATE) + case ev := <-w.Event(): switch msg := ev.(type) { case *watcherEventUpdateMsg: if err := c.writeUpdate(msg); err != nil { @@ -207,7 +188,7 @@ func (c *Collector) loop() { } } -func NewCollector(grpcCh chan *GrpcRequest, url, dbName string, interval uint64) (*Collector, error) { +func NewCollector(s *BgpServer, url, dbName string, interval uint64) (*Collector, error) { c, err := client.NewHTTPClient(client.HTTPConfig{ Addr: url, }) @@ -228,11 +209,10 @@ func NewCollector(grpcCh chan *GrpcRequest, url, dbName string, interval uint64) } collector := &Collector{ - grpcCh: grpcCh, + s: s, url: url, dbName: dbName, interval: interval, - ch: make(chan watcherEvent, 16), client: c, } go collector.loop() diff --git a/server/grpc_server.go b/server/grpc_server.go index 27d0a6d1..b9eb48d3 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -96,7 +96,6 @@ const ( REQ_DEFERRAL_TIMER_EXPIRED REQ_RELOAD_POLICY REQ_INITIALIZE_ZEBRA - REQ_INITIALIZE_COLLECTOR REQ_WATCHER_ADJ_RIB_IN // FIXME ) diff --git a/server/server.go b/server/server.go index 0aeead21..46fb33a2 100644 --- a/server/server.go +++ b/server/server.go @@ -831,20 +831,15 @@ func (server *BgpServer) SetGlobalType(g config.Global) error { return nil } -func (server *BgpServer) SetCollector(c config.Collector) error { - if len(c.Config.Url) == 0 { - return nil - } - ch := make(chan *GrpcResponse) - server.GrpcReqCh <- &GrpcRequest{ - RequestType: REQ_INITIALIZE_COLLECTOR, - Data: &c.Config, - ResponseCh: ch, - } - if err := (<-ch).Err(); err != nil { - return err +func (s *BgpServer) StartCollector(c *config.CollectorConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + _, err = NewCollector(s, c.Url, c.DbName, c.TableDumpInterval) } - return nil + return err } func (s *BgpServer) StartZebraClient(x *config.Zebra) (err error) { @@ -1790,16 +1785,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { if len(pathList) > 0 { server.propagateUpdate(nil, pathList) } - case REQ_INITIALIZE_COLLECTOR: - c := grpcReq.Data.(*config.CollectorConfig) - collector, err := NewCollector(server.GrpcReqCh, c.Url, c.DbName, c.TableDumpInterval) - if err == nil { - server.watchers.addWatcher(WATCHER_COLLECTOR, collector) - } - grpcReq.ResponseCh <- &GrpcResponse{ - ResponseErr: err, - } - close(grpcReq.ResponseCh) case REQ_WATCHER_ADJ_RIB_IN: pathList := make([]*table.Path, 0) for _, peer := range server.neighborMap { @@ -2764,6 +2749,28 @@ func (w *Watcher) Event() <-chan watcherEvent { return w.realCh } +func (w *Watcher) Generate(t watchType) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + w.s.mgmtCh <- func() { + defer close(ch) + + switch t { + case WATCH_TYPE_PRE_UPDATE: + default: + err = fmt.Errorf("unsupported type ", t) + return + } + pathList := make([]*table.Path, 0) + for _, peer := range w.s.neighborMap { + pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...) + } + w.notify(&watcherEventAdjInMsg{pathList: pathList}) + } + return err +} + func (w *Watcher) notify(v watcherEvent) { w.ch.In() <- v } |