diff options
Diffstat (limited to 'pkg/server/collector.go')
-rw-r--r-- | pkg/server/collector.go | 222 |
1 files changed, 0 insertions, 222 deletions
diff --git a/pkg/server/collector.go b/pkg/server/collector.go deleted file mode 100644 index c219b215..00000000 --- a/pkg/server/collector.go +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "fmt" - "time" - - "github.com/influxdata/influxdb/client/v2" - "github.com/osrg/gobgp/internal/pkg/table" - "github.com/osrg/gobgp/pkg/packet/bgp" - log "github.com/sirupsen/logrus" -) - -type Collector struct { - s *BgpServer - url string - dbName string - interval uint64 - client client.Client -} - -const ( - MEATUREMENT_UPDATE = "update" - MEATUREMENT_PEER = "peer" - MEATUREMENT_TABLE = "table" -) - -func (c *Collector) writePoints(points []*client.Point) error { - bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ - Database: c.dbName, - Precision: "ms", - }) - bp.AddPoints(points) - return c.client.Write(bp) -} - -func (c *Collector) writePeer(msg *WatchEventPeerState) error { - var state string - switch msg.State { - case bgp.BGP_FSM_ESTABLISHED: - state = "Established" - case bgp.BGP_FSM_IDLE: - state = "Idle" - default: - return fmt.Errorf("unexpected fsm state %v", msg.State) - } - - tags := map[string]string{ - "PeerAddress": msg.PeerAddress.String(), - "PeerAS": fmt.Sprintf("%v", msg.PeerAS), - "State": state, - } - - fields := map[string]interface{}{ - "PeerID": msg.PeerID.String(), - } - - pt, err := client.NewPoint(MEATUREMENT_PEER, tags, fields, msg.Timestamp) - if err != nil { - return err - } - return c.writePoints([]*client.Point{pt}) -} - -func path2data(path *table.Path) (map[string]interface{}, map[string]string) { - fields := map[string]interface{}{ - "RouterID": path.GetSource().ID, - } - if asPath := path.GetAsPath(); asPath != nil { - fields["ASPath"] = asPath.String() - } - if origin, err := path.GetOrigin(); err == nil { - typ := "-" - switch origin { - case bgp.BGP_ORIGIN_ATTR_TYPE_IGP: - typ = "i" - case bgp.BGP_ORIGIN_ATTR_TYPE_EGP: - typ = "e" - case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE: - typ = "?" - } - fields["Origin"] = typ - } - if med, err := path.GetMed(); err == nil { - fields["Med"] = med - } - - tags := map[string]string{ - "PeerAddress": path.GetSource().Address.String(), - "PeerAS": fmt.Sprintf("%v", path.GetSource().AS), - "Timestamp": path.GetTimestamp().String(), - } - if nexthop := path.GetNexthop(); len(nexthop) > 0 { - fields["NextHop"] = nexthop.String() - } - if originAS := path.GetSourceAs(); originAS != 0 { - fields["OriginAS"] = fmt.Sprintf("%v", originAS) - } - - if err := bgp.FlatUpdate(tags, path.GetNlri().Flat()); err != nil { - log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("NLRI FlatUpdate failed") - } - for _, p := range path.GetPathAttrs() { - if err := bgp.FlatUpdate(tags, p.Flat()); err != nil { - log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("PathAttr FlatUpdate failed") - } - } - return fields, tags -} - -func (c *Collector) writeUpdate(msg *WatchEventUpdate) error { - if len(msg.PathList) == 0 { - // EOR - return nil - } - now := time.Now() - points := make([]*client.Point, 0, len(msg.PathList)) - for _, path := range msg.PathList { - fields, tags := path2data(path) - tags["Withdraw"] = fmt.Sprintf("%v", path.IsWithdraw) - pt, err := client.NewPoint(MEATUREMENT_UPDATE, tags, fields, now) - if err != nil { - return fmt.Errorf("failed to write update, %v", err) - } - points = append(points, pt) - } - return c.writePoints(points) -} - -func (c *Collector) writeTable(msg *WatchEventAdjIn) error { - now := time.Now() - points := make([]*client.Point, 0, len(msg.PathList)) - for _, path := range msg.PathList { - fields, tags := path2data(path) - pt, err := client.NewPoint(MEATUREMENT_TABLE, tags, fields, now) - if err != nil { - return fmt.Errorf("failed to write table, %v", err) - } - points = append(points, pt) - } - return c.writePoints(points) -} - -func (c *Collector) loop() { - w := c.s.Watch(WatchPeerState(true), WatchUpdate(false)) - defer w.Stop() - - ticker := func() *time.Ticker { - if c.interval == 0 { - return &time.Ticker{} - } - return time.NewTicker(time.Second * time.Duration(c.interval)) - }() - - for { - select { - case <-ticker.C: - w.Generate(WATCH_EVENT_TYPE_PRE_UPDATE) - case ev := <-w.Event(): - switch msg := ev.(type) { - case *WatchEventUpdate: - if err := c.writeUpdate(msg); err != nil { - log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to write update event message") - } - case *WatchEventPeerState: - if err := c.writePeer(msg); err != nil { - log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to write state changed event message") - } - case *WatchEventAdjIn: - if err := c.writeTable(msg); err != nil { - log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to write Adj-In event message") - } - } - } - } -} - -func NewCollector(s *BgpServer, url, dbName string, interval uint64) (*Collector, error) { - c, err := client.NewHTTPClient(client.HTTPConfig{ - Addr: url, - }) - if err != nil { - return nil, err - } - - _, _, err = c.Ping(0) - if err != nil { - log.Error("can not connect to InfluxDB") - log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to connect to InfluxDB") - return nil, err - } - - q := client.NewQuery("CREATE DATABASE "+dbName, "", "") - if response, err := c.Query(q); err != nil || response.Error() != nil { - log.WithFields(log.Fields{"Type": "collector", "Error": err}).Errorf("Failed to create database:%s", dbName) - return nil, err - } - - collector := &Collector{ - s: s, - url: url, - dbName: dbName, - interval: interval, - client: c, - } - go collector.loop() - return collector, nil -} |