diff options
Diffstat (limited to 'server/collector.go')
-rw-r--r-- | server/collector.go | 36 |
1 files changed, 8 insertions, 28 deletions
diff --git a/server/collector.go b/server/collector.go index 92ec8fba..b4fdb061 100644 --- a/server/collector.go +++ b/server/collector.go @@ -25,11 +25,10 @@ import ( ) type Collector struct { - grpcCh chan *GrpcRequest + s *BgpServer url string dbName string interval uint64 - ch chan watcherEvent client client.Client } @@ -39,20 +38,6 @@ const ( MEATUREMENT_TABLE = "table" ) -func (c *Collector) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE || t == WATCHER_EVENT_ADJ_IN { - return c.ch - } - return nil -} - -func (c *Collector) stop() { -} - -func (c *Collector) watchingEventTypes() []watcherEventType { - return []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_STATE_CHANGE, WATCHER_EVENT_ADJ_IN} -} - func (c *Collector) writePoints(points []*client.Point) error { bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ Database: c.dbName, @@ -170,6 +155,9 @@ func (c *Collector) writeTable(msg *watcherEventAdjInMsg) error { } func (c *Collector) loop() { + w := c.s.Watch(WatchPeerState(true), WatchUpdate(false)) + defer w.Stop() + ticker := func() *time.Ticker { if c.interval == 0 { return &time.Ticker{} @@ -180,15 +168,8 @@ func (c *Collector) loop() { for { select { case <-ticker.C: - go func() { - ch := make(chan *GrpcResponse) - c.grpcCh <- &GrpcRequest{ - RequestType: REQ_WATCHER_ADJ_RIB_IN, - ResponseCh: ch, - } - (<-ch).Err() - }() - case ev := <-c.ch: + w.Generate(WATCH_TYPE_PRE_UPDATE) + case ev := <-w.Event(): switch msg := ev.(type) { case *watcherEventUpdateMsg: if err := c.writeUpdate(msg); err != nil { @@ -207,7 +188,7 @@ func (c *Collector) loop() { } } -func NewCollector(grpcCh chan *GrpcRequest, url, dbName string, interval uint64) (*Collector, error) { +func NewCollector(s *BgpServer, url, dbName string, interval uint64) (*Collector, error) { c, err := client.NewHTTPClient(client.HTTPConfig{ Addr: url, }) @@ -228,11 +209,10 @@ func NewCollector(grpcCh chan *GrpcRequest, url, dbName string, interval uint64) } collector := &Collector{ - grpcCh: grpcCh, + s: s, url: url, dbName: dbName, interval: interval, - ch: make(chan watcherEvent, 16), client: c, } go collector.loop() |