diff options
Diffstat (limited to 'server/grpc_server.go')
-rw-r--r-- | server/grpc_server.go | 133 |
1 files changed, 103 insertions, 30 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index 158f435f..ebe8e972 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -240,21 +240,6 @@ func (s *Server) GetNeighbor(ctx context.Context, arg *api.GetNeighborRequest) ( return &api.GetNeighborResponse{Peers: p}, nil } -func handleMultipleResponses(req *GrpcRequest, f func(*GrpcResponse) error) error { - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - req.EndCh <- struct{}{} - return err - } - if err := f(res); err != nil { - req.EndCh <- struct{}{} - return err - } - } - return nil -} - func toPathApi(id string, path *table.Path) *api.Path { nlri := path.GetNlri() n, _ := nlri.Serialize() @@ -357,27 +342,115 @@ func (s *Server) GetRib(ctx context.Context, arg *api.GetRibRequest) (*api.GetRi } func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer) error { - switch arg.Type { - case api.Resource_ADJ_IN, api.Resource_GLOBAL: - default: - return fmt.Errorf("unsupported resource type: %v", arg.Type) + w, err := func() (*Watcher, error) { + switch arg.Type { + case api.Resource_GLOBAL: + return s.bgpServer.Watch(WatchBestPath()), nil + case api.Resource_ADJ_IN: + if arg.PostPolicy { + return s.bgpServer.Watch(WatchPostUpdate()), nil + } + return s.bgpServer.Watch(WatchUpdate()), nil + default: + return nil, fmt.Errorf("unsupported resource type: %v", arg.Type) + } + }() + if err != nil { + return nil } - req := NewGrpcRequest(REQ_MONITOR_RIB, arg.Name, bgp.RouteFamily(arg.Family), arg) - s.bgpServerCh <- req - return handleMultipleResponses(req, func(res *GrpcResponse) error { - return stream.Send(res.Data.(*api.Destination)) - }) + return func() error { + defer func() { w.Stop() }() + + sendPath := func(pathList []*table.Path) error { + dsts := make(map[string]*api.Destination) + for _, path := range pathList { + if path == nil { + continue + } + if dst, y := dsts[path.GetNlri().String()]; y { + dst.Paths = append(dst.Paths, toPathApi(table.GLOBAL_RIB_NAME, path)) + } else { + dsts[path.GetNlri().String()] = &api.Destination{ + Prefix: path.GetNlri().String(), + Paths: []*api.Path{toPathApi(table.GLOBAL_RIB_NAME, path)}, + } + } + } + for _, dst := range dsts { + if err := stream.Send(dst); err != nil { + return err + } + } + return nil + } + for { + select { + case ev := <-w.Event(): + switch msg := ev.(type) { + case *watcherEventBestPathMsg: + if err := sendPath(func() []*table.Path { + if len(msg.multiPathList) > 0 { + l := make([]*table.Path, 0) + for _, p := range msg.multiPathList { + l = append(l, p...) + } + return l + } else { + return msg.pathList + } + }()); err != nil { + return err + } + case *watcherEventUpdateMsg: + if err := sendPath(msg.pathList); err != nil { + return err + } + } + } + } + }() } func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.GobgpApi_MonitorPeerStateServer) error { - var rf bgp.RouteFamily - req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil) - s.bgpServerCh <- req + return func() error { + w := s.bgpServer.Watch(WatchPeerState()) + defer func() { w.Stop() }() - return handleMultipleResponses(req, func(res *GrpcResponse) error { - return stream.Send(res.Data.(*api.Peer)) - }) + for { + select { + case ev := <-w.Event(): + switch msg := ev.(type) { + case *watcherEventStateChangedMsg: + if len(arg.Name) > 0 && arg.Name != msg.peerAddress.String() { + continue + } + if err := stream.Send(&api.Peer{ + Conf: &api.PeerConf{ + PeerAs: msg.peerAS, + LocalAs: msg.localAS, + NeighborAddress: msg.peerAddress.String(), + Id: msg.peerID.String(), + }, + Info: &api.PeerState{ + PeerAs: msg.peerAS, + LocalAs: msg.localAS, + NeighborAddress: msg.peerAddress.String(), + BgpState: msg.state.String(), + AdminState: msg.adminState.String(), + }, + Transport: &api.Transport{ + LocalAddress: msg.localAddress.String(), + LocalPort: uint32(msg.localPort), + RemotePort: uint32(msg.peerPort), + }, + }); err != nil { + return err + } + } + } + } + }() } func (s *Server) neighbor(reqType int, address string, d interface{}) (interface{}, error) { |