diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/watcher.go | 69 |
1 files changed, 47 insertions, 22 deletions
diff --git a/server/watcher.go b/server/watcher.go index ce86203d..4294d358 100644 --- a/server/watcher.go +++ b/server/watcher.go @@ -16,6 +16,7 @@ package server import ( + "bytes" log "github.com/Sirupsen/logrus" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" @@ -134,7 +135,7 @@ func (w *mrtWatcher) loop() error { }() for { - write := func(ev watcherEvent) { + serialize := func(ev watcherEvent) ([]byte, error) { m := ev.(*watcherEventUpdateMsg) subtype := bgp.MESSAGE_AS4 mp := bgp.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil) @@ -148,35 +149,59 @@ func (w *mrtWatcher) loop() error { "Topic": "mrt", "Data": m, }).Warn(err) - return - } - buf, err := bm.Serialize() - if err == nil { - if _, err = w.file.Write(buf); err == nil { - err = w.file.Sync() - } - } - if err != nil { - log.WithFields(log.Fields{ - "Topic": "mrt", - "Data": m, - }).Warn(err) + return nil, err } + return bm.Serialize() } - drain := func() { + drain := func(ev watcherEvent) { + events := make([]watcherEvent, 0, 1+len(w.ch)) + if ev != nil { + events = append(events, ev) + } + for len(w.ch) > 0 { - m := <-w.ch - write(m) + e := <-w.ch + events = append(events, e) + } + + w := func(buf []byte) { + if _, err := w.file.Write(buf); err == nil { + w.file.Sync() + } else { + log.WithFields(log.Fields{ + "Topic": "mrt", + "Error": err, + }).Warn(err) + } } - } + var b bytes.Buffer + for _, e := range events { + buf, err := serialize(e) + if err != nil { + log.WithFields(log.Fields{ + "Topic": "mrt", + "Data": e, + }).Warn(err) + continue + } + b.Write(buf) + if b.Len() > 1*1000*1000 { + w(b.Bytes()) + b.Reset() + } + } + if b.Len() > 0 { + w(b.Bytes()) + } + } select { case <-w.t.Dying(): - drain() + drain(nil) return nil - case m := <-w.ch: - write(m) + case e := <-w.ch: + drain(e) case <-c.C: w.file.Close() file, err := mrtFileOpen(w.filename, w.interval) @@ -232,7 +257,7 @@ func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatche w := mrtWatcher{ filename: filename, file: file, - ch: make(chan watcherEvent), + ch: make(chan watcherEvent, 1<<16), interval: interval, } w.t.Go(w.loop) |