summaryrefslogtreecommitdiffhomepage
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/server/grpc_server.go60
-rw-r--r--pkg/server/server.go83
2 files changed, 91 insertions, 52 deletions
diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go
index b8a1f9e8..2b4fe02b 100644
--- a/pkg/server/grpc_server.go
+++ b/pkg/server/grpc_server.go
@@ -171,62 +171,18 @@ func (s *Server) ListPath(r *api.ListPathRequest, stream api.GobgpApi_ListPathSe
}
func (s *Server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_MonitorTableServer) error {
- if arg == nil {
- return fmt.Errorf("invalid request")
- }
- w, err := func() (*watcher, error) {
- switch arg.Type {
- case api.Resource_GLOBAL:
- return s.bgpServer.watch(watchBestPath(arg.Current)), nil
- case api.Resource_ADJ_IN:
- if arg.PostPolicy {
- return s.bgpServer.watch(watchPostUpdate(arg.Current)), nil
- }
- return s.bgpServer.watch(watchUpdate(arg.Current)), nil
- default:
- return nil, fmt.Errorf("unsupported resource type: %v", arg.Type)
- }
- }()
+ tm, err := s.bgpServer.NewTableMonitor(arg)
if err != nil {
- return nil
+ return err
}
-
return func() error {
- defer func() { w.Stop() }()
+ defer tm.Close()
- sendPath := func(pathList []*table.Path) error {
- for _, path := range pathList {
- f := bgp.AfiSafiToRouteFamily(uint16(arg.Family.Afi), uint8(arg.Family.Safi))
- if path == nil || (arg.Family != nil && f != path.GetRouteFamily()) {
- continue
- }
- if err := stream.Send(&api.MonitorTableResponse{Path: toPathApi(path, nil)}); err != nil {
- return err
- }
- }
- return nil
- }
-
- for ev := range w.Event() {
- switch msg := ev.(type) {
- case *watchEventBestPath:
- if err := sendPath(func() []*table.Path {
- if len(msg.MultiPathList) > 0 {
- l := make([]*table.Path, 0)
- for _, p := range msg.MultiPathList {
- l = append(l, p...)
- }
- return l
- } else {
- return msg.PathList
- }
- }()); err != nil {
- return err
- }
- case *watchEventUpdate:
- if err := sendPath(msg.PathList); err != nil {
- return err
- }
+ for v := range tm.Inbox {
+ if err := stream.Send(&api.MonitorTableResponse{
+ Path: v,
+ }); err != nil {
+ return err
}
}
return nil
diff --git a/pkg/server/server.go b/pkg/server/server.go
index d756b29c..49065400 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -3340,6 +3340,89 @@ func (s *BgpServer) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) erro
}, false)
}
+type TableMonitor struct {
+ Inbox chan *api.Path
+ done chan interface{}
+ w *watcher
+}
+
+func (tm *TableMonitor) Close() {
+ close(tm.done)
+}
+
+func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, error) {
+ if r == nil {
+ return nil, fmt.Errorf("nil request")
+ }
+ w, err := func() (*watcher, error) {
+ switch r.Type {
+ case api.Resource_GLOBAL:
+ return s.watch(watchBestPath(r.Current)), nil
+ case api.Resource_ADJ_IN:
+ if r.PostPolicy {
+ return s.watch(watchPostUpdate(r.Current)), nil
+ }
+ return s.watch(watchUpdate(r.Current)), nil
+ default:
+ return nil, fmt.Errorf("unsupported resource type: %v", r.Type)
+ }
+ }()
+ if err != nil {
+ return nil, err
+ }
+
+ tm := &TableMonitor{
+ Inbox: make(chan *api.Path),
+ done: make(chan interface{}),
+ w: w,
+ }
+ go func() {
+ defer func() {
+ close(tm.Inbox)
+ tm.w.Stop()
+ }()
+ family := bgp.RouteFamily(0)
+ if r.Family != nil {
+ family = bgp.AfiSafiToRouteFamily(uint16(r.Family.Afi), uint8(r.Family.Safi))
+ }
+
+ for {
+ select {
+ case ev := <-tm.w.Event():
+ var pl []*table.Path
+ switch msg := ev.(type) {
+ case *watchEventBestPath:
+ if len(msg.MultiPathList) > 0 {
+ l := make([]*table.Path, 0)
+ for _, p := range msg.MultiPathList {
+ l = append(l, p...)
+ }
+ pl = l
+ } else {
+ pl = msg.PathList
+ }
+ case *watchEventUpdate:
+ pl = msg.PathList
+ }
+ for _, path := range pl {
+ if path == nil || (r.Family != nil && family != path.GetRouteFamily()) {
+ continue
+ }
+ select {
+ case tm.Inbox <- toPathApi(path, nil):
+ case <-tm.done:
+ return
+ }
+ }
+ case <-tm.done:
+ return
+ }
+ }
+ }()
+
+ return tm, nil
+}
+
type PeerMonitor struct {
Inbox chan *api.Peer
done chan interface{}