diff options
-rw-r--r-- | api/gobgp.pb.go | 115 | ||||
-rw-r--r-- | api/gobgp.proto | 19 | ||||
-rw-r--r-- | docs/sources/rpki.md | 14 | ||||
-rw-r--r-- | gobgp/common.go | 2 | ||||
-rw-r--r-- | gobgp/mrt.go | 4 | ||||
-rw-r--r-- | gobgp/rpki.go | 51 | ||||
-rw-r--r-- | server/grpc_server.go | 10 | ||||
-rw-r--r-- | server/rpki.go | 137 | ||||
-rw-r--r-- | server/server.go | 15 |
9 files changed, 300 insertions, 67 deletions
diff --git a/api/gobgp.pb.go b/api/gobgp.pb.go index e76bc45c..182845ea 100644 --- a/api/gobgp.pb.go +++ b/api/gobgp.pb.go @@ -39,6 +39,9 @@ It has these top-level messages: PolicyDefinition ApplyPolicy MrtMessage + RPKIConf + RPKIState + RPKI ROA Vrf */ @@ -718,6 +721,48 @@ func (m *MrtMessage) Reset() { *m = MrtMessage{} } func (m *MrtMessage) String() string { return proto.CompactTextString(m) } func (*MrtMessage) ProtoMessage() {} +type RPKIConf struct { + Address string `protobuf:"bytes,1,opt,name=address" json:"address,omitempty"` +} + +func (m *RPKIConf) Reset() { *m = RPKIConf{} } +func (m *RPKIConf) String() string { return proto.CompactTextString(m) } +func (*RPKIConf) ProtoMessage() {} + +type RPKIState struct { + Uptime int64 `protobuf:"varint,1,opt,name=uptime" json:"uptime,omitempty"` + Downtime int64 `protobuf:"varint,2,opt,name=downtime" json:"downtime,omitempty"` + ReceivedIpv4 int32 `protobuf:"varint,3,opt,name=received_ipv4" json:"received_ipv4,omitempty"` + ReceivedIpv6 int32 `protobuf:"varint,4,opt,name=received_ipv6" json:"received_ipv6,omitempty"` +} + +func (m *RPKIState) Reset() { *m = RPKIState{} } +func (m *RPKIState) String() string { return proto.CompactTextString(m) } +func (*RPKIState) ProtoMessage() {} + +type RPKI struct { + Conf *RPKIConf `protobuf:"bytes,1,opt,name=conf" json:"conf,omitempty"` + State *RPKIState `protobuf:"bytes,2,opt,name=state" json:"state,omitempty"` +} + +func (m *RPKI) Reset() { *m = RPKI{} } +func (m *RPKI) String() string { return proto.CompactTextString(m) } +func (*RPKI) ProtoMessage() {} + +func (m *RPKI) GetConf() *RPKIConf { + if m != nil { + return m.Conf + } + return nil +} + +func (m *RPKI) GetState() *RPKIState { + if m != nil { + return m.State + } + return nil +} + type ROA struct { As uint32 `protobuf:"varint,1,opt,name=as" json:"as,omitempty"` Prefixlen uint32 `protobuf:"varint,2,opt,name=prefixlen" json:"prefixlen,omitempty"` @@ -770,6 +815,7 @@ type GrpcClient interface { MonitorPeerState(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_MonitorPeerStateClient, error) GetMrt(ctx context.Context, in *MrtArguments, opts ...grpc.CallOption) (Grpc_GetMrtClient, error) GetRPKI(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_GetRPKIClient, error) + GetROA(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_GetROAClient, error) GetVrfs(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_GetVrfsClient, error) ModVrf(ctx context.Context, in *ModVrfArguments, opts ...grpc.CallOption) (*Error, error) } @@ -1176,7 +1222,7 @@ func (c *grpcClient) GetRPKI(ctx context.Context, in *Arguments, opts ...grpc.Ca } type Grpc_GetRPKIClient interface { - Recv() (*ROA, error) + Recv() (*RPKI, error) grpc.ClientStream } @@ -1184,7 +1230,39 @@ type grpcGetRPKIClient struct { grpc.ClientStream } -func (x *grpcGetRPKIClient) Recv() (*ROA, error) { +func (x *grpcGetRPKIClient) Recv() (*RPKI, error) { + m := new(RPKI) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *grpcClient) GetROA(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_GetROAClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[10], c.cc, "/api.Grpc/GetROA", opts...) + if err != nil { + return nil, err + } + x := &grpcGetROAClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Grpc_GetROAClient interface { + Recv() (*ROA, error) + grpc.ClientStream +} + +type grpcGetROAClient struct { + grpc.ClientStream +} + +func (x *grpcGetROAClient) Recv() (*ROA, error) { m := new(ROA) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err @@ -1193,7 +1271,7 @@ func (x *grpcGetRPKIClient) Recv() (*ROA, error) { } func (c *grpcClient) GetVrfs(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (Grpc_GetVrfsClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[10], c.cc, "/api.Grpc/GetVrfs", opts...) + stream, err := grpc.NewClientStream(ctx, &_Grpc_serviceDesc.Streams[11], c.cc, "/api.Grpc/GetVrfs", opts...) if err != nil { return nil, err } @@ -1256,6 +1334,7 @@ type GrpcServer interface { MonitorPeerState(*Arguments, Grpc_MonitorPeerStateServer) error GetMrt(*MrtArguments, Grpc_GetMrtServer) error GetRPKI(*Arguments, Grpc_GetRPKIServer) error + GetROA(*Arguments, Grpc_GetROAServer) error GetVrfs(*Arguments, Grpc_GetVrfsServer) error ModVrf(context.Context, *ModVrfArguments) (*Error, error) } @@ -1597,7 +1676,7 @@ func _Grpc_GetRPKI_Handler(srv interface{}, stream grpc.ServerStream) error { } type Grpc_GetRPKIServer interface { - Send(*ROA) error + Send(*RPKI) error grpc.ServerStream } @@ -1605,7 +1684,28 @@ type grpcGetRPKIServer struct { grpc.ServerStream } -func (x *grpcGetRPKIServer) Send(m *ROA) error { +func (x *grpcGetRPKIServer) Send(m *RPKI) error { + return x.ServerStream.SendMsg(m) +} + +func _Grpc_GetROA_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Arguments) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(GrpcServer).GetROA(m, &grpcGetROAServer{stream}) +} + +type Grpc_GetROAServer interface { + Send(*ROA) error + grpc.ServerStream +} + +type grpcGetROAServer struct { + grpc.ServerStream +} + +func (x *grpcGetROAServer) Send(m *ROA) error { return x.ServerStream.SendMsg(m) } @@ -1745,6 +1845,11 @@ var _Grpc_serviceDesc = grpc.ServiceDesc{ ServerStreams: true, }, { + StreamName: "GetROA", + Handler: _Grpc_GetROA_Handler, + ServerStreams: true, + }, + { StreamName: "GetVrfs", Handler: _Grpc_GetVrfs_Handler, ServerStreams: true, diff --git a/api/gobgp.proto b/api/gobgp.proto index 80b88c02..f08d5b26 100644 --- a/api/gobgp.proto +++ b/api/gobgp.proto @@ -39,7 +39,8 @@ service Grpc { rpc MonitorBestChanged(Arguments) returns (stream Destination) {} rpc MonitorPeerState(Arguments) returns (stream Peer) {} rpc GetMrt(MrtArguments) returns (stream MrtMessage) {} - rpc GetRPKI(Arguments) returns (stream ROA) {} + rpc GetRPKI(Arguments) returns (stream RPKI) {} + rpc GetROA(Arguments) returns (stream ROA) {} rpc GetVrfs(Arguments) returns (stream Vrf) {} rpc ModVrf(ModVrfArguments) returns (Error) {} } @@ -295,6 +296,22 @@ message MrtMessage { bytes data = 1; } +message RPKIConf { + string address = 1; +} + +message RPKIState { + int64 uptime = 1; + int64 downtime = 2; + int32 received_ipv4 = 3; + int32 received_ipv6 = 4; +} + +message RPKI { + RPKIConf conf = 1; + RPKIState state = 2; +} + message ROA { uint32 as = 1; uint32 prefixlen = 2; diff --git a/docs/sources/rpki.md b/docs/sources/rpki.md index 834cb7aa..552b0c48 100644 --- a/docs/sources/rpki.md +++ b/docs/sources/rpki.md @@ -56,26 +56,28 @@ and get the ROA (Route Origin Authorization) information in the following way: ```bash -$ gobgp rpki|head -n4 +$ gobgp rpki server +Session State Uptime #IPv4/IPv6 records +210.173.170.254 Up 00:03:06 14823/2168 +``` + +```bash +$ gobgp rpki table 210.173.170.254|head -n4 Network Maxlen AS 2.0.0.0/12 16 3215 2.0.0.0/16 16 3215 2.1.0.0/16 16 3215 -$ gobgp rpki |wc -l -14576 ``` By default, IPv4's ROA information is shown. You can see IPv6's like: ```bash -$ gobgp rpki -a ipv6|head -n4 +$ gobgp rpki -a ipv6 table 210.173.170.254|head -n4 fujita@ubuntu:~$ gobgp rpki -a ipv6|head -n3 Network Maxlen AS 2001:608::/32 32 5539 2001:610::/32 48 1103 2001:610:240::/42 42 3333 -$ gobgp rpki -a ipv6|wc -l -2150 ``` We configure the peer 10.0.255.1 to send three routes: diff --git a/gobgp/common.go b/gobgp/common.go index f61cf2ad..1d8c6613 100644 --- a/gobgp/common.go +++ b/gobgp/common.go @@ -60,6 +60,8 @@ const ( CMD_DUMP = "dump" CMD_INJECT = "inject" CMD_RPKI = "rpki" + CMD_RPKI_TABLE = "table" + CMD_RPKI_SERVER = "server" CMD_VRF = "vrf" ) diff --git a/gobgp/mrt.go b/gobgp/mrt.go index d3f89f48..c182d23a 100644 --- a/gobgp/mrt.go +++ b/gobgp/mrt.go @@ -296,8 +296,8 @@ func injectMrt(r string, filename string, count int) error { ch <- &api.ModPathArguments{ Resource: resource, Path: path, - Asn: peers[e.PeerIndex].AS, - Id: peers[e.PeerIndex].BgpId.String(), + Asn: peers[e.PeerIndex].AS, + Id: peers[e.PeerIndex].BgpId.String(), } } diff --git a/gobgp/rpki.go b/gobgp/rpki.go index 89632eac..0e299a69 100644 --- a/gobgp/rpki.go +++ b/gobgp/rpki.go @@ -24,18 +24,48 @@ import ( "io" "net" "os" + "time" ) +func showRPKIServer(args []string) error { + arg := &api.Arguments{} + + stream, err := client.GetRPKI(context.Background(), arg) + if err != nil { + fmt.Println(err) + return err + } + format := "%-18s %-6s %-10s %s\n" + fmt.Printf(format, "Session", "State", "Uptime", "#IPv4/IPv6 records") + for { + r, err := stream.Recv() + if err == io.EOF { + break + } else if err != nil { + return err + } + s := "Up" + uptime := int64(time.Now().Sub(time.Unix(r.State.Uptime, 0)).Seconds()) + + fmt.Printf(format, fmt.Sprintf(r.Conf.Address), s, fmt.Sprint(formatTimedelta(uptime)), fmt.Sprintf("%d/%d", r.State.ReceivedIpv4, r.State.ReceivedIpv6)) + } + return nil +} + func showRPKITable(args []string) error { + if len(args) == 0 { + return fmt.Errorf("Needs to specify RPKI server address") + } rf, err := checkAddressFamily(net.IP{}) if err != nil { fmt.Println(err) os.Exit(1) } arg := &api.Arguments{ - Rf: uint32(rf), + Rf: uint32(rf), + Name: args[0], } - stream, err := client.GetRPKI(context.Background(), arg) + stream, err := client.GetROA(context.Background(), arg) if err != nil { fmt.Println(err) return err @@ -64,10 +94,25 @@ func showRPKITable(args []string) error { func NewRPKICmd() *cobra.Command { rpkiCmd := &cobra.Command{ Use: CMD_RPKI, + } + + serverCmd := &cobra.Command{ + Use: CMD_RPKI_SERVER, + Run: func(cmd *cobra.Command, args []string) { + showRPKIServer(args) + }, + } + + rpkiCmd.AddCommand(serverCmd) + + tableCmd := &cobra.Command{ + Use: CMD_RPKI_TABLE, Run: func(cmd *cobra.Command, args []string) { showRPKITable(args) }, } - rpkiCmd.PersistentFlags().StringVarP(&subOpts.AddressFamily, "address-family", "a", "", "address family") + tableCmd.PersistentFlags().StringVarP(&subOpts.AddressFamily, "address-family", "a", "", "address family") + + rpkiCmd.AddCommand(tableCmd) return rpkiCmd } diff --git a/server/grpc_server.go b/server/grpc_server.go index b56e2e14..ee8e5de1 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -83,6 +83,7 @@ const ( REQ_MRT_GLOBAL_RIB REQ_MRT_LOCAL_RIB REQ_RPKI + REQ_ROA REQ_VRF REQ_VRFS REQ_VRF_MOD @@ -513,6 +514,15 @@ func (s *Server) GetRPKI(arg *api.Arguments, stream api.Grpc_GetRPKIServer) erro s.bgpServerCh <- req return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.RPKI)) + }) +} + +func (s *Server) GetROA(arg *api.Arguments, stream api.Grpc_GetROAServer) error { + req := NewGrpcRequest(REQ_ROA, arg.Name, bgp.RouteFamily(arg.Rf), nil) + s.bgpServerCh <- req + + return handleMultipleResponses(req, func(res *GrpcResponse) error { return stream.Send(res.Data.(*api.ROA)) }) } diff --git a/server/rpki.go b/server/rpki.go index 53a56e04..dc8a94cc 100644 --- a/server/rpki.go +++ b/server/rpki.go @@ -19,12 +19,14 @@ import ( "bufio" "bytes" "fmt" + log "github.com/Sirupsen/logrus" "github.com/armon/go-radix" "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" "net" + "time" ) type roa struct { @@ -48,12 +50,12 @@ func (r *roa) toApiStruct() *api.ROA { } type roaClient struct { - url string roas map[bgp.RouteFamily]*radix.Tree - outgoing chan *roa + outgoing chan []byte + config config.RpkiServers } -func (c *roaClient) recieveROA() chan *roa { +func (c *roaClient) recieveROA() chan []byte { return c.outgoing } @@ -65,33 +67,91 @@ func roa2key(r *roa) string { return buffer.String()[:r.PrefixLen] } -func (c *roaClient) handleRTRMsg(r *roa) { - if r.Prefix.To4() != nil { - c.roas[bgp.RF_IPv4_UC].Insert(roa2key(r), r) +func (c *roaClient) handleRTRMsg(buf []byte) { + received := &c.config.RpkiServerList[0].RpkiServerState.RpkiMessages.RpkiReceived + + m, _ := bgp.ParseRTR(buf) + if m != nil { + switch msg := m.(type) { + case *bgp.RTRSerialNotify: + received.SerialNotify++ + case *bgp.RTRSerialQuery: + case *bgp.RTRResetQuery: + case *bgp.RTRCacheResponse: + received.CacheResponse++ + case *bgp.RTRIPPrefix: + p := make([]byte, len(msg.Prefix)) + copy(p, msg.Prefix) + r := &roa{ + AS: msg.AS, + PrefixLen: msg.PrefixLen, + MaxLen: msg.MaxLen, + Prefix: p, + } + if r.Prefix.To4() != nil { + received.Ipv4Prefix++ + c.roas[bgp.RF_IPv4_UC].Insert(roa2key(r), r) + } else { + received.Ipv6Prefix++ + c.roas[bgp.RF_IPv6_UC].Insert(roa2key(r), r) + } + case *bgp.RTREndOfData: + received.EndOfData++ + case *bgp.RTRCacheReset: + received.CacheReset++ + case *bgp.RTRErrorReport: + } } else { - c.roas[bgp.RF_IPv6_UC].Insert(roa2key(r), r) + received.Error++ } } func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) { - if tree, ok := c.roas[grpcReq.RouteFamily]; ok { + switch grpcReq.RequestType { + case REQ_RPKI: results := make([]*GrpcResponse, 0) - tree.Walk(func(s string, v interface{}) bool { - r, _ := v.(*roa) + for _, s := range c.config.RpkiServerList { + state := &s.RpkiServerState + rpki := &api.RPKI{ + Conf: &api.RPKIConf{ + Address: s.RpkiServerConfig.Address.String(), + }, + State: &api.RPKIState{ + Uptime: state.Uptime, + ReceivedIpv4: int32(c.roas[bgp.RF_IPv4_UC].Len()), + ReceivedIpv6: int32(c.roas[bgp.RF_IPv6_UC].Len()), + }, + } result := &GrpcResponse{} - result.Data = r.toApiStruct() + result.Data = rpki results = append(results, result) - return false - }) + } go sendMultipleResponses(grpcReq, results) + + case REQ_ROA: + if len(c.config.RpkiServerList) == 0 || c.config.RpkiServerList[0].RpkiServerConfig.Address.String() != grpcReq.Name { + result := &GrpcResponse{} + result.ResponseErr = fmt.Errorf("RPKI server that has %v doesn't exist.", grpcReq.Name) + + grpcReq.ResponseCh <- result + break + } + + if tree, ok := c.roas[grpcReq.RouteFamily]; ok { + results := make([]*GrpcResponse, 0) + tree.Walk(func(s string, v interface{}) bool { + r, _ := v.(*roa) + result := &GrpcResponse{} + result.Data = r.toApiStruct() + results = append(results, result) + return false + }) + go sendMultipleResponses(grpcReq, results) + } } } func (c *roaClient) validate(pathList []*table.Path) { - if c.url == "" { - return - } - for _, path := range pathList { if tree, ok := c.roas[path.GetRouteFamily()]; ok { _, n, _ := net.ParseCIDR(path.GetNlri().String()) @@ -115,16 +175,28 @@ func (c *roaClient) validate(pathList []*table.Path) { } } -func newROAClient(url string) (*roaClient, error) { +func newROAClient(conf config.RpkiServers) (*roaClient, error) { + var url string + c := &roaClient{ - url: url, - roas: make(map[bgp.RouteFamily]*radix.Tree), + roas: make(map[bgp.RouteFamily]*radix.Tree), + config: conf, } c.roas[bgp.RF_IPv4_UC] = radix.New() c.roas[bgp.RF_IPv6_UC] = radix.New() - if url == "" { + if len(conf.RpkiServerList) == 0 { return c, nil + } else { + if len(conf.RpkiServerList) > 1 { + log.Warn("currently only one RPKI server is supposed") + } + if conf.RpkiServerList[0].RpkiServerConfig.Address.To16() == nil { + url = fmt.Sprintf("%s", conf.RpkiServerList[0].RpkiServerConfig.Address) + } else { + url = fmt.Sprintf("[%s]", conf.RpkiServerList[0].RpkiServerConfig.Address) + } + url += fmt.Sprintf(":%d", conf.RpkiServerList[0].RpkiServerConfig.Port) } conn, err := net.Dial("tcp", url) @@ -132,33 +204,22 @@ func newROAClient(url string) (*roaClient, error) { return c, err } + state := &conf.RpkiServerList[0].RpkiServerState + state.Uptime = time.Now().Unix() r := bgp.NewRTRResetQuery() data, _ := r.Serialize() conn.Write(data) + state.RpkiMessages.RpkiSent.ResetQuery++ reader := bufio.NewReader(conn) scanner := bufio.NewScanner(reader) scanner.Split(bgp.SplitRTR) - ch := make(chan *roa) + ch := make(chan []byte) c.outgoing = ch - go func(ch chan *roa) { + go func(ch chan []byte) { for scanner.Scan() { - m, _ := bgp.ParseRTR(scanner.Bytes()) - if m != nil { - switch msg := m.(type) { - case *bgp.RTRIPPrefix: - p := make([]byte, len(msg.Prefix)) - copy(p, msg.Prefix) - ch <- &roa{ - AS: msg.AS, - PrefixLen: msg.PrefixLen, - MaxLen: msg.MaxLen, - Prefix: p, - } - } - - } + ch <- scanner.Bytes() } }(ch) diff --git a/server/server.go b/server/server.go index af85883b..51b84c73 100644 --- a/server/server.go +++ b/server/server.go @@ -102,7 +102,7 @@ func NewBgpServer(port int) *BgpServer { b.localRibMap = make(map[string]*LocalRib) b.neighborMap = make(map[string]*Peer) b.listenPort = port - b.roaClient, _ = newROAClient("") + b.roaClient, _ = newROAClient(config.RpkiServers{}) return &b } @@ -227,16 +227,7 @@ func (server *BgpServer) Serve() { select { case c := <-server.rpkiConfigCh: - if len(c.RpkiServerList) > 0 { - var url string - if c.RpkiServerList[0].RpkiServerConfig.Address.To16() == nil { - url = fmt.Sprintf("%s", c.RpkiServerList[0].RpkiServerConfig.Address) - } else { - url = fmt.Sprintf("[%s]", c.RpkiServerList[0].RpkiServerConfig.Address) - } - url += fmt.Sprintf(":%d", c.RpkiServerList[0].RpkiServerConfig.Port) - server.roaClient, _ = newROAClient(url) - } + server.roaClient, _ = newROAClient(c) case rmsg := <-server.roaClient.recieveROA(): server.roaClient.handleRTRMsg(rmsg) case zmsg := <-zapiMsgCh: @@ -1492,7 +1483,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { server.broadcastReqs = append(server.broadcastReqs, grpcReq) case REQ_MRT_GLOBAL_RIB, REQ_MRT_LOCAL_RIB: server.handleMrt(grpcReq) - case REQ_RPKI: + case REQ_ROA, REQ_RPKI: server.roaClient.handleGRPC(grpcReq) case REQ_VRF, REQ_VRFS, REQ_VRF_MOD: pathList := server.handleVrfRequest(grpcReq) |