summaryrefslogtreecommitdiffhomepage
path: root/pkg
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2018-11-12 09:25:56 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2018-11-12 09:30:20 +0900
commit99556f9cfc2df561dcf7d91c147a129449635224 (patch)
tree7331d768c1cee74ab93041717abd03f9de9702e7 /pkg
parent289fc39deb9cfa4ee678ff902d51f84c6c828136 (diff)
server: change monitor API design
https://github.com/osrg/gobgp/issues/1763#issuecomment-437594975 Follow Chris's proposal; consistent with the rest of the APIs. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'pkg')
-rw-r--r--pkg/server/grpc_server.go53
-rw-r--r--pkg/server/server.go72
2 files changed, 42 insertions, 83 deletions
diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go
index 86c756a9..0aac4cf0 100644
--- a/pkg/server/grpc_server.go
+++ b/pkg/server/grpc_server.go
@@ -166,41 +166,34 @@ func (s *server) ListPath(r *api.ListPathRequest, stream api.GobgpApi_ListPathSe
}
func (s *server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_MonitorTableServer) error {
- tm, err := s.bgpServer.NewTableMonitor(arg)
- if err != nil {
- return err
- }
- return func() error {
- defer tm.Close()
-
- for v := range tm.Inbox {
- if err := stream.Send(&api.MonitorTableResponse{
- Path: v,
- }); err != nil {
- return err
- }
+ ctx, cancel := context.WithCancel(context.Background())
+ var err error
+ s.bgpServer.MonitorTable(ctx, arg, func(p *api.Path) {
+ if err = stream.Send(&api.MonitorTableResponse{
+ Path: p,
+ }); err != nil {
+ cancel()
+ return
}
- return nil
- }()
+ })
+ <-ctx.Done()
+ return err
}
func (s *server) MonitorPeer(arg *api.MonitorPeerRequest, stream api.GobgpApi_MonitorPeerServer) error {
- pm, err := s.bgpServer.NewPeerMonitor(arg)
- if err != nil {
- return err
- }
- return func() error {
- defer pm.Close()
-
- for v := range pm.Inbox {
- if err := stream.Send(&api.MonitorPeerResponse{
- Peer: v,
- }); err != nil {
- return err
- }
+ ctx, cancel := context.WithCancel(context.Background())
+ var err error
+ err = s.bgpServer.MonitorPeer(ctx, arg, func(p *api.Peer) {
+ if err = stream.Send(&api.MonitorPeerResponse{
+ Peer: p,
+ }); err != nil {
+ fmt.Println("try to cancel")
+ cancel()
+ return
}
- return nil
- }()
+ })
+ <-ctx.Done()
+ return err
}
func (s *server) ResetPeer(ctx context.Context, r *api.ResetPeerRequest) (*empty.Empty, error) {
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 749750f9..4fd4aa25 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -3379,19 +3379,9 @@ func (s *BgpServer) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) erro
}, false)
}
-type TableMonitor struct {
- Inbox chan *api.Path
- done chan interface{}
- w *watcher
-}
-
-func (tm *TableMonitor) Close() {
- close(tm.done)
-}
-
-func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor, error) {
+func (s *BgpServer) MonitorTable(ctx context.Context, r *api.MonitorTableRequest, fn func(*api.Path)) error {
if r == nil {
- return nil, fmt.Errorf("nil request")
+ return fmt.Errorf("nil request")
}
w, err := func() (*watcher, error) {
switch r.Type {
@@ -3407,18 +3397,12 @@ func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor,
}
}()
if err != nil {
- return nil, err
+ return err
}
- tm := &TableMonitor{
- Inbox: make(chan *api.Path),
- done: make(chan interface{}),
- w: w,
- }
go func() {
defer func() {
- close(tm.Inbox)
- tm.w.Stop()
+ w.Stop()
}()
family := bgp.RouteFamily(0)
if r.Family != nil {
@@ -3427,7 +3411,7 @@ func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor,
for {
select {
- case ev := <-tm.w.Event():
+ case ev := <-w.Event():
var pl []*table.Path
switch msg := ev.(type) {
case *watchEventBestPath:
@@ -3448,47 +3432,33 @@ func (s *BgpServer) NewTableMonitor(r *api.MonitorTableRequest) (*TableMonitor,
continue
}
select {
- case tm.Inbox <- toPathApi(path, nil):
- case <-tm.done:
+ case <-ctx.Done():
return
+ default:
+ fn(toPathApi(path, nil))
}
}
- case <-tm.done:
+ case <-ctx.Done():
return
}
}
}()
-
- return tm, nil
-}
-
-type PeerMonitor struct {
- Inbox chan *api.Peer
- done chan interface{}
- w *watcher
-}
-
-func (pm *PeerMonitor) Close() {
- close(pm.done)
+ return nil
}
-func (s *BgpServer) NewPeerMonitor(r *api.MonitorPeerRequest) (*PeerMonitor, error) {
+func (s *BgpServer) MonitorPeer(ctx context.Context, r *api.MonitorPeerRequest, fn func(*api.Peer)) error {
if r == nil {
- return nil, fmt.Errorf("nil request")
- }
- pm := &PeerMonitor{
- Inbox: make(chan *api.Peer),
- done: make(chan interface{}),
- w: s.watch(watchPeerState(r.Current)),
+ return fmt.Errorf("nil request")
}
+
go func() {
+ w := s.watch(watchPeerState(r.Current))
defer func() {
- close(pm.Inbox)
- pm.w.Stop()
+ w.Stop()
}()
for {
select {
- case m := <-pm.w.Event():
+ case m := <-w.Event():
msg := m.(*watchEventPeerState)
if len(r.Address) > 0 && r.Address != msg.PeerAddress.String() && r.Address != msg.PeerInterface {
break
@@ -3514,17 +3484,13 @@ func (s *BgpServer) NewPeerMonitor(r *api.MonitorPeerRequest) (*PeerMonitor, err
RemotePort: uint32(msg.PeerPort),
},
}
- select {
- case pm.Inbox <- p:
- case <-pm.done:
- return
- }
- case <-pm.done:
+ fn(p)
+ case <-ctx.Done():
return
}
}
}()
- return pm, nil
+ return nil
}
type watchEventType string