diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-11-08 10:44:02 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-11-08 10:44:02 +0900 |
commit | b48c5613dd62f99854bfa2d87b62838b4b0d14b7 (patch) | |
tree | 266f5d4b226b83193fb51f05ecea2378e6aa5ef1 | |
parent | d4e04188ce7a406642b1dadd038b98a4e02627f1 (diff) |
server: Add PeerTable API
go native API that corresponds to MonitorTable gRPC API.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | pkg/server/grpc_server.go | 60 | ||||
-rw-r--r-- | pkg/server/server.go | 83 |
2 files changed, 91 insertions, 52 deletions
diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index b8a1f9e8..2b4fe02b 100644 --- a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -171,62 +171,18 @@ func (s *Server) ListPath(r *api.ListPathRequest, stream api.GobgpApi_ListPathSe } func (s *Server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_MonitorTableServer) error { - if arg == nil { - return fmt.Errorf("invalid request") - } - w, err := func() (*watcher, error) { - switch arg.Type { - case api.Resource_GLOBAL: - return s.bgpServer.watch(watchBestPath(arg.Current)), nil - case api.Resource_ADJ_IN: - if arg.PostPolicy { - return s.bgpServer.watch(watchPostUpdate(arg.Current)), nil - } - return s.bgpServer.watch(watchUpdate(arg.Current)), nil - default: - return nil, fmt.Errorf("unsupported resource type: %v", arg.Type) - } - }() + tm, err := s.bgpServer.NewTableMonitor(arg) if err != nil { - return nil + return err } - return func() error { - defer func() { w.Stop() }() + defer tm.Close() - sendPath := func(pathList []*table.Path) error { - for _, path := range pathList { - f := bgp.AfiSafiToRouteFamily(uint16(arg.Family.Afi), uint8(arg.Family.Safi)) - if path == nil || (arg.Family != nil && f != path.GetRouteFamily()) { - continue - } - if err := stream.Send(&api.MonitorTableResponse{Path: toPathApi(path, nil)}); err != nil { - return err - } - } - return nil - } - - for ev := range w.Event() { - switch msg := ev.(type) { - case *watchEventBestPath: - if err := sendPath(func() []*table.Path { - if len(msg.MultiPathList) > 0 { - l := make([]*table.Path, 0) - for _, p := range msg.MultiPathList { - l = append(l, p...) - } - return l - } else { - return msg.PathList - } - }()); err != nil { - return err - } - case *watchEventUpdate: - if err := sendPath(msg.PathList); err != nil { - return err - } + for v := range tm.Inbox { + if err := stream.Send(&api.MonitorTableResponse{ + Path: v, + }); err != nil { + return err } } return nil diff --git a/pkg/server/server.go b/pkg/server/server.go index d756b29c..49065400 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -3340,6 +3340,89 @@ func (s *BgpServer) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) erro }, false) } +type TableMonitor struct { + Inbox chan *api.Path + done chan interface{} + w *watcher +} + +func (tm *TableMonitor) Close() { + close(tm.done) +} + +func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, error) { + if r == nil { + return nil, fmt.Errorf("nil request") + } + w, err := func() (*watcher, error) { + switch r.Type { + case api.Resource_GLOBAL: + return s.watch(watchBestPath(r.Current)), nil + case api.Resource_ADJ_IN: + if r.PostPolicy { + return s.watch(watchPostUpdate(r.Current)), nil + } + return s.watch(watchUpdate(r.Current)), nil + default: + return nil, fmt.Errorf("unsupported resource type: %v", r.Type) + } + }() + if err != nil { + return nil, err + } + + tm := &TableMonitor{ + Inbox: make(chan *api.Path), + done: make(chan interface{}), + w: w, + } + go func() { + defer func() { + close(tm.Inbox) + tm.w.Stop() + }() + family := bgp.RouteFamily(0) + if r.Family != nil { + family = bgp.AfiSafiToRouteFamily(uint16(r.Family.Afi), uint8(r.Family.Safi)) + } + + for { + select { + case ev := <-tm.w.Event(): + var pl []*table.Path + switch msg := ev.(type) { + case *watchEventBestPath: + if len(msg.MultiPathList) > 0 { + l := make([]*table.Path, 0) + for _, p := range msg.MultiPathList { + l = append(l, p...) + } + pl = l + } else { + pl = msg.PathList + } + case *watchEventUpdate: + pl = msg.PathList + } + for _, path := range pl { + if path == nil || (r.Family != nil && family != path.GetRouteFamily()) { + continue + } + select { + case tm.Inbox <- toPathApi(path, nil): + case <-tm.done: + return + } + } + case <-tm.done: + return + } + } + }() + + return tm, nil +} + type PeerMonitor struct { Inbox chan *api.Peer done chan interface{} |