From c3560d1224f7759455b6916843341b20355dbfe5 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Thu, 28 Jul 2016 20:47:01 +0900 Subject: move gRPC-related code for RPKI to grpc_server.go Signed-off-by: FUJITA Tomonori --- server/grpc_server.go | 12 ++-- server/rpki.go | 162 ++++++++++++++++++++++++-------------------------- server/server.go | 137 ++++++++++++++++++++++++++++-------------- 3 files changed, 175 insertions(+), 136 deletions(-) (limited to 'server') diff --git a/server/grpc_server.go b/server/grpc_server.go index d67d85cc..00e8e263 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -686,12 +686,12 @@ func (s *Server) SoftResetRpki(ctx context.Context, arg *api.SoftResetRpkiReques } func (s *Server) GetRpki(ctx context.Context, arg *api.GetRpkiRequest) (*api.GetRpkiResponse, error) { - d, err := s.get(REQ_GET_RPKI, arg) + servers, err := s.bgpServer.GetRpki() if err != nil { return nil, err } - l := make([]*api.Rpki, 0) - for _, s := range d.([]*config.RpkiServer) { + l := make([]*api.Rpki, 0, len(servers)) + for _, s := range servers { received := &s.State.RpkiMessages.RpkiReceived sent := &s.State.RpkiMessages.RpkiSent rpki := &api.Rpki{ @@ -725,12 +725,12 @@ func (s *Server) GetRpki(ctx context.Context, arg *api.GetRpkiRequest) (*api.Get } func (s *Server) GetRoa(ctx context.Context, arg *api.GetRoaRequest) (*api.GetRoaResponse, error) { - d, err := s.get(REQ_ROA, arg) + roas, err := s.bgpServer.GetRoa(bgp.RouteFamily(arg.Family)) if err != nil { return nil, err } - l := make([]*api.Roa, 0, len(d.([]*ROA))) - for _, r := range d.([]*ROA) { + l := make([]*api.Roa, 0, len(roas)) + for _, r := range roas { host, port, _ := net.SplitHostPort(r.Src) l = append(l, &api.Roa{ As: r.AS, diff --git a/server/rpki.go b/server/rpki.go index fb675314..c91ee7c0 100644 --- a/server/rpki.go +++ b/server/rpki.go @@ -423,102 +423,96 @@ func (c *roaManager) handleRTRMsg(client *roaClient, state *config.RpkiServerSta } } -func (c *roaManager) handleGRPC(grpcReq *GrpcRequest) *GrpcResponse { - switch grpcReq.RequestType { - case REQ_GET_RPKI: - f := func(tree *radix.Tree) (map[string]uint32, map[string]uint32) { - records := make(map[string]uint32) - prefixes := make(map[string]uint32) +func (c *roaManager) GetServers() []*config.RpkiServer { + f := func(tree *radix.Tree) (map[string]uint32, map[string]uint32) { + records := make(map[string]uint32) + prefixes := make(map[string]uint32) - tree.Walk(func(s string, v interface{}) bool { - b, _ := v.(*roaBucket) - tmpRecords := make(map[string]uint32) - for _, roa := range b.entries { - tmpRecords[roa.Src]++ - } + tree.Walk(func(s string, v interface{}) bool { + b, _ := v.(*roaBucket) + tmpRecords := make(map[string]uint32) + for _, roa := range b.entries { + tmpRecords[roa.Src]++ + } - for src, r := range tmpRecords { - if r > 0 { - records[src] += r - prefixes[src]++ - } + for src, r := range tmpRecords { + if r > 0 { + records[src] += r + prefixes[src]++ } - return false - }) - return records, prefixes - } + } + return false + }) + return records, prefixes + } - recordsV4, prefixesV4 := f(c.Roas[bgp.RF_IPv4_UC]) - recordsV6, prefixesV6 := f(c.Roas[bgp.RF_IPv6_UC]) + recordsV4, prefixesV4 := f(c.Roas[bgp.RF_IPv4_UC]) + recordsV6, prefixesV6 := f(c.Roas[bgp.RF_IPv6_UC]) - l := make([]*config.RpkiServer, 0, len(c.clientMap)) - for _, client := range c.clientMap { - state := &client.state + l := make([]*config.RpkiServer, 0, len(c.clientMap)) + for _, client := range c.clientMap { + state := &client.state - addr, port, _ := net.SplitHostPort(client.host) - l = append(l, &config.RpkiServer{ - Config: config.RpkiServerConfig{ - Address: addr, - Port: func() uint32 { p, _ := strconv.Atoi(port); return uint32(p) }(), - }, - State: client.state, - }) + addr, port, _ := net.SplitHostPort(client.host) + l = append(l, &config.RpkiServer{ + Config: config.RpkiServerConfig{ + Address: addr, + Port: func() uint32 { p, _ := strconv.Atoi(port); return uint32(p) }(), + }, + State: client.state, + }) - if client.conn == nil { - state.Up = false - } else { - state.Up = true - } - f := func(m map[string]uint32, key string) uint32 { - if r, ok := m[key]; ok { - return r - } - return 0 - } - state.RecordsV4 = f(recordsV4, client.host) - state.RecordsV6 = f(recordsV6, client.host) - state.PrefixesV4 = f(prefixesV4, client.host) - state.PrefixesV6 = f(prefixesV6, client.host) - state.SerialNumber = client.serialNumber - } - return &GrpcResponse{ - Data: l, + if client.conn == nil { + state.Up = false + } else { + state.Up = true } - case REQ_ROA: - if len(c.clientMap) == 0 { - return &GrpcResponse{ - ResponseErr: fmt.Errorf("RPKI server isn't configured."), + f := func(m map[string]uint32, key string) uint32 { + if r, ok := m[key]; ok { + return r } + return 0 } - var rfList []bgp.RouteFamily - switch grpcReq.RouteFamily { - case bgp.RF_IPv4_UC: - rfList = []bgp.RouteFamily{bgp.RF_IPv4_UC} - case bgp.RF_IPv6_UC: - rfList = []bgp.RouteFamily{bgp.RF_IPv6_UC} - default: - rfList = []bgp.RouteFamily{bgp.RF_IPv4_UC, bgp.RF_IPv6_UC} - } - l := make([]*ROA, 0) - for _, rf := range rfList { - if tree, ok := c.Roas[rf]; ok { - tree.Walk(func(s string, v interface{}) bool { - b, _ := v.(*roaBucket) - var roaList roas - for _, r := range b.entries { - roaList = append(roaList, r) - } - sort.Sort(roaList) - for _, roa := range roaList { - l = append(l, roa) - } - return false - }) - } + state.RecordsV4 = f(recordsV4, client.host) + state.RecordsV6 = f(recordsV6, client.host) + state.PrefixesV4 = f(prefixesV4, client.host) + state.PrefixesV6 = f(prefixesV6, client.host) + state.SerialNumber = client.serialNumber + } + return l +} + +func (c *roaManager) GetRoa(family bgp.RouteFamily) ([]*ROA, error) { + if len(c.clientMap) == 0 { + return []*ROA{}, fmt.Errorf("RPKI server isn't configured.") + } + var rfList []bgp.RouteFamily + switch family { + case bgp.RF_IPv4_UC: + rfList = []bgp.RouteFamily{bgp.RF_IPv4_UC} + case bgp.RF_IPv6_UC: + rfList = []bgp.RouteFamily{bgp.RF_IPv6_UC} + default: + rfList = []bgp.RouteFamily{bgp.RF_IPv4_UC, bgp.RF_IPv6_UC} + } + l := make([]*ROA, 0) + for _, rf := range rfList { + if tree, ok := c.Roas[rf]; ok { + tree.Walk(func(s string, v interface{}) bool { + b, _ := v.(*roaBucket) + var roaList roas + for _, r := range b.entries { + roaList = append(roaList, r) + } + sort.Sort(roaList) + for _, roa := range roaList { + l = append(l, roa) + } + return false + }) } - return &GrpcResponse{Data: l} } - return nil + return l, nil } func validatePath(ownAs uint32, tree *radix.Tree, cidr string, asPath *bgp.PathAttributeAsPath) config.RpkiValidationResultType { diff --git a/server/server.go b/server/server.go index 4caa38b2..5d04d365 100644 --- a/server/server.go +++ b/server/server.go @@ -850,25 +850,6 @@ func (s *BgpServer) StartZebraClient(x *config.Zebra) (err error) { return err } -func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) error { - for _, s := range c { - ch := make(chan *GrpcResponse) - server.GrpcReqCh <- &GrpcRequest{ - RequestType: REQ_ADD_RPKI, - Data: &api.AddRpkiRequest{ - Address: s.Config.Address, - Port: s.Config.Port, - Lifetime: s.Config.RecordLifetime, - }, - ResponseCh: ch, - } - if err := (<-ch).Err(); err != nil { - return err - } - } - return nil -} - func (s *BgpServer) AddBmp(c *config.BmpServerConfig) (err error) { ch := make(chan struct{}) defer func() { <-ch }() @@ -1418,12 +1399,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { } grpcReq.ResponseCh <- &GrpcResponse{Data: &api.SoftResetNeighborResponse{}} close(grpcReq.ResponseCh) - case REQ_ADD_RPKI, REQ_DELETE_RPKI, REQ_ENABLE_RPKI, REQ_DISABLE_RPKI, REQ_RESET_RPKI, REQ_SOFT_RESET_RPKI: - server.handleModRpki(grpcReq) - case REQ_ROA, REQ_GET_RPKI: - rsp := server.roaManager.handleGRPC(grpcReq) - grpcReq.ResponseCh <- rsp - close(grpcReq.ResponseCh) default: err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType) goto ERROR @@ -2635,30 +2610,100 @@ func (s *BgpServer) ValidateRib(prefix string) (err error) { return err } -func (server *BgpServer) handleModRpki(grpcReq *GrpcRequest) { - done := func(grpcReq *GrpcRequest, data interface{}, e error) { - result := &GrpcResponse{ - ResponseErr: e, - Data: data, - } - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) +func (s *BgpServer) GetRpki() (l []*config.RpkiServer, err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + l = s.roaManager.GetServers() + } + return l, err +} + +func (s *BgpServer) GetRoa(family bgp.RouteFamily) (l []*ROA, err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + l, err = s.roaManager.GetRoa(family) + } + return l, err +} + +func (s *BgpServer) AddRpki(c *config.RpkiServerConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + err = s.roaManager.AddServer(net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))), c.RecordLifetime) + } + return err +} + +func (s *BgpServer) DeleteRpki(c *config.RpkiServerConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + err = s.roaManager.DeleteServer(c.Address) + } + return err +} + +func (s *BgpServer) EnableRpki(c *config.RpkiServerConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + err = s.roaManager.Enable(c.Address) + } + return err +} + +func (s *BgpServer) DisableRpki(c *config.RpkiServerConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + err = s.roaManager.Disable(c.Address) } + return err +} - switch arg := grpcReq.Data.(type) { - case *api.AddRpkiRequest: - done(grpcReq, &api.AddRpkiResponse{}, server.roaManager.AddServer(net.JoinHostPort(arg.Address, strconv.Itoa(int(arg.Port))), arg.Lifetime)) - case *api.DeleteRpkiRequest: - done(grpcReq, &api.DeleteRpkiResponse{}, server.roaManager.DeleteServer(arg.Address)) - case *api.EnableRpkiRequest: - done(grpcReq, &api.EnableRpkiResponse{}, server.roaManager.Enable(arg.Address)) - case *api.DisableRpkiRequest: - done(grpcReq, &api.DisableRpkiResponse{}, server.roaManager.Disable(arg.Address)) - case *api.ResetRpkiRequest: - done(grpcReq, &api.ResetRpkiResponse{}, server.roaManager.Reset(arg.Address)) - case *api.SoftResetRpkiRequest: - done(grpcReq, &api.SoftResetRpkiResponse{}, server.roaManager.SoftReset(arg.Address)) +func (s *BgpServer) ResetRpki(c *config.RpkiServerConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + err = s.roaManager.Reset(c.Address) } + return err +} + +func (s *BgpServer) SoftResetRpki(c *config.RpkiServerConfig) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + err = s.roaManager.SoftReset(c.Address) + } + return err } type WatchEventType string -- cgit v1.2.3