diff options
Diffstat (limited to 'server/rpki.go')
-rw-r--r-- | server/rpki.go | 214 |
1 files changed, 161 insertions, 53 deletions
diff --git a/server/rpki.go b/server/rpki.go index e4350f6e..f0006f20 100644 --- a/server/rpki.go +++ b/server/rpki.go @@ -24,8 +24,10 @@ import ( "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" + "gopkg.in/tomb.v2" "net" "strconv" + "strings" "time" ) @@ -36,22 +38,67 @@ type roaBucket struct { } type roa struct { + Src string MaxLen uint8 AS []uint32 } -type roaClient struct { - AS uint32 - roas map[bgp.RouteFamily]*radix.Tree - outgoing chan []byte - config config.RpkiServers +const ( + CONNECTED uint8 = iota + DISCONNECTED + RTR +) + +type roaClientEvent struct { + eventType uint8 + src string + conn *net.TCPConn + data []byte +} + +type roaManager struct { + AS uint32 + roas map[bgp.RouteFamily]*radix.Tree + config config.RpkiServers + eventCh chan *roaClientEvent + clientMap map[string]*roaClient } -func (c *roaClient) recieveROA() chan []byte { - return c.outgoing +func (c *roaManager) recieveROA() chan *roaClientEvent { + return c.eventCh +} + +func (m *roaManager) handleROAEvent(ev *roaClientEvent) { + client, y := m.clientMap[ev.src] + if !y { + if ev.eventType == CONNECTED { + ev.conn.Close() + } + log.Error("can't find %s roa server configuration", ev.src) + return + } + switch ev.eventType { + case DISCONNECTED: + log.Info("roa server is disconnected, ", ev.src) + client.state.Downtime = time.Now().Unix() + // clear state + client.state.RpkiMessages = config.RpkiMessages{} + client.conn.Close() + client.conn = nil + client.t = tomb.Tomb{} + client.t.Go(client.tryConnect) + case CONNECTED: + log.Info("roa server is connected, ", ev.src) + client.conn = ev.conn + client.state.Uptime = time.Now().Unix() + client.t = tomb.Tomb{} + client.t.Go(client.established) + case RTR: + m.handleRTRMsg(ev.src, client.state, ev.data) + } } -func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) { +func addROA(host string, tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) { key := table.IpToRadixkey(prefix, prefixLen) b, _ := tree.Get(key) if b == nil { @@ -61,6 +108,7 @@ func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) r := &roa{ AS: []uint32{as}, MaxLen: maxLen, + Src: host, } b := &roaBucket{ @@ -74,7 +122,7 @@ func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) bucket := b.(*roaBucket) found := false for _, r := range bucket.entries { - if r.MaxLen == maxLen { + if r.MaxLen == maxLen && r.Src == host { found = true r.AS = append(r.AS, as) } @@ -83,14 +131,15 @@ func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) r := &roa{ MaxLen: maxLen, AS: []uint32{as}, + Src: host, } bucket.entries = append(bucket.entries, r) } } } -func (c *roaClient) handleRTRMsg(buf []byte) { - received := &c.config.RpkiServerList[0].RpkiServerState.RpkiMessages.RpkiReceived +func (c *roaManager) handleRTRMsg(host string, state *config.RpkiServerState, buf []byte) { + received := &state.RpkiMessages.RpkiReceived m, _ := bgp.ParseRTR(buf) if m != nil { @@ -110,7 +159,7 @@ func (c *roaClient) handleRTRMsg(buf []byte) { received.Ipv6Prefix++ tree = c.roas[bgp.RF_IPv6_UC] } - addROA(tree, msg.AS, msg.Prefix, msg.PrefixLen, msg.MaxLen) + addROA(host, tree, msg.AS, msg.Prefix, msg.PrefixLen, msg.MaxLen) case *bgp.RTREndOfData: received.EndOfData++ case *bgp.RTRCacheReset: @@ -122,20 +171,33 @@ func (c *roaClient) handleRTRMsg(buf []byte) { } } -func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) { +func splitHostPort(network string) (host string, port int) { + if strings.HasPrefix(network, "[") { + l := strings.Split(network, "]:") + port, _ := strconv.Atoi(l[1]) + return l[0][1:], port + } else { + l := strings.Split(network, ":") + port, _ := strconv.Atoi(l[1]) + return l[0], port + } +} + +func (c *roaManager) handleGRPC(grpcReq *GrpcRequest) { switch grpcReq.RequestType { case REQ_RPKI: results := make([]*GrpcResponse, 0) - for _, s := range c.config.RpkiServerList { - state := &s.RpkiServerState + for _, client := range c.clientMap { + state := client.state + received := &state.RpkiMessages.RpkiReceived rpki := &api.RPKI{ Conf: &api.RPKIConf{ - Address: s.RpkiServerConfig.Address.String(), + Address: client.addr, }, State: &api.RPKIState{ Uptime: state.Uptime, - ReceivedIpv4: int32(c.roas[bgp.RF_IPv4_UC].Len()), - ReceivedIpv6: int32(c.roas[bgp.RF_IPv6_UC].Len()), + ReceivedIpv4: received.Ipv4Prefix, + ReceivedIpv6: received.Ipv6Prefix, }, } result := &GrpcResponse{} @@ -145,13 +207,12 @@ func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) { go sendMultipleResponses(grpcReq, results) case REQ_ROA: - if len(c.config.RpkiServerList) == 0 { + if len(c.clientMap) == 0 { result := &GrpcResponse{} result.ResponseErr = fmt.Errorf("RPKI server isn't configured.") grpcReq.ResponseCh <- result break } - conf := c.config.RpkiServerList[0].RpkiServerConfig results := make([]*GrpcResponse, 0) var rfList []bgp.RouteFamily switch grpcReq.RouteFamily { @@ -169,14 +230,15 @@ func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) { for _, r := range b.entries { for _, as := range r.AS { result := &GrpcResponse{} + host, port := splitHostPort(r.Src) result.Data = &api.ROA{ As: as, Maxlen: uint32(r.MaxLen), Prefixlen: uint32(b.PrefixLen), Prefix: b.Prefix.String(), Conf: &api.RPKIConf{ - Address: conf.Address.String(), - RemotePort: conf.Port, + Address: host, + RemotePort: uint32(port), }, } results = append(results, result) @@ -239,7 +301,7 @@ func validatePath(ownAs uint32, tree *radix.Tree, cidr string, asPath *bgp.PathA } } -func (c *roaClient) validate(pathList []*table.Path) { +func (c *roaManager) validate(pathList []*table.Path) { if c.roas[bgp.RF_IPv4_UC].Len() == 0 && c.roas[bgp.RF_IPv6_UC].Len() == 0 { return } @@ -250,50 +312,96 @@ func (c *roaClient) validate(pathList []*table.Path) { } } -func newROAClient(as uint32, conf config.RpkiServers) (*roaClient, error) { - var url string +type roaClient struct { + t tomb.Tomb + host string + addr string + conn *net.TCPConn + state *config.RpkiServerState + eventCh chan *roaClientEvent +} - c := &roaClient{ - AS: as, - roas: make(map[bgp.RouteFamily]*radix.Tree), - config: conf, +func (c *roaClient) kill() { + c.t.Kill(nil) + if c.conn != nil { + c.conn.Close() } - c.roas[bgp.RF_IPv4_UC] = radix.New() - c.roas[bgp.RF_IPv6_UC] = radix.New() +} - if len(conf.RpkiServerList) == 0 { - return c, nil - } else { - if len(conf.RpkiServerList) > 1 { - log.Warn("currently only one RPKI server is supposed") +func (c *roaClient) tryConnect() error { + for c.t.Alive() { + conn, err := net.Dial("tcp", c.host) + if err != nil { + time.Sleep(30 * time.Second) + } else { + c.eventCh <- &roaClientEvent{ + eventType: CONNECTED, + src: c.host, + conn: conn.(*net.TCPConn), + } + return nil } - c := conf.RpkiServerList[0].RpkiServerConfig - url = net.JoinHostPort(c.Address.String(), strconv.Itoa(int(c.Port))) } + return nil +} - conn, err := net.Dial("tcp", url) - if err != nil { - return c, err +func (c *roaClient) established() error { + defer c.conn.Close() + + disconnected := func() { + c.eventCh <- &roaClientEvent{ + eventType: DISCONNECTED, + src: c.host, + } } - state := &conf.RpkiServerList[0].RpkiServerState - state.Uptime = time.Now().Unix() r := bgp.NewRTRResetQuery() data, _ := r.Serialize() - conn.Write(data) - state.RpkiMessages.RpkiSent.ResetQuery++ - reader := bufio.NewReader(conn) + _, err := c.conn.Write(data) + if err != nil { + disconnected() + return nil + } + + c.state.RpkiMessages.RpkiSent.ResetQuery++ + + reader := bufio.NewReader(c.conn) scanner := bufio.NewScanner(reader) scanner.Split(bgp.SplitRTR) - ch := make(chan []byte) - c.outgoing = ch + for scanner.Scan() { + c.eventCh <- &roaClientEvent{ + eventType: RTR, + src: c.host, + data: scanner.Bytes(), + } + } + disconnected() + return nil +} + +func newROAManager(as uint32, conf config.RpkiServers) (*roaManager, error) { + m := &roaManager{ + AS: as, + roas: make(map[bgp.RouteFamily]*radix.Tree), + config: conf, + } + m.roas[bgp.RF_IPv4_UC] = radix.New() + m.roas[bgp.RF_IPv6_UC] = radix.New() + m.eventCh = make(chan *roaClientEvent) + m.clientMap = make(map[string]*roaClient) - go func(ch chan []byte) { - for scanner.Scan() { - ch <- scanner.Bytes() + for _, entry := range conf.RpkiServerList { + c := entry.RpkiServerConfig + client := &roaClient{ + host: net.JoinHostPort(c.Address.String(), strconv.Itoa(int(c.Port))), + addr: c.Address.String(), + eventCh: m.eventCh, + state: &entry.RpkiServerState, } - }(ch) + m.clientMap[client.host] = client + client.t.Go(client.tryConnect) + } - return c, nil + return m, nil } |