diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/collector.go | 224 | ||||
-rw-r--r-- | server/grpc_server.go | 2 | ||||
-rw-r--r-- | server/server.go | 37 | ||||
-rw-r--r-- | server/watcher.go | 6 |
4 files changed, 269 insertions, 0 deletions
diff --git a/server/collector.go b/server/collector.go new file mode 100644 index 00000000..6860011e --- /dev/null +++ b/server/collector.go @@ -0,0 +1,224 @@ +// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + log "github.com/Sirupsen/logrus" + "github.com/influxdata/influxdb/client/v2" + "github.com/osrg/gobgp/packet/bgp" + "github.com/osrg/gobgp/table" + "strings" + "time" +) + +type Collector struct { + grpcCh chan *GrpcRequest + url string + dbName string + interval uint64 + ch chan watcherEvent + client client.Client +} + +const ( + MEATUREMENT_UPDATE = "update" + MEATUREMENT_PEER = "peer" + MEATUREMENT_TABLE = "table" +) + +func (c *Collector) notify(t watcherEventType) chan watcherEvent { + if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE || t == WATCHER_EVENT_ADJ_IN { + return c.ch + } + return nil +} + +func (c *Collector) stop() { +} + +func (c *Collector) restart(filename string) error { + return nil +} + +func (c *Collector) watchingEventTypes() []watcherEventType { + return []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_STATE_CHANGE, WATCHER_EVENT_ADJ_IN} +} + +func (c *Collector) writePoints(points []*client.Point) error { + bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ + Database: c.dbName, + Precision: "ms", + }) + bp.AddPoints(points) + return c.client.Write(bp) +} + +func (c *Collector) writePeer(msg *watcherEventStateChangedMsg) error { + var state string + switch msg.state { + case bgp.BGP_FSM_ESTABLISHED: + state = "Established" + case bgp.BGP_FSM_IDLE: + state = "Idle" + default: + return fmt.Errorf("unexpected fsm state %v", msg.state) + } + + tags := map[string]string{ + "PeerAddress": msg.peerAddress.String(), + "PeerAS": fmt.Sprintf("%v", msg.peerAS), + "State": state, + } + + fields := map[string]interface{}{ + "PeerID": msg.peerID.String(), + } + + pt, err := client.NewPoint(MEATUREMENT_PEER, tags, fields, msg.timestamp) + if err != nil { + return err + } + return c.writePoints([]*client.Point{pt}) +} + +func path2data(path *table.Path) (map[string]interface{}, map[string]string) { + fields := map[string]interface{}{ + "ASPath": path.GetAsPath().String(), + } + if origin, err := path.GetOrigin(); err == nil { + typ := "-" + switch origin { + case bgp.BGP_ORIGIN_ATTR_TYPE_IGP: + typ = "i" + case bgp.BGP_ORIGIN_ATTR_TYPE_EGP: + typ = "e" + case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE: + typ = "?" + } + fields["Origin"] = typ + } + if med, err := path.GetMed(); err == nil { + fields["Med"] = med + } + + var prefix, prefixLen string + l := strings.Split(path.GetNlri().String(), "/") + if len(l) == 2 { + prefix = l[0] + prefixLen = l[1] + } + tags := map[string]string{ + "PeerAddress": path.GetSource().Address.String(), + "PeerAS": fmt.Sprintf("%v", path.GetSource().AS), + "Prefix": prefix, + "PrefixLen": prefixLen, + "NextHop": path.GetNexthop().String(), + "OriginAS": fmt.Sprintf("%v", path.GetSourceAs()), + "Timestamp": path.GetTimestamp().String(), + } + return fields, tags +} + +func (c *Collector) writeUpdate(msg *watcherEventUpdateMsg) error { + if len(msg.pathList) == 0 { + // EOR + return nil + } + now := time.Now() + points := make([]*client.Point, 0, len(msg.pathList)) + for _, path := range msg.pathList { + fields, tags := path2data(path) + tags["Withdraw"] = fmt.Sprintf("%v", path.IsWithdraw) + pt, err := client.NewPoint(MEATUREMENT_UPDATE, tags, fields, now) + if err != nil { + return fmt.Errorf("failed to write update, %v", err) + } + points = append(points, pt) + } + return c.writePoints(points) +} + +func (c *Collector) writeTable(msg *watcherEventAdjInMsg) error { + now := time.Now() + points := make([]*client.Point, 0, len(msg.pathList)) + for _, path := range msg.pathList { + fields, tags := path2data(path) + pt, err := client.NewPoint(MEATUREMENT_TABLE, tags, fields, now) + if err != nil { + return fmt.Errorf("failed to write table, %v", err) + } + points = append(points, pt) + } + return c.writePoints(points) +} + +func (c *Collector) loop() { + ticker := func() *time.Ticker { + if c.interval == 0 { + return &time.Ticker{} + } + return time.NewTicker(time.Second * time.Duration(c.interval)) + }() + + for { + select { + case <-ticker.C: + go func() { + ch := make(chan *GrpcResponse) + c.grpcCh <- &GrpcRequest{ + RequestType: REQ_WATCHER_ADJ_RIB_IN, + ResponseCh: ch, + } + (<-ch).Err() + }() + case ev := <-c.ch: + switch msg := ev.(type) { + case *watcherEventUpdateMsg: + if err := c.writeUpdate(msg); err != nil { + log.Error(err) + } + case *watcherEventStateChangedMsg: + if err := c.writePeer(msg); err != nil { + log.Error(err) + } + case *watcherEventAdjInMsg: + if err := c.writeTable(msg); err != nil { + log.Error(err) + } + } + } + } +} + +func NewCollector(grpcCh chan *GrpcRequest, url, dbName string, interval uint64) (*Collector, error) { + c, err := client.NewHTTPClient(client.HTTPConfig{ + Addr: url, + }) + if err != nil { + return nil, err + } + collector := &Collector{ + grpcCh: grpcCh, + url: url, + dbName: dbName, + interval: interval, + ch: make(chan watcherEvent, 16), + client: c, + } + go collector.loop() + return collector, nil +} diff --git a/server/grpc_server.go b/server/grpc_server.go index c2669da9..cbe642a2 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -98,6 +98,8 @@ const ( REQ_DEFERRAL_TIMER_EXPIRED REQ_RELOAD_POLICY REQ_INITIALIZE_ZEBRA + REQ_INITIALIZE_COLLECTOR + REQ_WATCHER_ADJ_RIB_IN // FIXME ) type Server struct { diff --git a/server/server.go b/server/server.go index 97f9228c..25153aaa 100644 --- a/server/server.go +++ b/server/server.go @@ -146,6 +146,7 @@ type BgpServer struct { fsmStateCh chan *FsmMsg acceptCh chan *net.TCPConn zapiMsgCh chan *zebra.Message + collector *Collector GrpcReqCh chan *GrpcRequest policy *table.RoutingPolicy @@ -997,6 +998,22 @@ func (server *BgpServer) SetGlobalType(g config.Global) error { return nil } +func (server *BgpServer) SetCollector(c config.Collector) error { + if len(c.Config.Url) == 0 { + return nil + } + ch := make(chan *GrpcResponse) + server.GrpcReqCh <- &GrpcRequest{ + RequestType: REQ_INITIALIZE_COLLECTOR, + Data: &c.Config, + ResponseCh: ch, + } + if err := (<-ch).Err(); err != nil { + return err + } + return nil +} + func (server *BgpServer) SetZebraConfig(z config.Zebra) error { if !z.Config.Enabled { return nil @@ -2338,6 +2355,26 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { ResponseErr: err, } close(grpcReq.ResponseCh) + case REQ_INITIALIZE_COLLECTOR: + c := grpcReq.Data.(*config.CollectorConfig) + collector, err := NewCollector(server.GrpcReqCh, c.Url, c.DbName, c.TableDumpInterval) + if err == nil { + server.collector = collector + server.watchers[WATCHER_COLLECTOR] = collector + } + grpcReq.ResponseCh <- &GrpcResponse{ + ResponseErr: err, + } + close(grpcReq.ResponseCh) + case REQ_WATCHER_ADJ_RIB_IN: + pathList := make([]*table.Path, 0) + for _, peer := range server.neighborMap { + pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...) + } + + grpcReq.ResponseCh <- &GrpcResponse{} + close(grpcReq.ResponseCh) + server.notify2watchers(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList}) default: err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType) goto ERROR diff --git a/server/watcher.go b/server/watcher.go index d08a6ef3..490978f0 100644 --- a/server/watcher.go +++ b/server/watcher.go @@ -43,6 +43,7 @@ const ( WATCHER_MRT // UPDATE MSG WATCHER_BMP WATCHER_ZEBRA + WATCHER_COLLECTOR WATCHER_GRPC_BESTPATH WATCHER_GRPC_INCOMING ) @@ -55,6 +56,7 @@ const ( WATCHER_EVENT_STATE_CHANGE WATCHER_EVENT_BESTPATH_CHANGE WATCHER_EVENT_POST_POLICY_UPDATE_MSG + WATCHER_EVENT_ADJ_IN ) type watcherEvent interface { @@ -88,6 +90,10 @@ type watcherEventStateChangedMsg struct { timestamp time.Time } +type watcherEventAdjInMsg struct { + pathList []*table.Path +} + type watcher interface { notify(watcherEventType) chan watcherEvent restart(string) error |