diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/server/grpc_server.go | 43 | ||||
-rw-r--r-- | pkg/server/server.go | 65 |
2 files changed, 74 insertions, 34 deletions
diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index 3b6cf7a9..b8a1f9e8 100644 --- a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -234,43 +234,18 @@ func (s *Server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_ } func (s *Server) MonitorPeer(arg *api.MonitorPeerRequest, stream api.GobgpApi_MonitorPeerServer) error { - if arg == nil { - return fmt.Errorf("invalid request") + pm, err := s.bgpServer.NewPeerMonitor(arg) + if err != nil { + return err } return func() error { - w := s.bgpServer.watch(watchPeerState(arg.Current)) - defer func() { w.Stop() }() + defer pm.Close() - for ev := range w.Event() { - switch msg := ev.(type) { - case *watchEventPeerState: - if len(arg.Address) > 0 && arg.Address != msg.PeerAddress.String() && arg.Address != msg.PeerInterface { - continue - } - if err := stream.Send(&api.MonitorPeerResponse{ - Peer: &api.Peer{ - Conf: &api.PeerConf{ - PeerAs: msg.PeerAS, - LocalAs: msg.LocalAS, - NeighborAddress: msg.PeerAddress.String(), - Id: msg.PeerID.String(), - NeighborInterface: msg.PeerInterface, - }, - State: &api.PeerState{ - PeerAs: msg.PeerAS, - LocalAs: msg.LocalAS, - NeighborAddress: msg.PeerAddress.String(), - SessionState: api.PeerState_SessionState(int(msg.State) + 1), - AdminState: api.PeerState_AdminState(msg.AdminState), - }, - Transport: &api.Transport{ - LocalAddress: msg.LocalAddress.String(), - LocalPort: uint32(msg.LocalPort), - RemotePort: uint32(msg.PeerPort), - }, - }}); err != nil { - return err - } + for v := range pm.Inbox { + if err := stream.Send(&api.MonitorPeerResponse{ + Peer: v, + }); err != nil { + return err } } return nil diff --git a/pkg/server/server.go b/pkg/server/server.go index f5dfbeb6..d756b29c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -3340,6 +3340,71 @@ func (s *BgpServer) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) erro }, false) } +type PeerMonitor struct { + Inbox chan *api.Peer + done chan interface{} + w *watcher +} + +func (pm *PeerMonitor) Close() { + close(pm.done) +} + +func (s *BgpServer) NewPeerMonitor(r *api.MonitorPeerRequest) (*PeerMonitor, error) { + if r == nil { + return nil, fmt.Errorf("nil request") + } + pm := &PeerMonitor{ + Inbox: make(chan *api.Peer), + done: make(chan interface{}), + w: s.watch(watchPeerState(r.Current)), + } + go func() { + defer func() { + close(pm.Inbox) + pm.w.Stop() + }() + for { + select { + case m := <-pm.w.Event(): + msg := m.(*watchEventPeerState) + if len(r.Address) > 0 && r.Address != msg.PeerAddress.String() && r.Address != msg.PeerInterface { + break + } + p := &api.Peer{ + Conf: &api.PeerConf{ + PeerAs: msg.PeerAS, + LocalAs: msg.LocalAS, + NeighborAddress: msg.PeerAddress.String(), + Id: msg.PeerID.String(), + NeighborInterface: msg.PeerInterface, + }, + State: &api.PeerState{ + PeerAs: msg.PeerAS, + LocalAs: msg.LocalAS, + NeighborAddress: msg.PeerAddress.String(), + SessionState: api.PeerState_SessionState(int(msg.State) + 1), + AdminState: api.PeerState_AdminState(msg.AdminState), + }, + Transport: &api.Transport{ + LocalAddress: msg.LocalAddress.String(), + LocalPort: uint32(msg.LocalPort), + RemotePort: uint32(msg.PeerPort), + }, + } + select { + case pm.Inbox <- p: + case <-pm.done: + return + } + case <-pm.done: + return + } + } + }() + return pm, nil +} + type watchEventType string const ( |