diff options
-rw-r--r-- | config/bgp_configs.go | 75 | ||||
-rw-r--r-- | config/serve.go | 1 | ||||
-rw-r--r-- | gobgpd/main.go | 3 | ||||
-rw-r--r-- | server/collector.go | 224 | ||||
-rw-r--r-- | server/grpc_server.go | 2 | ||||
-rw-r--r-- | server/server.go | 37 | ||||
-rw-r--r-- | server/watcher.go | 6 | ||||
-rw-r--r-- | table/path.go | 7 | ||||
-rw-r--r-- | tools/pyang_plugins/gobgp.yang | 27 |
9 files changed, 382 insertions, 0 deletions
diff --git a/config/bgp_configs.go b/config/bgp_configs.go index 2054efec..16968123 100644 --- a/config/bgp_configs.go +++ b/config/bgp_configs.go @@ -871,6 +871,76 @@ func (v RpkiValidationResultType) Validate() error { } //struct for container gobgp:state +type CollectorState struct { + // original -> gobgp:url + Url string `mapstructure:"url"` + // original -> gobgp:db-name + DbName string `mapstructure:"db-name"` + // original -> gobgp:table-dump-interval + TableDumpInterval uint64 `mapstructure:"table-dump-interval"` +} + +func (lhs *CollectorState) Equal(rhs *CollectorState) bool { + if lhs == nil || rhs == nil { + return false + } + if lhs.Url != rhs.Url { + return false + } + if lhs.DbName != rhs.DbName { + return false + } + if lhs.TableDumpInterval != rhs.TableDumpInterval { + return false + } + return true +} + +//struct for container gobgp:config +type CollectorConfig struct { + // original -> gobgp:url + Url string `mapstructure:"url"` + // original -> gobgp:db-name + DbName string `mapstructure:"db-name"` + // original -> gobgp:table-dump-interval + TableDumpInterval uint64 `mapstructure:"table-dump-interval"` +} + +func (lhs *CollectorConfig) Equal(rhs *CollectorConfig) bool { + if lhs == nil || rhs == nil { + return false + } + if lhs.Url != rhs.Url { + return false + } + if lhs.DbName != rhs.DbName { + return false + } + if lhs.TableDumpInterval != rhs.TableDumpInterval { + return false + } + return true +} + +//struct for container gobgp:collector +type Collector struct { + // original -> gobgp:collector-config + Config CollectorConfig `mapstructure:"config"` + // original -> gobgp:collector-state + State CollectorState `mapstructure:"state"` +} + +func (lhs *Collector) Equal(rhs *Collector) bool { + if lhs == nil || rhs == nil { + return false + } + if !lhs.Config.Equal(&(rhs.Config)) { + return false + } + return true +} + +//struct for container gobgp:state type ZebraState struct { // original -> gobgp:enabled //gobgp:enabled's original type is boolean @@ -3998,6 +4068,8 @@ type Bgp struct { MrtDump []Mrt `mapstructure:"mrt-dump"` // original -> gobgp:zebra Zebra Zebra `mapstructure:"zebra"` + // original -> gobgp:collector + Collector Collector `mapstructure:"collector"` } func (lhs *Bgp) Equal(rhs *Bgp) bool { @@ -4090,6 +4162,9 @@ func (lhs *Bgp) Equal(rhs *Bgp) bool { if !lhs.Zebra.Equal(&(rhs.Zebra)) { return false } + if !lhs.Collector.Equal(&(rhs.Collector)) { + return false + } return true } diff --git a/config/serve.go b/config/serve.go index a8f67725..95658a97 100644 --- a/config/serve.go +++ b/config/serve.go @@ -16,6 +16,7 @@ type BgpConfigSet struct { BmpServers []BmpServer `mapstructure:"bmp-servers"` MrtDump []Mrt `mapstructure:"mrt-dump"` Zebra Zebra `mapstructure:"zebra"` + Collector Collector `mapstructure:"collector"` DefinedSets DefinedSets `mapstructure:"defined-sets"` PolicyDefinitions []PolicyDefinition `mapstructure:"policy-definitions"` } diff --git a/gobgpd/main.go b/gobgpd/main.go index 7a04eef7..8dddfa51 100644 --- a/gobgpd/main.go +++ b/gobgpd/main.go @@ -208,6 +208,9 @@ func main() { if err := bgpServer.SetZebraConfig(newConfig.Zebra); err != nil { log.Fatalf("failed to set zebra config: %s", err) } + if err := bgpServer.SetCollector(newConfig.Collector); err != nil { + log.Fatalf("failed to set collector config: %s", err) + } if err := bgpServer.SetRpkiConfig(newConfig.RpkiServers); err != nil { log.Fatalf("failed to set rpki config: %s", err) } diff --git a/server/collector.go b/server/collector.go new file mode 100644 index 00000000..6860011e --- /dev/null +++ b/server/collector.go @@ -0,0 +1,224 @@ +// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + log "github.com/Sirupsen/logrus" + "github.com/influxdata/influxdb/client/v2" + "github.com/osrg/gobgp/packet/bgp" + "github.com/osrg/gobgp/table" + "strings" + "time" +) + +type Collector struct { + grpcCh chan *GrpcRequest + url string + dbName string + interval uint64 + ch chan watcherEvent + client client.Client +} + +const ( + MEATUREMENT_UPDATE = "update" + MEATUREMENT_PEER = "peer" + MEATUREMENT_TABLE = "table" +) + +func (c *Collector) notify(t watcherEventType) chan watcherEvent { + if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE || t == WATCHER_EVENT_ADJ_IN { + return c.ch + } + return nil +} + +func (c *Collector) stop() { +} + +func (c *Collector) restart(filename string) error { + return nil +} + +func (c *Collector) watchingEventTypes() []watcherEventType { + return []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_STATE_CHANGE, WATCHER_EVENT_ADJ_IN} +} + +func (c *Collector) writePoints(points []*client.Point) error { + bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ + Database: c.dbName, + Precision: "ms", + }) + bp.AddPoints(points) + return c.client.Write(bp) +} + +func (c *Collector) writePeer(msg *watcherEventStateChangedMsg) error { + var state string + switch msg.state { + case bgp.BGP_FSM_ESTABLISHED: + state = "Established" + case bgp.BGP_FSM_IDLE: + state = "Idle" + default: + return fmt.Errorf("unexpected fsm state %v", msg.state) + } + + tags := map[string]string{ + "PeerAddress": msg.peerAddress.String(), + "PeerAS": fmt.Sprintf("%v", msg.peerAS), + "State": state, + } + + fields := map[string]interface{}{ + "PeerID": msg.peerID.String(), + } + + pt, err := client.NewPoint(MEATUREMENT_PEER, tags, fields, msg.timestamp) + if err != nil { + return err + } + return c.writePoints([]*client.Point{pt}) +} + +func path2data(path *table.Path) (map[string]interface{}, map[string]string) { + fields := map[string]interface{}{ + "ASPath": path.GetAsPath().String(), + } + if origin, err := path.GetOrigin(); err == nil { + typ := "-" + switch origin { + case bgp.BGP_ORIGIN_ATTR_TYPE_IGP: + typ = "i" + case bgp.BGP_ORIGIN_ATTR_TYPE_EGP: + typ = "e" + case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE: + typ = "?" + } + fields["Origin"] = typ + } + if med, err := path.GetMed(); err == nil { + fields["Med"] = med + } + + var prefix, prefixLen string + l := strings.Split(path.GetNlri().String(), "/") + if len(l) == 2 { + prefix = l[0] + prefixLen = l[1] + } + tags := map[string]string{ + "PeerAddress": path.GetSource().Address.String(), + "PeerAS": fmt.Sprintf("%v", path.GetSource().AS), + "Prefix": prefix, + "PrefixLen": prefixLen, + "NextHop": path.GetNexthop().String(), + "OriginAS": fmt.Sprintf("%v", path.GetSourceAs()), + "Timestamp": path.GetTimestamp().String(), + } + return fields, tags +} + +func (c *Collector) writeUpdate(msg *watcherEventUpdateMsg) error { + if len(msg.pathList) == 0 { + // EOR + return nil + } + now := time.Now() + points := make([]*client.Point, 0, len(msg.pathList)) + for _, path := range msg.pathList { + fields, tags := path2data(path) + tags["Withdraw"] = fmt.Sprintf("%v", path.IsWithdraw) + pt, err := client.NewPoint(MEATUREMENT_UPDATE, tags, fields, now) + if err != nil { + return fmt.Errorf("failed to write update, %v", err) + } + points = append(points, pt) + } + return c.writePoints(points) +} + +func (c *Collector) writeTable(msg *watcherEventAdjInMsg) error { + now := time.Now() + points := make([]*client.Point, 0, len(msg.pathList)) + for _, path := range msg.pathList { + fields, tags := path2data(path) + pt, err := client.NewPoint(MEATUREMENT_TABLE, tags, fields, now) + if err != nil { + return fmt.Errorf("failed to write table, %v", err) + } + points = append(points, pt) + } + return c.writePoints(points) +} + +func (c *Collector) loop() { + ticker := func() *time.Ticker { + if c.interval == 0 { + return &time.Ticker{} + } + return time.NewTicker(time.Second * time.Duration(c.interval)) + }() + + for { + select { + case <-ticker.C: + go func() { + ch := make(chan *GrpcResponse) + c.grpcCh <- &GrpcRequest{ + RequestType: REQ_WATCHER_ADJ_RIB_IN, + ResponseCh: ch, + } + (<-ch).Err() + }() + case ev := <-c.ch: + switch msg := ev.(type) { + case *watcherEventUpdateMsg: + if err := c.writeUpdate(msg); err != nil { + log.Error(err) + } + case *watcherEventStateChangedMsg: + if err := c.writePeer(msg); err != nil { + log.Error(err) + } + case *watcherEventAdjInMsg: + if err := c.writeTable(msg); err != nil { + log.Error(err) + } + } + } + } +} + +func NewCollector(grpcCh chan *GrpcRequest, url, dbName string, interval uint64) (*Collector, error) { + c, err := client.NewHTTPClient(client.HTTPConfig{ + Addr: url, + }) + if err != nil { + return nil, err + } + collector := &Collector{ + grpcCh: grpcCh, + url: url, + dbName: dbName, + interval: interval, + ch: make(chan watcherEvent, 16), + client: c, + } + go collector.loop() + return collector, nil +} diff --git a/server/grpc_server.go b/server/grpc_server.go index c2669da9..cbe642a2 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -98,6 +98,8 @@ const ( REQ_DEFERRAL_TIMER_EXPIRED REQ_RELOAD_POLICY REQ_INITIALIZE_ZEBRA + REQ_INITIALIZE_COLLECTOR + REQ_WATCHER_ADJ_RIB_IN // FIXME ) type Server struct { diff --git a/server/server.go b/server/server.go index 97f9228c..25153aaa 100644 --- a/server/server.go +++ b/server/server.go @@ -146,6 +146,7 @@ type BgpServer struct { fsmStateCh chan *FsmMsg acceptCh chan *net.TCPConn zapiMsgCh chan *zebra.Message + collector *Collector GrpcReqCh chan *GrpcRequest policy *table.RoutingPolicy @@ -997,6 +998,22 @@ func (server *BgpServer) SetGlobalType(g config.Global) error { return nil } +func (server *BgpServer) SetCollector(c config.Collector) error { + if len(c.Config.Url) == 0 { + return nil + } + ch := make(chan *GrpcResponse) + server.GrpcReqCh <- &GrpcRequest{ + RequestType: REQ_INITIALIZE_COLLECTOR, + Data: &c.Config, + ResponseCh: ch, + } + if err := (<-ch).Err(); err != nil { + return err + } + return nil +} + func (server *BgpServer) SetZebraConfig(z config.Zebra) error { if !z.Config.Enabled { return nil @@ -2338,6 +2355,26 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { ResponseErr: err, } close(grpcReq.ResponseCh) + case REQ_INITIALIZE_COLLECTOR: + c := grpcReq.Data.(*config.CollectorConfig) + collector, err := NewCollector(server.GrpcReqCh, c.Url, c.DbName, c.TableDumpInterval) + if err == nil { + server.collector = collector + server.watchers[WATCHER_COLLECTOR] = collector + } + grpcReq.ResponseCh <- &GrpcResponse{ + ResponseErr: err, + } + close(grpcReq.ResponseCh) + case REQ_WATCHER_ADJ_RIB_IN: + pathList := make([]*table.Path, 0) + for _, peer := range server.neighborMap { + pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...) + } + + grpcReq.ResponseCh <- &GrpcResponse{} + close(grpcReq.ResponseCh) + server.notify2watchers(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList}) default: err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType) goto ERROR diff --git a/server/watcher.go b/server/watcher.go index d08a6ef3..490978f0 100644 --- a/server/watcher.go +++ b/server/watcher.go @@ -43,6 +43,7 @@ const ( WATCHER_MRT // UPDATE MSG WATCHER_BMP WATCHER_ZEBRA + WATCHER_COLLECTOR WATCHER_GRPC_BESTPATH WATCHER_GRPC_INCOMING ) @@ -55,6 +56,7 @@ const ( WATCHER_EVENT_STATE_CHANGE WATCHER_EVENT_BESTPATH_CHANGE WATCHER_EVENT_POST_POLICY_UPDATE_MSG + WATCHER_EVENT_ADJ_IN ) type watcherEvent interface { @@ -88,6 +90,10 @@ type watcherEventStateChangedMsg struct { timestamp time.Time } +type watcherEventAdjInMsg struct { + pathList []*table.Path +} + type watcher interface { notify(watcherEventType) chan watcherEvent restart(string) error diff --git a/table/path.go b/table/path.go index 3742970f..a4fc4e43 100644 --- a/table/path.go +++ b/table/path.go @@ -826,6 +826,13 @@ func (path *Path) GetClusterList() []net.IP { return nil } +func (path *Path) GetOrigin() (uint8, error) { + if attr := path.getPathAttr(bgp.BGP_ATTR_TYPE_ORIGIN); attr != nil { + return attr.(*bgp.PathAttributeOrigin).Value[0], nil + } + return 0, fmt.Errorf("no origin path attr") +} + func (lhs *Path) Equal(rhs *Path) bool { return lhs == rhs } diff --git a/tools/pyang_plugins/gobgp.yang b/tools/pyang_plugins/gobgp.yang index 325942f2..f9aba5f7 100644 --- a/tools/pyang_plugins/gobgp.yang +++ b/tools/pyang_plugins/gobgp.yang @@ -808,6 +808,33 @@ module gobgp { uses zebra-set; } + grouping collector-config { + leaf url { + type string; + } + leaf db-name { + type string; + } + leaf table-dump-interval { + type uint64; + } + } + + grouping collector-set { + container collector { + container config { + uses collector-config; + } + container state { + uses collector-config; + } + } + } + + augment "/bgp:bgp" { + uses collector-set; + } + augment "/bgp:bgp/bgp:global" { container mpls-label-range { description "mpls labal range"; |