diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-11-12 09:25:56 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-11-12 09:30:20 +0900 |
commit | 99556f9cfc2df561dcf7d91c147a129449635224 (patch) | |
tree | 7331d768c1cee74ab93041717abd03f9de9702e7 | |
parent | 289fc39deb9cfa4ee678ff902d51f84c6c828136 (diff) |
server: change monitor API design
https://github.com/osrg/gobgp/issues/1763#issuecomment-437594975
Follow Chris's proposal; consistent with the rest of the APIs.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | docs/sources/lib.md | 9 | ||||
-rw-r--r-- | pkg/server/grpc_server.go | 53 | ||||
-rw-r--r-- | pkg/server/server.go | 72 |
3 files changed, 43 insertions, 91 deletions
diff --git a/docs/sources/lib.md b/docs/sources/lib.md index c76133f9..ba481797 100644 --- a/docs/sources/lib.md +++ b/docs/sources/lib.md @@ -39,15 +39,8 @@ func main() { } // monitor the change of the peer state - if pm, err := s.NewPeerMonitor(&api.MonitorPeerRequest{}); err != nil { + if err := s.MonitorPeer(context.Background(), &api.MonitorPeerRequest{}, func(p *api.Peer){log.Info(p)}); err != nil { log.Fatal(err) - } else { - defer pm.Close() - go func() { - for v := range pm.Inbox { - log.Info(v) - } - }() } // neighbor configuration diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index 86c756a9..0aac4cf0 100644 --- a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -166,41 +166,34 @@ func (s *server) ListPath(r *api.ListPathRequest, stream api.GobgpApi_ListPathSe } func (s *server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_MonitorTableServer) error { - tm, err := s.bgpServer.NewTableMonitor(arg) - if err != nil { - return err - } - return func() error { - defer tm.Close() - - for v := range tm.Inbox { - if err := stream.Send(&api.MonitorTableResponse{ - Path: v, - }); err != nil { - return err - } + ctx, cancel := context.WithCancel(context.Background()) + var err error + s.bgpServer.MonitorTable(ctx, arg, func(p *api.Path) { + if err = stream.Send(&api.MonitorTableResponse{ + Path: p, + }); err != nil { + cancel() + return } - return nil - }() + }) + <-ctx.Done() + return err } func (s *server) MonitorPeer(arg *api.MonitorPeerRequest, stream api.GobgpApi_MonitorPeerServer) error { - pm, err := s.bgpServer.NewPeerMonitor(arg) - if err != nil { - return err - } - return func() error { - defer pm.Close() - - for v := range pm.Inbox { - if err := stream.Send(&api.MonitorPeerResponse{ - Peer: v, - }); err != nil { - return err - } + ctx, cancel := context.WithCancel(context.Background()) + var err error + err = s.bgpServer.MonitorPeer(ctx, arg, func(p *api.Peer) { + if err = stream.Send(&api.MonitorPeerResponse{ + Peer: p, + }); err != nil { + fmt.Println("try to cancel") + cancel() + return } - return nil - }() + }) + <-ctx.Done() + return err } func (s *server) ResetPeer(ctx context.Context, r *api.ResetPeerRequest) (*empty.Empty, error) { diff --git a/pkg/server/server.go b/pkg/server/server.go index 749750f9..4fd4aa25 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -3379,19 +3379,9 @@ func (s *BgpServer) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) erro }, false) } -type TableMonitor struct { - Inbox chan *api.Path - done chan interface{} - w *watcher -} - -func (tm *TableMonitor) Close() { - close(tm.done) -} - -func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, error) { +func (s *BgpServer) MonitorTable(ctx context.Context, r *api.MonitorTableRequest, fn func(*api.Path)) error { if r == nil { - return nil, fmt.Errorf("nil request") + return fmt.Errorf("nil request") } w, err := func() (*watcher, error) { switch r.Type { @@ -3407,18 +3397,12 @@ func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, } }() if err != nil { - return nil, err + return err } - tm := &TableMonitor{ - Inbox: make(chan *api.Path), - done: make(chan interface{}), - w: w, - } go func() { defer func() { - close(tm.Inbox) - tm.w.Stop() + w.Stop() }() family := bgp.RouteFamily(0) if r.Family != nil { @@ -3427,7 +3411,7 @@ func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, for { select { - case ev := <-tm.w.Event(): + case ev := <-w.Event(): var pl []*table.Path switch msg := ev.(type) { case *watchEventBestPath: @@ -3448,47 +3432,33 @@ func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, continue } select { - case tm.Inbox <- toPathApi(path, nil): - case <-tm.done: + case <-ctx.Done(): return + default: + fn(toPathApi(path, nil)) } } - case <-tm.done: + case <-ctx.Done(): return } } }() - - return tm, nil -} - -type PeerMonitor struct { - Inbox chan *api.Peer - done chan interface{} - w *watcher -} - -func (pm *PeerMonitor) Close() { - close(pm.done) + return nil } -func (s *BgpServer) NewPeerMonitor(r *api.MonitorPeerRequest) (*PeerMonitor, error) { +func (s *BgpServer) MonitorPeer(ctx context.Context, r *api.MonitorPeerRequest, fn func(*api.Peer)) error { if r == nil { - return nil, fmt.Errorf("nil request") - } - pm := &PeerMonitor{ - Inbox: make(chan *api.Peer), - done: make(chan interface{}), - w: s.watch(watchPeerState(r.Current)), + return fmt.Errorf("nil request") } + go func() { + w := s.watch(watchPeerState(r.Current)) defer func() { - close(pm.Inbox) - pm.w.Stop() + w.Stop() }() for { select { - case m := <-pm.w.Event(): + case m := <-w.Event(): msg := m.(*watchEventPeerState) if len(r.Address) > 0 && r.Address != msg.PeerAddress.String() && r.Address != msg.PeerInterface { break @@ -3514,17 +3484,13 @@ func (s *BgpServer) NewPeerMonitor(r *api.MonitorPeerRequest) (*PeerMonitor, err RemotePort: uint32(msg.PeerPort), }, } - select { - case pm.Inbox <- p: - case <-pm.done: - return - } - case <-pm.done: + fn(p) + case <-ctx.Done(): return } } }() - return pm, nil + return nil } type watchEventType string |