summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--api/gobgp.pb.go188
-rw-r--r--api/gobgp.proto3
-rw-r--r--server/grpc_server.go28
-rw-r--r--server/server.go33
4 files changed, 181 insertions, 71 deletions
diff --git a/api/gobgp.pb.go b/api/gobgp.pb.go
index 7aeb6e26..0b6689dd 100644
--- a/api/gobgp.pb.go
+++ b/api/gobgp.pb.go
@@ -1182,7 +1182,6 @@ type GrpcClient interface {
GetNeighbor(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (*Peer, error)
GetRib(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_GetRibClient, error)
GetAdjRib(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_GetAdjRibClient, error)
- MonitorBestChanged(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorBestChangedClient, error)
Reset(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (*Error, error)
SoftReset(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (*Error, error)
SoftResetIn(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (*Error, error)
@@ -1196,6 +1195,8 @@ type GrpcClient interface {
GetPolicyRoutePolicies(ctx context.Context, in *PolicyArguments, opts ...grpc.CallOption) (Grpc_GetPolicyRoutePoliciesClient, error)
GetPolicyRoutePolicy(ctx context.Context, in *PolicyArguments, opts ...grpc.CallOption) (*PolicyDefinition, error)
ModPolicyRoutePolicy(ctx context.Context, opts ...grpc.CallOption) (Grpc_ModPolicyRoutePolicyClient, error)
+ MonitorBestChanged(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorBestChangedClient, error)
+ MonitorPeerState(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorPeerStateClient, error)
}
type grpcClient struct {
@@ -1311,38 +1312,6 @@ func (x *grpcGetAdjRibClient) Recv() (*Path, error) {
return m, nil
}
-func (c *grpcClient) MonitorBestChanged(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorBestChangedClient, error) {
- stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[3], c.cc, "/api.Grpc/MonitorBestChanged", opts...)
- if err != nil {
- return nil, err
- }
- x := &grpcMonitorBestChangedClient{stream}
- if err := x.ClientStream.SendMsg(in); err != nil {
- return nil, err
- }
- if err := x.ClientStream.CloseSend(); err != nil {
- return nil, err
- }
- return x, nil
-}
-
-type Grpc_MonitorBestChangedClient interface {
- Recv() (*Path, error)
- grpc.ClientStream
-}
-
-type grpcMonitorBestChangedClient struct {
- grpc.ClientStream
-}
-
-func (x *grpcMonitorBestChangedClient) Recv() (*Path, error) {
- m := new(Path)
- if err := x.ClientStream.RecvMsg(m); err != nil {
- return nil, err
- }
- return m, nil
-}
-
func (c *grpcClient) Reset(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (*Error, error) {
out := new(Error)
err := grpc.Invoke(ctx, "/api.Grpc/Reset", in, out, c.cc, opts...)
@@ -1407,7 +1376,7 @@ func (c *grpcClient) Disable(ctx context.Context, in *Arguments, opts ...grpc.Ca
}
func (c *grpcClient) ModPath(ctx context.Context, opts ...grpc.CallOption) (Grpc_ModPathClient, error) {
- stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[4], c.cc, "/api.Grpc/ModPath", opts...)
+ stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[3], c.cc, "/api.Grpc/ModPath", opts...)
if err != nil {
return nil, err
}
@@ -1450,7 +1419,7 @@ func (c *grpcClient) GetNeighborPolicy(ctx context.Context, in *Arguments, opts
}
func (c *grpcClient) ModNeighborPolicy(ctx context.Context, opts ...grpc.CallOption) (Grpc_ModNeighborPolicyClient, error) {
- stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[5], c.cc, "/api.Grpc/ModNeighborPolicy", opts...)
+ stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[4], c.cc, "/api.Grpc/ModNeighborPolicy", opts...)
if err != nil {
return nil, err
}
@@ -1481,7 +1450,7 @@ func (x *grpcModNeighborPolicyClient) Recv() (*Error, error) {
}
func (c *grpcClient) GetPolicyRoutePolicies(ctx context.Context, in *PolicyArguments, opts ...grpc.CallOption) (Grpc_GetPolicyRoutePoliciesClient, error) {
- stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[6], c.cc, "/api.Grpc/GetPolicyRoutePolicies", opts...)
+ stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[5], c.cc, "/api.Grpc/GetPolicyRoutePolicies", opts...)
if err != nil {
return nil, err
}
@@ -1522,7 +1491,7 @@ func (c *grpcClient) GetPolicyRoutePolicy(ctx context.Context, in *PolicyArgumen
}
func (c *grpcClient) ModPolicyRoutePolicy(ctx context.Context, opts ...grpc.CallOption) (Grpc_ModPolicyRoutePolicyClient, error) {
- stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[7], c.cc, "/api.Grpc/ModPolicyRoutePolicy", opts...)
+ stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[6], c.cc, "/api.Grpc/ModPolicyRoutePolicy", opts...)
if err != nil {
return nil, err
}
@@ -1552,6 +1521,70 @@ func (x *grpcModPolicyRoutePolicyClient) Recv() (*Error, error) {
return m, nil
}
+func (c *grpcClient) MonitorBestChanged(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorBestChangedClient, error) {
+ stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[7], c.cc, "/api.Grpc/MonitorBestChanged", opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &grpcMonitorBestChangedClient{stream}
+ if err := x.ClientStream.SendMsg(in); err != nil {
+ return nil, err
+ }
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return x, nil
+}
+
+type Grpc_MonitorBestChangedClient interface {
+ Recv() (*Path, error)
+ grpc.ClientStream
+}
+
+type grpcMonitorBestChangedClient struct {
+ grpc.ClientStream
+}
+
+func (x *grpcMonitorBestChangedClient) Recv() (*Path, error) {
+ m := new(Path)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func (c *grpcClient) MonitorPeerState(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorPeerStateClient, error) {
+ stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[8], c.cc, "/api.Grpc/MonitorPeerState", opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &grpcMonitorPeerStateClient{stream}
+ if err := x.ClientStream.SendMsg(in); err != nil {
+ return nil, err
+ }
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return x, nil
+}
+
+type Grpc_MonitorPeerStateClient interface {
+ Recv() (*Peer, error)
+ grpc.ClientStream
+}
+
+type grpcMonitorPeerStateClient struct {
+ grpc.ClientStream
+}
+
+func (x *grpcMonitorPeerStateClient) Recv() (*Peer, error) {
+ m := new(Peer)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
// Server API for Grpc service
type GrpcServer interface {
@@ -1559,7 +1592,6 @@ type GrpcServer interface {
GetNeighbor(context.Context, *Arguments) (*Peer, error)
GetRib(*Arguments, Grpc_GetRibServer) error
GetAdjRib(*Arguments, Grpc_GetAdjRibServer) error
- MonitorBestChanged(*Arguments, Grpc_MonitorBestChangedServer) error
Reset(context.Context, *Arguments) (*Error, error)
SoftReset(context.Context, *Arguments) (*Error, error)
SoftResetIn(context.Context, *Arguments) (*Error, error)
@@ -1573,6 +1605,8 @@ type GrpcServer interface {
GetPolicyRoutePolicies(*PolicyArguments, Grpc_GetPolicyRoutePoliciesServer) error
GetPolicyRoutePolicy(context.Context, *PolicyArguments) (*PolicyDefinition, error)
ModPolicyRoutePolicy(Grpc_ModPolicyRoutePolicyServer) error
+ MonitorBestChanged(*Arguments, Grpc_MonitorBestChangedServer) error
+ MonitorPeerState(*Arguments, Grpc_MonitorPeerStateServer) error
}
func RegisterGrpcServer(s *grpc.Server, srv GrpcServer) {
@@ -1654,27 +1688,6 @@ func (x *grpcGetAdjRibServer) Send(m *Path) error {
return x.ServerStream.SendMsg(m)
}
-func _Grpc_MonitorBestChanged_Handler(srv interface{}, stream grpc.ServerStream) error {
- m := new(Arguments)
- if err := stream.RecvMsg(m); err != nil {
- return err
- }
- return srv.(GrpcServer).MonitorBestChanged(m, &grpcMonitorBestChangedServer{stream})
-}
-
-type Grpc_MonitorBestChangedServer interface {
- Send(*Path) error
- grpc.ServerStream
-}
-
-type grpcMonitorBestChangedServer struct {
- grpc.ServerStream
-}
-
-func (x *grpcMonitorBestChangedServer) Send(m *Path) error {
- return x.ServerStream.SendMsg(m)
-}
-
func _Grpc_Reset_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
in := new(Arguments)
if err := codec.Unmarshal(buf, in); err != nil {
@@ -1882,6 +1895,48 @@ func (x *grpcModPolicyRoutePolicyServer) Recv() (*PolicyArguments, error) {
return m, nil
}
+func _Grpc_MonitorBestChanged_Handler(srv interface{}, stream grpc.ServerStream) error {
+ m := new(Arguments)
+ if err := stream.RecvMsg(m); err != nil {
+ return err
+ }
+ return srv.(GrpcServer).MonitorBestChanged(m, &grpcMonitorBestChangedServer{stream})
+}
+
+type Grpc_MonitorBestChangedServer interface {
+ Send(*Path) error
+ grpc.ServerStream
+}
+
+type grpcMonitorBestChangedServer struct {
+ grpc.ServerStream
+}
+
+func (x *grpcMonitorBestChangedServer) Send(m *Path) error {
+ return x.ServerStream.SendMsg(m)
+}
+
+func _Grpc_MonitorPeerState_Handler(srv interface{}, stream grpc.ServerStream) error {
+ m := new(Arguments)
+ if err := stream.RecvMsg(m); err != nil {
+ return err
+ }
+ return srv.(GrpcServer).MonitorPeerState(m, &grpcMonitorPeerStateServer{stream})
+}
+
+type Grpc_MonitorPeerStateServer interface {
+ Send(*Peer) error
+ grpc.ServerStream
+}
+
+type grpcMonitorPeerStateServer struct {
+ grpc.ServerStream
+}
+
+func (x *grpcMonitorPeerStateServer) Send(m *Peer) error {
+ return x.ServerStream.SendMsg(m)
+}
+
var _Grpc_serviceDesc = grpc.ServiceDesc{
ServiceName: "api.Grpc",
HandlerType: (*GrpcServer)(nil),
@@ -1944,11 +1999,6 @@ var _Grpc_serviceDesc = grpc.ServiceDesc{
ServerStreams: true,
},
{
- StreamName: "MonitorBestChanged",
- Handler: _Grpc_MonitorBestChanged_Handler,
- ServerStreams: true,
- },
- {
StreamName: "ModPath",
Handler: _Grpc_ModPath_Handler,
ClientStreams: true,
@@ -1970,5 +2020,15 @@ var _Grpc_serviceDesc = grpc.ServiceDesc{
ServerStreams: true,
ClientStreams: true,
},
+ {
+ StreamName: "MonitorBestChanged",
+ Handler: _Grpc_MonitorBestChanged_Handler,
+ ServerStreams: true,
+ },
+ {
+ StreamName: "MonitorPeerState",
+ Handler: _Grpc_MonitorPeerState_Handler,
+ ServerStreams: true,
+ },
},
}
diff --git a/api/gobgp.proto b/api/gobgp.proto
index 9a7717f3..2c6c70d5 100644
--- a/api/gobgp.proto
+++ b/api/gobgp.proto
@@ -24,7 +24,6 @@ service Grpc {
rpc GetNeighbor(Arguments) returns (Peer) {}
rpc GetRib(Arguments) returns (stream Destination) {}
rpc GetAdjRib(Arguments) returns (stream Path) {}
- rpc MonitorBestChanged(Arguments) returns (stream Path) {}
rpc Reset(Arguments) returns (Error) {}
rpc SoftReset(Arguments) returns (Error) {}
rpc SoftResetIn(Arguments) returns (Error) {}
@@ -38,6 +37,8 @@ service Grpc {
rpc GetPolicyRoutePolicies(PolicyArguments) returns (stream PolicyDefinition) {}
rpc GetPolicyRoutePolicy(PolicyArguments) returns (PolicyDefinition) {}
rpc ModPolicyRoutePolicy(stream PolicyArguments) returns (stream Error) {}
+ rpc MonitorBestChanged(Arguments) returns (stream Path) {}
+ rpc MonitorPeerState(Arguments) returns (stream Peer) {}
}
message Error {
diff --git a/server/grpc_server.go b/server/grpc_server.go
index aa36a264..12a3a87a 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -48,7 +48,6 @@ const (
REQ_GLOBAL_RIB
REQ_GLOBAL_ADD
REQ_GLOBAL_DELETE
- REQ_GLOBAL_MONITOR_BEST_CHANGED
REQ_POLICY_PREFIX
REQ_POLICY_PREFIXES
REQ_POLICY_PREFIX_ADD
@@ -74,6 +73,8 @@ const (
REQ_POLICY_COMMUNITY_ADD
REQ_POLICY_COMMUNITY_DELETE
REQ_POLICY_COMMUNITIES_DELETE
+ REQ_MONITOR_GLOBAL_BEST_CHANGED
+ REQ_MONITOR_NEIGHBOR_PEER_STATE
)
const GRPC_PORT = 8080
@@ -210,7 +211,7 @@ func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.Grpc_MonitorB
var reqType int
switch arg.Resource {
case api.Resource_GLOBAL:
- reqType = REQ_GLOBAL_MONITOR_BEST_CHANGED
+ reqType = REQ_MONITOR_GLOBAL_BEST_CHANGED
default:
return fmt.Errorf("unsupported resource type: %v", arg.Resource)
}
@@ -237,6 +238,27 @@ END:
return err
}
+func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.Grpc_MonitorPeerStateServer) error {
+ var rf bgp.RouteFamily
+ req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.RouterId, rf, nil)
+ s.bgpServerCh <- req
+
+ var err error
+
+ for res := range req.ResponseCh {
+ if err = res.Err(); err != nil {
+ log.Debug(err.Error())
+ goto END
+ }
+ if err = stream.Send(res.Data.(*api.Peer)); err != nil {
+ goto END
+ }
+ }
+END:
+ req.EndCh <- struct{}{}
+ return err
+}
+
func (s *Server) neighbor(reqType int, arg *api.Arguments) (*api.Error, error) {
rf, err := convertAf2Rf(arg.Af)
if err != nil {
@@ -555,7 +577,7 @@ func NewGrpcRequest(reqType int, remoteAddr string, rf bgp.RouteFamily, d interf
RouteFamily: rf,
RemoteAddr: remoteAddr,
ResponseCh: make(chan *GrpcResponse),
- EndCh: make(chan struct{}),
+ EndCh: make(chan struct{}, 1),
Data: d,
}
return r
diff --git a/server/server.go b/server/server.go
index cc19ecc7..7bfedc49 100644
--- a/server/server.go
+++ b/server/server.go
@@ -219,7 +219,7 @@ func (server *BgpServer) Serve() {
server.neighborMap[name] = peer
peer.outgoing = make(chan *bgp.BGPMessage, 128)
peer.startFSMHandler(incoming)
-
+ server.broadcastPeerState(peer)
case config := <-server.deletedPeerCh:
addr := config.NeighborAddress.String()
SetTcpMD5SigSockopts(listener(config.NeighborAddress), addr, "")
@@ -415,6 +415,10 @@ func (server *BgpServer) broadcastBests(bests []table.Path) {
}
remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs))
for _, req := range server.broadcastReqs {
+ if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED {
+ remainReqs = append(remainReqs, req)
+ continue
+ }
select {
case <-req.EndCh:
continue
@@ -426,6 +430,28 @@ func (server *BgpServer) broadcastBests(bests []table.Path) {
}
}
+func (server *BgpServer) broadcastPeerState(peer *Peer) {
+ result := &GrpcResponse{
+ Data: peer.ToApiStruct(),
+ }
+ remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs))
+ for _, req := range server.broadcastReqs {
+ ignore := req.RequestType != REQ_MONITOR_NEIGHBOR_PEER_STATE
+ ignore = ignore || (req.RemoteAddr != "" && req.RemoteAddr != peer.config.NeighborAddress.String())
+ if ignore {
+ remainReqs = append(remainReqs, req)
+ continue
+ }
+ select {
+ case <-req.EndCh:
+ continue
+ case req.ResponseCh <- result:
+ }
+ remainReqs = append(remainReqs, req)
+ }
+ server.broadcastReqs = remainReqs
+}
+
func (server *BgpServer) propagateUpdate(neighborAddress string, RouteServerClient bool, pathList []table.Path) []*SenderMsg {
msgs := make([]*SenderMsg, 0)
@@ -528,6 +554,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan *
peer.config.BgpNeighborCommonState = config.BgpNeighborCommonState{}
}
peer.startFSMHandler(incoming)
+ server.broadcastPeerState(peer)
case FSM_MSG_BGP_MESSAGE:
switch m := e.MsgData.(type) {
@@ -814,8 +841,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
close(grpcReq.ResponseCh)
}
- case REQ_GLOBAL_MONITOR_BEST_CHANGED:
- server.broadcastReqs = append(server.broadcastReqs, grpcReq)
case REQ_NEIGHBORS:
for _, peer := range server.neighborMap {
result := &GrpcResponse{
@@ -1089,6 +1114,8 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
case REQ_POLICY_PREFIXES_DELETE, REQ_POLICY_NEIGHBORS_DELETE, REQ_POLICY_ASPATHS_DELETE,
REQ_POLICY_COMMUNITIES_DELETE, REQ_POLICY_ROUTEPOLICIES_DELETE:
server.handleGrpcDelPolicies(grpcReq)
+ case REQ_MONITOR_GLOBAL_BEST_CHANGED, REQ_MONITOR_NEIGHBOR_PEER_STATE:
+ server.broadcastReqs = append(server.broadcastReqs, grpcReq)
default:
errmsg := fmt.Errorf("Unknown request type: %v", grpcReq.RequestType)
result := &GrpcResponse{