summaryrefslogtreecommitdiffhomepage
path: root/server/mrt.go
diff options
context:
space:
mode:
Diffstat (limited to 'server/mrt.go')
-rw-r--r--server/mrt.go226
1 files changed, 177 insertions, 49 deletions
diff --git a/server/mrt.go b/server/mrt.go
index 9241eabd..605d6fe0 100644
--- a/server/mrt.go
+++ b/server/mrt.go
@@ -17,19 +17,25 @@ package server
import (
"bytes"
+ "fmt"
"os"
"time"
log "github.com/Sirupsen/logrus"
+ "github.com/osrg/gobgp/config"
+ "github.com/osrg/gobgp/packet/bgp"
"github.com/osrg/gobgp/packet/mrt"
+ "github.com/osrg/gobgp/table"
)
type mrtWriter struct {
- dead chan struct{}
- s *BgpServer
- filename string
- file *os.File
- interval uint64
+ dead chan struct{}
+ s *BgpServer
+ filename string
+ file *os.File
+ rotationInterval uint64
+ dumpInterval uint64
+ dumpType config.MrtType
}
func (m *mrtWriter) Stop() {
@@ -37,43 +43,111 @@ func (m *mrtWriter) Stop() {
}
func (m *mrtWriter) loop() error {
- w := m.s.Watch(WatchUpdate(false))
- c := func() *time.Ticker {
- if m.interval == 0 {
+ ops := []WatchOption{}
+ switch m.dumpType {
+ case config.MRT_TYPE_UPDATES:
+ ops = append(ops, WatchUpdate(false))
+ case config.MRT_TYPE_TABLE:
+ }
+ w := m.s.Watch(ops...)
+ rotator := func() *time.Ticker {
+ if m.rotationInterval == 0 {
+ return &time.Ticker{}
+ }
+ return time.NewTicker(time.Second * time.Duration(m.rotationInterval))
+ }()
+ dump := func() *time.Ticker {
+ if m.dumpInterval == 0 {
return &time.Ticker{}
}
- return time.NewTicker(time.Second * time.Duration(m.interval))
+ return time.NewTicker(time.Second * time.Duration(m.dumpInterval))
}()
defer func() {
if m.file != nil {
m.file.Close()
}
- if m.interval != 0 {
- c.Stop()
+ if m.rotationInterval != 0 {
+ rotator.Stop()
+ }
+ if m.dumpInterval == 0 {
+ dump.Stop()
}
w.Stop()
}()
for {
- serialize := func(ev WatchEvent) ([]byte, error) {
- m := ev.(*WatchEventUpdate)
- subtype := mrt.MESSAGE_AS4
- mp := mrt.NewBGP4MPMessage(m.PeerAS, m.LocalAS, 0, m.PeerAddress.String(), m.LocalAddress.String(), m.FourBytesAs, nil)
- mp.BGPMessagePayload = m.Payload
- if m.FourBytesAs == false {
- subtype = mrt.MESSAGE
- }
- bm, err := mrt.NewMRTMessage(uint32(m.Timestamp.Unix()), mrt.BGP4MP, subtype, mp)
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Data": m,
- "Error": err,
- }).Warn("Failed to create MRT message in serialize()")
- return nil, err
+ serialize := func(ev WatchEvent) []*mrt.MRTMessage {
+ msg := make([]*mrt.MRTMessage, 0, 1)
+ switch m := ev.(type) {
+ case *WatchEventUpdate:
+ subtype := mrt.MESSAGE_AS4
+ mp := mrt.NewBGP4MPMessage(m.PeerAS, m.LocalAS, 0, m.PeerAddress.String(), m.LocalAddress.String(), m.FourBytesAs, nil)
+ mp.BGPMessagePayload = m.Payload
+ if m.FourBytesAs == false {
+ subtype = mrt.MESSAGE
+ }
+ if bm, err := mrt.NewMRTMessage(uint32(m.Timestamp.Unix()), mrt.BGP4MP, subtype, mp); err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": m,
+ "Error": err,
+ }).Warn("Failed to create MRT message in serialize()")
+ } else {
+ msg = append(msg, bm)
+ }
+ case *WatchEventTable:
+ t := uint32(time.Now().Unix())
+ peers := make([]*mrt.Peer, 0, len(m.Neighbor))
+ for _, pconf := range m.Neighbor {
+ peers = append(peers, mrt.NewPeer(pconf.State.Description, pconf.Config.NeighborAddress, pconf.Config.PeerAs, true))
+ }
+ if bm, err := mrt.NewMRTMessage(t, mrt.TABLE_DUMPv2, mrt.PEER_INDEX_TABLE, mrt.NewPeerIndexTable(m.RouterId, "", peers)); err != nil {
+ break
+ } else {
+ msg = append(msg, bm)
+ }
+
+ idx := func(p *table.Path) uint16 {
+ for i, pconf := range m.Neighbor {
+ if p.GetSource().Address.String() == pconf.Config.NeighborAddress {
+ return uint16(i)
+ }
+ }
+ return uint16(len(m.Neighbor))
+ }
+
+ subtype := func(p *table.Path) mrt.MRTSubTypeTableDumpv2 {
+ switch p.GetRouteFamily() {
+ case bgp.RF_IPv4_UC:
+ return mrt.RIB_IPV4_UNICAST
+ case bgp.RF_IPv4_MC:
+ return mrt.RIB_IPV4_MULTICAST
+ case bgp.RF_IPv6_UC:
+ return mrt.RIB_IPV6_UNICAST
+ case bgp.RF_IPv6_MC:
+ return mrt.RIB_IPV6_MULTICAST
+ }
+ return mrt.RIB_GENERIC
+ }
+
+ seq := uint32(0)
+ for _, pathList := range m.PathList {
+ entries := make([]*mrt.RibEntry, 0, len(pathList))
+ for _, path := range pathList {
+ if path.IsLocal() {
+ continue
+ }
+ entries = append(entries, mrt.NewRibEntry(idx(path), uint32(path.GetTimestamp().Unix()), path.GetPathAttrs()))
+ }
+ if len(entries) > 0 {
+ bm, _ := mrt.NewMRTMessage(t, mrt.TABLE_DUMPv2, subtype(pathList[0]), mrt.NewRib(seq, pathList[0].GetNlri(), entries))
+ msg = append(msg, bm)
+ seq++
+ }
+ }
}
- return bm.Serialize()
+ return msg
}
drain := func(ev WatchEvent) {
@@ -99,19 +173,20 @@ func (m *mrtWriter) loop() error {
var b bytes.Buffer
for _, e := range events {
- buf, err := serialize(e)
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Data": e,
- "Error": err,
- }).Warn("Failed to serialize event")
- continue
- }
- b.Write(buf)
- if b.Len() > 1*1000*1000 {
- w(b.Bytes())
- b.Reset()
+ for _, m := range serialize(e) {
+ if buf, err := m.Serialize(); err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": e,
+ "Error": err,
+ }).Warn("Failed to serialize event")
+ } else {
+ b.Write(buf)
+ if b.Len() > 1*1000*1000 {
+ w(b.Bytes())
+ b.Reset()
+ }
+ }
}
}
if b.Len() > 0 {
@@ -124,9 +199,9 @@ func (m *mrtWriter) loop() error {
return nil
case e := <-w.Event():
drain(e)
- case <-c.C:
+ case <-rotator.C:
m.file.Close()
- file, err := mrtFileOpen(m.filename, m.interval)
+ file, err := mrtFileOpen(m.filename, m.rotationInterval)
if err == nil {
m.file = file
} else {
@@ -135,6 +210,8 @@ func (m *mrtWriter) loop() error {
"Error": err,
}).Warn("can't rotate MRT file")
}
+ case <-dump.C:
+ w.Generate(WATCH_EVENT_TYPE_TABLE)
}
}
}
@@ -181,17 +258,68 @@ func mrtFileOpen(filename string, interval uint64) (*os.File, error) {
return file, err
}
-func newMrtWriter(s *BgpServer, dumpType int, filename string, interval uint64) (*mrtWriter, error) {
- file, err := mrtFileOpen(filename, interval)
+func newMrtWriter(s *BgpServer, dumpType config.MrtType, filename string, rInterval, dInterval uint64) (*mrtWriter, error) {
+ file, err := mrtFileOpen(filename, rInterval)
if err != nil {
return nil, err
}
m := mrtWriter{
- s: s,
- filename: filename,
- file: file,
- interval: interval,
+ dumpType: dumpType,
+ s: s,
+ filename: filename,
+ file: file,
+ rotationInterval: rInterval,
+ dumpInterval: dInterval,
}
go m.loop()
return &m, nil
}
+
+type mrtManager struct {
+ bgpServer *BgpServer
+ writer map[string]*mrtWriter
+}
+
+func (m *mrtManager) enable(c *config.MrtConfig) error {
+ if _, ok := m.writer[c.FileName]; ok {
+ return fmt.Errorf("%s already exists", c.FileName)
+ }
+
+ rInterval := c.RotationInterval
+ if rInterval != 0 && rInterval < 30 {
+ log.Info("minimum mrt dump interval is 30 seconds")
+ rInterval = 30
+ }
+ dInterval := c.DumpInterval
+ if c.DumpType == config.MRT_TYPE_TABLE {
+ if dInterval < 60 {
+ log.Info("minimum mrt dump interval is 30 seconds")
+ dInterval = 60
+ }
+ } else if c.DumpType == config.MRT_TYPE_UPDATES {
+ dInterval = 0
+ }
+
+ w, err := newMrtWriter(m.bgpServer, c.DumpType, c.FileName, rInterval, dInterval)
+ if err == nil {
+ m.writer[c.FileName] = w
+ }
+ return err
+}
+
+func (m *mrtManager) disable(c *config.MrtConfig) error {
+ if w, ok := m.writer[c.FileName]; !ok {
+ return fmt.Errorf("%s doesn't exists", c.FileName)
+ } else {
+ w.Stop()
+ delete(m.writer, c.FileName)
+ }
+ return nil
+}
+
+func newMrtManager(s *BgpServer) *mrtManager {
+ return &mrtManager{
+ bgpServer: s,
+ writer: make(map[string]*mrtWriter),
+ }
+}