summaryrefslogtreecommitdiffhomepage
path: root/pkg/server/collector.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/collector.go')
-rw-r--r--pkg/server/collector.go222
1 files changed, 0 insertions, 222 deletions
diff --git a/pkg/server/collector.go b/pkg/server/collector.go
deleted file mode 100644
index c219b215..00000000
--- a/pkg/server/collector.go
+++ /dev/null
@@ -1,222 +0,0 @@
-// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-// implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package server
-
-import (
- "fmt"
- "time"
-
- "github.com/influxdata/influxdb/client/v2"
- "github.com/osrg/gobgp/internal/pkg/table"
- "github.com/osrg/gobgp/pkg/packet/bgp"
- log "github.com/sirupsen/logrus"
-)
-
-type Collector struct {
- s *BgpServer
- url string
- dbName string
- interval uint64
- client client.Client
-}
-
-const (
- MEATUREMENT_UPDATE = "update"
- MEATUREMENT_PEER = "peer"
- MEATUREMENT_TABLE = "table"
-)
-
-func (c *Collector) writePoints(points []*client.Point) error {
- bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
- Database: c.dbName,
- Precision: "ms",
- })
- bp.AddPoints(points)
- return c.client.Write(bp)
-}
-
-func (c *Collector) writePeer(msg *WatchEventPeerState) error {
- var state string
- switch msg.State {
- case bgp.BGP_FSM_ESTABLISHED:
- state = "Established"
- case bgp.BGP_FSM_IDLE:
- state = "Idle"
- default:
- return fmt.Errorf("unexpected fsm state %v", msg.State)
- }
-
- tags := map[string]string{
- "PeerAddress": msg.PeerAddress.String(),
- "PeerAS": fmt.Sprintf("%v", msg.PeerAS),
- "State": state,
- }
-
- fields := map[string]interface{}{
- "PeerID": msg.PeerID.String(),
- }
-
- pt, err := client.NewPoint(MEATUREMENT_PEER, tags, fields, msg.Timestamp)
- if err != nil {
- return err
- }
- return c.writePoints([]*client.Point{pt})
-}
-
-func path2data(path *table.Path) (map[string]interface{}, map[string]string) {
- fields := map[string]interface{}{
- "RouterID": path.GetSource().ID,
- }
- if asPath := path.GetAsPath(); asPath != nil {
- fields["ASPath"] = asPath.String()
- }
- if origin, err := path.GetOrigin(); err == nil {
- typ := "-"
- switch origin {
- case bgp.BGP_ORIGIN_ATTR_TYPE_IGP:
- typ = "i"
- case bgp.BGP_ORIGIN_ATTR_TYPE_EGP:
- typ = "e"
- case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE:
- typ = "?"
- }
- fields["Origin"] = typ
- }
- if med, err := path.GetMed(); err == nil {
- fields["Med"] = med
- }
-
- tags := map[string]string{
- "PeerAddress": path.GetSource().Address.String(),
- "PeerAS": fmt.Sprintf("%v", path.GetSource().AS),
- "Timestamp": path.GetTimestamp().String(),
- }
- if nexthop := path.GetNexthop(); len(nexthop) > 0 {
- fields["NextHop"] = nexthop.String()
- }
- if originAS := path.GetSourceAs(); originAS != 0 {
- fields["OriginAS"] = fmt.Sprintf("%v", originAS)
- }
-
- if err := bgp.FlatUpdate(tags, path.GetNlri().Flat()); err != nil {
- log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("NLRI FlatUpdate failed")
- }
- for _, p := range path.GetPathAttrs() {
- if err := bgp.FlatUpdate(tags, p.Flat()); err != nil {
- log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("PathAttr FlatUpdate failed")
- }
- }
- return fields, tags
-}
-
-func (c *Collector) writeUpdate(msg *WatchEventUpdate) error {
- if len(msg.PathList) == 0 {
- // EOR
- return nil
- }
- now := time.Now()
- points := make([]*client.Point, 0, len(msg.PathList))
- for _, path := range msg.PathList {
- fields, tags := path2data(path)
- tags["Withdraw"] = fmt.Sprintf("%v", path.IsWithdraw)
- pt, err := client.NewPoint(MEATUREMENT_UPDATE, tags, fields, now)
- if err != nil {
- return fmt.Errorf("failed to write update, %v", err)
- }
- points = append(points, pt)
- }
- return c.writePoints(points)
-}
-
-func (c *Collector) writeTable(msg *WatchEventAdjIn) error {
- now := time.Now()
- points := make([]*client.Point, 0, len(msg.PathList))
- for _, path := range msg.PathList {
- fields, tags := path2data(path)
- pt, err := client.NewPoint(MEATUREMENT_TABLE, tags, fields, now)
- if err != nil {
- return fmt.Errorf("failed to write table, %v", err)
- }
- points = append(points, pt)
- }
- return c.writePoints(points)
-}
-
-func (c *Collector) loop() {
- w := c.s.Watch(WatchPeerState(true), WatchUpdate(false))
- defer w.Stop()
-
- ticker := func() *time.Ticker {
- if c.interval == 0 {
- return &time.Ticker{}
- }
- return time.NewTicker(time.Second * time.Duration(c.interval))
- }()
-
- for {
- select {
- case <-ticker.C:
- w.Generate(WATCH_EVENT_TYPE_PRE_UPDATE)
- case ev := <-w.Event():
- switch msg := ev.(type) {
- case *WatchEventUpdate:
- if err := c.writeUpdate(msg); err != nil {
- log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to write update event message")
- }
- case *WatchEventPeerState:
- if err := c.writePeer(msg); err != nil {
- log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to write state changed event message")
- }
- case *WatchEventAdjIn:
- if err := c.writeTable(msg); err != nil {
- log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to write Adj-In event message")
- }
- }
- }
- }
-}
-
-func NewCollector(s *BgpServer, url, dbName string, interval uint64) (*Collector, error) {
- c, err := client.NewHTTPClient(client.HTTPConfig{
- Addr: url,
- })
- if err != nil {
- return nil, err
- }
-
- _, _, err = c.Ping(0)
- if err != nil {
- log.Error("can not connect to InfluxDB")
- log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to connect to InfluxDB")
- return nil, err
- }
-
- q := client.NewQuery("CREATE DATABASE "+dbName, "", "")
- if response, err := c.Query(q); err != nil || response.Error() != nil {
- log.WithFields(log.Fields{"Type": "collector", "Error": err}).Errorf("Failed to create database:%s", dbName)
- return nil, err
- }
-
- collector := &Collector{
- s: s,
- url: url,
- dbName: dbName,
- interval: interval,
- client: c,
- }
- go collector.loop()
- return collector, nil
-}