diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/rpki.go | 214 | ||||
-rw-r--r-- | server/rpki_test.go | 26 | ||||
-rw-r--r-- | server/server.go | 18 |
3 files changed, 183 insertions, 75 deletions
diff --git a/server/rpki.go b/server/rpki.go index e4350f6e..f0006f20 100644 --- a/server/rpki.go +++ b/server/rpki.go @@ -24,8 +24,10 @@ import ( "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" + "gopkg.in/tomb.v2" "net" "strconv" + "strings" "time" ) @@ -36,22 +38,67 @@ type roaBucket struct { } type roa struct { + Src string MaxLen uint8 AS []uint32 } -type roaClient struct { - AS uint32 - roas map[bgp.RouteFamily]*radix.Tree - outgoing chan []byte - config config.RpkiServers +const ( + CONNECTED uint8 = iota + DISCONNECTED + RTR +) + +type roaClientEvent struct { + eventType uint8 + src string + conn *net.TCPConn + data []byte +} + +type roaManager struct { + AS uint32 + roas map[bgp.RouteFamily]*radix.Tree + config config.RpkiServers + eventCh chan *roaClientEvent + clientMap map[string]*roaClient } -func (c *roaClient) recieveROA() chan []byte { - return c.outgoing +func (c *roaManager) recieveROA() chan *roaClientEvent { + return c.eventCh +} + +func (m *roaManager) handleROAEvent(ev *roaClientEvent) { + client, y := m.clientMap[ev.src] + if !y { + if ev.eventType == CONNECTED { + ev.conn.Close() + } + log.Error("can't find %s roa server configuration", ev.src) + return + } + switch ev.eventType { + case DISCONNECTED: + log.Info("roa server is disconnected, ", ev.src) + client.state.Downtime = time.Now().Unix() + // clear state + client.state.RpkiMessages = config.RpkiMessages{} + client.conn.Close() + client.conn = nil + client.t = tomb.Tomb{} + client.t.Go(client.tryConnect) + case CONNECTED: + log.Info("roa server is connected, ", ev.src) + client.conn = ev.conn + client.state.Uptime = time.Now().Unix() + client.t = tomb.Tomb{} + client.t.Go(client.established) + case RTR: + m.handleRTRMsg(ev.src, client.state, ev.data) + } } -func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) { +func addROA(host string, tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) { key := table.IpToRadixkey(prefix, prefixLen) b, _ := tree.Get(key) if b == nil { @@ -61,6 +108,7 @@ func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) r := &roa{ AS: []uint32{as}, MaxLen: maxLen, + Src: host, } b := &roaBucket{ @@ -74,7 +122,7 @@ func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) bucket := b.(*roaBucket) found := false for _, r := range bucket.entries { - if r.MaxLen == maxLen { + if r.MaxLen == maxLen && r.Src == host { found = true r.AS = append(r.AS, as) } @@ -83,14 +131,15 @@ func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) r := &roa{ MaxLen: maxLen, AS: []uint32{as}, + Src: host, } bucket.entries = append(bucket.entries, r) } } } -func (c *roaClient) handleRTRMsg(buf []byte) { - received := &c.config.RpkiServerList[0].RpkiServerState.RpkiMessages.RpkiReceived +func (c *roaManager) handleRTRMsg(host string, state *config.RpkiServerState, buf []byte) { + received := &state.RpkiMessages.RpkiReceived m, _ := bgp.ParseRTR(buf) if m != nil { @@ -110,7 +159,7 @@ func (c *roaClient) handleRTRMsg(buf []byte) { received.Ipv6Prefix++ tree = c.roas[bgp.RF_IPv6_UC] } - addROA(tree, msg.AS, msg.Prefix, msg.PrefixLen, msg.MaxLen) + addROA(host, tree, msg.AS, msg.Prefix, msg.PrefixLen, msg.MaxLen) case *bgp.RTREndOfData: received.EndOfData++ case *bgp.RTRCacheReset: @@ -122,20 +171,33 @@ func (c *roaClient) handleRTRMsg(buf []byte) { } } -func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) { +func splitHostPort(network string) (host string, port int) { + if strings.HasPrefix(network, "[") { + l := strings.Split(network, "]:") + port, _ := strconv.Atoi(l[1]) + return l[0][1:], port + } else { + l := strings.Split(network, ":") + port, _ := strconv.Atoi(l[1]) + return l[0], port + } +} + +func (c *roaManager) handleGRPC(grpcReq *GrpcRequest) { switch grpcReq.RequestType { case REQ_RPKI: results := make([]*GrpcResponse, 0) - for _, s := range c.config.RpkiServerList { - state := &s.RpkiServerState + for _, client := range c.clientMap { + state := client.state + received := &state.RpkiMessages.RpkiReceived rpki := &api.RPKI{ Conf: &api.RPKIConf{ - Address: s.RpkiServerConfig.Address.String(), + Address: client.addr, }, State: &api.RPKIState{ Uptime: state.Uptime, - ReceivedIpv4: int32(c.roas[bgp.RF_IPv4_UC].Len()), - ReceivedIpv6: int32(c.roas[bgp.RF_IPv6_UC].Len()), + ReceivedIpv4: received.Ipv4Prefix, + ReceivedIpv6: received.Ipv6Prefix, }, } result := &GrpcResponse{} @@ -145,13 +207,12 @@ func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) { go sendMultipleResponses(grpcReq, results) case REQ_ROA: - if len(c.config.RpkiServerList) == 0 { + if len(c.clientMap) == 0 { result := &GrpcResponse{} result.ResponseErr = fmt.Errorf("RPKI server isn't configured.") grpcReq.ResponseCh <- result break } - conf := c.config.RpkiServerList[0].RpkiServerConfig results := make([]*GrpcResponse, 0) var rfList []bgp.RouteFamily switch grpcReq.RouteFamily { @@ -169,14 +230,15 @@ func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) { for _, r := range b.entries { for _, as := range r.AS { result := &GrpcResponse{} + host, port := splitHostPort(r.Src) result.Data = &api.ROA{ As: as, Maxlen: uint32(r.MaxLen), Prefixlen: uint32(b.PrefixLen), Prefix: b.Prefix.String(), Conf: &api.RPKIConf{ - Address: conf.Address.String(), - RemotePort: conf.Port, + Address: host, + RemotePort: uint32(port), }, } results = append(results, result) @@ -239,7 +301,7 @@ func validatePath(ownAs uint32, tree *radix.Tree, cidr string, asPath *bgp.PathA } } -func (c *roaClient) validate(pathList []*table.Path) { +func (c *roaManager) validate(pathList []*table.Path) { if c.roas[bgp.RF_IPv4_UC].Len() == 0 && c.roas[bgp.RF_IPv6_UC].Len() == 0 { return } @@ -250,50 +312,96 @@ func (c *roaClient) validate(pathList []*table.Path) { } } -func newROAClient(as uint32, conf config.RpkiServers) (*roaClient, error) { - var url string +type roaClient struct { + t tomb.Tomb + host string + addr string + conn *net.TCPConn + state *config.RpkiServerState + eventCh chan *roaClientEvent +} - c := &roaClient{ - AS: as, - roas: make(map[bgp.RouteFamily]*radix.Tree), - config: conf, +func (c *roaClient) kill() { + c.t.Kill(nil) + if c.conn != nil { + c.conn.Close() } - c.roas[bgp.RF_IPv4_UC] = radix.New() - c.roas[bgp.RF_IPv6_UC] = radix.New() +} - if len(conf.RpkiServerList) == 0 { - return c, nil - } else { - if len(conf.RpkiServerList) > 1 { - log.Warn("currently only one RPKI server is supposed") +func (c *roaClient) tryConnect() error { + for c.t.Alive() { + conn, err := net.Dial("tcp", c.host) + if err != nil { + time.Sleep(30 * time.Second) + } else { + c.eventCh <- &roaClientEvent{ + eventType: CONNECTED, + src: c.host, + conn: conn.(*net.TCPConn), + } + return nil } - c := conf.RpkiServerList[0].RpkiServerConfig - url = net.JoinHostPort(c.Address.String(), strconv.Itoa(int(c.Port))) } + return nil +} - conn, err := net.Dial("tcp", url) - if err != nil { - return c, err +func (c *roaClient) established() error { + defer c.conn.Close() + + disconnected := func() { + c.eventCh <- &roaClientEvent{ + eventType: DISCONNECTED, + src: c.host, + } } - state := &conf.RpkiServerList[0].RpkiServerState - state.Uptime = time.Now().Unix() r := bgp.NewRTRResetQuery() data, _ := r.Serialize() - conn.Write(data) - state.RpkiMessages.RpkiSent.ResetQuery++ - reader := bufio.NewReader(conn) + _, err := c.conn.Write(data) + if err != nil { + disconnected() + return nil + } + + c.state.RpkiMessages.RpkiSent.ResetQuery++ + + reader := bufio.NewReader(c.conn) scanner := bufio.NewScanner(reader) scanner.Split(bgp.SplitRTR) - ch := make(chan []byte) - c.outgoing = ch + for scanner.Scan() { + c.eventCh <- &roaClientEvent{ + eventType: RTR, + src: c.host, + data: scanner.Bytes(), + } + } + disconnected() + return nil +} + +func newROAManager(as uint32, conf config.RpkiServers) (*roaManager, error) { + m := &roaManager{ + AS: as, + roas: make(map[bgp.RouteFamily]*radix.Tree), + config: conf, + } + m.roas[bgp.RF_IPv4_UC] = radix.New() + m.roas[bgp.RF_IPv6_UC] = radix.New() + m.eventCh = make(chan *roaClientEvent) + m.clientMap = make(map[string]*roaClient) - go func(ch chan []byte) { - for scanner.Scan() { - ch <- scanner.Bytes() + for _, entry := range conf.RpkiServerList { + c := entry.RpkiServerConfig + client := &roaClient{ + host: net.JoinHostPort(c.Address.String(), strconv.Itoa(int(c.Port))), + addr: c.Address.String(), + eventCh: m.eventCh, + state: &entry.RpkiServerState, } - }(ch) + m.clientMap[client.host] = client + client.t.Go(client.tryConnect) + } - return c, nil + return m, nil } diff --git a/server/rpki_test.go b/server/rpki_test.go index bb2cf088..ca620d20 100644 --- a/server/rpki_test.go +++ b/server/rpki_test.go @@ -60,8 +60,8 @@ func TestValidate0(t *testing.T) { assert := assert.New(t) tree := radix.New() - addROA(tree, 100, net.ParseIP("192.168.0.0").To4(), 24, 32) - addROA(tree, 200, net.ParseIP("192.168.0.0").To4(), 24, 24) + addROA("", tree, 100, net.ParseIP("192.168.0.0").To4(), 24, 32) + addROA("", tree, 200, net.ParseIP("192.168.0.0").To4(), 24, 24) var r config.RpkiValidationResultType @@ -88,7 +88,7 @@ func TestValidate1(t *testing.T) { assert := assert.New(t) tree := radix.New() - addROA(tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16) + addROA("", tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16) var r config.RpkiValidationResultType @@ -117,7 +117,7 @@ func TestValidate3(t *testing.T) { assert := assert.New(t) tree1 := radix.New() - addROA(tree1, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16) + addROA("", tree1, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16) var r config.RpkiValidationResultType @@ -128,7 +128,7 @@ func TestValidate3(t *testing.T) { assert.Equal(r, config.RPKI_VALIDATION_RESULT_TYPE_INVALID) tree2 := radix.New() - addROA(tree2, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24) + addROA("", tree2, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24) r = validateOne(tree2, "10.0.0.0/17", "65000") assert.Equal(r, config.RPKI_VALIDATION_RESULT_TYPE_VALID) @@ -138,8 +138,8 @@ func TestValidate4(t *testing.T) { assert := assert.New(t) tree := radix.New() - addROA(tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16) - addROA(tree, 65001, net.ParseIP("10.0.0.0").To4(), 16, 16) + addROA("", tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16) + addROA("", tree, 65001, net.ParseIP("10.0.0.0").To4(), 16, 16) var r config.RpkiValidationResultType @@ -154,8 +154,8 @@ func TestValidate5(t *testing.T) { assert := assert.New(t) tree := radix.New() - addROA(tree, 65000, net.ParseIP("10.0.0.0").To4(), 17, 17) - addROA(tree, 65000, net.ParseIP("10.0.128.0").To4(), 17, 17) + addROA("", tree, 65000, net.ParseIP("10.0.0.0").To4(), 17, 17) + addROA("", tree, 65000, net.ParseIP("10.0.128.0").To4(), 17, 17) var r config.RpkiValidationResultType @@ -167,7 +167,7 @@ func TestValidate6(t *testing.T) { assert := assert.New(t) tree := radix.New() - addROA(tree, 0, net.ParseIP("10.0.0.0").To4(), 8, 32) + addROA("", tree, 0, net.ParseIP("10.0.0.0").To4(), 8, 32) var r config.RpkiValidationResultType @@ -185,7 +185,7 @@ func TestValidate7(t *testing.T) { assert := assert.New(t) tree := radix.New() - addROA(tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24) + addROA("", tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24) var r config.RpkiValidationResultType @@ -203,8 +203,8 @@ func TestValidate8(t *testing.T) { assert := assert.New(t) tree := radix.New() - addROA(tree, 0, net.ParseIP("10.0.0.0").To4(), 16, 24) - addROA(tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24) + addROA("", tree, 0, net.ParseIP("10.0.0.0").To4(), 16, 24) + addROA("", tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24) var r config.RpkiValidationResultType diff --git a/server/server.go b/server/server.go index 7f3e5756..c5875299 100644 --- a/server/server.go +++ b/server/server.go @@ -94,7 +94,7 @@ type BgpServer struct { neighborMap map[string]*Peer globalRib *table.TableManager zclient *zebra.Client - roaClient *roaClient + roaManager *roaManager bmpClient *bmpClient bmpConnCh chan *bmpConn shutdown bool @@ -115,7 +115,7 @@ func NewBgpServer(port int) *BgpServer { b.neighborMap = make(map[string]*Peer) b.listenPort = port b.watchers = make(map[watcherType]watcher) - b.roaClient, _ = newROAClient(0, config.RpkiServers{}) + b.roaManager, _ = newROAManager(0, config.RpkiServers{}) b.policy = table.NewRoutingPolicy() return &b } @@ -168,7 +168,7 @@ func (server *BgpServer) Serve() { } server.bmpClient, _ = newBMPClient(config.BmpServers{BmpServerList: []config.BmpServer{}}, server.bmpConnCh) - server.roaClient, _ = newROAClient(g.GlobalConfig.As, config.RpkiServers{}) + server.roaManager, _ = newROAManager(g.GlobalConfig.As, config.RpkiServers{}) if g.Mrt.FileName != "" { w, err := newMrtWatcher(g.Mrt.FileName) @@ -321,7 +321,7 @@ func (server *BgpServer) Serve() { select { case c := <-server.rpkiConfigCh: - server.roaClient, _ = newROAClient(server.bgpConfig.Global.GlobalConfig.As, c) + server.roaManager, _ = newROAManager(server.bgpConfig.Global.GlobalConfig.As, c) case c := <-server.bmpConfigCh: server.bmpClient, _ = newBMPClient(c, server.bmpConnCh) case c := <-server.bmpConnCh: @@ -344,8 +344,8 @@ func (server *BgpServer) Serve() { msgList: bmpMsgList, } server.broadcastMsgs = append(server.broadcastMsgs, m) - case rmsg := <-server.roaClient.recieveROA(): - server.roaClient.handleRTRMsg(rmsg) + case rmsg := <-server.roaManager.recieveROA(): + server.roaManager.handleROAEvent(rmsg) case zmsg := <-zapiMsgCh: m := handleZapiMsg(zmsg, server) if len(m) > 0 { @@ -875,7 +875,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg, incoming chan * msgs = append(msgs, newSenderMsg(peer, msgList)) } if len(pathList) > 0 { - server.roaClient.validate(pathList) + server.roaManager.validate(pathList) m, altered := server.propagateUpdate(peer, pathList) msgs = append(msgs, m...) @@ -1821,7 +1821,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { case REQ_MOD_RPKI: server.handleModRpki(grpcReq) case REQ_ROA, REQ_RPKI: - server.roaClient.handleGRPC(grpcReq) + server.roaManager.handleGRPC(grpcReq) case REQ_VRF, REQ_VRFS, REQ_VRF_MOD: pathList := server.handleVrfRequest(grpcReq) if len(pathList) > 0 { @@ -2397,7 +2397,7 @@ func (server *BgpServer) handleModRpki(grpcReq *GrpcRequest) { r.RpkiServerConfig.Address = net.ParseIP(arg.Address) r.RpkiServerConfig.Port = arg.Port server.bgpConfig.RpkiServers.RpkiServerList = append(server.bgpConfig.RpkiServers.RpkiServerList, r) - server.roaClient, _ = newROAClient(server.bgpConfig.Global.GlobalConfig.As, server.bgpConfig.RpkiServers) + server.roaManager, _ = newROAManager(server.bgpConfig.Global.GlobalConfig.As, server.bgpConfig.RpkiServers) grpcDone(grpcReq, nil) return } |