diff options
Diffstat (limited to 'server/watcher.go')
-rw-r--r-- | server/watcher.go | 232 |
1 files changed, 74 insertions, 158 deletions
diff --git a/server/watcher.go b/server/watcher.go index 8c92fab3..4b4b700c 100644 --- a/server/watcher.go +++ b/server/watcher.go @@ -16,26 +16,18 @@ package server import ( - "bytes" + "fmt" + "net" + "sync" + "time" + log "github.com/Sirupsen/logrus" + "github.com/eapache/channels" "github.com/osrg/gobgp/packet/bgp" - "github.com/osrg/gobgp/packet/mrt" "github.com/osrg/gobgp/table" "gopkg.in/tomb.v2" - "net" - "os" - "time" ) -type broadcastWatcherMsg struct { - ch chan watcherEvent - event watcherEvent -} - -func (m *broadcastWatcherMsg) send() { - m.ch <- m.event -} - type watcherType uint8 const ( @@ -86,6 +78,7 @@ type watcherEventStateChangedMsg struct { sentOpen *bgp.BGPMessage recvOpen *bgp.BGPMessage state bgp.FSMState + adminState AdminState timestamp time.Time } @@ -104,172 +97,95 @@ type watcher interface { watchingEventTypes() []watcherEventType } -type mrtWatcher struct { - t tomb.Tomb - filename string - file *os.File - ch chan watcherEvent - interval uint64 +type watcherMsg struct { + typ watcherEventType + ev watcherEvent } -func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_UPDATE_MSG { - return w.ch - } - return nil +type watcherManager struct { + t tomb.Tomb + mu sync.RWMutex + m map[watcherType]watcher + ch *channels.InfiniteChannel } -func (w *mrtWatcher) stop() { - w.t.Kill(nil) +func (m *watcherManager) watching(typ watcherEventType) bool { + for _, w := range m.m { + for _, ev := range w.watchingEventTypes() { + if ev == typ { + return true + } + } + } + return false } -func (w *mrtWatcher) restart(filename string) error { - return nil +// this will be called from server's main goroutine. +// shouldn't block. +func (m *watcherManager) notify(typ watcherEventType, ev watcherEvent) { + m.ch.In() <- &watcherMsg{typ, ev} } -func (w *mrtWatcher) loop() error { - c := func() *time.Ticker { - if w.interval == 0 { - return &time.Ticker{} - } - return time.NewTicker(time.Second * time.Duration(w.interval)) - }() - - defer func() { - if w.file != nil { - w.file.Close() - } - if w.interval != 0 { - c.Stop() - } - }() - +func (m *watcherManager) loop() error { for { - serialize := func(ev watcherEvent) ([]byte, error) { - m := ev.(*watcherEventUpdateMsg) - subtype := mrt.MESSAGE_AS4 - mp := mrt.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil) - mp.BGPMessagePayload = m.payload - if m.fourBytesAs == false { - subtype = mrt.MESSAGE - } - bm, err := mrt.NewMRTMessage(uint32(m.timestamp.Unix()), mrt.BGP4MP, subtype, mp) - if err != nil { - log.WithFields(log.Fields{ - "Topic": "mrt", - "Data": m, - }).Warn(err) - return nil, err - } - return bm.Serialize() - } - - drain := func(ev watcherEvent) { - events := make([]watcherEvent, 0, 1+len(w.ch)) - if ev != nil { - events = append(events, ev) - } - - for len(w.ch) > 0 { - e := <-w.ch - events = append(events, e) - } - - w := func(buf []byte) { - if _, err := w.file.Write(buf); err == nil { - w.file.Sync() - } else { - log.WithFields(log.Fields{ - "Topic": "mrt", - "Error": err, - }).Warn(err) - } + select { + case i, ok := <-m.ch.Out(): + if !ok { + continue } - - var b bytes.Buffer - for _, e := range events { - buf, err := serialize(e) - if err != nil { - log.WithFields(log.Fields{ - "Topic": "mrt", - "Data": e, - }).Warn(err) - continue - } - b.Write(buf) - if b.Len() > 1*1000*1000 { - w(b.Bytes()) - b.Reset() + msg := i.(*watcherMsg) + m.mu.RLock() + for _, w := range m.m { + if ch := w.notify(msg.typ); ch != nil { + t := time.NewTimer(time.Second) + select { + case ch <- msg.ev: + case <-t.C: + log.WithFields(log.Fields{ + "Topic": "Watcher", + }).Warnf("notification to %s timeout expired") + } } } - if b.Len() > 0 { - w(b.Bytes()) - } - } - select { - case <-w.t.Dying(): - drain(nil) - return nil - case e := <-w.ch: - drain(e) - case <-c.C: - w.file.Close() - file, err := mrtFileOpen(w.filename, w.interval) - if err == nil { - w.file = file - } else { - log.Info("can't rotate mrt file", err) - } + m.mu.RUnlock() } } } -func (w *mrtWatcher) watchingEventTypes() []watcherEventType { - return []watcherEventType{WATCHER_EVENT_UPDATE_MSG} +func (m *watcherManager) watcher(typ watcherType) (watcher, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + w, y := m.m[typ] + return w, y } -func mrtFileOpen(filename string, interval uint64) (*os.File, error) { - realname := filename - if interval != 0 { - realname = time.Now().Format(filename) - } - - i := len(realname) - for i > 0 && os.IsPathSeparator(realname[i-1]) { - // skip trailing path separators - i-- - } - j := i - - for j > 0 && !os.IsPathSeparator(realname[j-1]) { - j-- - } - - if j > 0 { - if err := os.MkdirAll(realname[0:j-1], 0755); err != nil { - log.Warn(err) - return nil, err - } +func (m *watcherManager) addWatcher(typ watcherType, w watcher) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, y := m.m[typ]; y { + return fmt.Errorf("already exists %s watcher", typ) } + m.m[typ] = w + return nil +} - file, err := os.OpenFile(realname, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) - if err != nil { - log.Warn(err) +func (m *watcherManager) delWatcher(typ watcherType) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, y := m.m[typ]; !y { + return fmt.Errorf("not found %s watcher", typ) } - return file, err + w := m.m[typ] + w.stop() + delete(m.m, typ) + return nil } -func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatcher, error) { - file, err := mrtFileOpen(filename, interval) - if err != nil { - return nil, err - } - w := mrtWatcher{ - filename: filename, - file: file, - ch: make(chan watcherEvent, 1<<16), - interval: interval, +func newWatcherManager() *watcherManager { + m := &watcherManager{ + m: make(map[watcherType]watcher), + ch: channels.NewInfiniteChannel(), } - w.t.Go(w.loop) - return &w, nil + m.t.Go(m.loop) + return m } |