summaryrefslogtreecommitdiffhomepage
path: root/pkg/server/grpc_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/grpc_server.go')
-rw-r--r--pkg/server/grpc_server.go53
1 files changed, 23 insertions, 30 deletions
diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go
index 86c756a9..0aac4cf0 100644
--- a/pkg/server/grpc_server.go
+++ b/pkg/server/grpc_server.go
@@ -166,41 +166,34 @@ func (s *server) ListPath(r *api.ListPathRequest, stream api.GobgpApi_ListPathSe
}
func (s *server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_MonitorTableServer) error {
- tm, err := s.bgpServer.NewTableMonitor(arg)
- if err != nil {
- return err
- }
- return func() error {
- defer tm.Close()
-
- for v := range tm.Inbox {
- if err := stream.Send(&api.MonitorTableResponse{
- Path: v,
- }); err != nil {
- return err
- }
+ ctx, cancel := context.WithCancel(context.Background())
+ var err error
+ s.bgpServer.MonitorTable(ctx, arg, func(p *api.Path) {
+ if err = stream.Send(&api.MonitorTableResponse{
+ Path: p,
+ }); err != nil {
+ cancel()
+ return
}
- return nil
- }()
+ })
+ <-ctx.Done()
+ return err
}
func (s *server) MonitorPeer(arg *api.MonitorPeerRequest, stream api.GobgpApi_MonitorPeerServer) error {
- pm, err := s.bgpServer.NewPeerMonitor(arg)
- if err != nil {
- return err
- }
- return func() error {
- defer pm.Close()
-
- for v := range pm.Inbox {
- if err := stream.Send(&api.MonitorPeerResponse{
- Peer: v,
- }); err != nil {
- return err
- }
+ ctx, cancel := context.WithCancel(context.Background())
+ var err error
+ err = s.bgpServer.MonitorPeer(ctx, arg, func(p *api.Peer) {
+ if err = stream.Send(&api.MonitorPeerResponse{
+ Peer: p,
+ }); err != nil {
+ fmt.Println("try to cancel")
+ cancel()
+ return
}
- return nil
- }()
+ })
+ <-ctx.Done()
+ return err
}
func (s *server) ResetPeer(ctx context.Context, r *api.ResetPeerRequest) (*empty.Empty, error) {