diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2016-05-25 04:39:07 +0000 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-06-06 12:43:20 +0900 |
commit | b63e1c1fc3c40b58ba798bbae4e122f0eedaf55d (patch) | |
tree | 48356ad79cdc1d2a9deede9650a37556d4a86adf /server/mrt.go | |
parent | aca6fd6ad4409b4cb63682bff3c79fca8ca2800d (diff) |
server: refactor monitor/watcher infra
have watcherManager to manage all watchers
also merge grpc neighbor state monitoring handling to grpcWatcher
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/mrt.go')
-rw-r--r-- | server/mrt.go | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/server/mrt.go b/server/mrt.go new file mode 100644 index 00000000..ce4ac90b --- /dev/null +++ b/server/mrt.go @@ -0,0 +1,196 @@ +// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "bytes" + "os" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/osrg/gobgp/packet/mrt" + "gopkg.in/tomb.v2" +) + +type mrtWatcher struct { + t tomb.Tomb + filename string + file *os.File + ch chan watcherEvent + interval uint64 +} + +func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent { + if t == WATCHER_EVENT_UPDATE_MSG { + return w.ch + } + return nil +} + +func (w *mrtWatcher) stop() { + w.t.Kill(nil) +} + +func (w *mrtWatcher) restart(filename string) error { + return nil +} + +func (w *mrtWatcher) loop() error { + c := func() *time.Ticker { + if w.interval == 0 { + return &time.Ticker{} + } + return time.NewTicker(time.Second * time.Duration(w.interval)) + }() + + defer func() { + if w.file != nil { + w.file.Close() + } + if w.interval != 0 { + c.Stop() + } + }() + + for { + serialize := func(ev watcherEvent) ([]byte, error) { + m := ev.(*watcherEventUpdateMsg) + subtype := mrt.MESSAGE_AS4 + mp := mrt.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil) + mp.BGPMessagePayload = m.payload + if m.fourBytesAs == false { + subtype = mrt.MESSAGE + } + bm, err := mrt.NewMRTMessage(uint32(m.timestamp.Unix()), mrt.BGP4MP, subtype, mp) + if err != nil { + log.WithFields(log.Fields{ + "Topic": "mrt", + "Data": m, + }).Warn(err) + return nil, err + } + return bm.Serialize() + } + + drain := func(ev watcherEvent) { + events := make([]watcherEvent, 0, 1+len(w.ch)) + if ev != nil { + events = append(events, ev) + } + + for len(w.ch) > 0 { + e := <-w.ch + events = append(events, e) + } + + w := func(buf []byte) { + if _, err := w.file.Write(buf); err == nil { + w.file.Sync() + } else { + log.WithFields(log.Fields{ + "Topic": "mrt", + "Error": err, + }).Warn(err) + } + } + + var b bytes.Buffer + for _, e := range events { + buf, err := serialize(e) + if err != nil { + log.WithFields(log.Fields{ + "Topic": "mrt", + "Data": e, + }).Warn(err) + continue + } + b.Write(buf) + if b.Len() > 1*1000*1000 { + w(b.Bytes()) + b.Reset() + } + } + if b.Len() > 0 { + w(b.Bytes()) + } + } + select { + case <-w.t.Dying(): + drain(nil) + return nil + case e := <-w.ch: + drain(e) + case <-c.C: + w.file.Close() + file, err := mrtFileOpen(w.filename, w.interval) + if err == nil { + w.file = file + } else { + log.Info("can't rotate mrt file", err) + } + } + } +} + +func (w *mrtWatcher) watchingEventTypes() []watcherEventType { + return []watcherEventType{WATCHER_EVENT_UPDATE_MSG} +} + +func mrtFileOpen(filename string, interval uint64) (*os.File, error) { + realname := filename + if interval != 0 { + realname = time.Now().Format(filename) + } + + i := len(realname) + for i > 0 && os.IsPathSeparator(realname[i-1]) { + // skip trailing path separators + i-- + } + j := i + + for j > 0 && !os.IsPathSeparator(realname[j-1]) { + j-- + } + + if j > 0 { + if err := os.MkdirAll(realname[0:j-1], 0755); err != nil { + log.Warn(err) + return nil, err + } + } + + file, err := os.OpenFile(realname, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + log.Warn(err) + } + return file, err +} + +func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatcher, error) { + file, err := mrtFileOpen(filename, interval) + if err != nil { + return nil, err + } + w := mrtWatcher{ + filename: filename, + file: file, + ch: make(chan watcherEvent, 1<<16), + interval: interval, + } + w.t.Go(w.loop) + return &w, nil +} |