summaryrefslogtreecommitdiffhomepage
path: root/server/mrt.go
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2016-05-25 04:39:07 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-06-06 12:43:20 +0900
commitb63e1c1fc3c40b58ba798bbae4e122f0eedaf55d (patch)
tree48356ad79cdc1d2a9deede9650a37556d4a86adf /server/mrt.go
parentaca6fd6ad4409b4cb63682bff3c79fca8ca2800d (diff)
server: refactor monitor/watcher infra
have watcherManager to manage all watchers also merge grpc neighbor state monitoring handling to grpcWatcher Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/mrt.go')
-rw-r--r--server/mrt.go196
1 files changed, 196 insertions, 0 deletions
diff --git a/server/mrt.go b/server/mrt.go
new file mode 100644
index 00000000..ce4ac90b
--- /dev/null
+++ b/server/mrt.go
@@ -0,0 +1,196 @@
+// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "bytes"
+ "os"
+ "time"
+
+ log "github.com/Sirupsen/logrus"
+ "github.com/osrg/gobgp/packet/mrt"
+ "gopkg.in/tomb.v2"
+)
+
+type mrtWatcher struct {
+ t tomb.Tomb
+ filename string
+ file *os.File
+ ch chan watcherEvent
+ interval uint64
+}
+
+func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent {
+ if t == WATCHER_EVENT_UPDATE_MSG {
+ return w.ch
+ }
+ return nil
+}
+
+func (w *mrtWatcher) stop() {
+ w.t.Kill(nil)
+}
+
+func (w *mrtWatcher) restart(filename string) error {
+ return nil
+}
+
+func (w *mrtWatcher) loop() error {
+ c := func() *time.Ticker {
+ if w.interval == 0 {
+ return &time.Ticker{}
+ }
+ return time.NewTicker(time.Second * time.Duration(w.interval))
+ }()
+
+ defer func() {
+ if w.file != nil {
+ w.file.Close()
+ }
+ if w.interval != 0 {
+ c.Stop()
+ }
+ }()
+
+ for {
+ serialize := func(ev watcherEvent) ([]byte, error) {
+ m := ev.(*watcherEventUpdateMsg)
+ subtype := mrt.MESSAGE_AS4
+ mp := mrt.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil)
+ mp.BGPMessagePayload = m.payload
+ if m.fourBytesAs == false {
+ subtype = mrt.MESSAGE
+ }
+ bm, err := mrt.NewMRTMessage(uint32(m.timestamp.Unix()), mrt.BGP4MP, subtype, mp)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": m,
+ }).Warn(err)
+ return nil, err
+ }
+ return bm.Serialize()
+ }
+
+ drain := func(ev watcherEvent) {
+ events := make([]watcherEvent, 0, 1+len(w.ch))
+ if ev != nil {
+ events = append(events, ev)
+ }
+
+ for len(w.ch) > 0 {
+ e := <-w.ch
+ events = append(events, e)
+ }
+
+ w := func(buf []byte) {
+ if _, err := w.file.Write(buf); err == nil {
+ w.file.Sync()
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Error": err,
+ }).Warn(err)
+ }
+ }
+
+ var b bytes.Buffer
+ for _, e := range events {
+ buf, err := serialize(e)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": e,
+ }).Warn(err)
+ continue
+ }
+ b.Write(buf)
+ if b.Len() > 1*1000*1000 {
+ w(b.Bytes())
+ b.Reset()
+ }
+ }
+ if b.Len() > 0 {
+ w(b.Bytes())
+ }
+ }
+ select {
+ case <-w.t.Dying():
+ drain(nil)
+ return nil
+ case e := <-w.ch:
+ drain(e)
+ case <-c.C:
+ w.file.Close()
+ file, err := mrtFileOpen(w.filename, w.interval)
+ if err == nil {
+ w.file = file
+ } else {
+ log.Info("can't rotate mrt file", err)
+ }
+ }
+ }
+}
+
+func (w *mrtWatcher) watchingEventTypes() []watcherEventType {
+ return []watcherEventType{WATCHER_EVENT_UPDATE_MSG}
+}
+
+func mrtFileOpen(filename string, interval uint64) (*os.File, error) {
+ realname := filename
+ if interval != 0 {
+ realname = time.Now().Format(filename)
+ }
+
+ i := len(realname)
+ for i > 0 && os.IsPathSeparator(realname[i-1]) {
+ // skip trailing path separators
+ i--
+ }
+ j := i
+
+ for j > 0 && !os.IsPathSeparator(realname[j-1]) {
+ j--
+ }
+
+ if j > 0 {
+ if err := os.MkdirAll(realname[0:j-1], 0755); err != nil {
+ log.Warn(err)
+ return nil, err
+ }
+ }
+
+ file, err := os.OpenFile(realname, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
+ if err != nil {
+ log.Warn(err)
+ }
+ return file, err
+}
+
+func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatcher, error) {
+ file, err := mrtFileOpen(filename, interval)
+ if err != nil {
+ return nil, err
+ }
+ w := mrtWatcher{
+ filename: filename,
+ file: file,
+ ch: make(chan watcherEvent, 1<<16),
+ interval: interval,
+ }
+ w.t.Go(w.loop)
+ return &w, nil
+}