diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/grpc_server.go | 16 | ||||
-rw-r--r-- | server/mrt.go | 67 | ||||
-rw-r--r-- | server/server.go | 86 |
3 files changed, 67 insertions, 102 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index b9eb48d3..86d1267b 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -612,19 +612,15 @@ func (s *Server) DeletePath(ctx context.Context, arg *api.DeletePathRequest) (*a } func (s *Server) EnableMrt(ctx context.Context, arg *api.EnableMrtRequest) (*api.EnableMrtResponse, error) { - d, err := s.get(REQ_ENABLE_MRT, arg) - if err != nil { - return nil, err - } - return d.(*api.EnableMrtResponse), err + return &api.EnableMrtResponse{}, s.bgpServer.EnableMrt(&config.Mrt{ + Interval: arg.Interval, + DumpType: config.IntToMrtTypeMap[int(arg.DumpType)], + FileName: arg.Filename, + }) } func (s *Server) DisableMrt(ctx context.Context, arg *api.DisableMrtRequest) (*api.DisableMrtResponse, error) { - d, err := s.get(REQ_DISABLE_MRT, arg) - if err != nil { - return nil, err - } - return d.(*api.DisableMrtResponse), err + return &api.DisableMrtResponse{}, s.bgpServer.DisableMrt() } func (s *Server) InjectMrt(stream api.GobgpApi_InjectMrtServer) error { diff --git a/server/mrt.go b/server/mrt.go index 639a2ceb..9c2673be 100644 --- a/server/mrt.go +++ b/server/mrt.go @@ -22,43 +22,37 @@ import ( log "github.com/Sirupsen/logrus" "github.com/osrg/gobgp/packet/mrt" - "gopkg.in/tomb.v2" ) -type mrtWatcher struct { - t tomb.Tomb +type mrtWriter struct { + dead chan struct{} + s *BgpServer filename string file *os.File - ch chan watcherEvent interval uint64 } -func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_UPDATE_MSG { - return w.ch - } - return nil -} - -func (w *mrtWatcher) stop() { - w.t.Kill(nil) +func (m *mrtWriter) Stop() { + close(m.dead) } -func (w *mrtWatcher) loop() error { +func (m *mrtWriter) loop() error { + w := m.s.Watch(WatchUpdate(false)) c := func() *time.Ticker { - if w.interval == 0 { + if m.interval == 0 { return &time.Ticker{} } - return time.NewTicker(time.Second * time.Duration(w.interval)) + return time.NewTicker(time.Second * time.Duration(m.interval)) }() defer func() { - if w.file != nil { - w.file.Close() + if m.file != nil { + m.file.Close() } - if w.interval != 0 { + if m.interval != 0 { c.Stop() } + w.Stop() }() for { @@ -82,19 +76,18 @@ func (w *mrtWatcher) loop() error { } drain := func(ev watcherEvent) { - events := make([]watcherEvent, 0, 1+len(w.ch)) + events := make([]watcherEvent, 0, 1+len(w.Event())) if ev != nil { events = append(events, ev) } - for len(w.ch) > 0 { - e := <-w.ch - events = append(events, e) + for len(w.Event()) > 0 { + events = append(events, <-w.Event()) } w := func(buf []byte) { - if _, err := w.file.Write(buf); err == nil { - w.file.Sync() + if _, err := m.file.Write(buf); err == nil { + m.file.Sync() } else { log.WithFields(log.Fields{ "Topic": "mrt", @@ -124,16 +117,16 @@ func (w *mrtWatcher) loop() error { } } select { - case <-w.t.Dying(): + case <-m.dead: drain(nil) return nil - case e := <-w.ch: + case e := <-w.Event(): drain(e) case <-c.C: - w.file.Close() - file, err := mrtFileOpen(w.filename, w.interval) + m.file.Close() + file, err := mrtFileOpen(m.filename, m.interval) if err == nil { - w.file = file + m.file = file } else { log.Info("can't rotate mrt file", err) } @@ -141,10 +134,6 @@ func (w *mrtWatcher) loop() error { } } -func (w *mrtWatcher) watchingEventTypes() []watcherEventType { - return []watcherEventType{WATCHER_EVENT_UPDATE_MSG} -} - func mrtFileOpen(filename string, interval uint64) (*os.File, error) { realname := filename if interval != 0 { @@ -176,17 +165,17 @@ func mrtFileOpen(filename string, interval uint64) (*os.File, error) { return file, err } -func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatcher, error) { +func newMrtWriter(s *BgpServer, dumpType int, filename string, interval uint64) (*mrtWriter, error) { file, err := mrtFileOpen(filename, interval) if err != nil { return nil, err } - w := mrtWatcher{ + m := mrtWriter{ + s: s, filename: filename, file: file, - ch: make(chan watcherEvent, 1<<16), interval: interval, } - w.t.Go(w.loop) - return &w, nil + go m.loop() + return &m, nil } diff --git a/server/server.go b/server/server.go index 46fb33a2..7e3943fe 100644 --- a/server/server.go +++ b/server/server.go @@ -107,6 +107,7 @@ type BgpServer struct { watcherMap map[watchType][]*Watcher zclient *zebraClient bmpManager *bmpClientManager + mrt *mrtWriter } func NewBgpServer() *BgpServer { @@ -917,27 +918,6 @@ func (s *BgpServer) DeleteBmp(c *config.BmpServerConfig) (err error) { return err } -func (server *BgpServer) SetMrtConfig(c []config.Mrt) error { - for _, s := range c { - if s.FileName != "" { - ch := make(chan *GrpcResponse) - server.GrpcReqCh <- &GrpcRequest{ - RequestType: REQ_ENABLE_MRT, - Data: &api.EnableMrtRequest{ - DumpType: int32(s.DumpType.ToInt()), - Filename: s.FileName, - Interval: s.Interval, - }, - ResponseCh: ch, - } - if err := (<-ch).Err(); err != nil { - return err - } - } - } - return nil -} - func (server *BgpServer) Shutdown() { server.shutdown = true for _, p := range server.neighborMap { @@ -1765,10 +1745,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { } grpcReq.ResponseCh <- result close(grpcReq.ResponseCh) - case REQ_ENABLE_MRT: - server.handleEnableMrtRequest(grpcReq) - case REQ_DISABLE_MRT: - server.handleDisableMrtRequest(grpcReq) case REQ_VALIDATE_RIB: server.handleValidateRib(grpcReq) case REQ_INITIALIZE_RPKI: @@ -2601,38 +2577,42 @@ func grpcDone(grpcReq *GrpcRequest, e error) { close(grpcReq.ResponseCh) } -func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) { - arg := grpcReq.Data.(*api.EnableMrtRequest) - if _, y := server.watchers.watcher(WATCHER_MRT); y { - grpcDone(grpcReq, fmt.Errorf("already enabled")) - return - } - if arg.Interval != 0 && arg.Interval < 30 { - log.Info("minimum mrt dump interval is 30 seconds") - arg.Interval = 30 - } - w, err := newMrtWatcher(arg.DumpType, arg.Filename, arg.Interval) - if err == nil { - server.watchers.addWatcher(WATCHER_MRT, w) - } - grpcReq.ResponseCh <- &GrpcResponse{ - ResponseErr: err, - Data: &api.EnableMrtResponse{}, +func (s *BgpServer) EnableMrt(c *config.Mrt) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + if s.mrt != nil { + err = fmt.Errorf("already enabled") + } else { + interval := c.Interval + + if interval != 0 && interval < 30 { + log.Info("minimum mrt dump interval is 30 seconds") + interval = 30 + } + s.mrt, err = newMrtWriter(s, c.DumpType.ToInt(), c.FileName, interval) + } } - close(grpcReq.ResponseCh) + return err } -func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) { - _, y := server.watchers.watcher(WATCHER_MRT) - if !y { - grpcDone(grpcReq, fmt.Errorf("not enabled yet")) - return - } - server.watchers.delWatcher(WATCHER_MRT) - grpcReq.ResponseCh <- &GrpcResponse{ - Data: &api.DisableMrtResponse{}, +func (s *BgpServer) DisableMrt() (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + if s.mrt != nil { + s.mrt.Stop() + } else { + err = fmt.Errorf("not enabled") + } } - close(grpcReq.ResponseCh) + return err } func (server *BgpServer) handleValidateRib(grpcReq *GrpcRequest) { |