summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--gobgpd/main.go6
-rw-r--r--server/collector.go36
-rw-r--r--server/grpc_server.go1
-rw-r--r--server/server.go53
4 files changed, 42 insertions, 54 deletions
diff --git a/gobgpd/main.go b/gobgpd/main.go
index b5d9ecfb..ca90ece7 100644
--- a/gobgpd/main.go
+++ b/gobgpd/main.go
@@ -214,8 +214,10 @@ func main() {
log.Fatalf("failed to set zebra config: %s", err)
}
}
- if err := bgpServer.SetCollector(newConfig.Collector); err != nil {
- log.Fatalf("failed to set collector config: %s", err)
+ if len(newConfig.Collector.Config.Url) > 0 {
+ if err := bgpServer.StartCollector(&newConfig.Collector.Config); err != nil {
+ log.Fatalf("failed to set collector config: %s", err)
+ }
}
if err := bgpServer.SetRpkiConfig(newConfig.RpkiServers); err != nil {
log.Fatalf("failed to set rpki config: %s", err)
diff --git a/server/collector.go b/server/collector.go
index 92ec8fba..b4fdb061 100644
--- a/server/collector.go
+++ b/server/collector.go
@@ -25,11 +25,10 @@ import (
)
type Collector struct {
- grpcCh chan *GrpcRequest
+ s *BgpServer
url string
dbName string
interval uint64
- ch chan watcherEvent
client client.Client
}
@@ -39,20 +38,6 @@ const (
MEATUREMENT_TABLE = "table"
)
-func (c *Collector) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE || t == WATCHER_EVENT_ADJ_IN {
- return c.ch
- }
- return nil
-}
-
-func (c *Collector) stop() {
-}
-
-func (c *Collector) watchingEventTypes() []watcherEventType {
- return []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_STATE_CHANGE, WATCHER_EVENT_ADJ_IN}
-}
-
func (c *Collector) writePoints(points []*client.Point) error {
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: c.dbName,
@@ -170,6 +155,9 @@ func (c *Collector) writeTable(msg *watcherEventAdjInMsg) error {
}
func (c *Collector) loop() {
+ w := c.s.Watch(WatchPeerState(true), WatchUpdate(false))
+ defer w.Stop()
+
ticker := func() *time.Ticker {
if c.interval == 0 {
return &time.Ticker{}
@@ -180,15 +168,8 @@ func (c *Collector) loop() {
for {
select {
case <-ticker.C:
- go func() {
- ch := make(chan *GrpcResponse)
- c.grpcCh <- &GrpcRequest{
- RequestType: REQ_WATCHER_ADJ_RIB_IN,
- ResponseCh: ch,
- }
- (<-ch).Err()
- }()
- case ev := <-c.ch:
+ w.Generate(WATCH_TYPE_PRE_UPDATE)
+ case ev := <-w.Event():
switch msg := ev.(type) {
case *watcherEventUpdateMsg:
if err := c.writeUpdate(msg); err != nil {
@@ -207,7 +188,7 @@ func (c *Collector) loop() {
}
}
-func NewCollector(grpcCh chan *GrpcRequest, url, dbName string, interval uint64) (*Collector, error) {
+func NewCollector(s *BgpServer, url, dbName string, interval uint64) (*Collector, error) {
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: url,
})
@@ -228,11 +209,10 @@ func NewCollector(grpcCh chan *GrpcRequest, url, dbName string, interval uint64)
}
collector := &Collector{
- grpcCh: grpcCh,
+ s: s,
url: url,
dbName: dbName,
interval: interval,
- ch: make(chan watcherEvent, 16),
client: c,
}
go collector.loop()
diff --git a/server/grpc_server.go b/server/grpc_server.go
index 27d0a6d1..b9eb48d3 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -96,7 +96,6 @@ const (
REQ_DEFERRAL_TIMER_EXPIRED
REQ_RELOAD_POLICY
REQ_INITIALIZE_ZEBRA
- REQ_INITIALIZE_COLLECTOR
REQ_WATCHER_ADJ_RIB_IN // FIXME
)
diff --git a/server/server.go b/server/server.go
index 0aeead21..46fb33a2 100644
--- a/server/server.go
+++ b/server/server.go
@@ -831,20 +831,15 @@ func (server *BgpServer) SetGlobalType(g config.Global) error {
return nil
}
-func (server *BgpServer) SetCollector(c config.Collector) error {
- if len(c.Config.Url) == 0 {
- return nil
- }
- ch := make(chan *GrpcResponse)
- server.GrpcReqCh <- &GrpcRequest{
- RequestType: REQ_INITIALIZE_COLLECTOR,
- Data: &c.Config,
- ResponseCh: ch,
- }
- if err := (<-ch).Err(); err != nil {
- return err
+func (s *BgpServer) StartCollector(c *config.CollectorConfig) (err error) {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ s.mgmtCh <- func() {
+ defer close(ch)
+ _, err = NewCollector(s, c.Url, c.DbName, c.TableDumpInterval)
}
- return nil
+ return err
}
func (s *BgpServer) StartZebraClient(x *config.Zebra) (err error) {
@@ -1790,16 +1785,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
if len(pathList) > 0 {
server.propagateUpdate(nil, pathList)
}
- case REQ_INITIALIZE_COLLECTOR:
- c := grpcReq.Data.(*config.CollectorConfig)
- collector, err := NewCollector(server.GrpcReqCh, c.Url, c.DbName, c.TableDumpInterval)
- if err == nil {
- server.watchers.addWatcher(WATCHER_COLLECTOR, collector)
- }
- grpcReq.ResponseCh <- &GrpcResponse{
- ResponseErr: err,
- }
- close(grpcReq.ResponseCh)
case REQ_WATCHER_ADJ_RIB_IN:
pathList := make([]*table.Path, 0)
for _, peer := range server.neighborMap {
@@ -2764,6 +2749,28 @@ func (w *Watcher) Event() <-chan watcherEvent {
return w.realCh
}
+func (w *Watcher) Generate(t watchType) (err error) {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ w.s.mgmtCh <- func() {
+ defer close(ch)
+
+ switch t {
+ case WATCH_TYPE_PRE_UPDATE:
+ default:
+ err = fmt.Errorf("unsupported type ", t)
+ return
+ }
+ pathList := make([]*table.Path, 0)
+ for _, peer := range w.s.neighborMap {
+ pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...)
+ }
+ w.notify(&watcherEventAdjInMsg{pathList: pathList})
+ }
+ return err
+}
+
func (w *Watcher) notify(v watcherEvent) {
w.ch.In() <- v
}