diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/server/grpc_server.go | 53 | ||||
-rw-r--r-- | pkg/server/server.go | 72 |
2 files changed, 42 insertions, 83 deletions
diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index 86c756a9..0aac4cf0 100644 --- a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -166,41 +166,34 @@ func (s *server) ListPath(r *api.ListPathRequest, stream api.GobgpApi_ListPathSe } func (s *server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_MonitorTableServer) error { - tm, err := s.bgpServer.NewTableMonitor(arg) - if err != nil { - return err - } - return func() error { - defer tm.Close() - - for v := range tm.Inbox { - if err := stream.Send(&api.MonitorTableResponse{ - Path: v, - }); err != nil { - return err - } + ctx, cancel := context.WithCancel(context.Background()) + var err error + s.bgpServer.MonitorTable(ctx, arg, func(p *api.Path) { + if err = stream.Send(&api.MonitorTableResponse{ + Path: p, + }); err != nil { + cancel() + return } - return nil - }() + }) + <-ctx.Done() + return err } func (s *server) MonitorPeer(arg *api.MonitorPeerRequest, stream api.GobgpApi_MonitorPeerServer) error { - pm, err := s.bgpServer.NewPeerMonitor(arg) - if err != nil { - return err - } - return func() error { - defer pm.Close() - - for v := range pm.Inbox { - if err := stream.Send(&api.MonitorPeerResponse{ - Peer: v, - }); err != nil { - return err - } + ctx, cancel := context.WithCancel(context.Background()) + var err error + err = s.bgpServer.MonitorPeer(ctx, arg, func(p *api.Peer) { + if err = stream.Send(&api.MonitorPeerResponse{ + Peer: p, + }); err != nil { + fmt.Println("try to cancel") + cancel() + return } - return nil - }() + }) + <-ctx.Done() + return err } func (s *server) ResetPeer(ctx context.Context, r *api.ResetPeerRequest) (*empty.Empty, error) { diff --git a/pkg/server/server.go b/pkg/server/server.go index 749750f9..4fd4aa25 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -3379,19 +3379,9 @@ func (s *BgpServer) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) erro }, false) } -type TableMonitor struct { - Inbox chan *api.Path - done chan interface{} - w *watcher -} - -func (tm *TableMonitor) Close() { - close(tm.done) -} - -func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, error) { +func (s *BgpServer) MonitorTable(ctx context.Context, r *api.MonitorTableRequest, fn func(*api.Path)) error { if r == nil { - return nil, fmt.Errorf("nil request") + return fmt.Errorf("nil request") } w, err := func() (*watcher, error) { switch r.Type { @@ -3407,18 +3397,12 @@ func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, } }() if err != nil { - return nil, err + return err } - tm := &TableMonitor{ - Inbox: make(chan *api.Path), - done: make(chan interface{}), - w: w, - } go func() { defer func() { - close(tm.Inbox) - tm.w.Stop() + w.Stop() }() family := bgp.RouteFamily(0) if r.Family != nil { @@ -3427,7 +3411,7 @@ func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, for { select { - case ev := <-tm.w.Event(): + case ev := <-w.Event(): var pl []*table.Path switch msg := ev.(type) { case *watchEventBestPath: @@ -3448,47 +3432,33 @@ func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, continue } select { - case tm.Inbox <- toPathApi(path, nil): - case <-tm.done: + case <-ctx.Done(): return + default: + fn(toPathApi(path, nil)) } } - case <-tm.done: + case <-ctx.Done(): return } } }() - - return tm, nil -} - -type PeerMonitor struct { - Inbox chan *api.Peer - done chan interface{} - w *watcher -} - -func (pm *PeerMonitor) Close() { - close(pm.done) + return nil } -func (s *BgpServer) NewPeerMonitor(r *api.MonitorPeerRequest) (*PeerMonitor, error) { +func (s *BgpServer) MonitorPeer(ctx context.Context, r *api.MonitorPeerRequest, fn func(*api.Peer)) error { if r == nil { - return nil, fmt.Errorf("nil request") - } - pm := &PeerMonitor{ - Inbox: make(chan *api.Peer), - done: make(chan interface{}), - w: s.watch(watchPeerState(r.Current)), + return fmt.Errorf("nil request") } + go func() { + w := s.watch(watchPeerState(r.Current)) defer func() { - close(pm.Inbox) - pm.w.Stop() + w.Stop() }() for { select { - case m := <-pm.w.Event(): + case m := <-w.Event(): msg := m.(*watchEventPeerState) if len(r.Address) > 0 && r.Address != msg.PeerAddress.String() && r.Address != msg.PeerInterface { break @@ -3514,17 +3484,13 @@ func (s *BgpServer) NewPeerMonitor(r *api.MonitorPeerRequest) (*PeerMonitor, err RemotePort: uint32(msg.PeerPort), }, } - select { - case pm.Inbox <- p: - case <-pm.done: - return - } - case <-pm.done: + fn(p) + case <-ctx.Done(): return } } }() - return pm, nil + return nil } type watchEventType string |