summaryrefslogtreecommitdiffhomepage
path: root/pkg/server/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/server.go')
-rw-r--r--pkg/server/server.go65
1 files changed, 65 insertions, 0 deletions
diff --git a/pkg/server/server.go b/pkg/server/server.go
index f5dfbeb6..d756b29c 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -3340,6 +3340,71 @@ func (s *BgpServer) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) erro
}, false)
}
+type PeerMonitor struct {
+ Inbox chan *api.Peer
+ done chan interface{}
+ w *watcher
+}
+
+func (pm *PeerMonitor) Close() {
+ close(pm.done)
+}
+
+func (s *BgpServer) NewPeerMonitor(r *api.MonitorPeerRequest) (*PeerMonitor, error) {
+ if r == nil {
+ return nil, fmt.Errorf("nil request")
+ }
+ pm := &PeerMonitor{
+ Inbox: make(chan *api.Peer),
+ done: make(chan interface{}),
+ w: s.watch(watchPeerState(r.Current)),
+ }
+ go func() {
+ defer func() {
+ close(pm.Inbox)
+ pm.w.Stop()
+ }()
+ for {
+ select {
+ case m := <-pm.w.Event():
+ msg := m.(*watchEventPeerState)
+ if len(r.Address) > 0 && r.Address != msg.PeerAddress.String() && r.Address != msg.PeerInterface {
+ break
+ }
+ p := &api.Peer{
+ Conf: &api.PeerConf{
+ PeerAs: msg.PeerAS,
+ LocalAs: msg.LocalAS,
+ NeighborAddress: msg.PeerAddress.String(),
+ Id: msg.PeerID.String(),
+ NeighborInterface: msg.PeerInterface,
+ },
+ State: &api.PeerState{
+ PeerAs: msg.PeerAS,
+ LocalAs: msg.LocalAS,
+ NeighborAddress: msg.PeerAddress.String(),
+ SessionState: api.PeerState_SessionState(int(msg.State) + 1),
+ AdminState: api.PeerState_AdminState(msg.AdminState),
+ },
+ Transport: &api.Transport{
+ LocalAddress: msg.LocalAddress.String(),
+ LocalPort: uint32(msg.LocalPort),
+ RemotePort: uint32(msg.PeerPort),
+ },
+ }
+ select {
+ case pm.Inbox <- p:
+ case <-pm.done:
+ return
+ }
+ case <-pm.done:
+ return
+ }
+ }
+ }()
+ return pm, nil
+}
+
type watchEventType string
const (