summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--gobgpd/main.go9
-rw-r--r--server/grpc_server.go16
-rw-r--r--server/mrt.go67
-rw-r--r--server/server.go86
4 files changed, 74 insertions, 104 deletions
diff --git a/gobgpd/main.go b/gobgpd/main.go
index ca90ece7..f886ea95 100644
--- a/gobgpd/main.go
+++ b/gobgpd/main.go
@@ -227,8 +227,13 @@ func main() {
log.Fatalf("failed to set bmp config: %s", err)
}
}
- if err := bgpServer.SetMrtConfig(newConfig.MrtDump); err != nil {
- log.Fatalf("failed to set mrt config: %s", err)
+ for _, c := range newConfig.MrtDump {
+ if len(c.FileName) == 0 {
+ continue
+ }
+ if err := bgpServer.EnableMrt(&c); err != nil {
+ log.Fatalf("failed to set mrt config: %s", err)
+ }
}
p := config.ConfigSetToRoutingPolicy(newConfig)
if err := bgpServer.UpdatePolicy(*p); err != nil {
diff --git a/server/grpc_server.go b/server/grpc_server.go
index b9eb48d3..86d1267b 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -612,19 +612,15 @@ func (s *Server) DeletePath(ctx context.Context, arg *api.DeletePathRequest) (*a
}
func (s *Server) EnableMrt(ctx context.Context, arg *api.EnableMrtRequest) (*api.EnableMrtResponse, error) {
- d, err := s.get(REQ_ENABLE_MRT, arg)
- if err != nil {
- return nil, err
- }
- return d.(*api.EnableMrtResponse), err
+ return &api.EnableMrtResponse{}, s.bgpServer.EnableMrt(&config.Mrt{
+ Interval: arg.Interval,
+ DumpType: config.IntToMrtTypeMap[int(arg.DumpType)],
+ FileName: arg.Filename,
+ })
}
func (s *Server) DisableMrt(ctx context.Context, arg *api.DisableMrtRequest) (*api.DisableMrtResponse, error) {
- d, err := s.get(REQ_DISABLE_MRT, arg)
- if err != nil {
- return nil, err
- }
- return d.(*api.DisableMrtResponse), err
+ return &api.DisableMrtResponse{}, s.bgpServer.DisableMrt()
}
func (s *Server) InjectMrt(stream api.GobgpApi_InjectMrtServer) error {
diff --git a/server/mrt.go b/server/mrt.go
index 639a2ceb..9c2673be 100644
--- a/server/mrt.go
+++ b/server/mrt.go
@@ -22,43 +22,37 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/osrg/gobgp/packet/mrt"
- "gopkg.in/tomb.v2"
)
-type mrtWatcher struct {
- t tomb.Tomb
+type mrtWriter struct {
+ dead chan struct{}
+ s *BgpServer
filename string
file *os.File
- ch chan watcherEvent
interval uint64
}
-func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_UPDATE_MSG {
- return w.ch
- }
- return nil
-}
-
-func (w *mrtWatcher) stop() {
- w.t.Kill(nil)
+func (m *mrtWriter) Stop() {
+ close(m.dead)
}
-func (w *mrtWatcher) loop() error {
+func (m *mrtWriter) loop() error {
+ w := m.s.Watch(WatchUpdate(false))
c := func() *time.Ticker {
- if w.interval == 0 {
+ if m.interval == 0 {
return &time.Ticker{}
}
- return time.NewTicker(time.Second * time.Duration(w.interval))
+ return time.NewTicker(time.Second * time.Duration(m.interval))
}()
defer func() {
- if w.file != nil {
- w.file.Close()
+ if m.file != nil {
+ m.file.Close()
}
- if w.interval != 0 {
+ if m.interval != 0 {
c.Stop()
}
+ w.Stop()
}()
for {
@@ -82,19 +76,18 @@ func (w *mrtWatcher) loop() error {
}
drain := func(ev watcherEvent) {
- events := make([]watcherEvent, 0, 1+len(w.ch))
+ events := make([]watcherEvent, 0, 1+len(w.Event()))
if ev != nil {
events = append(events, ev)
}
- for len(w.ch) > 0 {
- e := <-w.ch
- events = append(events, e)
+ for len(w.Event()) > 0 {
+ events = append(events, <-w.Event())
}
w := func(buf []byte) {
- if _, err := w.file.Write(buf); err == nil {
- w.file.Sync()
+ if _, err := m.file.Write(buf); err == nil {
+ m.file.Sync()
} else {
log.WithFields(log.Fields{
"Topic": "mrt",
@@ -124,16 +117,16 @@ func (w *mrtWatcher) loop() error {
}
}
select {
- case <-w.t.Dying():
+ case <-m.dead:
drain(nil)
return nil
- case e := <-w.ch:
+ case e := <-w.Event():
drain(e)
case <-c.C:
- w.file.Close()
- file, err := mrtFileOpen(w.filename, w.interval)
+ m.file.Close()
+ file, err := mrtFileOpen(m.filename, m.interval)
if err == nil {
- w.file = file
+ m.file = file
} else {
log.Info("can't rotate mrt file", err)
}
@@ -141,10 +134,6 @@ func (w *mrtWatcher) loop() error {
}
}
-func (w *mrtWatcher) watchingEventTypes() []watcherEventType {
- return []watcherEventType{WATCHER_EVENT_UPDATE_MSG}
-}
-
func mrtFileOpen(filename string, interval uint64) (*os.File, error) {
realname := filename
if interval != 0 {
@@ -176,17 +165,17 @@ func mrtFileOpen(filename string, interval uint64) (*os.File, error) {
return file, err
}
-func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatcher, error) {
+func newMrtWriter(s *BgpServer, dumpType int, filename string, interval uint64) (*mrtWriter, error) {
file, err := mrtFileOpen(filename, interval)
if err != nil {
return nil, err
}
- w := mrtWatcher{
+ m := mrtWriter{
+ s: s,
filename: filename,
file: file,
- ch: make(chan watcherEvent, 1<<16),
interval: interval,
}
- w.t.Go(w.loop)
- return &w, nil
+ go m.loop()
+ return &m, nil
}
diff --git a/server/server.go b/server/server.go
index 46fb33a2..7e3943fe 100644
--- a/server/server.go
+++ b/server/server.go
@@ -107,6 +107,7 @@ type BgpServer struct {
watcherMap map[watchType][]*Watcher
zclient *zebraClient
bmpManager *bmpClientManager
+ mrt *mrtWriter
}
func NewBgpServer() *BgpServer {
@@ -917,27 +918,6 @@ func (s *BgpServer) DeleteBmp(c *config.BmpServerConfig) (err error) {
return err
}
-func (server *BgpServer) SetMrtConfig(c []config.Mrt) error {
- for _, s := range c {
- if s.FileName != "" {
- ch := make(chan *GrpcResponse)
- server.GrpcReqCh <- &GrpcRequest{
- RequestType: REQ_ENABLE_MRT,
- Data: &api.EnableMrtRequest{
- DumpType: int32(s.DumpType.ToInt()),
- Filename: s.FileName,
- Interval: s.Interval,
- },
- ResponseCh: ch,
- }
- if err := (<-ch).Err(); err != nil {
- return err
- }
- }
- }
- return nil
-}
-
func (server *BgpServer) Shutdown() {
server.shutdown = true
for _, p := range server.neighborMap {
@@ -1765,10 +1745,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
}
grpcReq.ResponseCh <- result
close(grpcReq.ResponseCh)
- case REQ_ENABLE_MRT:
- server.handleEnableMrtRequest(grpcReq)
- case REQ_DISABLE_MRT:
- server.handleDisableMrtRequest(grpcReq)
case REQ_VALIDATE_RIB:
server.handleValidateRib(grpcReq)
case REQ_INITIALIZE_RPKI:
@@ -2601,38 +2577,42 @@ func grpcDone(grpcReq *GrpcRequest, e error) {
close(grpcReq.ResponseCh)
}
-func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) {
- arg := grpcReq.Data.(*api.EnableMrtRequest)
- if _, y := server.watchers.watcher(WATCHER_MRT); y {
- grpcDone(grpcReq, fmt.Errorf("already enabled"))
- return
- }
- if arg.Interval != 0 && arg.Interval < 30 {
- log.Info("minimum mrt dump interval is 30 seconds")
- arg.Interval = 30
- }
- w, err := newMrtWatcher(arg.DumpType, arg.Filename, arg.Interval)
- if err == nil {
- server.watchers.addWatcher(WATCHER_MRT, w)
- }
- grpcReq.ResponseCh <- &GrpcResponse{
- ResponseErr: err,
- Data: &api.EnableMrtResponse{},
+func (s *BgpServer) EnableMrt(c *config.Mrt) (err error) {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ s.mgmtCh <- func() {
+ defer close(ch)
+
+ if s.mrt != nil {
+ err = fmt.Errorf("already enabled")
+ } else {
+ interval := c.Interval
+
+ if interval != 0 && interval < 30 {
+ log.Info("minimum mrt dump interval is 30 seconds")
+ interval = 30
+ }
+ s.mrt, err = newMrtWriter(s, c.DumpType.ToInt(), c.FileName, interval)
+ }
}
- close(grpcReq.ResponseCh)
+ return err
}
-func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) {
- _, y := server.watchers.watcher(WATCHER_MRT)
- if !y {
- grpcDone(grpcReq, fmt.Errorf("not enabled yet"))
- return
- }
- server.watchers.delWatcher(WATCHER_MRT)
- grpcReq.ResponseCh <- &GrpcResponse{
- Data: &api.DisableMrtResponse{},
+func (s *BgpServer) DisableMrt() (err error) {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ s.mgmtCh <- func() {
+ defer close(ch)
+
+ if s.mrt != nil {
+ s.mrt.Stop()
+ } else {
+ err = fmt.Errorf("not enabled")
+ }
}
- close(grpcReq.ResponseCh)
+ return err
}
func (server *BgpServer) handleValidateRib(grpcReq *GrpcRequest) {