summaryrefslogtreecommitdiffhomepage
path: root/pkg/server/rpki.go
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2018-07-07 13:48:38 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2018-07-07 20:44:25 +0900
commitc4775c42510d1f1ddd55036dc19e982712fa6a0b (patch)
tree6ec8b61d4338c809e239e3003a2d32d480898e22 /pkg/server/rpki.go
parentb3079759aa13172fcb548a83da9a9653d8d5fed4 (diff)
follow Standard Go Project Layout
https://github.com/golang-standards/project-layout Now you can see clearly what are private and public library code. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'pkg/server/rpki.go')
-rw-r--r--pkg/server/rpki.go712
1 files changed, 712 insertions, 0 deletions
diff --git a/pkg/server/rpki.go b/pkg/server/rpki.go
new file mode 100644
index 00000000..606b18ab
--- /dev/null
+++ b/pkg/server/rpki.go
@@ -0,0 +1,712 @@
+// Copyright (C) 2015,2016 Nippon Telegraph and Telephone Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+ "net"
+ "sort"
+ "strconv"
+ "time"
+
+ "github.com/osrg/gobgp/internal/pkg/config"
+ "github.com/osrg/gobgp/internal/pkg/table"
+ "github.com/osrg/gobgp/pkg/packet/bgp"
+ "github.com/osrg/gobgp/pkg/packet/rtr"
+
+ "github.com/armon/go-radix"
+ log "github.com/sirupsen/logrus"
+ "golang.org/x/net/context"
+)
+
+const (
+ CONNECT_RETRY_INTERVAL = 30
+)
+
+func before(a, b uint32) bool {
+ return int32(a-b) < 0
+}
+
+type RoaBucket struct {
+ Prefix *table.IPPrefix
+ entries []*table.ROA
+}
+
+func (r *RoaBucket) GetEntries() []*table.ROA {
+ return r.entries
+}
+
+type roas []*table.ROA
+
+func (r roas) Len() int {
+ return len(r)
+}
+
+func (r roas) Swap(i, j int) {
+ r[i], r[j] = r[j], r[i]
+}
+
+func (r roas) Less(i, j int) bool {
+ r1 := r[i]
+ r2 := r[j]
+
+ if r1.MaxLen < r2.MaxLen {
+ return true
+ } else if r1.MaxLen > r2.MaxLen {
+ return false
+ }
+
+ if r1.AS < r2.AS {
+ return true
+ }
+ return false
+}
+
+type ROAEventType uint8
+
+const (
+ CONNECTED ROAEventType = iota
+ DISCONNECTED
+ RTR
+ LIFETIMEOUT
+)
+
+type ROAEvent struct {
+ EventType ROAEventType
+ Src string
+ Data []byte
+ conn *net.TCPConn
+}
+
+type roaManager struct {
+ AS uint32
+ Roas map[bgp.RouteFamily]*radix.Tree
+ eventCh chan *ROAEvent
+ clientMap map[string]*roaClient
+}
+
+func NewROAManager(as uint32) (*roaManager, error) {
+ m := &roaManager{
+ AS: as,
+ Roas: make(map[bgp.RouteFamily]*radix.Tree),
+ }
+ m.Roas[bgp.RF_IPv4_UC] = radix.New()
+ m.Roas[bgp.RF_IPv6_UC] = radix.New()
+ m.eventCh = make(chan *ROAEvent)
+ m.clientMap = make(map[string]*roaClient)
+ return m, nil
+}
+
+func (c *roaManager) enabled() bool {
+ return len(c.clientMap) != 0
+}
+
+func (m *roaManager) SetAS(as uint32) error {
+ if m.AS != 0 {
+ return fmt.Errorf("AS was already configured")
+ }
+ m.AS = as
+ return nil
+}
+
+func (m *roaManager) AddServer(host string, lifetime int64) error {
+ address, port, err := net.SplitHostPort(host)
+ if err != nil {
+ return err
+ }
+ if lifetime == 0 {
+ lifetime = 3600
+ }
+ if _, ok := m.clientMap[host]; ok {
+ return fmt.Errorf("ROA server exists %s", host)
+ }
+ m.clientMap[host] = NewRoaClient(address, port, m.eventCh, lifetime)
+ return nil
+}
+
+func (m *roaManager) DeleteServer(host string) error {
+ client, ok := m.clientMap[host]
+ if !ok {
+ return fmt.Errorf("ROA server doesn't exists %s", host)
+ }
+ client.stop()
+ m.deleteAllROA(host)
+ delete(m.clientMap, host)
+ return nil
+}
+
+func (m *roaManager) deleteAllROA(network string) {
+ for _, tree := range m.Roas {
+ deleteKeys := make([]string, 0, tree.Len())
+ tree.Walk(func(s string, v interface{}) bool {
+ b, _ := v.(*RoaBucket)
+ newEntries := make([]*table.ROA, 0, len(b.entries))
+ for _, r := range b.entries {
+ if r.Src != network {
+ newEntries = append(newEntries, r)
+ }
+ }
+ if len(newEntries) > 0 {
+ b.entries = newEntries
+ } else {
+ deleteKeys = append(deleteKeys, s)
+ }
+ return false
+ })
+ for _, key := range deleteKeys {
+ tree.Delete(key)
+ }
+ }
+}
+
+func (m *roaManager) Enable(address string) error {
+ for network, client := range m.clientMap {
+ add, _, _ := net.SplitHostPort(network)
+ if add == address {
+ client.enable(client.serialNumber)
+ return nil
+ }
+ }
+ return fmt.Errorf("ROA server not found %s", address)
+}
+
+func (m *roaManager) Disable(address string) error {
+ for network, client := range m.clientMap {
+ add, _, _ := net.SplitHostPort(network)
+ if add == address {
+ client.reset()
+ m.deleteAllROA(add)
+ return nil
+ }
+ }
+ return fmt.Errorf("ROA server not found %s", address)
+}
+
+func (m *roaManager) Reset(address string) error {
+ return m.Disable(address)
+}
+
+func (m *roaManager) SoftReset(address string) error {
+ for network, client := range m.clientMap {
+ add, _, _ := net.SplitHostPort(network)
+ if add == address {
+ client.softReset()
+ m.deleteAllROA(network)
+ return nil
+ }
+ }
+ return fmt.Errorf("ROA server not found %s", address)
+}
+
+func (c *roaManager) ReceiveROA() chan *ROAEvent {
+ return c.eventCh
+}
+
+func (c *roaClient) lifetimeout() {
+ c.eventCh <- &ROAEvent{
+ EventType: LIFETIMEOUT,
+ Src: c.host,
+ }
+}
+
+func (m *roaManager) HandleROAEvent(ev *ROAEvent) {
+ client, y := m.clientMap[ev.Src]
+ if !y {
+ if ev.EventType == CONNECTED {
+ ev.conn.Close()
+ }
+ log.WithFields(log.Fields{"Topic": "rpki"}).Errorf("Can't find %s ROA server configuration", ev.Src)
+ return
+ }
+ switch ev.EventType {
+ case DISCONNECTED:
+ log.WithFields(log.Fields{"Topic": "rpki"}).Infof("ROA server %s is disconnected", ev.Src)
+ client.state.Downtime = time.Now().Unix()
+ // clear state
+ client.endOfData = false
+ client.pendingROAs = make([]*table.ROA, 0)
+ client.state.RpkiMessages = config.RpkiMessages{}
+ client.conn = nil
+ go client.tryConnect()
+ client.timer = time.AfterFunc(time.Duration(client.lifetime)*time.Second, client.lifetimeout)
+ client.oldSessionID = client.sessionID
+ case CONNECTED:
+ log.WithFields(log.Fields{"Topic": "rpki"}).Infof("ROA server %s is connected", ev.Src)
+ client.conn = ev.conn
+ client.state.Uptime = time.Now().Unix()
+ go client.established()
+ case RTR:
+ m.handleRTRMsg(client, &client.state, ev.Data)
+ case LIFETIMEOUT:
+ // a) already reconnected but hasn't received
+ // EndOfData -> needs to delete stale ROAs
+ // b) not reconnected -> needs to delete stale ROAs
+ //
+ // c) already reconnected and received EndOfData so
+ // all stale ROAs were deleted -> timer was cancelled
+ // so should not be here.
+ if client.oldSessionID != client.sessionID {
+ log.WithFields(log.Fields{"Topic": "rpki"}).Infof("Reconnected to %s. Ignore timeout", client.host)
+ } else {
+ log.WithFields(log.Fields{"Topic": "rpki"}).Infof("Deleting all ROAs due to timeout with:%s", client.host)
+ m.deleteAllROA(client.host)
+ }
+ }
+}
+
+func (m *roaManager) roa2tree(roa *table.ROA) (*radix.Tree, string) {
+ tree := m.Roas[bgp.RF_IPv4_UC]
+ if roa.Family == bgp.AFI_IP6 {
+ tree = m.Roas[bgp.RF_IPv6_UC]
+ }
+ return tree, table.IpToRadixkey(roa.Prefix.Prefix, roa.Prefix.Length)
+}
+
+func (m *roaManager) deleteROA(roa *table.ROA) {
+ tree, key := m.roa2tree(roa)
+ b, _ := tree.Get(key)
+ if b != nil {
+ bucket := b.(*RoaBucket)
+ newEntries := make([]*table.ROA, 0, len(bucket.entries))
+ for _, r := range bucket.entries {
+ if !r.Equal(roa) {
+ newEntries = append(newEntries, r)
+ }
+ }
+ if len(newEntries) != len(bucket.entries) {
+ bucket.entries = newEntries
+ if len(newEntries) == 0 {
+ tree.Delete(key)
+ }
+ return
+ }
+ }
+ log.WithFields(log.Fields{
+ "Topic": "rpki",
+ "Prefix": roa.Prefix.Prefix.String(),
+ "Prefix Length": roa.Prefix.Length,
+ "AS": roa.AS,
+ "Max Length": roa.MaxLen,
+ }).Info("Can't withdraw a ROA")
+}
+
+func (m *roaManager) DeleteROA(roa *table.ROA) {
+ m.deleteROA(roa)
+}
+
+func (m *roaManager) addROA(roa *table.ROA) {
+ tree, key := m.roa2tree(roa)
+ b, _ := tree.Get(key)
+ var bucket *RoaBucket
+ if b == nil {
+ bucket = &RoaBucket{
+ Prefix: roa.Prefix,
+ entries: make([]*table.ROA, 0),
+ }
+ tree.Insert(key, bucket)
+ } else {
+ bucket = b.(*RoaBucket)
+ for _, r := range bucket.entries {
+ if r.Equal(roa) {
+ // we already have the same one
+ return
+ }
+ }
+ }
+ bucket.entries = append(bucket.entries, roa)
+}
+
+func (m *roaManager) AddROA(roa *table.ROA) {
+ m.addROA(roa)
+}
+
+func (c *roaManager) handleRTRMsg(client *roaClient, state *config.RpkiServerState, buf []byte) {
+ received := &state.RpkiMessages.RpkiReceived
+
+ m, err := rtr.ParseRTR(buf)
+ if err == nil {
+ switch msg := m.(type) {
+ case *rtr.RTRSerialNotify:
+ if before(client.serialNumber, msg.RTRCommon.SerialNumber) {
+ client.enable(client.serialNumber)
+ } else if client.serialNumber == msg.RTRCommon.SerialNumber {
+ // nothing
+ } else {
+ // should not happen. try to get the whole ROAs.
+ client.softReset()
+ }
+ received.SerialNotify++
+ case *rtr.RTRSerialQuery:
+ case *rtr.RTRResetQuery:
+ case *rtr.RTRCacheResponse:
+ received.CacheResponse++
+ client.endOfData = false
+ case *rtr.RTRIPPrefix:
+ family := bgp.AFI_IP
+ if msg.Type == rtr.RTR_IPV4_PREFIX {
+ received.Ipv4Prefix++
+ } else {
+ family = bgp.AFI_IP6
+ received.Ipv6Prefix++
+ }
+ roa := table.NewROA(family, msg.Prefix, msg.PrefixLen, msg.MaxLen, msg.AS, client.host)
+ if (msg.Flags & 1) == 1 {
+ if client.endOfData {
+ c.addROA(roa)
+ } else {
+ client.pendingROAs = append(client.pendingROAs, roa)
+ }
+ } else {
+ c.deleteROA(roa)
+ }
+ case *rtr.RTREndOfData:
+ received.EndOfData++
+ if client.sessionID != msg.RTRCommon.SessionID {
+ // remove all ROAs related with the
+ // previous session
+ c.deleteAllROA(client.host)
+ }
+ client.sessionID = msg.RTRCommon.SessionID
+ client.serialNumber = msg.RTRCommon.SerialNumber
+ client.endOfData = true
+ if client.timer != nil {
+ client.timer.Stop()
+ client.timer = nil
+ }
+ for _, roa := range client.pendingROAs {
+ c.addROA(roa)
+ }
+ client.pendingROAs = make([]*table.ROA, 0)
+ case *rtr.RTRCacheReset:
+ client.softReset()
+ received.CacheReset++
+ case *rtr.RTRErrorReport:
+ received.Error++
+ }
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "rpki",
+ "Host": client.host,
+ "Error": err,
+ }).Info("Failed to parse an RTR message")
+ }
+}
+
+func (c *roaManager) GetServers() []*config.RpkiServer {
+ f := func(tree *radix.Tree) (map[string]uint32, map[string]uint32) {
+ records := make(map[string]uint32)
+ prefixes := make(map[string]uint32)
+
+ tree.Walk(func(s string, v interface{}) bool {
+ b, _ := v.(*RoaBucket)
+ tmpRecords := make(map[string]uint32)
+ for _, roa := range b.entries {
+ tmpRecords[roa.Src]++
+ }
+
+ for src, r := range tmpRecords {
+ if r > 0 {
+ records[src] += r
+ prefixes[src]++
+ }
+ }
+ return false
+ })
+ return records, prefixes
+ }
+
+ recordsV4, prefixesV4 := f(c.Roas[bgp.RF_IPv4_UC])
+ recordsV6, prefixesV6 := f(c.Roas[bgp.RF_IPv6_UC])
+
+ l := make([]*config.RpkiServer, 0, len(c.clientMap))
+ for _, client := range c.clientMap {
+ state := &client.state
+
+ if client.conn == nil {
+ state.Up = false
+ } else {
+ state.Up = true
+ }
+ f := func(m map[string]uint32, key string) uint32 {
+ if r, ok := m[key]; ok {
+ return r
+ }
+ return 0
+ }
+ state.RecordsV4 = f(recordsV4, client.host)
+ state.RecordsV6 = f(recordsV6, client.host)
+ state.PrefixesV4 = f(prefixesV4, client.host)
+ state.PrefixesV6 = f(prefixesV6, client.host)
+ state.SerialNumber = client.serialNumber
+
+ addr, port, _ := net.SplitHostPort(client.host)
+ l = append(l, &config.RpkiServer{
+ Config: config.RpkiServerConfig{
+ Address: addr,
+ // Note: RpkiServerConfig.Port is uint32 type, but the TCP/UDP
+ // port is 16-bit length.
+ Port: func() uint32 { p, _ := strconv.ParseUint(port, 10, 16); return uint32(p) }(),
+ },
+ State: client.state,
+ })
+ }
+ return l
+}
+
+func (c *roaManager) GetRoa(family bgp.RouteFamily) ([]*table.ROA, error) {
+ if len(c.clientMap) == 0 {
+ return []*table.ROA{}, fmt.Errorf("RPKI server isn't configured.")
+ }
+ var rfList []bgp.RouteFamily
+ switch family {
+ case bgp.RF_IPv4_UC:
+ rfList = []bgp.RouteFamily{bgp.RF_IPv4_UC}
+ case bgp.RF_IPv6_UC:
+ rfList = []bgp.RouteFamily{bgp.RF_IPv6_UC}
+ default:
+ rfList = []bgp.RouteFamily{bgp.RF_IPv4_UC, bgp.RF_IPv6_UC}
+ }
+ l := make([]*table.ROA, 0)
+ for _, rf := range rfList {
+ if tree, ok := c.Roas[rf]; ok {
+ tree.Walk(func(s string, v interface{}) bool {
+ b, _ := v.(*RoaBucket)
+ var roaList roas
+ for _, r := range b.entries {
+ roaList = append(roaList, r)
+ }
+ sort.Sort(roaList)
+ for _, roa := range roaList {
+ l = append(l, roa)
+ }
+ return false
+ })
+ }
+ }
+ return l, nil
+}
+
+func ValidatePath(ownAs uint32, tree *radix.Tree, cidr string, asPath *bgp.PathAttributeAsPath) *table.Validation {
+ var as uint32
+
+ validation := &table.Validation{
+ Status: config.RPKI_VALIDATION_RESULT_TYPE_NOT_FOUND,
+ Reason: table.RPKI_VALIDATION_REASON_TYPE_NONE,
+ Matched: make([]*table.ROA, 0),
+ UnmatchedLength: make([]*table.ROA, 0),
+ UnmatchedAs: make([]*table.ROA, 0),
+ }
+
+ if asPath == nil || len(asPath.Value) == 0 {
+ as = ownAs
+ } else {
+ param := asPath.Value[len(asPath.Value)-1]
+ switch param.GetType() {
+ case bgp.BGP_ASPATH_ATTR_TYPE_SEQ:
+ asList := param.GetAS()
+ if len(asList) == 0 {
+ as = ownAs
+ } else {
+ as = asList[len(asList)-1]
+ }
+ case bgp.BGP_ASPATH_ATTR_TYPE_CONFED_SET, bgp.BGP_ASPATH_ATTR_TYPE_CONFED_SEQ:
+ as = ownAs
+ default:
+ return validation
+ }
+ }
+ _, n, _ := net.ParseCIDR(cidr)
+ ones, _ := n.Mask.Size()
+ prefixLen := uint8(ones)
+ key := table.IpToRadixkey(n.IP, prefixLen)
+ _, b, _ := tree.LongestPrefix(key)
+ if b == nil {
+ return validation
+ }
+
+ var bucket *RoaBucket
+ fn := radix.WalkFn(func(k string, v interface{}) bool {
+ bucket, _ = v.(*RoaBucket)
+ for _, r := range bucket.entries {
+ if prefixLen <= r.MaxLen {
+ if r.AS != 0 && r.AS == as {
+ validation.Matched = append(validation.Matched, r)
+ } else {
+ validation.UnmatchedAs = append(validation.UnmatchedAs, r)
+ }
+ } else {
+ validation.UnmatchedLength = append(validation.UnmatchedLength, r)
+ }
+ }
+ return false
+ })
+ tree.WalkPath(key, fn)
+
+ if len(validation.Matched) != 0 {
+ validation.Status = config.RPKI_VALIDATION_RESULT_TYPE_VALID
+ validation.Reason = table.RPKI_VALIDATION_REASON_TYPE_NONE
+ } else if len(validation.UnmatchedAs) != 0 {
+ validation.Status = config.RPKI_VALIDATION_RESULT_TYPE_INVALID
+ validation.Reason = table.RPKI_VALIDATION_REASON_TYPE_AS
+ } else if len(validation.UnmatchedLength) != 0 {
+ validation.Status = config.RPKI_VALIDATION_RESULT_TYPE_INVALID
+ validation.Reason = table.RPKI_VALIDATION_REASON_TYPE_LENGTH
+ } else {
+ validation.Status = config.RPKI_VALIDATION_RESULT_TYPE_NOT_FOUND
+ validation.Reason = table.RPKI_VALIDATION_REASON_TYPE_NONE
+ }
+
+ return validation
+}
+
+func (c *roaManager) validate(path *table.Path) *table.Validation {
+ if len(c.clientMap) == 0 || path.IsWithdraw || path.IsEOR() {
+ // RPKI isn't enabled or invalid path
+ return nil
+ }
+ if tree, ok := c.Roas[path.GetRouteFamily()]; ok {
+ return ValidatePath(c.AS, tree, path.GetNlri().String(), path.GetAsPath())
+ }
+ return nil
+}
+
+type roaClient struct {
+ host string
+ conn *net.TCPConn
+ state config.RpkiServerState
+ eventCh chan *ROAEvent
+ sessionID uint16
+ oldSessionID uint16
+ serialNumber uint32
+ timer *time.Timer
+ lifetime int64
+ endOfData bool
+ pendingROAs []*table.ROA
+ cancelfnc context.CancelFunc
+ ctx context.Context
+}
+
+func NewRoaClient(address, port string, ch chan *ROAEvent, lifetime int64) *roaClient {
+ ctx, cancel := context.WithCancel(context.Background())
+ c := &roaClient{
+ host: net.JoinHostPort(address, port),
+ eventCh: ch,
+ lifetime: lifetime,
+ pendingROAs: make([]*table.ROA, 0),
+ ctx: ctx,
+ cancelfnc: cancel,
+ }
+ go c.tryConnect()
+ return c
+}
+
+func (c *roaClient) enable(serial uint32) error {
+ if c.conn != nil {
+ r := rtr.NewRTRSerialQuery(c.sessionID, serial)
+ data, _ := r.Serialize()
+ _, err := c.conn.Write(data)
+ if err != nil {
+ return err
+ }
+ c.state.RpkiMessages.RpkiSent.SerialQuery++
+ }
+ return nil
+}
+
+func (c *roaClient) softReset() error {
+ if c.conn != nil {
+ r := rtr.NewRTRResetQuery()
+ data, _ := r.Serialize()
+ _, err := c.conn.Write(data)
+ if err != nil {
+ return err
+ }
+ c.state.RpkiMessages.RpkiSent.ResetQuery++
+ c.endOfData = false
+ c.pendingROAs = make([]*table.ROA, 0)
+ }
+ return nil
+}
+
+func (c *roaClient) reset() {
+ if c.conn != nil {
+ c.conn.Close()
+ }
+}
+
+func (c *roaClient) stop() {
+ c.cancelfnc()
+ c.reset()
+}
+
+func (c *roaClient) tryConnect() {
+ for {
+ select {
+ case <-c.ctx.Done():
+ return
+ default:
+ }
+ if conn, err := net.Dial("tcp", c.host); err != nil {
+ // better to use context with timeout
+ time.Sleep(CONNECT_RETRY_INTERVAL * time.Second)
+ } else {
+ c.eventCh <- &ROAEvent{
+ EventType: CONNECTED,
+ Src: c.host,
+ conn: conn.(*net.TCPConn),
+ }
+ return
+ }
+ }
+}
+
+func (c *roaClient) established() (err error) {
+ defer func() {
+ c.conn.Close()
+ c.eventCh <- &ROAEvent{
+ EventType: DISCONNECTED,
+ Src: c.host,
+ }
+ }()
+
+ if err := c.softReset(); err != nil {
+ return err
+ }
+
+ for {
+ header := make([]byte, rtr.RTR_MIN_LEN)
+ if _, err = io.ReadFull(c.conn, header); err != nil {
+ return err
+ }
+ totalLen := binary.BigEndian.Uint32(header[4:8])
+ if totalLen < rtr.RTR_MIN_LEN {
+ return fmt.Errorf("too short header length %v", totalLen)
+ }
+
+ body := make([]byte, totalLen-rtr.RTR_MIN_LEN)
+ if _, err = io.ReadFull(c.conn, body); err != nil {
+ return
+ }
+
+ c.eventCh <- &ROAEvent{
+ EventType: RTR,
+ Src: c.host,
+ Data: append(header, body...),
+ }
+ }
+}