From 20696d7e1cca8d49b2b59f73aad4be009d5293b0 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Fri, 22 Jul 2016 23:04:17 +0900 Subject: mrt uses the new Watch API Signed-off-by: FUJITA Tomonori --- server/server.go | 86 ++++++++++++++++++++++---------------------------------- 1 file changed, 33 insertions(+), 53 deletions(-) (limited to 'server/server.go') diff --git a/server/server.go b/server/server.go index 46fb33a2..7e3943fe 100644 --- a/server/server.go +++ b/server/server.go @@ -107,6 +107,7 @@ type BgpServer struct { watcherMap map[watchType][]*Watcher zclient *zebraClient bmpManager *bmpClientManager + mrt *mrtWriter } func NewBgpServer() *BgpServer { @@ -917,27 +918,6 @@ func (s *BgpServer) DeleteBmp(c *config.BmpServerConfig) (err error) { return err } -func (server *BgpServer) SetMrtConfig(c []config.Mrt) error { - for _, s := range c { - if s.FileName != "" { - ch := make(chan *GrpcResponse) - server.GrpcReqCh <- &GrpcRequest{ - RequestType: REQ_ENABLE_MRT, - Data: &api.EnableMrtRequest{ - DumpType: int32(s.DumpType.ToInt()), - Filename: s.FileName, - Interval: s.Interval, - }, - ResponseCh: ch, - } - if err := (<-ch).Err(); err != nil { - return err - } - } - } - return nil -} - func (server *BgpServer) Shutdown() { server.shutdown = true for _, p := range server.neighborMap { @@ -1765,10 +1745,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { } grpcReq.ResponseCh <- result close(grpcReq.ResponseCh) - case REQ_ENABLE_MRT: - server.handleEnableMrtRequest(grpcReq) - case REQ_DISABLE_MRT: - server.handleDisableMrtRequest(grpcReq) case REQ_VALIDATE_RIB: server.handleValidateRib(grpcReq) case REQ_INITIALIZE_RPKI: @@ -2601,38 +2577,42 @@ func grpcDone(grpcReq *GrpcRequest, e error) { close(grpcReq.ResponseCh) } -func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) { - arg := grpcReq.Data.(*api.EnableMrtRequest) - if _, y := server.watchers.watcher(WATCHER_MRT); y { - grpcDone(grpcReq, fmt.Errorf("already enabled")) - return - } - if arg.Interval != 0 && arg.Interval < 30 { - log.Info("minimum mrt dump interval is 30 seconds") - arg.Interval = 30 - } - w, err := newMrtWatcher(arg.DumpType, arg.Filename, arg.Interval) - if err == nil { - server.watchers.addWatcher(WATCHER_MRT, w) - } - grpcReq.ResponseCh <- &GrpcResponse{ - ResponseErr: err, - Data: &api.EnableMrtResponse{}, +func (s *BgpServer) EnableMrt(c *config.Mrt) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + if s.mrt != nil { + err = fmt.Errorf("already enabled") + } else { + interval := c.Interval + + if interval != 0 && interval < 30 { + log.Info("minimum mrt dump interval is 30 seconds") + interval = 30 + } + s.mrt, err = newMrtWriter(s, c.DumpType.ToInt(), c.FileName, interval) + } } - close(grpcReq.ResponseCh) + return err } -func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) { - _, y := server.watchers.watcher(WATCHER_MRT) - if !y { - grpcDone(grpcReq, fmt.Errorf("not enabled yet")) - return - } - server.watchers.delWatcher(WATCHER_MRT) - grpcReq.ResponseCh <- &GrpcResponse{ - Data: &api.DisableMrtResponse{}, +func (s *BgpServer) DisableMrt() (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + if s.mrt != nil { + s.mrt.Stop() + } else { + err = fmt.Errorf("not enabled") + } } - close(grpcReq.ResponseCh) + return err } func (server *BgpServer) handleValidateRib(grpcReq *GrpcRequest) { -- cgit v1.2.3