summaryrefslogtreecommitdiffhomepage
path: root/pkg/server/mrt.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/mrt.go')
-rw-r--r--pkg/server/mrt.go409
1 files changed, 409 insertions, 0 deletions
diff --git a/pkg/server/mrt.go b/pkg/server/mrt.go
new file mode 100644
index 00000000..c654cb78
--- /dev/null
+++ b/pkg/server/mrt.go
@@ -0,0 +1,409 @@
+// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/osrg/gobgp/internal/pkg/config"
+ "github.com/osrg/gobgp/internal/pkg/table"
+ "github.com/osrg/gobgp/pkg/packet/bgp"
+ "github.com/osrg/gobgp/pkg/packet/mrt"
+
+ log "github.com/sirupsen/logrus"
+)
+
+const (
+ MIN_ROTATION_INTERVAL = 60
+ MIN_DUMP_INTERVAL = 60
+)
+
+type mrtWriter struct {
+ dead chan struct{}
+ s *BgpServer
+ c *config.MrtConfig
+ file *os.File
+ rotationInterval uint64
+ dumpInterval uint64
+}
+
+func (m *mrtWriter) Stop() {
+ close(m.dead)
+}
+
+func (m *mrtWriter) loop() error {
+ ops := []WatchOption{}
+ switch m.c.DumpType {
+ case config.MRT_TYPE_UPDATES:
+ ops = append(ops, WatchUpdate(false))
+ case config.MRT_TYPE_TABLE:
+ if len(m.c.TableName) > 0 {
+ ops = append(ops, WatchTableName(m.c.TableName))
+ }
+ }
+ w := m.s.Watch(ops...)
+ rotator := func() *time.Ticker {
+ if m.rotationInterval == 0 {
+ return &time.Ticker{}
+ }
+ return time.NewTicker(time.Second * time.Duration(m.rotationInterval))
+ }()
+ dump := func() *time.Ticker {
+ if m.dumpInterval == 0 {
+ return &time.Ticker{}
+ }
+ return time.NewTicker(time.Second * time.Duration(m.dumpInterval))
+ }()
+
+ defer func() {
+ if m.file != nil {
+ m.file.Close()
+ }
+ if m.rotationInterval != 0 {
+ rotator.Stop()
+ }
+ if m.dumpInterval != 0 {
+ dump.Stop()
+ }
+ w.Stop()
+ }()
+
+ for {
+ serialize := func(ev WatchEvent) []*mrt.MRTMessage {
+ msg := make([]*mrt.MRTMessage, 0, 1)
+ switch e := ev.(type) {
+ case *WatchEventUpdate:
+ if e.Init {
+ return nil
+ }
+ mp := mrt.NewBGP4MPMessage(e.PeerAS, e.LocalAS, 0, e.PeerAddress.String(), e.LocalAddress.String(), e.FourBytesAs, nil)
+ mp.BGPMessagePayload = e.Payload
+ isAddPath := e.Neighbor.IsAddPathReceiveEnabled(e.PathList[0].GetRouteFamily())
+ subtype := mrt.MESSAGE
+ switch {
+ case isAddPath && e.FourBytesAs:
+ subtype = mrt.MESSAGE_AS4_ADDPATH
+ case isAddPath:
+ subtype = mrt.MESSAGE_ADDPATH
+ case e.FourBytesAs:
+ subtype = mrt.MESSAGE_AS4
+ }
+ if bm, err := mrt.NewMRTMessage(uint32(e.Timestamp.Unix()), mrt.BGP4MP, subtype, mp); err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": e,
+ "Error": err,
+ }).Warnf("Failed to create MRT BGP4MP message (subtype %d)", subtype)
+ } else {
+ msg = append(msg, bm)
+ }
+ case *WatchEventTable:
+ t := uint32(time.Now().Unix())
+
+ peers := make([]*mrt.Peer, 1, len(e.Neighbor)+1)
+ // Adding dummy Peer record for locally generated routes
+ peers[0] = mrt.NewPeer("0.0.0.0", "0.0.0.0", 0, true)
+ neighborMap := make(map[string]*config.Neighbor)
+ for _, pconf := range e.Neighbor {
+ peers = append(peers, mrt.NewPeer(pconf.State.RemoteRouterId, pconf.State.NeighborAddress, pconf.Config.PeerAs, true))
+ neighborMap[pconf.State.NeighborAddress] = pconf
+ }
+
+ if bm, err := mrt.NewMRTMessage(t, mrt.TABLE_DUMPv2, mrt.PEER_INDEX_TABLE, mrt.NewPeerIndexTable(e.RouterId, "", peers)); err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": e,
+ "Error": err,
+ }).Warnf("Failed to create MRT TABLE_DUMPv2 message (subtype %d)", mrt.PEER_INDEX_TABLE)
+ break
+ } else {
+ msg = append(msg, bm)
+ }
+
+ idx := func(p *table.Path) uint16 {
+ for i, pconf := range e.Neighbor {
+ if p.GetSource().Address.String() == pconf.State.NeighborAddress {
+ return uint16(i)
+ }
+ }
+ return uint16(len(e.Neighbor))
+ }
+
+ subtype := func(p *table.Path, isAddPath bool) mrt.MRTSubTypeTableDumpv2 {
+ t := mrt.RIB_GENERIC
+ switch p.GetRouteFamily() {
+ case bgp.RF_IPv4_UC:
+ t = mrt.RIB_IPV4_UNICAST
+ case bgp.RF_IPv4_MC:
+ t = mrt.RIB_IPV4_MULTICAST
+ case bgp.RF_IPv6_UC:
+ t = mrt.RIB_IPV6_UNICAST
+ case bgp.RF_IPv6_MC:
+ t = mrt.RIB_IPV6_MULTICAST
+ }
+ if isAddPath {
+ // Shift non-additional-path version to *_ADDPATH
+ t += 6
+ }
+ return t
+ }
+
+ seq := uint32(0)
+ appendTableDumpMsg := func(path *table.Path, entries []*mrt.RibEntry, isAddPath bool) {
+ st := subtype(path, isAddPath)
+ if bm, err := mrt.NewMRTMessage(t, mrt.TABLE_DUMPv2, st, mrt.NewRib(seq, path.GetNlri(), entries)); err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": e,
+ "Error": err,
+ }).Warnf("Failed to create MRT TABLE_DUMPv2 message (subtype %d)", st)
+ } else {
+ msg = append(msg, bm)
+ seq++
+ }
+ }
+ for _, pathList := range e.PathList {
+ entries := make([]*mrt.RibEntry, 0, len(pathList))
+ entriesAddPath := make([]*mrt.RibEntry, 0, len(pathList))
+ for _, path := range pathList {
+ isAddPath := false
+ if path.IsLocal() {
+ isAddPath = true
+ } else if neighbor, ok := neighborMap[path.GetSource().Address.String()]; ok {
+ isAddPath = neighbor.IsAddPathReceiveEnabled(path.GetRouteFamily())
+ }
+ if !isAddPath {
+ entries = append(entries, mrt.NewRibEntry(idx(path), uint32(path.GetTimestamp().Unix()), 0, path.GetPathAttrs(), false))
+ } else {
+ entriesAddPath = append(entriesAddPath, mrt.NewRibEntry(idx(path), uint32(path.GetTimestamp().Unix()), path.GetNlri().PathIdentifier(), path.GetPathAttrs(), true))
+ }
+ }
+ if len(entries) > 0 {
+ appendTableDumpMsg(pathList[0], entries, false)
+ }
+ if len(entriesAddPath) > 0 {
+ appendTableDumpMsg(pathList[0], entriesAddPath, true)
+ }
+ }
+ }
+ return msg
+ }
+
+ drain := func(ev WatchEvent) {
+ events := make([]WatchEvent, 0, 1+len(w.Event()))
+ if ev != nil {
+ events = append(events, ev)
+ }
+
+ for len(w.Event()) > 0 {
+ events = append(events, <-w.Event())
+ }
+
+ w := func(buf []byte) {
+ if _, err := m.file.Write(buf); err == nil {
+ m.file.Sync()
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Error": err,
+ }).Warn("Can't write to destination MRT file")
+ }
+ }
+
+ var b bytes.Buffer
+ for _, e := range events {
+ for _, m := range serialize(e) {
+ if buf, err := m.Serialize(); err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": e,
+ "Error": err,
+ }).Warn("Failed to serialize event")
+ } else {
+ b.Write(buf)
+ if b.Len() > 1*1000*1000 {
+ w(b.Bytes())
+ b.Reset()
+ }
+ }
+ }
+ }
+ if b.Len() > 0 {
+ w(b.Bytes())
+ }
+ }
+ rotate := func() {
+ m.file.Close()
+ file, err := mrtFileOpen(m.c.FileName, m.rotationInterval)
+ if err == nil {
+ m.file = file
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Error": err,
+ }).Warn("can't rotate MRT file")
+ }
+ }
+
+ select {
+ case <-m.dead:
+ drain(nil)
+ return nil
+ case e := <-w.Event():
+ drain(e)
+ if m.c.DumpType == config.MRT_TYPE_TABLE && m.rotationInterval != 0 {
+ rotate()
+ }
+ case <-rotator.C:
+ if m.c.DumpType == config.MRT_TYPE_UPDATES {
+ rotate()
+ } else {
+ w.Generate(WATCH_EVENT_TYPE_TABLE)
+ }
+ case <-dump.C:
+ w.Generate(WATCH_EVENT_TYPE_TABLE)
+ }
+ }
+}
+
+func mrtFileOpen(filename string, interval uint64) (*os.File, error) {
+ realname := filename
+ if interval != 0 {
+ realname = time.Now().Format(filename)
+ }
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Filename": realname,
+ "Dump Interval": interval,
+ }).Debug("Setting new MRT destination file")
+
+ i := len(realname)
+ for i > 0 && os.IsPathSeparator(realname[i-1]) {
+ // skip trailing path separators
+ i--
+ }
+ j := i
+
+ for j > 0 && !os.IsPathSeparator(realname[j-1]) {
+ j--
+ }
+
+ if j > 0 {
+ if err := os.MkdirAll(realname[0:j-1], 0755); err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Error": err,
+ }).Warn("can't create MRT destination directory")
+ return nil, err
+ }
+ }
+
+ file, err := os.OpenFile(realname, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Error": err,
+ }).Warn("can't create MRT destination file")
+ }
+ return file, err
+}
+
+func newMrtWriter(s *BgpServer, c *config.MrtConfig, rInterval, dInterval uint64) (*mrtWriter, error) {
+ file, err := mrtFileOpen(c.FileName, rInterval)
+ if err != nil {
+ return nil, err
+ }
+ m := mrtWriter{
+ s: s,
+ c: c,
+ file: file,
+ rotationInterval: rInterval,
+ dumpInterval: dInterval,
+ }
+ go m.loop()
+ return &m, nil
+}
+
+type mrtManager struct {
+ bgpServer *BgpServer
+ writer map[string]*mrtWriter
+}
+
+func (m *mrtManager) enable(c *config.MrtConfig) error {
+ if _, ok := m.writer[c.FileName]; ok {
+ return fmt.Errorf("%s already exists", c.FileName)
+ }
+
+ rInterval := c.RotationInterval
+ dInterval := c.DumpInterval
+
+ setRotationMin := func() {
+ if rInterval < MIN_ROTATION_INTERVAL {
+ log.WithFields(log.Fields{
+ "Topic": "MRT",
+ }).Infof("minimum mrt rotation interval is %d seconds", MIN_ROTATION_INTERVAL)
+ rInterval = MIN_ROTATION_INTERVAL
+ }
+ }
+
+ if c.DumpType == config.MRT_TYPE_TABLE {
+ if rInterval == 0 {
+ if dInterval < MIN_DUMP_INTERVAL {
+ log.WithFields(log.Fields{
+ "Topic": "MRT",
+ }).Infof("minimum mrt dump interval is %d seconds", MIN_DUMP_INTERVAL)
+ dInterval = MIN_DUMP_INTERVAL
+ }
+ } else if dInterval == 0 {
+ setRotationMin()
+ } else {
+ return fmt.Errorf("can't specify both intervals in the table dump type")
+ }
+ } else if c.DumpType == config.MRT_TYPE_UPDATES {
+ // ignore the dump interval
+ dInterval = 0
+ if len(c.TableName) > 0 {
+ return fmt.Errorf("can't specify the table name with the update dump type")
+ }
+ setRotationMin()
+ }
+
+ w, err := newMrtWriter(m.bgpServer, c, rInterval, dInterval)
+ if err == nil {
+ m.writer[c.FileName] = w
+ }
+ return err
+}
+
+func (m *mrtManager) disable(c *config.MrtConfig) error {
+ w, ok := m.writer[c.FileName]
+ if !ok {
+ return fmt.Errorf("%s doesn't exists", c.FileName)
+ }
+ w.Stop()
+ delete(m.writer, c.FileName)
+ return nil
+}
+
+func newMrtManager(s *BgpServer) *mrtManager {
+ return &mrtManager{
+ bgpServer: s,
+ writer: make(map[string]*mrtWriter),
+ }
+}