summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/rpki.go214
-rw-r--r--server/rpki_test.go26
-rw-r--r--server/server.go18
3 files changed, 183 insertions, 75 deletions
diff --git a/server/rpki.go b/server/rpki.go
index e4350f6e..f0006f20 100644
--- a/server/rpki.go
+++ b/server/rpki.go
@@ -24,8 +24,10 @@ import (
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
"github.com/osrg/gobgp/table"
+ "gopkg.in/tomb.v2"
"net"
"strconv"
+ "strings"
"time"
)
@@ -36,22 +38,67 @@ type roaBucket struct {
}
type roa struct {
+ Src string
MaxLen uint8
AS []uint32
}
-type roaClient struct {
- AS uint32
- roas map[bgp.RouteFamily]*radix.Tree
- outgoing chan []byte
- config config.RpkiServers
+const (
+ CONNECTED uint8 = iota
+ DISCONNECTED
+ RTR
+)
+
+type roaClientEvent struct {
+ eventType uint8
+ src string
+ conn *net.TCPConn
+ data []byte
+}
+
+type roaManager struct {
+ AS uint32
+ roas map[bgp.RouteFamily]*radix.Tree
+ config config.RpkiServers
+ eventCh chan *roaClientEvent
+ clientMap map[string]*roaClient
}
-func (c *roaClient) recieveROA() chan []byte {
- return c.outgoing
+func (c *roaManager) recieveROA() chan *roaClientEvent {
+ return c.eventCh
+}
+
+func (m *roaManager) handleROAEvent(ev *roaClientEvent) {
+ client, y := m.clientMap[ev.src]
+ if !y {
+ if ev.eventType == CONNECTED {
+ ev.conn.Close()
+ }
+ log.Error("can't find %s roa server configuration", ev.src)
+ return
+ }
+ switch ev.eventType {
+ case DISCONNECTED:
+ log.Info("roa server is disconnected, ", ev.src)
+ client.state.Downtime = time.Now().Unix()
+ // clear state
+ client.state.RpkiMessages = config.RpkiMessages{}
+ client.conn.Close()
+ client.conn = nil
+ client.t = tomb.Tomb{}
+ client.t.Go(client.tryConnect)
+ case CONNECTED:
+ log.Info("roa server is connected, ", ev.src)
+ client.conn = ev.conn
+ client.state.Uptime = time.Now().Unix()
+ client.t = tomb.Tomb{}
+ client.t.Go(client.established)
+ case RTR:
+ m.handleRTRMsg(ev.src, client.state, ev.data)
+ }
}
-func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) {
+func addROA(host string, tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8) {
key := table.IpToRadixkey(prefix, prefixLen)
b, _ := tree.Get(key)
if b == nil {
@@ -61,6 +108,7 @@ func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8)
r := &roa{
AS: []uint32{as},
MaxLen: maxLen,
+ Src: host,
}
b := &roaBucket{
@@ -74,7 +122,7 @@ func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8)
bucket := b.(*roaBucket)
found := false
for _, r := range bucket.entries {
- if r.MaxLen == maxLen {
+ if r.MaxLen == maxLen && r.Src == host {
found = true
r.AS = append(r.AS, as)
}
@@ -83,14 +131,15 @@ func addROA(tree *radix.Tree, as uint32, prefix []byte, prefixLen, maxLen uint8)
r := &roa{
MaxLen: maxLen,
AS: []uint32{as},
+ Src: host,
}
bucket.entries = append(bucket.entries, r)
}
}
}
-func (c *roaClient) handleRTRMsg(buf []byte) {
- received := &c.config.RpkiServerList[0].RpkiServerState.RpkiMessages.RpkiReceived
+func (c *roaManager) handleRTRMsg(host string, state *config.RpkiServerState, buf []byte) {
+ received := &state.RpkiMessages.RpkiReceived
m, _ := bgp.ParseRTR(buf)
if m != nil {
@@ -110,7 +159,7 @@ func (c *roaClient) handleRTRMsg(buf []byte) {
received.Ipv6Prefix++
tree = c.roas[bgp.RF_IPv6_UC]
}
- addROA(tree, msg.AS, msg.Prefix, msg.PrefixLen, msg.MaxLen)
+ addROA(host, tree, msg.AS, msg.Prefix, msg.PrefixLen, msg.MaxLen)
case *bgp.RTREndOfData:
received.EndOfData++
case *bgp.RTRCacheReset:
@@ -122,20 +171,33 @@ func (c *roaClient) handleRTRMsg(buf []byte) {
}
}
-func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) {
+func splitHostPort(network string) (host string, port int) {
+ if strings.HasPrefix(network, "[") {
+ l := strings.Split(network, "]:")
+ port, _ := strconv.Atoi(l[1])
+ return l[0][1:], port
+ } else {
+ l := strings.Split(network, ":")
+ port, _ := strconv.Atoi(l[1])
+ return l[0], port
+ }
+}
+
+func (c *roaManager) handleGRPC(grpcReq *GrpcRequest) {
switch grpcReq.RequestType {
case REQ_RPKI:
results := make([]*GrpcResponse, 0)
- for _, s := range c.config.RpkiServerList {
- state := &s.RpkiServerState
+ for _, client := range c.clientMap {
+ state := client.state
+ received := &state.RpkiMessages.RpkiReceived
rpki := &api.RPKI{
Conf: &api.RPKIConf{
- Address: s.RpkiServerConfig.Address.String(),
+ Address: client.addr,
},
State: &api.RPKIState{
Uptime: state.Uptime,
- ReceivedIpv4: int32(c.roas[bgp.RF_IPv4_UC].Len()),
- ReceivedIpv6: int32(c.roas[bgp.RF_IPv6_UC].Len()),
+ ReceivedIpv4: received.Ipv4Prefix,
+ ReceivedIpv6: received.Ipv6Prefix,
},
}
result := &GrpcResponse{}
@@ -145,13 +207,12 @@ func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) {
go sendMultipleResponses(grpcReq, results)
case REQ_ROA:
- if len(c.config.RpkiServerList) == 0 {
+ if len(c.clientMap) == 0 {
result := &GrpcResponse{}
result.ResponseErr = fmt.Errorf("RPKI server isn't configured.")
grpcReq.ResponseCh <- result
break
}
- conf := c.config.RpkiServerList[0].RpkiServerConfig
results := make([]*GrpcResponse, 0)
var rfList []bgp.RouteFamily
switch grpcReq.RouteFamily {
@@ -169,14 +230,15 @@ func (c *roaClient) handleGRPC(grpcReq *GrpcRequest) {
for _, r := range b.entries {
for _, as := range r.AS {
result := &GrpcResponse{}
+ host, port := splitHostPort(r.Src)
result.Data = &api.ROA{
As: as,
Maxlen: uint32(r.MaxLen),
Prefixlen: uint32(b.PrefixLen),
Prefix: b.Prefix.String(),
Conf: &api.RPKIConf{
- Address: conf.Address.String(),
- RemotePort: conf.Port,
+ Address: host,
+ RemotePort: uint32(port),
},
}
results = append(results, result)
@@ -239,7 +301,7 @@ func validatePath(ownAs uint32, tree *radix.Tree, cidr string, asPath *bgp.PathA
}
}
-func (c *roaClient) validate(pathList []*table.Path) {
+func (c *roaManager) validate(pathList []*table.Path) {
if c.roas[bgp.RF_IPv4_UC].Len() == 0 && c.roas[bgp.RF_IPv6_UC].Len() == 0 {
return
}
@@ -250,50 +312,96 @@ func (c *roaClient) validate(pathList []*table.Path) {
}
}
-func newROAClient(as uint32, conf config.RpkiServers) (*roaClient, error) {
- var url string
+type roaClient struct {
+ t tomb.Tomb
+ host string
+ addr string
+ conn *net.TCPConn
+ state *config.RpkiServerState
+ eventCh chan *roaClientEvent
+}
- c := &roaClient{
- AS: as,
- roas: make(map[bgp.RouteFamily]*radix.Tree),
- config: conf,
+func (c *roaClient) kill() {
+ c.t.Kill(nil)
+ if c.conn != nil {
+ c.conn.Close()
}
- c.roas[bgp.RF_IPv4_UC] = radix.New()
- c.roas[bgp.RF_IPv6_UC] = radix.New()
+}
- if len(conf.RpkiServerList) == 0 {
- return c, nil
- } else {
- if len(conf.RpkiServerList) > 1 {
- log.Warn("currently only one RPKI server is supposed")
+func (c *roaClient) tryConnect() error {
+ for c.t.Alive() {
+ conn, err := net.Dial("tcp", c.host)
+ if err != nil {
+ time.Sleep(30 * time.Second)
+ } else {
+ c.eventCh <- &roaClientEvent{
+ eventType: CONNECTED,
+ src: c.host,
+ conn: conn.(*net.TCPConn),
+ }
+ return nil
}
- c := conf.RpkiServerList[0].RpkiServerConfig
- url = net.JoinHostPort(c.Address.String(), strconv.Itoa(int(c.Port)))
}
+ return nil
+}
- conn, err := net.Dial("tcp", url)
- if err != nil {
- return c, err
+func (c *roaClient) established() error {
+ defer c.conn.Close()
+
+ disconnected := func() {
+ c.eventCh <- &roaClientEvent{
+ eventType: DISCONNECTED,
+ src: c.host,
+ }
}
- state := &conf.RpkiServerList[0].RpkiServerState
- state.Uptime = time.Now().Unix()
r := bgp.NewRTRResetQuery()
data, _ := r.Serialize()
- conn.Write(data)
- state.RpkiMessages.RpkiSent.ResetQuery++
- reader := bufio.NewReader(conn)
+ _, err := c.conn.Write(data)
+ if err != nil {
+ disconnected()
+ return nil
+ }
+
+ c.state.RpkiMessages.RpkiSent.ResetQuery++
+
+ reader := bufio.NewReader(c.conn)
scanner := bufio.NewScanner(reader)
scanner.Split(bgp.SplitRTR)
- ch := make(chan []byte)
- c.outgoing = ch
+ for scanner.Scan() {
+ c.eventCh <- &roaClientEvent{
+ eventType: RTR,
+ src: c.host,
+ data: scanner.Bytes(),
+ }
+ }
+ disconnected()
+ return nil
+}
+
+func newROAManager(as uint32, conf config.RpkiServers) (*roaManager, error) {
+ m := &roaManager{
+ AS: as,
+ roas: make(map[bgp.RouteFamily]*radix.Tree),
+ config: conf,
+ }
+ m.roas[bgp.RF_IPv4_UC] = radix.New()
+ m.roas[bgp.RF_IPv6_UC] = radix.New()
+ m.eventCh = make(chan *roaClientEvent)
+ m.clientMap = make(map[string]*roaClient)
- go func(ch chan []byte) {
- for scanner.Scan() {
- ch <- scanner.Bytes()
+ for _, entry := range conf.RpkiServerList {
+ c := entry.RpkiServerConfig
+ client := &roaClient{
+ host: net.JoinHostPort(c.Address.String(), strconv.Itoa(int(c.Port))),
+ addr: c.Address.String(),
+ eventCh: m.eventCh,
+ state: &entry.RpkiServerState,
}
- }(ch)
+ m.clientMap[client.host] = client
+ client.t.Go(client.tryConnect)
+ }
- return c, nil
+ return m, nil
}
diff --git a/server/rpki_test.go b/server/rpki_test.go
index bb2cf088..ca620d20 100644
--- a/server/rpki_test.go
+++ b/server/rpki_test.go
@@ -60,8 +60,8 @@ func TestValidate0(t *testing.T) {
assert := assert.New(t)
tree := radix.New()
- addROA(tree, 100, net.ParseIP("192.168.0.0").To4(), 24, 32)
- addROA(tree, 200, net.ParseIP("192.168.0.0").To4(), 24, 24)
+ addROA("", tree, 100, net.ParseIP("192.168.0.0").To4(), 24, 32)
+ addROA("", tree, 200, net.ParseIP("192.168.0.0").To4(), 24, 24)
var r config.RpkiValidationResultType
@@ -88,7 +88,7 @@ func TestValidate1(t *testing.T) {
assert := assert.New(t)
tree := radix.New()
- addROA(tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16)
+ addROA("", tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16)
var r config.RpkiValidationResultType
@@ -117,7 +117,7 @@ func TestValidate3(t *testing.T) {
assert := assert.New(t)
tree1 := radix.New()
- addROA(tree1, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16)
+ addROA("", tree1, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16)
var r config.RpkiValidationResultType
@@ -128,7 +128,7 @@ func TestValidate3(t *testing.T) {
assert.Equal(r, config.RPKI_VALIDATION_RESULT_TYPE_INVALID)
tree2 := radix.New()
- addROA(tree2, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24)
+ addROA("", tree2, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24)
r = validateOne(tree2, "10.0.0.0/17", "65000")
assert.Equal(r, config.RPKI_VALIDATION_RESULT_TYPE_VALID)
@@ -138,8 +138,8 @@ func TestValidate4(t *testing.T) {
assert := assert.New(t)
tree := radix.New()
- addROA(tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16)
- addROA(tree, 65001, net.ParseIP("10.0.0.0").To4(), 16, 16)
+ addROA("", tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 16)
+ addROA("", tree, 65001, net.ParseIP("10.0.0.0").To4(), 16, 16)
var r config.RpkiValidationResultType
@@ -154,8 +154,8 @@ func TestValidate5(t *testing.T) {
assert := assert.New(t)
tree := radix.New()
- addROA(tree, 65000, net.ParseIP("10.0.0.0").To4(), 17, 17)
- addROA(tree, 65000, net.ParseIP("10.0.128.0").To4(), 17, 17)
+ addROA("", tree, 65000, net.ParseIP("10.0.0.0").To4(), 17, 17)
+ addROA("", tree, 65000, net.ParseIP("10.0.128.0").To4(), 17, 17)
var r config.RpkiValidationResultType
@@ -167,7 +167,7 @@ func TestValidate6(t *testing.T) {
assert := assert.New(t)
tree := radix.New()
- addROA(tree, 0, net.ParseIP("10.0.0.0").To4(), 8, 32)
+ addROA("", tree, 0, net.ParseIP("10.0.0.0").To4(), 8, 32)
var r config.RpkiValidationResultType
@@ -185,7 +185,7 @@ func TestValidate7(t *testing.T) {
assert := assert.New(t)
tree := radix.New()
- addROA(tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24)
+ addROA("", tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24)
var r config.RpkiValidationResultType
@@ -203,8 +203,8 @@ func TestValidate8(t *testing.T) {
assert := assert.New(t)
tree := radix.New()
- addROA(tree, 0, net.ParseIP("10.0.0.0").To4(), 16, 24)
- addROA(tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24)
+ addROA("", tree, 0, net.ParseIP("10.0.0.0").To4(), 16, 24)
+ addROA("", tree, 65000, net.ParseIP("10.0.0.0").To4(), 16, 24)
var r config.RpkiValidationResultType
diff --git a/server/server.go b/server/server.go
index 7f3e5756..c5875299 100644
--- a/server/server.go
+++ b/server/server.go
@@ -94,7 +94,7 @@ type BgpServer struct {
neighborMap map[string]*Peer
globalRib *table.TableManager
zclient *zebra.Client
- roaClient *roaClient
+ roaManager *roaManager
bmpClient *bmpClient
bmpConnCh chan *bmpConn
shutdown bool
@@ -115,7 +115,7 @@ func NewBgpServer(port int) *BgpServer {
b.neighborMap = make(map[string]*Peer)
b.listenPort = port
b.watchers = make(map[watcherType]watcher)
- b.roaClient, _ = newROAClient(0, config.RpkiServers{})
+ b.roaManager, _ = newROAManager(0, config.RpkiServers{})
b.policy = table.NewRoutingPolicy()
return &b
}
@@ -168,7 +168,7 @@ func (server *BgpServer) Serve() {
}
server.bmpClient, _ = newBMPClient(config.BmpServers{BmpServerList: []config.BmpServer{}}, server.bmpConnCh)
- server.roaClient, _ = newROAClient(g.GlobalConfig.As, config.RpkiServers{})
+ server.roaManager, _ = newROAManager(g.GlobalConfig.As, config.RpkiServers{})
if g.Mrt.FileName != "" {
w, err := newMrtWatcher(g.Mrt.FileName)
@@ -321,7 +321,7 @@ func (server *BgpServer) Serve() {
select {
case c := <-server.rpkiConfigCh:
- server.roaClient, _ = newROAClient(server.bgpConfig.Global.GlobalConfig.As, c)
+ server.roaManager, _ = newROAManager(server.bgpConfig.Global.GlobalConfig.As, c)
case c := <-server.bmpConfigCh:
server.bmpClient, _ = newBMPClient(c, server.bmpConnCh)
case c := <-server.bmpConnCh:
@@ -344,8 +344,8 @@ func (server *BgpServer) Serve() {
msgList: bmpMsgList,
}
server.broadcastMsgs = append(server.broadcastMsgs, m)
- case rmsg := <-server.roaClient.recieveROA():
- server.roaClient.handleRTRMsg(rmsg)
+ case rmsg := <-server.roaManager.recieveROA():
+ server.roaManager.handleROAEvent(rmsg)
case zmsg := <-zapiMsgCh:
m := handleZapiMsg(zmsg, server)
if len(m) > 0 {
@@ -875,7 +875,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg, incoming chan *
msgs = append(msgs, newSenderMsg(peer, msgList))
}
if len(pathList) > 0 {
- server.roaClient.validate(pathList)
+ server.roaManager.validate(pathList)
m, altered := server.propagateUpdate(peer, pathList)
msgs = append(msgs, m...)
@@ -1821,7 +1821,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
case REQ_MOD_RPKI:
server.handleModRpki(grpcReq)
case REQ_ROA, REQ_RPKI:
- server.roaClient.handleGRPC(grpcReq)
+ server.roaManager.handleGRPC(grpcReq)
case REQ_VRF, REQ_VRFS, REQ_VRF_MOD:
pathList := server.handleVrfRequest(grpcReq)
if len(pathList) > 0 {
@@ -2397,7 +2397,7 @@ func (server *BgpServer) handleModRpki(grpcReq *GrpcRequest) {
r.RpkiServerConfig.Address = net.ParseIP(arg.Address)
r.RpkiServerConfig.Port = arg.Port
server.bgpConfig.RpkiServers.RpkiServerList = append(server.bgpConfig.RpkiServers.RpkiServerList, r)
- server.roaClient, _ = newROAClient(server.bgpConfig.Global.GlobalConfig.As, server.bgpConfig.RpkiServers)
+ server.roaManager, _ = newROAManager(server.bgpConfig.Global.GlobalConfig.As, server.bgpConfig.RpkiServers)
grpcDone(grpcReq, nil)
return
}