diff options
-rw-r--r-- | api/gobgp.pb.go | 188 | ||||
-rw-r--r-- | api/gobgp.proto | 3 | ||||
-rw-r--r-- | server/grpc_server.go | 28 | ||||
-rw-r--r-- | server/server.go | 33 |
4 files changed, 181 insertions, 71 deletions
diff --git a/api/gobgp.pb.go b/api/gobgp.pb.go index 7aeb6e26..0b6689dd 100644 --- a/api/gobgp.pb.go +++ b/api/gobgp.pb.go @@ -1182,7 +1182,6 @@ type GrpcClient interface { GetNeighbor(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (*Peer, error) GetRib(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_GetRibClient, error) GetAdjRib(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_GetAdjRibClient, error) - MonitorBestChanged(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorBestChangedClient, error) Reset(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (*Error, error) SoftReset(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (*Error, error) SoftResetIn(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (*Error, error) @@ -1196,6 +1195,8 @@ type GrpcClient interface { GetPolicyRoutePolicies(ctx context.Context, in *PolicyArguments, opts ...grpc.CallOption) (Grpc_GetPolicyRoutePoliciesClient, error) GetPolicyRoutePolicy(ctx context.Context, in *PolicyArguments, opts ...grpc.CallOption) (*PolicyDefinition, error) ModPolicyRoutePolicy(ctx context.Context, opts ...grpc.CallOption) (Grpc_ModPolicyRoutePolicyClient, error) + MonitorBestChanged(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorBestChangedClient, error) + MonitorPeerState(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorPeerStateClient, error) } type grpcClient struct { @@ -1311,38 +1312,6 @@ func (x *grpcGetAdjRibClient) Recv() (*Path, error) { return m, nil } -func (c *grpcClient) MonitorBestChanged(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorBestChangedClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[3], c.cc, "/api.Grpc/MonitorBestChanged", opts...) - if err != nil { - return nil, err - } - x := &grpcMonitorBestChangedClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Grpc_MonitorBestChangedClient interface { - Recv() (*Path, error) - grpc.ClientStream -} - -type grpcMonitorBestChangedClient struct { - grpc.ClientStream -} - -func (x *grpcMonitorBestChangedClient) Recv() (*Path, error) { - m := new(Path) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - func (c *grpcClient) Reset(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (*Error, error) { out := new(Error) err := grpc.Invoke(ctx, "/api.Grpc/Reset", in, out, c.cc, opts...) @@ -1407,7 +1376,7 @@ func (c *grpcClient) Disable(ctx context.Context, in *Arguments, opts ...grpc.Ca } func (c *grpcClient) ModPath(ctx context.Context, opts ...grpc.CallOption) (Grpc_ModPathClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[4], c.cc, "/api.Grpc/ModPath", opts...) + stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[3], c.cc, "/api.Grpc/ModPath", opts...) if err != nil { return nil, err } @@ -1450,7 +1419,7 @@ func (c *grpcClient) GetNeighborPolicy(ctx context.Context, in *Arguments, opts } func (c *grpcClient) ModNeighborPolicy(ctx context.Context, opts ...grpc.CallOption) (Grpc_ModNeighborPolicyClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[5], c.cc, "/api.Grpc/ModNeighborPolicy", opts...) + stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[4], c.cc, "/api.Grpc/ModNeighborPolicy", opts...) if err != nil { return nil, err } @@ -1481,7 +1450,7 @@ func (x *grpcModNeighborPolicyClient) Recv() (*Error, error) { } func (c *grpcClient) GetPolicyRoutePolicies(ctx context.Context, in *PolicyArguments, opts ...grpc.CallOption) (Grpc_GetPolicyRoutePoliciesClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[6], c.cc, "/api.Grpc/GetPolicyRoutePolicies", opts...) + stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[5], c.cc, "/api.Grpc/GetPolicyRoutePolicies", opts...) if err != nil { return nil, err } @@ -1522,7 +1491,7 @@ func (c *grpcClient) GetPolicyRoutePolicy(ctx context.Context, in *PolicyArgumen } func (c *grpcClient) ModPolicyRoutePolicy(ctx context.Context, opts ...grpc.CallOption) (Grpc_ModPolicyRoutePolicyClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[7], c.cc, "/api.Grpc/ModPolicyRoutePolicy", opts...) + stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[6], c.cc, "/api.Grpc/ModPolicyRoutePolicy", opts...) if err != nil { return nil, err } @@ -1552,6 +1521,70 @@ func (x *grpcModPolicyRoutePolicyClient) Recv() (*Error, error) { return m, nil } +func (c *grpcClient) MonitorBestChanged(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorBestChangedClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[7], c.cc, "/api.Grpc/MonitorBestChanged", opts...) + if err != nil { + return nil, err + } + x := &grpcMonitorBestChangedClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Grpc_MonitorBestChangedClient interface { + Recv() (*Path, error) + grpc.ClientStream +} + +type grpcMonitorBestChangedClient struct { + grpc.ClientStream +} + +func (x *grpcMonitorBestChangedClient) Recv() (*Path, error) { + m := new(Path) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *grpcClient) MonitorPeerState(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorPeerStateClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[8], c.cc, "/api.Grpc/MonitorPeerState", opts...) + if err != nil { + return nil, err + } + x := &grpcMonitorPeerStateClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Grpc_MonitorPeerStateClient interface { + Recv() (*Peer, error) + grpc.ClientStream +} + +type grpcMonitorPeerStateClient struct { + grpc.ClientStream +} + +func (x *grpcMonitorPeerStateClient) Recv() (*Peer, error) { + m := new(Peer) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Server API for Grpc service type GrpcServer interface { @@ -1559,7 +1592,6 @@ type GrpcServer interface { GetNeighbor(context.Context, *Arguments) (*Peer, error) GetRib(*Arguments, Grpc_GetRibServer) error GetAdjRib(*Arguments, Grpc_GetAdjRibServer) error - MonitorBestChanged(*Arguments, Grpc_MonitorBestChangedServer) error Reset(context.Context, *Arguments) (*Error, error) SoftReset(context.Context, *Arguments) (*Error, error) SoftResetIn(context.Context, *Arguments) (*Error, error) @@ -1573,6 +1605,8 @@ type GrpcServer interface { GetPolicyRoutePolicies(*PolicyArguments, Grpc_GetPolicyRoutePoliciesServer) error GetPolicyRoutePolicy(context.Context, *PolicyArguments) (*PolicyDefinition, error) ModPolicyRoutePolicy(Grpc_ModPolicyRoutePolicyServer) error + MonitorBestChanged(*Arguments, Grpc_MonitorBestChangedServer) error + MonitorPeerState(*Arguments, Grpc_MonitorPeerStateServer) error } func RegisterGrpcServer(s *grpc.Server, srv GrpcServer) { @@ -1654,27 +1688,6 @@ func (x *grpcGetAdjRibServer) Send(m *Path) error { return x.ServerStream.SendMsg(m) } -func _Grpc_MonitorBestChanged_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(Arguments) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(GrpcServer).MonitorBestChanged(m, &grpcMonitorBestChangedServer{stream}) -} - -type Grpc_MonitorBestChangedServer interface { - Send(*Path) error - grpc.ServerStream -} - -type grpcMonitorBestChangedServer struct { - grpc.ServerStream -} - -func (x *grpcMonitorBestChangedServer) Send(m *Path) error { - return x.ServerStream.SendMsg(m) -} - func _Grpc_Reset_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { in := new(Arguments) if err := codec.Unmarshal(buf, in); err != nil { @@ -1882,6 +1895,48 @@ func (x *grpcModPolicyRoutePolicyServer) Recv() (*PolicyArguments, error) { return m, nil } +func _Grpc_MonitorBestChanged_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Arguments) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(GrpcServer).MonitorBestChanged(m, &grpcMonitorBestChangedServer{stream}) +} + +type Grpc_MonitorBestChangedServer interface { + Send(*Path) error + grpc.ServerStream +} + +type grpcMonitorBestChangedServer struct { + grpc.ServerStream +} + +func (x *grpcMonitorBestChangedServer) Send(m *Path) error { + return x.ServerStream.SendMsg(m) +} + +func _Grpc_MonitorPeerState_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Arguments) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(GrpcServer).MonitorPeerState(m, &grpcMonitorPeerStateServer{stream}) +} + +type Grpc_MonitorPeerStateServer interface { + Send(*Peer) error + grpc.ServerStream +} + +type grpcMonitorPeerStateServer struct { + grpc.ServerStream +} + +func (x *grpcMonitorPeerStateServer) Send(m *Peer) error { + return x.ServerStream.SendMsg(m) +} + var _Grpc_serviceDesc = grpc.ServiceDesc{ ServiceName: "api.Grpc", HandlerType: (*GrpcServer)(nil), @@ -1944,11 +1999,6 @@ var _Grpc_serviceDesc = grpc.ServiceDesc{ ServerStreams: true, }, { - StreamName: "MonitorBestChanged", - Handler: _Grpc_MonitorBestChanged_Handler, - ServerStreams: true, - }, - { StreamName: "ModPath", Handler: _Grpc_ModPath_Handler, ClientStreams: true, @@ -1970,5 +2020,15 @@ var _Grpc_serviceDesc = grpc.ServiceDesc{ ServerStreams: true, ClientStreams: true, }, + { + StreamName: "MonitorBestChanged", + Handler: _Grpc_MonitorBestChanged_Handler, + ServerStreams: true, + }, + { + StreamName: "MonitorPeerState", + Handler: _Grpc_MonitorPeerState_Handler, + ServerStreams: true, + }, }, } diff --git a/api/gobgp.proto b/api/gobgp.proto index 9a7717f3..2c6c70d5 100644 --- a/api/gobgp.proto +++ b/api/gobgp.proto @@ -24,7 +24,6 @@ service Grpc { rpc GetNeighbor(Arguments) returns (Peer) {} rpc GetRib(Arguments) returns (stream Destination) {} rpc GetAdjRib(Arguments) returns (stream Path) {} - rpc MonitorBestChanged(Arguments) returns (stream Path) {} rpc Reset(Arguments) returns (Error) {} rpc SoftReset(Arguments) returns (Error) {} rpc SoftResetIn(Arguments) returns (Error) {} @@ -38,6 +37,8 @@ service Grpc { rpc GetPolicyRoutePolicies(PolicyArguments) returns (stream PolicyDefinition) {} rpc GetPolicyRoutePolicy(PolicyArguments) returns (PolicyDefinition) {} rpc ModPolicyRoutePolicy(stream PolicyArguments) returns (stream Error) {} + rpc MonitorBestChanged(Arguments) returns (stream Path) {} + rpc MonitorPeerState(Arguments) returns (stream Peer) {} } message Error { diff --git a/server/grpc_server.go b/server/grpc_server.go index aa36a264..12a3a87a 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -48,7 +48,6 @@ const ( REQ_GLOBAL_RIB REQ_GLOBAL_ADD REQ_GLOBAL_DELETE - REQ_GLOBAL_MONITOR_BEST_CHANGED REQ_POLICY_PREFIX REQ_POLICY_PREFIXES REQ_POLICY_PREFIX_ADD @@ -74,6 +73,8 @@ const ( REQ_POLICY_COMMUNITY_ADD REQ_POLICY_COMMUNITY_DELETE REQ_POLICY_COMMUNITIES_DELETE + REQ_MONITOR_GLOBAL_BEST_CHANGED + REQ_MONITOR_NEIGHBOR_PEER_STATE ) const GRPC_PORT = 8080 @@ -210,7 +211,7 @@ func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorB var reqType int switch arg.Resource { case api.Resource_GLOBAL: - reqType = REQ_GLOBAL_MONITOR_BEST_CHANGED + reqType = REQ_MONITOR_GLOBAL_BEST_CHANGED default: return fmt.Errorf("unsupported resource type: %v", arg.Resource) } @@ -237,6 +238,27 @@ END: return err } +func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPeerStateServer) error { + var rf bgp.RouteFamily + req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.RouterId, rf, nil) + s.bgpServerCh <- req + + var err error + + for res := range req.ResponseCh { + if err = res.Err(); err != nil { + log.Debug(err.Error()) + goto END + } + if err = stream.Send(res.Data.(*api.Peer)); err != nil { + goto END + } + } +END: + req.EndCh <- struct{}{} + return err +} + func (s *Server) neighbor(reqType int, arg *api.Arguments) (*api.Error, error) { rf, err := convertAf2Rf(arg.Af) if err != nil { @@ -555,7 +577,7 @@ func NewGrpcRequest(reqType int, remoteAddr string, rf bgp.RouteFamily, d interf RouteFamily: rf, RemoteAddr: remoteAddr, ResponseCh: make(chan *GrpcResponse), - EndCh: make(chan struct{}), + EndCh: make(chan struct{}, 1), Data: d, } return r diff --git a/server/server.go b/server/server.go index cc19ecc7..7bfedc49 100644 --- a/server/server.go +++ b/server/server.go @@ -219,7 +219,7 @@ func (server *BgpServer) Serve() { server.neighborMap[name] = peer peer.outgoing = make(chan *bgp.BGPMessage, 128) peer.startFSMHandler(incoming) - + server.broadcastPeerState(peer) case config := <-server.deletedPeerCh: addr := config.NeighborAddress.String() SetTcpMD5SigSockopts(listener(config.NeighborAddress), addr, "") @@ -415,6 +415,10 @@ func (server *BgpServer) broadcastBests(bests []table.Path) { } remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs)) for _, req := range server.broadcastReqs { + if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED { + remainReqs = append(remainReqs, req) + continue + } select { case <-req.EndCh: continue @@ -426,6 +430,28 @@ func (server *BgpServer) broadcastBests(bests []table.Path) { } } +func (server *BgpServer) broadcastPeerState(peer *Peer) { + result := &GrpcResponse{ + Data: peer.ToApiStruct(), + } + remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs)) + for _, req := range server.broadcastReqs { + ignore := req.RequestType != REQ_MONITOR_NEIGHBOR_PEER_STATE + ignore = ignore || (req.RemoteAddr != "" && req.RemoteAddr != peer.config.NeighborAddress.String()) + if ignore { + remainReqs = append(remainReqs, req) + continue + } + select { + case <-req.EndCh: + continue + case req.ResponseCh <- result: + } + remainReqs = append(remainReqs, req) + } + server.broadcastReqs = remainReqs +} + func (server *BgpServer) propagateUpdate(neighborAddress string, RouteServerClient bool, pathList []table.Path) []*SenderMsg { msgs := make([]*SenderMsg, 0) @@ -528,6 +554,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan * peer.config.BgpNeighborCommonState = config.BgpNeighborCommonState{} } peer.startFSMHandler(incoming) + server.broadcastPeerState(peer) case FSM_MSG_BGP_MESSAGE: switch m := e.MsgData.(type) { @@ -814,8 +841,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { close(grpcReq.ResponseCh) } - case REQ_GLOBAL_MONITOR_BEST_CHANGED: - server.broadcastReqs = append(server.broadcastReqs, grpcReq) case REQ_NEIGHBORS: for _, peer := range server.neighborMap { result := &GrpcResponse{ @@ -1089,6 +1114,8 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { case REQ_POLICY_PREFIXES_DELETE, REQ_POLICY_NEIGHBORS_DELETE, REQ_POLICY_ASPATHS_DELETE, REQ_POLICY_COMMUNITIES_DELETE, REQ_POLICY_ROUTEPOLICIES_DELETE: server.handleGrpcDelPolicies(grpcReq) + case REQ_MONITOR_GLOBAL_BEST_CHANGED, REQ_MONITOR_NEIGHBOR_PEER_STATE: + server.broadcastReqs = append(server.broadcastReqs, grpcReq) default: errmsg := fmt.Errorf("Unknown request type: %v", grpcReq.RequestType) result := &GrpcResponse{ |