summaryrefslogtreecommitdiffhomepage
path: root/server/collector.go
diff options
context:
space:
mode:
Diffstat (limited to 'server/collector.go')
-rw-r--r--server/collector.go36
1 files changed, 8 insertions, 28 deletions
diff --git a/server/collector.go b/server/collector.go
index 92ec8fba..b4fdb061 100644
--- a/server/collector.go
+++ b/server/collector.go
@@ -25,11 +25,10 @@ import (
)
type Collector struct {
- grpcCh chan *GrpcRequest
+ s *BgpServer
url string
dbName string
interval uint64
- ch chan watcherEvent
client client.Client
}
@@ -39,20 +38,6 @@ const (
MEATUREMENT_TABLE = "table"
)
-func (c *Collector) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE || t == WATCHER_EVENT_ADJ_IN {
- return c.ch
- }
- return nil
-}
-
-func (c *Collector) stop() {
-}
-
-func (c *Collector) watchingEventTypes() []watcherEventType {
- return []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_STATE_CHANGE, WATCHER_EVENT_ADJ_IN}
-}
-
func (c *Collector) writePoints(points []*client.Point) error {
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: c.dbName,
@@ -170,6 +155,9 @@ func (c *Collector) writeTable(msg *watcherEventAdjInMsg) error {
}
func (c *Collector) loop() {
+ w := c.s.Watch(WatchPeerState(true), WatchUpdate(false))
+ defer w.Stop()
+
ticker := func() *time.Ticker {
if c.interval == 0 {
return &time.Ticker{}
@@ -180,15 +168,8 @@ func (c *Collector) loop() {
for {
select {
case <-ticker.C:
- go func() {
- ch := make(chan *GrpcResponse)
- c.grpcCh <- &GrpcRequest{
- RequestType: REQ_WATCHER_ADJ_RIB_IN,
- ResponseCh: ch,
- }
- (<-ch).Err()
- }()
- case ev := <-c.ch:
+ w.Generate(WATCH_TYPE_PRE_UPDATE)
+ case ev := <-w.Event():
switch msg := ev.(type) {
case *watcherEventUpdateMsg:
if err := c.writeUpdate(msg); err != nil {
@@ -207,7 +188,7 @@ func (c *Collector) loop() {
}
}
-func NewCollector(grpcCh chan *GrpcRequest, url, dbName string, interval uint64) (*Collector, error) {
+func NewCollector(s *BgpServer, url, dbName string, interval uint64) (*Collector, error) {
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: url,
})
@@ -228,11 +209,10 @@ func NewCollector(grpcCh chan *GrpcRequest, url, dbName string, interval uint64)
}
collector := &Collector{
- grpcCh: grpcCh,
+ s: s,
url: url,
dbName: dbName,
interval: interval,
- ch: make(chan watcherEvent, 16),
client: c,
}
go collector.loop()