summaryrefslogtreecommitdiffhomepage
path: root/pkg/server
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2018-11-08 10:09:18 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2018-11-08 10:09:18 +0900
commitd4e04188ce7a406642b1dadd038b98a4e02627f1 (patch)
treee7a4816cec613aa92d2dd5a78138de5663282e75 /pkg/server
parentdf8ad76b5ca5316ae2a9ca88c5aa6af5f2dc9b4e (diff)
server: Add PeerMonitor API
go native API that corresponds to MonitorPeer gRPC API. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'pkg/server')
-rw-r--r--pkg/server/grpc_server.go43
-rw-r--r--pkg/server/server.go65
2 files changed, 74 insertions, 34 deletions
diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go
index 3b6cf7a9..b8a1f9e8 100644
--- a/pkg/server/grpc_server.go
+++ b/pkg/server/grpc_server.go
@@ -234,43 +234,18 @@ func (s *Server) MonitorTable(arg *api.MonitorTableRequest, stream api.GobgpApi_
}
func (s *Server) MonitorPeer(arg *api.MonitorPeerRequest, stream api.GobgpApi_MonitorPeerServer) error {
- if arg == nil {
- return fmt.Errorf("invalid request")
+ pm, err := s.bgpServer.NewPeerMonitor(arg)
+ if err != nil {
+ return err
}
return func() error {
- w := s.bgpServer.watch(watchPeerState(arg.Current))
- defer func() { w.Stop() }()
+ defer pm.Close()
- for ev := range w.Event() {
- switch msg := ev.(type) {
- case *watchEventPeerState:
- if len(arg.Address) > 0 && arg.Address != msg.PeerAddress.String() && arg.Address != msg.PeerInterface {
- continue
- }
- if err := stream.Send(&api.MonitorPeerResponse{
- Peer: &api.Peer{
- Conf: &api.PeerConf{
- PeerAs: msg.PeerAS,
- LocalAs: msg.LocalAS,
- NeighborAddress: msg.PeerAddress.String(),
- Id: msg.PeerID.String(),
- NeighborInterface: msg.PeerInterface,
- },
- State: &api.PeerState{
- PeerAs: msg.PeerAS,
- LocalAs: msg.LocalAS,
- NeighborAddress: msg.PeerAddress.String(),
- SessionState: api.PeerState_SessionState(int(msg.State) + 1),
- AdminState: api.PeerState_AdminState(msg.AdminState),
- },
- Transport: &api.Transport{
- LocalAddress: msg.LocalAddress.String(),
- LocalPort: uint32(msg.LocalPort),
- RemotePort: uint32(msg.PeerPort),
- },
- }}); err != nil {
- return err
- }
+ for v := range pm.Inbox {
+ if err := stream.Send(&api.MonitorPeerResponse{
+ Peer: v,
+ }); err != nil {
+ return err
}
}
return nil
diff --git a/pkg/server/server.go b/pkg/server/server.go
index f5dfbeb6..d756b29c 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -3340,6 +3340,71 @@ func (s *BgpServer) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) erro
}, false)
}
+type PeerMonitor struct {
+ Inbox chan *api.Peer
+ done chan interface{}
+ w *watcher
+}
+
+func (pm *PeerMonitor) Close() {
+ close(pm.done)
+}
+
+func (s *BgpServer) NewPeerMonitor(r *api.MonitorPeerRequest) (*PeerMonitor, error) {
+ if r == nil {
+ return nil, fmt.Errorf("nil request")
+ }
+ pm := &PeerMonitor{
+ Inbox: make(chan *api.Peer),
+ done: make(chan interface{}),
+ w: s.watch(watchPeerState(r.Current)),
+ }
+ go func() {
+ defer func() {
+ close(pm.Inbox)
+ pm.w.Stop()
+ }()
+ for {
+ select {
+ case m := <-pm.w.Event():
+ msg := m.(*watchEventPeerState)
+ if len(r.Address) > 0 && r.Address != msg.PeerAddress.String() && r.Address != msg.PeerInterface {
+ break
+ }
+ p := &api.Peer{
+ Conf: &api.PeerConf{
+ PeerAs: msg.PeerAS,
+ LocalAs: msg.LocalAS,
+ NeighborAddress: msg.PeerAddress.String(),
+ Id: msg.PeerID.String(),
+ NeighborInterface: msg.PeerInterface,
+ },
+ State: &api.PeerState{
+ PeerAs: msg.PeerAS,
+ LocalAs: msg.LocalAS,
+ NeighborAddress: msg.PeerAddress.String(),
+ SessionState: api.PeerState_SessionState(int(msg.State) + 1),
+ AdminState: api.PeerState_AdminState(msg.AdminState),
+ },
+ Transport: &api.Transport{
+ LocalAddress: msg.LocalAddress.String(),
+ LocalPort: uint32(msg.LocalPort),
+ RemotePort: uint32(msg.PeerPort),
+ },
+ }
+ select {
+ case pm.Inbox <- p:
+ case <-pm.done:
+ return
+ }
+ case <-pm.done:
+ return
+ }
+ }
+ }()
+ return pm, nil
+}
+
type watchEventType string
const (