summaryrefslogtreecommitdiffhomepage
path: root/server/watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'server/watcher.go')
-rw-r--r--server/watcher.go69
1 files changed, 47 insertions, 22 deletions
diff --git a/server/watcher.go b/server/watcher.go
index ce86203d..4294d358 100644
--- a/server/watcher.go
+++ b/server/watcher.go
@@ -16,6 +16,7 @@
package server
import (
+ "bytes"
log "github.com/Sirupsen/logrus"
"github.com/osrg/gobgp/packet"
"github.com/osrg/gobgp/table"
@@ -134,7 +135,7 @@ func (w *mrtWatcher) loop() error {
}()
for {
- write := func(ev watcherEvent) {
+ serialize := func(ev watcherEvent) ([]byte, error) {
m := ev.(*watcherEventUpdateMsg)
subtype := bgp.MESSAGE_AS4
mp := bgp.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil)
@@ -148,35 +149,59 @@ func (w *mrtWatcher) loop() error {
"Topic": "mrt",
"Data": m,
}).Warn(err)
- return
- }
- buf, err := bm.Serialize()
- if err == nil {
- if _, err = w.file.Write(buf); err == nil {
- err = w.file.Sync()
- }
- }
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Data": m,
- }).Warn(err)
+ return nil, err
}
+ return bm.Serialize()
}
- drain := func() {
+ drain := func(ev watcherEvent) {
+ events := make([]watcherEvent, 0, 1+len(w.ch))
+ if ev != nil {
+ events = append(events, ev)
+ }
+
for len(w.ch) > 0 {
- m := <-w.ch
- write(m)
+ e := <-w.ch
+ events = append(events, e)
+ }
+
+ w := func(buf []byte) {
+ if _, err := w.file.Write(buf); err == nil {
+ w.file.Sync()
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Error": err,
+ }).Warn(err)
+ }
}
- }
+ var b bytes.Buffer
+ for _, e := range events {
+ buf, err := serialize(e)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": e,
+ }).Warn(err)
+ continue
+ }
+ b.Write(buf)
+ if b.Len() > 1*1000*1000 {
+ w(b.Bytes())
+ b.Reset()
+ }
+ }
+ if b.Len() > 0 {
+ w(b.Bytes())
+ }
+ }
select {
case <-w.t.Dying():
- drain()
+ drain(nil)
return nil
- case m := <-w.ch:
- write(m)
+ case e := <-w.ch:
+ drain(e)
case <-c.C:
w.file.Close()
file, err := mrtFileOpen(w.filename, w.interval)
@@ -232,7 +257,7 @@ func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatche
w := mrtWatcher{
filename: filename,
file: file,
- ch: make(chan watcherEvent),
+ ch: make(chan watcherEvent, 1<<16),
interval: interval,
}
w.t.Go(w.loop)