summaryrefslogtreecommitdiffhomepage
path: root/server/grpc_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'server/grpc_server.go')
-rw-r--r--server/grpc_server.go133
1 files changed, 103 insertions, 30 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go
index 158f435f..ebe8e972 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -240,21 +240,6 @@ func (s *Server) GetNeighbor(ctx context.Context, arg *api.GetNeighborRequest) (
return &api.GetNeighborResponse{Peers: p}, nil
}
-func handleMultipleResponses(req *GrpcRequest, f func(*GrpcResponse) error) error {
- for res := range req.ResponseCh {
- if err := res.Err(); err != nil {
- log.Debug(err.Error())
- req.EndCh <- struct{}{}
- return err
- }
- if err := f(res); err != nil {
- req.EndCh <- struct{}{}
- return err
- }
- }
- return nil
-}
-
func toPathApi(id string, path *table.Path) *api.Path {
nlri := path.GetNlri()
n, _ := nlri.Serialize()
@@ -357,27 +342,115 @@ func (s *Server) GetRib(ctx context.Context, arg *api.GetRibRequest) (*api.GetRi
}
func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer) error {
- switch arg.Type {
- case api.Resource_ADJ_IN, api.Resource_GLOBAL:
- default:
- return fmt.Errorf("unsupported resource type: %v", arg.Type)
+ w, err := func() (*Watcher, error) {
+ switch arg.Type {
+ case api.Resource_GLOBAL:
+ return s.bgpServer.Watch(WatchBestPath()), nil
+ case api.Resource_ADJ_IN:
+ if arg.PostPolicy {
+ return s.bgpServer.Watch(WatchPostUpdate()), nil
+ }
+ return s.bgpServer.Watch(WatchUpdate()), nil
+ default:
+ return nil, fmt.Errorf("unsupported resource type: %v", arg.Type)
+ }
+ }()
+ if err != nil {
+ return nil
}
- req := NewGrpcRequest(REQ_MONITOR_RIB, arg.Name, bgp.RouteFamily(arg.Family), arg)
- s.bgpServerCh <- req
- return handleMultipleResponses(req, func(res *GrpcResponse) error {
- return stream.Send(res.Data.(*api.Destination))
- })
+ return func() error {
+ defer func() { w.Stop() }()
+
+ sendPath := func(pathList []*table.Path) error {
+ dsts := make(map[string]*api.Destination)
+ for _, path := range pathList {
+ if path == nil {
+ continue
+ }
+ if dst, y := dsts[path.GetNlri().String()]; y {
+ dst.Paths = append(dst.Paths, toPathApi(table.GLOBAL_RIB_NAME, path))
+ } else {
+ dsts[path.GetNlri().String()] = &api.Destination{
+ Prefix: path.GetNlri().String(),
+ Paths: []*api.Path{toPathApi(table.GLOBAL_RIB_NAME, path)},
+ }
+ }
+ }
+ for _, dst := range dsts {
+ if err := stream.Send(dst); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ for {
+ select {
+ case ev := <-w.Event():
+ switch msg := ev.(type) {
+ case *watcherEventBestPathMsg:
+ if err := sendPath(func() []*table.Path {
+ if len(msg.multiPathList) > 0 {
+ l := make([]*table.Path, 0)
+ for _, p := range msg.multiPathList {
+ l = append(l, p...)
+ }
+ return l
+ } else {
+ return msg.pathList
+ }
+ }()); err != nil {
+ return err
+ }
+ case *watcherEventUpdateMsg:
+ if err := sendPath(msg.pathList); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ }()
}
func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.GobgpApi_MonitorPeerStateServer) error {
- var rf bgp.RouteFamily
- req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil)
- s.bgpServerCh <- req
+ return func() error {
+ w := s.bgpServer.Watch(WatchPeerState())
+ defer func() { w.Stop() }()
- return handleMultipleResponses(req, func(res *GrpcResponse) error {
- return stream.Send(res.Data.(*api.Peer))
- })
+ for {
+ select {
+ case ev := <-w.Event():
+ switch msg := ev.(type) {
+ case *watcherEventStateChangedMsg:
+ if len(arg.Name) > 0 && arg.Name != msg.peerAddress.String() {
+ continue
+ }
+ if err := stream.Send(&api.Peer{
+ Conf: &api.PeerConf{
+ PeerAs: msg.peerAS,
+ LocalAs: msg.localAS,
+ NeighborAddress: msg.peerAddress.String(),
+ Id: msg.peerID.String(),
+ },
+ Info: &api.PeerState{
+ PeerAs: msg.peerAS,
+ LocalAs: msg.localAS,
+ NeighborAddress: msg.peerAddress.String(),
+ BgpState: msg.state.String(),
+ AdminState: msg.adminState.String(),
+ },
+ Transport: &api.Transport{
+ LocalAddress: msg.localAddress.String(),
+ LocalPort: uint32(msg.localPort),
+ RemotePort: uint32(msg.peerPort),
+ },
+ }); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ }()
}
func (s *Server) neighbor(reqType int, address string, d interface{}) (interface{}, error) {