From 955409c37ce17daf346e30aa1d1e2d40767ebb43 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Mon, 31 Aug 2015 22:00:09 +0900 Subject: rpki: support show the state of RPKI servers Signed-off-by: FUJITA Tomonori --- server/grpc_server.go | 10 ++++ server/rpki.go | 137 ++++++++++++++++++++++++++++++++++++-------------- server/server.go | 15 ++---- 3 files changed, 112 insertions(+), 50 deletions(-) (limited to 'server') diff --git a/server/grpc_server.go b/server/grpc_server.go index b56e2e14..ee8e5de1 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -83,6 +83,7 @@ const ( REQ_MRT_GLOBAL_RIB REQ_MRT_LOCAL_RIB REQ_RPKI + REQ_ROA REQ_VRF REQ_VRFS REQ_VRF_MOD @@ -512,6 +513,15 @@ func (s *Server) GetRPKI(arg *api.Arguments, stream api.Grpc_GetRPKIServer) erro req := NewGrpcRequest(REQ_RPKI, "", bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.RPKI)) + }) +} + +func (s *Server) GetROA(arg *api.Arguments, stream api.Grpc_GetROAServer) error { + req := NewGrpcRequest(REQ_ROA, arg.Name, bgp.RouteFamily(arg.Rf), nil) + s.bgpServerCh <- req + return handleMultipleResponses(req, func(res *GrpcResponse) error { return stream.Send(res.Data.(*api.ROA)) }) diff --git a/server/rpki.go b/server/rpki.go index 53a56e04..dc8a94cc 100644 --- a/server/rpki.go +++ b/server/rpki.go @@ -19,12 +19,14 @@ import ( "bufio" "bytes" "fmt" + log "github.com/Sirupsen/logrus" "github.com/armon/go-radix" "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" "net" + "time" ) type roa struct { @@ -48,12 +50,12 @@ func (r *roa) toApiStruct() *api.ROA { } type roaClient struct { - url string roas map[bgp.RouteFamily]*radix.Tree - outgoing chan *roa + outgoing chan []byte + config config.RpkiServers } -func (c *roaClient) recieveROA() chan *roa { +func (c *roaClient) recieveROA() chan []byte { return c.outgoing } @@ -65,33 +67,91 @@ func roa2key(r *roa) string { return buffer.String()[:r.PrefixLen] } -func (c *roaClient) handleRTRMsg(r *roa) { - if r.Prefix.To4() != nil { - c.roas[bgp.RF_IPv4_UC].Insert(roa2key(r), r) +func (c *roaClient) handleRTRMsg(buf []byte) { + received := &c.config.RpkiServerList[0].RpkiServerState.RpkiMessages.RpkiReceived + + m, _ := bgp.ParseRTR(buf) + if m != nil { + switch msg := m.(type) { + case *bgp.RTRSerialNotify: + received.SerialNotify++ + case *bgp.RTRSerialQuery: + case *bgp.RTRResetQuery: + case *bgp.RTRCacheResponse: + received.CacheResponse++ + case *bgp.RTRIPPrefix: + p := make([]byte, len(msg.Prefix)) + copy(p, msg.Prefix) + r := &roa{ + AS: msg.AS, + PrefixLen: msg.PrefixLen, + MaxLen: msg.MaxLen, + Prefix: p, + } + if r.Prefix.To4() != nil { + received.Ipv4Prefix++ + c.roas[bgp.RF_IPv4_UC].Insert(roa2key(r), r) + } else { + received.Ipv6Prefix++ + c.roas[bgp.RF_IPv6_UC].Insert(roa2key(r), r) + } + case *bgp.RTREndOfData: + received.EndOfData++ + case *bgp.RTRCacheReset: + received.CacheReset++ + case *bgp.RTRErrorReport: + } } else { - c.roas[bgp.RF_IPv6_UC].Insert(roa2key(r), r) + received.Error++ } } func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) { - if tree, ok := c.roas[grpcReq.RouteFamily]; ok { + switch grpcReq.RequestType { + case REQ_RPKI: results := make([]*GrpcResponse, 0) - tree.Walk(func(s string, v interface{}) bool { - r, _ := v.(*roa) + for _, s := range c.config.RpkiServerList { + state := &s.RpkiServerState + rpki := &api.RPKI{ + Conf: &api.RPKIConf{ + Address: s.RpkiServerConfig.Address.String(), + }, + State: &api.RPKIState{ + Uptime: state.Uptime, + ReceivedIpv4: int32(c.roas[bgp.RF_IPv4_UC].Len()), + ReceivedIpv6: int32(c.roas[bgp.RF_IPv6_UC].Len()), + }, + } result := &GrpcResponse{} - result.Data = r.toApiStruct() + result.Data = rpki results = append(results, result) - return false - }) + } go sendMultipleResponses(grpcReq, results) + + case REQ_ROA: + if len(c.config.RpkiServerList) == 0 || c.config.RpkiServerList[0].RpkiServerConfig.Address.String() != grpcReq.Name { + result := &GrpcResponse{} + result.ResponseErr = fmt.Errorf("RPKI server that has %v doesn't exist.", grpcReq.Name) + + grpcReq.ResponseCh <- result + break + } + + if tree, ok := c.roas[grpcReq.RouteFamily]; ok { + results := make([]*GrpcResponse, 0) + tree.Walk(func(s string, v interface{}) bool { + r, _ := v.(*roa) + result := &GrpcResponse{} + result.Data = r.toApiStruct() + results = append(results, result) + return false + }) + go sendMultipleResponses(grpcReq, results) + } } } func (c *roaClient) validate(pathList []*table.Path) { - if c.url == "" { - return - } - for _, path := range pathList { if tree, ok := c.roas[path.GetRouteFamily()]; ok { _, n, _ := net.ParseCIDR(path.GetNlri().String()) @@ -115,16 +175,28 @@ func (c *roaClient) validate(pathList []*table.Path) { } } -func newROAClient(url string) (*roaClient, error) { +func newROAClient(conf config.RpkiServers) (*roaClient, error) { + var url string + c := &roaClient{ - url: url, - roas: make(map[bgp.RouteFamily]*radix.Tree), + roas: make(map[bgp.RouteFamily]*radix.Tree), + config: conf, } c.roas[bgp.RF_IPv4_UC] = radix.New() c.roas[bgp.RF_IPv6_UC] = radix.New() - if url == "" { + if len(conf.RpkiServerList) == 0 { return c, nil + } else { + if len(conf.RpkiServerList) > 1 { + log.Warn("currently only one RPKI server is supposed") + } + if conf.RpkiServerList[0].RpkiServerConfig.Address.To16() == nil { + url = fmt.Sprintf("%s", conf.RpkiServerList[0].RpkiServerConfig.Address) + } else { + url = fmt.Sprintf("[%s]", conf.RpkiServerList[0].RpkiServerConfig.Address) + } + url += fmt.Sprintf(":%d", conf.RpkiServerList[0].RpkiServerConfig.Port) } conn, err := net.Dial("tcp", url) @@ -132,33 +204,22 @@ func newROAClient(url string) (*roaClient, error) { return c, err } + state := &conf.RpkiServerList[0].RpkiServerState + state.Uptime = time.Now().Unix() r := bgp.NewRTRResetQuery() data, _ := r.Serialize() conn.Write(data) + state.RpkiMessages.RpkiSent.ResetQuery++ reader := bufio.NewReader(conn) scanner := bufio.NewScanner(reader) scanner.Split(bgp.SplitRTR) - ch := make(chan *roa) + ch := make(chan []byte) c.outgoing = ch - go func(ch chan *roa) { + go func(ch chan []byte) { for scanner.Scan() { - m, _ := bgp.ParseRTR(scanner.Bytes()) - if m != nil { - switch msg := m.(type) { - case *bgp.RTRIPPrefix: - p := make([]byte, len(msg.Prefix)) - copy(p, msg.Prefix) - ch <- &roa{ - AS: msg.AS, - PrefixLen: msg.PrefixLen, - MaxLen: msg.MaxLen, - Prefix: p, - } - } - - } + ch <- scanner.Bytes() } }(ch) diff --git a/server/server.go b/server/server.go index af85883b..51b84c73 100644 --- a/server/server.go +++ b/server/server.go @@ -102,7 +102,7 @@ func NewBgpServer(port int) *BgpServer { b.localRibMap = make(map[string]*LocalRib) b.neighborMap = make(map[string]*Peer) b.listenPort = port - b.roaClient, _ = newROAClient("") + b.roaClient, _ = newROAClient(config.RpkiServers{}) return &b } @@ -227,16 +227,7 @@ func (server *BgpServer) Serve() { select { case c := <-server.rpkiConfigCh: - if len(c.RpkiServerList) > 0 { - var url string - if c.RpkiServerList[0].RpkiServerConfig.Address.To16() == nil { - url = fmt.Sprintf("%s", c.RpkiServerList[0].RpkiServerConfig.Address) - } else { - url = fmt.Sprintf("[%s]", c.RpkiServerList[0].RpkiServerConfig.Address) - } - url += fmt.Sprintf(":%d", c.RpkiServerList[0].RpkiServerConfig.Port) - server.roaClient, _ = newROAClient(url) - } + server.roaClient, _ = newROAClient(c) case rmsg := <-server.roaClient.recieveROA(): server.roaClient.handleRTRMsg(rmsg) case zmsg := <-zapiMsgCh: @@ -1492,7 +1483,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { server.broadcastReqs = append(server.broadcastReqs, grpcReq) case REQ_MRT_GLOBAL_RIB, REQ_MRT_LOCAL_RIB: server.handleMrt(grpcReq) - case REQ_RPKI: + case REQ_ROA, REQ_RPKI: server.roaClient.handleGRPC(grpcReq) case REQ_VRF, REQ_VRFS, REQ_VRF_MOD: pathList := server.handleVrfRequest(grpcReq) -- cgit v1.2.3