summaryrefslogtreecommitdiffhomepage
path: root/server/watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'server/watcher.go')
-rw-r--r--server/watcher.go232
1 files changed, 74 insertions, 158 deletions
diff --git a/server/watcher.go b/server/watcher.go
index 8c92fab3..4b4b700c 100644
--- a/server/watcher.go
+++ b/server/watcher.go
@@ -16,26 +16,18 @@
package server
import (
- "bytes"
+ "fmt"
+ "net"
+ "sync"
+ "time"
+
log "github.com/Sirupsen/logrus"
+ "github.com/eapache/channels"
"github.com/osrg/gobgp/packet/bgp"
- "github.com/osrg/gobgp/packet/mrt"
"github.com/osrg/gobgp/table"
"gopkg.in/tomb.v2"
- "net"
- "os"
- "time"
)
-type broadcastWatcherMsg struct {
- ch chan watcherEvent
- event watcherEvent
-}
-
-func (m *broadcastWatcherMsg) send() {
- m.ch <- m.event
-}
-
type watcherType uint8
const (
@@ -86,6 +78,7 @@ type watcherEventStateChangedMsg struct {
sentOpen *bgp.BGPMessage
recvOpen *bgp.BGPMessage
state bgp.FSMState
+ adminState AdminState
timestamp time.Time
}
@@ -104,172 +97,95 @@ type watcher interface {
watchingEventTypes() []watcherEventType
}
-type mrtWatcher struct {
- t tomb.Tomb
- filename string
- file *os.File
- ch chan watcherEvent
- interval uint64
+type watcherMsg struct {
+ typ watcherEventType
+ ev watcherEvent
}
-func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_UPDATE_MSG {
- return w.ch
- }
- return nil
+type watcherManager struct {
+ t tomb.Tomb
+ mu sync.RWMutex
+ m map[watcherType]watcher
+ ch *channels.InfiniteChannel
}
-func (w *mrtWatcher) stop() {
- w.t.Kill(nil)
+func (m *watcherManager) watching(typ watcherEventType) bool {
+ for _, w := range m.m {
+ for _, ev := range w.watchingEventTypes() {
+ if ev == typ {
+ return true
+ }
+ }
+ }
+ return false
}
-func (w *mrtWatcher) restart(filename string) error {
- return nil
+// this will be called from server's main goroutine.
+// shouldn't block.
+func (m *watcherManager) notify(typ watcherEventType, ev watcherEvent) {
+ m.ch.In() <- &watcherMsg{typ, ev}
}
-func (w *mrtWatcher) loop() error {
- c := func() *time.Ticker {
- if w.interval == 0 {
- return &time.Ticker{}
- }
- return time.NewTicker(time.Second * time.Duration(w.interval))
- }()
-
- defer func() {
- if w.file != nil {
- w.file.Close()
- }
- if w.interval != 0 {
- c.Stop()
- }
- }()
-
+func (m *watcherManager) loop() error {
for {
- serialize := func(ev watcherEvent) ([]byte, error) {
- m := ev.(*watcherEventUpdateMsg)
- subtype := mrt.MESSAGE_AS4
- mp := mrt.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil)
- mp.BGPMessagePayload = m.payload
- if m.fourBytesAs == false {
- subtype = mrt.MESSAGE
- }
- bm, err := mrt.NewMRTMessage(uint32(m.timestamp.Unix()), mrt.BGP4MP, subtype, mp)
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Data": m,
- }).Warn(err)
- return nil, err
- }
- return bm.Serialize()
- }
-
- drain := func(ev watcherEvent) {
- events := make([]watcherEvent, 0, 1+len(w.ch))
- if ev != nil {
- events = append(events, ev)
- }
-
- for len(w.ch) > 0 {
- e := <-w.ch
- events = append(events, e)
- }
-
- w := func(buf []byte) {
- if _, err := w.file.Write(buf); err == nil {
- w.file.Sync()
- } else {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Error": err,
- }).Warn(err)
- }
+ select {
+ case i, ok := <-m.ch.Out():
+ if !ok {
+ continue
}
-
- var b bytes.Buffer
- for _, e := range events {
- buf, err := serialize(e)
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Data": e,
- }).Warn(err)
- continue
- }
- b.Write(buf)
- if b.Len() > 1*1000*1000 {
- w(b.Bytes())
- b.Reset()
+ msg := i.(*watcherMsg)
+ m.mu.RLock()
+ for _, w := range m.m {
+ if ch := w.notify(msg.typ); ch != nil {
+ t := time.NewTimer(time.Second)
+ select {
+ case ch <- msg.ev:
+ case <-t.C:
+ log.WithFields(log.Fields{
+ "Topic": "Watcher",
+ }).Warnf("notification to %s timeout expired")
+ }
}
}
- if b.Len() > 0 {
- w(b.Bytes())
- }
- }
- select {
- case <-w.t.Dying():
- drain(nil)
- return nil
- case e := <-w.ch:
- drain(e)
- case <-c.C:
- w.file.Close()
- file, err := mrtFileOpen(w.filename, w.interval)
- if err == nil {
- w.file = file
- } else {
- log.Info("can't rotate mrt file", err)
- }
+ m.mu.RUnlock()
}
}
}
-func (w *mrtWatcher) watchingEventTypes() []watcherEventType {
- return []watcherEventType{WATCHER_EVENT_UPDATE_MSG}
+func (m *watcherManager) watcher(typ watcherType) (watcher, bool) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ w, y := m.m[typ]
+ return w, y
}
-func mrtFileOpen(filename string, interval uint64) (*os.File, error) {
- realname := filename
- if interval != 0 {
- realname = time.Now().Format(filename)
- }
-
- i := len(realname)
- for i > 0 && os.IsPathSeparator(realname[i-1]) {
- // skip trailing path separators
- i--
- }
- j := i
-
- for j > 0 && !os.IsPathSeparator(realname[j-1]) {
- j--
- }
-
- if j > 0 {
- if err := os.MkdirAll(realname[0:j-1], 0755); err != nil {
- log.Warn(err)
- return nil, err
- }
+func (m *watcherManager) addWatcher(typ watcherType, w watcher) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if _, y := m.m[typ]; y {
+ return fmt.Errorf("already exists %s watcher", typ)
}
+ m.m[typ] = w
+ return nil
+}
- file, err := os.OpenFile(realname, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
- if err != nil {
- log.Warn(err)
+func (m *watcherManager) delWatcher(typ watcherType) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if _, y := m.m[typ]; !y {
+ return fmt.Errorf("not found %s watcher", typ)
}
- return file, err
+ w := m.m[typ]
+ w.stop()
+ delete(m.m, typ)
+ return nil
}
-func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatcher, error) {
- file, err := mrtFileOpen(filename, interval)
- if err != nil {
- return nil, err
- }
- w := mrtWatcher{
- filename: filename,
- file: file,
- ch: make(chan watcherEvent, 1<<16),
- interval: interval,
+func newWatcherManager() *watcherManager {
+ m := &watcherManager{
+ m: make(map[watcherType]watcher),
+ ch: channels.NewInfiniteChannel(),
}
- w.t.Go(w.loop)
- return &w, nil
+ m.t.Go(m.loop)
+ return m
}